KAFKA-17609: [1/4] Changes needed to convert system tests to use KRaft and remove ZK (#17275)

This is part one of a multi-PR effort to convert the Kafka Streams system tests to KRaft. I decided to break the changes down into multiple PRs to reduce the review load.

Reviewers: Matthias Sax <[email protected]>
bbejeck authored Nov 5, 2024
1 parent 8c62c2a commit 36c131e
Showing 5 changed files with 26 additions and 49 deletions.
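
The same conversion pattern runs through all five files: drop the ZookeeperService, let KafkaService provision KRaft controllers by passing zk=None, and parameterize each test over a metadata_quorum axis via ducktape's @matrix. A minimal sketch of the resulting shape, using only the kafkatest calls visible in the diffs below (class, test, and topic names are hypothetical):

from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum


class ExampleKRaftStreamsTest(Test):

    @cluster(num_nodes=3)
    @matrix(metadata_quorum=[quorum.combined_kraft])
    def test_example(self, metadata_quorum):
        # zk=None: no ZooKeeper ensemble; in combined mode the brokers
        # themselves host the KRaft controllers, here a one-node quorum.
        kafka = KafkaService(self.test_context, num_nodes=3, zk=None,
                             topics={'input': {'partitions': 1, 'replication-factor': 1}},
                             controller_num_nodes_override=1)
        kafka.start()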
15 changes: 2 additions & 13 deletions tests/kafkatest/services/streams.py
@@ -22,7 +22,6 @@
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import KafkaConfig
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1
 
 STATE_DIR = "state.dir"
 
@@ -627,11 +626,6 @@ def prop_file(self):
 
     def start_cmd(self, node):
         args = self.args.copy()
-
-        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
-            args['zk'] = self.kafka.zk.connect_setting()
-        else:
-            args['zk'] = ""
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -642,7 +636,7 @@ def start_cmd(self, node):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
-              " %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \
+              " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing: " + cmd)
@@ -732,11 +726,6 @@ def set_upgrade_phase(self, upgrade_phase):
 
     def start_cmd(self, node):
         args = self.args.copy()
-
-        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
-            args['zk'] = self.kafka.zk.connect_setting()
-        else:
-            args['zk'] = ""
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -747,7 +736,7 @@ def start_cmd(self, node):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
-              " %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \
+              " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing: " + cmd)
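
Both hunks make the same change: the start command no longer carries a ZooKeeper connect string, which was only ever populated for the 0.10.0/0.10.1 compatibility versions. The command itself is plain Python percent-substitution over the args dict; a stripped-down, runnable illustration of how the new template expands (all paths and the version are hypothetical stand-ins):

# Hypothetical values; the real ones come from the service's args dict.
args = {
    'kafka_run_class': '/opt/kafka/bin/kafka-run-class.sh',
    'streams_class_name': 'org.apache.kafka.streams.tests.StreamsUpgradeTest',
    'config_file': '/mnt/streams/streams.properties',
    'stdout': '/mnt/streams/streams.stdout',
    'stderr': '/mnt/streams/streams.stderr',
    'pidfile': '/mnt/streams/streams.pid',
    'log4j': '/mnt/streams/tools-log4j.properties',
    'version': '3.8.0',
}
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
      "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
      " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
      " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
print(cmd)  # the fully expanded shell command, with no ZooKeeper argument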
4 changes: 2 additions & 2 deletions tests/kafkatest/tests/streams/base_streams_test.py
@@ -27,8 +27,8 @@ class BaseStreamsTest(KafkaTest):
     Extends KafkaTest which manages setting up Kafka Cluster and Zookeeper
     see tests/kafkatest/tests/kafka_test.py for more info
     """
-    def __init__(self, test_context, topics, num_zk=1, num_brokers=3):
-        super(BaseStreamsTest, self).__init__(test_context, num_zk, num_brokers, topics)
+    def __init__(self, test_context, topics, num_controllers=1, num_brokers=3):
+        super(BaseStreamsTest, self).__init__(test_context, num_controllers, num_brokers, topics)
 
     def get_consumer(self, client_id, topic, num_messages):
         return VerifiableConsumer(self.test_context,
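
Callers of BaseStreamsTest now size a KRaft controller quorum instead of a ZooKeeper ensemble. A sketch of what a subclass looks like against the new signature (subclass name and topic are hypothetical):

from kafkatest.tests.streams.base_streams_test import BaseStreamsTest


class ExampleStreamsTest(BaseStreamsTest):
    def __init__(self, test_context):
        super(ExampleStreamsTest, self).__init__(
            test_context,
            topics={'input': {'partitions': 5, 'replication-factor': 1}},
            num_controllers=1,  # KRaft controllers rather than ZooKeeper nodes
            num_brokers=3)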
16 changes: 6 additions & 10 deletions tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -18,9 +18,8 @@
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
     LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_VERSION, KafkaVersion
 
@@ -56,9 +55,9 @@ def perform_broker_upgrade(self, to_version):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
-    @cluster(num_nodes=6)
-    @matrix(from_version=smoke_test_versions, bounce_type=["full"])
-    def test_app_upgrade(self, from_version, bounce_type):
+    @cluster(num_nodes=9)
+    @matrix(from_version=smoke_test_versions, bounce_type=["full"], metadata_quorum=[quorum.combined_kraft])
+    def test_app_upgrade(self, from_version, bounce_type, metadata_quorum):
         """
         Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
         """
@@ -68,10 +67,7 @@ def test_app_upgrade(self, from_version, bounce_type):
         if from_version == to_version:
             return
 
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics={
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, topics={
             'echo' : { 'partitions': 5, 'replication-factor': 1 },
             'data' : { 'partitions': 5, 'replication-factor': 1 },
             'min' : { 'partitions': 5, 'replication-factor': 1 },
@@ -86,7 +82,7 @@ def test_app_upgrade(self, from_version, bounce_type):
             'avg' : { 'partitions': 5, 'replication-factor': 1 },
             'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
             'tagg' : { 'partitions': 5, 'replication-factor': 1 }
-        })
+        }, controller_num_nodes_override=1)
         self.kafka.start()
 
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
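
The test now provisions three KRaft brokers (one of which doubles as the single combined controller, per controller_num_nodes_override=1) instead of a one-broker cluster plus ZooKeeper. The upgrade logic itself is untouched: a rolling restart of three Streams processors. A condensed sketch of that shape, reusing the KAFKA_STREAMS_VERSION attribute seen in services/streams.py above (constructor arguments trimmed; setting the attribute directly is an illustration only, not how the test wires it up):

from kafkatest.services.streams import StreamsSmokeTestJobRunnerService


def rolling_upgrade(test_context, kafka, from_version, to_version):
    processors = [StreamsSmokeTestJobRunnerService(test_context, kafka)
                  for _ in range(3)]
    for p in processors:
        p.KAFKA_STREAMS_VERSION = str(from_version)  # start on <old_version>
        p.start()
    for p in processors:  # upgrade one-by-one, as the docstring describes
        p.stop()
        p.KAFKA_STREAMS_VERSION = str(to_version)
        p.start()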
32 changes: 12 additions & 20 deletions tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -19,7 +19,6 @@
 from ducktape.mark import matrix
 from ducktape.mark import ignore
 from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 import time
 import signal
@@ -152,16 +151,8 @@ def confirm_topics_on_all_brokers(self, expected_topic_set):
 
     def setup_system(self, start_processor=True, num_threads=3):
         # Setup phase
-        self.zk = (
-            ZookeeperService(self.test_context, 1)
-            if quorum.for_test(self.test_context) == quorum.zk
-            else None
-        )
-        if self.zk:
-            self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics,
-                                  controller_num_nodes_override=1)
+
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics)
         self.kafka.start()
 
         # allow some time for topics to be created
@@ -216,7 +207,7 @@ def collect_results(self, sleep_time_secs):
             broker_type=["leader"],
             num_threads=[1, 3],
             sleep_time_secs=[120],
-            metadata_quorum=[quorum.isolated_kraft])
+            metadata_quorum=[quorum.combined_kraft])
     def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum):
         """
         Start a smoke test client, then kill one particular broker and ensure data is still received
@@ -238,8 +229,9 @@ def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown"],
             broker_type=["controller"],
-            sleep_time_secs=[0])
-    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs):
+            sleep_time_secs=[0],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum):
         """
         Start a smoke test client, then kill one particular broker immediately before streams starts
         Streams should throw an exception since it cannot create topics with the desired
@@ -257,11 +249,11 @@ def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs):
 
         return self.collect_results(sleep_time_secs)
 
-    @cluster(num_nodes=7)
+    @cluster(num_nodes=10)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             num_failures=[2],
-            metadata_quorum=quorum.all_non_upgrade)
-    def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
+            metadata_quorum=[quorum.isolated_kraft])
+    def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum):
         """
         Start a smoke test client, then kill a few brokers and ensure data is still received
         Record if records are delivered
@@ -276,11 +268,11 @@ def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
 
         return self.collect_results(120)
 
-    @cluster(num_nodes=7)
+    @cluster(num_nodes=10)
     @matrix(failure_mode=["clean_bounce", "hard_bounce"],
             num_failures=[3],
-            metadata_quorum=quorum.all_non_upgrade)
-    def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk):
+            metadata_quorum=[quorum.isolated_kraft])
+    def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum):
         """
         Start a smoke test client, then kill a few brokers and ensure data is still received
         Record if records are delivered
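
Two quorum choices are in play here: the single-broker bounce tests pin quorum.combined_kraft, while the variants that kill two or three brokers move to quorum.isolated_kraft with a larger 10-node budget, which keeps the KRaft controllers on dedicated nodes that survive the broker failures. The failure modes themselves come down to which signal the test sends the broker JVM; a hypothetical helper distilling the naming convention (the file already imports signal; this helper is not part of the diff):

import signal


def bounce_signal(failure_mode):
    # "clean_*" modes request a graceful broker shutdown; "hard_*" modes
    # simulate a crash by killing the process outright.
    return signal.SIGTERM if failure_mode.startswith("clean") else signal.SIGKILL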
8 changes: 4 additions & 4 deletions tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,7 +45,7 @@ def setUp(self):
         self.zk.start()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
     def test_streams_resilient_to_broker_down(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.start()
 
@@ -82,7 +82,7 @@ def test_streams_resilient_to_broker_down(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.stop()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
     def test_streams_runs_with_broker_down_initially(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.start()
         node = self.kafka.leader(self.inputTopic)
@@ -150,7 +150,7 @@ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
     def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.start()
 
@@ -229,7 +229,7 @@ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.stop()
 
    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
     def test_streams_should_failover_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
         self.kafka.start()
 
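
All four resilience tests keep their logic and simply trade the isolated quorum for the combined one, which needs no extra controller nodes. Their common shape: find the broker leading the input topic, stop it, and assert that the Streams clients keep (or resume) processing. A condensed sketch using the KafkaService API visible in these diffs (leader above, start_node in the upgrade test; stop_node is assumed to be its symmetric counterpart; traffic generation and assertions elided):

def bounce_topic_leader(kafka, topic):
    """Take down the broker leading `topic`, then bring it back (sketch)."""
    node = kafka.leader(topic)  # broker currently leading the topic
    kafka.stop_node(node)       # take the leader down
    # ... produce/consume and assert the Streams clients stay healthy ...
    kafka.start_node(node)      # restore the broker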
