MINOR: Prevent re-join flakiness in test_fencing_static_consumer by ensuring conflicting static consumers terminate #20772
Conversation
Test command:
Before:
After:
```python
        return consumer

    def _node_failed_with_unreleased_instance_id(self, node):
        cmd = "grep -q 'UnreleasedInstanceIdException' %s" % VerifiableConsumer.LOG_FILE
```
Given that the error is subject to change, would it be more reliable to check the process ID (PID) directly?
Thanks for the suggestion!
Given backward-compatibility concerns, checking the PID makes more sense.
I’ve made the corresponding adjustment.
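For illustration, a minimal sketch of what a PID-based liveness check can look like, assuming a POSIX environment. `pid_is_alive` and `node_failed` are hypothetical helpers for this sketch, not the actual ducktape or `VerifiableConsumer` API:

```python
# Hedged sketch: check process liveness via the PID rather than grepping
# logs for a specific exception string (which is subject to change).
import errno
import os

def pid_is_alive(pid):
    """Return True if a process with this PID currently exists (POSIX)."""
    try:
        os.kill(pid, 0)  # signal 0 checks existence without sending a signal
    except OSError as e:
        if e.errno == errno.ESRCH:     # no such process
            return False
        return e.errno == errno.EPERM  # exists, but owned by another user
    return True

def node_failed(consumer_pids):
    """Treat the node as failed once none of its recorded PIDs remain alive."""
    return all(not pid_is_alive(pid) for pid in consumer_pids)
```

The upside of this approach is that it keeps working even if the exception message or log format changes between broker or client versions.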
Reran the test to reflect the recent changes:
Also tested other parameter combinations, and they were not affected.
```python

    def await_conflict_consumers_fenced(self, conflict_consumer):
        # Ensure every conflicting consumer actually starts once before we wait for fencing.
        started_nodes = set()
```
What is the purpose of this `started_nodes`?
Removed, no longer in use.
```python
                started_nodes.add(node)
            return len(conflict_consumer.alive_nodes()) == len(conflict_consumer.nodes)

        wait_until(all_conflict_consumers_started,
```
What if the conflicting consumer dies before this check occurs?
Thanks for the reminder.
`conflict_consumer.start()` runs asynchronously. The detection of a duplicate `groupId` and the subsequent `UnreleasedInstanceIdException` that interrupts the consumer process both occur during this phase.
If the conflict consumer is interrupted before the newly added validation method `await_conflict_consumers_fenced(conflict_consumer)` observes it running, `wait_until(all_conflict_consumers_started)` will time out since it never sees all consumers started, leading to a test failure.
In my environment, adding a 2-second sleep before the validation step reliably reproduces the issue.
I’ve modified the test to focus only on verifying that the conflict consumer node has properly terminated.
This should make the test more stable.
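A minimal sketch of the termination-only approach: rather than asserting that every conflicting consumer was seen running (which races with fatal fencing), the test just polls until none of them is alive. `wait_until` here is a tiny stand-in for ducktape's helper, and `FakeConsumer` is a hypothetical model of a `VerifiableConsumer`, used only to make the sketch self-contained:

```python
# Hedged sketch: wait only for the terminal condition (all conflicting
# consumers dead), which holds whether the check runs before or after
# the consumers are interrupted by UnreleasedInstanceIdException.
import time

def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
    """Minimal stand-in for ducktape's wait_until: poll until true or timeout."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)

class FakeConsumer:
    """Hypothetical stand-in tracking which consumer nodes are still alive."""
    def __init__(self, nodes):
        self.nodes = list(nodes)
        self._alive = set(nodes)

    def alive_nodes(self):
        return [n for n in self.nodes if n in self._alive]

    def kill(self, node):
        self._alive.discard(node)

def await_conflict_consumers_fenced(conflict_consumer, timeout_sec=60):
    # Only the end state matters: every conflicting consumer process has
    # terminated, so none of them can rejoin the group later.
    wait_until(lambda: len(conflict_consumer.alive_nodes()) == 0,
               timeout_sec=timeout_sec,
               err_msg="Timed out waiting for conflict consumers to terminate after fencing")
```

Because the condition is monotonic (dead consumers stay dead), the check cannot be defeated by the timing of the fatal error.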
```python
                   timeout_sec=60,
                   err_msg="Timed out waiting for conflict consumers to terminate after fencing")

        # Guard against stray processes that could rejoin once the original static members stop.
```
Could you remind readers that `UnreleasedInstanceIdException` is a fatal error, resulting in the consumer's immediate failure?
Done.
I’ve included this explanation here:
https://github.com/apache/kafka/pull/20772/files#diff-d3acc8af63cbc0e087de2edb74ce53c6e4191c87f27e9a29359faef033eb0911R339-R341
```python
        self.position = position if position is not None else {}
        self.committed = committed if committed is not None else {}
        self.total_consumed = total_consumed
        self.shutdown_complete = shutdown_complete
```
`ConsumerEventHandler` instantiates every consumer in `ConsumerState.Dead`. During the test, we sometimes read that state before the background thread emits `startup_complete`, so the test concluded "this node started and then shut down" even though it never left the default state.
We now keep an explicit `shutdown_complete` flag so the fencing test only proceeds once each conflicting consumer has actually gone through the start→shutdown sequence.
```mermaid
stateDiagram-v2
    [*] --> Dead : Init (default)
    Dead --> Started : startup_complete
    Started --> Dead : shutdown_complete(flag = true)
```
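The lifecycle above can be sketched as a tiny model. This `ConsumerEventHandler` is an illustrative simplification, not Kafka's actual class; the point is that the `shutdown_complete` flag distinguishes "started and then shut down" from "still in the default Dead state":

```python
# Hedged sketch of the explicit lifecycle flag: only the
# startup_complete -> shutdown_complete sequence sets the flag,
# so a never-started node is not mistaken for a cleanly stopped one.
DEAD, STARTED = "DEAD", "STARTED"

class ConsumerEventHandler:
    def __init__(self):
        self.state = DEAD               # default state before any event arrives
        self.shutdown_complete = False  # True only after start -> shutdown

    def handle_startup_complete(self):
        self.state = STARTED

    def handle_shutdown_complete(self):
        # Only record a completed shutdown if the node actually started.
        if self.state == STARTED:
            self.shutdown_complete = True
        self.state = DEAD
```

With this flag, a test that reads state too early simply sees `shutdown_complete == False` and keeps waiting, instead of concluding the node already ran and exited.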
The following are the updated test results. Also ran this test 20 times using the following command to verify its stability, and all runs passed successfully.

```shell
for i in {1..20}; do
  echo "Run $i"
  TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" \
  _DUCKTAPE_OPTIONS="--parameters '{\"fencing_stage\":\"stable\",\"group_protocol\":\"consumer\",\"num_conflict_consumers\":1,\"metadata_quorum\":\"ISOLATED_KRAFT\"}'" \
  bash tests/docker/run_tests.sh || break
done
```
LGTM
Related discussion:
#20594 (review)
Problem
The test `OffsetValidationTest.test_fencing_static_consumer` failed when executed with `fencing_stage=stable` and `group_protocol=consumer`. It timed out while waiting for the group to become empty because the conflicting static consumers re-joined after the original members stopped, keeping the group non-empty and causing the timeout.
Fix
For the consumer-protocol path, the test now waits for all conflicting consumer processes to terminate before stopping the original static members. This ensures that each conflicting consumer is fully fenced and cannot re-join the group after the original members stop.
Reviewers: Chia-Ping Tsai [email protected]