From 78995b83e7b6de510c66b4910875fed4e716eda4 Mon Sep 17 00:00:00 2001
From: Lokesh Jain
Date: Fri, 12 Jan 2024 18:28:40 +0530
Subject: [PATCH] added segmentio async producer

---
 kafka-client/producer_test.go      | 11 +++++
 kafka-client/segmentio/producer.go | 76 ++++++++++++++++++++++++------
 2 files changed, 75 insertions(+), 12 deletions(-)

diff --git a/kafka-client/producer_test.go b/kafka-client/producer_test.go
index a83b5b5..79bc831 100644
--- a/kafka-client/producer_test.go
+++ b/kafka-client/producer_test.go
@@ -67,6 +67,17 @@ func benchmarkSegmentioProducer(b *testing.B, prefix string, valueGenerator func
 		for i := 0; i < b.N; i++ {
 			producer.Produce(generateKey(prefix, i), valueGenerator(i))
 		}
+
+		b.StopTimer()
+	})
+
+	b.Run(testName(prefix, "Segmentio@ProduceChannel"), func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			producer.ProduceChannel(generateKey(prefix, i), valueGenerator(i))
+		}
+
+		producer.Wait()
 		b.StopTimer()
 	})
 }
diff --git a/kafka-client/segmentio/producer.go b/kafka-client/segmentio/producer.go
index ba0cea9..3b21be8 100644
--- a/kafka-client/segmentio/producer.go
+++ b/kafka-client/segmentio/producer.go
@@ -3,6 +3,7 @@ package segmentio
 import (
 	"context"
 	"strings"
+	"sync"
 	"time"
 
 	kafka "github.com/segmentio/kafka-go"
@@ -10,25 +11,76 @@
 
 type Producer struct {
 	BootstrapServers []string
-	writer           *kafka.Writer
+	wg          *sync.WaitGroup
+	syncWriter  *kafka.Writer
+	asyncWriter *kafka.Writer
 }
 
 func NewProducer(bootstrapServers string, topic string) *Producer {
 	brokers := strings.Split(bootstrapServers, ",")
-	writer := kafka.NewWriter(kafka.WriterConfig{
-		Brokers:       brokers,
-		Topic:         topic,
-		Balancer:      &kafka.Hash{},
-		BatchTimeout:  time.Duration(1) * time.Millisecond,
-		QueueCapacity: 1000,
-		BatchSize:     1000000,
-	})
+	syncWriter := &kafka.Writer{
+		Addr:         kafka.TCP(brokers...),
+		Topic:        topic,
+		Balancer:     &kafka.RoundRobin{},
+		MaxAttempts:  10,
+		BatchSize:    100,
+		BatchTimeout: 10 * time.Millisecond,
+	}
+
+	// The async writer never blocks in WriteMessages; delivery results are
+	// reported through the Completion callback instead.
+	asyncWriter := &kafka.Writer{
+		Addr:         kafka.TCP(brokers...),
+		Topic:        topic,
+		Balancer:     &kafka.RoundRobin{},
+		MaxAttempts:  10,
+		BatchSize:    100,
+		BatchTimeout: 10 * time.Millisecond,
+		Async:        true,
+	}
 
-	producer := &Producer{BootstrapServers: brokers, writer: writer}
+	producer := &Producer{BootstrapServers: brokers, syncWriter: syncWriter, asyncWriter: asyncWriter, wg: &sync.WaitGroup{}}
+	producer.asyncWriter.Completion = producer.DeliveryReport
 	return producer
 }
 
-func (p *Producer) Produce(key string, value string) {
-	p.writer.WriteMessages(context.Background(), kafka.Message{Key: []byte(key), Value: []byte(value)})
+// Produce writes one message synchronously and surfaces the delivery error
+// instead of silently discarding it.
+func (p *Producer) Produce(key string, value string) error {
+	return p.syncWriter.WriteMessages(context.Background(), kafka.Message{Key: []byte(key), Value: []byte(value)})
 }
 
+// ProduceChannel enqueues one message on the async writer. The matching
+// wg.Done happens in DeliveryReport; a synchronous error means the message
+// never reached the queue, so the counter is balanced here to keep Wait
+// from blocking forever.
+func (p *Producer) ProduceChannel(key string, value string) error {
+	p.wg.Add(1)
+	err := p.asyncWriter.WriteMessages(context.Background(), kafka.Message{Key: []byte(key), Value: []byte(value)})
+	if err != nil {
+		p.wg.Done()
+	}
+	return err
+}
+
+// DeliveryReport is installed as the async writer's Completion callback.
+// err reports a batch-wide failure; it is intentionally not acted on here —
+// this callback only settles the in-flight accounting.
+func (p *Producer) DeliveryReport(messages []kafka.Message, err error) {
+	// One Add(1) was recorded per message in ProduceChannel.
+	p.wg.Add(-len(messages))
+}
+
+// Wait blocks until every async message has completed, successfully or not.
+func (p *Producer) Wait() {
+	p.wg.Wait()
+}
+
+// Close waits for in-flight async messages, then releases both writers.
+func (p *Producer) Close() error {
+	p.wg.Wait()
+	syncErr := p.syncWriter.Close()
+	asyncErr := p.asyncWriter.Close()
+	if syncErr != nil {
+		return syncErr
+	}
+	return asyncErr
+}