Skip to content

Commit f58941e

Browse files
authored
async transformer improvements (#62)
1 parent 41f3cfe commit f58941e

File tree

11 files changed

+112
-30
lines changed

11 files changed

+112
-30
lines changed

kafka-streams-framework/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ tasks.test {
1111
}
1212

1313
dependencies {
14+
constraints {
15+
implementation("org.xerial.snappy:snappy-java:1.1.10.1") {
16+
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*' > 'org.xerial.snappy:snappy-java:1.1.8.2'")
17+
}
18+
}
1419
annotationProcessor("org.projectlombok:lombok:1.18.26")
1520
compileOnly("org.projectlombok:lombok:1.18.26")
1621

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/AsyncTransformer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@ public AsyncTransformer(
2828
this.executor = executorSupplier.get();
2929
this.pendingFutures = new ArrayBlockingQueue<>(asyncTransformerConfig.getMaxBatchSize());
3030
this.rateLimiter =
31-
RateLimiter.create(1.0 / asyncTransformerConfig.getCommitIntervalMs().toSeconds());
31+
RateLimiter.create(1000.0 / asyncTransformerConfig.getCommitIntervalMs().toMillis());
32+
log.info(
33+
"async transformer config. maxBatchSize: {}, commit rate: {}",
34+
this.pendingFutures.remainingCapacity(),
35+
this.rateLimiter.getRate());
36+
// warmup to prevent commit on first message
37+
rateLimiter.tryAcquire();
3238
}
3339

3440
@Override
@@ -72,7 +78,14 @@ private void processResults() {
7278
// makes sure transformation is complete
7379
future.join();
7480
// another join is needed to make sure downstream forward is also complete
75-
future.thenAccept(result -> result.forEach(kv -> context.forward(kv.key, kv.value))).join();
81+
future
82+
.thenAccept(
83+
result -> {
84+
if (result != null) {
85+
result.forEach(kv -> context.forward(kv.key, kv.value));
86+
}
87+
})
88+
.join();
7689
}
7790
// commit once per batch
7891
context.commit();

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/AsyncTransformerConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public static AsyncTransformerConfig buildWith(Config config, String transformer
3535
: DEFAULT_ASYNC_TRANSFORMER_COMMIT_INTERVAL;
3636
return new AsyncTransformerConfig(batchSize, commitInterval);
3737
}
38+
3839
return new AsyncTransformerConfig(
3940
DEFAULT_ASYNC_TRANSFORMER_BATCH_SIZE, DEFAULT_ASYNC_TRANSFORMER_COMMIT_INTERVAL);
4041
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/ExecutorFactory.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import com.google.common.util.concurrent.ThreadFactoryBuilder;
77
import com.typesafe.config.Config;
88
import java.util.concurrent.Executor;
9+
import java.util.concurrent.ExecutorService;
910
import java.util.concurrent.Executors;
1011
import java.util.concurrent.ThreadFactory;
1112
import java.util.function.Supplier;
13+
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
1214

1315
public class ExecutorFactory {
1416
private static Executor executor;
@@ -20,12 +22,20 @@ public static synchronized Supplier<Executor> getExecutorSupplier(Config config)
2022
? config.getInt(ASYNC_EXECUTOR_POOL_SIZE_KEY)
2123
: DEFAULT_ASYNC_EXECUTOR_POOL_SIZE;
2224

23-
ThreadFactory threadFactory =
24-
new ThreadFactoryBuilder()
25-
.setNameFormat("kafka-streams-async-worker-%d")
26-
.setDaemon(true)
27-
.build();
28-
executor = Executors.newFixedThreadPool(poolSize, threadFactory);
25+
if (poolSize > 0) {
26+
ThreadFactory threadFactory =
27+
new ThreadFactoryBuilder()
28+
.setNameFormat("kafka-streams-async-worker-%d")
29+
.setDaemon(true)
30+
.build();
31+
ExecutorService executorSvc = Executors.newFixedThreadPool(poolSize, threadFactory);
32+
PlatformMetricsRegistry.monitorExecutorService(
33+
"kafka-streams-async-pool", executorSvc, null);
34+
executor = executorSvc;
35+
} else {
36+
// direct/sync execution when pool size is explicitly configured to a value <= 0
37+
executor = Runnable::run;
38+
}
2939
}
3040
return () -> executor;
3141
}

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAsyncApp.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public String getServiceName() {
6767
class SlowTransformer extends AsyncTransformer<String, String, String, String> {
6868

6969
public SlowTransformer(
70-
Supplier<Executor> executorSupplier, AsyncTransformerConfig asyncTransformerConfigBuilder) {
71-
super(executorSupplier, asyncTransformerConfigBuilder);
70+
Supplier<Executor> executorSupplier, AsyncTransformerConfig asyncTransformerConfig) {
71+
super(executorSupplier, asyncTransformerConfig);
7272
}
7373

7474
@Override
@@ -79,6 +79,9 @@ protected void doInit(Map<String, Object> appConfigs) {
7979
@SneakyThrows
8080
@Override
8181
public List<KeyValue<String, String>> asyncTransform(String key, String value) {
82+
if (!key.startsWith("key")) {
83+
return null;
84+
}
8285
log.info("transforming - key: {}, value: {}", key, value);
8386
Thread.sleep(25);
8487
return List.of(KeyValue.pair("out:" + key, "out:" + value));

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAsyncAppTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ public void setup() {
4545
streamsConfig.putAll(sampleApp.streamsConfig);
4646

4747
td = new TopologyTestDriver(sampleApp.topology, streamsConfig);
48+
49+
inputTopic =
50+
td.createInputTopic(
51+
SampleApp.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
52+
outputTopic =
53+
td.createOutputTopic(
54+
SampleApp.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
4855
}
4956

5057
@AfterEach
@@ -55,22 +62,17 @@ public void tearDown() {
5562
@SneakyThrows
5663
@Test
5764
public void asyncTransformationTest() {
58-
inputTopic =
59-
td.createInputTopic(
60-
SampleApp.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
61-
outputTopic =
62-
td.createOutputTopic(
63-
SampleApp.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
64-
6565
assertThat(outputTopic.isEmpty(), is(true));
6666

6767
int batchSize = 1000;
6868
for (int i = 1; i <= batchSize; i++) {
6969
inputTopic.pipeInput("key-" + i, "value-" + i);
7070
}
7171

72+
// sleep time has to be bigger than the commit interval configured for the test
73+
// otw, test will be flaky
7274
Thread.sleep(1000);
73-
inputTopic.pipeInput("final", "final");
75+
inputTopic.pipeInput("key-final", "value-final");
7476
// test ordered processing. mandatory requirement.
7577
for (int i = 1; i <= batchSize; i++) {
7678
assertThat(outputTopic.readValue(), endsWith("value-" + i));
@@ -82,6 +84,21 @@ public void asyncTransformationTest() {
8284
assertThat(outputTopic.isEmpty(), is(true));
8385
}
8486

87+
@SneakyThrows
88+
@Test
89+
public void testUnderlyingReturnsNull() {
90+
assertThat(outputTopic.isEmpty(), is(true));
91+
// null should be handled without erring out.
92+
inputTopic.pipeInput("discarded-key", "discarded-value");
93+
94+
Thread.sleep(1000);
95+
inputTopic.pipeInput("key-final", "value-final");
96+
97+
// read the final record
98+
outputTopic.readKeyValue();
99+
assertThat(outputTopic.isEmpty(), is(true));
100+
}
101+
85102
@Test
86103
public void baseStreamsConfigTest() {
87104
Map<String, Object> baseStreamsConfig = sampleApp.getBaseStreamsConfig();

kafka-streams-framework/src/test/resources/configs/sample-kafka-streams-service/application.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ kafka.streams.config = {
1212
}
1313
transformers {
1414
slow.transformer = {
15-
maxBatchSize = 5120
16-
commitIntervalMs = 10
15+
maxBatchSize = 32
16+
commitIntervalMs = 100
1717
}
1818
}
1919
}

kafka-streams-partitioners/avro-partitioners/build.gradle.kts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ tasks.test {
1111
}
1212

1313
dependencies {
14+
constraints {
15+
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
16+
17+
implementation("org.xerial.snappy:snappy-java:1.1.10.1") {
18+
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*' > 'org.xerial.snappy:snappy-java:1.1.8.2'")
19+
}
20+
}
21+
1422
annotationProcessor("org.projectlombok:lombok:1.18.24")
1523
compileOnly("org.projectlombok:lombok:1.18.24")
1624

@@ -21,10 +29,6 @@ dependencies {
2129
implementation("org.apache.kafka:kafka-streams:7.2.1-ccs")
2230
implementation("org.slf4j:slf4j-api:1.7.36")
2331

24-
constraints {
25-
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
26-
}
27-
2832
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
2933
testImplementation("org.junit-pioneer:junit-pioneer:1.7.1")
3034
testImplementation("org.mockito:mockito-core:4.5.1")

kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ tasks.test {
1010
}
1111

1212
dependencies {
13+
constraints {
14+
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
15+
16+
implementation("org.xerial.snappy:snappy-java:1.1.10.1") {
17+
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*' > 'org.xerial.snappy:snappy-java:1.1.8.2'")
18+
}
19+
}
20+
1321
annotationProcessor("org.projectlombok:lombok:1.18.24")
1422
compileOnly("org.projectlombok:lombok:1.18.24")
1523

kafka-streams-serdes/build.gradle.kts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@ tasks.test {
1111
}
1212

1313
dependencies {
14+
constraints {
15+
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
16+
17+
implementation("org.xerial.snappy:snappy-java:1.1.10.1") {
18+
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*' > 'org.xerial.snappy:snappy-java:1.1.8.2'")
19+
}
20+
}
21+
1422
api("org.apache.kafka:kafka-clients:7.2.1-ccs")
1523
api("org.apache.avro:avro:1.11.1")
1624

1725
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
18-
19-
constraints {
20-
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
21-
}
2226
}
2327

2428
// Disabling compatibility check for the test avro definitions.

0 commit comments

Comments
 (0)