Skip to content

Commit efb57af

Browse files
authored
ReferenceHash balancer (#906)
* ReferenceHash balancer * ReferenceHash randomBalancer
1 parent cc6b7a1 commit efb57af

File tree

3 files changed

+113
-3
lines changed

3 files changed

+113
-3
lines changed

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,9 +468,13 @@ The `Writer` will return an error if it detects this ambiguity.
468468

469469
#### Sarama
470470

471-
If you're switching from Sarama and need/want to use the same algorithm for message
472-
partitioning, you can use the ```kafka.Hash``` balancer. ```kafka.Hash``` routes
473-
messages to the same partitions that Sarama's default partitioner would route to.
471+
If you're switching from Sarama and need/want to use the same algorithm for message partitioning, you can either use
472+
the `kafka.Hash` balancer or the `kafka.ReferenceHash` balancer:
473+
* `kafka.Hash` = `sarama.NewHashPartitioner`
474+
* `kafka.ReferenceHash` = `sarama.NewReferenceHashPartitioner`
475+
476+
The `kafka.Hash` and `kafka.ReferenceHash` balancers would route messages to the same partitions that the two
477+
aforementioned Sarama partitioners would route them to.
474478

475479
```go
476480
w := &kafka.Writer{

balancer.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,53 @@ func (h *Hash) Balance(msg Message, partitions ...int) int {
167167
return int(partition)
168168
}
169169

170+
// ReferenceHash is a Balancer that uses the provided hash function to determine which
171+
// partition to route messages to. This ensures that messages with the same key
172+
// are routed to the same partition.
173+
//
174+
// The logic to calculate the partition is:
175+
//
176+
// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition
177+
//
178+
// By default, ReferenceHash uses the FNV-1a algorithm. This is the same algorithm as
179+
// the Sarama NewReferenceHashPartitioner and ensures that messages produced by kafka-go will
180+
// be delivered to the same partitions that the Sarama producer would deliver them to.
181+
type ReferenceHash struct {
182+
rr randomBalancer
183+
Hasher hash.Hash32
184+
185+
// lock protects Hasher while calculating the hash code. It is assumed that
186+
// the Hasher field is read-only once the Balancer is created, so as a
187+
// performance optimization, reads of the field are not protected.
188+
lock sync.Mutex
189+
}
190+
191+
func (h *ReferenceHash) Balance(msg Message, partitions ...int) int {
192+
if msg.Key == nil {
193+
return h.rr.Balance(msg, partitions...)
194+
}
195+
196+
hasher := h.Hasher
197+
if hasher != nil {
198+
h.lock.Lock()
199+
defer h.lock.Unlock()
200+
} else {
201+
hasher = fnv1aPool.Get().(hash.Hash32)
202+
defer fnv1aPool.Put(hasher)
203+
}
204+
205+
hasher.Reset()
206+
if _, err := hasher.Write(msg.Key); err != nil {
207+
panic(err)
208+
}
209+
210+
// uses the same algorithm as Sarama's referenceHashPartitioner.
211+
// note the type conversions here. if the uint32 hash code is not cast to
212+
// an int32, we do not get the same result as sarama.
213+
partition := (int32(hasher.Sum32()) & 0x7fffffff) % int32(len(partitions))
214+
return int(partition)
215+
}
216+
170217
type randomBalancer struct {
171218
mock int // mocked return value, used for testing
172219
}

balancer_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,65 @@ func TestHashBalancer(t *testing.T) {
6363
}
6464
}
6565

66+
func TestReferenceHashBalancer(t *testing.T) {
67+
testCases := map[string]struct {
68+
Key []byte
69+
Hasher hash.Hash32
70+
Partitions []int
71+
Partition int
72+
RndBalancerResult int
73+
}{
74+
"nil": {
75+
Key: nil, // nil key means random partition
76+
Partitions: []int{0, 1, 2},
77+
Partition: 123,
78+
RndBalancerResult: 123,
79+
},
80+
"partition-0": {
81+
Key: []byte("blah"),
82+
Partitions: []int{0, 1},
83+
Partition: 0,
84+
},
85+
"partition-1": {
86+
Key: []byte("blah"),
87+
Partitions: []int{0, 1, 2},
88+
Partition: 1,
89+
},
90+
"partition-2": {
91+
Key: []byte("castle"),
92+
Partitions: []int{0, 1, 2},
93+
Partition: 2,
94+
},
95+
"custom hash": {
96+
Key: []byte("boop"),
97+
Hasher: crc32.NewIEEE(),
98+
Partitions: []int{0, 1, 2},
99+
Partition: 1,
100+
},
101+
"hash code with MSB set": {
102+
Key: []byte("20"),
103+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
104+
Partition: 15,
105+
},
106+
}
107+
108+
for label, test := range testCases {
109+
t.Run(label, func(t *testing.T) {
110+
var rr randomBalancer
111+
if test.Key == nil {
112+
rr.mock = test.RndBalancerResult
113+
}
114+
115+
msg := Message{Key: test.Key}
116+
h := ReferenceHash{Hasher: test.Hasher, rr: rr}
117+
partition := h.Balance(msg, test.Partitions...)
118+
if partition != test.Partition {
119+
t.Errorf("expected %v; got %v", test.Partition, partition)
120+
}
121+
})
122+
}
123+
}
124+
66125
func TestCRC32Balancer(t *testing.T) {
67126
// These tests are taken from the default "consistent_random" partitioner from
68127
// https://github.com/edenhill/librdkafka/blob/master/tests/0048-partitioner.c

0 commit comments

Comments
 (0)