1616import lombok .extern .slf4j .Slf4j ;
1717import org .apache .kafka .clients .consumer .Consumer ;
1818import org .apache .kafka .clients .consumer .ConsumerRecords ;
19- import org .apache .kafka .common .PartitionInfo ;
2019import org .apache .kafka .common .TopicPartition ;
2120import org .apache .kafka .common .errors .InterruptException ;
2221import org .hypertrace .core .serviceframework .metrics .PlatformMetricsRegistry ;
2322
2423@ Slf4j
2524class KafkaLiveEventListenerCallable <K , V > implements Callable <Void > {
25+
2626 private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count" ;
27- private final List <TopicPartition > topicPartitions ;
2827 private final Consumer <K , V > kafkaConsumer ;
2928 private final Duration pollTimeout ;
3029 private final Counter errorCounter ;
30+ private final String topic ;
3131 private final ConcurrentLinkedQueue <BiConsumer <? super K , ? super V >> callbacks ;
3232
3333 KafkaLiveEventListenerCallable (
@@ -41,16 +41,9 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
4141 kafkaConfig .hasPath (POLL_TIMEOUT )
4242 ? kafkaConfig .getDuration (POLL_TIMEOUT )
4343 : Duration .ofSeconds (30 );
44- String topic = kafkaConfig .getString (TOPIC_NAME );
44+ this . topic = kafkaConfig .getString (TOPIC_NAME );
4545 this .kafkaConsumer = kafkaConsumer ;
46- // fetch partitions and seek to end of partitions to consume live events
47- List <PartitionInfo > partitions = kafkaConsumer .partitionsFor (topic );
48- topicPartitions =
49- partitions .stream ()
50- .map (p -> new TopicPartition (p .topic (), p .partition ()))
51- .collect (Collectors .toList ());
52- kafkaConsumer .assign (topicPartitions );
53- kafkaConsumer .seekToEnd (topicPartitions );
46+
5447 this .errorCounter =
5548 PlatformMetricsRegistry .registerCounter (
5649 consumerName + "." + EVENT_CONSUMER_ERROR_COUNT , Collections .emptyMap ());
@@ -60,8 +53,20 @@ void addCallback(BiConsumer<? super K, ? super V> callbackFunction) {
6053 callbacks .add (callbackFunction );
6154 }
6255
56+ private List <TopicPartition > initializePartitions () {
57+ // fetch partitions and seek to end of partitions to consume live events
58+ List <TopicPartition > topicPartitions =
59+ kafkaConsumer .partitionsFor (this .topic ).stream ()
60+ .map (p -> new TopicPartition (p .topic (), p .partition ()))
61+ .collect (Collectors .toUnmodifiableList ());
62+ kafkaConsumer .assign (topicPartitions );
63+ kafkaConsumer .seekToEnd (topicPartitions );
64+ return topicPartitions ;
65+ }
66+
6367 @ Override
6468 public Void call () {
69+ List <TopicPartition > topicPartitions = this .initializePartitions ();
6570 do {
6671 try {
6772 ConsumerRecords <K , V > records = kafkaConsumer .poll (pollTimeout );
0 commit comments