Skip to content

Commit

Permalink
Merge pull request #39 from tidepool-org/fault-tolerant
Browse files Browse the repository at this point in the history
Add fault tolerant kafka consumer
  • Loading branch information
toddkazakov authored Apr 8, 2021
2 parents 4cd2435 + 1777bdc commit 5daac3a
Show file tree
Hide file tree
Showing 18 changed files with 1,137 additions and 70 deletions.
66 changes: 60 additions & 6 deletions events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,32 @@ package events

import (
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/tidepool-org/go-common/errors"
"log"
"sync"
)

// ErrConsumerStopped signals that a consumer exited because it was stopped
// (its context was cancelled or shutdown was requested), not because of a
// failure.
var ErrConsumerStopped = errors.New("consumer has been stopped")

// EventConsumer is the lifecycle contract implemented by the consumers in
// this package.
type EventConsumer interface {
// RegisterHandler adds handler to the set of handlers kept by the consumer.
RegisterHandler(handler EventHandler)
// Start runs the consumer and returns the error it exited with.
Start() error
// Stop requests shutdown of the consumer.
Stop() error
}

// SaramaConsumer is an EventConsumer backed by a Sarama consumer group.
type SaramaConsumer struct {
// config is the configuration the consumer was constructed with.
config *CloudEventsConfig
// consumerGroup is the Sarama consumer group driven by Start and closed by
// Stop. Stop treats a nil value as "initialization failed".
consumerGroup sarama.ConsumerGroup
// ready is replaced with a fresh channel each time a Consume call returns.
// NOTE(review): who sends/receives on it is not visible here - confirm.
ready chan bool
// stop carries the shutdown signal from Stop to Start, which cancels the
// consume context when it receives it.
stop chan struct{}
// stopOnce ensures the shutdown signal is issued at most once.
stopOnce *sync.Once
// topic is the prefixed topic name passed to Consume.
topic string
// wg tracks the consume goroutine so Stop can wait for it to exit.
wg *sync.WaitGroup
// handlers holds the handlers added through RegisterHandler.
handlers []EventHandler
// NOTE(review): presumably receives events that could not be handled;
// its use is not visible here - confirm.
deadLetterProducer *KafkaCloudEventsProducer
}
Expand All @@ -26,8 +39,11 @@ func NewSaramaCloudEventsConsumer(config *CloudEventsConfig) (EventConsumer, err

return &SaramaConsumer{
config: config,
ready: make(chan bool),
topic: config.GetPrefixedTopic(),
wg: &sync.WaitGroup{},
ready: make(chan bool, 1),
stop: make(chan struct{}),
stopOnce: &sync.Once{},
handlers: make([]EventHandler, 0),
}, nil
}
Expand Down Expand Up @@ -80,33 +96,71 @@ func (s *SaramaConsumer) RegisterHandler(handler EventHandler) {
s.handlers = append(s.handlers, handler)
}

func (s *SaramaConsumer) Start(ctx context.Context) error {
func (s *SaramaConsumer) Start() error {
if err := s.initialize(); err != nil {
return err
}

wg := &sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-s.stop
cancel()
}()

errChan := make(chan error)
s.wg.Add(1)
go func() {
defer wg.Done()
defer s.wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := s.consumerGroup.Consume(ctx, []string{s.topic}, s); err != nil {
log.Printf("Error from consumer: %v", err)
// It's not clear whether this condition can be true
if err == context.Canceled {
err = ErrConsumerStopped
}
errChan <- err
return
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
errChan <- ErrConsumerStopped
return
}
s.ready = make(chan bool)
}
}()

wg.Wait()
err := <-errChan
if err == ErrConsumerStopped {
return err
}

// The consumer group was terminated with an unexpected error.
// We need to call stop, so we cancel the context and stop the
// go routine so it doesn't leak on restart.
if e := s.Stop(); e != nil {
err = fmt.Errorf("%w: %s", err, e.Error())
}

return err
}

// Stop signals the consume loop started by Start to exit, waits for it to
// finish and then closes the underlying consumer group.
//
// It returns nil when the consumer group was never created; otherwise it
// returns the result of closing the consumer group. Stop may be called more
// than once: the shutdown signal is only issued on the first call.
func (s *SaramaConsumer) Stop() error {
	// Initialization failed
	if s.consumerGroup == nil {
		return nil
	}

	// Signal that the consumer group should be terminated. The channel is
	// closed rather than sent on: a send on an unbuffered channel blocks
	// forever when no goroutine is receiving, whereas close never blocks and
	// releases every current and future receiver.
	s.stopOnce.Do(func() {
		close(s.stop)
	})

	// Wait for the consumer group to exit
	s.wg.Wait()
	return s.consumerGroup.Close()
}

Expand Down
5 changes: 5 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ const (
CreateUserEventType = "users:create"
)

// EventHandler is implemented by types that process cloud events.
// NOTE(review): presumably a consumer calls CanHandle before Handle; the
// dispatch code is not visible here - confirm.
type EventHandler interface {
// CanHandle reports whether the handler accepts ce.
CanHandle(ce cloudevents.Event) bool
// Handle processes ce and returns an error on failure.
Handle(ce cloudevents.Event) error
}

type Event interface {
GetEventType() string
GetEventKey() string
Expand Down
95 changes: 95 additions & 0 deletions events/fault_tolerant_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package events

import (
"errors"
"github.com/avast/retry-go"
"log"
"sync"
"time"
)

// Default retry settings applied by NewFaultTolerantCloudEventsConsumer.
var (
// DefaultAttempts is the default value handed to retry.Attempts.
DefaultAttempts = uint(1000)
// DefaultDelay is the default value handed to retry.Delay.
DefaultDelay = 30 * time.Second
// DefaultDelayType is the default value handed to retry.DelayType.
DefaultDelayType = retry.FixedDelay
)

// FaultTolerantConsumer is an EventConsumer that wraps a delegate consumer
// and recreates and restarts it, using the retry package, when it exits.
type FaultTolerantConsumer struct {
// config is passed to NewSaramaCloudEventsConsumer whenever the delegate is
// recreated.
config *CloudEventsConfig
// hanlders holds the handlers added through RegisterHandler.
// NOTE(review): the name is a misspelling of "handlers"; renaming it
// requires updating every use of the field.
hanlders []EventHandler
// m is held while the delegate is replaced and while Stop runs.
m sync.Mutex
// delegate is the current underlying consumer; it is nil until the first
// call to recreateConsumer succeeds.
delegate EventConsumer
// isShuttingDown is set by Stop and prevents the delegate from being
// recreated afterwards.
isShuttingDown bool
// attempts is passed to retry.Attempts in Start.
attempts uint
// delay is passed to retry.Delay in Start.
delay time.Duration
// delayType is passed to retry.DelayType in Start.
delayType retry.DelayTypeFunc
}

// Compile-time check that *FaultTolerantConsumer satisfies EventConsumer.
var _ EventConsumer = (*FaultTolerantConsumer)(nil)

// NewFaultTolerantCloudEventsConsumer builds a FaultTolerantConsumer for
// config, using DefaultAttempts, DefaultDelay and DefaultDelayType as its
// retry settings. The returned error is always nil.
func NewFaultTolerantCloudEventsConsumer(config *CloudEventsConfig) (*FaultTolerantConsumer, error) {
	consumer := &FaultTolerantConsumer{config: config}
	consumer.attempts = DefaultAttempts
	consumer.delay = DefaultDelay
	consumer.delayType = DefaultDelayType
	return consumer, nil
}

// RegisterHandler appends handler to the list of handlers kept by the
// fault-tolerant consumer. It does not touch the current delegate and does
// not take the mutex.
func (f *FaultTolerantConsumer) RegisterHandler(handler EventHandler) {
f.hanlders = append(f.hanlders, handler)
}

// Start runs the consumer under retry.Do: restart is invoked repeatedly
// according to the configured attempts, delay and delay type, and the
// result of retry.Do is returned.
func (f *FaultTolerantConsumer) Start() error {
	opts := []retry.Option{
		retry.Attempts(f.attempts),
		retry.Delay(f.delay),
		retry.DelayType(f.delayType),
	}
	return retry.Do(f.restart, opts...)
}

// restart performs one retry attempt: it recreates the delegate, runs it
// until it exits and logs the exit reason. An exit caused by
// ErrConsumerStopped is wrapped with retry.Unrecoverable so that no further
// attempt is made; any other error is returned as is.
func (f *FaultTolerantConsumer) restart() error {
	if recreateErr := f.recreateConsumer(); recreateErr != nil {
		return recreateErr
	}

	exitErr := f.delegate.Start()
	log.Printf("Consumer exited. Reason: %v", exitErr)

	if !errors.Is(exitErr, ErrConsumerStopped) {
		return exitErr
	}
	return retry.Unrecoverable(exitErr)
}

// recreateConsumer replaces the delegate with a newly constructed Sarama
// consumer and registers every known handler on it. The mutex is held for
// the duration of the call.
//
// It returns an unrecoverable ErrConsumerStopped when shutdown has already
// been requested, and an unrecoverable error when the new consumer cannot be
// constructed; in both cases the existing delegate is left untouched.
func (f *FaultTolerantConsumer) recreateConsumer() error {
	f.m.Lock()
	defer f.m.Unlock()

	// Do not try to restart the consumer if we're trying to shut it down
	if f.isShuttingDown {
		return retry.Unrecoverable(ErrConsumerStopped)
	}

	delegate, err := NewSaramaCloudEventsConsumer(f.config)
	if err != nil {
		return retry.Unrecoverable(err)
	}

	// Register the handlers on the new delegate. Calling f.RegisterHandler
	// here would append to f.hanlders instead, leaving the delegate without
	// handlers and doubling the list on every restart.
	for _, h := range f.hanlders {
		delegate.RegisterHandler(h)
	}
	f.delegate = delegate
	return nil
}

// Stop marks the consumer as shutting down, so the delegate is not recreated
// again, and stops the current delegate. The mutex is held for the duration
// of the call.
//
// It returns nil when no delegate has been created yet, otherwise the result
// of stopping the delegate.
func (f *FaultTolerantConsumer) Stop() error {
	f.m.Lock()
	defer f.m.Unlock()

	f.isShuttingDown = true

	// The delegate is only set by recreateConsumer; calling Stop before the
	// first attempt would otherwise invoke a method on a nil interface.
	if f.delegate == nil {
		return nil
	}
	return f.delegate.Stop()
}
64 changes: 0 additions & 64 deletions events/receiver.go

This file was deleted.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/Shopify/sarama v1.27.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.2.0
github.com/cloudevents/sdk-go/v2 v2.2.0
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/Shopify/sarama v1.27.0 h1:tqo2zmyzPf1+gwTTwhI6W+EXDw4PVSczynpHKFtVAmo
github.com/Shopify/sarama v1.27.0/go.mod h1:aCdj6ymI8uyPEux1JJ9gcaDT6cinjGhNCAhs54taSUo=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.2.0 h1:PB+tnuauZWE2RnfDJnyv8wuLc1tkWk/p8mYiy2xzdvQ=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.2.0/go.mod h1:XbBXL/a5TGNGs5N4UreDCIK7F71MNrXURJeVXury+XY=
Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/avast/retry-go/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions vendor/github.com/avast/retry-go/.godocdown.tmpl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions vendor/github.com/avast/retry-go/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/github.com/avast/retry-go/Gopkg.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5daac3a

Please sign in to comment.