Skip to content

Commit 46ed066

Browse files
committed
perf(streaming): add MAXLEN to Redis xadd to prevent unbounded growth
The SDK's RedisStreamRepository.send_event was calling xadd with no MAXLEN, so every task:* stream grew unbounded for the lifetime of the task. The accompanying comment ("Add to Redis stream with a reasonable max length") suggested the cap was intended but never wired up. This change matches the agentex server-side adapter, which has had maxlen=REDIS_STREAM_MAXLEN, approximate=True since Jan 2 (PR #111 in scaleapi/scale-agentex). Default is 10000 entries, overridable via the REDIS_STREAM_MAXLEN env var, same as the server. Note: this caps each stream's size during generation but does not delete streams when their task completes -- that's a separate fix.
1 parent 09a816c commit 46ed066

1 file changed

Lines changed: 11 additions & 2 deletions

File tree

src/agentex/lib/core/adapters/streams/adapter_redis.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,26 @@
1515
logger = make_logger(__name__)
1616

1717

18+
_DEFAULT_STREAM_MAXLEN = 10000
19+
20+
1821
class RedisStreamRepository(StreamRepository):
1922
"""
2023
A simplified Redis implementation of the EventStreamRepository interface.
2124
Optimized for text/JSON streaming with SSE.
2225
"""
2326

24-
def __init__(self, redis_url: str | None = None):
27+
def __init__(self, redis_url: str | None = None, stream_maxlen: int | None = None):
2528
# Get Redis URL from environment if not provided
2629
self.redis_url = redis_url or os.environ.get(
2730
"REDIS_URL", "redis://localhost:6379"
2831
)
2932
self.redis = redis.from_url(self.redis_url)
33+
self.stream_maxlen = (
34+
stream_maxlen
35+
if stream_maxlen is not None
36+
else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN))
37+
)
3038

3139
@override
3240
async def send_event(self, topic: str, event: dict[str, Any]) -> str:
@@ -47,10 +55,11 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str:
4755
# # Uncomment to debug
4856
# logger.info(f"Sending event to Redis stream {topic}: {event_json}")
4957

50-
# Add to Redis stream with a reasonable max length
5158
message_id = await self.redis.xadd(
5259
name=topic,
5360
fields={"data": event_json},
61+
maxlen=self.stream_maxlen,
62+
approximate=True,
5463
)
5564

5665
return message_id

0 commit comments

Comments
 (0)