Skip to content

Commit df777b6

Browse files
authored
Async processor: minor utility method added (#71)
1 parent 71281bb commit df777b6

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/async/RecordToForward.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,17 @@
1717
public class RecordToForward<K, V> {
1818
@Nullable private String childName;
1919
private Record<K, V> record;
20+
21+
public static <K, V> RecordToForward<K, V> from(String childName, K key, V value) {
22+
return from(childName, key, value, System.currentTimeMillis());
23+
}
24+
25+
public static <K, V> RecordToForward<K, V> from(
26+
String childName, K key, V value, long timestamp) {
27+
return from(childName, new Record<>(key, value, timestamp));
28+
}
29+
30+
public static <K, V> RecordToForward<K, V> from(String childName, Record<K, V> record) {
31+
return new RecordToForward<>(childName, record);
32+
}
2033
}

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAsyncApp.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,7 @@ public List<RecordToForward<String, String>> asyncProcess(String key, String val
8383
}
8484
log.info("processing - key: {}, value: {}", key, value);
8585
Thread.sleep(25);
86-
return List.of(
87-
new RecordToForward<>(
88-
null, new Record<>("out:" + key, "out:" + value, System.currentTimeMillis())));
86+
return List.of(RecordToForward.from(null, "out:" + key, "out:" + value));
8987
}
9088
}
9189

0 commit comments

Comments
 (0)