KAFKA-17505: New consumer seekToBeginning/End should run in background thread #17230

Open
wants to merge 14 commits into
base: trunk

Contributor

@TaiJuWu TaiJuWu commented Sep 19, 2024

Jira: https://issues.apache.org/jira/browse/KAFKA-17505

Move seekToBeginning/seekToEnd so that they run in the background thread.
The app thread blocks until the background thread finishes the operation, which avoids a race condition.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@frankvicky frankvicky added consumer ctr Consumer Threading Refactor (KIP-848) labels Sep 19, 2024
Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
cachedSubscriptionHasAllFetchPositions = false;
applicationEventHandler.add(new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST));
Contributor

seekToBeginning and seekToEnd don't wait for completion, but seek does. I'm not sure why these similar methods behave differently. The doc of seek declares that the effect will be applied on the next poll, and ClassicKafkaConsumer#seek does not wait either. IMHO, AsyncKafkaConsumer#seek should use add rather than addAndGet.

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L792

@kirktrue @lianetm WDYT?

Collaborator

@lianetm lianetm Sep 26, 2024

Hey here, sorry for the late reply. I agree that these 3 methods should have similar behaviour, but I wonder if the case is that they should all use addAndGet? My concern is that when seeking (with any of the 3 methods), we do expect that as soon as the call completes, the position is updated so that other funcs will see it (e.g. currentLag, poll). We would not be respecting that anymore if we use add. seek could complete without having updated the offsets, right?

From the seek contract, it states that "Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we need to:

  1. override fetch offsets (this requires addAndGet I expect)
  2. make sure they are available on the next poll (again addAndGet on seek, otherwise consumer.seek + consumer.poll may not find the offsets if the poll happens while the seek with add is still being processed in the background, right?). Also, calls like currentLag after that seek + add might not have the info they need, I guess.

Thoughts?
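
The race described above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka classes: `SeekHandoffSketch`, `SeekEvent`, and the simulated processing delay are all hypothetical stand-ins, and `add`/`addAndGet` only mimic the fire-and-forget versus blocking handoff discussed in this thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class SeekHandoffSketch {
    /** Hypothetical stand-in for an application event that carries a new offset. */
    static final class SeekEvent {
        final long offset;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        SeekEvent(long offset) { this.offset = offset; }
    }

    private final BlockingQueue<SeekEvent> queue = new LinkedBlockingQueue<>();
    private final AtomicLong position = new AtomicLong(0L);

    SeekHandoffSketch(long processingDelayMs) {
        Thread background = new Thread(() -> {
            try {
                while (true) {
                    SeekEvent event = queue.take();
                    Thread.sleep(processingDelayMs); // simulate background work
                    position.set(event.offset);      // apply the new position
                    event.future.complete(null);     // signal completion to waiters
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.setDaemon(true);
        background.start();
    }

    /** Fire-and-forget: returns as soon as the event is enqueued. */
    CompletableFuture<Void> add(long offset) {
        SeekEvent event = new SeekEvent(offset);
        queue.add(event);
        return event.future;
    }

    /** Blocking: returns only after the background thread has applied the event. */
    void addAndGet(long offset, long timeoutMs) {
        try {
            add(offset).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("seek was not applied in time", e);
        }
    }

    long position() { return position.get(); }

    public static void main(String[] args) {
        SeekHandoffSketch consumer = new SeekHandoffSketch(200);

        // With add, the caller can read the position before the event is applied.
        CompletableFuture<Void> pending = consumer.add(42L);
        System.out.println("right after add: " + consumer.position());
        pending.join();

        // With addAndGet, the position is updated by the time the call returns.
        consumer.addAndGet(100L, 5_000);
        System.out.println("right after addAndGet: " + consumer.position());
    }
}
```

In this sketch the first read will very likely still see the old position, because the background thread has not processed the event yet. That is the window in which a `poll` or `currentLag` call following a `seek` built on `add` could miss the new offset.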

Contributor

@lianetm thanks for sharing. You are right, those seek methods need to wait for the event to complete in order to avoid a race condition. +1 to using addAndGet on seek, seekToBeginning, and seekToEnd. I will close https://issues.apache.org/jira/browse/KAFKA-17586

Contributor

@TaiJuWu Could you please use addAndGet instead?

Collaborator

Another option we've used elsewhere is to stash a reference to the ResetOffsetsEvent in the AsyncKafkaConsumer after it's enqueued so it can be checked later (i.e. in poll()).

I will admit I don't like the idea 🤔
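
For comparison, the "stash a reference and check it later" option could look roughly like the sketch below. It is not taken from `AsyncKafkaConsumer`; the class name, the `pendingReset` field, and the single-thread executor standing in for the background thread are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class PendingResetSketch {
    // Hypothetical stand-in for the consumer's background thread.
    private final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "background-sketch");
        t.setDaemon(true);
        return t;
    });
    private final AtomicLong position = new AtomicLong(-1L);

    // Stashed reference to the most recently enqueued reset; null when nothing is pending.
    private CompletableFuture<Void> pendingReset;

    /** Returns immediately; the reset is applied later on the background thread. */
    void seekToBeginning() {
        pendingReset = CompletableFuture.runAsync(() -> position.set(0L), background);
    }

    /** Waits for any stashed reset before reading the position. */
    long poll() {
        if (pendingReset != null) {
            pendingReset.join();
            pendingReset = null;
        }
        return position.get();
    }

    public static void main(String[] args) {
        PendingResetSketch consumer = new PendingResetSketch();
        consumer.seekToBeginning();
        System.out.println("position seen by poll: " + consumer.poll()); // prints 0
    }
}
```

One cost of this design is that every method that reads positions, not only `poll`, would need the same pending-event check, whereas blocking inside the seek call keeps that concern in one place.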

Contributor Author

Hi @chia7712 @lianetm, thanks for your review and for sharing.
I will update this PR according to your suggestions.
Thanks a lot.

Collaborator

I noticed another difference between seek(), seekToBeginning(), and seekToEnd(): seek() sets the position and validates it (if possible) while the other two forms request the positions be reset. So even if the code in seekToBeginning() and seekToEnd() is switched to addAndGet(), it doesn't guarantee that the positions have been reset when the method exits, does it?
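
The distinction can be sketched with a toy state holder. This is an illustration only, not the real `SubscriptionState`: the class, the `Reset` enum, and `completeReset` (standing in for whatever later step resolves the offset) are hypothetical.

```java
import java.util.OptionalLong;

public class ResetVsSeekSketch {
    enum Reset { NONE, EARLIEST, LATEST }

    private OptionalLong position = OptionalLong.empty();
    private Reset pendingReset = Reset.NONE;

    /** seek(): the caller supplies the offset, so the position is known immediately. */
    void seek(long offset) {
        position = OptionalLong.of(offset);
        pendingReset = Reset.NONE;
    }

    /** seekToBeginning()/seekToEnd(): only records that a reset is wanted. */
    void requestReset(Reset strategy) {
        position = OptionalLong.empty();
        pendingReset = strategy;
    }

    /** Hypothetical later step: the offset has been looked up and can now be applied. */
    void completeReset(long resolvedOffset) {
        position = OptionalLong.of(resolvedOffset);
        pendingReset = Reset.NONE;
    }

    boolean hasValidPosition() { return position.isPresent(); }

    Reset pendingReset() { return pendingReset; }

    public static void main(String[] args) {
        ResetVsSeekSketch state = new ResetVsSeekSketch();

        state.seek(42L);
        System.out.println("after seek: valid=" + state.hasValidPosition());         // true

        state.requestReset(Reset.LATEST);
        System.out.println("after requestReset: valid=" + state.hasValidPosition()); // false

        state.completeReset(1000L);
        System.out.println("after completeReset: valid=" + state.hasValidPosition()); // true
    }
}
```

Under these assumptions, waiting for the event that calls `requestReset` only guarantees that the request was recorded. The position stays unknown until the separate lookup step finishes.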

Contributor Author

@TaiJuWu TaiJuWu Sep 28, 2024

I noticed another difference between seek(), seekToBeginning(), and seekToEnd(): seek() sets the position and validates it (if possible) while the other two forms request the positions be reset. So even if the code in seekToBeginning() and seekToEnd() is switched to addAndGet(), it doesn't guarantee that the positions have been reset when the method exits, does it?

Hi @kirktrue, you are right, and thanks for your detailed review.
I think seekToBeginning and seekToEnd can be guaranteed by the following steps if we leverage offsetManager:
updateFetchPositions[1] -> updatePositionsWithOffsets[2] -> initWithPartitionOffsetsIfNeeded[3] -> resetPositionsIfNeeded[4] -> sendListOffsetsRequestsAndResetPositions[5]

@TaiJuWu TaiJuWu marked this pull request as ready for review September 21, 2024 05:50
Member

@FrankYang0529 FrankYang0529 left a comment

Hi @TaiJuWu, thanks for the PR. I left some minor comments.

@kirktrue kirktrue added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Sep 26, 2024
Collaborator

@kirktrue kirktrue left a comment

Thanks for the PR @TaiJuWu!

I left a few comments, mostly minor tweaks.

Thanks!

public void testSeekToBeginning() {
SubscriptionState subscriptions = mock(SubscriptionState.class);
TopicPartition topic = new TopicPartition("test", 0);
consumer = spy(newConsumer(
Collaborator

Does consumer need to be wrapped as a spy in this case? It doesn't look like we're verifying or modifying it in any way 🤔

Contributor Author

You are right, we do not need this, thanks.

Contributor Author

TaiJuWu commented Sep 30, 2024

Hi @kirktrue @lianetm @chia7712,

I have updated the seekToBeginning and seekToEnd methods to use addAndGet. Please take a look.

Labels
ci-approved clients consumer ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol
6 participants