-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17646: Fix flaky KafkaStreamsTest.testStateGlobalThreadClose #17310
KAFKA-17646: Fix flaky KafkaStreamsTest.testStateGlobalThreadClose #17310
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chenyulin0719 thanks for your patch. overall +1
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
Outdated
Show resolved
Hide resolved
d6571d0
to
06f299f
Compare
@chia7712 Thanks for the review. I updated the PR as suggested. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. One question. Won't we leak the thread if we don't join
it?
@@ -540,21 +540,18 @@ public void testStateGlobalThreadClose() throws Exception { | |||
waitForCondition( | |||
() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, | |||
"Thread never stopped."); | |||
globalStreamThread.join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we remove this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @mjsax, there will be no GlobalStreamThread thread triggered since we mock the GlobalStreamThread.start() to change the state.
kafka/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
Lines 277 to 283 in 1854d4b
doAnswer(invocation -> { | |
globalThreadState.set(GlobalStreamThread.State.RUNNING); | |
threadStateListenerCapture.getValue().onChange(mock, | |
GlobalStreamThread.State.RUNNING, | |
GlobalStreamThread.State.CREATED); | |
return null; | |
}).when(mock).start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. Thanks! Missed this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
loop 200 times, all pass
It's regarding KAFKA-17646.
Two issues found in the flaky test:
streams.close()
andshutdownHelper()
threads: Resolved it by moving thestreams.close()
call after confirming the KafkaStreams state is inERROR
state.PENDING_ERROR
state: Resolved it by changing the assertion to check the log contentState transition from RUNNING to PENDING_ERROR
.Put the verbose explanation under Jira comment.
Committer Checklist (excluded from commit message)