KAFKA-19078: Automatic controller addition to cluster metadata partition #19589
chia7712 merged 26 commits into apache:trunk
Conversation
Thanks for the feature @kevin-wu24. Reviewed src/main. Need to review the tests.
} else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
    quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
Okay. I think we should document why we require both followersAlwaysFlush and autoJoinEnable to be true.
I am going to use this description to generate the commit message. I don't think we should include URLs in the commit description. Ideally, the commit message should explain the changes without having to read other documents.
    RaftResponse.Inbound responseMetadata,
    long currentTimeMs
) {
    RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data();
This casts to RemoveRaftVoterResponseData, yet the method is handleAddVoterResponse. Do the tests pass for you? How is that possible with this cast?
I think this is because we are not testing the response handling anywhere. In the unit tests we are only asserting that the add/remove voter request was sent when we expect it.
I think we should do something like what we do for fetch, where we complete the request (I assume this testing method executes handleFetchResponse). I need to see how we implement this for fetch.
Doesn't KafkaRaftClientAutoJoinTest send add voter and remove voter responses to the replica?
Yeah it does, I just looked at the code again. The tests do pass, but this is because this line in the test is sending a remove voter response: https://github.com/apache/kafka/pull/19589/files#diff-7b1538d9f4f7f1e27a444aac55ced6f780e22bfdd8af4462e82d62c24f778c85R94.
Both the implementation and test need to be fixed, thanks for the catch.
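To make the failure mode concrete, here is a minimal, self-contained sketch of why the wrong cast goes unnoticed. The types below are stand-ins for illustration only, not Kafka's real response classes: the cast only throws once a response of the correct type is actually delivered to the handler.

```java
// Stand-in types, not Kafka's actual AddRaftVoterResponseData/RemoveRaftVoterResponseData.
class CastSketch {
    record AddVoterResponse(short errorCode) {}
    record RemoveVoterResponse(short errorCode) {}

    // Mirrors the bug: an add-voter handler casting to the remove-voter type.
    static short handleAddVoterResponse(Object data) {
        return ((RemoveVoterResponse) data).errorCode();
    }

    public static void main(String[] args) {
        // Passes as long as the test (incorrectly) delivers a RemoveVoterResponse:
        handleAddVoterResponse(new RemoveVoterResponse((short) 0));
        // Delivering the correct response type exposes the bug:
        try {
            handleAddVoterResponse(new AddVoterResponse((short) 0));
            throw new AssertionError("expected ClassCastException");
        } catch (ClassCastException expected) {
            // the wrong cast finally fails
        }
    }
}
```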
private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1;

@Test
public void testAutoRemoveOldVoter() throws Exception {
Do we need a test that fully does a remove followed by an add? E.g.
- Start with the local replica not in the voter set but its id in the voter set.
- Remove voter is sent and acknowledged.
- The next FETCH response sends a VotersRecord control batch without the old voter in the voter set.
- Add voter is sent and acknowledged.
- The next FETCH response sends a VotersRecord control batch with the local replica in the voter set.
I think we can add one since it acts like a pseudo-integration test for the feature. I'm not seeing anywhere in KafkaRaftClientReconfigTest or KafkaRaftClientFetchTest that covers step 2. I see a KafkaRaftClientTest#testFollowerReplication but it doesn't add control records in the FETCH response when kraft.version == 1.
// after sending a remove voter the next request should be a fetch
context.pollUntilRequest();
var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
After this, I think we should check that remove voter is sent after completeFetch.
I'm a bit confused. Why would we do this check again? I'm already doing this check from L52-56.
I think we need to cover the case where the new voter sent an add/remove voter RPC. The RPC was acknowledged but the log was never updated (fetch responses didn't include the updated voter set). In this case the new voter will send another add/remove voter RPC after the "update voter set period timer" expires, right?
Okay. Since the local replica is still an observer, it should still try to add/remove itself if the log hasn't been updated with fetch.
// after sending an add voter the next request should be a fetch
context.pollUntilRequest();
var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
After this, I think we should check that add voter is sent after another completeFetch?
jsancio left a comment
Thanks for the great tests. We should be able to merge this improvement soon.
    requestThread.doWork();
}

@SuppressWarnings("NPathComplexity")
Try using if (...) { return } else if (...) { return } ... and see if that reduces the path complexity so you can remove this suppression.
You can also try using Java's new pattern matching and lambda syntax for switch statements. E.g.
return switch (requestData) {
case VoterRequestData voterData -> new VoterRequest.Builder(voterData);
...
default -> throw new IllegalArgumentException(...);
}
I think I did this, but I got a build error when running ./gradlew jar. It's pretty weird, because we're supposed to be on Java 17 according to Gradle, and my IDE said that this kind of instanceof switch matching is supported in Java 17+. Let me try this again and put the actual error message here.
This is what I get when running ./gradlew jar:
error: patterns in switch statements are a preview feature and are disabled by default.
...
(use --enable-preview to enable patterns in switch statements)
Try using if (...) { return } else if (...) { return } ... and see if that reduces the path complexity so you can remove this suppression.
This worked, thanks!
Yeah. I think I have seen this before. I think Java 21 is the oldest release that fully implemented pattern matching for switch; in Java 17 through 20 it was still a preview feature.
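As a sketch of the workaround that landed (the request types below are stand-ins, not Kafka's real classes): on Java 17, where switch pattern matching is still a preview feature, an instanceof chain with early returns achieves the same dispatch and tends to satisfy the NPathComplexity check, because each branch exits immediately rather than multiplying paths.

```java
// Stand-in types for illustration only; not Kafka's actual request classes.
class DispatchSketch {
    record AddVoterData(int voterId) {}
    record RemoveVoterData(int voterId) {}

    // Early returns: each branch exits immediately, so branches do not
    // multiply the way nested conditionals do in the NPath metric.
    static String buildRequest(Object requestData) {
        if (requestData instanceof AddVoterData add) {
            return "AddVoterRequest(" + add.voterId() + ")";
        } else if (requestData instanceof RemoveVoterData remove) {
            return "RemoveVoterRequest(" + remove.voterId() + ")";
        } else {
            throw new IllegalArgumentException("unexpected request data: " + requestData);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildRequest(new AddVoterData(3000)));    // AddVoterRequest(3000)
        System.out.println(buildRequest(new RemoveVoterData(3001))); // RemoveVoterRequest(3001)
    }
}
```

Note that `instanceof` pattern variables (unlike switch patterns) were finalized back in Java 16, which is why this form compiles cleanly under the project's Java 17 toolchain.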
    return maybeSendFetchToAnyBootstrap(currentTimeMs);
} else if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
    quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
    /* Only replicas that are always flushing and are configured to auto join should
Replace "are always flushing" with "can become a voter."
0,
epoch,
BufferSupplier.NO_CACHING.get(300),
VoterSetTest.voterSet(Stream.of(leader)).toVotersRecord((short) 0)),
Missing newline.
VoterSetTest.voterSet(Stream.of(leader)).toVotersRecord((short) 0)
),
0,
epoch,
BufferSupplier.NO_CACHING.get(300),
VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)).toVotersRecord((short) 0)),
Missing newline.
VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)).toVotersRecord((short) 0)
),
)
);
// poll kraft to update the replica's voter set
context.client.poll();
How about adding this time advancement and showing that the replica sent a fetch request?
context.advanceTimeAndFetchToUpdateVoterSetTimer(epoch, leader.id());
context.time.sleep(context.fetchTimeoutMs - 1);
Just added it; I'm also cleaning up this test file for readability, since there's a lot of duplicate code.
I added the sleep method as part of the advanceTimeAndCompleteFetch helper, since that is what actually expires the timer. I also added a boolean flag to the method to determine whether sleep is called again.
…eck follower can fetch while add voter request is pending
jsancio left a comment
@kevin-wu24 , can you resolve the conflicts?
jsancio left a comment
Thanks for the changes @kevin-wu24 . Partial review.
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
    assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
Okay. Is there a way to get the exact directory id (UUID) and compare against that instead?
Yeah, I can check that each replica's metadata dir ID matches exactly what is in the TestKitNodes.
quorumConfig.requestTimeoutMs(),
quorum.localReplicaKeyOrThrow(),
localListeners,
!quorumConfig.autoJoin()
Why is this the negative of auto join? Shouldn't it always be false? If KRaft sends an "add voter" request, it should always be version 1 and return before committing.
ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
This is the add voter request specific to the kraft implementation. "Ack when committed" should always be false. If that's true then let's remove this parameter and not give the caller the option to set it. In the implementation, the method should always call setAckWhenCommitted(false).
"Ack when committed" should always be false
I don't think this is true, since there are callers of this method in KafkaRaftClientReconfigTest that do test sending an AddVoterRequest with ackWhenCommitted == true since it is testing the leader state when receiving this RPC. I agree that KafkaRaftClient should always call this method with false.
long currentTimeMs,
ReplicaKey replicaKey
Flip the order of these parameters. The kraft module has a pattern of using the last parameter as the current time when needed.
pollAndDeliverFetchToUpdateVoterSet(context, epoch,
VoterSetTest.voterSet(Stream.of(leader, newVoter)));
Let's fix this indentation. How about:
pollAndDeliverFetchToUpdateVoterSet(
    context,
    epoch,
    VoterSetTest.voterSet(Stream.of(leader, newVoter))
);
RaftRequest.Outbound assertSentAddVoterRequest(
    ReplicaKey replicaKey,
    Endpoints endpoints,
    boolean expectedAckWhenCommitted
Let's not give the caller the option to override this. This value should always be false and this method should just check for that explicitly.
BufferSupplier.NO_CACHING.get(300),
newVoterSet.toVotersRecord((short) 0)
),
context.log.endOffset().offset() + 1,
This assumes that the voter set only has one voter, hence one record.
Why does this assume the voter set only has one voter? Which voter set are you referencing?
testObserverRemovesOldVoterAndAutoJoins has the voter set go from size 2 to size 1, and then back to size 2, by having a follower node complete the whole "auto-join" flow (i.e. remove its old self and add its new self to the voter set).
fetchRequest.destination().id(),
MemoryRecords.withVotersRecord(
    context.log.endOffset().offset(),
    0,
Hmm. Maybe we can use context.time.milliseconds().
jsancio left a comment
Thanks @kevin-wu24 . I think we should be able to merge this soon.
ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
)
);
} else {
    return maybeSendFetchToBestNode(state, currentTimeMs);
This backoff is not correct now that observers can send AddVoter and RemoveVoter requests. Take a look at how I solved it for pollFollowerAsVoter.
if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
    quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
Let's add a shouldSendAddOrRemoveVoterRequest similar to shouldSendUpdateVoterRequest. This would allow you to better document this predicate.
return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
    quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
Please document why we need this predicate. See shouldSendUpdateVoterRequest for an example.
/* Only replicas that can become a voter and are configured to auto join should
 * attempt to automatically join the voter set for the configured topic partition.
 */
See my other comment but you can move this comment to shouldSendAddOrRemoveVoterRequest.
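A sketch of what the suggested shouldSendAddOrRemoveVoterRequest predicate could look like. The field names below are stand-ins for the KafkaRaftClient internals mentioned in the diff; this shows the intended shape and documentation, not the actual implementation.

```java
// Illustrative only: fields mirror the conditions discussed in the review.
class AutoJoinPredicateSketch {
    boolean reconfigSupported;     // partitionState.lastKraftVersion().isReconfigSupported()
    boolean canBecomeVoter;        // replica always flushes, so it is eligible to become a voter
    boolean autoJoin;              // quorumConfig.autoJoin()
    long updateVoterSetDeadlineMs; // when the update-voter-set period expires

    /*
     * An observer should only try to add or remove itself when:
     *  1. the cluster supports reconfiguration (kraft.version >= 1),
     *  2. this replica is eligible to become a voter,
     *  3. auto join is enabled, and
     *  4. the update-voter-set period has expired, which rate-limits retries.
     */
    boolean shouldSendAddOrRemoveVoterRequest(long currentTimeMs) {
        return reconfigSupported && canBecomeVoter && autoJoin
            && currentTimeMs >= updateVoterSetDeadlineMs;
    }

    public static void main(String[] args) {
        AutoJoinPredicateSketch s = new AutoJoinPredicateSketch();
        s.reconfigSupported = true;
        s.canBecomeVoter = true;
        s.autoJoin = true;
        s.updateVoterSetDeadlineMs = 100;
        System.out.println(s.shouldSendAddOrRemoveVoterRequest(150)); // true: period expired
        System.out.println(s.shouldSendAddOrRemoveVoterRequest(50));  // false: still waiting
    }
}
```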
return Math.min(
    backoffMs,
    Math.min(
        state.remainingFetchTimeMs(currentTimeMs),
Observers don't need to back off until the fetch timeout, since observers do not read or handle fetch timeouts.
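To make the observer/voter distinction concrete, here is a minimal sketch under the assumption that the real code derives both remaining times from FollowerState (the method names are stand-ins): a voter bounds its poll backoff by the fetch timeout, while an observer only needs to wake up for the update-voter-set period.

```java
// Stand-in backoff helpers; not the actual KafkaRaftClient code.
class BackoffSketch {
    // Observers never act on fetch timeouts, so only the update-voter-set
    // period should shorten their backoff.
    static long observerBackoffMs(long backoffMs, long remainingUpdateVoterSetPeriodMs) {
        return Math.min(backoffMs, remainingUpdateVoterSetPeriodMs);
    }

    // Voters must also wake up when the fetch timeout fires.
    static long voterBackoffMs(long backoffMs, long remainingFetchTimeMs,
                               long remainingUpdateVoterSetPeriodMs) {
        return Math.min(backoffMs, Math.min(remainingFetchTimeMs, remainingUpdateVoterSetPeriodMs));
    }

    public static void main(String[] args) {
        System.out.println(observerBackoffMs(50, 200));  // 50: fetch timeout ignored
        System.out.println(voterBackoffMs(50, 10, 200)); // 10: fetch timeout dominates
    }
}
```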
chia7712 left a comment
@kevin-wu24 thanks for this cool function! I have a couple of questions. Please take a look when you have some free time.
) {
    return new AddRaftVoterRequestData()
        .setClusterId(clusterId)
        .setTimeoutMs(timeoutMs)
I didn't see any usage of the timeout. Is that expected?
I didn't see any usage of the timeout.
Can you elaborate a bit more about this? I'm not sure what you mean.
The AddRaftVoterRequest is handled in a separate Java class, AddVoterHandler.
I think I see what you mean @chia7712.
The leader does have timeout logic to expire the pending AddVoterRequest when it sends the ApiVersionsRequest. But it doesn't check whether the client-side timeout sent as part of the request has expired. In practice, this doesn't matter much because the leader will either time out the request during the ApiVersions handling or, if unable to commit the new voters record, will resign. The important thing is that the leader does not get stuck with a pending voters operation. For the purposes of the auto-join PR, adding handling for the client-side timeout is out of scope.
Additionally, for the default timeout of an AdminClient request to add/remove a voter (1 min IIRC), the fetch timeout is significantly smaller, so it won't matter. If the timeout is sufficiently small, then we are missing handling on the server side.
Filed https://issues.apache.org/jira/browse/KAFKA-19600 as a follow-up.
try (Admin admin = Admin.create(cluster.clientProperties())) {
    TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
        Map<Integer, Uuid> voters = findVoterDirs(admin);
        assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), voters.keySet());
} else if (initialVoterSet.isPresent()) {
    for (final var controllerNode : initialVoterSet.get().entrySet()) {
        final var voterId = controllerNode.getKey();
        final var voterDirectoryid = controllerNode.getValue();
voterDirectoryid -> voterDirectoryId
final RequestSendResult sendResult;
if (voters.voterIds().contains(localReplicaKey.id())) {
    /* Replica id is in the voter set but replica is not voter. Remove old voter.
     * Local replica is not in the voter set because the replica is an observer.
Perhaps we could note that, in this path, the directory ID must be different.
Sure, I can clarify the comment.
chia7712 left a comment
LGTM. I'm leaving a follow-up suggestion.
public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;

public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
@kevin-wu24 it would be cool if users could find out about this new config through upgrade.html. Could you please consider addressing it in the follow-up?
}
return Math.min(
    backoffMs,
    state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
In the broker node, the state timeout is not reset in voter-related responses. As a result, the 0 returned by remainingUpdateVoterSetPeriodMs can lead to a busy loop, which increases CPU usage. Could you please take a look?
Yeah, for a broker, this return statement will eventually always return 0 since we have no way to reset it. I don't think conceptually it makes sense to reset that timer upon receiving a FetchResponse. Instead we should not read that timer's value when determining the backoff if we are a broker.
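The hazard can be sketched with stand-in logic (not the actual KafkaRaftClient code): on a broker the update-voter-set timer is never reset, so once its deadline passes the remaining time stays 0 and every poll returns a 0 ms backoff, spinning the thread. Guarding on whether the replica can auto join avoids reading the expired timer at all.

```java
// Stand-in sketch of the busy-loop fix; field and method names are illustrative.
class PollBackoffSketch {
    static long remainingMs(long deadlineMs, long currentTimeMs) {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    static long pollBackoffMs(boolean canAutoJoin, long backoffMs,
                              long updateVoterSetDeadlineMs, long currentTimeMs) {
        if (!canAutoJoin) {
            // Brokers never send add/remove voter requests, so their backoff
            // should not consult the (never-reset) update-voter-set timer.
            return backoffMs;
        }
        return Math.min(backoffMs, remainingMs(updateVoterSetDeadlineMs, currentTimeMs));
    }

    public static void main(String[] args) {
        // Timer expired 10s ago; a broker still backs off instead of spinning.
        System.out.println(pollBackoffMs(false, 50, 0, 10_000)); // 50
        // A controller eligible to auto join polls immediately to retry.
        System.out.println(pollBackoffMs(true, 50, 0, 10_000));  // 0
    }
}
```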
Add the controller.quorum.auto.join.enable configuration. When enabled with KIP-853 supported, follower controllers who are observers (their replica id + directory id are not in the voter set) will:
- Remove the voter whose replica id matches their own but not directory id by sending the RemoveVoter RPC to the leader.
- Add themselves as a voter when their replica id is not present in the voter set by sending the AddVoter RPC to the leader.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
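For reference, enabling the feature in a controller's properties file might look like the fragment below. Only controller.quorum.auto.join.enable is the new setting described above; the surrounding keys and values are illustrative, not a recommended configuration.

```properties
# Illustrative controller config; values are examples only.
process.roles=controller
node.id=3002
controller.quorum.auto.join.enable=true
controller.quorum.bootstrap.servers=host1:9093,host2:9093
```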