
Commit ed495da

Add a new partitioner that can try to stop producing events to configured partitions (#39)
* new filtered partitioner
* snyk fixes
1 parent b28543b commit ed495da

4 files changed, +396 -3 lines changed
FilteredUniformStickyPartitionCache.java
@@ -0,0 +1,48 @@
package org.hypertrace.core.kafkastreams.framework.partitioner;

import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import org.apache.kafka.clients.producer.internals.StickyPartitionCache;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * An internal class that implements a cache used for filtered sticky partitioning behavior.
 * The cache tracks the current sticky partition for any given topic.
 */
public class FilteredUniformStickyPartitionCache extends StickyPartitionCache {
  private final String configRegex = "filtered\\.partitioner\\.(.*?)\\.excluded\\.partitions";
  private final Pattern pattern = Pattern.compile(configRegex);
  private final Map<String, Set<Integer>> excludedTopicPartitions = Maps.newConcurrentMap();

  public void configure(Map<String, ?> configs) {
    // Collect every config of the form filtered.partitioner.<topic>.excluded.partitions
    // into a per-topic set of excluded partition ids.
    configs.entrySet().stream()
        .filter(config -> pattern.matcher(config.getKey()).find())
        .forEach(config -> {
          Matcher matcher = pattern.matcher(config.getKey());
          matcher.find();
          // Group 1 of the pattern captures the topic name embedded in the config key.
          String topic = matcher.group(1);
          String excludePartitionsStr = (String) config.getValue();
          Set<Integer> excludedPartitions = Splitter.on(",")
              .trimResults()
              .splitToStream(excludePartitionsStr)
              .map(Integer::valueOf)
              .collect(Collectors.toSet());
          excludedTopicPartitions.put(topic, excludedPartitions);
        });
  }

  public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    int newPartition = super.nextPartition(topic, cluster, prevPartition);
    if (excludedTopicPartitions.containsKey(topic)) {
      Set<Integer> partitionsExcluded = excludedTopicPartitions.get(topic);
      // Keep advancing the sticky partition until a non-excluded one is found.
      while (partitionsExcluded.contains(newPartition)) {
        newPartition = super.nextPartition(topic, cluster, newPartition);
      }
    }
    return newPartition;
  }
}
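
The cache derives its exclusions entirely from config keys matching the regex above. As a minimal sketch of configuring it directly (the topic name "spans" is hypothetical):

import com.google.common.collect.Maps;
import java.util.Map;

// Hypothetical topic name "spans"; the key format is set by the regex above.
Map<String, String> configs = Maps.newHashMap();
configs.put("filtered.partitioner.spans.excluded.partitions", "0, 3");

FilteredUniformStickyPartitionCache cache = new FilteredUniformStickyPartitionCache();
cache.configure(configs);
// nextPartition(...) now skips partitions 0 and 3 for topic "spans";
// because of trimResults(), whitespace around the comma-separated ids is tolerated.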
FilteredUniformStickyPartitioner.java
@@ -0,0 +1,45 @@
package org.hypertrace.core.kafkastreams.framework.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * The partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>Otherwise choose the sticky non-excluded partition, which changes when the batch is full.
 * </ul>
 *
 * NOTE: In contrast to the DefaultPartitioner, the record key is NOT used as part of the
 * partitioning strategy in this partitioner. Records with the same key are not guaranteed to be
 * sent to the same partition.
 *
 * NOTE: Exclusion of partitions is done on a best-effort basis. In some scenarios an excluded
 * partition can still receive records, for example when a partition is specified explicitly on
 * the record.
 *
 * See KIP-480 for details about sticky partitioning.
 */
public class FilteredUniformStickyPartitioner implements Partitioner {

  private final FilteredUniformStickyPartitionCache stickyPartitionCache =
      new FilteredUniformStickyPartitionCache();

  @Override
  public void configure(Map<String, ?> configs) {
    stickyPartitionCache.configure(configs);
  }

  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return stickyPartitionCache.partition(topic, cluster);
  }

  @Override
  public void close() {}

  @Override
  public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
  }
}
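
To plug this into a producer, a hedged sketch using the standard Kafka partitioner config (the topic name "raw-spans" and the partition ids are illustrative; serializers and bootstrap servers omitted):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// Standard Kafka producer knob for swapping in a custom partitioner.
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
    FilteredUniformStickyPartitioner.class.getName());
// Per-topic exclusion list, matched by FilteredUniformStickyPartitionCache's regex.
props.put("filtered.partitioner.raw-spans.excluded.partitions", "2,3");

The producer passes its configs to the partitioner's configure(...) when it instantiates it, so the exclusion map is populated without any extra wiring.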
FilteredUniformStickyPartitionerTest.java
@@ -0,0 +1,300 @@
package org.hypertrace.core.kafkastreams.framework.partitioner;

import com.google.common.collect.Maps;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FilteredUniformStickyPartitionerTest {
  private static final Node[] NODES =
      new Node[] {
        new Node(0, "localhost", 99), new Node(1, "localhost", 100), new Node(2, "localhost", 101)
      };

  private static final String TOPIC_A = "TOPIC_A";
  private static final String TOPIC_B = "TOPIC_B";

  @Test
  public void testRoundRobinWithUnavailablePartitions() {
    // Intentionally make the partition list not in partition order to test the edge cases.
    List<PartitionInfo> partitions =
        asList(
            new PartitionInfo("test", 1, null, NODES, NODES),
            new PartitionInfo("test", 2, NODES[1], NODES, NODES),
            new PartitionInfo("test", 0, NODES[0], NODES, NODES));
    // When there are some unavailable partitions, we want to make sure that (1) we always pick
    // an available partition, and (2) the available partitions are selected in a sticky way.
    int countForPart0 = 0;
    int countForPart2 = 0;
    int part = 0;
    Partitioner partitioner = new FilteredUniformStickyPartitioner();
    Cluster cluster =
        new Cluster(
            "clusterId",
            asList(NODES[0], NODES[1], NODES[2]),
            partitions,
            Collections.<String>emptySet(),
            Collections.<String>emptySet());
    for (int i = 0; i < 50; i++) {
      part = partitioner.partition("test", null, null, null, null, cluster);
      assertTrue(
          part == 0 || part == 2, "We should never choose a leader-less node in round robin");
      if (part == 0) countForPart0++;
      else countForPart2++;
    }
    // Simulates switching the sticky partition on a new batch.
    partitioner.onNewBatch("test", cluster, part);
    for (int i = 1; i <= 50; i++) {
      part = partitioner.partition("test", null, null, null, null, cluster);
      assertTrue(
          part == 0 || part == 2, "We should never choose a leader-less node in round robin");
      if (part == 0) countForPart0++;
      else countForPart2++;
    }
    assertEquals(
        countForPart0,
        countForPart2,
        "The distribution between two available partitions should be even");
  }

  @Test
  public void testRoundRobinWithKeyBytes() throws InterruptedException {
    List<PartitionInfo> allPartitions =
        asList(
            new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
            new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_A, 2, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES));
    Cluster testCluster =
        new Cluster(
            "clusterId",
            asList(NODES[0], NODES[1], NODES[2]),
            allPartitions,
            Collections.<String>emptySet(),
            Collections.<String>emptySet());

    final Map<Integer, Integer> partitionCount = new HashMap<>();

    final byte[] keyBytes = "key".getBytes();
    int partition = 0;
    Partitioner partitioner = new FilteredUniformStickyPartitioner();
    for (int i = 0; i < 30; ++i) {
      partition = partitioner.partition(TOPIC_A, null, keyBytes, null, null, testCluster);
      Integer count = partitionCount.get(partition);
      if (null == count) count = 0;
      partitionCount.put(partition, count + 1);

      if (i % 5 == 0) {
        partitioner.partition(TOPIC_B, null, keyBytes, null, null, testCluster);
      }
    }
    // Simulate a batch filling up and switching the sticky partition.
    partitioner.onNewBatch(TOPIC_A, testCluster, partition);
    partitioner.onNewBatch(TOPIC_B, testCluster, 0);

    // Save old partition to ensure that the wrong partition does not trigger a new batch.
    int oldPart = partition;

    for (int i = 0; i < 30; ++i) {
      partition = partitioner.partition(TOPIC_A, null, keyBytes, null, null, testCluster);
      Integer count = partitionCount.get(partition);
      if (null == count) count = 0;
      partitionCount.put(partition, count + 1);

      if (i % 5 == 0) {
        partitioner.partition(TOPIC_B, null, keyBytes, null, null, testCluster);
      }
    }

    int newPart = partition;

    // Attempt to switch the partition with the wrong previous partition. Sticky partition should
    // not change.
    partitioner.onNewBatch(TOPIC_A, testCluster, oldPart);

    for (int i = 0; i < 30; ++i) {
      partition = partitioner.partition(TOPIC_A, null, keyBytes, null, null, testCluster);
      Integer count = partitionCount.get(partition);
      if (null == count) count = 0;
      partitionCount.put(partition, count + 1);

      if (i % 5 == 0) {
        partitioner.partition(TOPIC_B, null, keyBytes, null, null, testCluster);
      }
    }

    assertEquals(30, partitionCount.get(oldPart).intValue());
    assertEquals(60, partitionCount.get(newPart).intValue());
  }

  @Test
  public void testRoundRobinWithNullKeyBytes() {
    List<PartitionInfo> allPartitions =
        asList(
            new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
            new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_A, 2, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES));
    Cluster testCluster =
        new Cluster(
            "clusterId",
            asList(NODES[0], NODES[1], NODES[2]),
            allPartitions,
            Collections.<String>emptySet(),
            Collections.<String>emptySet());

    final Map<Integer, Integer> partitionCount = new HashMap<>();

    int partition = 0;
    Partitioner partitioner = new FilteredUniformStickyPartitioner();
    for (int i = 0; i < 30; ++i) {
      partition = partitioner.partition(TOPIC_A, null, null, null, null, testCluster);
      Integer count = partitionCount.get(partition);
      if (null == count) count = 0;
      partitionCount.put(partition, count + 1);

      if (i % 5 == 0) {
        partitioner.partition(TOPIC_B, null, null, null, null, testCluster);
      }
    }
    // Simulate a batch filling up and switching the sticky partition.
    partitioner.onNewBatch(TOPIC_A, testCluster, partition);
    partitioner.onNewBatch(TOPIC_B, testCluster, 0);

    // Save old partition to ensure that the wrong partition does not trigger a new batch.
    int oldPart = partition;

    for (int i = 0; i < 30; ++i) {
      partition = partitioner.partition(TOPIC_A, null, null, null, null, testCluster);
      Integer count = partitionCount.get(partition);
      if (null == count) count = 0;
      partitionCount.put(partition, count + 1);

      if (i % 5 == 0) {
        partitioner.partition(TOPIC_B, null, null, null, null, testCluster);
      }
    }

    int newPart = partition;

    // Attempt to switch the partition with the wrong previous partition. Sticky partition should
    // not change.
    partitioner.onNewBatch(TOPIC_A, testCluster, oldPart);

    for (int i = 0; i < 30; ++i) {
      partition = partitioner.partition(TOPIC_A, null, null, null, null, testCluster);
      Integer count = partitionCount.get(partition);
      if (null == count) count = 0;
      partitionCount.put(partition, count + 1);

      if (i % 5 == 0) {
        partitioner.partition(TOPIC_B, null, null, null, null, testCluster);
      }
    }

    assertEquals(30, partitionCount.get(oldPart).intValue());
    assertEquals(60, partitionCount.get(newPart).intValue());
  }

  @Test
  public void testRoundRobinWithNullKeyBytesAndPartitionExclusionFilter() {
    List<PartitionInfo> allPartitions =
        asList(
            new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
            new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_A, 2, NODES[0], NODES, NODES),
            new PartitionInfo(TOPIC_A, 3, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES),
            new PartitionInfo(TOPIC_B, 1, NODES[1], NODES, NODES),
            new PartitionInfo(TOPIC_B, 2, NODES[1], NODES, NODES));
    Cluster testCluster =
        new Cluster(
            "clusterId",
            asList(NODES[0], NODES[1], NODES[2]),
            allPartitions,
            Collections.<String>emptySet(),
            Collections.<String>emptySet());

    final Map<Integer, Integer> topicAMsgsPerPartitionCounter = new HashMap<>();
    final Map<Integer, Integer> topicBMsgsPerPartitionCounter = new HashMap<>();

    Partitioner partitioner = new FilteredUniformStickyPartitioner();
    Map<String, String> kafkaProducerConfigs = Maps.newHashMap();
    kafkaProducerConfigs.put(
        String.format("filtered.partitioner.%s.excluded.partitions", TOPIC_A), "2,3");
    kafkaProducerConfigs.put(
        String.format("filtered.partitioner.%s.excluded.partitions", TOPIC_B), "0,2");

    int topicAPartition = 0;
    int topicBPartition = 0;

    partitioner.configure(kafkaProducerConfigs);
    for (int i = 0; i < 30; ++i) {
      topicAPartition = partitioner.partition(TOPIC_A, null, null, null, null, testCluster);
      Integer topicACount = topicAMsgsPerPartitionCounter.get(topicAPartition);
      if (null == topicACount) topicACount = 0;
      topicAMsgsPerPartitionCounter.put(topicAPartition, topicACount + 1);

      topicBPartition = partitioner.partition(TOPIC_B, null, null, null, null, testCluster);
      Integer topicBCount = topicBMsgsPerPartitionCounter.get(topicBPartition);
      if (null == topicBCount) topicBCount = 0;
      topicBMsgsPerPartitionCounter.put(topicBPartition, topicBCount + 1);
    }

    // Simulate a batch filling up and switching the sticky partition.
    partitioner.onNewBatch(TOPIC_A, testCluster, topicAPartition);
    partitioner.onNewBatch(TOPIC_B, testCluster, topicBPartition);

    // Save old partition to ensure that the wrong partition does not trigger a new batch.
    int oldTopicAPart = topicAPartition;
    int oldTopicBPart = topicBPartition;

    assertEquals(30, topicAMsgsPerPartitionCounter.get(oldTopicAPart).intValue());
    assertEquals(30, topicBMsgsPerPartitionCounter.get(1).intValue());

    for (int i = 0; i < 30; ++i) {
      topicAPartition = partitioner.partition(TOPIC_A, null, null, null, null, testCluster);
      Integer topicACount = topicAMsgsPerPartitionCounter.get(topicAPartition);
      if (null == topicACount) topicACount = 0;
      topicAMsgsPerPartitionCounter.put(topicAPartition, topicACount + 1);

      topicBPartition = partitioner.partition(TOPIC_B, null, null, null, null, testCluster);
      Integer topicBCount = topicBMsgsPerPartitionCounter.get(topicBPartition);
      if (null == topicBCount) topicBCount = 0;
      topicBMsgsPerPartitionCounter.put(topicBPartition, topicBCount + 1);
    }

    // Attempt to switch the partition with the wrong previous partition. Sticky partition should
    // not change.
    partitioner.onNewBatch(TOPIC_A, testCluster, oldTopicAPart);
    partitioner.onNewBatch(TOPIC_B, testCluster, oldTopicBPart);

    for (int i = 0; i < 30; ++i) {
      topicAPartition = partitioner.partition(TOPIC_A, null, null, null, null, testCluster);
      Integer topicACount = topicAMsgsPerPartitionCounter.get(topicAPartition);
      if (null == topicACount) topicACount = 0;
      topicAMsgsPerPartitionCounter.put(topicAPartition, topicACount + 1);

      topicBPartition = partitioner.partition(TOPIC_B, null, null, null, null, testCluster);
      Integer topicBCount = topicBMsgsPerPartitionCounter.get(topicBPartition);
      if (null == topicBCount) topicBCount = 0;
      topicBMsgsPerPartitionCounter.put(topicBPartition, topicBCount + 1);
    }

    assertEquals(90, topicAMsgsPerPartitionCounter.values().stream().reduce((a, b) -> a + b).get());
    assertEquals(90, topicBMsgsPerPartitionCounter.get(1).intValue());
  }
}
