Skip to content

Commit 708d696

Browse files
authored
fix: sse error handling (#107)
1 parent 5ffe4d1 commit 708d696

File tree

4 files changed

+347
-12
lines changed

4 files changed

+347
-12
lines changed

devcycle_python_sdk/managers/config_manager.py

100644100755
Lines changed: 98 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ def __init__(
4040
self._config_etag: Optional[str] = None
4141
self._config_lastmodified: Optional[str] = None
4242

43+
# Exponential backoff configuration
44+
self._sse_reconnect_attempts = 0
45+
self._min_reconnect_interval = 5.0 # Start at 5 seconds
46+
self._max_reconnect_interval = 300.0 # Cap at 5 minutes
47+
self._last_reconnect_attempt_time: Optional[float] = None
48+
self._sse_reconnecting = False
4349
self._config_api_client = ConfigAPIClient(self._sdk_key, self._options)
4450

4551
self._polling_enabled = True
@@ -49,6 +55,48 @@ def __init__(
4955
def is_initialized(self) -> bool:
5056
return self._config is not None
5157

58+
def _recreate_sse_connection(self):
59+
"""Recreate the SSE connection with the current config."""
60+
if self._config is None or self._options.disable_realtime_updates:
61+
logger.debug(
62+
"DevCycle: Skipping SSE recreation - no config or updates disabled"
63+
)
64+
return
65+
66+
# Update timestamp right before attempting connection
67+
self._last_reconnect_attempt_time = time.time()
68+
69+
try:
70+
# Close existing connection if present
71+
if self._sse_manager is not None and self._sse_manager.client is not None:
72+
self._sse_manager.client.close()
73+
if self._sse_manager.read_thread.is_alive():
74+
self._sse_manager.read_thread.join(timeout=1.0)
75+
76+
# Create new SSE manager
77+
self._sse_manager = SSEManager(
78+
self.sse_state,
79+
self.sse_error,
80+
self.sse_message,
81+
)
82+
self._sse_manager.update(self._config)
83+
84+
except Exception as e:
85+
logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}")
86+
87+
def _delayed_sse_reconnect(self, delay_seconds: float):
88+
"""Delayed SSE reconnection with configurable backoff."""
89+
try:
90+
logger.debug(
91+
f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..."
92+
)
93+
time.sleep(delay_seconds)
94+
self._recreate_sse_connection()
95+
except Exception as e:
96+
logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}")
97+
finally:
98+
self._sse_reconnecting = False
99+
52100
def _get_config(self, last_modified: Optional[float] = None):
53101
try:
54102
lm_header = self._config_lastmodified
@@ -87,12 +135,10 @@ def _get_config(self, last_modified: Optional[float] = None):
87135
or self._sse_manager.client is None
88136
or not self._sse_manager.read_thread.is_alive()
89137
):
90-
self._sse_manager = SSEManager(
91-
self.sse_state,
92-
self.sse_error,
93-
self.sse_message,
138+
logger.info(
139+
"DevCycle: SSE connection not active, creating new connection"
94140
)
95-
self._sse_manager.update(self._config)
141+
self._recreate_sse_connection()
96142

97143
if (
98144
trigger_on_client_initialized
@@ -101,7 +147,6 @@ def _get_config(self, last_modified: Optional[float] = None):
101147
try:
102148
self._options.on_client_initialized()
103149
except Exception as e:
104-
# consume any error
105150
logger.warning(
106151
f"DevCycle: Error received from on_client_initialized callback: {str(e)}"
107152
)
@@ -122,7 +167,6 @@ def run(self):
122167
self._get_config()
123168
except Exception as e:
124169
if self._polling_enabled:
125-
# Only log a warning if we're still polling
126170
logger.warning(
127171
f"DevCycle: Error polling for config changes: {str(e)}"
128172
)
@@ -137,6 +181,7 @@ def sse_message(self, message: ld_eventsource.actions.Event):
137181
self.sse_state(None)
138182
logger.info(f"DevCycle: Received message: {message.data}")
139183
sse_message = json.loads(message.data)
184+
140185
dvc_data = json.loads(sse_message.get("data"))
141186
if (
142187
dvc_data.get("type") == "refetchConfig"
@@ -145,15 +190,60 @@ def sse_message(self, message: ld_eventsource.actions.Event):
145190
):
146191
logger.info("DevCycle: Received refetchConfig message - updating config")
147192
self._get_config(dvc_data["lastModified"] / 1000.0)
193+
# SSE connection healthy, reconnect attempts reset.
194+
if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig":
195+
self._sse_reconnect_attempts = 0
148196

149197
def sse_error(self, error: ld_eventsource.actions.Fault):
150198
self._sse_connected = False
151-
logger.debug(f"DevCycle: Received SSE error: {error}")
199+
logger.debug(f"DevCycle: SSE connection error: {error.error}")
200+
current_time = time.time()
201+
202+
if self._sse_reconnecting:
203+
logger.debug("DevCycle: Reconnection already in progress, skipping")
204+
return
205+
206+
# Calculate exponential backoff interval (capped at max)
207+
backoff_interval = min(
208+
self._min_reconnect_interval * (2**self._sse_reconnect_attempts),
209+
self._max_reconnect_interval,
210+
)
211+
212+
# Check if we need to wait for remaining backoff time
213+
delay_seconds = backoff_interval
214+
if self._last_reconnect_attempt_time is not None:
215+
time_since_last_attempt = current_time - self._last_reconnect_attempt_time
216+
if time_since_last_attempt < backoff_interval:
217+
delay_seconds = backoff_interval - time_since_last_attempt
218+
logger.debug(
219+
f"DevCycle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s"
220+
)
221+
222+
self._sse_reconnecting = True
223+
self._sse_reconnect_attempts += 1
224+
225+
logger.debug(
226+
f"DevCycle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, "
227+
f"backoff: {delay_seconds:.1f}s)"
228+
)
229+
230+
reconnect_thread = threading.Thread(
231+
target=self._delayed_sse_reconnect, args=(delay_seconds,), daemon=True
232+
)
233+
reconnect_thread.start()
152234

153235
def sse_state(self, state: Optional[ld_eventsource.actions.Start]):
154236
if not self._sse_connected:
155237
self._sse_connected = True
156238
logger.info("DevCycle: Connected to SSE stream")
157239

240+
# Clear reconnection state
241+
self._sse_reconnecting = False
242+
self._last_reconnect_attempt_time = None
243+
else:
244+
logger.debug("DevCycle: SSE keepalive received")
245+
158246
def close(self):
159247
self._polling_enabled = False
248+
if self._sse_manager is not None and self._sse_manager.client is not None:
249+
self._sse_manager.client.close()

devcycle_python_sdk/managers/sse_manager.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ def read_events(
3333
handle_error: Callable[[ld_eventsource.actions.Fault], None],
3434
handle_message: Callable[[ld_eventsource.actions.Event], None],
3535
):
36-
self.client.start()
3736
try:
37+
self.client.start()
38+
logger.info("DevCycle: SSE connection created successfully")
3839
for event in self.client.all:
3940
if isinstance(event, ld_eventsource.actions.Start):
4041
handle_state(event)
@@ -45,7 +46,11 @@ def read_events(
4546
elif isinstance(event, ld_eventsource.actions.Comment):
4647
handle_state(None)
4748
except Exception as e:
48-
logger.debug(f"DevCycle: failed to read SSE message: {e}")
49+
logger.debug(f"DevCycle SSE: Error in read loop: {e}")
50+
fault_event = ld_eventsource.actions.Fault(error=e)
51+
handle_error(fault_event)
52+
finally:
53+
logger.debug("DevCycle SSE: Connection closed")
4954

5055
def update(self, config: dict):
5156
if self.use_new_config(config["sse"]):
@@ -66,6 +71,6 @@ def update(self, config: dict):
6671

6772
def use_new_config(self, config: dict) -> bool:
6873
new_url = config["hostname"] + config["path"]
69-
if self.url == "" or self.url is None and new_url != "":
74+
if (self.url == "" or self.url is None) and new_url != "":
7075
return True
7176
return self.url != new_url

example/cloud_client_example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def main():
6161
client.track(user, event)
6262

6363
except Exception as e:
64-
logger.exception(f"Exception when calling Devcycle API: {e}\n")
64+
logger.exception(f"Exception when calling DevCycle API: {e}\n")
6565

6666

6767
if __name__ == "__main__":

0 commit comments

Comments
 (0)