Skip to content

Commit

Permalink
Part 3 of 4 converting streams system tests to KRaft
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Sep 30, 2024
1 parent e27d0df commit ada336f
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 82 deletions.
78 changes: 12 additions & 66 deletions tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,27 @@
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
from kafkatest.version import LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion


class StreamsBrokerCompatibility(Test):
"""
These tests validates that
- Streams works for older brokers 0.11 (or newer)
- Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
- Streams w/ EOS-v2 works for older brokers 2.5 (or newer)
- Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
- Streams w/ EOS-v2 fails fast for older brokers 2.4 or older
"""

input = "brokerCompatibilitySourceTopic"
output = "brokerCompatibilitySinkTopic"

def __init__(self, test_context):
super(StreamsBrokerCompatibility, self).__init__(test_context=test_context)
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context,
num_nodes=1,
zk=self.zk,
zk=None,
topics={
self.input: {'partitions': 1, 'replication-factor': 1},
self.output: {'partitions': 1, 'replication-factor': 1}
Expand All @@ -60,18 +53,15 @@ def __init__(self, test_context):
self.output,
"stream-broker-compatibility-verify-consumer")

def setUp(self):
self.zk.start()


@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_0_11_0),str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),
str(LATEST_2_1),str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),
str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
@matrix(broker_version=[str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
str(LATEST_3_8)])
def test_compatible_brokers_eos_disabled(self, broker_version):
str(LATEST_3_8)],
metadata_quorum=[quorum.combined_kraft]
)
def test_compatible_brokers_eos_disabled(self, broker_version, metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

Expand All @@ -88,11 +78,12 @@ def test_compatible_brokers_eos_disabled(self, broker_version):
self.kafka.stop()

@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
@matrix(broker_version=[str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
str(LATEST_3_8)])
def test_compatible_brokers_eos_v2_enabled(self, broker_version):
str(LATEST_3_8)],
metadata_quorum=[quorum.combined_kraft])
def test_compatible_brokers_eos_v2_enabled(self, broker_version, metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

Expand All @@ -107,48 +98,3 @@ def test_compatible_brokers_eos_v2_enabled(self, broker_version):

self.consumer.stop()
self.kafka.stop()

@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
@parametrize(broker_version=str(LATEST_0_10_0))
def test_fail_fast_on_incompatible_brokers(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "at_least_once")

with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
processor.start()
monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException',
timeout_sec=60,
err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException " + str(processor.node.account))

self.kafka.stop()

@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_2_4))
@parametrize(broker_version=str(LATEST_2_3))
@parametrize(broker_version=str(LATEST_2_2))
@parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_0_11_0))
def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2")

with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
with processor.node.account.monitor_log(processor.LOG_FILE) as log:
processor.start()
log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2" requires broker version 2\.5 or higher\.',
timeout_sec=60,
err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account))
monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException',
timeout_sec=60,
err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account))

self.kafka.stop()
16 changes: 3 additions & 13 deletions tests/kafkatest/tests/streams/streams_optimized_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
from kafkatest.services.streams import StreamsResetter
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import stop_processors

class StreamsOptimizedTest(Test):
Expand All @@ -46,13 +45,8 @@ def __init__(self, test_context):
self.join_topic: {'partitions': 6}
}

self.zookeeper = (
ZookeeperService(self.test_context, 1)
if quorum.for_test(self.test_context) == quorum.zk
else None
)
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
self.kafka = KafkaService(self.test_context, num_nodes=3, controller_num_nodes_override=1,
zk=None, topics=self.topics)

self.producer = VerifiableProducer(self.test_context,
1,
Expand All @@ -62,10 +56,8 @@ def __init__(self, test_context):
acks=1)

@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.isolated_kraft])
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_upgrade_optimized_topology(self, metadata_quorum):
if self.zookeeper:
self.zookeeper.start()
self.kafka.start()

processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
Expand Down Expand Up @@ -111,8 +103,6 @@ def test_upgrade_optimized_topology(self, metadata_quorum):
self.logger.info("teardown")
self.producer.stop()
self.kafka.stop()
if self.zookeeper:
self.zookeeper.stop()

def reset_application(self):
resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ def __init__(self, test_context):

@cluster(num_nodes=8)
@matrix(crash=[False, True],
processing_guarantee=['exactly_once', 'exactly_once_v2'],
metadata_quorum=[quorum.isolated_kraft])
processing_guarantee=['exactly_once_v2'],
metadata_quorum=[quorum.combined_kraft])
def test_streams(self, crash, processing_guarantee, metadata_quorum):
driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, test_context):
self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka)

@cluster(num_nodes=3)
@matrix(metadata_quorum=[quorum.isolated_kraft])
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_shutdown_wont_deadlock(self, metadata_quorum):
"""
Start ShutdownDeadLockTest, wait for upt to 1 minute, and check that the process exited.
Expand Down

0 comments on commit ada336f

Please sign in to comment.