BRIC-14: SSE stream emits final Message JSON event #16

Merged
Kaiohz merged 3 commits into main from BRIC-14/sse-json-final-message on Apr 24, 2026
Conversation

@Kaiohz Kaiohz commented Apr 24, 2026

Jira

BRIC-14

Problem

Cloudflare has a ~100s timeout on idle HTTP connections. The synchronous endpoint POST /api/v1/chat/{thread_id} blocks until the agent completes, which can exceed 100s for complex agents. This causes 504 Gateway Timeout errors.

Solution

Modify the SSE stream endpoint POST /api/v1/chat/{thread_id}/stream to emit the complete Message JSON as a final event: message before event: done. This leverages the existing SSE keep-alive (ping every 15s) to prevent Cloudflare timeout while giving clients the full structured response.

SSE Protocol (new)

data: chunk1
data: chunk2
...
event: message
data: {"role":"ai","content":"chunk1chunk2...","status":"completed","tool_calls":null,"structured_response":{...},"timestamp":"..."}

event: done
data:

Changes

  • src/domain/ports/agent_runner.py: Add stream_with_message() abstract method
  • src/infrastructure/deepagent/adapter.py: Implement stream_with_message() + refactor shared streaming logic into _yield_chunks()
  • src/application/use_cases/stream_message.py: execute() now yields str | Message via stream_with_message()
  • src/application/routes/chat.py: SSE route emits event: message with Message.model_dump_json() for Message yields
  • src/application/routes/websocket.py: WebSocket route sends Message as JSON with type: "message" + model_dump(mode="json")
  • Guard _build_response against empty messages list
  • README.md: Updated SSE and WebSocket documentation

Tests

  • 6 new unit tests for stream_with_message() in test_deep_agent_runner_stream_with_message.py
  • 3 new route tests in test_routes.py (event: message, format match, ordering)
  • All 240 tests pass

QA Results

  • Manual testing confirmed: SSE stream emits chunks + event: message + event: done in correct order
  • Sync endpoint remains unchanged
  • Error handling (404, 422) works correctly
  • WebSocket sends final {"type": "message", ...} JSON before [END]

Not merging yet

Per request, this PR is created but not merged pending further review.

Kaiohz added 2 commits April 24, 2026 11:34
The stream endpoint POST /api/v1/chat/{thread_id}/stream now emits a final
event: message with the complete Message JSON before event: done, allowing
clients behind Cloudflare to use streaming and still get the full structured
response (identical format to the sync endpoint).

Changes:
- Add stream_with_message() to AgentRunner port and DeepAgentRunner adapter
- Refactor shared streaming logic into _yield_chunks() private method
- Modify StreamMessageUseCase to yield str | Message via stream_with_message()
- Update SSE route to emit event: message with Message model_dump_json()
- Update WebSocket route to send final Message as JSON with type field
- Guard _build_response against empty messages (AgentError)
- Fix WebSocket Message serialization (mode='json' for datetime)
- Update README with new SSE protocol and WebSocket docs
- Add 9 new tests for stream_with_message and SSE message event
…nt:done otherwise

The stream endpoint now conditionally emits the final event:
- If the Message has a structured_response: emits event:message as last event (no event:done)
- If no structured_response: emits event:done as last event (original behavior)

This ensures backward compatibility while giving clients that need the full
JSON a clear signal via event:message when structured output is available.
Kaiohz commented Apr 24, 2026

📋 Code Review - BRIC-14: SSE stream emits final Message JSON event

Score: 8/10 ⭐⭐⭐⭐⭐⭐⭐⭐☆☆


✅ Strengths

1. Exemplary DRY refactoring

  • _yield_chunks() extracted to avoid code duplication between stream() and stream_with_message()
  • Clean pattern using a stats dict to share metrics

2. Defensive error handling

  • result.get("messages", []) instead of result["messages"] avoids KeyError
  • Explicit error message: "Graph completed but no messages were found in the final state."

3. Clean architecture

  • New abstract method on the AgentRunner port
  • Implementation in the DeepAgent adapter
  • Use case and routes (SSE + WebSocket) updated

4. Correct typing

  • AsyncIterator[str | Message] for the new method
  • Clear, consistent signature

5. Tests included

  • 6 new unit tests for stream_with_message()
  • 3 new route tests
  • All 240 tests pass

6. Documentation updated

  • README updated with the new SSE protocol
  • WebSocket documentation updated as well

⚠️ Areas to Improve

1. Potential race condition

state = self._graph.get_state(config)

After astream(), get_state() may not hold the final state if the stream was interrupted. Add a check:

state = self._graph.get_state(config)
if not state or not hasattr(state, "values"):
    logger.warning("[thread=%s] No final state available after stream", thread_id)

2. Method name
stream_with_message is ambiguous. Suggestions:

  • stream_with_final_message()
  • stream_and_get_result()
  • stream_with_response()

3. Missing documentation
Add a more detailed docstring:

async def stream_with_message(self, thread_id: str, message: str) -> AsyncIterator[str | Message]:
    """
    Stream agent response chunks, then yield final Message object.
    
    Yields:
        str: Text chunks during streaming
        Message: Final message object after stream completion
    
    Raises:
        AgentError: If streaming fails or no messages in final state
    """

🔧 Improvement Suggestions

  1. get_state robustness:
state = self._graph.get_state(config)
if not state:
    raise AgentError(f"No state available after stream for thread {thread_id}")
values = state.values if hasattr(state, "values") else {}
  2. Consider a Union type:
from typing import Union
async def stream_with_message(self, thread_id: str, message: str) -> AsyncIterator[Union[str, Message]]:

📝 Conclusion

Good PR: clean, well-structured code. The DRY approach is excellent. A few minor improvements are suggested, but nothing blocking.

Recommendation: Merge after addressing the get_state() robustness point.


Review by SoluBot 🤖

Stream protocol simplified:
- Chunks: data: <text>
- Message JSON: data: <Message JSON>  (second-to-last data line)
- Terminator: data: [DONE]  (last data line)

No named events. Client just reads data lines: last is [DONE],
second-to-last is the full Message JSON when structured output is present.
@Kaiohz Kaiohz left a comment


🔍 Code Review — BRIC-14: SSE Stream Final Message

📊 Score: 8/10


✅ Strengths

  1. Clean architecture — New stream_with_message() method added to the port (agent_runner.py), implemented in the adapter, consumed in the use case. Hexagonal architecture respected.

  2. Code reuse — _yield_chunks() cleanly extracted to avoid duplication between stream() and stream_with_message().

  3. Backwards compatible — The old stream() remains available; this change is additive.

  4. Solid tests — 6 dedicated tests for stream_with_message() + 3 route tests. Edge cases covered (empty chunks, tool calls, structured response, HITL interrupt).

  5. Guard added — _build_response() now checks that the messages list is non-empty.

  6. Documentation updated — README is clear on the new SSE/WebSocket protocol.


⚠️ Issues Identified

1. 🔴 Breaking change: SSE terminator modified

Before:

yield {"event": "done", "data": ""}

After:

yield {"data": "[DONE]"}

The README still shows event: done, but the code sends data: [DONE].

If clients were expecting event: done, this breaks them.

Recommendation: Either:

  • Revert to yield {"event": "done", "data": ""}
  • Or clarify in the README that the terminator is data: [DONE] and version the API

2. 🟡 README vs code mismatch — event: message

The README shows:

event: message
data: {"role":"ai",...}

But in chat.py:

yield {"data": event.model_dump_json()}

There is no event: message, just data: {...}. If this is intentional, the README needs fixing.

3. 🟡 No test for event: lines

The tests check the data: lines but not the presence of event: message. Add a test that checks:

event_lines = [l for l in body.split("\n") if l.startswith("event:")]

4. 🟢 Minor: mutable stats dict

_yield_chunks() mutates a dict passed as a parameter. It works, but a NamedTuple or a StreamStats(chunk_count, elapsed) dataclass would be clearer.
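
Such a typed container might be sketched as follows; the field names chunk_count and started_at are illustrative, not the PR's actual stats keys.

```python
import time
from dataclasses import dataclass, field


@dataclass
class StreamStats:
    """Typed alternative to the mutable stats dict passed into _yield_chunks()."""
    chunk_count: int = 0
    started_at: float = field(default_factory=time.monotonic)

    @property
    def elapsed(self) -> float:
        # derived metric instead of a second mutable dict entry
        return time.monotonic() - self.started_at
```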


📝 Improvement Suggestions

  1. Align README and implementation — Either the code matches the README (event: message), or the README gets updated.

  2. Add a test that explicitly checks the absence or presence of event: lines, depending on the expected behavior.

  3. Consider a changelog/release note to flag the SSE terminator change.


🎯 Verdict

Good PR, clean architecture, well-built tests. The main concern is the mismatch between the README and the code for the SSE protocol. Once aligned, this is good to merge.

Action required: Clarify the SSE protocol (event vs data) and decide whether the terminator change is breaking or documented.

@Kaiohz Kaiohz marked this pull request as ready for review April 24, 2026 10:55
@Kaiohz Kaiohz merged commit eac1d8a into main Apr 24, 2026
1 check passed