Skip to content

Commit 5f44a8a

Browse files
authored
fix to prevent overlapping partitions in case of odd weight ratios (#98)
1 parent 08d4406 commit 5f44a8a

File tree

2 files changed

+63
-4
lines changed

2 files changed

+63
-4
lines changed

kafka-streams-partitioners/weighted-group-partitioner/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitioner.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,20 @@ public WeightedGroupPartitioner(
5050
@Override
5151
public Integer partition(String topic, K key, V value, int numPartitions) {
5252
WeightedGroup groupConfig = this.getGroupConfig(topic, key, value);
53-
int fromIndex = (int) Math.floor(groupConfig.getNormalizedFractionalStart() * numPartitions);
54-
int toIndex = (int) Math.ceil(groupConfig.getNormalizedFractionalEnd() * numPartitions);
55-
int numPartitionsForGroup = toIndex - fromIndex;
53+
int fromIndexInclusive =
54+
(int) Math.floor(groupConfig.getNormalizedFractionalStart() * numPartitions);
55+
int toIndexExclusive =
56+
(int) Math.floor(groupConfig.getNormalizedFractionalEnd() * numPartitions);
57+
// Partition indexing starts from 0.
58+
// Every group size should be at least one. This prevents divide by zero error in delegate
59+
// partitioner.
60+
int numPartitionsForGroup = Math.max(toIndexExclusive - fromIndexInclusive, 1);
5661

5762
// partitioner by contract can return null.
5863
// Refer api doc: org.apache.kafka.streams.processor.StreamPartitioner.partition
5964
// when delegate partitioner returns null, we use fallback partitioner (round-robin within
6065
// group)
61-
return fromIndex
66+
return fromIndexInclusive
6267
+ Optional.ofNullable(
6368
delegatePartitioner.partition(topic, key, value, numPartitionsForGroup))
6469
.orElse(

kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,67 @@ public void testPartitionerWithKeyHashDelegatePartitioner() {
175175
assertEquals(4, partition);
176176
}
177177

178+
@Test
179+
public void testWithNonMultipleWeightRatio() {
180+
181+
int testCount = 100;
182+
PartitionerConfigServiceClient testClient =
183+
(profileName) ->
184+
new WeightedGroupProfile(
185+
PartitionerProfile.newBuilder()
186+
.addGroups(newPartitionerGroup("group1", new String[] {"tenant-1"}, 27))
187+
.addGroups(
188+
newPartitionerGroup("group2", new String[] {"tenant-2", "tenant-3"}, 27))
189+
.setDefaultGroupWeight(52)
190+
.setName(profileName)
191+
.build());
192+
193+
WeightedGroupPartitioner<String, String> partitioner =
194+
new WeightedGroupPartitioner<>(
195+
"spans", testClient, groupKeyExtractor, roundRobinPartitioner);
196+
int partition;
197+
198+
// Test case 1: tenant-1 belong to group-1 (partitions: [0 to 7])
199+
for (int i = 1; i <= testCount; i++) {
200+
partition = partitioner.partition("test-topic", "tenant-1", "span-" + i, 32);
201+
assertTrue(
202+
partition >= 0 && partition <= 7,
203+
"actual partition not in expected range. partition: " + partition);
204+
}
205+
206+
// Test case 2: tenant-2 belong to group-2 (partitions: [8 to 15])
207+
for (int i = 1; i <= testCount; i++) {
208+
partition = partitioner.partition("test-topic", "tenant-2", "span-" + i, 32);
209+
assertTrue(
210+
partition >= 8 && partition <= 15,
211+
"actual partition not in expected range. partition: " + partition);
212+
}
213+
214+
// Test case 3: tenant-3 belong to group-2 (partitions: [8 to 15])
215+
for (int i = 1; i <= testCount; i++) {
216+
partition = partitioner.partition("test-topic", "tenant-3", "span-" + i, 32);
217+
assertTrue(
218+
partition >= 8 && partition <= 15,
219+
"actual partition not in expected range. partition: " + partition);
220+
}
221+
222+
// Test case 4: groupKey=unknown should use default group [16 to 31]
223+
for (int i = 1; i <= testCount; i++) {
224+
partition = partitioner.partition("test-topic", "unknown", "span-" + i, 32);
225+
assertTrue(
226+
partition >= 16 && partition <= 31,
227+
"actual partition not in expected range. partition: " + partition);
228+
}
229+
}
230+
178231
private PartitionerConfigServiceClient getTestServiceClient() {
179232
return (profileName) ->
180233
new WeightedGroupProfile(
181234
PartitionerProfile.newBuilder()
182235
.addGroups(newPartitionerGroup("group1", new String[] {"tenant-1"}, 25))
183236
.addGroups(newPartitionerGroup("group2", new String[] {"tenant-2", "tenant-3"}, 25))
184237
.setDefaultGroupWeight(50)
238+
.setName(profileName)
185239
.build());
186240
}
187241

0 commit comments

Comments
 (0)