Skip to content

Commit 736ed18

Browse files
authored
avro field value partitioner (#42)
1 parent ef18021 commit 736ed18

File tree

6 files changed

+612
-11
lines changed

6 files changed

+612
-11
lines changed

kafka-streams-framework/build.gradle.kts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,50 @@ plugins {
33
jacoco
44
id("org.hypertrace.publish-plugin")
55
id("org.hypertrace.jacoco-report-plugin")
6+
id("org.hypertrace.avro-plugin")
67
}
78

89
tasks.test {
910
useJUnitPlatform()
1011
}
1112

1213
dependencies {
14+
annotationProcessor("org.projectlombok:lombok:1.18.24")
15+
compileOnly("org.projectlombok:lombok:1.18.24")
16+
1317
api(project(":kafka-streams-serdes"))
14-
api("com.typesafe:config:1.4.1")
18+
api("com.typesafe:config:1.4.2")
1519
api("org.apache.kafka:kafka-streams:6.0.1-ccs")
1620
api("io.confluent:kafka-streams-avro-serde:6.0.1")
1721

18-
implementation("com.google.guava:guava:30.1-jre")
22+
implementation("com.google.guava:guava:31.1-jre")
23+
implementation("org.apache.avro:avro:1.10.2")
24+
implementation("org.apache.kafka:kafka-clients:6.0.1-ccs")
1925
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.31")
2026
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.31")
21-
implementation("org.apache.kafka:kafka-clients:6.0.1-ccs")
2227

2328
constraints {
2429
api("org.glassfish.jersey.core:jersey-common:2.34") {
2530
because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637")
2631
}
32+
33+
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.2.1") {
34+
because("Denial of Service (DoS) [High Severity]" +
35+
"[https://snyk.io/vuln/SNYK-JAVA-COMFASTERXMLJACKSONCORE-2421244] " +
36+
"in com.fasterxml.jackson.core:[email protected]")
37+
}
2738
}
2839

2940
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
30-
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
31-
testImplementation("org.junit-pioneer:junit-pioneer:1.1.0")
32-
testImplementation("org.mockito:mockito-core:3.6.28")
41+
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
42+
testImplementation("org.junit-pioneer:junit-pioneer:1.7.0")
43+
testImplementation("org.mockito:mockito-core:4.5.1")
3344
testImplementation("org.hamcrest:hamcrest-core:2.2")
34-
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.15.0")
45+
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.2")
46+
}
47+
48+
// Disabling compatibility check for the test avro definitions.
49+
tasks.named<org.hypertrace.gradle.avro.CheckAvroCompatibility>("avroCompatibilityCheck") {
50+
setAgainstFiles(null)
3551
}
3652

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package org.hypertrace.core.kafkastreams.framework.partitioner;
2+
3+
import com.google.common.collect.HashBasedTable;
4+
import com.google.common.collect.Iterables;
5+
import com.google.common.collect.Table;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.apache.avro.generic.GenericRecord;
8+
import org.apache.kafka.streams.processor.StreamPartitioner;
9+
10+
import java.util.Iterator;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Objects;
14+
import java.util.Optional;
15+
import java.util.stream.IntStream;
16+
17+
import static java.util.function.Predicate.not;
18+
import static java.util.stream.Collectors.toUnmodifiableList;
19+
import static org.hypertrace.core.kafkastreams.framework.partitioner.AvroFieldValuePartitionerConfig.PartitionGroupConfig;
20+
21+
/**
22+
* Example config:
23+
*
24+
* <pre>
25+
* avro.field.value.partitioner.topics.topic1.field.name = customer_id # mandatory
26+
* avro.field.value.partitioner.topics.topic1.excluded.partitions = "4,5,6,7" # Optional, default empty set
27+
* avro.field.value.partitioner.topics.topic2.field.name = tenant_id # mandatory for each configured topic
28+
* avro.field.value.partitioner.topics.topic2.excluded.partitions = "12,13,14,15" # Optional, default empty set
29+
*
30+
* avro.field.value.partitioner.groups.group1.members = tenant-1 # mandatory - for each configured group
31+
* avro.field.value.partitioner.groups.group1.weight = 25
32+
* avro.field.value.partitioner.groups.group2.members = tenant-2, tenant-3
33+
* avro.field.value.partitioner.groups.group2.weight = 25
34+
* avro.field.value.partitioner.default.group.weight = 50
35+
* </pre>
36+
*/
37+
@Slf4j
38+
public class AvroFieldValuePartitioner<V extends GenericRecord>
39+
implements StreamPartitioner<Object, V> {
40+
private final AvroFieldValuePartitionerConfig partitionerConfig;
41+
private final Table<
42+
String, AvroFieldValuePartitionerConfig.PartitionGroupConfig, Iterator<Integer>>
43+
partitionIteratorByTopicAndGroup = HashBasedTable.create();
44+
45+
public AvroFieldValuePartitioner(Map<String, ?> configs) {
46+
this.partitionerConfig = new AvroFieldValuePartitionerConfig(configs);
47+
}
48+
49+
@Override
50+
public Integer partition(String topic, Object ignoredKey, V value, int numPartitions) {
51+
String partitionKey = this.getPartitionKeyFromRecord(topic, value).orElse("");
52+
return this.calculatePartition(topic, partitionKey, numPartitions);
53+
}
54+
55+
private Optional<String> getPartitionKeyFromRecord(String topic, GenericRecord record) {
56+
return Optional.ofNullable(partitionerConfig.getFieldNameByTopic().get(topic))
57+
.filter(record::hasField)
58+
.map(record::get)
59+
.map(Object::toString);
60+
}
61+
62+
private int calculatePartition(String topic, String key, int numPartitions) {
63+
PartitionGroupConfig groupConfig = this.getPartitionGroup(key);
64+
if (!this.partitionIteratorByTopicAndGroup.contains(topic, groupConfig)) {
65+
List<Integer> availableTopicPartitions =
66+
this.getAvailablePartitionsForTopic(topic, numPartitions);
67+
int totalPartitions = availableTopicPartitions.size();
68+
int fromIndex = (int) (groupConfig.getNormalizedFractionalStart() * totalPartitions);
69+
int toIndex = (int) (groupConfig.getNormalizedFractionalEnd() * totalPartitions);
70+
List<Integer> assignedPartitions = availableTopicPartitions.subList(fromIndex, toIndex);
71+
log.info(
72+
"topic: {}, group config: {}, member: {}, available partitions:{}, assigned partitions: {}",
73+
topic,
74+
groupConfig,
75+
key,
76+
availableTopicPartitions,
77+
assignedPartitions);
78+
// Using cyclic iterator
79+
Iterator<Integer> partitionIterator = Iterables.cycle(assignedPartitions).iterator();
80+
synchronized (partitionIteratorByTopicAndGroup) {
81+
this.partitionIteratorByTopicAndGroup.put(topic, groupConfig, partitionIterator);
82+
}
83+
}
84+
85+
Iterator<Integer> iterator =
86+
Objects.requireNonNull(this.partitionIteratorByTopicAndGroup.get(topic, groupConfig));
87+
synchronized (iterator) {
88+
return iterator.next();
89+
}
90+
}
91+
92+
private PartitionGroupConfig getPartitionGroup(String key) {
93+
return this.partitionerConfig
94+
.getGroupConfigByMember()
95+
.getOrDefault(key, this.partitionerConfig.getDefaultGroupConfig());
96+
}
97+
98+
private List<Integer> getAvailablePartitionsForTopic(String topic, int totalPartitionCount) {
99+
return IntStream.range(0, totalPartitionCount)
100+
.boxed()
101+
.filter(not(partitionerConfig.getExcludedPartitionsByTopic().get(topic)::contains))
102+
.collect(toUnmodifiableList());
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package org.hypertrace.core.kafkastreams.framework.partitioner;
2+
3+
import com.google.common.base.Splitter;
4+
import com.google.common.collect.Maps;
5+
import com.google.common.util.concurrent.AtomicDouble;
6+
import com.typesafe.config.Config;
7+
import com.typesafe.config.ConfigFactory;
8+
import lombok.Value;
9+
10+
import java.util.Collection;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Map.Entry;
14+
import java.util.Set;
15+
import java.util.function.Function;
16+
import java.util.stream.Collectors;
17+
18+
@Value
19+
class AvroFieldValuePartitionerConfig {
20+
private static final Splitter SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();
21+
22+
static final String PARTITIONER_CONFIG_PREFIX = "avro.field.value.partitioner";
23+
24+
static final String TOPICS_CONFIG_PREFIX = "topics";
25+
static final String FIELD_NAME = "field.name";
26+
static final String EXCLUDED_PARTITIONS = "excluded.partitions";
27+
static final String DEFAULT_GROUP_WEIGHT = "default.group.weight";
28+
29+
static final String GROUPS_CONFIG_PREFIX = "groups";
30+
static final String GROUP_MEMBERS = "members";
31+
static final String GROUP_WEIGHT = "weight";
32+
33+
Map<String, String> fieldNameByTopic;
34+
Map<String, Set<Integer>> excludedPartitionsByTopic;
35+
PartitionGroupConfig defaultGroupConfig;
36+
Map<String, PartitionGroupConfig> groupConfigByMember;
37+
38+
@Value
39+
static class PartitionGroupConfig {
40+
// This represents some range between 0-1 (e.g. 0.6-0.8) that specifies this group's share
41+
double normalizedFractionalStart;
42+
double normalizedFractionalEnd;
43+
}
44+
45+
public AvroFieldValuePartitionerConfig(Map<String, ?> streamConfigMap) {
46+
final Map<String, String> fieldNameByTopic = Maps.newHashMap();
47+
final Map<String, Set<Integer>> excludedPartitionsByTopic = Maps.newHashMap();
48+
49+
Map<String, Object> partitionerConfigProps =
50+
streamConfigMap.entrySet().stream()
51+
.filter(entry -> entry.getKey().startsWith(PARTITIONER_CONFIG_PREFIX))
52+
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
53+
Config partitionerConfig =
54+
ConfigFactory.parseMap(partitionerConfigProps).getConfig(PARTITIONER_CONFIG_PREFIX);
55+
56+
Config topicsConfig = partitionerConfig.getConfig(TOPICS_CONFIG_PREFIX);
57+
topicsConfig
58+
.root()
59+
.keySet()
60+
.forEach(
61+
topic -> {
62+
Config topicConfig = topicsConfig.getConfig(topic);
63+
fieldNameByTopic.put(topic, topicConfig.getString(FIELD_NAME));
64+
String excludedPartitionsStr =
65+
topicConfig.hasPath(EXCLUDED_PARTITIONS)
66+
? topicConfig.getString(EXCLUDED_PARTITIONS)
67+
: "";
68+
Set<Integer> excludedPartitions =
69+
SPLITTER
70+
.splitToStream(excludedPartitionsStr)
71+
.map(Integer::valueOf)
72+
.collect(Collectors.toSet());
73+
excludedPartitionsByTopic.put(topic, excludedPartitions);
74+
});
75+
76+
this.fieldNameByTopic = Map.copyOf(fieldNameByTopic);
77+
this.excludedPartitionsByTopic = Map.copyOf(excludedPartitionsByTopic);
78+
double defaultWeight = partitionerConfig.getDouble(DEFAULT_GROUP_WEIGHT);
79+
80+
Config groupsConfig =
81+
partitionerConfig.hasPath(GROUPS_CONFIG_PREFIX)
82+
? partitionerConfig.getConfig(GROUPS_CONFIG_PREFIX)
83+
: ConfigFactory.empty();
84+
85+
// Sort groups by name for consistent ordering
86+
List<Config> groupConfigs =
87+
groupsConfig.root().keySet().stream()
88+
.sorted()
89+
.map(groupsConfig::getConfig)
90+
.collect(Collectors.toUnmodifiableList());
91+
92+
double totalWeight =
93+
defaultWeight
94+
+ groupConfigs.stream()
95+
.map(groupConfig -> groupConfig.getDouble(GROUP_WEIGHT))
96+
.reduce(0d, Double::sum);
97+
AtomicDouble weightConsumedSoFar = new AtomicDouble();
98+
this.defaultGroupConfig =
99+
new PartitionGroupConfig(
100+
weightConsumedSoFar.get(), weightConsumedSoFar.addAndGet(defaultWeight / totalWeight));
101+
102+
this.groupConfigByMember =
103+
groupConfigs.stream()
104+
.map(
105+
groupConfig ->
106+
buildKeyValueMapForConfig(
107+
groupConfig,
108+
new PartitionGroupConfig(
109+
weightConsumedSoFar.get(),
110+
weightConsumedSoFar.addAndGet(
111+
groupConfig.getDouble(GROUP_WEIGHT) / totalWeight))))
112+
.map(Map::entrySet)
113+
.flatMap(Collection::stream)
114+
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
115+
}
116+
117+
private static Map<String, PartitionGroupConfig> buildKeyValueMapForConfig(
118+
Config groupConfig, PartitionGroupConfig partitionGroupConfig) {
119+
return SPLITTER.splitToList(groupConfig.getString(GROUP_MEMBERS)).stream()
120+
.collect(Collectors.toUnmodifiableMap(Function.identity(), unused -> partitionGroupConfig));
121+
}
122+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/** Test-only avro records (the build disables the avro compatibility check for these). */
@namespace("org.hypertrace.core.kafkastreams.framework.partitioner")
protocol TestRecordProtocol {
  /** Carries a customer_id field matching the partitioner's example field.name config. */
  record TestCustomerRecord {
    string customer_id;
    string span_id;
    string trace_id;
  }

  /** Carries a tenant_id field matching the partitioner's example field.name config. */
  record TestTenantRecord {
    string tenant_id;
    string span_id;
    string trace_id;
  }
}

0 commit comments

Comments
 (0)