Skip to content

Commit 6e9c415

Browse files
authored
feat: introduced defaultBalancer for Kafka producers that dynamically selects the balancing strategy
Introduced defaultBalancer for Kafka producers that dynamically selects the balancing strategy: Uses RoundRobin for messages without keys. Uses Murmur2 for messages with keys.
1 parent 8cef972 commit 6e9c415

File tree

5 files changed

+156
-12
lines changed

5 files changed

+156
-12
lines changed

balancer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,22 @@ package kafka
22

33
import "github.com/segmentio/kafka-go"
44

5+
var (
6+
balancerRoundRobin = GetBalancerRoundRobin()
7+
balancerMurmur = GetBalancerMurmur2Balancer()
8+
)
9+
510
type Balancer kafka.Balancer
611

12+
type defaultBalancer struct{}
13+
14+
func (s *defaultBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) {
15+
if msg.Key == nil {
16+
return balancerRoundRobin.Balance(msg, partitions...)
17+
}
18+
return balancerMurmur.Balance(msg, partitions...)
19+
}
20+
721
func GetBalancerCRC32() Balancer {
822
return &kafka.CRC32Balancer{}
923
}

balancer_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package kafka
33
import (
44
"reflect"
55
"testing"
6+
7+
"github.com/segmentio/kafka-go"
68
)
79

810
func TestGetBalancerCRC32(t *testing.T) {
@@ -115,3 +117,103 @@ func TestGetBalancerString(t *testing.T) {
115117
})
116118
}
117119
}
120+
121+
func TestDefaultBalancer_Balance(t *testing.T) {
122+
partitions := []int{0, 1, 2, 3}
123+
124+
t.Run("Should_Use_RoundRobin_When_Key_Is_Nil", func(t *testing.T) {
125+
// Given
126+
msg := kafka.Message{Key: nil}
127+
balancer := &defaultBalancer{}
128+
expected := GetBalancerRoundRobin().Balance(msg, partitions...)
129+
130+
// When
131+
result := balancer.Balance(msg, partitions...)
132+
133+
// Then
134+
if result != expected {
135+
t.Errorf("Expected RoundRobin partition %d, got %d", expected, result)
136+
}
137+
})
138+
139+
t.Run("Should_Use_Murmur2_When_Key_Is_Not_Nil", func(t *testing.T) {
140+
// Given
141+
msg := kafka.Message{Key: []byte("key")}
142+
balancer := &defaultBalancer{}
143+
expected := GetBalancerMurmur2Balancer().Balance(msg, partitions...)
144+
145+
// When
146+
result := balancer.Balance(msg, partitions...)
147+
148+
// Then
149+
if result != expected {
150+
t.Errorf("Expected Murmur2Balancer partition %d, got %d", expected, result)
151+
}
152+
})
153+
}
154+
155+
type optimizedBalancer struct{}
156+
157+
func (s *optimizedBalancer) Balance(msg kafka.Message, partitions ...int) int {
158+
var balancer kafka.Balancer
159+
if msg.Key == nil {
160+
balancer = balancerRoundRobin
161+
} else {
162+
balancer = balancerMurmur
163+
}
164+
return balancer.Balance(msg, partitions...)
165+
}
166+
167+
func BenchmarkDefaultBalancer_WithAlloc(b *testing.B) {
168+
partitions := []int{0, 1, 2, 3}
169+
msgWithKey := kafka.Message{Key: []byte("key")}
170+
msgWithoutKey := kafka.Message{Key: nil}
171+
balancer := &defaultBalancer{}
172+
173+
b.ReportAllocs()
174+
b.ResetTimer()
175+
176+
for i := 0; i < b.N; i++ {
177+
if i%2 == 0 {
178+
balancer.Balance(msgWithKey, partitions...)
179+
} else {
180+
balancer.Balance(msgWithoutKey, partitions...)
181+
}
182+
}
183+
}
184+
185+
func BenchmarkDefaultBalancer_Optimized(b *testing.B) {
186+
partitions := []int{0, 1, 2, 3}
187+
msgWithKey := kafka.Message{Key: []byte("key")}
188+
msgWithoutKey := kafka.Message{Key: nil}
189+
balancer := &optimizedBalancer{}
190+
191+
b.ReportAllocs()
192+
b.ResetTimer()
193+
194+
for i := 0; i < b.N; i++ {
195+
if i%2 == 0 {
196+
balancer.Balance(msgWithKey, partitions...)
197+
} else {
198+
balancer.Balance(msgWithoutKey, partitions...)
199+
}
200+
}
201+
}
202+
203+
func BenchmarkDefaultBalancer_Direct(b *testing.B) {
204+
partitions := []int{0, 1, 2, 3}
205+
msgWithKey := kafka.Message{Key: []byte("key")}
206+
msgWithoutKey := kafka.Message{Key: nil}
207+
balancer := &optimizedBalancer{}
208+
209+
b.ReportAllocs()
210+
b.ResetTimer()
211+
212+
for i := 0; i < b.N; i++ {
213+
if i%2 == 0 {
214+
balancer.Balance(msgWithKey, partitions...)
215+
} else {
216+
balancer.Balance(msgWithoutKey, partitions...)
217+
}
218+
}
219+
}

producer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Prod
2727
kafkaWriter := &kafka.Writer{
2828
Addr: kafka.TCP(cfg.Writer.Brokers...),
2929
Topic: cfg.Writer.Topic,
30-
Balancer: cfg.Writer.Balancer,
30+
Balancer: &defaultBalancer{},
3131
MaxAttempts: cfg.Writer.MaxAttempts,
3232
WriteBackoffMin: cfg.Writer.WriteBackoffMin,
3333
WriteBackoffMax: cfg.Writer.WriteBackoffMax,

producer_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,8 @@ func (cfg *ProducerConfig) setDefaults() {
111111
cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator()
112112
}
113113
}
114+
115+
if cfg.Writer.Balancer == nil {
116+
cfg.Writer.Balancer = &defaultBalancer{}
117+
}
114118
}

producer_config_test.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,43 @@ import (
88
)
99

1010
func TestProducerConfig_setDefaults(t *testing.T) {
11-
// Given
12-
cfg := ProducerConfig{DistributedTracingEnabled: true}
11+
t.Run("Should_Set_Default_DistributedTracing_Fields", func(t *testing.T) {
12+
// Given
13+
cfg := ProducerConfig{DistributedTracingEnabled: true}
1314

14-
// When
15-
cfg.setDefaults()
15+
// When
16+
cfg.setDefaults()
1617

17-
// Then
18-
if cfg.DistributedTracingConfiguration.TracerProvider == nil {
19-
t.Fatal("Traceprovider cannot be null")
20-
}
21-
if cfg.DistributedTracingConfiguration.Propagator == nil {
22-
t.Fatal("Propagator cannot be null")
23-
}
18+
// Then
19+
if cfg.DistributedTracingConfiguration.TracerProvider == nil {
20+
t.Fatal("TracerProvider cannot be nil")
21+
}
22+
if cfg.DistributedTracingConfiguration.Propagator == nil {
23+
t.Fatal("Propagator cannot be nil")
24+
}
25+
})
26+
27+
t.Run("Should_Set_Default_Balancer_When_Nil", func(t *testing.T) {
28+
// Given
29+
cfg := ProducerConfig{
30+
Writer: WriterConfig{
31+
Balancer: nil,
32+
},
33+
}
34+
35+
// When
36+
cfg.setDefaults()
37+
38+
// Then
39+
if cfg.Writer.Balancer == nil {
40+
t.Fatal("Balancer should not be nil")
41+
}
42+
43+
_, ok := cfg.Writer.Balancer.(*defaultBalancer)
44+
if !ok {
45+
t.Fatalf("Expected balancer to be of type *DefaultBalancer, but got %T", cfg.Writer.Balancer)
46+
}
47+
})
2448
}
2549

2650
func TestProducerConfig_Json(t *testing.T) {

0 commit comments

Comments
 (0)