Skip to content

Commit

Permalink
Remove offsetManager in ResetOffsetEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Sep 29, 2024
1 parent 8e10de1 commit f8379a0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
Timer timer = time.timer(defaultApiTimeoutMs);
ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, OffsetResetStrategy.EARLIEST,
calculateDeadlineMs(timer));
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(resetOffsetEvent);
applicationEventHandler.addAndGet(resetOffsetEvent);
} finally {
release();
}
Expand All @@ -849,7 +849,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
Timer timer = time.timer(defaultApiTimeoutMs);
ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST,
calculateDeadlineMs(timer));
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(resetOffsetEvent);
applicationEventHandler.addAndGet(resetOffsetEvent);
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,14 @@ private void process(final UnsubscribeEvent event) {
}

private void process(final ResetOffsetEvent event) {
Collection<TopicPartition> parts = event.topicPartitions().isEmpty() ?
subscriptions.assignedPartitions() : event.topicPartitions();
subscriptions.requestOffsetReset(parts, event.offsetResetStrategy());
CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
future.whenComplete(complete(event.future()));
try {
Collection<TopicPartition> parts = event.topicPartitions().isEmpty() ?
subscriptions.assignedPartitions() : event.topicPartitions();
subscriptions.requestOffsetReset(parts, event.offsetResetStrategy());
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Event to perform {@link AsyncKafkaConsumer#seekToBeginning(Collection)} and {@link AsyncKafkaConsumer#seekToEnd(Collection)}
* in the background thread. This can avoid race conditions when subscription state is updated.
*/
public class ResetOffsetEvent extends CompletableApplicationEvent<Boolean> {
public class ResetOffsetEvent extends CompletableApplicationEvent<Void> {

private final Collection<TopicPartition> topicPartitions;

Expand Down

0 comments on commit f8379a0

Please sign in to comment.