diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 7c06057218c6..575181cab70c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -833,7 +833,7 @@ public void seekToBeginning(Collection partitions) { Timer timer = time.timer(defaultApiTimeoutMs); ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, OffsetResetStrategy.EARLIEST, calculateDeadlineMs(timer)); - cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(resetOffsetEvent); + applicationEventHandler.addAndGet(resetOffsetEvent); } finally { release(); } @@ -849,7 +849,7 @@ public void seekToEnd(Collection partitions) { Timer timer = time.timer(defaultApiTimeoutMs); ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST, calculateDeadlineMs(timer)); - cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(resetOffsetEvent); + applicationEventHandler.addAndGet(resetOffsetEvent); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 89d24b9e4b71..fb9109dd4371 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -261,11 +261,14 @@ private void process(final UnsubscribeEvent event) { } private void process(final ResetOffsetEvent event) { - Collection parts = event.topicPartitions().isEmpty() ? - subscriptions.assignedPartitions() : event.topicPartitions(); - subscriptions.requestOffsetReset(parts, event.offsetResetStrategy()); - CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); - future.whenComplete(complete(event.future())); + try { + Collection parts = event.topicPartitions().isEmpty() ? + subscriptions.assignedPartitions() : event.topicPartitions(); + subscriptions.requestOffsetReset(parts, event.offsetResetStrategy()); + event.future().complete(null); + } catch (Exception e) { + event.future().completeExceptionally(e); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java index 2db57c0abf78..b9d363b7fcb7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java @@ -29,7 +29,7 @@ * Event to perform {@link AsyncKafkaConsumer#seekToBeginning(Collection)} and {@link AsyncKafkaConsumer#seekToEnd(Collection)} * in the background thread. This can avoid race conditions when subscription state is updated. */ -public class ResetOffsetEvent extends CompletableApplicationEvent { +public class ResetOffsetEvent extends CompletableApplicationEvent { private final Collection topicPartitions;