@@ -231,6 +231,9 @@ class TaskHubGrpcWorker:
231231 controlling worker concurrency limits. If None, default settings are used.
232232 stop_timeout (float, optional): Maximum time in seconds to wait for the worker thread
233233 to stop when calling stop(). Defaults to 30.0. Useful to set lower values in tests.
234+ keepalive_interval (float, optional): Interval in seconds between application-level
235+ keepalive Hello RPCs sent to prevent L7 load balancers (e.g. AWS ALB) from closing
236+ idle HTTP/2 connections. Set to 0 or negative to disable. Defaults to 30.0.
234237
235238 Attributes:
236239 concurrency_options (ConcurrencyOptions): The current concurrency configuration.
@@ -297,6 +300,7 @@ def __init__(
297300 concurrency_options : Optional [ConcurrencyOptions ] = None ,
298301 channel_options : Optional [Sequence [tuple [str , Any ]]] = None ,
299302 stop_timeout : float = 30.0 ,
303+ keepalive_interval : float = 30.0 ,
300304 ):
301305 self ._registry = _Registry ()
302306 self ._host_address = host_address if host_address else shared .get_default_host_address ()
@@ -306,6 +310,7 @@ def __init__(
306310 self ._secure_channel = secure_channel
307311 self ._channel_options = channel_options
308312 self ._stop_timeout = stop_timeout
313+ self ._keepalive_interval = keepalive_interval
309314 self ._current_channel : Optional [grpc .Channel ] = None # Store channel reference for cleanup
310315 self ._stream_ready = threading .Event ()
311316 # Use provided concurrency options or create default ones
@@ -368,6 +373,26 @@ def run_loop():
368373 raise RuntimeError ("Failed to establish work item stream connection within 10 seconds" )
369374 self ._is_running = True
370375
376+ async def _keepalive_loop (self , stub ):
377+ """Background keepalive loop to keep the gRPC connection alive through L7 load balancers."""
378+ loop = asyncio .get_running_loop ()
379+ while not self ._shutdown .is_set ():
380+ await asyncio .sleep (self ._keepalive_interval )
381+ if self ._shutdown .is_set ():
382+ return
383+ try :
384+ await loop .run_in_executor (None , lambda : stub .Hello (empty_pb2 .Empty (), timeout = 10 ))
385+ except Exception as e :
386+ self ._logger .debug (f"keepalive failed: { e } " )
387+
388+ @staticmethod
389+ async def _cancel_keepalive (keepalive_task ):
390+ """Cancel and await the keepalive task if it exists."""
391+ if keepalive_task is not None :
392+ keepalive_task .cancel ()
393+ with contextlib .suppress (asyncio .CancelledError ):
394+ await keepalive_task
395+
371396 # TODO: refactor this to be more readable and maintainable.
372397 async def _async_run_loop (self ):
373398 """
@@ -472,6 +497,7 @@ def should_invalidate_connection(rpc_error):
472497 if self ._shutdown .wait (delay ):
473498 break
474499 continue
500+ keepalive_task = None
475501 try :
476502 assert current_stub is not None
477503 stub = current_stub
@@ -584,6 +610,8 @@ def stream_reader():
584610 raise
585611
586612 loop = asyncio .get_running_loop ()
613+ if self ._keepalive_interval > 0 :
614+ keepalive_task = asyncio .ensure_future (self ._keepalive_loop (stub ))
587615
588616 # NOTE: This is a blocking call that will wait for a work item to become available or the shutdown sentinel
589617 while not self ._shutdown .is_set ():
@@ -633,6 +661,7 @@ def stream_reader():
633661 invalidate_connection ()
634662 raise e
635663 current_reader_thread .join (timeout = 1 )
664+ await self ._cancel_keepalive (keepalive_task )
636665
637666 if self ._shutdown .is_set ():
638667 self ._logger .info (f"Disconnected from { self ._host_address } " )
@@ -646,6 +675,7 @@ def stream_reader():
646675 # Fall through to the top of the outer loop, which will
647676 # create a fresh connection (with retry/backoff if needed)
648677 except grpc .RpcError as rpc_error :
678+ await self ._cancel_keepalive (keepalive_task )
649679 # Check shutdown first - if shutting down, exit immediately
650680 if self ._shutdown .is_set ():
651681 self ._logger .debug ("Shutdown detected during RPC error handling, exiting" )
@@ -681,6 +711,7 @@ def stream_reader():
681711 f"Application-level gRPC error ({ error_code } ): { rpc_error } "
682712 )
683713 except RuntimeError as ex :
714+ await self ._cancel_keepalive (keepalive_task )
684715 # RuntimeError often indicates asyncio loop issues (e.g., "cannot schedule new futures after shutdown")
685716 # Check shutdown state first
686717 if self ._shutdown .is_set ():
@@ -704,6 +735,7 @@ def stream_reader():
704735 # it's likely shutdown-related. Break to prevent infinite retries.
705736 break
706737 except Exception as ex :
738+ await self ._cancel_keepalive (keepalive_task )
707739 if self ._shutdown .is_set ():
708740 self ._logger .debug (
709741 f"Shutdown detected during exception handling, exiting: { ex } "
0 commit comments