BRIC-18: Multiplex content stream with typed events (thinking/content/message)#17
Conversation
🔍 Review BRIC-18: Multiplexed Content StreamScore: 8/10✅ Points positifs
|
3c7b3dc to
30902e4
Compare
- Add StreamEvent entity (THINKING, CONTENT, MESSAGE) - Add thinking field to Message entity & DB migration - Update AgentRunner ABC to yield StreamEvent - Classify AIMessageChunk by additional_kwargs.reasoning_content and type=thinking - Update SSE/WS routes to emit JSON StreamEvent - Update postgres_thread adapter to map thinking column - Update all tests for new protocol
30902e4 to
213e228
Compare
Kaiohz
left a comment
There was a problem hiding this comment.
📋 Review de la PR: BRIC-18 - Multiplex content stream with typed events
🎯 Résumé
Cette PR introduit un système d'événements typés pour le streaming, remplaçant le système précédent qui mélangeait str et Message. Le nouveau design est plus propre et mieux typé.
✅ Points Positifs
Architecture & Design
- ✨ Nouvelle entité
StreamEvent- Design élégant avecStreamEventType(THINKING, CONTENT, MESSAGE) - 🧊 Entité immuable (
frozen=True) - Bonne pratique pour les DTOs - 🗃️ Migration DB propre - Ajout de la colonne
thinkingdansmessages - 📉 Réduction de complexité - Le stream handler est simplifié, plus de
isinstancechecks
Code Quality
- 🧪 Tests unitaires à jour - Tous les tests ont été mis à jour pour le nouveau format
- 📝 Code plus lisible - Suppression de la logique complexe de parsing dans les routes
- 🎯 Séparation claire des types - THINKING, CONTENT, MESSAGE bien distincts
Implementation
- La méthode
_classify_chunk()dansDeepAgentRunnerest bien pensée - Support des "reasoning tokens" pour les modèles extended-thinking (Claude, etc.)
- Gestion des
additional_kwargspour les différents formats de chunk
⚠️ Points à Améliorer
Tests
-
🔶 Manque de tests pour les events THINKING - Les tests actuels ne couvrent que CONTENT et MESSAGE. Ajouter des tests pour:
# Test case à ajouter async def test_stream_with_message_yields_thinking_then_content(): chunks = [ (StreamEventType.THINKING, "Let me think..."), (StreamEventType.CONTENT, "Hello "), (StreamEventType.CONTENT, "world!") ] # ... assert que thinking est bien yieldé
-
🔶 Edge cases non testés - Que se passe-t-il si un chunk est vide? Si
additional_kwargsest manquant?
Code Quality
-
🔶 Docstrings supprimées - Les docstrings de
AgentRunneront été retirées. Pour une interface publique, c'est mieux de les garder:# Avant (mieux) async def invoke(self, thread_id: str, message: str) -> Message: """Envoie un message et retourne la reponse complete.""" ... # Après (moins documenté) async def invoke(self, thread_id: str, message: str) -> Message: ...
-
🔶 Validation JSON -
Message.model_validate_json(event.data)pourrait throw si le JSON est malformé. Envisager un try/except:elif event.type == StreamEventType.MESSAGE: try: final_message = Message.model_validate_json(event.data) except ValidationError: logger.exception("[thread=%s] Invalid MESSAGE event data", thread_id) raise AgentError("Invalid message format")
Minor nitpicks
- 💡 La méthode
_is_nonblank_strpourrait être remplacée par une simple condition dans_classify_chunkpour réduire l'indirection - 💡 Le type hint
val: objectest correct maisstr | Noneserait plus idiomatique
📊 Score de Qualité: 8/10
Justification:
- +3 Architecture propre et bien pensée (StreamEvent, enum, frozen)
- +2 Réduction de complexité significative
- +2 Tests unitaires mis à jour
- +1 Migration DB propre et réversible
Déductions:
- -1 Manque de tests pour les events THINKING
- -1 Docstrings retirées de l'interface publique
🚀 Recommandations
- Ajouter des tests THINKING - Au minimum un test case pour valider le flux thinking → content → message
- Restaurer les docstrings - Garder la documentation de l'interface
AgentRunner - Ajouter validation - try/except autour de
model_validate_jsonpour la robustesse
🟢 Verdict: APPROVED (avec suggestions)
La PR est mergable en l'état. Les suggestions ci-dessus sont des améliorations qui peuvent être faites dans des PRs suivantes si on veut itérer rapidement.
Review générée par SoluBot 🤖
Kaiohz
left a comment
There was a problem hiding this comment.
Code Review: BRIC-18 Multiplexed Content Stream
Score: 8.5/10
Cette PR implémente une refonte propre du streaming pour supporter les événements typés (thinking/content/message). Voici mon analyse détaillée.
Points forts
Architecture (excellent)
- Nouvelle entité domaine
StreamEventavec enumStreamEventType- respecte parfaitement l'architecture hexagonale - Classification propre dans
_classify_chunk()avec détection OpenAI (reasoning_content) et Anthropic (type=thinking) - Separation of concerns claire: le domaine ne dépend pas des détails d'implementation
Persistance (très bon)
- Migration
004_add_thinking_columnpropre avec upgrade/downgrade - Accumulation intelligente des thinking tokens dans
thinking_partsavant persistence - Le champ
Message.thinkingest nullable - backward compatible
Tests (excellent)
- Tous les tests adaptés au nouveau format
StreamEvent - Coverage correcte des cas: content seul, thinking+content, tool_calls, structured_response, HITL interrupt
- Mock
_make_streaming_graphbien refactorisé
Documentation (bon)
- README mis à jour avec le nouveau format SSE/WebSocket
- Table explicite des types avec colonne Persisted?
- Exemples de code client mis à jour
Points à améliorer
1. Logs de debug manquants dans _classify_chunk
La méthode _classify_chunk fait des choix importants (thinking vs content) mais ne loggue rien. Ajouter des logs de debug faciliterait le troubleshooting:
@staticmethod
def _classify_chunk(chunk) -> tuple[StreamEventType, str] | None:
if chunk.type != "AIMessageChunk":
return None
additional = getattr(chunk, "additional_kwargs", {})
reasoning = additional.get("reasoning_content")
if DeepAgentRunner._is_nonblank_str(reasoning):
logger.debug("[classify] Detected OpenAI reasoning_content (%d chars)", len(reasoning))
return (StreamEventType.THINKING, reasoning)
# ...2. Helper _is_nonblank_str à partager
Cette méthode utilitaire pourrait être dans un module src/domain/utils/strings.py pour être réutilisée ailleurs.
3. Documentation format thinking
Le README montre data: {"type":"thinking","data":"..."} mais ne précise pas:
- Est-ce que
datacontient le texte brut ou du JSON? - Comment accumuler les thinking events côté client?
Un petit paragraphe example aurait été utile.
4. Variables unused _tool_call_id
Les paramètres approve_hitl(thread_id, _tool_call_id) et reject_hitl utilisent l'underscore pour ignorer le paramètre. C'est un pattern valide, mais un commentaire aurait clarifié.
Suggestions mineures
- Type alias:
StreamIterator = AsyncIterator[StreamEvent]pourrait éviter la répétition - Error handling: Le
raise RuntimeErrordansstream_message.pyest bien, mais pourrait être unAgentErrorcustom pour consistency
Verdict
Ready to merge avec des améliorations optionnelles.
L'architecture est solide, le code est propre, les tests passent. Les points ci-dessus sont des améliorations de qualité, pas des blockers.
✅ Approval recommandée.
Review postée par SoluBot - Assistant SoluDevTech
Kaiohz
left a comment
There was a problem hiding this comment.
Code Review: BRIC-18 Multiplexed Content Stream
Score: 8.5/10
Cette PR implémente une refonte propre du streaming pour supporter les événements typés (thinking/content/message). Voici mon analyse détaillée.
Points forts
Architecture (excellent)
- Nouvelle entité domaine
StreamEventavec enumStreamEventType- respecte parfaitement l'architecture hexagonale - Classification propre dans
_classify_chunk()avec détection OpenAI (reasoning_content) et Anthropic (type=thinking) - Separation of concerns claire: le domaine ne dépend pas des détails d'implémentation
Persistance (très bon)
- Migration
004_add_thinking_columnpropre avec upgrade/downgrade - Accumulation intelligente des thinking tokens dans
thinking_partsavant persistence - Champ
Message.thinkingnullable - backward compatible
Tests (excellent)
- Tous les tests adaptés au nouveau format
StreamEvent - Coverage correcte des cas: content seul, thinking+content, tool_calls, structured_response, HITL interrupt
- Mock
_make_streaming_graphbien refactorisé
Documentation (bon)
- README mis à jour avec le nouveau format SSE/WebSocket
- Table explicite des types avec colonne "Persisted?"
- Exemples de code client mis à jour
Points à améliorer
1. Logs de debug manquants dans _classify_chunk
La méthode _classify_chunk() fait des choix importants (thinking vs content) mais ne loggue rien. Ajouter des logs de debug faciliterait le troubleshooting:
@staticmethod
def _classify_chunk(chunk) -> tuple[StreamEventType, str] | None:
if chunk.type != "AIMessageChunk":
return None
additional = getattr(chunk, "additional_kwargs", {})
reasoning = additional.get("reasoning_content")
if DeepAgentRunner._is_nonblank_str(reasoning):
logger.debug("[classify] Detected OpenAI reasoning_content (%d chars)", len(reasoning))
return (StreamEventType.THINKING, reasoning)
# ...2. Helper _is_nonblank_str à partager
Cette méthode utilitaire pourrait être dans un module src/domain/utils/strings.py pour être réutilisée ailleurs.
3. Documentation format thinking côté client
Le README montre data: {"type":"thinking","data":"..."} mais ne précise pas:
- Est-ce que
datacontient le texte brut ou du JSON? - Comment accumuler les thinking events côté client?
Un petit paragraphe example aurait été utile.
4. Variables unused _tool_call_id
Les paramètres approve_hitl(thread_id, _tool_call_id) et reject_hitl utilisent l'underscore pour ignorer le paramètre. C'est un pattern valide, mais un commentaire aurait clarifié.
Suggestions mineures
- Type alias:
StreamIterator = AsyncIterator[StreamEvent]pourrait éviter la répétition - Error handling: Le
raise RuntimeErrordansstream_message.pyest bien, mais pourrait être unAgentErrorcustom pour consistency
Verdict
Approval recommandée.
L'architecture est solide, le code est propre, les tests passent. Les points ci-dessus sont des améliorations de qualité, pas des blockers.
Ready to merge!
Review postée par SoluBot - Assistant SoluDevTech
Add `STRUCTURED` stream event type and update test to handle structured events when locating the message event in stream output. Simplify root logging configuration in `main.py`.
Kaiohz
left a comment
There was a problem hiding this comment.
Review: BRIC-18 - Multiplexed Content Stream
📊 Score: 8.5/10
✅ Points forts
1. Architecture propre
- La nouvelle entité
StreamEventavecStreamEventTypeenum apporte de la clarté et de la sécurité de type - Séparation claire entre THINKING, CONTENT, MESSAGE, STRUCTURED, ERROR
- Le protocole SSE/WebSocket devient cohérent et facile à consommer côté client
2. Support thinking natif
_classify_chunk()détecte correctement les tokens de raisonnement OpenAI (reasoning_content) et Anthropic (type: thinking)- Accumulation des thinking tokens pendant le streaming
- Persistance en DB via la colonne
Message.thinking
3. Logger standardization
- Migration de
logging.getLogger("composable-agents")verslogging.getLogger(__name__)— bonne pratique Python - Logging simplifié dans
main.py
4. Error handling amélioré
StreamEvent(type=ERROR, data=str(exc))pour des erreurs structurées- Proper
asyncio.CancelledErrorhandling dans les routes
5. Documentation à jour
- README mis à jour avec table clair des types d'événements
- Explication de la persistance par type
6. Tests à jour
- Tests unitaires refactorés pour utiliser
StreamEvent - Coverage des cas thinking + content
🔧 Suggestions d'amélioration
1. Type hints sur _classify_chunk
# Actuel
def _classify_chunk(chunk) -> tuple[StreamEventType, str] | None:
# Suggestion: typer l'argument chunk
from langchain_core.messages import AIMessageChunk
def _classify_chunk(chunk: AIMessageChunk) -> tuple[StreamEventType, str] | None:Cela documenterait mieux le type attendu et permettrait à mypy/pyright de valider.
2. Edge case: thinking vide
Dans stream_with_message(), on accumule les thinking tokens:
thinking = "".join(thinking_parts) if thinking_parts else NoneMais si tous les thinking tokens sont des chaînes vides (whitespace), on pourrait persister une chaîne vide. Suggestion:
thinking = ("".join(thinking_parts)).strip() or None3. Structured event emission
Dans StreamMessageUseCase.execute(), on yield un STRUCTURED event APRÈS le MESSAGE event:
if final_message and final_message.structured_response is not None:
yield StreamEvent(type=StreamEventType.STRUCTURED, data=json.dumps(final_message.structured_response))
yield event # MESSAGE eventL'ordre [MESSAGE, STRUCTURED] pourrait surprendre les clients. Est-ce intentionnel? Alternative:
# D'abord le message, puis le structured
yield event # MESSAGE
if final_message.structured_response:
yield StreamEvent(type=StreamEventType.STRUCTURED, ...)4. Documentation API client
Une petite section dans le README montrant comment un client JS/Python doit parser les événements serait utile:
// Exemple client
for await (const event of stream) {
switch (event.type) {
case 'thinking': showReasoningPanel(event.data); break;
case 'content': appendToChatBubble(event.data); break;
case 'message': finalizeMessage(JSON.parse(event.data)); break;
}
}🧪 Tests
- 241/241 passing ✓
- Ruff linter clean ✓
📝 Conclusion
Excellente PR qui standardise le protocole de streaming et ajoute le support natif des modèles "extended-thinking". Le code est propre, bien testé, et suit les bonnes pratiques.
Le score de 8.5/10 reflète:
- Architecture solide et cohérente
- Support complet thinking (OpenAI + Anthropic)
- Documentation et tests à jour
- Quelques mineurs améliorations possibles sur les edge cases et la documentation client
Recommandation: APPROVE avec les suggestions optionnelles ci-dessus. Les améliorations peuvent être adressées dans une PR suivante si priorité.
Jira
BRIC-18
Changes
StreamEventJSON (type: thinking|content|message) instead of raw strings_classify_chunk()inspectsadditional_kwargs.reasoning_content(OpenAI) andadditional_kwargs.type==\thinking`` (Anthropic) to separate thinking from contentMessage.thinkingfield + DB migration004_add_thinking_columnstores accumulated reasoning textAgentRunnerABC,StreamMessageUseCase, SSE/WS routes, and Postgres mapperTests