Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll #17165

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -43,6 +44,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
Expand Down Expand Up @@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

private final Time time;

/**
 * Tracks whether the subscription has been updated and not yet handled by {@link #onConsumerPoll()}.
 * It is set to true by {@link #onSubscriptionUpdated()}. If it is true and the member state is
 * UNSUBSCRIBED, the next {@link #onConsumerPoll()} will call {@link #transitionToJoining}.
 * {@link #onConsumerPoll()} resets it to false whether or not the transition happens.
 */
private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);

/**
* True if the poll timer has expired, signaled by a call to
* {@link #transitionToSendingLeaveGroup(boolean)} with dueToExpiredPollTimer param true. This
Expand Down Expand Up @@ -458,14 +466,24 @@ String memberIdInfoForLog() {
}

/**
* Join the group with the updated subscription, if the member is not part of it yet. If the
* member is already part of the group, this will only ensure that the updated subscription
* Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated.
* The next {@link #onConsumerPoll()} will join the group with the updated subscription, if the member is not part of it yet.
* If the member is already part of the group, this will only ensure that the updated subscription
* is included in the next heartbeat request.
* <p/>
* Note that list of topics of the subscription is taken from the shared subscription state.
*/
public void onSubscriptionUpdated() {
if (state == MemberState.UNSUBSCRIBED) {
subscriptionUpdated.compareAndSet(false, true);
}

/**
 * Invoked on consumer poll. If a subscription update is pending and the member is still
 * {@code UNSUBSCRIBED}, join the group through {@link #transitionToJoining}. The join is deferred from
 * {@link #onSubscriptionUpdated} to this method to fulfill the requirement that "rebalances will only
 * occur during an active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}".
 */
public void onConsumerPoll() {
    // Always consume the pending-update flag first, whether or not a join follows.
    final boolean updatePending = subscriptionUpdated.compareAndSet(true, false);
    if (!updatePending) {
        return;
    }
    if (state == MemberState.UNSUBSCRIBED) {
        transitionToJoining();
    }
}
Expand Down Expand Up @@ -1408,4 +1426,11 @@ Optional<LocalAssignment> updateWith(Map<Uuid, SortedSet<Integer>> assignment) {
return Optional.of(new LocalAssignment(nextLocalEpoch, assignment));
}
}

/*
 * Visible for testing.
 * Exposes the current value of the subscriptionUpdated flag.
 */
boolean subscriptionUpdated() {
    final boolean updated = subscriptionUpdated.get();
    return updated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,15 @@ public void process(ApplicationEvent event) {
private void process(final PollEvent event) {
if (requestManagers.commitRequestManager.isPresent()) {
requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
} else {
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3443,6 +3443,34 @@ public void testPreventMultiThread(GroupProtocol groupProtocol) throws Interrupt
}
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws InterruptedException {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to make the application thread sleep here to make sure the network thread does not send a join request.
WDYT?

Copy link
Member Author

@FrankYang0529 FrankYang0529 Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @TaiJuWu, the application and network threads are different threads. I'm not sure how making the application thread sleep would prevent the network thread from sending a join request. Also, this test is meant to verify that KafkaConsumer#subscribe doesn't send a join request, so we should not add other operations that could affect the result. Could you elaborate on your idea? Thank you.

Copy link
Contributor

@TaiJuWu TaiJuWu Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion.

The reason for making the application thread sleep here is to give the network thread enough time to send the join request even without a poll. This helps to reproduce the bug more easily and consistently.

Additionally, to ensure the behavior we’ve promised in the documentation, the two operations (subscribe and poll) should not be too close together in the timeline in this test. Does that make sense?

Copy link
Member Author

@FrankYang0529 FrankYang0529 Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @TaiJuWu, thanks for the suggestion. It's hard to say how long we would need to wait to be sure that KafkaConsumer#subscribe doesn't send a join request. I added two unit tests to check the behavior.

In ApplicationEventProcessorTest#testSubscriptionChangeEvent, we check that ApplicationEventProcessor#process(SubscriptionChangeEvent) doesn't call ConsumerMembershipManager#maybeJoinGroup.
In ConsumerMembershipManagerTest#testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup, we check that ConsumerMembershipManager#onSubscriptionUpdated keeps the member state as UNSUBSCRIBED and that transitionToJoining is only called in maybeJoinGroup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your explanation.

assertFalse(groupProtocol == GroupProtocol.CLASSIC ?
requestGenerated(client, ApiKeys.JOIN_GROUP) :
requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT),
"KafkaConsumer#subscribe should not send " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");

consumer.poll(Duration.ZERO);
TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC ?
requestGenerated(client, ApiKeys.JOIN_GROUP) :
requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT),
"Expected " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
}

// Returns true if the mock client holds at least one request whose builder targets the given API key.
private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
    return client.requests()
        .stream()
        .map(sent -> sent.requestBuilder().apiKey())
        .anyMatch(sentKey -> sentKey.equals(apiKey));
}

private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer<byte[]> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup() {
ConsumerMembershipManager membershipManager = createMemberInStableState();
membershipManager.onSubscriptionUpdated();
assertTrue(membershipManager.subscriptionUpdated());
membershipManager.onConsumerPoll();
verify(membershipManager, never()).transitionToJoining();
assertFalse(membershipManager.subscriptionUpdated());
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() {
ConsumerMembershipManager membershipManager = createMembershipManager(null);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onSubscriptionUpdated();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we verify here that there was no transitionToJoining? (To be sure that the transition is only triggered after the call to maybeJoinGroup.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added it. Thanks.

verify(membershipManager, never()).transitionToJoining();
assertTrue(membershipManager.subscriptionUpdated());
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onConsumerPoll();
verify(membershipManager).transitionToJoining();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup() {
ShareMembershipManager membershipManager = createMemberInStableState();
membershipManager.onSubscriptionUpdated();
assertTrue(membershipManager.subscriptionUpdated());
membershipManager.onConsumerPoll();
verify(membershipManager, never()).transitionToJoining();
assertFalse(membershipManager.subscriptionUpdated());
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() {
    // The newly created member is expected to be outside the group.
    ShareMembershipManager manager = createMembershipManager();
    assertEquals(MemberState.UNSUBSCRIBED, manager.state());

    // Updating the subscription only records the pending update; no join yet.
    manager.onSubscriptionUpdated();
    verify(manager, never()).transitionToJoining();
    assertTrue(manager.subscriptionUpdated());
    assertEquals(MemberState.UNSUBSCRIBED, manager.state());

    // The join is triggered by the next poll.
    manager.onConsumerPoll();
    verify(manager).transitionToJoining();
}

private void assertLeaveGroupDueToExpiredPollAndTransitionToStale(ShareMembershipManager membershipManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public void testApplicationEventIsProcessed(ApplicationEvent e) {

private static Stream<Arguments> applicationEvents() {
return Stream.of(
Arguments.of(new PollEvent(100)),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
Expand Down Expand Up @@ -208,6 +207,30 @@ public void testSeekUnvalidatedEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@Test
public void testPollEvent() {
    final int pollTimeMs = 12345;
    PollEvent pollEvent = new PollEvent(pollTimeMs);

    setupProcessor(true);
    when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);

    processor.process(pollEvent);

    // A poll must update the auto-commit timer, notify the membership manager and reset the poll timer.
    verify(commitRequestManager).updateAutoCommitTimer(pollTimeMs);
    verify(membershipManager).onConsumerPoll();
    verify(heartbeatRequestManager).resetPollTimer(pollTimeMs);
}

@Test
public void testSubscriptionChangeEvent() {
    SubscriptionChangeEvent subscriptionChange = new SubscriptionChangeEvent();

    setupProcessor(true);
    when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);

    processor.process(subscriptionChange);

    verify(membershipManager).onSubscriptionUpdated();
    // A subscription change alone must not invoke the poll callback.
    verify(membershipManager, never()).onConsumerPoll();
}

// Builds a single-element list holding one mocked unsent request.
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
    NetworkClientDelegate.UnsentRequest unsentRequest = mock(NetworkClientDelegate.UnsentRequest.class);
    return Collections.singletonList(unsentRequest);
}
Expand Down
Loading