Skip to content

Commit cbd7b99

Browse files
Async processor (#70)
1 parent 4c174dc commit cbd7b99

File tree

8 files changed

+173
-155
lines changed

8 files changed

+173
-155
lines changed
Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,99 @@
11
package org.hypertrace.core.kafkastreams.framework.async;
22

3+
import com.google.common.util.concurrent.RateLimiter;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.concurrent.ArrayBlockingQueue;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.Executor;
10+
import java.util.function.Supplier;
11+
import lombok.SneakyThrows;
12+
import lombok.extern.slf4j.Slf4j;
313
import org.apache.kafka.streams.processor.api.Processor;
14+
import org.apache.kafka.streams.processor.api.ProcessorContext;
415
import org.apache.kafka.streams.processor.api.Record;
516

6-
public class AsyncProcessor implements Processor {
17+
/**
18+
* Async version of the {@link Processor} interface. Offloads the entire processing to a different
19+
* executor(thread pool), collects the records to forward to the next stage of the topology and
20+
* flushes them after the processing is complete
21+
*
22+
* @param <K> the type of input keys
23+
* @param <V> the type of input values
24+
* @param <KOUT> the type of output keys
25+
* @param <VOUT> the type of output values
26+
*/
27+
@Slf4j
28+
@SuppressWarnings("UnstableApiUsage")
29+
public abstract class AsyncProcessor<K, V, KOUT, VOUT> implements Processor<K, V, KOUT, VOUT> {
30+
31+
private final Executor executor;
32+
private final BlockingQueue<CompletableFuture<List<RecordToForward<KOUT, VOUT>>>> pendingFutures;
33+
private final RateLimiter rateLimiter;
34+
private ProcessorContext<KOUT, VOUT> context;
35+
36+
public AsyncProcessor(
37+
Supplier<Executor> executorSupplier, AsyncProcessorConfig asyncProcessorConfig) {
38+
this.executor = executorSupplier.get();
39+
this.pendingFutures = new ArrayBlockingQueue<>(asyncProcessorConfig.getMaxBatchSize());
40+
this.rateLimiter =
41+
RateLimiter.create(1000.0 / asyncProcessorConfig.getCommitIntervalMs().toMillis());
42+
log.info(
43+
"async processor config. maxBatchSize: {}, commit rate: {}",
44+
this.pendingFutures.remainingCapacity(),
45+
this.rateLimiter.getRate());
46+
// warmup to prevent commit on first message
47+
rateLimiter.tryAcquire();
48+
}
49+
750
@Override
8-
public void process(Record record) {}
51+
public final void init(ProcessorContext<KOUT, VOUT> context) {
52+
this.context = context;
53+
doInit(context.appConfigs());
54+
}
55+
56+
protected abstract void doInit(Map<String, Object> appConfigs);
57+
58+
public abstract List<RecordToForward<KOUT, VOUT>> asyncProcess(K key, V value);
59+
60+
@SneakyThrows
61+
@Override
62+
public void process(Record<K, V> record) {
63+
CompletableFuture<List<RecordToForward<KOUT, VOUT>>> future =
64+
CompletableFuture.supplyAsync(() -> asyncProcess(record.key(), record.value()), executor);
65+
// with put, thread gets blocked when queue is full. queue consumer runs in this same thread.
66+
pendingFutures.put(future);
67+
68+
// Flush based on size or time - whichever occurs first
69+
// once the queue is full, flush the queue.
70+
if (pendingFutures.remainingCapacity() == 0 || rateLimiter.tryAcquire()) {
71+
log.debug("flush start - queue size before flush: {}", pendingFutures.size());
72+
processResults();
73+
log.debug("flush end - queue size after flush: {}", pendingFutures.size());
74+
}
75+
}
76+
77+
@SneakyThrows
78+
private void processResults() {
79+
while (!pendingFutures.isEmpty()) {
80+
CompletableFuture<List<RecordToForward<KOUT, VOUT>>> future = pendingFutures.poll();
81+
// makes sure processing is complete
82+
future.join();
83+
// another join is needed to make sure downstream forward is also complete
84+
future
85+
.thenAccept(
86+
result -> {
87+
if (result != null) {
88+
result.forEach(
89+
recordToForward ->
90+
context.forward(
91+
recordToForward.getRecord(), recordToForward.getChildName()));
92+
}
93+
})
94+
.join();
95+
}
96+
// commit once per batch
97+
context.commit();
98+
}
999
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_PROCESSOR_BATCH_SIZE;
4+
import static org.hypertrace.core.kafkastreams.framework.async.Constants.DEFAULT_ASYNC_PROCESSOR_COMMIT_INTERVAL;
5+
6+
import com.typesafe.config.Config;
7+
import java.time.Duration;
8+
import lombok.Getter;
9+
10+
@Getter
11+
public class AsyncProcessorConfig {
12+
private static final String COMMIT_INTERVAL_CONFIG_KEY = "commitIntervalMs";
13+
private static final String MAX_BATCH_SIZE_CONFIG_KEY = "maxBatchSize";
14+
private static final String PROCESSORS_CONFIG_KEY = "async.processors";
15+
private final int maxBatchSize;
16+
private final Duration commitIntervalMs;
17+
18+
AsyncProcessorConfig(int maxBatchSize, int commitIntervalMs) {
19+
this.maxBatchSize = maxBatchSize;
20+
this.commitIntervalMs = Duration.ofMillis(commitIntervalMs);
21+
}
22+
23+
public static AsyncProcessorConfig buildWith(Config config, String processorName) {
24+
Config processorsConfig =
25+
config.hasPath(PROCESSORS_CONFIG_KEY) ? config.getConfig(PROCESSORS_CONFIG_KEY) : null;
26+
if (processorsConfig != null && processorsConfig.hasPath(processorName)) {
27+
Config processorConfig = processorsConfig.getConfig(processorName);
28+
int batchSize =
29+
processorConfig.hasPath(MAX_BATCH_SIZE_CONFIG_KEY)
30+
? processorConfig.getInt(MAX_BATCH_SIZE_CONFIG_KEY)
31+
: DEFAULT_ASYNC_PROCESSOR_BATCH_SIZE;
32+
int commitInterval =
33+
processorConfig.hasPath(COMMIT_INTERVAL_CONFIG_KEY)
34+
? processorConfig.getInt(COMMIT_INTERVAL_CONFIG_KEY)
35+
: DEFAULT_ASYNC_PROCESSOR_COMMIT_INTERVAL;
36+
return new AsyncProcessorConfig(batchSize, commitInterval);
37+
}
38+
39+
return new AsyncProcessorConfig(
40+
DEFAULT_ASYNC_PROCESSOR_BATCH_SIZE, DEFAULT_ASYNC_PROCESSOR_COMMIT_INTERVAL);
41+
}
42+
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/AsyncTransformer.java

Lines changed: 0 additions & 93 deletions
This file was deleted.

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/AsyncTransformerConfig.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/Constants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
/** Default values and config keys shared by the async processing framework. */
public class Constants {
  /** Default executor pool size (presumably used when the pool-size key is not configured). */
  public static final int DEFAULT_ASYNC_EXECUTOR_POOL_SIZE = 16;

  /** Default maximum batch size of an async processor. */
  public static final int DEFAULT_ASYNC_PROCESSOR_BATCH_SIZE = 5120;

  /** Default commit interval of an async processor, in milliseconds. */
  public static final int DEFAULT_ASYNC_PROCESSOR_COMMIT_INTERVAL = 5000;

  /** Config path of the maximum executor pool size. */
  public static final String ASYNC_EXECUTOR_POOL_SIZE_KEY = "async.executors.maxPoolSize";
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.hypertrace.core.kafkastreams.framework.async;
2+
3+
import javax.annotation.Nullable;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Getter;
6+
import org.apache.kafka.streams.processor.api.Record;
7+
8+
/**
9+
* Encapsulates the {@link Record record} and the {@link String childName} used by the {@link
10+
* AsyncProcessor} to forward messages to the downstream processor
11+
*
12+
* @param <K> type of the record key
13+
* @param <V> type of the record value
14+
*/
15+
@AllArgsConstructor
16+
@Getter
17+
public class RecordToForward<K, V> {
18+
@Nullable private String childName;
19+
private Record<K, V> record;
20+
}

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAsyncApp.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
import java.util.function.Supplier;
88
import lombok.SneakyThrows;
99
import lombok.extern.slf4j.Slf4j;
10-
import org.apache.kafka.streams.KeyValue;
1110
import org.apache.kafka.streams.StreamsBuilder;
1211
import org.apache.kafka.streams.kstream.KStream;
1312
import org.apache.kafka.streams.processor.api.Processor;
1413
import org.apache.kafka.streams.processor.api.Record;
15-
import org.hypertrace.core.kafkastreams.framework.async.AsyncTransformer;
16-
import org.hypertrace.core.kafkastreams.framework.async.AsyncTransformerConfig;
14+
import org.hypertrace.core.kafkastreams.framework.async.AsyncProcessor;
15+
import org.hypertrace.core.kafkastreams.framework.async.AsyncProcessorConfig;
1716
import org.hypertrace.core.kafkastreams.framework.async.ExecutorFactory;
17+
import org.hypertrace.core.kafkastreams.framework.async.RecordToForward;
1818
import org.hypertrace.core.kafkastreams.framework.constants.KafkaStreamsAppConstants;
1919
import org.hypertrace.core.serviceframework.config.ConfigClient;
2020

@@ -42,11 +42,11 @@ public StreamsBuilder buildTopology(
4242

4343
Config kafkaStreamsConfig = configClient.getConfig().getConfig(KAFKA_STREAMS_CONFIG_KEY);
4444
KStream<String, String> transform =
45-
stream.transform(
45+
stream.process(
4646
() ->
47-
new SlowTransformer(
47+
new SlowProcessor(
4848
ExecutorFactory.getExecutorSupplier(kafkaStreamsConfig),
49-
AsyncTransformerConfig.buildWith(kafkaStreamsConfig, "slow.transformer")));
49+
AsyncProcessorConfig.buildWith(kafkaStreamsConfig, "slow.processor")));
5050
transform.process(LoggingProcessor::new);
5151
transform.to(OUTPUT_TOPIC);
5252
return streamsBuilder;
@@ -64,11 +64,10 @@ public String getServiceName() {
6464
}
6565

6666
@Slf4j
67-
class SlowTransformer extends AsyncTransformer<String, String, String, String> {
68-
69-
public SlowTransformer(
70-
Supplier<Executor> executorSupplier, AsyncTransformerConfig asyncTransformerConfig) {
71-
super(executorSupplier, asyncTransformerConfig);
67+
class SlowProcessor extends AsyncProcessor<String, String, String, String> {
68+
/**
 * Creates the sample processor; both arguments are handed unchanged to the {@code
 * AsyncProcessor} constructor.
 *
 * @param executorSupplier supplies the executor used for asynchronous processing
 * @param asyncProcessorConfig batching and commit settings for this processor
 */
public SlowProcessor(
    Supplier<Executor> executorSupplier, AsyncProcessorConfig asyncProcessorConfig) {
  super(executorSupplier, asyncProcessorConfig);
}
7372

7473
@Override
@@ -78,13 +77,15 @@ protected void doInit(Map<String, Object> appConfigs) {
7877

7978
@SneakyThrows
8079
@Override
81-
public List<KeyValue<String, String>> asyncTransform(String key, String value) {
80+
public List<RecordToForward<String, String>> asyncProcess(String key, String value) {
8281
if (!key.startsWith("key")) {
8382
return null;
8483
}
85-
log.info("transforming - key: {}, value: {}", key, value);
84+
log.info("processing - key: {}, value: {}", key, value);
8685
Thread.sleep(25);
87-
return List.of(KeyValue.pair("out:" + key, "out:" + value));
86+
return List.of(
87+
new RecordToForward<>(
88+
null, new Record<>("out:" + key, "out:" + value, System.currentTimeMillis())));
8889
}
8990
}
9091

kafka-streams-framework/src/test/resources/configs/sample-kafka-streams-service/application.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ kafka.streams.config = {
1010
executors {
1111
maxPoolSize = 16
1212
}
13-
transformers {
14-
slow.transformer = {
13+
processors {
14+
slow.processor = {
1515
maxBatchSize = 32
1616
commitIntervalMs = 100
1717
}

0 commit comments

Comments
 (0)