Skip to content

Commit d97d6b1

Browse files
committed
feat(streaming): add sliding TTL to Redis stream keys
Mirror scaleapi/scale-agentex#215 (server-side adapter): pipeline XADD with EXPIRE so each task:* stream key gets a sliding TTL. Orphaned streams (no writes for the TTL window) self-delete in Redis without needing an explicit cleanup_stream call from the caller. This is the right shape of fix for the SDK's leak: an explicit DEL on terminal task transitions (an earlier draft of this PR) introduced a race where the server's task_updated event published to the same topic could be deleted before a connected frontend SSE consumer read it. EXPIRE sidesteps that — TTL only fires after inactivity, so an actively-streaming agent or actively-reading consumer keeps the key alive, and the key only ages out once everyone is done with it. Defaults match the server: REDIS_STREAM_TTL_SECONDS=3600 (1h), overridable via env var. Setting it to 0 short-circuits to plain XADD (no TTL refresh), matching the server's escape hatch. Implementation notes: - transaction=False on the pipeline: connection-level batching, no MULTI/EXEC overhead for what's already a fast op. - raise_on_error=False: an EXPIRE failure after a successful XADD must not surface to the caller. The message has been published; retrying would duplicate it. We log and move on. Next successful XADD will reset the TTL anyway.
1 parent 46ed066 commit d97d6b1

1 file changed

Lines changed: 49 additions & 7 deletions

File tree

src/agentex/lib/core/adapters/streams/adapter_redis.py

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717

1818
_DEFAULT_STREAM_MAXLEN = 10000
19+
_DEFAULT_STREAM_TTL_SECONDS = 3600
1920

2021

2122
class RedisStreamRepository(StreamRepository):
@@ -24,7 +25,12 @@ class RedisStreamRepository(StreamRepository):
2425
Optimized for text/JSON streaming with SSE.
2526
"""
2627

27-
def __init__(self, redis_url: str | None = None, stream_maxlen: int | None = None):
28+
def __init__(
29+
self,
30+
redis_url: str | None = None,
31+
stream_maxlen: int | None = None,
32+
stream_ttl_seconds: int | None = None,
33+
):
2834
# Get Redis URL from environment if not provided
2935
self.redis_url = redis_url or os.environ.get(
3036
"REDIS_URL", "redis://localhost:6379"
@@ -35,6 +41,14 @@ def __init__(self, redis_url: str | None = None, stream_maxlen: int | None = Non
3541
if stream_maxlen is not None
3642
else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN))
3743
)
44+
# 0 disables sliding TTL.
45+
self.stream_ttl_seconds = (
46+
stream_ttl_seconds
47+
if stream_ttl_seconds is not None
48+
else int(
49+
os.environ.get("REDIS_STREAM_TTL_SECONDS", _DEFAULT_STREAM_TTL_SECONDS)
50+
)
51+
)
3852

3953
@override
4054
async def send_event(self, topic: str, event: dict[str, Any]) -> str:
@@ -55,12 +69,40 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str:
5569
# # Uncomment to debug
5670
# logger.info(f"Sending event to Redis stream {topic}: {event_json}")
5771

58-
message_id = await self.redis.xadd(
59-
name=topic,
60-
fields={"data": event_json},
61-
maxlen=self.stream_maxlen,
62-
approximate=True,
63-
)
72+
# Pipeline XADD + EXPIRE in one round-trip so the stream key gets
73+
# a sliding TTL — orphaned streams (no writes for the TTL window)
74+
# self-delete. Mirrors the server-side adapter (scaleapi/scale-agentex#215).
75+
if self.stream_ttl_seconds > 0:
76+
async with self.redis.pipeline(transaction=False) as pipe:
77+
pipe.xadd(
78+
name=topic,
79+
fields={"data": event_json},
80+
maxlen=self.stream_maxlen,
81+
approximate=True,
82+
)
83+
pipe.expire(name=topic, time=self.stream_ttl_seconds)
84+
# raise_on_error=False so an EXPIRE failure does not surface
85+
# to the caller after XADD already succeeded — that would
86+
# risk callers retrying and duplicating messages. A failed
87+
# TTL refresh is recoverable: MAXLEN still caps RAM and the
88+
# next write resets the clock.
89+
results = await pipe.execute(raise_on_error=False)
90+
# results[0] = xadd message ID (or Exception)
91+
# results[1] = expire bool (or Exception)
92+
message_id = results[0]
93+
if isinstance(message_id, Exception):
94+
raise message_id
95+
if isinstance(results[1], Exception):
96+
logger.warning(
97+
f"Failed to refresh TTL on stream {topic}: {results[1]}"
98+
)
99+
else:
100+
message_id = await self.redis.xadd(
101+
name=topic,
102+
fields={"data": event_json},
103+
maxlen=self.stream_maxlen,
104+
approximate=True,
105+
)
64106

65107
return message_id
66108
except Exception as e:

0 commit comments

Comments
 (0)