diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py index 81cad7a4d1b8d..da00e0895f21e 100644 --- a/tests/kafkatest/tests/streams/base_streams_test.py +++ b/tests/kafkatest/tests/streams/base_streams_test.py @@ -82,8 +82,8 @@ def assert_produce_consume(self, self.assert_consume(client_id, test_state, streams_sink_topic, num_messages, timeout_sec) - def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60): - producer = self.get_producer(topic, num_messages) + def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60, repeating_keys=None): + producer = self.get_producer(topic, num_messages, repeating_keys=repeating_keys) producer.start() wait_until(lambda: producer.num_acked >= num_messages, diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py index 3b9d0b43bf7fc..94df6e3747338 100644 --- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py @@ -129,10 +129,12 @@ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_pr with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_after_broker_down_initially", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120, @@ -189,10 +191,12 @@ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_normal_broker_start", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120, @@ -273,10 +277,12 @@ def test_streams_should_failover_while_brokers_down(self, metadata_quorum, group with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_after_normal_broker_start", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120, @@ -320,10 +326,12 @@ def test_streams_should_failover_while_brokers_down(self, metadata_quorum, group with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions self.assert_produce(self.inputTopic, "sending_message_after_hard_bouncing_streams_instance_bouncing_broker", num_messages=self.num_messages, - timeout_sec=120) + timeout_sec=120, + repeating_keys=self.num_messages) monitor_1.wait_until(self.message, timeout_sec=120,