Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit c67b696

Browse files
authored
Merge pull request #46 from JoshVanL/task-execution-id
fix: assign new event IDs for activity retries and add TaskExecutionId
2 parents 2cab991 + 82deaf6 commit c67b696

File tree

6 files changed

+209
-127
lines changed

6 files changed

+209
-127
lines changed

durabletask/internal/helpers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,14 +204,19 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
204204

205205

206206
def new_schedule_task_action(
207-
id: int, name: str, encoded_input: Optional[str], router: Optional[pb.TaskRouter] = None
207+
id: int,
208+
name: str,
209+
encoded_input: Optional[str],
210+
router: Optional[pb.TaskRouter] = None,
211+
task_execution_id: str = "",
208212
) -> pb.OrchestratorAction:
209213
return pb.OrchestratorAction(
210214
id=id,
211215
scheduleTask=pb.ScheduleTaskAction(
212216
name=name,
213217
input=get_string_value(encoded_input),
214218
router=router,
219+
taskExecutionId=task_execution_id,
215220
),
216221
router=router,
217222
)

durabletask/task.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,16 +396,24 @@ class RetryableTask(CompletableTask[T]):
396396
def __init__(
397397
self,
398398
retry_policy: RetryPolicy,
399-
action: pb.OrchestratorAction,
400399
start_time: datetime,
401400
is_sub_orch: bool,
401+
task_name: str,
402+
encoded_input: Optional[str] = None,
403+
task_execution_id: str = "",
404+
instance_id: Optional[str] = None,
405+
app_id: Optional[str] = None,
402406
) -> None:
403407
super().__init__()
404-
self._action = action
405408
self._retry_policy = retry_policy
406409
self._attempt_count = 1
407410
self._start_time = start_time
408411
self._is_sub_orch = is_sub_orch
412+
self._task_name = task_name
413+
self._encoded_input = encoded_input
414+
self._task_execution_id = task_execution_id
415+
self._instance_id = instance_id
416+
self._app_id = app_id
409417

410418
def increment_attempt_count(self) -> None:
411419
self._attempt_count += 1
@@ -479,9 +487,10 @@ def when_any(tasks: list[Task]) -> WhenAnyTask:
479487

480488

481489
class ActivityContext:
482-
def __init__(self, orchestration_id: str, task_id: int):
490+
def __init__(self, orchestration_id: str, task_id: int, task_execution_id: str = ""):
483491
self._orchestration_id = orchestration_id
484492
self._task_id = task_id
493+
self._task_execution_id = task_execution_id
485494

486495
@property
487496
def orchestration_id(self) -> str:
@@ -510,6 +519,21 @@ def task_id(self) -> int:
510519
"""
511520
return self._task_id
512521

522+
@property
523+
def task_execution_id(self) -> str:
524+
"""Get the task execution ID associated with this activity invocation.
525+
526+
The task execution ID is a UUID that is stable across retry attempts
527+
of the same activity call. It can be used for idempotency and
528+
deduplication when an activity may be retried.
529+
530+
Returns
531+
-------
532+
str
533+
The task execution ID for this activity invocation.
534+
"""
535+
return self._task_execution_id
536+
513537

514538
# Orchestrators are generators that yield tasks and receive/return any type
515539
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]]

durabletask/worker.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,9 @@ def _execute_activity(
888888
with span_context:
889889
try:
890890
executor = _ActivityExecutor(self._registry, self._logger)
891-
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
891+
result = executor.execute(
892+
instance_id, req.name, req.taskId, req.input.value, req.taskExecutionId
893+
)
892894
res = pb.ActivityResponse(
893895
instanceId=instance_id,
894896
taskId=req.taskId,
@@ -1125,9 +1127,16 @@ def call_activity(
11251127
app_id: Optional[str] = None,
11261128
) -> task.Task[TOutput]:
11271129
id = self.next_sequence_number()
1130+
task_execution_id = str(self.new_guid())
11281131

11291132
self.call_activity_function_helper(
1130-
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, app_id=app_id
1133+
id,
1134+
activity,
1135+
input=input,
1136+
retry_policy=retry_policy,
1137+
is_sub_orch=False,
1138+
app_id=app_id,
1139+
task_execution_id=task_execution_id,
11311140
)
11321141
return self._pending_tasks.get(id, task.CompletableTask())
11331142

@@ -1167,6 +1176,7 @@ def call_activity_function_helper(
11671176
instance_id: Optional[str] = None,
11681177
fn_task: Optional[task.CompletableTask[TOutput]] = None,
11691178
app_id: Optional[str] = None,
1179+
task_execution_id: str = "",
11701180
):
11711181
if id is None:
11721182
id = self.next_sequence_number()
@@ -1180,16 +1190,17 @@ def call_activity_function_helper(
11801190
if fn_task is None:
11811191
encoded_input = shared.to_json(input) if input is not None else None
11821192
else:
1183-
# Here, we don't need to convert the input to JSON because it is already converted.
1184-
# We just need to take string representation of it.
1185-
encoded_input = str(input)
1193+
# When retrying, input is already encoded as a string (or None).
1194+
encoded_input = str(input) if input is not None else None
11861195
if not is_sub_orch:
11871196
name = (
11881197
activity_function
11891198
if isinstance(activity_function, str)
11901199
else task.get_name(activity_function)
11911200
)
1192-
action = ph.new_schedule_task_action(id, name, encoded_input, router)
1201+
action = ph.new_schedule_task_action(
1202+
id, name, encoded_input, router, task_execution_id=task_execution_id
1203+
)
11931204
else:
11941205
if instance_id is None:
11951206
# Create a deteministic instance ID based on the parent instance ID
@@ -1207,9 +1218,13 @@ def call_activity_function_helper(
12071218
else:
12081219
fn_task = task.RetryableTask[TOutput](
12091220
retry_policy=retry_policy,
1210-
action=action,
12111221
start_time=self.current_utc_datetime,
12121222
is_sub_orch=is_sub_orch,
1223+
task_name=name if not is_sub_orch else activity_function,
1224+
encoded_input=encoded_input,
1225+
task_execution_id=task_execution_id,
1226+
instance_id=instance_id,
1227+
app_id=app_id,
12131228
)
12141229
self._pending_tasks[id] = fn_task
12151230

@@ -1429,28 +1444,18 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
14291444
return
14301445
timer_task.complete(None)
14311446
if timer_task._retryable_parent is not None:
1432-
activity_action = timer_task._retryable_parent._action
1433-
1434-
if not timer_task._retryable_parent._is_sub_orch:
1435-
cur_task = activity_action.scheduleTask
1436-
instance_id = None
1437-
else:
1438-
cur_task = activity_action.createSubOrchestration
1439-
instance_id = cur_task.instanceId
1440-
if cur_task.router and cur_task.router.targetAppID:
1441-
target_app_id = cur_task.router.targetAppID
1442-
else:
1443-
target_app_id = None
1447+
retryable = timer_task._retryable_parent
14441448

14451449
ctx.call_activity_function_helper(
1446-
id=activity_action.id,
1447-
activity_function=cur_task.name,
1448-
input=cur_task.input.value,
1449-
retry_policy=timer_task._retryable_parent._retry_policy,
1450-
is_sub_orch=timer_task._retryable_parent._is_sub_orch,
1451-
instance_id=instance_id,
1452-
fn_task=timer_task._retryable_parent,
1453-
app_id=target_app_id,
1450+
id=None, # Get a new sequence number
1451+
activity_function=retryable._task_name,
1452+
input=retryable._encoded_input,
1453+
retry_policy=retryable._retry_policy,
1454+
is_sub_orch=retryable._is_sub_orch,
1455+
instance_id=retryable._instance_id,
1456+
fn_task=retryable,
1457+
app_id=retryable._app_id,
1458+
task_execution_id=retryable._task_execution_id,
14541459
)
14551460
else:
14561461
ctx.resume()
@@ -1682,6 +1687,7 @@ def execute(
16821687
name: str,
16831688
task_id: int,
16841689
encoded_input: Optional[str],
1690+
task_execution_id: str = "",
16851691
) -> Optional[str]:
16861692
"""Executes an activity function and returns the serialized result, if any."""
16871693
self._logger.debug(f"{orchestration_id}/{task_id}: Executing activity '{name}'...")
@@ -1692,7 +1698,7 @@ def execute(
16921698
)
16931699

16941700
activity_input = shared.from_json(encoded_input) if encoded_input else None
1695-
ctx = task.ActivityContext(orchestration_id, task_id)
1701+
ctx = task.ActivityContext(orchestration_id, task_id, task_execution_id)
16961702

16971703
# Execute the activity function
16981704
activity_output = fn(ctx, activity_input)

tests/durabletask/test_orchestration_e2e.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,9 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _):
500500
assert state.runtime_status == client.OrchestrationStatus.FAILED
501501
assert state.failure_details is not None
502502
assert state.failure_details.error_type == "TaskFailedError"
503-
assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:")
504-
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
503+
assert "Sub-orchestration task #" in state.failure_details.message
504+
assert "failed:" in state.failure_details.message
505+
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
505506
assert state.failure_details.stack_trace is not None
506507
assert throw_activity_counter == 9
507508
assert child_orch_counter == 3
@@ -568,7 +569,7 @@ def throw_activity(ctx: task.ActivityContext, _):
568569
assert state.runtime_status == client.OrchestrationStatus.FAILED
569570
assert state.failure_details is not None
570571
assert state.failure_details.error_type == "TaskFailedError"
571-
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
572+
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
572573
assert state.failure_details.stack_trace is not None
573574
assert throw_activity_counter == 4
574575

tests/durabletask/test_orchestration_e2e_async.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,8 +414,9 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _):
414414
assert state.runtime_status == OrchestrationStatus.FAILED
415415
assert state.failure_details is not None
416416
assert state.failure_details.error_type == "TaskFailedError"
417-
assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:")
418-
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
417+
assert "Sub-orchestration task #" in state.failure_details.message
418+
assert "failed:" in state.failure_details.message
419+
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
419420
assert state.failure_details.stack_trace is not None
420421
assert throw_activity_counter == 9
421422
assert child_orch_counter == 3
@@ -455,7 +456,7 @@ def throw_activity(ctx: task.ActivityContext, _):
455456
assert state.runtime_status == OrchestrationStatus.FAILED
456457
assert state.failure_details is not None
457458
assert state.failure_details.error_type == "TaskFailedError"
458-
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
459+
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
459460
assert state.failure_details.stack_trace is not None
460461
assert throw_activity_counter == 4
461462

0 commit comments

Comments
 (0)