Skip to content

Commit

Permalink
KAFKA-17154: New consumer subscribe may join group without a call to …
Browse files Browse the repository at this point in the history
…consumer.poll

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Sep 24, 2024
1 parent 42cc3c0 commit 217df42
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -43,6 +44,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
Expand Down Expand Up @@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

private final Time time;

/**
* AtomicBoolean to track whether the subscription is updated.
* If it's true and subscription state is UNSUBSCRIBED, the next {@link #maybeJoinGroup()} will change member state to JOINING.
*/
private final AtomicBoolean hasSubscriptionUpdated = new AtomicBoolean(false);

/**
* True if the poll timer has expired, signaled by a call to
* {@link #transitionToSendingLeaveGroup(boolean)} with dueToExpiredPollTimer param true. This
Expand Down Expand Up @@ -458,14 +466,24 @@ String memberIdInfoForLog() {
}

/**
* Join the group with the updated subscription, if the member is not part of it yet. If the
* member is already part of the group, this will only ensure that the updated subscription
* Set {@link #hasSubscriptionUpdated} to true to indicate that the subscription has been updated.
* The next {@link #maybeJoinGroup()} will join the group with the updated subscription, if the member is not part of it yet.
* If the member is already part of the group, this will only ensure that the updated subscription
* is included in the next heartbeat request.
* <p/>
* Note that list of topics of the subscription is taken from the shared subscription state.
*/
public void onSubscriptionUpdated() {
if (state == MemberState.UNSUBSCRIBED) {
hasSubscriptionUpdated.compareAndSet(false, true);
}

/**
* Join the group if the member is not part of it yet. This function separates {@link #transitionToJoining}
* from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an
* active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
*/
public void maybeJoinGroup() {
if (hasSubscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
transitionToJoining();
}
}
Expand Down Expand Up @@ -1408,4 +1426,11 @@ Optional<LocalAssignment> updateWith(Map<Uuid, SortedSet<Integer>> assignment) {
return Optional.of(new LocalAssignment(nextLocalEpoch, assignment));
}
}

/*
* Visible for testing.
*/
boolean hasSubscriptionUpdated() {
return hasSubscriptionUpdated.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,15 @@ public void process(ApplicationEvent event) {
private void process(final PollEvent event) {
if (requestManagers.commitRequestManager.isPresent()) {
requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().maybeJoinGroup();
hrm.resetPollTimer(event.pollTimeMs());
});
} else {
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().maybeJoinGroup();
hrm.resetPollTimer(event.pollTimeMs());
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3443,6 +3443,34 @@ public void testPreventMultiThread(GroupProtocol groupProtocol) throws Interrupt
}
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws InterruptedException {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic));
assertFalse(groupProtocol == GroupProtocol.CLASSIC ?
requestGenerated(client, ApiKeys.JOIN_GROUP) :
requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT),
"KafkaConsumer#subscribe should not send " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");

consumer.poll(Duration.ZERO);
TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC ?
requestGenerated(client, ApiKeys.JOIN_GROUP) :
requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT),
"Expected " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
}

private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
}

private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer<byte[]> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
ConsumerMembershipManager membershipManager = createMemberInStableState();
membershipManager.onSubscriptionUpdated();
assertTrue(membershipManager.hasSubscriptionUpdated());
membershipManager.maybeJoinGroup();
verify(membershipManager, never()).transitionToJoining();
assertFalse(membershipManager.hasSubscriptionUpdated());
}

@Test
public void testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup() {
ConsumerMembershipManager membershipManager = createMembershipManager(null);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onSubscriptionUpdated();
verify(membershipManager, never()).transitionToJoining();
assertTrue(membershipManager.hasSubscriptionUpdated());
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.maybeJoinGroup();
verify(membershipManager).transitionToJoining();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
ShareMembershipManager membershipManager = createMemberInStableState();
membershipManager.onSubscriptionUpdated();
assertTrue(membershipManager.hasSubscriptionUpdated());
membershipManager.maybeJoinGroup();
verify(membershipManager, never()).transitionToJoining();
assertFalse(membershipManager.hasSubscriptionUpdated());
}

@Test
public void testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup() {
ShareMembershipManager membershipManager = createMembershipManager();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.onSubscriptionUpdated();
verify(membershipManager, never()).transitionToJoining();
assertTrue(membershipManager.hasSubscriptionUpdated());
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.maybeJoinGroup();
verify(membershipManager).transitionToJoining();
}

private void assertLeaveGroupDueToExpiredPollAndTransitionToStale(ShareMembershipManager membershipManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public void testApplicationEventIsProcessed(ApplicationEvent e) {

private static Stream<Arguments> applicationEvents() {
return Stream.of(
Arguments.of(new PollEvent(100)),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
Expand Down Expand Up @@ -208,6 +207,35 @@ public void testSeekUnvalidatedEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@Test
public void testPollEvent() {
PollEvent event = new PollEvent(12345);

setupProcessor(true);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
processor.process(event);
verify(commitRequestManager).updateAutoCommitTimer(12345);
verify(membershipManager).maybeJoinGroup();
verify(heartbeatRequestManager).resetPollTimer(12345);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionChangeEvent(boolean withGroupId) {
SubscriptionChangeEvent event = new SubscriptionChangeEvent();

setupProcessor(withGroupId);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
processor.process(event);
if (withGroupId) {
verify(membershipManager).onSubscriptionUpdated();
} else {
verify(membershipManager, never()).onSubscriptionUpdated();
}
// verify member state doesn't transition to JOINING.
verify(membershipManager, never()).maybeJoinGroup();
}

private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}
Expand Down

0 comments on commit 217df42

Please sign in to comment.