Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 0db12df

Browse files
authored
feat: add application-level keepalive to prevent ALB idle connection timeouts (#49)
AWS ALBs do not forward HTTP/2 PING frames, causing idle gRPC connections to be closed. This adds a background loop that periodically calls the existing Hello RPC as application-level traffic to keep the connection alive through L7 load balancers. Signed-off-by: joshvanl <me@joshvanl.dev>
1 parent 945caa4 commit 0db12df

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

durabletask/worker.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ class TaskHubGrpcWorker:
231231
controlling worker concurrency limits. If None, default settings are used.
232232
stop_timeout (float, optional): Maximum time in seconds to wait for the worker thread
233233
to stop when calling stop(). Defaults to 30.0. Useful to set lower values in tests.
234+
keepalive_interval (float, optional): Interval in seconds between application-level
235+
keepalive Hello RPCs sent to prevent L7 load balancers (e.g. AWS ALB) from closing
236+
idle HTTP/2 connections. Set to 0 or negative to disable. Defaults to 30.0.
234237
235238
Attributes:
236239
concurrency_options (ConcurrencyOptions): The current concurrency configuration.
@@ -297,6 +300,7 @@ def __init__(
297300
concurrency_options: Optional[ConcurrencyOptions] = None,
298301
channel_options: Optional[Sequence[tuple[str, Any]]] = None,
299302
stop_timeout: float = 30.0,
303+
keepalive_interval: float = 30.0,
300304
):
301305
self._registry = _Registry()
302306
self._host_address = host_address if host_address else shared.get_default_host_address()
@@ -306,6 +310,7 @@ def __init__(
306310
self._secure_channel = secure_channel
307311
self._channel_options = channel_options
308312
self._stop_timeout = stop_timeout
313+
self._keepalive_interval = keepalive_interval
309314
self._current_channel: Optional[grpc.Channel] = None # Store channel reference for cleanup
310315
self._channel_cleanup_threads: list[threading.Thread] = [] # Deferred channel close threads
311316
self._stream_ready = threading.Event()
@@ -369,6 +374,26 @@ def run_loop():
369374
raise RuntimeError("Failed to establish work item stream connection within 10 seconds")
370375
self._is_running = True
371376

377+
async def _keepalive_loop(self, stub):
378+
"""Background keepalive loop to keep the gRPC connection alive through L7 load balancers."""
379+
loop = asyncio.get_running_loop()
380+
while not self._shutdown.is_set():
381+
await asyncio.sleep(self._keepalive_interval)
382+
if self._shutdown.is_set():
383+
return
384+
try:
385+
await loop.run_in_executor(None, lambda: stub.Hello(empty_pb2.Empty(), timeout=10))
386+
except Exception as e:
387+
self._logger.debug(f"keepalive failed: {e}")
388+
389+
@staticmethod
390+
async def _cancel_keepalive(keepalive_task):
391+
"""Cancel and await the keepalive task if it exists."""
392+
if keepalive_task is not None:
393+
keepalive_task.cancel()
394+
with contextlib.suppress(asyncio.CancelledError):
395+
await keepalive_task
396+
372397
# TODO: refactor this to be more readable and maintainable.
373398
async def _async_run_loop(self):
374399
"""
@@ -464,6 +489,7 @@ def should_invalidate_connection(rpc_error):
464489
if self._shutdown.wait(delay):
465490
break
466491
continue
492+
keepalive_task = None
467493
try:
468494
assert current_stub is not None
469495
stub = current_stub
@@ -580,6 +606,8 @@ def stream_reader():
580606
raise
581607

582608
loop = asyncio.get_running_loop()
609+
if self._keepalive_interval > 0:
610+
keepalive_task = asyncio.ensure_future(self._keepalive_loop(stub))
583611

584612
# NOTE: This is a blocking call that will wait for a work item to become available or the shutdown sentinel
585613
while not self._shutdown.is_set():
@@ -629,6 +657,7 @@ def stream_reader():
629657
invalidate_connection()
630658
raise e
631659
current_reader_thread.join(timeout=1)
660+
await self._cancel_keepalive(keepalive_task)
632661

633662
if self._shutdown.is_set():
634663
self._logger.info(f"Disconnected from {self._host_address}")
@@ -642,6 +671,7 @@ def stream_reader():
642671
# Fall through to the top of the outer loop, which will
643672
# create a fresh connection (with retry/backoff if needed)
644673
except grpc.RpcError as rpc_error:
674+
await self._cancel_keepalive(keepalive_task)
645675
# Check shutdown first - if shutting down, exit immediately
646676
if self._shutdown.is_set():
647677
self._logger.debug("Shutdown detected during RPC error handling, exiting")
@@ -664,6 +694,7 @@ def stream_reader():
664694
else:
665695
self._logger.warning(f"gRPC error ({error_code}): {error_detail}")
666696
except RuntimeError as ex:
697+
await self._cancel_keepalive(keepalive_task)
667698
# RuntimeError often indicates asyncio loop issues (e.g., "cannot schedule new futures after shutdown")
668699
# Check shutdown state first
669700
if self._shutdown.is_set():
@@ -687,6 +718,7 @@ def stream_reader():
687718
# it's likely shutdown-related. Break to prevent infinite retries.
688719
break
689720
except Exception as ex:
721+
await self._cancel_keepalive(keepalive_task)
690722
if self._shutdown.is_set():
691723
self._logger.debug(
692724
f"Shutdown detected during exception handling, exiting: {ex}"

0 commit comments

Comments
 (0)