-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17505: New consumer seekToBeginning/End should run in background thread #17230
base: trunk
Are you sure you want to change the base?
Conversation
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions; | ||
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); | ||
cachedSubscriptionHasAllFetchPositions = false; | ||
applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seekToBeginning
and seekToEnd
don't wait the completion, but seek
does. not sure why those similar methods have different behavior. The doc of seek
declares the effect will be applied on next poll
, and ClassicKafkaConsumer#seek
does not wait also. IMHO, AsyncKafkaConsumer#seek
should use add
rather than addAndGet
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey here, sorry for the late reply. I agree that these 3 methods should have similar behaviour, but I wonder if the case is that they should all use addAndGet
? My concern is, when seeking (either of the 3 methods), we do expect that as soon as they complete, the position is updated so that other funcs will see it (ex. currentLag, poll). We would be not respecting that anymore if we use add. seek could complete without having updated the offsets right?
From the seek contract, it states that "Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we need to:
- override fetch offsets (this requires
addAndGet
I expect) - make sure they are available on the next poll (again addAndGet on seek, otherwise consumer.seek + consumer.poll may not find the offsets if the poll happens when the
seek
withadd
is still being processed in the background right?). Also calls like currentLag after that seek + add could not have the info they need I guess.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lianetm thanks for the sharing. you are right, those seek methods need to wait the event gets completed to avoid race condition. +1 to use addAndGet
on seek
, seekToBeginning
, and seekToEnd
. I will close https://issues.apache.org/jira/browse/KAFKA-17586
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TaiJuWu Could you please use addAndGet
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option we've used elsewhere is to stash a reference to the ResetOffsetsEvent
in the AsyncKafkaConsumer
after it's enqueued so it can be checked later (i.e. in poll()
).
I will admit I don't like the idea 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed another difference between seek()
, seekToBeginning()
, and seekToEnd()
—seek()
sets the position and validates it (if possible) while the other two forms request the positions be reset. So even if the code in seekToBeginning()
and seekToEnd()
is switched to addAndGet()
, it doesn't guarantee that the positions have been reset when the method exits, does it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed another difference between
seek()
,seekToBeginning()
, andseekToEnd()
—seek()
sets the position and validates it (if possible) while the other two forms request the positions be reset. So even if the code inseekToBeginning()
andseekToEnd()
is switched toaddAndGet()
, it doesn't guarantee that the positions have been reset when the method exits, does it?
Hi @kirktrue , you are right and thanks for your detailed review.
I think seekToBeginning
and seekToEnd
can be guarantee by follow steps if we leverage offsetManager
:
updateFetchPositions[1] -> updatePositionsWithOffsets[2] -> initWithPartitionOffsetsIfNeeded[3] -> resetPositionsIfNeeded[4] -> sendListOffsetsRequestsAndResetPositions[5]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @TaiJuWu, thanks for the PR. Leave some minor comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
Outdated
Show resolved
Hide resolved
e8130ae
to
823951d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java
Outdated
Show resolved
Hide resolved
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions; | ||
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); | ||
cachedSubscriptionHasAllFetchPositions = false; | ||
applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option we've used elsewhere is to stash a reference to the ResetOffsetsEvent
in the AsyncKafkaConsumer
after it's enqueued so it can be checked later (i.e. in poll()
).
I will admit I don't like the idea 🤔
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java
Show resolved
Hide resolved
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions; | ||
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); | ||
cachedSubscriptionHasAllFetchPositions = false; | ||
applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed another difference between seek()
, seekToBeginning()
, and seekToEnd()
—seek()
sets the position and validates it (if possible) while the other two forms request the positions be reset. So even if the code in seekToBeginning()
and seekToEnd()
is switched to addAndGet()
, it doesn't guarantee that the positions have been reset when the method exits, does it?
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java
Outdated
Show resolved
Hide resolved
public void testSeekToBeginning() { | ||
SubscriptionState subscriptions = mock(SubscriptionState.class); | ||
TopicPartition topic = new TopicPartition("test", 0); | ||
consumer = spy(newConsumer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does consumer
need to be wrapped as a spy
in this case? It doesn't look like we're verifying or modifying it in any way 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, we do not this, thanks.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
Outdated
Show resolved
Hide resolved
32f7cab
to
f8379a0
Compare
1820990
to
2772942
Compare
Jira: https://issues.apache.org/jira/browse/KAFKA-17505
Move
seekToBeginning/End
run in background thread.The app thread will be blocked until background finish this operation in order to avoid race issue.
Committer Checklist (excluded from commit message)