diff --git a/tests/rptest/transactions/tx_atomic_produce_consume_test.py b/tests/rptest/transactions/tx_atomic_produce_consume_test.py index 45280b67eec4..ced1dcbf997e 100644 --- a/tests/rptest/transactions/tx_atomic_produce_consume_test.py +++ b/tests/rptest/transactions/tx_atomic_produce_consume_test.py @@ -35,6 +35,7 @@ class ExactlyOnceVerifier(): def __init__(self, redpanda, src_topic: str, + partition_count: int, dst_topic: str, transform_func, logger, @@ -44,6 +45,7 @@ def __init__(self, commit_every: int = 50, timeout_sec: int = 60): self._src_topic = src_topic + self._partition_count = partition_count self._dst_topic = dst_topic self._transform_func = transform_func self._logger = logger @@ -185,7 +187,7 @@ def reached_end(): with self._lock: self._finished_partitions |= end_for consumers = self._consumer_cnt - if len(self._finished_partitions) == len(high_watermarks): + if len(self._finished_partitions) == self._partition_count: return True return len(end_for) == len(assignments) and consumers > 1 @@ -359,6 +361,7 @@ def simple_transform(k, v): transformer = ExactlyOnceVerifier(self.redpanda, topic.name, + topic.partition_count, dst_topic.name, simple_transform, self.logger,