Commit 444e10d

talatuyarer and tuyarer authored
Kafka adaptive timeout implementation to handle empty topic cases (apache#29400)
* Kafka adaptive timeout implementation to handle empty topic cases
* Format fix with spotless

Co-authored-by: tuyarer <[email protected]>
1 parent 0972bc0 commit 444e10d

File tree

1 file changed: +15 lines, -3 lines

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 15 additions & 3 deletions

@@ -357,7 +357,9 @@ public long getSplitBacklogBytes() {
    */
   private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);

-  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
+  private Duration recordsDequeuePollTimeout;
+  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MIN = Duration.millis(1);
+  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = Duration.millis(20);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);

   // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
@@ -543,6 +545,7 @@ Instant updateAndGetWatermark() {
     bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
     backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
     backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
+    recordsDequeuePollTimeout = Duration.millis(10);
   }

   private void consumerPollLoop() {
@@ -614,8 +617,7 @@ private void nextBatch() throws IOException {
     try {
       // poll available records, wait (if necessary) up to the specified timeout.
       records =
-          availableRecordsQueue.poll(
-              RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
+          availableRecordsQueue.poll(recordsDequeuePollTimeout.getMillis(), TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOG.warn("{}: Unexpected", this, e);
@@ -627,9 +629,19 @@ private void nextBatch() throws IOException {
       if (consumerPollException.get() != null) {
         throw new IOException("Exception while reading from Kafka", consumerPollException.get());
       }
+      if (recordsDequeuePollTimeout.isLongerThan(RECORDS_DEQUEUE_POLL_TIMEOUT_MIN)) {
+        recordsDequeuePollTimeout = recordsDequeuePollTimeout.minus(Duration.millis(1));
+        LOG.debug("Reducing poll timeout for reader to " + recordsDequeuePollTimeout.getMillis());
+      }
       return;
     }

+    if (recordsDequeuePollTimeout.isShorterThan(RECORDS_DEQUEUE_POLL_TIMEOUT_MAX)) {
+      recordsDequeuePollTimeout = recordsDequeuePollTimeout.plus(Duration.millis(1));
+      LOG.debug("Increasing poll timeout for reader to " + recordsDequeuePollTimeout.getMillis());
+      LOG.debug("Record count: " + records.count());
+    }
+
     partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

     // cycle through the partitions in order to interleave records from each.
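For context, this change replaces the fixed 10 ms dequeue timeout with one that adapts between 1 ms and 20 ms: each empty poll shortens the next wait by 1 ms, so the reader returns to its caller quickly when the topic is empty, and each successful poll lengthens it by 1 ms, so fewer polls come back empty when records are flowing. Below is a minimal standalone sketch of that pattern, assuming plain long millis instead of Joda-Time Duration; the class and method names (AdaptiveDequeue, takeBatch) are illustrative, not part of Beam's API.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: mirrors the adaptive dequeue-timeout logic in the
// diff above, with hypothetical names and plain millis instead of Duration.
public class AdaptiveDequeue {
  // Bounds mirror RECORDS_DEQUEUE_POLL_TIMEOUT_MIN/_MAX in the diff (1 ms .. 20 ms).
  private static final long MIN_TIMEOUT_MS = 1;
  private static final long MAX_TIMEOUT_MS = 20;

  // Starts at the old fixed value of 10 ms, then adapts by 1 ms per poll.
  private long timeoutMs = 10;

  private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(10);

  // Polls the queue: shrinks the wait after a miss, grows it after a hit.
  List<String> takeBatch() throws InterruptedException {
    List<String> batch = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    if (batch == null) {
      // Empty topic: back off toward the minimum so the caller gets control
      // back quickly instead of blocking for the full timeout on every call.
      if (timeoutMs > MIN_TIMEOUT_MS) {
        timeoutMs--;
      }
      return null;
    }
    // Records are flowing: wait a little longer next time, up to the maximum,
    // so fewer polls come back empty under steady load.
    if (timeoutMs < MAX_TIMEOUT_MS) {
      timeoutMs++;
    }
    return batch;
  }
}

The 1 ms additive steps keep the adaptation cheap and bounded: the timeout drifts between the two limits within about 20 calls, never blocks longer than 20 ms, and never busy-polls below 1 ms.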
