Skip to content

Commit 104b43f

Browse files
committed
Handle assignment signal failures
1 parent bbe3383 commit 104b43f

File tree

2 files changed

+25
-9
lines changed

2 files changed

+25
-9
lines changed

resource_pool/resource_pool_workflow.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Optional
33

44
from temporalio import workflow
5+
from temporalio.exceptions import ApplicationError
56

67
from resource_pool.shared import AcquireRequest, AcquireResponse
78

@@ -79,20 +80,28 @@ def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
7980
async def assign_resource(
8081
self, resource: str, internal_request: InternalAcquireRequest
8182
) -> None:
82-
self.resources[resource] = internal_request
8383
workflow.logger.info(
8484
f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
8585
)
86-
internal_request.release_signal = str(workflow.uuid4())
87-
self.release_key_to_resource[internal_request.release_signal] = resource
8886

8987
requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
90-
await requester.signal(
91-
f"assign_resource_{workflow.info().workflow_id}",
92-
AcquireResponse(
93-
release_key=internal_request.release_signal, resource=resource
94-
),
95-
)
88+
try:
89+
release_signal = str(workflow.uuid4())
90+
await requester.signal(
91+
f"assign_resource_{workflow.info().workflow_id}",
92+
AcquireResponse(release_key=release_signal, resource=resource),
93+
)
94+
95+
internal_request.release_signal = release_signal
96+
self.resources[resource] = internal_request
97+
self.release_key_to_resource[release_signal] = resource
98+
except ApplicationError as e:
99+
if e.type == "ExternalWorkflowExecutionNotFound":
100+
workflow.logger.info(
101+
f"Could not assign resource {resource} to {internal_request.workflow_id}: {e.message}"
102+
)
103+
else:
104+
raise e
96105

97106
async def assign_next_resource(self) -> bool:
98107
if len(self.waiters) == 0:

tests/resource_pool/workflow_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ async def use_resource_mock(input: UseResourceActivityInput) -> None:
6969
), f"{workflow_id} ended on {resource} held by {holder}"
7070
holder = None
7171

72+
# Are all the resources free, per the query?
73+
pool_handle = client.get_workflow_handle_for(
74+
ResourcePoolWorkflow.run, RESOURCE_POOL_WORKFLOW_ID
75+
)
76+
query_result = await pool_handle.query(ResourcePoolWorkflow.get_current_holders)
77+
assert query_result == {"r_a": None, "r_b": None, "r_c": None}
78+
7279

7380
async def run_all_workflows(client: Client):
7481
resource_pool_handle = await client.start_workflow(

0 commit comments

Comments
 (0)