KAFKA-17609: [4/4] Convert system tests to kraft part 4 #17328
@@ -18,28 +18,22 @@
 from ducktape.mark import matrix, ignore
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
     StreamsUpgradeTestJobRunnerService
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
+from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
     LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \
     KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 # broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
-broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
-                           str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3),
-                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
-                           str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
+broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
                            str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
                            str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)]
 
-metadata_1_versions = [str(LATEST_0_10_0)]
-metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
+metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
                        str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
                        str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
 # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0
@@ -111,102 +105,10 @@ def perform_broker_upgrade(self, to_version):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
-    @ignore
-    @cluster(num_nodes=6)
-    @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
-    def test_upgrade_downgrade_brokers(self, from_version, to_version):
-        """
-        Start a smoke test client then perform rolling upgrades on the broker.
-        """
-
-        if from_version == to_version:
-            return
-
-        self.replication = 3
-        self.num_kafka_nodes = 3
-        self.partitions = 1
-        self.isr = 2
-        self.topics = {
-            'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                       'configs': {"min.insync.replicas": self.isr}},
-            'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                       'configs': {"min.insync.replicas": self.isr} },
-            'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                       'configs': {"min.insync.replicas": self.isr} },
-            'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
-                       'configs': {"min.insync.replicas": self.isr} }
-        }
-
-        # Setup phase
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        # number of nodes needs to be >= 3 for the smoke test
-        self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
-                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
-        self.kafka.start()
-
-        # allow some time for topics to be created
-        wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
-                   timeout_sec=60,
-                   err_msg="Broker did not create all topics in 60 seconds ")
-
-        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-
-        processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once")
-
-        with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
-            self.driver.start()
-
-            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-                processor.start()
-                monitor.wait_until(self.processed_data_msg,
-                                   timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))
-
-            connected_message = "Discovered group coordinator"
-            with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
-                with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
-                    self.perform_broker_upgrade(to_version)
-
-                    log_monitor.wait_until(connected_message,
-                                           timeout_sec=120,
-                                           err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
-
-                    stdout_monitor.wait_until(self.processed_data_msg,
-                                              timeout_sec=60,
-                                              err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))
-
-            # SmokeTestDriver allows up to 6 minutes to consume all
-            # records for the verification step so this timeout is set to
-            # 6 minutes (360 seconds) for consuming of verification records
-            # and a very conservative additional 2 minutes (120 seconds) to process
-            # the records in the verification step
-            driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
-                                      timeout_sec=480,
-                                      err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))
-
-        self.driver.stop()
-        processor.stop()
-        processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
-
     @cluster(num_nodes=6)
-    @matrix(from_version=metadata_1_versions)
-    @matrix(from_version=metadata_2_versions)
-    @matrix(from_version=fk_join_versions)
-    def test_rolling_upgrade_with_2_bounces(self, from_version):
+    @matrix(from_version=metadata_2_versions, metadata_quorum=[quorum.combined_kraft])
+    @matrix(from_version=fk_join_versions, metadata_quorum=[quorum.combined_kraft])
+    def test_rolling_upgrade_with_2_bounces(self, from_version, metadata_quorum):
Comment: Why are we adding metadata_quorum?

Reply: Required to indicate that KRaft should be used for the metadata quorum. Failing to do so without a @matrix annotation results in this error, as the test will default to ZK without it.
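To make the reply concrete: ducktape injects each @matrix argument into the test method as a keyword parameter, and kafkatest resolves the quorum from the test context. A minimal sketch of the pattern, assuming the usual kafkatest conventions (the class and method names below are illustrative, not part of this diff):

    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test
    from kafkatest.services.kafka import KafkaService, quorum


    class QuorumParameterSketch(Test):  # hypothetical example class
        @cluster(num_nodes=4)
        @matrix(metadata_quorum=[quorum.combined_kraft])
        def test_with_kraft(self, metadata_quorum):
            # The injected metadata_quorum arg is what allows zk=None here;
            # without the @matrix annotation the context would resolve to the
            # default quorum (ZooKeeper on this branch) and startup would fail.
            self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None)
            self.kafka.start()

Pinning a single value, metadata_quorum=[quorum.combined_kraft], presumably keeps the test matrix from growing while still exercising KRaft-only clusters.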
         """
         This test verifies that the cluster successfully upgrades despite changes in the metadata and FK
         join protocols.
@@ -245,7 +147,8 @@ def test_rolling_upgrade_with_2_bounces(self, from_version):
         self.stop_and_await()
 
     @cluster(num_nodes=6)
-    def test_version_probing_upgrade(self):
+    @matrix(metadata_quorum=[quorum.combined_kraft])
+    def test_version_probing_upgrade(self, metadata_quorum):
Comment: as above
""" | ||
Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" | ||
""" | ||
|
@@ -272,8 +175,8 @@ def test_version_probing_upgrade(self):
         self.stop_and_await()
 
     @cluster(num_nodes=6)
-    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False])
-    def test_upgrade_downgrade_state_updater(self, from_version, upgrade):
+    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft])
+    def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
Comment: as above
""" | ||
Starts 3 KafkaStreams instances, and enables / disables state restoration | ||
for the instances in a rolling bounce. | ||
|
@@ -312,10 +215,7 @@ def test_upgrade_downgrade_state_updater(self, from_version, upgrade):
         self.stop_and_await()
 
     def set_up_services(self):
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, topics=self.topics)
         self.kafka.start()
 
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
Comment: Can we remove metadata_quorum as a parameter entirely?

Reply: Required to indicate that KRaft should be used for the metadata quorum. Failing to do so without a @matrix annotation results in this error, as the test will default to ZK without it.
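For readers unfamiliar with the kafkatest plumbing, a rough sketch of the defaulting behavior the reply describes. It assumes kafkatest's quorum.for_test() helper and quorum.zk constant; the class and method names are hypothetical:

    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test
    from kafkatest.services.kafka import KafkaService, quorum


    class MissingMatrixSketch(Test):  # hypothetical example class
        @cluster(num_nodes=4)
        def test_without_matrix_annotation(self):
            # No @matrix(metadata_quorum=...) is present, so no metadata_quorum
            # arg is injected into the test context. quorum.for_test() then
            # falls back to the default quorum -- ZooKeeper on this branch.
            resolved = quorum.for_test(self.test_context)
            assert resolved == quorum.zk, "defaults to ZK without the annotation"

            # Creating a KafkaService with zk=None under a ZK quorum is the
            # failure mode the reply refers to (sketch only; do not run):
            # self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None)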