
Commit 75768dd

MINOR: Prevent re-join flakiness in test_fencing_static_consumer by ensuring conflicting static consumers terminate (#20772)
Related discussion: #20594 (review)

### Problem

The test `OffsetValidationTest.test_fencing_static_consumer` failed when executed with `fencing_stage=stable` and `group_protocol=consumer`. It timed out while waiting for the group to become empty because the conflicting static consumers re-joined after the original members stopped, keeping the group non-empty and causing the timeout.

### Fix

For the consumer-protocol path, the test now waits for all conflicting consumer processes to terminate before stopping the original static members. This ensures that each conflicting consumer is fully fenced and cannot re-join the group after the original members stop.

Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 53e1172 commit 75768dd
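For context, here is a condensed sketch of the waiting step the fix introduces, using ducktape's wait_until together with the shutdown_complete_nodes() accessor added in the diff below (the timeout value and error message here are illustrative):

from ducktape.utils.util import wait_until

def await_conflict_consumers_fenced(conflict_consumer):
    # Block until every conflicting consumer has reported an explicit shutdown_complete
    # event and is marked dead, so a fenced member cannot re-join the group once the
    # original static members are stopped.
    wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
                       len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
               timeout_sec=60,
               err_msg="Timed out waiting for conflict consumers to shut down after fencing")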

File tree: 2 files changed (+26 −3 lines)

tests/kafkatest/services/verifiable_consumer.py

Lines changed: 12 additions & 2 deletions
@@ -42,7 +42,8 @@ class ConsumerEventHandler(object):
 
     def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
                  revoked_count=0, assigned_count=0, assignment=None,
-                 position=None, committed=None, total_consumed=0):
+                 position=None, committed=None, total_consumed=0,
+                 shutdown_complete=False):
         self.node = node
         self.verify_offsets = verify_offsets
         self.idx = idx
@@ -53,11 +54,13 @@ def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
         self.position = position if position is not None else {}
         self.committed = committed if committed is not None else {}
         self.total_consumed = total_consumed
+        self.shutdown_complete = shutdown_complete
 
     def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
         self.assignment = []
         self.position = {}
+        self.shutdown_complete = True
 
         if node is not None and logger is not None:
             logger.debug("Shut down %s" % node.account.hostname)
@@ -277,7 +280,8 @@ def create_handler_helper(handler_class, node, idx, existing_handler=None):
                                     assignment=existing_handler.assignment,
                                     position=existing_handler.position,
                                     committed=existing_handler.committed,
-                                    total_consumed=existing_handler.total_consumed)
+                                    total_consumed=existing_handler.total_consumed,
+                                    shutdown_complete=existing_handler.shutdown_complete)
             else:
                 return handler_class(node, self.verify_offsets, idx)
         existing_handler = self.event_handlers[node] if node in self.event_handlers else None
@@ -292,6 +296,7 @@ def _worker(self, idx, node):
         with self.lock:
             self.event_handlers[node] = self.create_event_handler(idx, node)
             handler = self.event_handlers[node]
+            handler.shutdown_complete = False
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
 
@@ -526,5 +531,10 @@ def alive_nodes(self):
             return [handler.node for handler in self.event_handlers.values()
                     if handler.state != ConsumerState.Dead]
 
+    def shutdown_complete_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.shutdown_complete]
+
     def is_consumer_group_protocol_enabled(self):
         return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
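
To illustrate the lifecycle of the new flag, here is a simplified, self-contained sketch (the _HandlerSketch class and node names are stand-ins for illustration, not the real ConsumerEventHandler):

class _HandlerSketch:
    def __init__(self, node, shutdown_complete=False):
        self.node = node
        self.shutdown_complete = shutdown_complete  # carried over when a handler is re-created

    def handle_shutdown_complete(self):
        self.shutdown_complete = True               # recorded only on an explicit shutdown event

handlers = {"worker1": _HandlerSketch("worker1"), "worker2": _HandlerSketch("worker2")}
handlers["worker1"].shutdown_complete = False       # _worker clears the flag when a node (re)starts
handlers["worker1"].handle_shutdown_complete()      # a fenced consumer reports shutdown
fenced = [h.node for h in handlers.values() if h.shutdown_complete]
assert fenced == ["worker1"]                        # what shutdown_complete_nodes() would return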

tests/kafkatest/tests/client/consumer_test.py

Lines changed: 14 additions & 1 deletion
@@ -18,6 +18,7 @@
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 
 import signal
@@ -74,6 +75,14 @@ def setup_consumer(self, topic, **kwargs):
         self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
         return consumer
 
+    def await_conflict_consumers_fenced(self, conflict_consumer):
+        # Rely on explicit shutdown_complete events from the verifiable consumer to guarantee each conflict member
+        # reached the fenced path rather than remaining in the default DEAD state prior to startup.
+        wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
+                           len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")
+
     @cluster(num_nodes=7)
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
@@ -326,7 +335,11 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
                 assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
                 assert len(consumer.joined_nodes()) == len(consumer.nodes)
                 assert len(conflict_consumer.joined_nodes()) == 0
-
+
+                # Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
+                # Wait for termination to complete to prevent conflict consumers from immediately re-joining the group while existing nodes are shutting down.
+                self.await_conflict_consumers_fenced(conflict_consumer)
+
                 # Stop existing nodes, so conflicting ones should be able to join.
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
0 commit comments
