KAFKA-17608, KAFKA-17604, KAFKA-16963; KRaft controller crashes when active controller is removed (apache#17146)

This change fixes a few issues.

KAFKA-17608; KRaft controller crashes when active controller is removed
When a control batch is committed, the quorum controller currently increases the last stable offset but does not create a snapshot for that offset. This causes a crash if the quorum controller renounces leadership and needs to revert to that offset, which has no snapshot present. Since control batches are no-ops for the quorum controller, it does not need to update its offsets for control records, so the handle-commit logic is now skipped for control batches.
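
As a rough illustration of the invariant the fix restores (a hypothetical, self-contained model, not the actual controller classes): control batches may advance the committed-offset metric, but they must not advance the stable offset the controller reverts to, because no in-memory snapshot is taken for them.

import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the KAFKA-17608 fix: control batches (no data records)
// update metrics only; they must not advance the stable offset used for reverts,
// because no in-memory snapshot exists at their offsets.
final class OffsetControlSketch {
    private final TreeSet<Long> snapshotOffsets = new TreeSet<>();
    private long lastStableOffset = -1L;
    private long lastCommittedMetricOffset = -1L;

    // The active controller takes an in-memory snapshot when it appends data records.
    void handleAppend(long offset) {
        snapshotOffsets.add(offset);
    }

    // Called when the Raft layer commits a batch.
    void handleCommit(long offset, List<?> records) {
        lastCommittedMetricOffset = offset;   // metrics always advance
        if (records.isEmpty()) {
            return;                           // control batch: skip offset handling
        }
        lastStableOffset = offset;            // only data batches advance the stable offset
    }

    // Renouncing reverts to the last stable offset, which must have a snapshot.
    void revertToLastStableOffset() {
        if (lastStableOffset >= 0 && !snapshotOffsets.contains(lastStableOffset)) {
            throw new IllegalStateException("No snapshot at offset " + lastStableOffset);
        }
    }
}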

KAFKA-17604; Describe quorum output missing added voters endpoints
Describe quorum output is missing the endpoints of voters that were added via AddRaftVoter. This is due to a bug in LeaderState's updateVoterAndObserverStates, which pulls replica state from the observer states map (which does not include endpoints). The fix is to populate the endpoints from the lastVoterSet passed into the method.
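
A similarly hypothetical sketch of the endpoint fix (simplified types, not the real LeaderState API): when a replica's tracked state is promoted from the observer map into the voter map, its endpoints have to come from the voter set entry, since observer states carry no endpoints.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VoterStateSketch {
    record VoterNode(int id, List<String> listeners) {}
    record ReplicaState(int id, long logEndOffset, List<String> listeners) {}

    // Rebuild the voter state map from the latest voter set, reusing any fetch
    // progress tracked in observerStates but always taking endpoints from the voter set.
    static Map<Integer, ReplicaState> updateVoterStates(
            Map<Integer, ReplicaState> observerStates,
            List<VoterNode> lastVoterSet) {
        Map<Integer, ReplicaState> newVoterStates = new HashMap<>();
        for (VoterNode voter : lastVoterSet) {
            ReplicaState tracked = observerStates.get(voter.id());
            long offset = tracked == null ? -1L : tracked.logEndOffset();
            // Endpoints come from the voter set, not from the observer state,
            // so describe quorum can report listeners for newly added voters.
            newVoterStates.put(voter.id(), new ReplicaState(voter.id(), offset, voter.listeners()));
        }
        return newVoterStates;
    }
}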

Reviewers: José Armando García Sancio <[email protected]>, Colin P. McCabe <[email protected]>, Chia-Ping Tsai <[email protected]>
ahuang98 authored Sep 26, 2024
1 parent ede0c94 commit 68b9770
Showing 6 changed files with 123 additions and 73 deletions.
@@ -280,6 +280,10 @@ void handleCommitBatch(Batch<ApiMessageAndVersion> batch) {
this.lastCommittedOffset = batch.lastOffset();
this.lastCommittedEpoch = batch.epoch();
maybeAdvanceLastStableOffset();
handleCommitBatchMetrics(batch);
}

void handleCommitBatchMetrics(Batch<ApiMessageAndVersion> batch) {
metrics.setLastCommittedRecordOffset(batch.lastOffset());
if (!active()) {
// On standby controllers, the last applied record offset is equals to the last
@@ -1065,7 +1065,10 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
int epoch = batch.epoch();
List<ApiMessageAndVersion> messages = batch.records();

if (isActive) {
if (messages.isEmpty()) {
log.debug("Skipping handling commit for batch with no data records with offset {} and epoch {}.", offset, epoch);
offsetControl.handleCommitBatchMetrics(batch);
} else if (isActive) {
// If the controller is active, the records were already replayed,
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
@@ -1075,9 +1078,6 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
offsetControl.handleCommitBatch(batch);
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());

// The active controller can delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());
} else {
// If the controller is a standby, replay the records that were
// created by the active controller.
@@ -2911,7 +2911,7 @@ private long pollResigned(long currentTimeMs) {
if (quorum.isVoter()) {
transitionToCandidate(currentTimeMs);
} else {
// It is posible that the old leader is not a voter in the new voter set.
// It is possible that the old leader is not a voter in the new voter set.
// In that case increase the epoch and transition to unattached. The epoch needs
// to be increased to avoid FETCH responses with the leader being this replica.
transitionToUnattached(quorum.epoch() + 1);
9 changes: 8 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -678,6 +678,9 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {

// Make sure that the replica key in the replica state matches the voter's
state.setReplicaKey(voterNode.voterKey());

// Make sure that the listeners are updated
state.updateListeners(voterNode.listeners());
newVoterStates.put(state.replicaKey.id(), state);
}
voterStates = newVoterStates;
@@ -752,8 +755,12 @@ void setReplicaKey(ReplicaKey replicaKey) {
this.replicaKey = replicaKey;
}

void updateListeners(Endpoints listeners) {
this.listeners = listeners;
}

void clearListeners() {
this.listeners = Endpoints.empty();
updateListeners(Endpoints.empty());
}

boolean matchesKey(ReplicaKey replicaKey) {
63 changes: 31 additions & 32 deletions tests/kafkatest/services/kafka/kafka.py
@@ -267,6 +267,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
:param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
:param dynamicRaftQuorum: When true, the quorum uses kraft.version=1, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
"""

self.zk = zk
@@ -299,7 +300,8 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI

if self.quorum_info.using_kraft:
self.dynamicRaftQuorum = dynamicRaftQuorum
self.first_controller_started = False
# Used to ensure not more than one controller bootstraps with the standalone flag
self.standalone_controller_bootstrapped = False
if self.quorum_info.has_brokers:
num_nodes_broker_role = num_nodes
if self.quorum_info.has_controllers:
@@ -449,8 +451,12 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
self.combined_nodes_started = 0
self.nodes_to_start = self.nodes

# Does not do any validation to check if this node is part of an isolated controller quorum or not
def node_id_as_isolated_controller(self, node):
"""
Generates the node id for a controller-only node, starting from config_property.FIRST_CONTROLLER_ID so as not
to overlap with broker id numbering.
This method does not do any validation to check this node is actually part of an isolated controller quorum.
"""
return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1

def reconfigure_zk_for_migration(self, kraft_quorum):
@@ -746,8 +752,7 @@ def set_protocol_and_port(self, node):
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
if not port.name in controller_listener_names:
advertised_listeners.append(port.advertised_listener(node))
advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \
else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
@@ -863,12 +868,12 @@ def start_node(self, node, timeout_sec=60, **kwargs):
self.maybe_setup_broker_scram_credentials(node)

if self.quorum_info.using_kraft:
# define controller.quorum.bootstrap.servrers or controller.quorum.voters text
# define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
controller_quorum_bootstrap_servers = ','.join(["%s:%s" % (node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
if self.dynamicRaftQuorum:
self.controller_quorum_bootstrap_servers = controller_quorum_bootstrap_servers
@@ -894,11 +899,11 @@ def start_node(self, node, timeout_sec=60, **kwargs):
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.dynamicRaftQuorum:
cmd += " --feature kraft.version=1"
if not self.first_controller_started and self.node_quorum_info.has_controller_role:
if not self.standalone_controller_bootstrapped and self.node_quorum_info.has_controller_role:
cmd += " --standalone"
self.standalone_controller_bootstrapped = True
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)
self.first_controller_started = True

cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\
@@ -1028,11 +1033,12 @@ def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_cont
else:
security_protocol_to_use = kafka_security_protocol
if use_controller_bootstrap:
bootstrap = "--bootstrap-controller %s" % (self.bootstrap_controllers("CONTROLLER_%s" % security_protocol_to_use))
bootstrap = "--bootstrap-controller {}".format(
self.bootstrap_controllers("CONTROLLER_{}".format(security_protocol_to_use)))
else:
bootstrap = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
bootstrap = "--bootstrap-server {}".format(self.bootstrap_servers(security_protocol_to_use))
kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node)
return "%s %s" % (kafka_metadata_script, bootstrap)
return "{} {}".format(kafka_metadata_script, bootstrap)

def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
@@ -1788,10 +1794,8 @@ def describe_quorum(self, node=None):
if node is None:
node = self.nodes[0]
cmd = fix_opts_for_new_jvm(node)
cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % {
'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node)
}
self.logger.info("Running describe quorum command...\n%s" % cmd)
cmd += f"{self.kafka_metadata_quorum_cmd(node)} describe --status"
self.logger.info(f"Running describe quorum command...\n{cmd}")
node.account.ssh(cmd)

output = ""
@@ -1815,11 +1819,9 @@ def add_controller(self, controllerId, controller):

controller.account.create_file(command_config_path, configs)
cmd = fix_opts_for_new_jvm(controller)
cmd += "%(kafka_metadata_quorum_cmd)s --command-config %(command_config)s add-controller" % {
'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True),
'command_config': command_config_path
}
self.logger.info("Running add controller command...\n%s" % cmd)
kafka_metadata_quorum_cmd = self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True)
cmd += f"{kafka_metadata_quorum_cmd} --command-config {command_config_path} add-controller"
self.logger.info(f"Running add controller command...\n{cmd}")
controller.account.ssh(cmd)

def remove_controller(self, controllerId, directoryId, node=None):
@@ -1829,12 +1831,9 @@ def remove_controller(self, controllerId, directoryId, node=None):
if node is None:
node = self.nodes[0]
cmd = fix_opts_for_new_jvm(node)
cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i %(controller_id)s -d %(directory_id)s" % {
'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node, use_controller_bootstrap=True),
'controller_id': controllerId,
'directory_id': directoryId
}
self.logger.info("Running remove controller command...\n%s" % cmd)
kafka_metadata_quorum_cmd = self.kafka_metadata_quorum_cmd(node, use_controller_bootstrap=True)
cmd += f"{kafka_metadata_quorum_cmd} remove-controller -i {controllerId} -d {directoryId}"
self.logger.info(f"Running remove controller command...\n{cmd}")
node.account.ssh(cmd)

def zk_connect_setting(self):
@@ -1844,17 +1843,17 @@ def zk_connect_setting(self):

def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
if validate and not port.open:
raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
str(port.port_number))
raise ValueError(f"We are retrieving bootstrap servers for the port: {str(port.port_number)} "
f"which is not currently open.")

return ','.join([node.account.hostname + ":" + str(port.port_number)
for node in self.nodes
if node not in offline_nodes])

def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]):
if validate and not port.open:
raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
str(port.port_number))
raise ValueError(f"We are retrieving bootstrap controllers for the port: {str(port.port_number)} "
f"which is not currently open.")

return ','.join([node.account.hostname + ":" + str(port.port_number)
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]