Skip to content

Commit

Permalink
Updated consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
lkumarjain committed Feb 1, 2024
1 parent 6350ceb commit 359d264
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 129 deletions.
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"cSpell.words": [
"goka",
"kafkaclient",
"segmentio"
]
}
27 changes: 14 additions & 13 deletions kafka-client/confluent/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package confluent

import (
"fmt"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
Expand All @@ -16,7 +15,7 @@ type Consumer struct {
Done chan bool
}

func (c *Consumer) Start(wg *sync.WaitGroup) {
func (c *Consumer) Start() {
c.Message = make(chan interface{}, 1)
c.Done = make(chan bool, 1)

Expand All @@ -33,24 +32,26 @@ func (c *Consumer) Start(wg *sync.WaitGroup) {
consumer, err := kafka.NewConsumer(config)
if err != nil {
fmt.Printf("Failed to create consumer: %v\n", err)
wg.Done()
return
}

consumer.SubscribeTopics([]string{c.Topic}, nil)

wg.Done()
go func() {
run := true

run := true

for run {
select {
case <-c.Done:
run = false
default:
run = c.start(consumer)
for run {
select {
case <-c.Done:
run = false
default:
run = c.start(consumer)
}
}
}

consumer.Close()
}()

}

func (c *Consumer) start(consumer *kafka.Consumer) bool {
Expand Down
99 changes: 41 additions & 58 deletions kafka-client/consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafkaclient

import (
"sync"
"testing"

"github.com/lkumarjain/benchmark/kafka-client/confluent"
Expand All @@ -15,7 +14,7 @@ func BenchmarkConsumer(b *testing.B) {
for _, tt := range tests {
benchmarkConfluentConsumer(b, tt.name)
benchmarkFranzConsumer(b, tt.name)
benchmarkGokaConsumer(b, tt.name)
// benchmarkGokaConsumer(b, tt.name)
benchmarkSaramaConsumer(b, tt.name)
benchmarkSegmentioConsumer(b, tt.name)
}
Expand All @@ -25,154 +24,138 @@ func benchmarkConfluentConsumer(b *testing.B, prefix string) {
topicName := topicName(prefix)
consumer := confluent.Consumer{Servers: bootstrapServers, EnableEvents: false, Topic: topicName}

wg := &sync.WaitGroup{}
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

b.Run(testName(prefix, "Confluent@ConsumePoll"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

b.StopTimer()
close(consumer.Done)
})

close(consumer.Done)

consumer.EnableEvents = true

wg.Add(1)
go consumer.Start(wg)
wg.Wait()

b.Run(testName(prefix, "Confluent@ConsumeEvent"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

b.StopTimer()
close(consumer.Done)
})

close(consumer.Done)
}

func benchmarkFranzConsumer(b *testing.B, prefix string) {
topicName := topicName(prefix)

consumer := franz.Consumer{Servers: bootstrapServers, EnablePartition: false, Topic: topicName}

wg := &sync.WaitGroup{}
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

b.Run(testName(prefix, "Franz@ConsumeRecord"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

b.StopTimer()
close(consumer.Done)
})

close(consumer.Done)

consumer.EnablePartition = true

wg.Add(1)
go consumer.Start(wg)
wg.Wait()

b.Run(testName(prefix, "Franz@ConsumePartition"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

b.StopTimer()
close(consumer.Done)
})

close(consumer.Done)
}

func benchmarkGokaConsumer(b *testing.B, prefix string) {
topicName := topicName(prefix)

consumer := goka.Consumer{Servers: bootstrapServers, Topic: topicName}

wg := &sync.WaitGroup{}
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

b.Run(testName(prefix, "Goka@Consumer"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

b.StopTimer()
close(consumer.Done)
})

close(consumer.Done)
}

func benchmarkSaramaConsumer(b *testing.B, prefix string) {
topicName := topicName(prefix)

consumer := sarama.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false}

wg := &sync.WaitGroup{}
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

<-consumer.Message // Added this to wait till Consumer gets ready

b.Run(testName(prefix, "Sarama@ConsumerGroup"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

b.StopTimer()
close(consumer.Done)
})

close(consumer.Done)

consumer.EnablePartition = true

wg.Add(1)
go consumer.Start(wg)
wg.Wait()

b.Run(testName(prefix, "Sarama@ConsumePartition"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

close(consumer.Done)
b.StopTimer()
})

close(consumer.Done)
}

func benchmarkSegmentioConsumer(b *testing.B, prefix string) {
topicName := topicName(prefix)

consumer := segmentio.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false}

wg := &sync.WaitGroup{}
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

<-consumer.Message

b.Run(testName(prefix, "Segmentio@ConsumerFetch"), func(b *testing.B) {
consumer.Start()
<-consumer.Message // Added this to wait till Consumer gets ready

b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}

close(consumer.Done)
b.StopTimer()
})

close(consumer.Done)
}
24 changes: 11 additions & 13 deletions kafka-client/franz/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -18,7 +17,7 @@ type Consumer struct {
Done chan bool
}

func (c *Consumer) Start(wg *sync.WaitGroup) {
func (c *Consumer) Start() {
c.Message = make(chan interface{}, 1)
c.Done = make(chan bool, 1)

Expand All @@ -33,22 +32,21 @@ func (c *Consumer) Start(wg *sync.WaitGroup) {

if err != nil {
fmt.Printf("Failed to create consumer: %v\n", err)
wg.Done()
return
}

wg.Done()
go func() {
run := true

run := true

for run {
select {
case <-c.Done:
run = false
default:
run = c.start(consumer)
for run {
select {
case <-c.Done:
run = false
default:
run = c.start(consumer)
}
}
}
}()
}

func (c *Consumer) start(consumer *kgo.Client) bool {
Expand Down
19 changes: 9 additions & 10 deletions kafka-client/goka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"log"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
Expand All @@ -21,7 +20,7 @@ type Consumer struct {
Done chan bool
}

func (c *Consumer) Start(wg *sync.WaitGroup) {
func (c *Consumer) Start() {
c.Message = make(chan interface{}, 1)
c.Done = make(chan bool, 1)

Expand All @@ -40,24 +39,24 @@ func (c *Consumer) Start(wg *sync.WaitGroup) {

config := goka.NewTopicManagerConfig()
config.Table.Replication = 1
config.CreateTopicTimeout = time.Second * 10
config.CreateTopicTimeout = time.Minute

log := log.New(io.Discard, "", log.LstdFlags)

p, err := goka.NewProcessor(brokers, g,
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(config)),
goka.WithLogger(log))
if err != nil {
log.Fatalf("error creating processor: %v", err)
wg.Done()
log.Panicf("error creating processor: %v", err)
return
}

wg.Done()
ctx, cancel := context.WithCancel(context.Background())
go p.Run(ctx)
<-c.Done
cancel()
go func() {
ctx, cancel := context.WithCancel(context.Background())
go p.Run(ctx)
<-c.Done
cancel()
}()
}

func (c *Consumer) handler(ctx goka.Context, msg interface{}) {
Expand Down
Loading

0 comments on commit 359d264

Please sign in to comment.