Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 50 additions & 39 deletions sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,45 +201,56 @@ def __exit__(self, *unused_args):
self.stop()

def start(self):
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
while True:
if process is not None and process.poll() is not None:
_LOGGER.error("Failed to start job service with %s", process.args)
raise RuntimeError(
'Service failed to start up with error %s' % process.poll())
try:
channel_ready.result(timeout=wait_secs)
break
except (grpc.FutureTimeoutError, grpc.RpcError):
wait_secs *= 1.2
logging.log(
logging.WARNING if wait_secs > 1 else logging.DEBUG,
'Waiting for grpc channel to be ready at %s.',
endpoint)
return self._stub_class(self._grpc_channel)
except: # pylint: disable=bare-except
_LOGGER.exception("Error bringing up service")
self.stop_force()
raise
max_retries = 3
for attempt in range(max_retries):
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
while True:
if process is not None and process.poll() is not None:
_LOGGER.error("Failed to start job service with %s", process.args)
raise RuntimeError(
'Service failed to start up with error %s' % process.poll())
try:
channel_ready.result(timeout=wait_secs)
break
except (grpc.FutureTimeoutError, grpc.RpcError):
wait_secs *= 1.2
logging.log(
logging.WARNING if wait_secs > 1 else logging.DEBUG,
'Waiting for grpc channel to be ready at %s.',
endpoint)
return self._stub_class(self._grpc_channel)
except Exception as e:
_LOGGER.warning(
"Error bringing up service (attempt %d of %d): %s",
attempt + 1,
max_retries,
e)
self.stop_force()
if attempt == max_retries - 1:
raise
time.sleep(1)
Comment thread
shunping marked this conversation as resolved.
except: # pylint: disable=bare-except
self.stop_force()
raise

def start_process(self):
if self._owner_id is not None:
Expand Down
22 changes: 16 additions & 6 deletions sdks/python/apache_beam/utils/subprocess_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,16 @@ def __init__(self):
def poll(self):
return 1 # Simulate that process exited/failed

constructor_calls = 0

def custom_constructor(*args):
nonlocal constructor_calls
constructor_calls += 1
return (dummy_process, "localhost:12345")

dummy_process = DummyProcess()
cache = subprocess_server._SharedCache(
lambda *args: (dummy_process, "localhost:12345"), custom_destructor)
custom_constructor, custom_destructor)

# 1. Register an independent, unrelated owner in the cache first.
other_owner = cache.register()
Expand All @@ -536,11 +543,14 @@ def __init__(self):
self.assertEqual(cache._cache[cache_key].owners, {other_owner})

# 2. Verify starting the server (which registers its own owner and retrieves from cache) raises RuntimeError
with self.assertRaises(RuntimeError):
server.start()

# 3. Verify that the destructor was called on the process, meaning no leak (even though other_owner was still registered!)
self.assertEqual(destructor_calls, [(dummy_process, "localhost:12345")])
with patch('time.sleep'):
with self.assertRaises(RuntimeError):
server.start()
self.assertEqual(constructor_calls, 3)

# 3. Verify that the destructor was called on the process for each retry attempt (3 total),
# meaning there is no leak (even though other_owner was still registered).
self.assertEqual(destructor_calls, [(dummy_process, "localhost:12345")] * 3)

# 4. Verify that the server has cleaned up its owner_id
self.assertIsNone(server._owner_id)
Expand Down
Loading