Commit afd4083

alessandrobologna authored and yaythomas committed
Fix sync checkpoint race by ordering operation fetch
Move completion event signaling to after the execution state is updated from the checkpoint response. This prevents a waiting user thread from running the second status check before new operations are added, which could lead to a duplicate START and stalled checkpoint thread. This preserves the existing checkpoint API semantics while closing the race window between completion signaling and state refresh.
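
The ordering described above can be illustrated with a minimal, self-contained sketch. It is not the SDK's code: `ToyCheckpointer`, `apply_response`, `process_batch`, `status`, and the `"op-1"` identifier are hypothetical stand-ins, and `apply_response` only approximates what `fetch_paginated_operations` is assumed to do (merge the operations from the checkpoint response into local state). The point is that the waiter's event is set only after the state refresh, so the waiter's follow-up status check cannot observe stale state.

```python
import threading


class ToyCheckpointer:
    """Hypothetical stand-in for the SDK's execution state (not the real class)."""

    def __init__(self):
        self.operations = {}  # local view of the execution state
        self._lock = threading.Lock()

    def apply_response(self, new_operations):
        # Assumed equivalent of fetch_paginated_operations(): merge the
        # operations returned by the checkpoint call into local state.
        with self._lock:
            self.operations.update(new_operations)

    def process_batch(self, batch_events, new_operations):
        # 1. Refresh local state from the checkpoint response first.
        self.apply_response(new_operations)
        # 2. Only then unblock the synchronous waiters.
        for event in batch_events:
            if event is not None:
                event.set()

    def status(self, op_id):
        with self._lock:
            return self.operations.get(op_id)


def run_demo():
    state = ToyCheckpointer()
    done = threading.Event()
    seen = []

    def waiter():
        # The user thread blocks until its checkpoint is acknowledged,
        # then performs its second status check.
        done.wait(timeout=5)
        seen.append(state.status("op-1"))

    thread = threading.Thread(target=waiter)
    thread.start()
    # One synchronous operation (with an event) and one async operation (None).
    state.process_batch([done, None], {"op-1": "STARTED"})
    thread.join(timeout=5)
    return seen


if __name__ == "__main__":
    print(run_demo())
```

If the two steps in `process_batch` were swapped, the waiter could wake between them and read `None` for `"op-1"`, which corresponds to the window this commit closes.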
1 parent c2e9bdd commit afd4083

1 file changed

Lines changed: 6 additions & 6 deletions

src/aws_durable_execution_sdk_python/state.py

@@ -613,20 +613,20 @@ def checkpoint_batches_forever(self) -> None:
 
                 logger.debug("Checkpoint batch processed successfully")
 
-                # Signal completion for any synchronous operations
-                for queued_op in batch:
-                    if queued_op.completion_event is not None:
-                        queued_op.completion_event.set()
-
                 # Update local token for next iteration
                 current_checkpoint_token = output.checkpoint_token
 
-                # Fetch new operations from the API
+                # Fetch new operations from the API before unblocking sync waiters
                 self.fetch_paginated_operations(
                     output.new_execution_state.operations,
                     output.checkpoint_token,
                     output.new_execution_state.next_marker,
                 )
+
+                # Signal completion for any synchronous operations
+                for queued_op in batch:
+                    if queued_op.completion_event is not None:
+                        queued_op.completion_event.set()
             except Exception as e:
                 # Checkpoint failed - wake all blocked threads so they can raise error
                 # Drain both queues and signal all completion events
