Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll #17165

Merged
merged 1 commit into from
Sep 26, 2024

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented Sep 11, 2024

To fulfill "rebalances will only occur during an active call to KafkaConsumer#poll(Duration)", we should not send JoinRequest after AsyncKafkaConsumer#subscribe. Add a flag subscriptionUpdated to AbstractMembershipManager#onSubscriptionUpdated. When calling AsyncKafkaConsumer#subscribe, set the flag to true. When calling AsyncKafkaConsumer#poll, send the JoinRequest if the flag is true.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@TaiJuWu
Copy link
Contributor

TaiJuWu commented Sep 12, 2024

Hi @FrankYang0529 , I opened a PR for this behavior.
Could you take a look? Thanks a lot!

#17175

@FrankYang0529
Copy link
Member Author

I opened a PR for this behavior. Could you take a look? Thanks a lot!

#17175

Hi @TaiJuWu, thanks for the PR. I also have a similar test case testPollSendRequestForRebalance. We can also have your test after this PR is merged.

@TaiJuWu
Copy link
Contributor

TaiJuWu commented Sep 13, 2024

I opened a PR for this behavior. Could you take a look? Thanks a lot!
#17175

Hi @TaiJuWu, thanks for the PR. I also have a similar test case testPollSendRequestForRebalance. We can also have your test after this PR is merged.

Hi @FrankYang0529 , I will close my PR because they do same thing and I leave a comment for testPollSendRequestForRebalance.

client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to make application threadsleep here to make sure network thread does not send join request.
WDYT?

Copy link
Member Author

@FrankYang0529 FrankYang0529 Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @TaiJuWu, application and network are different threads. I'm not sure why making application thread sleep can let network thread doesn't send join request. Also , this test would like to make sure KafkaConsumer#subscribe doesn't send join request, so we should not have other operations to make the result. Could you elaborate more on your idea? Thank you.

Copy link
Contributor

@TaiJuWu TaiJuWu Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion.

The reason for making the application sleep here is to give the network enough time to send the join request even without poll. This helps to reproduce the bug more easily and consistently.

Additionally, to ensure the behavior we’ve promised in the documentation, the two operations (subscribe and poll) should not be too close together in the timeline in this test. Does that make sense?

Copy link
Member Author

@FrankYang0529 FrankYang0529 Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @TaiJuWu, thanks for the suggestion. It's hard to say waiting how much time can make sure KafkaConsumer#subscribe doesn't send a join request. I add two unit tests to check the behavior.

In ApplicationEventProcessorTest#testSubscriptionChangeEvent, we check ApplicationEventProcessor#process(SubscriptionChangeEvent) doesn't call ConsumerMembershipManager#maybeJoinGroup.
In ConsumerMembershipManagerTest#testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup, we check ConsumerMembershipManager#onSubscriptionUpdated keeps member state as UNSUBSCRIBED and calling transitionToJoining in maybeJoinGroup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your explanation.

@lianetm
Copy link
Collaborator

lianetm commented Sep 16, 2024

Hey @FrankYang0529, so seems this draft is the PR we're keeping right? Let me know when it's ready for review and I'll be happy to take a look. Thanks!

@lianetm lianetm added consumer KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) labels Sep 16, 2024
@FrankYang0529
Copy link
Member Author

Hi @lianetm, yes, the PR is ready for review. Thank you.

@FrankYang0529 FrankYang0529 marked this pull request as ready for review September 17, 2024 02:37
@FrankYang0529 FrankYang0529 force-pushed the KAFKA-17154 branch 2 times, most recently from 91f5a98 to 804f17c Compare September 17, 2024 04:06
Copy link
Collaborator

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @FrankYang0529 , thanks for the patch! Left some comments for consideration.

Comment on lines 1622 to 1623
membershipManager.onSubscriptionUpdated();
assertFalse(membershipManager.subscriptionUpdated());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these 2 lines are a bit confusing (we trigger "onSubscriptionUpdated" and expect "subscriptionUpdated" to be false). I would say that the boolean var name is what we should review, because it truly indicates if the member should join (related to comment on the membershipManager.subscriptionUpdated var)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change variable from subscriptionUpdated to shouldTransitionToJoining and add more comments for it. Thank you.

@@ -1617,12 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoNothingIfInGroup() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about testSubscriptionUpdateDoesNotTransitionToJoining (or similar)? (not transitioning to joining is really the core bit we were missing and are fixing/test here..and is actually consistent with the name you have for the similar test right below)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I think whether the member is in the group is important as well, because it determines whether to change shouldTransitionToJoining variable. I update the test case name to testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup and another test case below to testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup. Do you think it makes sense? Thank you.

public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
ConsumerMembershipManager membershipManager = createMembershipManager(null);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onSubscriptionUpdated();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we verify here that there was no transitionToJoining? (to be sure that the transition is only triggered after the call to maybeJoinGroup)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added it. Thanks.

@@ -3443,6 +3443,34 @@ public void testPreventMultiThread(GroupProtocol groupProtocol) throws Interrupt
}
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testPollSendRequestForRebalance(GroupProtocol groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo testPollSendsRequestForRebalance? (and maybe clearer testPollSendsRequestToJoin?)

@@ -189,6 +191,9 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

private final Time time;

// AtomicBoolean to track if the subscription has been updated
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is truly more to know if we should join after the subscription has been updated right? (true if not in group)

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-17154 branch 2 times, most recently from a01febd to 23d386f Compare September 18, 2024 15:38
@FrankYang0529
Copy link
Member Author

Hi @lianetm, I found that PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopic is flaky in this PR, so I will convert to draft first. Thanks.

@FrankYang0529 FrankYang0529 marked this pull request as draft September 19, 2024 12:23
@lianetm
Copy link
Collaborator

lianetm commented Sep 19, 2024

Hey @FrankYang0529, just in case it helps, we have this https://issues.apache.org/jira/browse/KAFKA-17286 related to that flaky test with some ideas in the comments.

@FrankYang0529
Copy link
Member Author

Hey @FrankYang0529, just in case it helps, we have this https://issues.apache.org/jira/browse/KAFKA-17286 related to that flaky test with some ideas in the comments.

Yes, that helps. I found that both in trunk and this PR, the failed result is reproducible by adding Thread.sleep(1000) at the end of PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopic.

Do you think we let https://issues.apache.org/jira/browse/KAFKA-17286 handle this flaky? Or we should solve it in this PR? Thanks.

@lianetm
Copy link
Collaborator

lianetm commented Sep 23, 2024

Hey @FrankYang0529 , sorry for the late reply, I was traveling last week but I'm back, so will be taking another look at this. You can mark it as ready, we can leave the fix for the flaky test separate given that we already understand the root cause not related to this change. I'll take another look at this one between today and tomorrow. Thanks!

@FrankYang0529 FrankYang0529 marked this pull request as ready for review September 23, 2024 11:55
Copy link
Collaborator

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @FrankYang0529, I took another full pass and left some comments for consideration. Thanks!

@@ -466,6 +474,17 @@ String memberIdInfoForLog() {
*/
public void onSubscriptionUpdated() {
if (state == MemberState.UNSUBSCRIBED) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this check is still needed here? It made sense before because we were doing the actual join, but now we're just setting a flag (that looks more like a "subscriptionUpdated" var now). Then on the maybeJoinGroup is where we need to check that the state if UNSUBSCRIBED and subscriptionUpdated then join. Makes sense?

@@ -246,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
*/
private void process(final UnsubscribeEvent event) {
if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
System.out.println("UnsubscribeEvent: " + event);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets remove this please

CompletableFuture<Void> future = requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup();
System.out.println("UnsubscribeEvent: " + future.isCompletedExceptionally());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -244,6 +244,7 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
}, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")

assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
Thread.sleep(1000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

@@ -524,7 +543,7 @@ public void transitionToJoining() {
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
if (isNotInGroup()) {
if (isNotInGroup() || state == MemberState.JOINING) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get how this change relates to this PR, why is it that we need this here?

Also, this is effectively ignoring the leave group (unsubscribe) if JOINING, which I would expect is not right (the member will remain JOINING and may even become STABLE, never leave). Before this change, JOINING + consumer.unsubscribe => LEAVING (member would run the full leave flow and attempt to send the leave HB). With this change, JOINING + consumer.unsubscribe => still JOINING. Is that the intention?

I know that the leave while joining has challenges to solve so that it can be processed correctly (KIP-1082), but I would expect that we keep the intention we had here, even after the KIP-1082 fixes: we should attempt to leave the group when there's a call to unsubscribe while waiting for the join response (JOINING) because the broker may have already processed the join.

@FrankYang0529
Copy link
Member Author

Hi @lianetm, I'm sorry that there was a testing commit. I've removed it. Also, I change shouldTransitionToJoining to hasSubscriptionUpdated and set hasSubscriptionUpdated to true in AbstractMembershipManager#hasSubscriptionUpdated without checking subscription state.

Copy link
Collaborator

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @FrankYang0529 , thanks for the updates. Just some minor comments left.

* AtomicBoolean to track whether the subscription is updated.
* If it's true and subscription state is UNSUBSCRIBED, the next {@link #maybeJoinGroup()} will change member state to JOINING.
*/
private final AtomicBoolean hasSubscriptionUpdated = new AtomicBoolean(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: what about simply subscriptionUpdated ...to track whether the subscription has been updated

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, update the variable and function to subscriptionUpdated. Thank you.

@@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no "shouldTransitionToJoining" var/concept anymore, so should we update this ? maybe simply ...DoesNotTransitionToJoiningIfInGroup

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test case name to testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup.

}

@Test
public void testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testOnSubscriptionUpdatedTransitionsToJoiningIOnPollfNotInGroup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the test case name to testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup. Thanks.

@@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
Copy link
Collaborator

@lianetm lianetm Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make sure to align these with the names in the consumer tests if they change with the comments above

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, align test case names between ConsumerMembershipManagerTest and ShareMembershipManagerTest.


@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionChangeEvent(boolean withGroupId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say this test only makes sense withGroupId=true right? if there is no groupId, the processor will have null HBMgr, MembershipMgr (there cannot be a SubscriptionChange event without a groupId, the api call to subscribe requires groupId)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I leave withGroupId=true case here. Thank you.

Copy link
Collaborator

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @FrankYang0529 , could you please update the PR description where it says:

When calling AsyncKafkaConsumer#subscribe, send the JoinRequest if the flag is true.

I guess you meant when calling poll transition to joining if the subscription has been updated?

With that (and the last nit up to you) this looks good to me. Thanks!

* from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an
* active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
*/
public void maybeJoinGroup() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: what do you think about calling this something like onConsumerPoll? Just to make the func self explanatory (on poll -> transition to joining), no need to find usages to understand what this is all about. No strong feeling though, up to you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I updated PR description and change maybeJoinGroup to onConsumerPoll.

Copy link
Collaborator

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @FrankYang0529! LGTM.

@lianetm lianetm removed the core Kafka Broker label Sep 26, 2024
@lianetm
Copy link
Collaborator

lianetm commented Sep 26, 2024

FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky consumer integration test that I noticed here, has been flaky for a while.

@FrankYang0529
Copy link
Member Author

FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky consumer integration test that I noticed here, has been flaky for a while.

Okay. I will take a look. Thanks for filing the issue.

@lianetm lianetm merged commit 4e52a12 into apache:trunk Sep 26, 2024
7 of 9 checks passed
@chia7712
Copy link
Contributor

This PR causes the following build error:

/home/chia7712/project/kafka/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:3500: error: method requestGenerated(MockClient,ApiKeys) is already defined in class KafkaConsumerTest
    private boolean requestGenerated(MockClient client, ApiKeys apiKey) {

@lianetm
Copy link
Collaborator

lianetm commented Sep 26, 2024

uhm builds completed successfully but I guess it conflicted with a recent merge. Checking now and will open PR right away. Thanks @chia7712 !

@lianetm
Copy link
Collaborator

lianetm commented Sep 26, 2024

Removing dup method here #17291

@mumrah
Copy link
Contributor

mumrah commented Sep 26, 2024

Once ASF signs off on the merge queue, we can completely avoid this class of failure on trunk. According to Infra team, we are a month or two away from being able to enable it.

@lianetm
Copy link
Collaborator

lianetm commented Sep 26, 2024

That will be helpful, it was really my miss not asking to have this rebased one last time.

@FrankYang0529 FrankYang0529 deleted the KAFKA-17154 branch September 27, 2024 00:16
bbejeck pushed a commit to bbejeck/kafka that referenced this pull request Sep 28, 2024
airlock-confluentinc bot pushed a commit to confluentinc/kafka that referenced this pull request Sep 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
clients consumer ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants