From 87575f2b37e240c79ed1f86c448171c176948090 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 26 Sep 2024 19:53:32 +0800 Subject: [PATCH] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll Signed-off-by: PoAn Yang --- .../internals/AbstractMembershipManager.java | 31 +++++++++++++++++-- .../events/ApplicationEventProcessor.java | 10 ++++-- .../clients/consumer/KafkaConsumerTest.java | 28 +++++++++++++++++ .../ConsumerMembershipManagerTest.java | 17 +++++++++- .../internals/ShareMembershipManagerTest.java | 17 +++++++++- .../events/ApplicationEventProcessorTest.java | 25 ++++++++++++++- 6 files changed, 120 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index 455e48a0c891..2b4bd8efab77 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -43,6 +44,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager impl private final Time time; + /** + * AtomicBoolean to track whether the subscription is updated. + * If it's true and subscription state is UNSUBSCRIBED, the next {@link #onConsumerPoll()} will change member state to JOINING. + */ + private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false); + /** * True if the poll timer has expired, signaled by a call to * {@link #transitionToSendingLeaveGroup(boolean)} with dueToExpiredPollTimer param true. This @@ -458,14 +466,24 @@ String memberIdInfoForLog() { } /** - * Join the group with the updated subscription, if the member is not part of it yet. If the - * member is already part of the group, this will only ensure that the updated subscription + * Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated. + * The next {@link #onConsumerPoll()} will join the group with the updated subscription, if the member is not part of it yet. + * If the member is already part of the group, this will only ensure that the updated subscription * is included in the next heartbeat request. *

* Note that list of topics of the subscription is taken from the shared subscription state. */ public void onSubscriptionUpdated() { - if (state == MemberState.UNSUBSCRIBED) { + subscriptionUpdated.compareAndSet(false, true); + } + + /** + * Join the group if the member is not part of it yet. This function separates {@link #transitionToJoining} + * from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an + * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}" + */ + public void onConsumerPoll() { + if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) { transitionToJoining(); } } @@ -1408,4 +1426,11 @@ Optional updateWith(Map> assignment) { return Optional.of(new LocalAssignment(nextLocalEpoch, assignment)); } } + + /* + * Visible for testing. + */ + boolean subscriptionUpdated() { + return subscriptionUpdated.get(); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 6ae6c23f6ee1..6008e850324f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -155,9 +155,15 @@ public void process(ApplicationEvent event) { private void process(final PollEvent event) { if (requestManagers.commitRequestManager.isPresent()) { requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs())); - requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs())); + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); } else { - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs())); + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 77c0264849a1..f18385d1ebad 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -3443,6 +3443,34 @@ public void testPreventMultiThread(GroupProtocol groupProtocol) throws Interrupt } } + @ParameterizedTest + @EnumSource(value = GroupProtocol.class) + public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws InterruptedException { + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + initMetadata(client, Collections.singletonMap(topic, 1)); + Node node = metadata.fetch().nodes().get(0); + + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); + KafkaConsumer consumer = newConsumer(groupProtocol, time, client, subscription, metadata, + assignor, true, groupInstanceId); + consumer.subscribe(singletonList(topic)); + assertFalse(groupProtocol == GroupProtocol.CLASSIC ? + requestGenerated(client, ApiKeys.JOIN_GROUP) : + requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT), + "KafkaConsumer#subscribe should not send " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request"); + + consumer.poll(Duration.ZERO); + TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC ? + requestGenerated(client, ApiKeys.JOIN_GROUP) : + requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT), + "Expected " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request"); + } + + private boolean requestGenerated(MockClient client, ApiKeys apiKey) { + return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey)); + } + private static final List CLIENT_IDS = new ArrayList<>(); public static class DeserializerForClientId implements Deserializer { @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java index 75c982351886..3ca7cf2bd2ba 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java @@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable } @Test - public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() { + public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup() { ConsumerMembershipManager membershipManager = createMemberInStableState(); membershipManager.onSubscriptionUpdated(); + assertTrue(membershipManager.subscriptionUpdated()); + membershipManager.onConsumerPoll(); verify(membershipManager, never()).transitionToJoining(); + assertFalse(membershipManager.subscriptionUpdated()); + } + + @Test + public void testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() { + ConsumerMembershipManager membershipManager = createMembershipManager(null); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.onSubscriptionUpdated(); + verify(membershipManager, never()).transitionToJoining(); + assertTrue(membershipManager.subscriptionUpdated()); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.onConsumerPoll(); + verify(membershipManager).transitionToJoining(); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java index 96a2b98ce610..eca8db88852d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java @@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable } @Test - public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() { + public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup() { ShareMembershipManager membershipManager = createMemberInStableState(); membershipManager.onSubscriptionUpdated(); + assertTrue(membershipManager.subscriptionUpdated()); + membershipManager.onConsumerPoll(); verify(membershipManager, never()).transitionToJoining(); + assertFalse(membershipManager.subscriptionUpdated()); + } + + @Test + public void testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() { + ShareMembershipManager membershipManager = createMembershipManager(); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.onSubscriptionUpdated(); + verify(membershipManager, never()).transitionToJoining(); + assertTrue(membershipManager.subscriptionUpdated()); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.onConsumerPoll(); + verify(membershipManager).transitionToJoining(); } private void assertLeaveGroupDueToExpiredPollAndTransitionToStale(ShareMembershipManager membershipManager) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 80315099fb89..db7d623c59e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -126,7 +126,6 @@ public void testApplicationEventIsProcessed(ApplicationEvent e) { private static Stream applicationEvents() { return Stream.of( - Arguments.of(new PollEvent(100)), Arguments.of(new AsyncCommitEvent(new HashMap<>())), Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)), Arguments.of(new CheckAndUpdatePositionsEvent(500)), @@ -208,6 +207,30 @@ public void testSeekUnvalidatedEventWithException() { assertInstanceOf(IllegalStateException.class, e.getCause()); } + @Test + public void testPollEvent() { + PollEvent event = new PollEvent(12345); + + setupProcessor(true); + when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + processor.process(event); + verify(commitRequestManager).updateAutoCommitTimer(12345); + verify(membershipManager).onConsumerPoll(); + verify(heartbeatRequestManager).resetPollTimer(12345); + } + + @Test + public void testSubscriptionChangeEvent() { + SubscriptionChangeEvent event = new SubscriptionChangeEvent(); + + setupProcessor(true); + when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + processor.process(event); + verify(membershipManager).onSubscriptionUpdated(); + // verify member state doesn't transition to JOINING. + verify(membershipManager, never()).onConsumerPoll(); + } + private List mockCommitResults() { return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); }