diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 5adb81a7e..3e1e4c80b 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -415,8 +415,9 @@ def _set_result_acknowledge(execution: WorkflowExecution) -> None: f"ExecutionID [{execution.id}] - Task {execution.task_id} acknowledged" ) - @staticmethod + @classmethod def execute_workflow_async( + cls, workflow_id: str, execution_id: str, hash_values_of_files: dict[str, FileHash], @@ -445,7 +446,7 @@ def execute_workflow_async( } org_schema = UserContext.get_organization_identifier() log_events_id = StateStore.get(Common.LOG_EVENTS_ID) - async_execution = WorkflowHelper.execute_bin.apply_async( + async_execution: AsyncResult = cls.execute_bin.apply_async( args=[ org_schema, # schema_name workflow_id, # workflow_id @@ -480,6 +481,10 @@ def execute_workflow_async( workflow_execution.status, result=task_result, ) + # If task is complete, handle acknowledgment and forgetting the + if async_execution.ready(): + async_execution.forget() # Remove the result from the result backend. + cls._set_result_acknowledge(workflow_execution) return execution_response except celery_exceptions.TimeoutError: return ExecutionResponse(