Skip to content

Commit 0258aa5

Browse files
committed
fix(streaming): address greptile review findings
- _run: when CancelledError is raised mid-flush in the for-loop, re-enqueue the in-flight item plus any remaining items in the local `drained` list back into self._buf so close()'s final drain can recover them. Previously the local `drained` list was unreachable after CancelledError exited the for-loop, causing the last coalesced batch to be silently dropped on close-during-flush races. Trade-off: the in-flight item may be duplicated on the consumer side (Redis pub may have completed before cancel was delivered), which is preferable to silent loss for streaming UX. - _merge_pair: replace `return b` fallback with AssertionError. All six current TaskMessageDelta variants have explicit isinstance branches, so the fallback is unreachable today. But _can_merge returns True for any same-type pair, so adding a 7th delta variant without updating _merge_pair would silently drop `a`'s accumulated content. Asserting turns a future silent data-loss into an immediate, diagnosable crash.
1 parent 67c316d commit 0258aa5

1 file changed

Lines changed: 13 additions & 2 deletions

File tree

src/agentex/lib/core/services/adk/streaming.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,10 @@ def _merge_pair(a: TaskMessageDelta, b: TaskMessageDelta) -> TaskMessageDelta:
122122
name=a.name,
123123
content_delta=(a.content_delta or "") + (b.content_delta or ""),
124124
)
125-
return b
125+
raise AssertionError(
126+
f"_can_merge approved {type(a).__name__} pair but _merge_pair has no handler — "
127+
"a new TaskMessageDelta variant was added without updating both functions"
128+
)
126129

127130

128131
def _merge_consecutive(updates: list[StreamTaskMessageDelta]) -> list[StreamTaskMessageDelta]:
@@ -189,9 +192,17 @@ async def _run(self) -> None:
189192
async with self._lock:
190193
self._flush_signal.clear()
191194
drained = self._drain_locked()
192-
for u in drained:
195+
for idx, u in enumerate(drained):
193196
try:
194197
await self._on_flush(u)
198+
except asyncio.CancelledError:
199+
# Re-enqueue the item being flushed plus any remaining so
200+
# close()'s final drain can recover them. May cause a
201+
# duplicate publish of the in-flight item, which is
202+
# preferable to silent loss for a streaming UX.
203+
async with self._lock:
204+
self._buf = drained[idx:] + self._buf
205+
raise
195206
except Exception as e:
196207
logger.exception(f"CoalescingBuffer flush failed: {e}")
197208
except asyncio.CancelledError:

0 commit comments

Comments
 (0)