KAFKA-16527; Implement request handling for updated KRaft RPCs (#16235)
Implement request handling for the updated versions of the KRaft RPCs (Fetch, FetchSnapshot, Vote,
BeginQuorumEpoch and EndQuorumEpoch). This doesn't add support for KRaft replicas to send the new
versions of the KRaft RPCs; that will be implemented in KAFKA-16529.

All of the RPC responses were extended to include the leader's endpoint for the listener of the
channel used in the request. EpochState was extended to include the leader's endpoint information,
but only FollowerState and LeaderState know the leader id and its endpoint(s).
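
As a rough sketch of what that lookup involves (class and method names here are hypothetical, not
the types introduced by this commit), a replica that knows the leader keeps the leader's advertised
endpoints keyed by listener name and answers with the entry for the listener the request arrived on:

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the leader's endpoints keyed by listener name. A replica
// that knows the leader returns the endpoint matching the listener of the
// channel that received the request, so a client on any supported listener can
// locate the leader.
final class LeaderEndpoints {
    private final Map<String, InetSocketAddress> endpointsByListener;

    LeaderEndpoints(Map<String, InetSocketAddress> endpointsByListener) {
        this.endpointsByListener = endpointsByListener;
    }

    Optional<InetSocketAddress> forListener(String listenerName) {
        return Optional.ofNullable(endpointsByListener.get(listenerName));
    }
}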

For the Fetch request, the replica directory id was added. The leader now tracks the follower's log
end offset using both the replica id and replica directory id.
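
A minimal sketch of that bookkeeping, with illustrative names (the real state lives in the KRaft
leader state, not in a standalone class like this):

import org.apache.kafka.common.Uuid;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: the leader keys follower log end offsets on the pair
// (replica id, replica directory id), so a follower that comes back with a new
// log directory is not mistaken for its previous incarnation.
final class FollowerFetchState {
    static final class ReplicaKey {
        private final int id;
        private final Uuid directoryId;

        ReplicaKey(int id, Uuid directoryId) {
            this.id = id;
            this.directoryId = directoryId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ReplicaKey)) return false;
            ReplicaKey that = (ReplicaKey) o;
            return id == that.id && directoryId.equals(that.directoryId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, directoryId);
        }
    }

    private final Map<ReplicaKey, Long> logEndOffsets = new HashMap<>();

    void updateLogEndOffset(int replicaId, Uuid directoryId, long offset) {
        logEndOffsets.put(new ReplicaKey(replicaId, directoryId), offset);
    }
}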

For the FetchSnapshot request, the replica directory id was added. It is not used by the KRaft
leader; it is there for consistency with Fetch and to help with debugging.

For the Vote request, the replica keys for both the voter (destination) and the candidate (source)
were added. The voter key is checked for consistency. The candidate key is persisted when the vote
is granted.
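
Sketched against the new INVALID_VOTER_KEY error code added below (method and parameter names are
illustrative, not the exact code in this commit):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical sketch of the voter key consistency check: a Vote request that
// names a voter key different from this replica's own id and directory id is
// rejected with the INVALID_VOTER_KEY error introduced in this commit.
final class VoterKeyValidation {
    static Errors validate(int localId, Uuid localDirectoryId, int voterId, Uuid voterDirectoryId) {
        if (voterId != localId || !voterDirectoryId.equals(localDirectoryId)) {
            return Errors.INVALID_VOTER_KEY;
        }
        return Errors.NONE;
    }
}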

For the BeginQuorumEpoch request, all of the leader's endpoints are included. This is needed so
that the voters can return the leader's endpoint for all of the supported listeners.

For the EndQuorumEpoch request, all of the leader's endpoints are included. This is needed so that
the voters can return the leader's endpoint for all of the supported listeners. The successor list
has been extended to include the directory id. Receiving voters can use the entire replica key when
searching for their position in the successor list.
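
A sketch of that search with illustrative names, built on the preferredCandidates helper added to
EndQuorumEpochRequest below, which substitutes Uuid.ZERO_UUID for the directory id when the request
came from an old version:

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;

import java.util.List;

// Hypothetical sketch: a receiving voter finds its position in the successor
// list by matching the entire replica key. A zero directory id (what
// preferredCandidates fills in for old request versions) matches any local
// directory id.
final class SuccessorSearch {
    static int positionOf(int localId, Uuid localDirectoryId,
                          List<EndQuorumEpochRequestData.ReplicaInfo> successors) {
        for (int i = 0; i < successors.size(); i++) {
            EndQuorumEpochRequestData.ReplicaInfo replica = successors.get(i);
            boolean idMatches = replica.candidateId() == localId;
            boolean directoryMatches = replica.candidateDirectoryId().equals(Uuid.ZERO_UUID)
                || replica.candidateDirectoryId().equals(localDirectoryId);
            if (idMatches && directoryMatches) {
                return i;
            }
        }
        return -1; // this replica is not in the successor list
    }
}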

Updated the existing tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest to execute using
both the old and new versions of the RPCs.

Reviewers: Luke Chen <[email protected]>, Colin P. McCabe <[email protected]>
jsancio committed Jun 25, 2024
1 parent 5b0e96d commit adee6f0
Showing 72 changed files with 4,095 additions and 1,883 deletions.
clients/src/main/java/org/apache/kafka/common/errors/InvalidVoterKeyException.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class InvalidVoterKeyException extends ApiException {

private static final long serialVersionUID = 1;

public InvalidVoterKeyException(String s) {
super(s);
}

public InvalidVoterKeyException(String message, Throwable cause) {
super(message, cause);
}

}
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -77,6 +77,7 @@
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.InvalidVoterKeyException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.ListenerNotFoundException;
@@ -403,7 +404,8 @@ public enum Errors {
INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new),
SHARE_SESSION_NOT_FOUND(122, "The share session was not found.", ShareSessionNotFoundException::new),
INVALID_SHARE_SESSION_EPOCH(123, "The share session epoch is invalid.", InvalidShareSessionEpochException::new),
FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new);
FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new),
INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving replica's key.", InvalidVoterKeyException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
@@ -68,16 +68,12 @@ public static BeginQuorumEpochRequest parse(ByteBuffer buffer, short version) {
return new BeginQuorumEpochRequest(new BeginQuorumEpochRequestData(new ByteBufferAccessor(buffer), version), version);
}

public static BeginQuorumEpochRequestData singletonRequest(TopicPartition topicPartition,
int leaderEpoch,
int leaderId) {
return singletonRequest(topicPartition, null, leaderEpoch, leaderId);
}

public static BeginQuorumEpochRequestData singletonRequest(TopicPartition topicPartition,
String clusterId,
int leaderEpoch,
int leaderId) {
public static BeginQuorumEpochRequestData singletonRequest(
TopicPartition topicPartition,
String clusterId,
int leaderEpoch,
int leaderId
) {
return new BeginQuorumEpochRequestData()
.setClusterId(clusterId)
.setTopics(Collections.singletonList(
@@ -90,5 +86,4 @@ public static BeginQuorumEpochRequestData singletonRequest(TopicPartition topicP
.setLeaderId(leaderId))))
);
}

}
clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java
@@ -17,14 +17,12 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -49,27 +47,6 @@ public BeginQuorumEpochResponse(BeginQuorumEpochResponseData data) {
this.data = data;
}

public static BeginQuorumEpochResponseData singletonResponse(
Errors topLevelError,
TopicPartition topicPartition,
Errors partitionLevelError,
int leaderEpoch,
int leaderId
) {
return new BeginQuorumEpochResponseData()
.setErrorCode(topLevelError.code())
.setTopics(Collections.singletonList(
new BeginQuorumEpochResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(
new BeginQuorumEpochResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
)))
);
}

@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errors = new HashMap<>();
clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -26,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class EndQuorumEpochRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<EndQuorumEpochRequest> {
@@ -95,4 +97,18 @@ public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPar
);
}

// Returns the partition's preferred candidates; for requests from older versions
// this converts the id-only preferred successors into replica keys with a zero
// (unknown) directory id.
public static List<EndQuorumEpochRequestData.ReplicaInfo> preferredCandidates(EndQuorumEpochRequestData.PartitionData partition) {
if (partition.preferredCandidates().isEmpty()) {
return partition
.preferredSuccessors()
.stream()
.map(id -> new EndQuorumEpochRequestData.ReplicaInfo()
.setCandidateId(id)
.setCandidateDirectoryId(Uuid.ZERO_UUID)
)
.collect(Collectors.toList());
} else {
return partition.preferredCandidates();
}
}
}
clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java
@@ -17,14 +17,12 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -78,27 +76,6 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// Not supported by the response schema
}

public static EndQuorumEpochResponseData singletonResponse(
Errors topLevelError,
TopicPartition topicPartition,
Errors partitionLevelError,
int leaderEpoch,
int leaderId
) {
return new EndQuorumEpochResponseData()
.setErrorCode(topLevelError.code())
.setTopics(Collections.singletonList(
new EndQuorumEpochResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(
new EndQuorumEpochResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
)))
);
}

public static EndQuorumEpochResponse parse(ByteBuffer buffer, short version) {
return new EndQuorumEpochResponse(new EndQuorumEpochResponseData(new ByteBufferAccessor(buffer), version));
}
clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
@@ -23,11 +23,9 @@
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

public final class FetchSnapshotResponse extends AbstractResponse {
private final FetchSnapshotResponseData data;
@@ -81,33 +79,6 @@ public static FetchSnapshotResponseData withTopLevelError(Errors error) {
return new FetchSnapshotResponseData().setErrorCode(error.code());
}

/**
* Creates a FetchSnapshotResponseData with a single PartitionSnapshot for the topic partition.
*
* The partition index will already be populated when calling operator.
*
* @param topicPartition the topic partition to include
* @param operator unary operator responsible for populating all of the appropriate fields
* @return the created fetch snapshot response data
*/
public static FetchSnapshotResponseData singleton(
TopicPartition topicPartition,
UnaryOperator<FetchSnapshotResponseData.PartitionSnapshot> operator
) {
FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = operator.apply(
new FetchSnapshotResponseData.PartitionSnapshot().setIndex(topicPartition.partition())
);

return new FetchSnapshotResponseData()
.setTopics(
Collections.singletonList(
new FetchSnapshotResponseData.TopicSnapshot()
.setName(topicPartition.topic())
.setPartitions(Collections.singletonList(partitionSnapshot))
)
);
}

/**
* Finds the PartitionSnapshot for a given topic partition.
*
clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
@@ -69,19 +69,6 @@ public static VoteRequest parse(ByteBuffer buffer, short version) {
return new VoteRequest(new VoteRequestData(new ByteBufferAccessor(buffer), version), version);
}

public static VoteRequestData singletonRequest(TopicPartition topicPartition,
int candidateEpoch,
int candidateId,
int lastEpoch,
long lastEpochEndOffset) {
return singletonRequest(topicPartition,
null,
candidateEpoch,
candidateId,
lastEpoch,
lastEpochEndOffset);
}

public static VoteRequestData singletonRequest(TopicPartition topicPartition,
String clusterId,
int candidateEpoch,
@@ -102,5 +89,4 @@ public static VoteRequestData singletonRequest(TopicPartition topicPartition,
.setLastOffset(lastEpochEndOffset))
)));
}

}
clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java
@@ -17,14 +17,12 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -49,25 +47,6 @@ public VoteResponse(VoteResponseData data) {
this.data = data;
}

public static VoteResponseData singletonResponse(Errors topLevelError,
TopicPartition topicPartition,
Errors partitionLevelError,
int leaderEpoch,
int leaderId,
boolean voteGranted) {
return new VoteResponseData()
.setErrorCode(topLevelError.code())
.setTopics(Collections.singletonList(
new VoteResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(
new VoteResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
.setVoteGranted(voteGranted)))));
}

@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errors = new HashMap<>();
clients/src/main/resources/common/message/BeginQuorumEpochRequest.json
@@ -18,24 +18,38 @@
"type": "request",
"listeners": ["controller"],
"name": "BeginQuorumEpochRequest",
"validVersions": "0",
"flexibleVersions": "none",
// Version 1 adds flexible versions, voter key and leader endpoints (KIP-853)
"validVersions": "0-1",
"flexibleVersions": "1+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null"},
{ "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "ignorable": true,
"about": "The voter ID of the receiving replica" },
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData",
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the newly elected leader"},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The epoch of the newly elected leader"}
]}
]}
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name" },
{ "name": "Partitions", "type": "[]PartitionData",
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "VoterDirectoryId", "type": "uuid", "versions": "1+", "ignorable": true,
"about": "The directory id of the receiving replica" },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the newly elected leader"},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The epoch of the newly elected leader"}
]
}
]
},
{ "name": "LeaderEndpoints", "type": "[]LeaderEndpoint", "versions": "1+", "ignorable": true,
"about": "Endpoints for the leader", "fields": [
{ "name": "Name", "type": "string", "versions": "1+", "mapKey": true, "about": "The name of the endpoint" },
{ "name": "Host", "type": "string", "versions": "1+", "about": "The node's hostname" },
{ "name": "Port", "type": "uint16", "versions": "1+", "about": "The node's port" }
]
}
]
}
