KAFKA-17609: [4/4] Convert system tests to kraft part 4 #17328

Merged
4 changes: 2 additions & 2 deletions tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -49,8 +49,8 @@ def __init__(self, test_context):
@cluster(num_nodes=8)
@matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'],
crash=[True, False],
metadata_quorum=quorum.all_non_upgrade)
def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
metadata_quorum=[quorum.combined_kraft])
Member: Can we remove the metadata_quorum parameter entirely?

Contributor Author: It is required to indicate that KRaft should be used for the metadata quorum. Without a @matrix annotation the test defaults to ZooKeeper and fails with this error:

    zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
AttributeError: 'NoneType' object has no attribute 'zk_sasl'
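
For illustration, a minimal sketch of the KRaft-only pattern this PR applies (the test class and topic names are hypothetical; the imports, decorators, and quorum constants are the ones visible in the diff):

```python
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum

class ExampleKRaftTest(Test):  # hypothetical test class
    def __init__(self, test_context):
        super(ExampleKRaftTest, self).__init__(test_context)
        # zk=None: no ZookeeperService; the brokers host the KRaft quorum.
        self.kafka = KafkaService(test_context, num_nodes=3, zk=None,
                                  topics={'example': {'partitions': 1}},
                                  controller_num_nodes_override=1)

    @cluster(num_nodes=4)
    @matrix(metadata_quorum=[quorum.combined_kraft])
    def test_example(self, metadata_quorum):
        # The @matrix parameter above tells ducktape to inject a KRaft
        # quorum; without it the test defaults to ZooKeeper and
        # KafkaService.start() fails with the AttributeError quoted above.
        self.kafka.start()
```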

def test_streams(self, processing_guarantee, crash, metadata_quorum):
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
12 changes: 1 addition & 11 deletions tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -19,7 +19,6 @@
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StaticMemberTestService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs, extract_generation_id

class StreamsStaticMembershipTest(Test):
@@ -39,13 +38,8 @@ def __init__(self, test_context):
self.input_topic: {'partitions': 18},
}

self.zookeeper = (
ZookeeperService(self.test_context, 1)
if quorum.for_test(self.test_context) == quorum.zk
else None
)
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
zk=None, topics=self.topics, controller_num_nodes_override=1)

self.producer = VerifiableProducer(self.test_context,
1,
@@ -57,8 +51,6 @@ def __init__(self, test_context):
@cluster(num_nodes=8)
@matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum, use_new_coordinator=False):
if self.zookeeper:
self.zookeeper.start()
self.kafka.start()

numThreads = 3
@@ -104,8 +96,6 @@ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self

self.producer.stop()
self.kafka.stop(timeout_sec=120)
if self.zookeeper:
self.zookeeper.stop()

def verify_processing(self, processors):
for processor in processors:
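
Taken together, the hunks above reduce to a single conversion recipe. A sketch assembled from the removed and added lines shown in this diff (the wrapping classes and constructor arguments are illustrative):

```python
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService  # import removed by this PR

class BeforeConversion:
    def __init__(self, test_context, topics):
        self.test_context = test_context
        self.topics = topics
        # Old pattern: start ZooKeeper only when the injected quorum asks for it.
        self.zookeeper = (
            ZookeeperService(self.test_context, 1)
            if quorum.for_test(self.test_context) == quorum.zk
            else None
        )
        self.kafka = KafkaService(self.test_context, num_nodes=3,
                                  zk=self.zookeeper, topics=self.topics,
                                  controller_num_nodes_override=1)

class AfterConversion:
    def __init__(self, test_context, topics):
        self.test_context = test_context
        self.topics = topics
        # New pattern: KRaft only -- no ZookeeperService, no conditional
        # start()/stop() calls around the test body.
        self.kafka = KafkaService(self.test_context, num_nodes=3,
                                  zk=None, topics=self.topics,
                                  controller_num_nodes_override=1)
```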
124 changes: 12 additions & 112 deletions tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -18,28 +18,22 @@
from ducktape.mark import matrix, ignore
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
StreamsUpgradeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \
KafkaVersion

# broker 0.10.0 is not compatible with newer Kafka Streams versions
# broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3),
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)]

metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
# upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0
@@ -111,102 +105,10 @@ def perform_broker_upgrade(self, to_version):
node.version = KafkaVersion(to_version)
self.kafka.start_node(node)

@ignore
@cluster(num_nodes=6)
@matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
def test_upgrade_downgrade_brokers(self, from_version, to_version):
"""
Start a smoke test client then perform rolling upgrades on the broker.
"""

if from_version == to_version:
return

self.replication = 3
self.num_kafka_nodes = 3
self.partitions = 1
self.isr = 2
self.topics = {
'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr}},
'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} },
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} }
}

# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()

# number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start()

# allow some time for topics to be created
wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
timeout_sec=60,
err_msg="Broker did not create all topics in 60 seconds ")

self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)

processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once")

with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
self.driver.start()

with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.start()
monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))

connected_message = "Discovered group coordinator"
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
self.perform_broker_upgrade(to_version)

log_monitor.wait_until(connected_message,
timeout_sec=120,
err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))

stdout_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))

# SmokeTestDriver allows up to 6 minutes to consume all
# records for the verification step so this timeout is set to
# 6 minutes (360 seconds) for consuming of verification records
# and a very conservative additional 2 minutes (120 seconds) to process
# the records in the verification step
driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
timeout_sec=480,
err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))

self.driver.stop()
processor.stop()
processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)

@cluster(num_nodes=6)
@matrix(from_version=metadata_1_versions)
@matrix(from_version=metadata_2_versions)
@matrix(from_version=fk_join_versions)
def test_rolling_upgrade_with_2_bounces(self, from_version):
@matrix(from_version=metadata_2_versions, metadata_quorum=[quorum.combined_kraft])
@matrix(from_version=fk_join_versions, metadata_quorum=[quorum.combined_kraft])
def test_rolling_upgrade_with_2_bounces(self, from_version, metadata_quorum):
Member: Why are we adding the metadata_quorum parameter? It seems unnecessary.

Contributor Author: It is required to indicate that KRaft should be used for the metadata quorum. Without a @matrix annotation the test defaults to ZooKeeper and fails with this error:

    zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
AttributeError: 'NoneType' object has no attribute 'zk_sasl'
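
A short sketch of why that error occurs, as a hypothetical reconstruction rather than the actual kafkatest source: quorum_info.using_zk is derived from the injected metadata_quorum, independently of the zk argument the test's constructor passed, so the two can disagree.

```python
# Hypothetical reconstruction of the failure mode quoted above.
class KafkaServiceSketch:
    def __init__(self, zk, using_zk):
        self.zk = zk              # None once the ZooKeeper service is removed
        self.using_zk = using_zk  # True when the test defaults to quorum.zk

    def start(self):
        # Mirrors the quoted line:
        #   zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False
        return self.zk.zk_sasl if self.using_zk else False

try:
    # zk=None plus a ZooKeeper default reproduces the reported error.
    KafkaServiceSketch(zk=None, using_zk=True).start()
except AttributeError as e:
    print(e)  # 'NoneType' object has no attribute 'zk_sasl'

# Declaring metadata_quorum via @matrix flips using_zk to False, so the
# self.zk access is never evaluated:
assert KafkaServiceSketch(zk=None, using_zk=False).start() is False
```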

"""
This test verifies that the cluster successfully upgrades despite changes in the metadata and FK
join protocols.
@@ -245,7 +147,8 @@ def test_rolling_upgrade_with_2_bounces(self, from_version):
self.stop_and_await()

@cluster(num_nodes=6)
def test_version_probing_upgrade(self):
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_version_probing_upgrade(self, metadata_quorum):
Member: As above.

"""
Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
"""
@@ -272,8 +175,8 @@ def test_version_probing_upgrade(self):
self.stop_and_await()

@cluster(num_nodes=6)
@matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False])
def test_upgrade_downgrade_state_updater(self, from_version, upgrade):
@matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft])
def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
Member: As above.

"""
Starts 3 KafkaStreams instances, and enables / disables state restoration
for the instances in a rolling bounce.
@@ -312,10 +215,7 @@ def test_upgrade_downgrade_state_updater(self, from_version, upgrade):
self.stop_and_await()

def set_up_services(self):
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()

self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, topics=self.topics)
self.kafka.start()

self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)