BRIC-14: SSE stream emits final Message JSON event #16
The stream endpoint POST /api/v1/chat/{thread_id}/stream now emits a final
event: message with the complete Message JSON before event: done, allowing
clients behind Cloudflare to use streaming and still get the full structured
response (identical format to the sync endpoint).
Changes:
- Add stream_with_message() to AgentRunner port and DeepAgentRunner adapter
- Refactor shared streaming logic into _yield_chunks() private method
- Modify StreamMessageUseCase to yield str | Message via stream_with_message()
- Update SSE route to emit event: message with Message model_dump_json()
- Update WebSocket route to send final Message as JSON with type field
- Guard _build_response against empty messages (AgentError)
- Fix WebSocket Message serialization (mode='json' for datetime)
- Update README with new SSE protocol and WebSocket docs
- Add 9 new tests for stream_with_message and SSE message event
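The route-level change in this commit (text chunks, then event: message, then event: done) can be sketched as below. This is a minimal illustration under stated assumptions, not the project's code: the Message class is a stdlib stand-in for the real model, fake_stream_with_message() is a hypothetical runner, and the event dicts follow the {"event": ..., "data": ...} shape quoted in the review.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from typing import AsyncIterator, Union


@dataclass
class Message:
    """Stand-in for the project's Message model (fields are assumed)."""
    role: str
    content: str

    def model_dump_json(self) -> str:
        # Mimics the Pydantic method of the same name for this sketch only.
        return json.dumps(asdict(self))


async def fake_stream_with_message() -> AsyncIterator[Union[str, Message]]:
    """Hypothetical runner: text chunks first, then the complete Message."""
    for chunk in ("Hel", "lo"):
        yield chunk
    yield Message(role="ai", content="Hello")


async def sse_events(
    source: AsyncIterator[Union[str, Message]],
) -> AsyncIterator[dict]:
    """Map str | Message items to SSE event dicts, ending with event: done."""
    async for item in source:
        if isinstance(item, Message):
            yield {"event": "message", "data": item.model_dump_json()}
        else:
            yield {"data": item}
    yield {"event": "done", "data": ""}


async def collect() -> list:
    return [event async for event in sse_events(fake_stream_with_message())]


events = asyncio.run(collect())
```

In this sketch the last two items of events are the event: message payload and the event: done terminator, in that order.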
…nt:done otherwise
The stream endpoint now conditionally emits the final event:
- If the Message has a structured_response: emits event:message as last event (no event:done)
- If no structured_response: emits event:done as last event (original behavior)
This ensures backward compatibility while giving clients that need the full JSON a clear signal via event:message when structured output is available.
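The conditional rule above could look roughly like this. The Message stand-in, the final_event() helper, and the "is not None" check are assumptions made for illustration; the commit message does not say how the presence of structured_response is tested.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Message:
    """Stand-in for the project's Message model (fields are assumed)."""
    content: str
    structured_response: Optional[dict] = None

    def model_dump_json(self) -> str:
        # Mimics the Pydantic method of the same name for this sketch only.
        return json.dumps(asdict(self))


def final_event(message: Message) -> dict:
    """Pick the last SSE event (hypothetical helper, not from the PR)."""
    if message.structured_response is not None:
        # Structured output available: the full JSON is the last event.
        return {"event": "message", "data": message.model_dump_json()}
    # No structured output: keep the original terminator.
    return {"event": "done", "data": ""}


with_structured = final_event(Message("ok", structured_response={"score": 8}))
without_structured = final_event(Message("ok"))
```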
📋 Code Review - BRIC-14: SSE stream emits final Message JSON event
Score: 8/10 ⭐⭐⭐⭐⭐⭐⭐⭐☆☆
✅ Positive Points
1. Exemplary DRY refactoring
2. Defensive error handling
3. Clean architecture
4. Correct typing
5. Tests provided
6. Documentation updated
Stream protocol simplified:
- Chunks: data: <text>
- Message JSON: data: <Message JSON> (second-to-last data line)
- Terminator: data: [DONE] (last data line)
No named events. Client just reads data lines: last is [DONE], second-to-last is the full Message JSON when structured output is present.
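A client for this simplified protocol might read the data lines as sketched below. The parse_stream() name and the sample bodies are hypothetical, and the sketch assumes each chunk fits on a single data: line and that a trailing JSON object is always the Message (a text chunk that happens to be a JSON object would be misread).

```python
import json
from typing import List, Optional, Tuple


def parse_stream(body: str) -> Tuple[str, Optional[dict]]:
    """Return (concatenated text, final Message dict or None) from an SSE body."""
    data_lines: List[str] = []
    for line in body.splitlines():
        if line.startswith("data:"):
            value = line[len("data:"):]
            # SSE strips one optional leading space after the colon.
            if value.startswith(" "):
                value = value[1:]
            data_lines.append(value)

    if not data_lines or data_lines[-1] != "[DONE]":
        raise ValueError("stream did not end with data: [DONE]")
    payload = data_lines[:-1]

    message: Optional[dict] = None
    if payload:
        try:
            candidate = json.loads(payload[-1])
        except json.JSONDecodeError:
            candidate = None
        # Only a JSON object is treated as the Message; anything else is text.
        if isinstance(candidate, dict):
            message = candidate
            payload = payload[:-1]
    return "".join(payload), message


structured_body = (
    'data: Hel\n\ndata: lo\n\n'
    'data: {"role": "ai", "content": "Hello"}\n\n'
    'data: [DONE]\n\n'
)
plain_body = "data: Hi\n\ndata: [DONE]\n\n"

text, message = parse_stream(structured_body)
plain_text, plain_message = parse_stream(plain_body)
```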
Kaiohz left a comment
🔍 Code Review - BRIC-14: SSE Stream Final Message
📊 Score: 8/10
✅ Positive Points
- Clean architecture: the new stream_with_message() method is added to the port (agent_runner.py), implemented in the adapter, and consumed in the use case. Hexagonal architecture is respected.
- Code reuse: _yield_chunks() is cleanly extracted to avoid duplication between stream() and stream_with_message().
- Backwards compatible: the old stream() remains available; the change is additive.
- Solid tests: 6 dedicated tests for stream_with_message() + 3 route tests. Edge cases are covered (empty chunks, tool calls, structured response, HITL interrupt).
- Guard added: _build_response() now checks that messages is non-empty.
- Documentation updated: the README clearly describes the new SSE/WebSocket protocol.
⚠️ Issues Identified
1. 🔴 Breaking Change: SSE terminator changed
Before:
yield {"event": "done", "data": ""}
After:
yield {"data": "[DONE]"}
The README still shows event: done, but the code sends data: [DONE].
Any client waiting for event: done will break.
Recommendation: either
- Revert to yield {"event": "done", "data": ""}
- Or clarify in the README that the terminator is data: [DONE] and version the API
2. 🟡 README vs. code inconsistency: event: message
The README shows:
event: message
data: {"role":"ai",...}
But in chat.py:
yield {"data": event.model_dump_json()}
There is no event: message, only data: {...}. If this is intentional, the README must be corrected.
3. 🟡 No test for the event: line
The tests check the data: lines but not the presence of event: message. Add a test that checks:
event_lines = [l for l in body.split("\n") if l.startswith("event:")]
4. 🟢 Minor: mutable stats dict
_yield_chunks() mutates a dict passed as a parameter. This works, but a NamedTuple or a dataclass StreamStats(chunk_count, elapsed) would be clearer.
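One possible shape for the StreamStats suggestion is sketched here. It is a simplified, synchronous illustration: the real _yield_chunks() is not shown in this PR page, so the signature, the chunk source, and the way elapsed is measured are all assumptions.

```python
import time
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class StreamStats:
    """Typed replacement for the mutable stats dict (field names from the review)."""
    chunk_count: int = 0
    elapsed: float = 0.0


def yield_chunks(chunks: Iterable[str], stats: StreamStats) -> Iterator[str]:
    """Yield chunks unchanged while recording count and elapsed time in stats."""
    start = time.monotonic()
    try:
        for chunk in chunks:
            stats.chunk_count += 1
            yield chunk
    finally:
        # Recorded even if the consumer stops iterating early.
        stats.elapsed = time.monotonic() - start


stats = StreamStats()
received = list(yield_chunks(["Hel", "lo", "!"], stats))
```

The caller still passes a mutable object, but attribute names are checked by type checkers and a misspelled key can no longer slip through silently.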
📝 Suggested Improvements
- Align README and implementation: either the code matches the README (event: message), or the README is updated.
- Add a test that explicitly checks for the absence or presence of event: lines, depending on the expected behavior.
- Consider a changelog or release notes to flag the SSE terminator change.
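The suggested event: line test could be built on helpers like the ones below. The helper names and the two sample bodies are illustrative assumptions; in the real suite the body would come from the route's test client response.

```python
from typing import List


def event_names(body: str) -> List[str]:
    """Return the names of all event: lines in an SSE response body, in order."""
    lines = [line for line in body.split("\n") if line.startswith("event:")]
    return [line[len("event:"):].strip() for line in lines]


def check_events(body: str, expected: List[str]) -> None:
    """Fail if the named events differ from what the protocol should emit."""
    found = event_names(body)
    assert found == expected, f"expected events {expected}, found {found}"


# Illustrative bodies: one with named events, one using data lines only.
named_body = 'data: Hi\n\nevent: message\ndata: {"role": "ai"}\n\nevent: done\ndata: \n\n'
data_only_body = 'data: Hi\n\ndata: {"role": "ai"}\n\ndata: [DONE]\n\n'

check_events(named_body, ["message", "done"])
check_events(data_only_body, [])
```

Whichever protocol is kept, pinning the expected list in a test makes a later change to the terminator or to named events fail loudly.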
🎯 Verdict
Good PR: clean architecture and well-written tests. The main concern is the inconsistency between the README and the code for the SSE protocol. Once they are aligned, it is ready to merge.
Action required: clarify the SSE protocol (event vs. data) and decide whether the terminator change is breaking or documented.
Jira
BRIC-14
Problem
Cloudflare has a ~100s timeout on idle HTTP connections. The synchronous endpoint POST /api/v1/chat/{thread_id} blocks until the agent completes, which can exceed 100s for complex agents. This causes 504 Gateway Timeout errors.
Solution
Modify the SSE stream endpoint POST /api/v1/chat/{thread_id}/stream to emit the complete Message JSON as a final event: message before event: done. This leverages the existing SSE keep-alive (ping every 15s) to prevent Cloudflare timeout while giving clients the full structured response.
SSE Protocol (new)
Changes
- src/domain/ports/agent_runner.py: Add stream_with_message() abstract method
- src/infrastructure/deepagent/adapter.py: Implement stream_with_message() + refactor shared streaming logic into _yield_chunks()
- src/application/use_cases/stream_message.py: execute() now yields str | Message via stream_with_message()
- src/application/routes/chat.py: SSE route emits event: message with Message.model_dump_json() for Message yields
- src/application/routes/websocket.py: WebSocket route sends Message as JSON with type: "message" + model_dump(mode="json")
- Guard _build_response against empty messages list
Tests
- Tests for stream_with_message() in test_deep_agent_runner_stream_with_message.py
- Route tests in test_routes.py (event: message, format match, ordering)
QA Results
- event: message + event: done in correct order
- {"type": "message", ...} JSON before [END]
Not merging yet
Per request, this PR is created but not merged pending further review.