Skip to content

Commit d33b994

Browse files
authored
Adds Async transformer functionality to kafka streams framework (#57)
1 parent 66f8ece commit d33b994

File tree

10 files changed

+400
-4
lines changed

10 files changed

+400
-4
lines changed

kafka-streams-framework/build.gradle.kts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ tasks.test {
1111
}
1212

1313
dependencies {
14+
annotationProcessor("org.projectlombok:lombok:1.18.26")
15+
compileOnly("org.projectlombok:lombok:1.18.26")
16+
1417
api(project(":kafka-streams-serdes"))
1518
api("org.apache.kafka:kafka-streams:7.2.1-ccs")
1619
api("io.confluent:kafka-streams-avro-serde:7.2.1")
@@ -22,12 +25,14 @@ dependencies {
2225
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.48")
2326
implementation("org.apache.commons:commons-lang3:3.12.0")
2427

28+
testCompileOnly("org.projectlombok:lombok:1.18.26")
29+
testAnnotationProcessor("org.projectlombok:lombok:1.18.26")
2530
testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs")
26-
testImplementation("org.junit.jupiter:junit-jupiter:5.9.0")
27-
testImplementation("org.junit-pioneer:junit-pioneer:1.7.1")
28-
testImplementation("org.mockito:mockito-core:4.5.1")
31+
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
32+
testImplementation("org.junit-pioneer:junit-pioneer:2.0.0")
33+
testImplementation("org.mockito:mockito-core:5.2.0")
2934
testImplementation("org.hamcrest:hamcrest-core:2.2")
30-
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.2")
35+
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
3136
}
3237

3338
// Disabling compatibility check for the test avro definitions.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
import org.apache.kafka.streams.processor.api.Processor;
4+
import org.apache.kafka.streams.processor.api.Record;
5+
6+
public class AsyncProcessor implements Processor {
7+
@Override
8+
public void process(Record record) {}
9+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
import com.google.common.util.concurrent.RateLimiter;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.concurrent.ArrayBlockingQueue;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.Executor;
10+
import java.util.function.Supplier;
11+
import lombok.SneakyThrows;
12+
import lombok.extern.slf4j.Slf4j;
13+
import org.apache.kafka.streams.KeyValue;
14+
import org.apache.kafka.streams.kstream.Transformer;
15+
import org.apache.kafka.streams.processor.ProcessorContext;
16+
17+
@Slf4j
18+
@SuppressWarnings("UnstableApiUsage")
19+
public abstract class AsyncTransformer<K, V, KOUT, VOUT>
20+
implements Transformer<K, V, KeyValue<KOUT, VOUT>> {
21+
private final Executor executor;
22+
private final BlockingQueue<CompletableFuture<List<KeyValue<KOUT, VOUT>>>> pendingFutures;
23+
private final RateLimiter rateLimiter;
24+
private ProcessorContext context;
25+
26+
public AsyncTransformer(
27+
Supplier<Executor> executorSupplier, AsyncTransformerConfig asyncTransformerConfig) {
28+
this.executor = executorSupplier.get();
29+
this.pendingFutures = new ArrayBlockingQueue<>(asyncTransformerConfig.getMaxBatchSize());
30+
this.rateLimiter =
31+
RateLimiter.create(1.0 / asyncTransformerConfig.getCommitIntervalMs().toSeconds());
32+
}
33+
34+
@Override
35+
public final void init(ProcessorContext context) {
36+
this.context = context;
37+
doInit(context.appConfigs());
38+
}
39+
40+
protected abstract void doInit(Map<String, Object> appConfigs);
41+
42+
public abstract List<KeyValue<KOUT, VOUT>> asyncTransform(K key, V value);
43+
44+
@SneakyThrows
45+
@Override
46+
public final KeyValue<KOUT, VOUT> transform(K key, V value) {
47+
48+
CompletableFuture<List<KeyValue<KOUT, VOUT>>> future =
49+
CompletableFuture.supplyAsync(() -> asyncTransform(key, value), executor);
50+
// with put, thread gets blocked when queue is full. queue consumer runs in this same thread.
51+
pendingFutures.put(future);
52+
53+
// Flush based on size or time - whichever occurs first
54+
// once the queue is full, flush the queue.
55+
if (pendingFutures.remainingCapacity() == 0 || rateLimiter.tryAcquire()) {
56+
log.debug("flush start - queue size before flush: {}", pendingFutures.size());
57+
processResults();
58+
log.debug("flush end - queue size after flush: {}", pendingFutures.size());
59+
}
60+
return null;
61+
}
62+
63+
@Override
64+
public void close() {
65+
// no-op
66+
}
67+
68+
@SneakyThrows
69+
private void processResults() {
70+
while (!pendingFutures.isEmpty()) {
71+
CompletableFuture<List<KeyValue<KOUT, VOUT>>> future = pendingFutures.poll();
72+
// makes sure transformation is complete
73+
future.join();
74+
// another join is needed to make sure downstream forward is also complete
75+
future.thenAccept(result -> result.forEach(kv -> context.forward(kv.key, kv.value))).join();
76+
}
77+
// commit once per batch
78+
context.commit();
79+
}
80+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_TRANSFORMER_BATCH_SIZE;
4+
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_TRANSFORMER_COMMIT_INTERVAL;
5+
6+
import com.typesafe.config.Config;
7+
import java.time.Duration;
8+
import lombok.Getter;
9+
10+
@Getter
11+
public class AsyncTransformerConfig {
12+
private static final String COMMIT_INTERVAL_CONFIG_KEY = "commitIntervalMs";
13+
private static final String MAX_BATCH_SIZE_CONFIG_KEY = "maxBatchSize";
14+
private static final String TRANSFORMERS_CONFIG_KEY = "async.transformers";
15+
private final int maxBatchSize;
16+
private final Duration commitIntervalMs;
17+
18+
AsyncTransformerConfig(int maxBatchSize, int commitIntervalMs) {
19+
this.maxBatchSize = maxBatchSize;
20+
this.commitIntervalMs = Duration.ofMillis(commitIntervalMs);
21+
}
22+
23+
public static AsyncTransformerConfig buildWith(Config config, String transformerName) {
24+
Config transformersConfig =
25+
config.hasPath(TRANSFORMERS_CONFIG_KEY) ? config.getConfig(TRANSFORMERS_CONFIG_KEY) : null;
26+
if (transformersConfig != null && transformersConfig.hasPath(transformerName)) {
27+
Config transformerConfig = transformersConfig.getConfig(transformerName);
28+
int batchSize =
29+
transformerConfig.hasPath(MAX_BATCH_SIZE_CONFIG_KEY)
30+
? transformerConfig.getInt(MAX_BATCH_SIZE_CONFIG_KEY)
31+
: DEFAULT_ASYNC_TRANSFORMER_BATCH_SIZE;
32+
int commitInterval =
33+
transformerConfig.hasPath(COMMIT_INTERVAL_CONFIG_KEY)
34+
? transformerConfig.getInt(COMMIT_INTERVAL_CONFIG_KEY)
35+
: DEFAULT_ASYNC_TRANSFORMER_COMMIT_INTERVAL;
36+
return new AsyncTransformerConfig(batchSize, commitInterval);
37+
}
38+
return new AsyncTransformerConfig(
39+
DEFAULT_ASYNC_TRANSFORMER_BATCH_SIZE, DEFAULT_ASYNC_TRANSFORMER_COMMIT_INTERVAL);
40+
}
41+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
public class Constants {
4+
public static int DEFAULT_ASYNC_EXECUTOR_POOL_SIZE = 16;
5+
public static int DEFAULT_ASYNC_TRANSFORMER_BATCH_SIZE = 5120;
6+
public static int DEFAULT_ASYNC_TRANSFORMER_COMMIT_INTERVAL = 5000;
7+
public static String ASYNC_EXECUTOR_POOL_SIZE_KEY = "async.executors.maxPoolSize";
8+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
import static org.hypertrace.core.kafkastreams.framework.async.Constants.ASYNC_EXECUTOR_POOL_SIZE_KEY;
4+
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_EXECUTOR_POOL_SIZE;
5+
6+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
7+
import com.typesafe.config.Config;
8+
import java.util.concurrent.Executor;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.ThreadFactory;
11+
import java.util.function.Supplier;
12+
13+
public class ExecutorFactory {
14+
private static Executor executor;
15+
16+
public static synchronized Supplier<Executor> getExecutorSupplier(Config config) {
17+
if (executor == null) {
18+
int poolSize =
19+
config.hasPath(ASYNC_EXECUTOR_POOL_SIZE_KEY)
20+
? config.getInt(ASYNC_EXECUTOR_POOL_SIZE_KEY)
21+
: DEFAULT_ASYNC_EXECUTOR_POOL_SIZE;
22+
23+
ThreadFactory threadFactory =
24+
new ThreadFactoryBuilder()
25+
.setNameFormat("kafka-streams-async-worker-%d")
26+
.setDaemon(true)
27+
.build();
28+
executor = Executors.newFixedThreadPool(poolSize, threadFactory);
29+
}
30+
return () -> executor;
31+
}
32+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.hypertrace.core.kafkastreams.framework;
2+
3+
import com.typesafe.config.Config;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.concurrent.Executor;
7+
import java.util.function.Supplier;
8+
import lombok.SneakyThrows;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.apache.kafka.streams.KeyValue;
11+
import org.apache.kafka.streams.StreamsBuilder;
12+
import org.apache.kafka.streams.kstream.KStream;
13+
import org.apache.kafka.streams.processor.api.Processor;
14+
import org.apache.kafka.streams.processor.api.Record;
15+
import org.hypertrace.core.kafkastreams.framework.async.AsyncTransformer;
16+
import org.hypertrace.core.kafkastreams.framework.async.AsyncTransformerConfig;
17+
import org.hypertrace.core.kafkastreams.framework.async.ExecutorFactory;
18+
import org.hypertrace.core.kafkastreams.framework.constants.KafkaStreamsAppConstants;
19+
import org.hypertrace.core.serviceframework.config.ConfigClient;
20+
21+
@Slf4j
22+
public class SampleAsyncApp extends KafkaStreamsApp {
23+
static String INPUT_TOPIC = "input";
24+
static String OUTPUT_TOPIC = "output";
25+
26+
public SampleAsyncApp(ConfigClient configClient) {
27+
super(configClient);
28+
}
29+
30+
@Override
31+
protected void doInitForConsolidatedKStreamApp(Config subTopologyJobConfig) {}
32+
33+
@Override
34+
protected void doCleanUpForConsolidatedKStreamApp() {}
35+
36+
@Override
37+
public StreamsBuilder buildTopology(
38+
Map<String, Object> streamsConfig,
39+
StreamsBuilder streamsBuilder,
40+
Map<String, KStream<?, ?>> sourceStreams) {
41+
KStream<String, String> stream = streamsBuilder.stream(INPUT_TOPIC);
42+
43+
Config kafkaStreamsConfig = configClient.getConfig().getConfig(KAFKA_STREAMS_CONFIG_KEY);
44+
KStream<String, String> transform =
45+
stream.transform(
46+
() ->
47+
new SlowTransformer(
48+
ExecutorFactory.getExecutorSupplier(kafkaStreamsConfig),
49+
AsyncTransformerConfig.buildWith(kafkaStreamsConfig, "slow.transformer")));
50+
transform.process(LoggingProcessor::new);
51+
transform.to(OUTPUT_TOPIC);
52+
return streamsBuilder;
53+
}
54+
55+
@Override
56+
public String getJobConfigKey() {
57+
return KafkaStreamsAppConstants.JOB_CONFIG;
58+
}
59+
60+
@Override
61+
public String getServiceName() {
62+
return "SampleApp";
63+
}
64+
}
65+
66+
@Slf4j
67+
class SlowTransformer extends AsyncTransformer<String, String, String, String> {
68+
69+
public SlowTransformer(
70+
Supplier<Executor> executorSupplier, AsyncTransformerConfig asyncTransformerConfigBuilder) {
71+
super(executorSupplier, asyncTransformerConfigBuilder);
72+
}
73+
74+
@Override
75+
protected void doInit(Map<String, Object> appConfigs) {
76+
// no-op
77+
}
78+
79+
@SneakyThrows
80+
@Override
81+
public List<KeyValue<String, String>> asyncTransform(String key, String value) {
82+
log.info("transforming - key: {}, value: {}", key, value);
83+
Thread.sleep(25);
84+
return List.of(KeyValue.pair("out:" + key, "out:" + value));
85+
}
86+
}
87+
88+
@Slf4j
89+
class LoggingProcessor implements Processor<String, String, Void, Void> {
90+
91+
@Override
92+
public void process(Record<String, String> record) {
93+
log.info("received - key: {}, value: {}", record.key(), record.value());
94+
}
95+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.hypertrace.core.kafkastreams.framework;
2+
3+
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
4+
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
5+
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
6+
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
7+
import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG;
8+
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
9+
import static org.hamcrest.CoreMatchers.endsWith;
10+
import static org.hamcrest.CoreMatchers.is;
11+
import static org.hamcrest.MatcherAssert.assertThat;
12+
13+
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
14+
import java.util.Map;
15+
import java.util.Properties;
16+
import lombok.SneakyThrows;
17+
import org.apache.kafka.common.serialization.Serdes;
18+
import org.apache.kafka.streams.TestInputTopic;
19+
import org.apache.kafka.streams.TestOutputTopic;
20+
import org.apache.kafka.streams.TopologyTestDriver;
21+
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
22+
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
23+
import org.hypertrace.core.serviceframework.config.ConfigClientFactory;
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junitpioneer.jupiter.SetEnvironmentVariable;
28+
29+
@SetEnvironmentVariable(key = "SERVICE_NAME", value = "sample-kafka-streams-service")
30+
public class SampleAsyncAppTest {
31+
32+
private TopologyTestDriver td;
33+
private TestInputTopic<String, String> inputTopic;
34+
private TestOutputTopic<String, String> outputTopic;
35+
36+
private SampleAsyncApp sampleApp;
37+
private Properties streamsConfig;
38+
39+
@BeforeEach
40+
public void setup() {
41+
sampleApp = new SampleAsyncApp(ConfigClientFactory.getClient());
42+
43+
sampleApp.doInit();
44+
streamsConfig = new Properties();
45+
streamsConfig.putAll(sampleApp.streamsConfig);
46+
47+
td = new TopologyTestDriver(sampleApp.topology, streamsConfig);
48+
}
49+
50+
@AfterEach
51+
public void tearDown() {
52+
td.close();
53+
}
54+
55+
@SneakyThrows
56+
@Test
57+
public void asyncTransformationTest() {
58+
inputTopic =
59+
td.createInputTopic(
60+
SampleApp.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
61+
outputTopic =
62+
td.createOutputTopic(
63+
SampleApp.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
64+
65+
assertThat(outputTopic.isEmpty(), is(true));
66+
67+
int batchSize = 1000;
68+
for (int i = 1; i <= batchSize; i++) {
69+
inputTopic.pipeInput("key-" + i, "value-" + i);
70+
}
71+
72+
Thread.sleep(1000);
73+
inputTopic.pipeInput("final", "final");
74+
// test ordered processing. mandatory requirement.
75+
for (int i = 1; i <= batchSize; i++) {
76+
assertThat(outputTopic.readValue(), endsWith("value-" + i));
77+
}
78+
79+
// read the final record
80+
outputTopic.readKeyValue();
81+
82+
assertThat(outputTopic.isEmpty(), is(true));
83+
}
84+
85+
@Test
86+
public void baseStreamsConfigTest() {
87+
Map<String, Object> baseStreamsConfig = sampleApp.getBaseStreamsConfig();
88+
assertThat(
89+
baseStreamsConfig.get(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG),
90+
is(BoundedMemoryConfigSetter.class));
91+
assertThat(baseStreamsConfig.get(DEFAULT_KEY_SERDE_CLASS_CONFIG), is(SpecificAvroSerde.class));
92+
assertThat(
93+
baseStreamsConfig.get(DEFAULT_VALUE_SERDE_CLASS_CONFIG), is(SpecificAvroSerde.class));
94+
assertThat(
95+
baseStreamsConfig.get(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG),
96+
is(LogAndContinueExceptionHandler.class));
97+
assertThat(baseStreamsConfig.get(producerPrefix(ACKS_CONFIG)), is("all"));
98+
}
99+
}

0 commit comments

Comments
 (0)