Retry connect() on SFU full by requesting a different SFU#222
When the SFU returns an error (e.g. SFU_FULL), the error code was discarded and only the message string was kept. This preserves the full error object so downstream retry logic can inspect the code.
…cket connect_websocket now distinguishes retryable SFU errors (SFU_FULL, SFU_SHUTTING_DOWN, CALL_PARTICIPANT_LIMIT_REACHED) from fatal ones, raising SfuJoinError so the retry loop in connect() can request a different SFU from the coordinator.
Allows the coordinator to exclude full/failed SFUs when assigning a new SFU for the participant, by passing migrating_from and migrating_from_list in the join call body.
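The join body described in this commit can be sketched as follows. The field names `migrating_from` and `migrating_from_list` come from the commit message; the dict shape, the `session_id` field, and the helper name are illustrative assumptions, not the SDK's actual request builder.

```python
def build_join_request(session_id, migrating_from_list):
    """Assumed shape of a join body that lets the coordinator skip failed SFUs.

    migrating_from is the most recently failed SFU; migrating_from_list is
    every SFU rejected so far during this connect() attempt (assumption).
    """
    body = {"session_id": session_id}
    if migrating_from_list:
        body["migrating_from"] = migrating_from_list[-1]
        body["migrating_from_list"] = list(migrating_from_list)
    return body
```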
When an SFU rejects a join with a retryable error (SFU_FULL, SFU_SHUTTING_DOWN, CALL_PARTICIPANT_LIMIT_REACHED), connect() now retries by asking the coordinator for a different SFU via migrating_from_list, instead of immediately failing.
_RETRYABLE_ERROR_PATTERNS and _is_retryable() used string matching and were never called. Retry logic now uses SFU error codes via SfuJoinError and _RETRYABLE_SFU_ERROR_CODES.
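The commit above replaces string matching with code-based classification. A minimal sketch of what that can look like: the names `SfuJoinError` and `_RETRYABLE_SFU_ERROR_CODES` and the three numeric codes come from the PR, but the exact attributes (`code`, `should_retry`) are illustrative guesses.

```python
_RETRYABLE_SFU_ERROR_CODES = {
    700,  # ERROR_CODE_SFU_FULL
    600,  # ERROR_CODE_SFU_SHUTTING_DOWN
    301,  # ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED
}

class SfuJoinError(Exception):
    """Raised when the SFU rejects a join; carries the SFU error code."""

    def __init__(self, message: str, code: int):
        super().__init__(message)
        self.code = code

    @property
    def should_retry(self) -> bool:
        # Classify by error code instead of matching on message strings.
        return self.code in _RETRYABLE_SFU_ERROR_CODES
```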
📝 Walkthrough

This change introduces retry capabilities for SFU joins when nodes are full. It implements exponential-backoff retries, tracks failed SFUs to avoid on subsequent attempts, and propagates migration history through connection retry attempts with comprehensive error handling.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant ConnectionManager
    participant SFU as SFU (Full)
    participant SFU2 as SFU (Alt)
    participant Backoff as Exponential Backoff
    Client->>ConnectionManager: connect()
    activate ConnectionManager
    Note over ConnectionManager: Attempt 1
    ConnectionManager->>SFU: _connect_internal(migrating_from=None)
    SFU-->>ConnectionManager: SfuJoinError(should_retry=True)
    Note over ConnectionManager: Check if retryable
    alt should_retry == True
        ConnectionManager->>ConnectionManager: Track failed SFU<br/>migrating_from_list = [SFU]
        ConnectionManager->>Backoff: Calculate wait time<br/>(exponential backoff)
        Backoff-->>ConnectionManager: wait_ms
        Note over ConnectionManager: Attempt 2
        ConnectionManager->>SFU2: _connect_internal(migrating_from_list=[SFU])
        SFU2-->>ConnectionManager: Success
        ConnectionManager-->>Client: Connected
    else should_retry == False
        ConnectionManager-->>Client: Raise SfuJoinError
    end
    deactivate ConnectionManager
```
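The flow in the diagram can be sketched as a retry loop. This is a stand-in, not the SDK's actual `connect()`: `connect_with_retry`, the `connect_internal` callable, and the `backoff` tuple are hypothetical, and the PR's coordinator round-trip is collapsed into a single awaited call.

```python
import asyncio


class SfuJoinError(Exception):
    """Stand-in join error carrying the failed SFU id and a retry flag."""

    def __init__(self, sfu_id, should_retry):
        super().__init__(f"join rejected by {sfu_id}")
        self.sfu_id = sfu_id
        self.should_retry = should_retry


async def connect_with_retry(connect_internal, backoff=(0.5, 1.0, 2.0)):
    """Retry joins on retryable SFU errors, excluding SFUs that already failed."""
    migrating_from_list = []
    for attempt, delay in enumerate((0.0, *backoff)):
        if delay:
            await asyncio.sleep(delay)  # exponential backoff before each retry
        try:
            # Pass a snapshot so the coordinator can exclude failed SFUs.
            return await connect_internal(list(migrating_from_list))
        except SfuJoinError as e:
            if not e.should_retry or attempt == len(backoff):
                raise  # fatal error, or out of attempts
            migrating_from_list.append(e.sfu_id)
```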
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (2)
tests/test_connection_manager.py (2)
12-29: Prefer a fixture-backed `ConnectionManager` setup here.

This helper patches every collaborator up front, so the retry tests mostly validate a mocked shell instead of the concrete retry plumbing added in this PR. A shared fixture with lightweight fakes would give stronger coverage and better match the repo's test conventions. As per coding guidelines for `**/test_*.py`: "Use fixtures to inject objects in tests" and "Do not use mocks or mock objects in tests unless directly requested."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_connection_manager.py` around lines 12 - 29, The helper _make_connection_manager currently patches every collaborator (PeerConnectionManager, NetworkMonitor, ReconnectionManager, RecordingManager, SubscriptionManager, ParticipantsState, Tracer) which over-mocks ConnectionManager and masks the real retry behavior; replace this with a fixture that constructs a ConnectionManager using lightweight fakes/stubs for only the external dependencies needed for retry testing (e.g., a FakeReconnectionManager or FakeNetworkMonitor) and register that fixture for tests that call _make_connection_manager; update tests to use the new fixture instead of calling _make_connection_manager so the concrete ConnectionManager.retry/join logic (in ConnectionManager) is exercised while avoiding heavyweight mocks.
62-62: Avoid the real backoff delays in these unit tests.

`connect()` now retries via `exp_backoff(..., sleep=True)`, so these assertions pay the real 0.5s, 1s, etc. waits. That makes the suite slower and less deterministic than it needs to be. Swap in an immediate backoff/sleeper for the tests.

Also applies to: 91-91, 109-109, 141-141
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_connection_manager.py` at line 62, Tests are incurring real delays because connect() calls exp_backoff(..., sleep=True); in the test setup patch exp_backoff (or the module-level sleep used by it) to a no-op/immediate variant so retries don't actually await. Concretely, in tests/test_connection_manager.py before calling await cm.connect() replace the exp_backoff import used by the ConnectionManager with a stub that calls exp_backoff(..., sleep=False) or returns immediately (or monkeypatch asyncio.sleep to an async lambda that returns None) so backoff waits are skipped; do the same for the other occurrences around lines 91, 109, and 141 by applying the same patch/fixture. Ensure you patch the symbol the code under test imports (the exp_backoff name used by connect()) so the replacement takes effect.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@getstream/video/rtc/connection_utils.py`:
- Around line 461-472: The retryable SignalingError branch can leak the
temporary WebSocketClient started by ws_client.connect(); before raising
SfuJoinError you must cleanly close or shutdown that local ws_client instance so
its background thread/socket is not leaked (the cleanup in _handle_join_failure
only touches self._ws_client, not the local variable). Update the except
SignalingError block to call the ws_client's close/disconnect/shutdown method
(the same method used elsewhere to tear down self._ws_client) on the local
ws_client before raising SfuJoinError when e.error.code is in
_RETRYABLE_SFU_ERROR_CODES, ensuring no resource/thread is left running.
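The fix the prompt asks for can be sketched roughly as below. `SignalingError`, `SfuJoinError`, `connect_websocket`, and the `close()` method name follow the prompt; the fake client and the exact control flow are illustrative, and the sketch closes the local client on every `SignalingError`, not only retryable ones.

```python
_RETRYABLE_SFU_ERROR_CODES = {700, 600, 301}


class SignalingError(Exception):
    def __init__(self, code, message):
        super().__init__(message)
        self.code = code


class SfuJoinError(Exception):
    """Surfaced to connect() so it can request a different SFU."""


class FakeWebSocketClient:
    """Stand-in that fails its connect() and records whether close() ran."""

    def __init__(self):
        self.closed = False

    def connect(self):
        raise SignalingError(code=700, message="server is full")

    def close(self):
        self.closed = True


def connect_websocket(ws_client):
    try:
        ws_client.connect()
        return ws_client
    except SignalingError as e:
        # Tear down the local client so its background thread/socket
        # is not leaked before the error propagates.
        ws_client.close()
        if e.code in _RETRYABLE_SFU_ERROR_CODES:
            raise SfuJoinError(str(e)) from e
        raise
```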
---
Nitpick comments:
In `@tests/test_connection_manager.py`:
- Around line 12-29: The helper _make_connection_manager currently patches every
collaborator (PeerConnectionManager, NetworkMonitor, ReconnectionManager,
RecordingManager, SubscriptionManager, ParticipantsState, Tracer) which
over-mocks ConnectionManager and masks the real retry behavior; replace this
with a fixture that constructs a ConnectionManager using lightweight fakes/stubs
for only the external dependencies needed for retry testing (e.g., a
FakeReconnectionManager or FakeNetworkMonitor) and register that fixture for
tests that call _make_connection_manager; update tests to use the new fixture
instead of calling _make_connection_manager so the concrete
ConnectionManager.retry/join logic (in ConnectionManager) is exercised while
avoiding heavyweight mocks.
- Line 62: Tests are incurring real delays because connect() calls
exp_backoff(..., sleep=True); in the test setup patch exp_backoff (or the
module-level sleep used by it) to a no-op/immediate variant so retries don't
actually await. Concretely, in tests/test_connection_manager.py before calling
await cm.connect() replace the exp_backoff import used by the ConnectionManager
with a stub that calls exp_backoff(..., sleep=False) or returns immediately (or
monkeypatch asyncio.sleep to an async lambda that returns None) so backoff waits
are skipped; do the same for the other occurrences around lines 91, 109, and 141
by applying the same patch/fixture. Ensure you patch the symbol the code under
test imports (the exp_backoff name used by connect()) so the replacement takes
effect.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3d50b4fa-9c75-46c8-a3c2-2e991e5b7faa
📒 Files selected for processing (8)
- getstream/video/rtc/connection_manager.py
- getstream/video/rtc/connection_utils.py
- getstream/video/rtc/coordinator/backoff.py
- getstream/video/rtc/signaling.py
- tests/rtc/coordinator/test_backoff.py
- tests/test_connection_manager.py
- tests/test_connection_utils.py
- tests/test_signaling.py
WebSocketClient starts a background thread on creation. If connect() fails (e.g. SFU full), the client was not being closed, leaking the thread.
🧹 Nitpick comments (1)
getstream/video/rtc/connection_utils.py (1)
92-96: Replace hardcoded SFU error literals with protobuf constants.

The set uses raw error codes (700, 600, 301) instead of the protobuf-generated constants already available from `models_pb2`. Using named constants like `ERROR_CODE_SFU_FULL`, `ERROR_CODE_SFU_SHUTTING_DOWN`, and `ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED` provides safer coupling to the protobuf definitions and improves maintainability.

♻️ Proposed refactor

```diff
 from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import (
+    ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED,
+    ERROR_CODE_SFU_FULL,
+    ERROR_CODE_SFU_SHUTTING_DOWN,
     TRACK_TYPE_AUDIO,
     TRACK_TYPE_VIDEO,
     ClientDetails,
@@
 _RETRYABLE_SFU_ERROR_CODES = {
-    700,  # ERROR_CODE_SFU_FULL
-    600,  # ERROR_CODE_SFU_SHUTTING_DOWN
-    301,  # ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED
+    ERROR_CODE_SFU_FULL,
+    ERROR_CODE_SFU_SHUTTING_DOWN,
+    ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED,
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@getstream/video/rtc/connection_utils.py` around lines 92 - 96, Replace the hardcoded numeric SFU error codes in the _RETRYABLE_SFU_ERROR_CODES set with the protobuf-generated constants from models_pb2: use models_pb2.ERROR_CODE_SFU_FULL, models_pb2.ERROR_CODE_SFU_SHUTTING_DOWN, and models_pb2.ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED; ensure models_pb2 is imported in the module and update the _RETRYABLE_SFU_ERROR_CODES set to reference those symbols instead of 700, 600, 301.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@getstream/video/rtc/connection_utils.py`:
- Around line 92-96: Replace the hardcoded numeric SFU error codes in the
_RETRYABLE_SFU_ERROR_CODES set with the protobuf-generated constants from
models_pb2: use models_pb2.ERROR_CODE_SFU_FULL,
models_pb2.ERROR_CODE_SFU_SHUTTING_DOWN, and
models_pb2.ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED; ensure models_pb2 is
imported in the module and update the _RETRYABLE_SFU_ERROR_CODES set to
reference those symbols instead of 700, 600, 301.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5f80bb44-f7b9-4bf6-81ec-175d2b8c6c0b
📒 Files selected for processing (1)
getstream/video/rtc/connection_utils.py
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/test_connection_manager.py`:
- Around line 18-35: The helper _make_connection_manager currently constructs
ConnectionManager using broad patch(...) calls and a MagicMock call object;
replace this with a pytest fixture that injects minimal real or lightweight test
doubles for the required dependencies and a simple call object, and remove the
blanket patch scaffolding: create a fixture (e.g., connection_manager or
cm_factory) that instantiates ConnectionManager(call=SimpleCall(id="test_call",
call_type="default", user_id="user1"), user_id="user1", max_join_retries=...)
and pass in only the specific mocked external funcs or classes via pytest
monkeypatch if absolutely necessary; update tests to use the fixture instead of
_make_connection_manager and remove references to
patch("getstream.video.rtc.connection_manager.PeerConnectionManager") etc.,
keeping mocking targeted to external side-effects only.
- Around line 124-152: Update the test_cleans_up_ws_client_between_retries test
to assert that the partial WebSocket client is properly closed and cleared
before retry: when the first mock_connect_internal simulates a partial WS
connection by assigning cm._ws_client = MagicMock(), ensure you assert
cm._ws_client.close() was awaited/called and that cm._ws_client is set to None
prior to the second attempt; place these assertions either by checking the
MagicMock's close call after await cm.connect() or by injecting verification
into the mock flow so that connect() behavior (using _connect_internal,
_connect_coordinator_ws, and SfuJoinError) confirms the cleanup occurred.
- Around line 49-77: The test currently appends the mutable migrating_from_list
reference to received_migrating_from_list causing later mutations to affect
earlier snapshots; change the append to store an immutable snapshot (e.g.,
append a shallow copy or tuple) so each attempt's state is preserved — update
the line using received_migrating_from_list.append(migrating_from_list) to
append a copy (e.g., list(migrating_from_list) or tuple(migrating_from_list)) so
assertions against received_migrating_from_list[0..2] reflect the per-attempt
values.
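The aliasing pitfall described in the last comment is easy to reproduce in isolation. The snippet below is a minimal, self-contained illustration (the variable names echo the test, but this is not the test's actual code): appending the list object itself stores a shared reference, while `list(...)` snapshots the per-attempt state.

```python
# Aliased: every appended entry is the SAME list object, so later
# mutations rewrite earlier "snapshots".
received = []
migrating_from_list = []
for sfu in ("sfu-a", "sfu-b"):
    received.append(migrating_from_list)
    migrating_from_list.append(sfu)

# Fixed: append an independent copy per attempt.
snapshots = []
migrating_from_list = []
for sfu in ("sfu-a", "sfu-b"):
    snapshots.append(list(migrating_from_list))
    migrating_from_list.append(sfu)
```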
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dabe67c3-7b85-4df0-a6ea-435febce94d5
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (1)
tests/test_connection_manager.py
Replace helper method with a shared fixture that handles dependency patching, backoff mocking, and coordinator setup in one place.
…t in test_connection_manager Extract mock_ws_client and coordinator_request fixtures to reduce duplication. Snapshot migrating_from_list per attempt to avoid mutable reference aliasing in assertions.
Keeps exp_backoff as a pure delay generator, consistent with how coordinator/ws.py uses it. The caller (connect()) handles asyncio.sleep explicitly.
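A pure delay generator of the kind this commit describes might look like the sketch below. The real `exp_backoff` signature is not shown in this PR, so the parameters and jitter handling here are assumptions; only the design point is taken from the commit: the generator yields delays and the caller decides when to sleep.

```python
import random


def exp_backoff(base_ms=500, max_ms=2000, jitter=0.0):
    """Yield successive delays in ms: base, 2*base, ... capped at max_ms.

    Pure generator: it never sleeps itself; the caller (e.g. connect())
    awaits asyncio.sleep explicitly with each yielded value.
    """
    delay = base_ms
    while True:
        yield delay + random.uniform(0, jitter * delay)
        delay = min(delay * 2, max_ms)
```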
Why
When an SFU server is full, `connect()` immediately fails with `SfuConnectionError: WebSocket connection failed: Connection failed: server is full`. Instead of failing, we should ask the coordinator for a different SFU — matching the behavior of the JS SDK (stream-video-js#2121).

Changes

- Preserve the full error object in `SignalingError` (previously discarded, only the message string was kept)
- New `SfuJoinError` exception with error code detection for retryable SFU errors (`SFU_FULL`, `SFU_SHUTTING_DOWN`, `CALL_PARTICIPANT_LIMIT_REACHED`)
- Pass `migrating_from` and `migrating_from_list` to the coordinator join request so it excludes full SFUs
- Retry `connect()` with exponential backoff (0.5s, 1s, 2s) — on retryable error, request a new SFU from the coordinator
- Remove dead code (`_RETRYABLE_ERROR_PATTERNS`, `_is_retryable()`)

Summary by CodeRabbit
Release Notes
New Features
Improvements