Skip to content

Commit d2770cd

Browse files
authored
fix: fail activity/orchestrator immediately when gRPC result exceeds max message size (#50)
* fix: use correct type + use proper close method Signed-off-by: Samantha Coyle <sam@diagrid.io> * style: appease linter Signed-off-by: Samantha Coyle <sam@diagrid.io> * fix: proper handle and fail for resource exhausted due to large payloads Signed-off-by: Samantha Coyle <sam@diagrid.io> * style: address feedback Signed-off-by: Samantha Coyle <sam@diagrid.io> * fix: address feedback Signed-off-by: Samantha Coyle <sam@diagrid.io> --------- Signed-off-by: Samantha Coyle <sam@diagrid.io>
1 parent cb97b43 commit d2770cd

File tree

1 file changed

+61
-2
lines changed

1 file changed

+61
-2
lines changed

durabletask/worker.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,15 @@ class ActivityNotRegisteredError(ValueError):
201201
pass
202202

203203

204+
def _is_message_too_large(rpc_error: grpc.RpcError) -> bool:
205+
"""Return True if the gRPC error is RESOURCE_EXHAUSTED.
206+
207+
All RESOURCE_EXHAUSTED errors are treated as a permanent message-size violation
208+
so the sidecar always receives an acknowledgment and avoids infinite redelivery.
209+
"""
210+
return rpc_error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED
211+
212+
204213
# TODO: refactor this to closely match durabletask-go/client/worker_grpc.go instead of this.
205214
class TaskHubGrpcWorker:
206215
"""A gRPC-based worker for processing durable task orchestrations and activities.
@@ -890,7 +899,34 @@ def _execute_orchestrator(
890899
try:
891900
stub.CompleteOrchestratorTask(res)
892901
except grpc.RpcError as rpc_error: # type: ignore
893-
self._handle_grpc_execution_error(rpc_error, "orchestrator")
902+
if _is_message_too_large(rpc_error):
903+
# Response is too large to deliver - fail the orchestration immediately.
904+
# This can only be fixed with infrastructure changes (increasing gRPC max message size).
905+
self._logger.error(
906+
f"Orchestrator response for '{req.instanceId}' is too large to deliver "
907+
f"(RESOURCE_EXHAUSTED). Failing the orchestration task: {rpc_error.details()}"
908+
)
909+
failure_actions = [
910+
ph.new_complete_orchestration_action(
911+
-1, pb.ORCHESTRATION_STATUS_FAILED, "",
912+
ph.new_failure_details(RuntimeError(
913+
f"Orchestrator response exceeds gRPC max message size: {rpc_error.details()}"
914+
))
915+
)
916+
]
917+
failure_res = pb.OrchestratorResponse(
918+
instanceId=req.instanceId,
919+
actions=failure_actions,
920+
completionToken=completionToken,
921+
)
922+
try:
923+
stub.CompleteOrchestratorTask(failure_res)
924+
except Exception as ex:
925+
self._logger.exception(
926+
f"Failed to deliver orchestrator failure response for '{req.instanceId}': {ex}"
927+
)
928+
else:
929+
self._handle_grpc_execution_error(rpc_error, "orchestrator")
894930
except ValueError:
895931
# gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection).
896932
self._logger.debug(
@@ -949,7 +985,30 @@ def _execute_activity(
949985
try:
950986
stub.CompleteActivityTask(res)
951987
except grpc.RpcError as rpc_error: # type: ignore
952-
self._handle_grpc_execution_error(rpc_error, "activity")
988+
if _is_message_too_large(rpc_error):
989+
# Result is too large to deliver - fail the activity immediately.
990+
# This can only be fixed with infrastructure changes (increasing gRPC max message size).
991+
self._logger.error(
992+
f"Activity '{req.name}#{req.taskId}' result is too large to deliver "
993+
f"(RESOURCE_EXHAUSTED). Failing the activity task: {rpc_error.details()}"
994+
)
995+
failure_res = pb.ActivityResponse(
996+
instanceId=instance_id,
997+
taskId=req.taskId,
998+
failureDetails=ph.new_failure_details(RuntimeError(
999+
f"Activity result exceeds gRPC max message size: {rpc_error.details()}"
1000+
)),
1001+
completionToken=completionToken,
1002+
)
1003+
try:
1004+
stub.CompleteActivityTask(failure_res)
1005+
except Exception as ex:
1006+
self._logger.exception(
1007+
f"Failed to deliver activity failure response for '{req.name}#{req.taskId}' "
1008+
f"of orchestration ID '{instance_id}': {ex}"
1009+
)
1010+
else:
1011+
self._handle_grpc_execution_error(rpc_error, "activity")
9531012
except ValueError:
9541013
# gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection).
9551014
self._logger.debug(

0 commit comments

Comments
 (0)