KAFKA-19078: Automatic controller addition to cluster metadata partition #19589

Merged
chia7712 merged 26 commits into apache:trunk from kevin-wu24:KAFKA-19078
Aug 13, 2025

@kevin-wu24 kevin-wu24 commented Apr 28, 2025

Add the controller.quorum.auto.join.enable configuration. When it is enabled
and KIP-853 is supported, follower controllers that are observers (their
replica id + directory id are not in the voter set) will:

  • Automatically remove voter set entries which match their replica id
    but not their directory id by sending the RemoveVoter RPC to the leader.
  • Automatically add themselves as a voter when their replica id is not
    present in the voter set by sending the AddVoter RPC to the leader.
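
The two cases in the description reduce to a small decision on the observer's side. The following is a minimal sketch of that decision only, assuming hypothetical `ReplicaKey` and `Action` stand-in types; it is not the code in KafkaRaftClient, and it leaves out the configuration, kraft.version and timer checks discussed in the review.

```java
import java.util.Set;

final class AutoJoinDecisionSketch {
    // Hypothetical stand-in for a (replica id, directory id) pair.
    record ReplicaKey(int id, String directoryId) { }

    // Hypothetical result type: which RPC, if any, the observer would send.
    enum Action { NONE, SEND_REMOVE_VOTER, SEND_ADD_VOTER }

    static Action nextAction(ReplicaKey local, Set<ReplicaKey> voters) {
        if (voters.contains(local)) {
            // Replica id and directory id both match: already a voter.
            return Action.NONE;
        }
        // Same replica id but a different directory id means a stale entry.
        boolean idPresent = voters.stream().anyMatch(v -> v.id() == local.id());
        return idPresent ? Action.SEND_REMOVE_VOTER : Action.SEND_ADD_VOTER;
    }

    public static void main(String[] args) {
        ReplicaKey leader = new ReplicaKey(1, "dir-leader");
        ReplicaKey stale = new ReplicaKey(2, "dir-old");
        ReplicaKey local = new ReplicaKey(2, "dir-new");
        System.out.println(nextAction(local, Set.of(leader, stale)));
        System.out.println(nextAction(local, Set.of(leader)));
        System.out.println(nextAction(local, Set.of(leader, local)));
    }
}
```

In this sketch a replica whose id is present with a different directory id first asks for the stale entry to be removed, and only asks to be added once its id is absent from the voter set.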

Reviewers: José Armando García Sancio <jsancio@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>

@github-actions github-actions Bot added triage PRs from the community kraft labels Apr 28, 2025
Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java Outdated
@github-actions github-actions Bot removed the triage PRs from the community label Apr 30, 2025
Member

@jsancio jsancio left a comment

Thanks for the feature @kevin-wu24 . Reviewed src/main. Need to review the tests.

Comment thread raft/src/main/java/org/apache/kafka/raft/FollowerState.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/VoterSet.java Outdated
Comment on lines +3323 to +3324
} else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
Member

Okay. I think we should document why we require both followersAlwaysFlush and autoJoinEnable to be true.

Comment thread raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java Outdated
Comment thread raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java Outdated

jsancio commented May 1, 2025

Feature description: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217391519#KIP853:KRaftControllerMembershipChanges-Controllerautojoining

Add the controller.quorum.auto.join.enable configuration. When enabled with KIP-853 supported, follower controllers who are observers (their replica id + directory id are not in the voter set) will:

  • Automatically remove voter set entries which match their replica id but not directory id.
  • Automatically add themselves as a voter when their replica id is not present in the voter set.

I am going to use this description to generate the commit message. I don't think we should include URLs in the commit description. Ideally, the commit message should explain the changes without having to read other documents.

Member

@jsancio jsancio left a comment

Thanks for the changes.

Comment thread raft/src/main/java/org/apache/kafka/raft/FollowerState.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
RaftResponse.Inbound responseMetadata,
long currentTimeMs
) {
RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data();
Member

This casts to RemoveRaftVoterResponseData, yet the method is handleAddVoterResponse. Do the tests pass for you? How is that possible with this cast?

Contributor Author

@kevin-wu24 kevin-wu24 May 6, 2025

I think this is because we are not testing the response handling anywhere. In the unit tests we are only asserting that the add/remove voter request was sent when we expect it.

I think we should do something like with fetch where we complete the request (I assume this testing method executes the handleFetchResponse). I need to see how we implement this for fetch.

Member

Doesn't KafkaRaftClientAutoJoinTest send add voter and remove voter responses to the replica?

Contributor Author

Yeah it does, I just looked at the code again. The tests do pass, but this is because this line in the test is sending a remove voter response: https://github.com/apache/kafka/pull/19589/files#diff-7b1538d9f4f7f1e27a444aac55ced6f780e22bfdd8af4462e82d62c24f778c85R94.
Both the implementation and test need to be fixed, thanks for the catch.

Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java Outdated
Comment thread raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java Outdated
private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1;

@Test
public void testAutoRemoveOldVoter() throws Exception {
Member

@jsancio jsancio May 6, 2025

Do we need a test that fully does a remove followed by an add? E.g.

  1. Start with the local replica not in the voter set but the id in the voter set.
  2. Remove voter is sent and acknowledged.
  3. Next FETCH response sends the VOTER_RECORD control batch without the old voter in the voter set.
  4. Add voter is sent and acknowledged.
  5. Next FETCH response sends a VOTER_RECORD control batch with the local replica in the voter set.

Contributor Author

@kevin-wu24 kevin-wu24 May 6, 2025

I think we can add one since it acts like a pseudo-integration test for the feature. I'm not seeing anywhere in KafkaRaftClientReconfigTest or KafkaRaftClientFetchTest that covers step 2. I see a KafkaRaftClientTest#testFollowerReplication but it doesn't add control records in the FETCH response when kraft.version == 1.

Comment on lines +63 to +66
// after sending a remove voter the next request should be a fetch
context.pollUntilRequest();
var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
Member

After this, I think we should check that remove voter is sent after completeFetch.

Contributor Author

I'm a bit confused. Why would we do this check again? I'm already doing this check from L52-56.

Member

I think we need to cover the case where the new voter sent an add/remove voter RPC. The RPC was acknowledged but the log was never updated (fetch responses didn't include the updated voter set). In this case the new voter will send another add/remove voter RPC after the "update voter set period timer" expires, right?

Contributor Author

Okay. Since the local replica is still an observer, it should still try to add/remove itself if the log hasn't been updated with fetch.
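
The retry behavior agreed on in this exchange can be illustrated with a small timer. This is a minimal sketch under stated assumptions: `UpdateVoterSetTimerSketch` and its methods are hypothetical names, and the sketch resets the timer when the request is sent, which may differ from where the real client resets it.

```java
final class UpdateVoterSetTimerSketch {
    private final long periodMs;
    private long deadlineMs;

    UpdateVoterSetTimerSketch(long periodMs, long nowMs) {
        this.periodMs = periodMs;
        this.deadlineMs = nowMs + periodMs;
    }

    boolean hasExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }

    void reset(long nowMs) {
        deadlineMs = nowMs + periodMs;
    }

    /**
     * Returns true when an add/remove voter request should be (re)sent.
     * An acknowledged RPC alone does not stop the retries: as long as the
     * fetched log still shows the replica as an observer, each expiry of
     * the period triggers another request.
     */
    boolean maybeSend(boolean stillObserver, long nowMs) {
        if (stillObserver && hasExpired(nowMs)) {
            reset(nowMs); // assumption: reset on send
            return true;
        }
        return false;
    }
}
```

With a period of 100 ms starting at time 0, the sketch sends at 100 ms, stays quiet at 150 ms, and sends again at 200 ms if the voter set still has not changed.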

Comment on lines +97 to +100
// after sending an add voter the next request should be a fetch
context.pollUntilRequest();
var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
Member

After this, I think we should check that add voter is sent after another completeFetch?

Comment thread raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java Outdated
@github-actions github-actions Bot added the core Kafka Broker label May 7, 2025
Member

@jsancio jsancio left a comment

Thanks for the great tests. We should be able to merge this improvement soon.

requestThread.doWork();
}

@SuppressWarnings("NPathComplexity")
Member

Try using if (...) { return } else if (...) { return } ... and see if that reduces the path complexity so you can remove this suppression.

You can also try using Java's new pattern matching and lambda syntax for switch statements. E.g.

return switch (requestData) {
    case VoterRequestData voterData -> new VoterRequest.Builder(voterData);
    ...
    default -> throw new IllegalArgumentException(...);
};

Contributor Author

@kevin-wu24 kevin-wu24 May 7, 2025

I think I did this, but I got a build error when running ./gradlew jar. It's pretty weird, because we're supposed to be on Java 17 according to gradle, and my IDE said that this kind of instanceof switch matching is supported in Java 17+. Let me try this again and put the actual error message here.

Contributor Author

This is what I get when running ./gradlew jar:

error: patterns in switch statements are a preview feature and are disabled by default.
...
(use --enable-preview to enable patterns in switch statements)

Contributor Author

Try using if (...) { return } else if (...) { return } ... and see if that reduces the path complexity so you can remove this suppression.

This worked, thanks!

Member

@jsancio jsancio May 7, 2025

Yeah. I think I have seen this before. I think Java 21 is the first release where pattern matching for switch is no longer a preview feature.
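
For a Java 17 build, the if/else-if form that worked here can still bind the matched value by using pattern matching for instanceof, which has been a standard feature since Java 16. A minimal sketch, assuming hypothetical `FetchData`, `AddVoterData` and `RemoveVoterData` records in place of the real request data classes:

```java
final class RequestBuilderSketch {
    // Hypothetical stand-ins for the request data classes.
    record FetchData(long offset) { }
    record AddVoterData(int voterId) { }
    record RemoveVoterData(int voterId) { }

    static String describe(Object requestData) {
        // Each branch returns immediately, so there is one path per type.
        if (requestData instanceof FetchData fetch) {
            return "Fetch(offset=" + fetch.offset() + ")";
        } else if (requestData instanceof AddVoterData add) {
            return "AddVoter(id=" + add.voterId() + ")";
        } else if (requestData instanceof RemoveVoterData remove) {
            return "RemoveVoter(id=" + remove.voterId() + ")";
        } else {
            throw new IllegalArgumentException("Unexpected request data: " + requestData);
        }
    }
}
```

The pattern variables (`fetch`, `add`, `remove`) replace the explicit casts, which is most of what the switch form would have offered.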

return maybeSendFetchToAnyBootstrap(currentTimeMs);
} else if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
/* Only replicas that are always flushing and are configured to auto join should
Member

Replace "are always flushing" with "can become a voter."

0,
epoch,
BufferSupplier.NO_CACHING.get(300),
VoterSetTest.voterSet(Stream.of(leader)).toVotersRecord((short) 0)),
Member

Missing newline.

                    VoterSetTest.voterSet(Stream.of(leader)).toVotersRecord((short) 0)
                ),

0,
epoch,
BufferSupplier.NO_CACHING.get(300),
VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)).toVotersRecord((short) 0)),
Member

Missing newline.

                    VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)).toVotersRecord((short) 0)
                ),

)
);
// poll kraft to update the replica's voter set
context.client.poll();
Member

How about adding this time advancement and showing that the replica sent a fetch request?

        context.advanceTimeAndFetchToUpdateVoterSetTimer(epoch, leader.id());
        context.time.sleep(context.fetchTimeoutMs - 1);

Contributor Author

Just added and cleaning up this test file for readability, since there's a lot of duplicate code.

Contributor Author

I added the sleep method as part of the advanceTimeAndCompleteFetch helper, since that is what actually expires the timer. I also added a boolean flag to the method to determine whether sleep is called again.

Comment thread raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Member

@jsancio jsancio left a comment

@kevin-wu24 , can you resolve the conflicts?

Comment thread raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
@kevin-wu24 kevin-wu24 changed the title from "KAFKA-19078: Implement automatic controller addition to cluster metadata partition" to "KAFKA-19078: Automatic controller addition to cluster metadata partition" Jul 30, 2025
Member

@jsancio jsancio left a comment

Thanks for the changes @kevin-wu24 . Partial review.

Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
Member

Okay. Is there a way to get the exact directory id (UUID) and compare against that instead?

Contributor Author

Yeah, I can check that each replica has the exact metadata dir ID as what is in the TestKitNodes.

quorumConfig.requestTimeoutMs(),
quorum.localReplicaKeyOrThrow(),
localListeners,
!quorumConfig.autoJoin()
Member

Why is this the negative of auto join? Shouldn't it always be false? If KRaft sends an "add voter" request, it should always be version 1 and return before committing.

ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
Member

This is the add voter request specific for the kraft implementation. "Ack when committed" should always be false. If that's true then let's remove this parameter and not give the caller the option to set it. In the implementation, the method should always call setAckWhenCommitted(false).

Contributor Author

"Ack when committed" should always be false

I don't think this is true, since there are callers of this method in KafkaRaftClientReconfigTest that do test sending an AddVoterRequest with ackWhenCommitted == true since it is testing the leader state when receiving this RPC. I agree that KafkaRaftClient should always call this method with false.

Member

I see. That's fair.

Comment on lines +3449 to +3450
long currentTimeMs,
ReplicaKey replicaKey
Member

Flip the order of these parameters. The kraft module has a pattern of using the last parameter as the current time when needed.

Comment on lines +105 to +106
pollAndDeliverFetchToUpdateVoterSet(context, epoch,
VoterSetTest.voterSet(Stream.of(leader, newVoter)));
Member

Let's fix this indentation. How about:

        pollAndDeliverFetchToUpdateVoterSet(
            context,
            epoch,
            VoterSetTest.voterSet(Stream.of(leader, newVoter))
        );

RaftRequest.Outbound assertSentAddVoterRequest(
ReplicaKey replicaKey,
Endpoints endpoints,
boolean expectedAckWhenCommitted
Member

Let's not give the caller the option to override this. This value should always be false and this method should just check for that explicitly.

BufferSupplier.NO_CACHING.get(300),
newVoterSet.toVotersRecord((short) 0)
),
context.log.endOffset().offset() + 1,
Member

This assumes that the voter set only has one voter hence one record.

Contributor Author

@kevin-wu24 kevin-wu24 Aug 5, 2025

Why does this assume the voter set only has one voter? Which voter set are you referencing?

testObserverRemovesOldVoterAndAutoJoins has the voter set go from size 2, to size 1, and then back to size 2 by having a follower node complete the whole "auto-join" flow (i.e. remove its old self and add its new self to the voter set).

fetchRequest.destination().id(),
MemoryRecords.withVotersRecord(
context.log.endOffset().offset(),
0,
Member

Hmm. Maybe we can use context.time.milliseconds().

Member

@jsancio jsancio left a comment

Thanks @kevin-wu24 . I think we should be able to merge this soon.

ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
Member

I see. That's fair.

)
);
} else {
return maybeSendFetchToBestNode(state, currentTimeMs);
Member

This backoff is not correct now that observers can send AddVoter and RemoveVoter requests. Take a look at how I solved it for pollFollowerAsVoter.

Comment on lines +3326 to +3327
if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
Member

Let's add a shouldSendAddOrRemoveVoterRequest similar to shouldSendUpdateVoterRequest. This would allow you to better document this predicate.

Comment on lines +3326 to +3327
return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
Member

Please document why we need this predicate. See shouldSendUpdateVoteRequest for an example.
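
One way to read this request is a predicate whose Javadoc gives the reason for each condition. The following is a minimal sketch, assuming the four conditions are passed in as plain booleans; the parameter names are illustrative, and the stated reasons are inferred from this thread rather than copied from the merged code.

```java
final class AddOrRemoveVoterPredicateSketch {
    /**
     * Returns true when an observer should try to add or remove itself.
     *
     * All four conditions must hold:
     *  - reconfigSupported: the kraft.version supports voter set changes,
     *    so the leader can handle add and remove voter requests.
     *  - canBecomeVoter: the replica is eligible to be a voter (in the diff
     *    this flag took the place of followersAlwaysFlush).
     *  - autoJoinEnabled: the operator opted in through
     *    controller.quorum.auto.join.enable.
     *  - periodExpired: the update voter set period has elapsed, which keeps
     *    the replica from sending a request on every poll.
     */
    static boolean shouldSendAddOrRemoveVoterRequest(
        boolean reconfigSupported,
        boolean canBecomeVoter,
        boolean autoJoinEnabled,
        boolean periodExpired
    ) {
        return reconfigSupported && canBecomeVoter && autoJoinEnabled && periodExpired;
    }
}
```

Pulling the condition into a named method also gives the comment from the call site a natural home.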

Comment on lines +3338 to +3340
/* Only replicas that can become a voter and are configured to auto join should
* attempt to automatically join the voter set for the configured topic partition.
*/
Member

See my other comment but you can move this comment to shouldSendAddOrRemoveVoterRequest.

return Math.min(
backoffMs,
Math.min(
state.remainingFetchTimeMs(currentTimeMs),
Member

Observers don't need to back off until the fetch timeout since observers do not read or handle fetch timeouts.

Member

@jsancio jsancio left a comment

LGTM

Member

@chia7712 chia7712 left a comment

@kevin-wu24 thanks for this cool function! I have a couple of questions. Please take a look when you have some free time.

) {
return new AddRaftVoterRequestData()
.setClusterId(clusterId)
.setTimeoutMs(timeoutMs)
Member

I didn't see any usage of the timeout. Is that expected?

Contributor Author

@kevin-wu24 kevin-wu24 Aug 11, 2025

I didn't see any usage of the timeout.

Can you elaborate a bit more about this? I'm not sure what you mean.

The AddRaftVoterRequest is handled in a separate Java class, AddVoterHandler.

Contributor Author

I think I see what you mean @chia7712.

The leader does have timeout logic to expire the pending AddVoterRequest when it sends the ApiVersionsRequest. But it doesn't check whether the client-side timeout sent as part of the request has expired. In practice, this doesn't matter much because the leader will either time out the request during the ApiVersions handling or, if it is unable to commit the new voters record, resign. The important thing is that the leader does not get stuck with a pending voters operation. For the purposes of the auto-join PR, adding handling for the client-side timeout is out of scope.

Additionally, for the default timeout of an AdminClient request to add/remove a voter (1 min IIRC), the fetch timeout is significantly smaller, so it won't matter. If the timeout is sufficiently small, then we are missing handling on the server-side.

Filed https://issues.apache.org/jira/browse/KAFKA-19600 as a follow-up.

try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), voters.keySet());
Member

Set.of(3000, 3001, 3002)
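
The suggestion is safe because `Set.equals` compares contents, not implementation classes, so an immutable `Set.of(...)` is equal to a `HashSet` holding the same elements. A minimal sketch; the class and method names are illustrative only:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SetOfEqualitySketch {
    // True when the given ids are exactly the three expected voter ids.
    static boolean isExpectedVoterIds(Set<Integer> actual) {
        return Set.of(3000, 3001, 3002).equals(actual);
    }

    public static void main(String[] args) {
        Set<Integer> fromHashSet = new HashSet<>(List.of(3000, 3001, 3002));
        System.out.println(isExpectedVoterIds(fromHashSet));
        System.out.println(isExpectedVoterIds(Set.of(3000, 3001)));
    }
}
```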

} else if (initialVoterSet.isPresent()) {
for (final var controllerNode : initialVoterSet.get().entrySet()) {
final var voterId = controllerNode.getKey();
final var voterDirectoryid = controllerNode.getValue();
Member

voterDirectoryid -> voterDirectoryId

final RequestSendResult sendResult;
if (voters.voterIds().contains(localReplicaKey.id())) {
/* Replica id is in the voter set but replica is not voter. Remove old voter.
* Local replica is not in the voter set because the replica is an observer.
Member

Perhaps we could remind that, in this path, the directory ID must be different.

Contributor Author

Sure, I can clarify the comment.

Member

@chia7712 chia7712 left a comment

LGTM. I'm leaving a follow-up suggestion.

public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;

public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
Member

@kevin-wu24 it would be cool if users could find out about this new config through upgrade.html. Could you please consider addressing it in the follow-up?

@chia7712 chia7712 merged commit 92d8cb5 into apache:trunk Aug 13, 2025
26 checks passed
}
return Math.min(
backoffMs,
state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
Member

In the broker node, the state timeout is not reset in voter-related responses. As a result, the 0 returned by remainingUpdateVoterSetPeriodMs can lead to a busy loop, which increases CPU usage. Could you please take a look?

Contributor Author

Yeah, for a broker, this return statement will eventually always return 0 since we have no way to reset it. I don't think conceptually it makes sense to reset that timer upon receiving a FetchResponse. Instead we should not read that timer's value when determining the backoff if we are a broker.
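
The idea in this reply can be sketched as a backoff that only consults the voter set timer when the replica could actually act on it. This is a hypothetical illustration with assumed parameter names, not necessarily the fix that went into the follow-up.

```java
final class ObserverBackoffSketch {
    /**
     * Computes how long an observer may wait before polling again.
     * A replica that cannot auto join (for example a broker) never resets
     * the update voter set timer, so its remaining time stays at 0 once it
     * expires. Ignoring the timer in that case avoids a zero backoff and
     * the busy loop that follows from it.
     */
    static long backoffMs(
        long fetchBackoffMs,
        long remainingUpdateVoterSetPeriodMs,
        boolean canAutoJoin
    ) {
        if (canAutoJoin) {
            return Math.min(fetchBackoffMs, remainingUpdateVoterSetPeriodMs);
        }
        return fetchBackoffMs;
    }
}
```

For example, with a fetch backoff of 50 ms and an expired timer, a broker in this sketch waits 50 ms, while an auto-joining controller returns 0 and proceeds to send its add or remove voter request.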

Contributor Author

Opened #20354
