Skip to content

Commit

Permalink
MINOR: Fix kafkatest advertised listeners (apache#17294)
Browse files Browse the repository at this point in the history
Followup for apache#17146

Reviewers: Bill Bejeck <[email protected]>
  • Loading branch information
ahuang98 authored Sep 30, 2024
1 parent bb11257 commit e27d0df
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
self.controller_quorum = None # will define below if necessary
self.isolated_controller_quorum = None # will define below if necessary
self.configured_for_zk_migration = False
self.dynamicRaftQuorum = False

# Set use_new_coordinator based on context and arguments.
# If not specified, the default config is used.
Expand Down Expand Up @@ -752,7 +753,9 @@ def set_protocol_and_port(self, node):
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
advertised_listeners.append(port.advertised_listener(node))
if (self.dynamicRaftQuorum and quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role) or \
port.name not in controller_listener_names:
advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \
else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
Expand Down Expand Up @@ -871,16 +874,18 @@ def start_node(self, node, timeout_sec=60, **kwargs):
# define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
if self.dynamicRaftQuorum:
self.controller_quorum_bootstrap_servers = controller_quorum_bootstrap_servers
self.controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
else:
self.controller_quorum_voters = ','.join(["%s@%s" % (self.controller_quorum.idx(node) + first_node_id - 1,
bootstrap_server)
for bootstrap_server in controller_quorum_bootstrap_servers.split(',')])
self.controller_quorum_voters = ','.join(["{}@{}:{}".format(self.controller_quorum.idx(node) +
first_node_id - 1,
node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match the isolated quorum if one exists
Expand Down

0 comments on commit e27d0df

Please sign in to comment.