Fix Race Condition in removeReadyTimers() to Prevent Cleared Timers from Firing #143

Open: wants to merge 1 commit into base: li_trunk

@FuRyanf FuRyanf commented Feb 18, 2025

Issue Summary

A race condition in removeReadyTimers() allows a timer that was cleared within the same batch to fire anyway. The method loads timers into eventTimeBuffer in batches and moves the ready ones into readyTimers. If an earlier timer in the batch clears a later one, the later timer has already been added to readyTimers and deleted from persistent state, so state.get(timerKey, timeDomain) returns null (or 0) and the clear finds nothing to remove. The later timer therefore fires even though it should have been cleared.

Root Cause

  1. removeReadyTimers() removes timers from eventTimeBuffer and adds them to readyTimers.
  2. Timers are deleted from persistent state using state.deletePersisted(keyedTimerData).
  3. If an earlier timer in the same batch clears a later timer, the later timer has already been removed from state, so state.get(timerKey, timeDomain) returns null and the clear has no effect.
  4. The later timer therefore remains in readyTimers and fires even though it should have been cleared.
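
The sequence above can be reproduced with a toy model. This is only a sketch: the class, its fields, and its methods are hypothetical stand-ins (a Set for persistent state, a Deque for eventTimeBuffer) and are not the Samza runner's actual classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the race; every name here is hypothetical, not a Beam class. */
public class ClearedTimerRaceSketch {
  // Stand-in for persistent timer state.
  private final Set<String> persisted = new HashSet<>();
  // Stand-in for the in-memory eventTimeBuffer.
  private final Deque<String> buffer = new ArrayDeque<>();

  void setTimer(String id) {
    persisted.add(id);
    buffer.add(id);
  }

  // Clearing only touches state and the buffer; it cannot reach a batch
  // that has already been handed to the caller.
  void clearTimer(String id) {
    persisted.remove(id);
    buffer.remove(id);
  }

  // Models the eager behaviour: drain the buffer and delete from state at once.
  List<String> removeReadyTimers() {
    List<String> ready = new ArrayList<>();
    while (!buffer.isEmpty()) {
      String id = buffer.poll();
      persisted.remove(id);
      ready.add(id);
    }
    return ready;
  }

  /** Fires one batch in which timer "a" clears timer "b". */
  public static List<String> run() {
    ClearedTimerRaceSketch timers = new ClearedTimerRaceSketch();
    timers.setTimer("a");
    timers.setTimer("b");
    List<String> fired = new ArrayList<>();
    for (String id : timers.removeReadyTimers()) {
      fired.add(id);
      if (id.equals("a")) {
        timers.clearTimer("b"); // no effect: "b" is already in the batch
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a, b]
  }
}
```

Timer "b" fires although "a" cleared it, because the clear happens after "b" has left both the buffer and the state.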

Proposed Solution

We propose addressing this race condition by changing how event-time timers are handled. Currently, timers are deleted from persistent state as soon as they are removed from eventTimeBuffer. As a result, when an earlier timer clears a later one, the clear is not detected (state.get returns null) and the later timer still fires.

To resolve this, we:

  • Delay the deletion of event-time timers in removeReadyTimers().
  • Introduce a new helper method, timerExists(), that verifies whether a timer still exists in persistent state before it is fired.
  • Modify DoFnOp.java to check if an event-time timer remains in state before firing and then delete it only after successful processing.
  • Keep the existing behavior for non-event-time timers (which are still deleted immediately upon removal from eventTimeBuffer).
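
A minimal sketch of the check-then-fire-then-delete order described in the list above. Only the names timerExists and deletePersisted come from this PR; the class, the String timer ids, and the collections are assumptions made for illustration and do not reflect the real signatures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of deferred deletion; not the actual DoFnOp or timer factory code. */
public class DeferredTimerDeletionSketch {
  private final Set<String> persisted = new HashSet<>(); // stand-in for persistent state
  private final Deque<String> buffer = new ArrayDeque<>(); // stand-in for eventTimeBuffer

  void setTimer(String id) {
    persisted.add(id);
    buffer.add(id);
  }

  void clearTimer(String id) {
    persisted.remove(id);
    buffer.remove(id);
  }

  // Drains the buffer but leaves persistent state untouched.
  List<String> removeReadyTimers() {
    List<String> ready = new ArrayList<>(buffer);
    buffer.clear();
    return ready;
  }

  // A timer cleared after batching is no longer in state.
  boolean timerExists(String id) {
    return persisted.contains(id);
  }

  void deletePersisted(String id) {
    persisted.remove(id);
  }

  /** Fires one batch in which timer "a" clears timer "b". */
  public static List<String> run() {
    DeferredTimerDeletionSketch timers = new DeferredTimerDeletionSketch();
    timers.setTimer("a");
    timers.setTimer("b");
    List<String> fired = new ArrayList<>();
    for (String id : timers.removeReadyTimers()) {
      if (!timers.timerExists(id)) {
        continue; // cleared by an earlier timer in the same batch
      }
      fired.add(id);
      if (id.equals("a")) {
        timers.clearTimer("b");
      }
      timers.deletePersisted(id); // delete only after the timer was processed
    }
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a]
  }
}
```

Because state is still authoritative when each timer is about to fire, the clear issued by "a" is visible and "b" is skipped.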

Additional Benefits

Beyond fixing the race condition, this change improves robustness in failure scenarios. Previously, if a batch of timers hit an error during processing, every timer in the batch had already been purged from persistent state. Now each event-time timer is removed from persistent state only after it has been processed successfully, so failed timers are not lost and can be retried.
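
The retry property can be illustrated with a small sketch. The class and handler below are hypothetical, and catching the exception in place is only a device to keep the example runnable; how a real runner surfaces and recovers from the failure is outside this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Toy model: a timer leaves state only after its handler returns normally. */
public class RetryableTimerSketch {
  private final Set<String> persisted = new LinkedHashSet<>(); // stand-in for persistent state

  void setTimer(String id) {
    persisted.add(id);
  }

  void fireAll(Consumer<String> handler) {
    // Iterate over a copy so that removing from state is safe during the loop.
    for (String id : new ArrayList<>(persisted)) {
      try {
        handler.accept(id);
        persisted.remove(id); // reached only when processing succeeded
      } catch (RuntimeException e) {
        // The timer stays in state and can be fired again later.
      }
    }
  }

  /** Returns the timers still persisted after one pass with a failing handler. */
  public static Set<String> run() {
    RetryableTimerSketch timers = new RetryableTimerSketch();
    timers.setTimer("ok");
    timers.setTimer("fails");
    timers.fireAll(id -> {
      if (id.equals("fails")) {
        throw new IllegalStateException("simulated processing failure");
      }
    });
    return timers.persisted;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [fails]
  }
}
```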



if (keyedTimerData.getTimerData().getDomain() == TimeDomain.EVENT_TIME) {
  if (timerInternalsFactory.timerExists(keyedTimerData)) {
    fireTimer(keyedTimerData);
    timerInternalsFactory.removeProcessingTimer(keyedTimerData);

Hmm, seems we are removing processingTimer in the if branch of event-time domain. Could you please double check the logic?

Author

@FuRyanf FuRyanf Feb 19, 2025

Let me rename the method. It was an existing public method that did what I needed, and its internal logic handles both EVENT_TIME and PROCESSING_TIME:

    void deletePersisted(KeyedTimerData<K> keyedTimerData) {
      final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
      switch (keyedTimerData.getTimerData().getDomain()) {
        case EVENT_TIME:
          eventTimeTimerState.remove(timerKey);
          timestampSortedEventTimeTimerState.remove(keyedTimerData);
          break;

        case PROCESSING_TIME:
          processingTimeTimerState.remove(timerKey);
          break;

        default:
          throw new UnsupportedOperationException(
              String.format(
                  "%s currently only supports event time or processing time but get %s",
                  SamzaRunner.class, keyedTimerData.getTimerData().getDomain()));
      }
    }

*
* @return a collection of ready timers to be fired.
*/
public Collection<KeyedTimerData<K>> removeReadyTimersForDoFnOp() {

It's hard for me to tell the difference between this new method and the old removeReadyTimers(). Could you please make the change in place so I can compare? Thanks.

Author

The old removeReadyTimers() is directly above this method, which I think should be sufficient for comparison. For context, I created a separate method because removeReadyTimers() is also called by GroupByKeyOp and SplittableParDoProcessKeyedElementsOp, and I didn't want to change those classes.

@xinyuiscool

Please run ./gradlew :runners:samza:validatesRunner so that all Beam timer test cases are exercised. Thanks.

  state.deletePersisted(keyedTimerData);
}
if (eventTimeBuffer.isEmpty()) {
  state.reloadEventTimeTimers();

Hmm, seems we will keep reloading the same timers again and again here if we don't delete them from the state.

Author

Nice catch, added a fix!
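
The reload concern in this thread can be illustrated with a toy model. Everything below is hypothetical: the class, the Long timestamps, and in particular the in-flight guard, which is one possible way to avoid the loop and is not necessarily the fix that was pushed to this branch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of reloading timers that were handed out but not yet deleted. */
public class ReloadLoopSketch {
  private final TreeSet<Long> persisted = new TreeSet<>(); // stand-in for sorted timer state
  private final Deque<Long> buffer = new ArrayDeque<>(); // stand-in for eventTimeBuffer
  private final Set<Long> inFlight = new HashSet<>(); // handed out, not yet deleted

  void setTimer(long timestamp) {
    persisted.add(timestamp);
  }

  // Copies timers from state into the buffer, optionally skipping in-flight ones.
  void reload(boolean skipInFlight) {
    for (Long timestamp : persisted) {
      if (skipInFlight && inFlight.contains(timestamp)) {
        continue;
      }
      buffer.add(timestamp);
    }
  }

  // Drains the buffer without deleting from state, then reloads if it is empty.
  List<Long> removeReadyTimers(boolean skipInFlight) {
    List<Long> ready = new ArrayList<>(buffer);
    buffer.clear();
    inFlight.addAll(ready);
    if (buffer.isEmpty()) {
      reload(skipInFlight);
    }
    return ready;
  }

  /** Returns how many timers sit in the buffer again right after one drain. */
  public static int pendingAfterDrain(boolean skipInFlight) {
    ReloadLoopSketch timers = new ReloadLoopSketch();
    timers.setTimer(1L);
    timers.setTimer(2L);
    timers.reload(skipInFlight); // initial load, nothing is in flight yet
    timers.removeReadyTimers(skipInFlight);
    return timers.buffer.size();
  }

  public static void main(String[] args) {
    System.out.println(pendingAfterDrain(false)); // prints 2
    System.out.println(pendingAfterDrain(true)); // prints 0
  }
}
```

Without the guard, both timers are back in the buffer immediately after being handed out, which is the repeated reload described in the review comment.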

@FuRyanf FuRyanf force-pushed the FuRyanf/cleared-timers-firing-bug branch from 4261374 to 26b2269 Compare February 19, 2025 18:07
@FuRyanf FuRyanf force-pushed the FuRyanf/cleared-timers-firing-bug branch from 26b2269 to c292981 Compare February 19, 2025 18:20