Fix Race Condition in removeReadyTimers() to Prevent Cleared Timers from Firing #143
base: li_trunk
if (keyedTimerData.getTimerData().getDomain() == TimeDomain.EVENT_TIME) {
  if (timerInternalsFactory.timerExists(keyedTimerData)) {
    fireTimer(keyedTimerData);
    timerInternalsFactory.removeProcessingTimer(keyedTimerData);
Hmm, seems we are removing processingTimer in the if branch of event-time domain. Could you please double check the logic?
Let me rename the method. It was an existing public method that did what I needed, and its internal logic handles both EVENT_TIME and PROCESSING_TIME.
void deletePersisted(KeyedTimerData<K> keyedTimerData) {
  final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
  switch (keyedTimerData.getTimerData().getDomain()) {
    case EVENT_TIME:
      eventTimeTimerState.remove(timerKey);
      timestampSortedEventTimeTimerState.remove(keyedTimerData);
      break;
    case PROCESSING_TIME:
      processingTimeTimerState.remove(timerKey);
      break;
    default:
      throw new UnsupportedOperationException(
          String.format(
              "%s currently only supports event time or processing time but get %s",
              SamzaRunner.class, keyedTimerData.getTimerData().getDomain()));
  }
}
 *
 * @return a collection of ready timers to be fired.
 */
public Collection<KeyedTimerData<K>> removeReadyTimersForDoFnOp() {
It's hard for me to tell the difference between this new method and the old removeReadyTimers(). Could you please make the change in place so I can compare? Thanks.
The old removeReadyTimers() is directly above this method, which I think should be sufficient for comparison. For context, I created a separate method because removeReadyTimers() is also called by GroupByKeyOp and SplittableParDoProcessKeyedElementsOp, and I didn't want to make changes to those classes.
Please run ./gradlew :runners:samza:validatesRunner so we can run through all the Beam timer test cases. Thanks.
  state.deletePersisted(keyedTimerData);
}
if (eventTimeBuffer.isEmpty()) {
  state.reloadEventTimeTimers();
Hmm, seems we will keep reloading the same timers again and again here if we don't delete them from the state.
Nice catch, added a fix!
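The reload concern raised in this thread can be illustrated with a small, self-contained sketch. It is a simplified model, not the runner's code: ReloadSketch, drain, BUFFER_SIZE, and maxReloads are hypothetical names, and a TreeSet of timestamps stands in for the persisted, timestamp-sorted event-time timer state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of buffer reloading; not the actual Samza runner classes. */
public class ReloadSketch {
  // Assumed batch size for the in-memory buffer.
  static final int BUFFER_SIZE = 2;

  /**
   * Repeatedly reloads a buffer from persisted state and fires everything in it.
   * maxReloads bounds the loop so the no-delete case terminates.
   */
  public static List<Long> drain(TreeSet<Long> persisted, boolean deleteAfterFire, int maxReloads) {
    List<Long> fired = new ArrayList<>();
    for (int reload = 0; reload < maxReloads && !persisted.isEmpty(); reload++) {
      // Reload: copy the earliest BUFFER_SIZE timers from state into the buffer.
      List<Long> buffer = new ArrayList<>();
      for (Long timestamp : persisted) {
        if (buffer.size() == BUFFER_SIZE) {
          break;
        }
        buffer.add(timestamp);
      }
      // Fire each buffered timer; optionally delete it from persisted state.
      for (Long timestamp : buffer) {
        fired.add(timestamp);
        if (deleteAfterFire) {
          persisted.remove(timestamp);
        }
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    // With deletion, each timer fires once.
    System.out.println(drain(new TreeSet<>(List.of(10L, 20L, 30L)), true, 3));
    // Without deletion, every reload picks up the same earliest timers again.
    System.out.println(drain(new TreeSet<>(List.of(10L, 20L, 30L)), false, 3));
  }
}
```

In this model the first call fires 10, 20, 30 once each, while the second fires 10 and 20 on every reload and never reaches 30, which is the behaviour the review comment warns about.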
4261374 to 26b2269
…eReadyTimers() batch.
26b2269 to c292981
Issue Summary
A race condition in removeReadyTimers() allows timers that were cleared in the same batch to still fire. This occurs because the method loads timers into eventTimeBuffer in batches, and if an earlier timer clears a later timer, the later timer is still added to readyTimers. However, since state.get(timerKey, timeDomain) returns null (or 0) after deletion, it fails to be properly removed, leading to it firing incorrectly when it should have been cleared.
Root Cause
- removeReadyTimers() removes timers from eventTimeBuffer and adds them to readyTimers.
- [...] state.deletePersisted(keyedTimerData).
- [...] state.get(timerKey, timeDomain) return null.
- [...] readyTimers, leading to it firing when it should have been cleared.
Proposed Solution
We propose addressing this race condition by changing how event-time timers are handled. Currently, when timers are removed from eventTimeBuffer, they are immediately deleted from persistent state. This can lead to a situation where an earlier timer clears a later timer, but the later timer still fires because its deletion is not detected (i.e., state.get returns null).
To resolve this, we:
- [...] removeReadyTimers().
- [...] timerExists(), that verifies whether a timer still exists in persistent state before it is fired.
- [...] DoFnOp.java to check if an event-time timer remains in state before firing and then delete it only after successful processing.
- [...] eventTimeBuffer).
Additional Benefits
Beyond fixing the race condition, this change improves robustness in failure scenarios. Previously, if a batch of timers encountered an issue during processing, all timers in the batch would have been purged. Now, each event-time timer is only removed from persistent state after successful processing, ensuring that failed timers are not lost and can be retried.
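The race described under Root Cause and the check-before-fire approach from the Proposed Solution can be sketched in a few lines. This is a minimal model under stated assumptions, not the code in this PR: TimerRaceSketch, TimerState, fireEagerDelete, fireWithExistenceCheck, and the clears map are hypothetical, timers are plain string ids, and only the names deletePersisted and timerExists are borrowed from the discussion above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the race; not the actual Samza runner classes. */
public class TimerRaceSketch {

  /** Stand-in for persistent timer state, keyed by timer id. */
  public static final class TimerState {
    private final Set<String> persisted = new HashSet<>();

    public TimerState(String... ids) {
      persisted.addAll(List.of(ids));
    }

    public void deletePersisted(String id) {
      persisted.remove(id);
    }

    public boolean timerExists(String id) {
      return persisted.contains(id);
    }
  }

  /** Old behaviour (as described): delete the whole batch from state, then fire it. */
  public static List<String> fireEagerDelete(
      TimerState state, List<String> readyTimers, Map<String, String> clears) {
    for (String id : readyTimers) {
      state.deletePersisted(id);
    }
    List<String> fired = new ArrayList<>();
    for (String id : readyTimers) {
      fired.add(id);
      String cleared = clears.get(id);
      if (cleared != null) {
        // The callback clears a later timer, but that timer is already gone
        // from state, so nothing records that it should be skipped.
        state.deletePersisted(cleared);
      }
    }
    return fired;
  }

  /** Proposed behaviour: check state before firing, delete only after firing. */
  public static List<String> fireWithExistenceCheck(
      TimerState state, List<String> readyTimers, Map<String, String> clears) {
    List<String> fired = new ArrayList<>();
    for (String id : readyTimers) {
      if (!state.timerExists(id)) {
        continue; // cleared by an earlier timer in the same batch
      }
      fired.add(id);
      String cleared = clears.get(id);
      if (cleared != null) {
        state.deletePersisted(cleared);
      }
      state.deletePersisted(id);
    }
    return fired;
  }

  public static void main(String[] args) {
    List<String> batch = List.of("t1", "t2");
    Map<String, String> clears = Map.of("t1", "t2"); // firing t1 clears t2
    System.out.println(fireEagerDelete(new TimerState("t1", "t2"), batch, clears)); // [t1, t2]
    System.out.println(fireWithExistenceCheck(new TimerState("t1", "t2"), batch, clears)); // [t1]
  }
}
```

Deleting each timer only after it fires also means that, in this model, a timer whose processing throws stays in state, which matches the retry benefit described above.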