Skip to content

Commit

Permalink
added segmentio async producer
Browse files Browse the repository at this point in the history
  • Loading branch information
lkumarjain committed Jan 12, 2024
1 parent 8640818 commit 78995b8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
11 changes: 11 additions & 0 deletions kafka-client/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ func benchmarkSegmentioProducer(b *testing.B, prefix string, valueGenerator func
for i := 0; i < b.N; i++ {
producer.Produce(generateKey(prefix, i), valueGenerator(i))
}

b.StopTimer()
})

b.Run(testName(prefix, "Segmentio@ProduceChannel"), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
producer.ProduceChannel(generateKey(prefix, i), valueGenerator(i))
}

producer.Wait()
b.StopTimer()
})
}
53 changes: 42 additions & 11 deletions kafka-client/segmentio/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,63 @@ package segmentio
import (
"context"
"strings"
"sync"
"time"

kafka "github.com/segmentio/kafka-go"
)

type Producer struct {
BootstrapServers []string
writer *kafka.Writer
wg *sync.WaitGroup
syncWriter *kafka.Writer
asyncWriter *kafka.Writer
}

func NewProducer(bootstrapServers string, topic string) *Producer {
brokers := strings.Split(bootstrapServers, ",")

writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: topic,
Balancer: &kafka.Hash{},
BatchTimeout: time.Duration(1) * time.Millisecond,
QueueCapacity: 1000,
BatchSize: 1000000,
})
syncWriter := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.RoundRobin{},
MaxAttempts: 10,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
}

asyncWriter := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.RoundRobin{},
MaxAttempts: 10,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
Async: true,
}

producer := &Producer{BootstrapServers: brokers, syncWriter: syncWriter, asyncWriter: asyncWriter, wg: &sync.WaitGroup{}}
producer.asyncWriter.Completion = producer.DeliveryReport

producer := &Producer{BootstrapServers: brokers, writer: writer}
return producer
}

func (p *Producer) Produce(key string, value string) {
p.writer.WriteMessages(context.Background(), kafka.Message{Key: []byte(key), Value: []byte(value)})
p.syncWriter.WriteMessages(context.Background(), kafka.Message{Key: []byte(key), Value: []byte(value)})
}

func (p *Producer) ProduceChannel(key string, value string) {
p.wg.Add(1)
p.asyncWriter.WriteMessages(context.Background(), kafka.Message{Key: []byte(key), Value: []byte(value)})
}

func (p *Producer) DeliveryReport(messages []kafka.Message, err error) {
for _, v := range messages {
_ = v
p.wg.Done()
}
}

func (p *Producer) Wait() {
p.wg.Wait()
}

0 comments on commit 78995b8

Please sign in to comment.