Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f68bbf7
feat: preserve SFU error code in SignalingError
aliev Mar 17, 2026
bd33f67
feat: add SfuJoinError and retryable error detection in connect_webso…
aliev Mar 17, 2026
369a085
feat: pass migrating_from to coordinator join request
aliev Mar 17, 2026
8678b80
feat: retry connect() on SFU full by requesting a different SFU
aliev Mar 17, 2026
8f73de8
chore: remove dead retry code replaced by error-code-based detection
aliev Mar 17, 2026
6a352b9
style: remove extra blank line left after dead code removal
aliev Mar 17, 2026
536f95d
style: apply ruff formatting
aliev Mar 17, 2026
311447d
style: apply ruff formatting to test_signaling.py
aliev Mar 17, 2026
61da611
refactor: extract _handle_join_failure from connect() retry loop
aliev Mar 17, 2026
2cf9b73
refactor: use exp_backoff with sleep parameter in connect() retry loop
aliev Mar 17, 2026
bacbce1
refactor: move ConnectionManager import to module level in tests
aliev Mar 17, 2026
5af1ac6
Merge branch 'main' into feat/retry-connect-on-sfu-full
aliev Mar 17, 2026
9db988d
fix: close ws_client on connect_websocket failure to prevent thread leak
aliev Mar 17, 2026
6a34c05
test: mock exp_backoff in connect() tests to avoid real sleep delays
aliev Mar 17, 2026
857d3c7
chore: update uv.lock
aliev Mar 17, 2026
efd9b09
refactor: use pytest fixture for ConnectionManager setup in tests
aliev Mar 17, 2026
ab8a27c
refactor: use fixtures in test_connection_utils, snapshot mutable lis…
aliev Mar 17, 2026
df9a07c
test: assert ws_client cleanup behavior, not just retry count
aliev Mar 17, 2026
de6d889
refactor: remove sleep param from exp_backoff, keep sleep in caller
aliev Mar 18, 2026
23dc2f2
refactor: remove redundant _instant_backoff test helper
aliev Mar 18, 2026
dd99813
test: assert retry count in exhausted-retries test
aliev Mar 18, 2026
66708cd
refactor: extract _connect_with_sfu_reassignment from connect()
aliev Mar 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions getstream/video/rtc/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
from getstream.video.rtc.connection_utils import (
ConnectionState,
SfuConnectionError,
SfuJoinError,
ConnectionOptions,
connect_websocket,
join_call,
watch_call,
)
from getstream.video.rtc.coordinator.backoff import exp_backoff
from getstream.video.rtc.track_util import (
fix_sdp_msid_semantic,
fix_sdp_rtcp_fb,
Expand Down Expand Up @@ -55,6 +57,7 @@ def __init__(
user_id: Optional[str] = None,
create: bool = True,
subscription_config: Optional[SubscriptionConfig] = None,
max_join_retries: int = 3,
**kwargs: Any,
):
super().__init__()
Expand All @@ -68,6 +71,7 @@ def __init__(
self.session_id: str = str(uuid.uuid4())
self.join_response: Optional[JoinCallResponse] = None
self.local_sfu: bool = False # Local SFU flag for development
self._max_join_retries: int = max_join_retries

# Private attributes
self._connection_state: ConnectionState = ConnectionState.IDLE
Expand Down Expand Up @@ -282,6 +286,7 @@ async def _connect_internal(
ws_url: Optional[str] = None,
token: Optional[str] = None,
session_id: Optional[str] = None,
migrating_from_list: Optional[list] = None,
) -> None:
"""
Internal connection method that handles the core connection logic.
Expand Down Expand Up @@ -324,6 +329,10 @@ async def _connect_internal(
"auto",
self.create,
self.local_sfu,
migrating_from=migrating_from_list[-1]
if migrating_from_list
else None,
migrating_from_list=migrating_from_list,
**self.kwargs,
)
ws_url = join_response.data.credentials.server.ws_endpoint
Expand Down Expand Up @@ -395,6 +404,8 @@ async def _connect_internal(
logger.exception(f"No join response from WebSocket: {sfu_event}")

logger.debug(f"WebSocket connected successfully to {ws_url}")
except SfuJoinError:
raise
except Exception as e:
logger.exception(f"Failed to connect WebSocket to {ws_url}: {e}")
raise SfuConnectionError(f"WebSocket connection failed: {e}") from e
Expand Down Expand Up @@ -427,7 +438,8 @@ async def connect(self):
Connect to SFU.
This method automatically handles retry logic for transient errors
like "server is full" and network issues.
like "server is full" by requesting a different SFU from the
coordinator.
"""
logger.info("Connecting to SFU")
# Fire-and-forget the coordinator WS connection so we don't block here
Expand All @@ -445,7 +457,55 @@ def _on_coordinator_task_done(task: asyncio.Task):
logger.exception("Coordinator WS task failed")

self._coordinator_task.add_done_callback(_on_coordinator_task_done)
await self._connect_internal()

await self._connect_with_sfu_reassignment()

async def _connect_with_sfu_reassignment(self) -> None:
"""Try connecting to SFU, reassigning to a different one on failure."""
failed_sfus: list[str] = []
last_error: Optional[SfuJoinError] = None

# First attempt without delay
attempt = 0
try:
await self._connect_internal()
return
except SfuJoinError as e:
last_error = e
self._handle_join_failure(e, attempt, failed_sfus)

# Retries with exponential backoff, requesting a different SFU
async for delay in exp_backoff(max_retries=self._max_join_retries, base=0.5):
attempt += 1
logger.info(f"Retrying in {delay}s with different SFU...")
await asyncio.sleep(delay)
try:
await self._connect_internal(
migrating_from_list=failed_sfus if failed_sfus else None,
)
return
except SfuJoinError as e:
last_error = e
self._handle_join_failure(e, attempt, failed_sfus)

raise last_error # type: ignore[misc]

def _handle_join_failure(
self, error: SfuJoinError, attempt: int, failed_sfus: list[str]
) -> None:
"""Track a failed SFU and clean up partial connection state."""
if self.join_response and self.join_response.credentials:
edge = self.join_response.credentials.server.edge_name
if edge and edge not in failed_sfus:
failed_sfus.append(edge)
logger.warning(
f"SFU join failed (attempt {attempt + 1}/{1 + self._max_join_retries}, "
f"code={error.error_code}). Failed SFUs: {failed_sfus}"
)
if self._ws_client:
self._ws_client.close()
self._ws_client = None
self.connection_state = ConnectionState.IDLE

async def wait(self):
"""
Expand Down
80 changes: 40 additions & 40 deletions getstream/video/rtc/connection_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@
"connect_websocket",
]

# Private constants - internal use only
_RETRYABLE_ERROR_PATTERNS = [
"server is full",
"server overloaded",
"capacity exceeded",
"try again later",
"service unavailable",
"connection timeout",
"network error",
"temporary failure",
"connection refused",
"connection reset",
]


# Public classes and exceptions
class ConnectionState(Enum):
Expand All @@ -94,6 +80,22 @@ class SfuConnectionError(Exception):
pass


class SfuJoinError(SfuConnectionError):
"""Raised when SFU join fails with a retryable error code."""

def __init__(self, message: str, error_code: int = 0, should_retry: bool = False):
super().__init__(message)
self.error_code = error_code
self.should_retry = should_retry


_RETRYABLE_SFU_ERROR_CODES = {
700, # ERROR_CODE_SFU_FULL
600, # ERROR_CODE_SFU_SHUTTING_DOWN
301, # ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED
}


@dataclass
class ConnectionOptions:
"""Options for the connection process."""
Expand Down Expand Up @@ -175,6 +177,8 @@ async def join_call_coordinator_request(
notify: Optional[bool] = None,
video: Optional[bool] = None,
location: Optional[str] = None,
migrating_from: Optional[str] = None,
migrating_from_list: Optional[list] = None,
) -> StreamResponse[JoinCallResponse]:
"""Make a request to join a call via the coordinator.

Expand Down Expand Up @@ -208,6 +212,10 @@ async def join_call_coordinator_request(
video=video,
data=data,
)
if migrating_from:
json_body["migrating_from"] = migrating_from
if migrating_from_list:
json_body["migrating_from_list"] = migrating_from_list

# Make the POST request to join the call
return await client.post(
Expand Down Expand Up @@ -423,6 +431,8 @@ async def connect_websocket(
"""
logger.info(f"Connecting to WebSocket at {ws_url}")

ws_client = None
success = False
try:
# Create JoinRequest for WebSocket connection
join_request = await create_join_request(token, session_id)
Expand All @@ -448,34 +458,24 @@ async def connect_websocket(
sfu_event = await ws_client.connect()

logger.debug("WebSocket connection established")
success = True
return ws_client, sfu_event

except SignalingError as e:
if (
e.error
and hasattr(e.error, "code")
and e.error.code in _RETRYABLE_SFU_ERROR_CODES
):
raise SfuJoinError(
str(e),
error_code=e.error.code,
should_retry=True,
) from e
raise
except Exception as e:
logger.error(f"Failed to connect WebSocket to {ws_url}: {e}")
raise SignalingError(f"WebSocket connection failed: {e}")


# Private functions
def _is_retryable(retry_state: Any) -> bool:
"""Check if an error should be retried.

Args:
retry_state: The retry state object from tenacity

Returns:
True if the error should be retried, False otherwise
"""
# Extract the actual exception from the retry state
if hasattr(retry_state, "outcome") and retry_state.outcome.failed:
error = retry_state.outcome.exception()
else:
return False

# Import here to avoid circular imports
from getstream.video.rtc.signaling import SignalingError

if not isinstance(error, (SignalingError, SfuConnectionError)):
return False

error_message = str(error).lower()
return any(pattern in error_message for pattern in _RETRYABLE_ERROR_PATTERNS)
finally:
if ws_client and not success:
ws_client.close()
10 changes: 7 additions & 3 deletions getstream/video/rtc/signaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
class SignalingError(Exception):
"""Exception raised for errors in the signaling process."""

pass
def __init__(self, message: str, error=None):
super().__init__(message)
self.error = error


class WebSocketClient(StreamAsyncIOEventEmitter):
Expand Down Expand Up @@ -111,8 +113,10 @@ async def connect(self):

# Check if the first message is an error
if self.first_message and self.first_message.HasField("error"):
error_msg = self.first_message.error.error.message
raise SignalingError(f"Connection failed: {error_msg}")
sfu_error = self.first_message.error.error
raise SignalingError(
f"Connection failed: {sfu_error.message}", error=sfu_error
)

# Check if we got join_response
if self.first_message and self.first_message.HasField("join_response"):
Expand Down
Loading
Loading