From 359d2648f2ed555e79166f16747be01541dd5a96 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 1 Feb 2024 19:37:25 +0530 Subject: [PATCH] Updated consumer --- .vscode/settings.json | 7 +++ kafka-client/confluent/consumer.go | 27 ++++---- kafka-client/consumer_test.go | 99 +++++++++++++----------------- kafka-client/franz/consumer.go | 24 ++++---- kafka-client/goka/consumer.go | 19 +++--- kafka-client/sarama/consumer.go | 36 +++++------ kafka-client/segmentio/consumer.go | 27 ++++---- 7 files changed, 110 insertions(+), 129 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..1b2304a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "cSpell.words": [ + "goka", + "kafkaclient", + "segmentio" + ] +} \ No newline at end of file diff --git a/kafka-client/confluent/consumer.go b/kafka-client/confluent/consumer.go index 6e2fd8b..eeebf7d 100644 --- a/kafka-client/confluent/consumer.go +++ b/kafka-client/confluent/consumer.go @@ -2,7 +2,6 @@ package confluent import ( "fmt" - "sync" "time" "github.com/confluentinc/confluent-kafka-go/kafka" @@ -16,7 +15,7 @@ type Consumer struct { Done chan bool } -func (c *Consumer) Start(wg *sync.WaitGroup) { +func (c *Consumer) Start() { c.Message = make(chan interface{}, 1) c.Done = make(chan bool, 1) @@ -33,24 +32,26 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { consumer, err := kafka.NewConsumer(config) if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) - wg.Done() return } consumer.SubscribeTopics([]string{c.Topic}, nil) - wg.Done() + go func() { + run := true - run := true - - for run { - select { - case <-c.Done: - run = false - default: - run = c.start(consumer) + for run { + select { + case <-c.Done: + run = false + default: + run = c.start(consumer) + } } - } + + consumer.Close() + }() + } func (c *Consumer) start(consumer *kafka.Consumer) bool { diff --git a/kafka-client/consumer_test.go b/kafka-client/consumer_test.go index 97859d0..8e0c34a 100644 --- a/kafka-client/consumer_test.go +++ b/kafka-client/consumer_test.go @@ -1,7 +1,6 @@ package kafkaclient import ( - "sync" "testing" "github.com/lkumarjain/benchmark/kafka-client/confluent" @@ -15,7 +14,7 @@ func BenchmarkConsumer(b *testing.B) { for _, tt := range tests { benchmarkConfluentConsumer(b, tt.name) benchmarkFranzConsumer(b, tt.name) - benchmarkGokaConsumer(b, tt.name) + // benchmarkGokaConsumer(b, tt.name) benchmarkSaramaConsumer(b, tt.name) benchmarkSegmentioConsumer(b, tt.name) } @@ -25,36 +24,33 @@ func benchmarkConfluentConsumer(b *testing.B, prefix string) { topicName := topicName(prefix) consumer := confluent.Consumer{Servers: bootstrapServers, EnableEvents: false, Topic: topicName} - wg := &sync.WaitGroup{} - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - b.Run(testName(prefix, "Confluent@ConsumePoll"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + b.StopTimer() + close(consumer.Done) }) - close(consumer.Done) - consumer.EnableEvents = true - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - b.Run(testName(prefix, "Confluent@ConsumeEvent"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + b.StopTimer() + close(consumer.Done) }) - - close(consumer.Done) } func benchmarkFranzConsumer(b *testing.B, prefix string) { @@ -62,36 +58,33 @@ func benchmarkFranzConsumer(b *testing.B, prefix string) { consumer := franz.Consumer{Servers: bootstrapServers, EnablePartition: false, Topic: topicName} - wg := &sync.WaitGroup{} - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - b.Run(testName(prefix, "Franz@ConsumeRecord"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + b.StopTimer() + close(consumer.Done) }) - close(consumer.Done) - consumer.EnablePartition = true - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - b.Run(testName(prefix, "Franz@ConsumePartition"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + b.StopTimer() + close(consumer.Done) }) - - close(consumer.Done) } func benchmarkGokaConsumer(b *testing.B, prefix string) { @@ -99,20 +92,18 @@ func benchmarkGokaConsumer(b *testing.B, prefix string) { consumer := goka.Consumer{Servers: bootstrapServers, Topic: topicName} - wg := &sync.WaitGroup{} - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - b.Run(testName(prefix, "Goka@Consumer"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + b.StopTimer() + close(consumer.Done) }) - - close(consumer.Done) } func benchmarkSaramaConsumer(b *testing.B, prefix string) { @@ -120,38 +111,34 @@ func benchmarkSaramaConsumer(b *testing.B, prefix string) { consumer := sarama.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false} - wg := &sync.WaitGroup{} - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - - <-consumer.Message // Added this to wait till Consumer gets ready - b.Run(testName(prefix, "Sarama@ConsumerGroup"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + b.StopTimer() + close(consumer.Done) }) - close(consumer.Done) - consumer.EnablePartition = true - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - b.Run(testName(prefix, "Sarama@ConsumePartition"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + + close(consumer.Done) b.StopTimer() }) - close(consumer.Done) } func benchmarkSegmentioConsumer(b *testing.B, prefix string) { @@ -159,20 +146,16 @@ func benchmarkSegmentioConsumer(b *testing.B, prefix string) { consumer := segmentio.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false} - wg := &sync.WaitGroup{} - wg.Add(1) - go consumer.Start(wg) - wg.Wait() - - <-consumer.Message - b.Run(testName(prefix, "Segmentio@ConsumerFetch"), func(b *testing.B) { + consumer.Start() + <-consumer.Message // Added this to wait till Consumer gets ready + b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message } + + close(consumer.Done) b.StopTimer() }) - - close(consumer.Done) } diff --git a/kafka-client/franz/consumer.go b/kafka-client/franz/consumer.go index e2286df..500e5f1 100644 --- a/kafka-client/franz/consumer.go +++ b/kafka-client/franz/consumer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/twmb/franz-go/pkg/kgo" @@ -18,7 +17,7 @@ type Consumer struct { Done chan bool } -func (c *Consumer) Start(wg *sync.WaitGroup) { +func (c *Consumer) Start() { c.Message = make(chan interface{}, 1) c.Done = make(chan bool, 1) @@ -33,22 +32,21 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) - wg.Done() return } - wg.Done() + go func() { + run := true - run := true - - for run { - select { - case <-c.Done: - run = false - default: - run = c.start(consumer) + for run { + select { + case <-c.Done: + run = false + default: + run = c.start(consumer) + } } - } + }() } func (c *Consumer) start(consumer *kgo.Client) bool { diff --git a/kafka-client/goka/consumer.go b/kafka-client/goka/consumer.go index b9fa251..9e4de16 100644 --- a/kafka-client/goka/consumer.go +++ b/kafka-client/goka/consumer.go @@ -6,7 +6,6 @@ import ( "io" "log" "strings" - "sync" "time" "github.com/IBM/sarama" @@ -21,7 +20,7 @@ type Consumer struct { Done chan bool } -func (c *Consumer) Start(wg *sync.WaitGroup) { +func (c *Consumer) Start() { c.Message = make(chan interface{}, 1) c.Done = make(chan bool, 1) @@ -40,7 +39,7 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { config := goka.NewTopicManagerConfig() config.Table.Replication = 1 - config.CreateTopicTimeout = time.Second * 10 + config.CreateTopicTimeout = time.Minute log := log.New(io.Discard, "", log.LstdFlags) @@ -48,16 +47,16 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(config)), goka.WithLogger(log)) if err != nil { - log.Fatalf("error creating processor: %v", err) - wg.Done() + log.Panicf("error creating processor: %v", err) return } - wg.Done() - ctx, cancel := context.WithCancel(context.Background()) - go p.Run(ctx) - <-c.Done - cancel() + go func() { + ctx, cancel := context.WithCancel(context.Background()) + go p.Run(ctx) + <-c.Done + cancel() + }() } func (c *Consumer) handler(ctx goka.Context, msg interface{}) { diff --git a/kafka-client/sarama/consumer.go b/kafka-client/sarama/consumer.go index bc6fa7a..b9fda08 100644 --- a/kafka-client/sarama/consumer.go +++ b/kafka-client/sarama/consumer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/IBM/sarama" @@ -18,7 +17,7 @@ type Consumer struct { Done chan bool } -func (c *Consumer) Start(wg *sync.WaitGroup) { +func (c *Consumer) Start() { c.Message = make(chan interface{}, 1) c.Done = make(chan bool, 1) @@ -28,25 +27,23 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { cfg.Consumer.Offsets.Initial = sarama.OffsetOldest if c.EnablePartition { - c.consumePartition(wg, cfg) + c.consumePartition(cfg) } else { - c.consumeGroup(wg, cfg) + c.consumeGroup(cfg) } } -func (c *Consumer) consumePartition(wg *sync.WaitGroup, cfg *sarama.Config) { +func (c *Consumer) consumePartition(cfg *sarama.Config) { brokers := strings.Split(c.Servers, ",") consumer, err := sarama.NewConsumer(brokers, cfg) if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) - wg.Done() return } partitions, err := consumer.Partitions(c.Topic) if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) - wg.Done() return } @@ -54,7 +51,6 @@ func (c *Consumer) consumePartition(wg *sync.WaitGroup, cfg *sarama.Config) { pc, err := consumer.ConsumePartition(c.Topic, partition, sarama.OffsetOldest) if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) - wg.Done() return } @@ -69,34 +65,32 @@ func (c *Consumer) consumePartition(wg *sync.WaitGroup, cfg *sarama.Config) { } }(pc) } - - wg.Done() } -func (c *Consumer) consumeGroup(wg *sync.WaitGroup, cfg *sarama.Config) { +func (c *Consumer) consumeGroup(cfg *sarama.Config) { brokers := strings.Split(c.Servers, ",") group, err := sarama.NewConsumerGroup(brokers, fmt.Sprintf("sarama-consumer-group-%d", time.Now().UnixNano()), cfg) if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) - wg.Done() return } handler := handler{message: c.Message, done: c.Done} - wg.Done() - run := true + go func() { + run := true - for run { - select { - case <-c.Done: - run = false - default: - group.Consume(context.Background(), []string{c.Topic}, handler) + for run { + select { + case <-c.Done: + run = false + default: + group.Consume(context.Background(), []string{c.Topic}, handler) + } } - } + }() } type handler struct { diff --git a/kafka-client/segmentio/consumer.go b/kafka-client/segmentio/consumer.go index 67164bc..1cfd351 100644 --- a/kafka-client/segmentio/consumer.go +++ b/kafka-client/segmentio/consumer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "sync" "time" kafka "github.com/segmentio/kafka-go" @@ -18,7 +17,7 @@ type Consumer struct { Done chan bool } -func (c *Consumer) Start(wg *sync.WaitGroup) { +func (c *Consumer) Start() { c.Message = make(chan interface{}, 1) c.Done = make(chan bool, 1) @@ -28,17 +27,17 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { GroupID: fmt.Sprintf("segmentio-consumer-group-%d", time.Now().UnixNano()), }) - wg.Done() - - run := true - - for run { - select { - case <-c.Done: - run = false - default: - msg, _ := reader.FetchMessage(context.Background()) - c.Message <- msg + go func() { + run := true + + for run { + select { + case <-c.Done: + run = false + default: + msg, _ := reader.FetchMessage(context.Background()) + c.Message <- msg + } } - } + }() }