Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,20 @@ data: of
data: code
data: align
data: ...
event: message
data: {"role":"ai","content":"Lines of code align...","timestamp":"2025-04-24T10:30:05.000000Z","tool_calls":null,"status":"completed","structured_response":null}

event: done
data:
```

The stream emits:
1. **`data: <chunk>`** — text chunks as they are generated (keepalive for proxies like Cloudflare)
2. **`event: message`** — the complete `Message` JSON with all fields (role, content, timestamp, tool_calls, status, structured_response), identical in format to the synchronous `POST /chat/{thread_id}` response
3. **`event: done`** — signals the stream is complete

This design prevents Cloudflare timeout issues (~100s on idle connections) because chunks and SSE pings (every 15s) keep the connection active. Clients that need the full structured response can read the `event: message` data.

### 7. List All Threads

```bash
Expand Down Expand Up @@ -700,11 +712,20 @@ ws.onmessage = (event) => {
if (event.data === "[END]") {
console.log("Response complete");
} else {
process.stdout.write(event.data);
try {
const data = JSON.parse(event.data);
if (data.type === "message") {
console.log("Final message:", data);
}
} catch {
process.stdout.write(event.data);
}
}
};
```

The WebSocket stream emits text chunks, then a JSON object with `type: "message"` containing the full `Message` fields, followed by `[END]`.

---

## Prompt Management Setup
Expand Down
11 changes: 7 additions & 4 deletions src/application/routes/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ async def stream_message(
async def event_generator():
chunk_count = 0
try:
async for chunk in use_case.execute(thread_id, body.message):
chunk_count += 1
yield {"data": chunk}
yield {"event": "done", "data": ""}
async for event in use_case.execute(thread_id, body.message):
if isinstance(event, str):
chunk_count += 1
yield {"data": event}
elif isinstance(event, Message):
yield {"data": event.model_dump_json()}
yield {"data": "[DONE]"}
logger.info("[thread=%s] Stream complete, %d chunks", thread_id, chunk_count)
except Exception:
logger.exception("[thread=%s] Stream error after %d chunks", thread_id, chunk_count)
Expand Down
10 changes: 7 additions & 3 deletions src/application/routes/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from src.application.use_cases.stream_message import StreamMessageUseCase
from src.dependencies import get_stream_message_use_case
from src.domain.entities.message import Message

logger = logging.getLogger("composable-agents")

Expand Down Expand Up @@ -32,9 +33,12 @@ async def websocket_chat(
logger.info("[thread=%s] WS message received: %s", thread_id, message[:80])
chunk_count = 0
try:
async for chunk in use_case.execute(thread_id, message):
chunk_count += 1
await websocket.send_text(chunk)
async for event in use_case.execute(thread_id, message):
if isinstance(event, str):
chunk_count += 1
await websocket.send_text(event)
elif isinstance(event, Message):
await websocket.send_text(json.dumps({"type": "message", **event.model_dump(mode="json")}))
await websocket.send_text("[END]")
logger.info("[thread=%s] WS stream complete, %d chunks", thread_id, chunk_count)
except Exception:
Expand Down
35 changes: 19 additions & 16 deletions src/application/use_cases/stream_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,47 @@


class StreamMessageUseCase:
"""Envoie un message a l'agent et streame la reponse."""
"""Envoie un message a l'agent et streame la reponse avec le Message final."""

def __init__(self, registry: AgentRegistry, threads: ThreadRepository):
self._registry = registry
self._threads = threads

async def execute(self, thread_id: str, message: str) -> AsyncGenerator[str, None]:
async def execute(self, thread_id: str, message: str) -> AsyncGenerator[str | Message, None]:
thread = await self._threads.get(thread_id)
human_msg = Message(role=MessageRole.HUMAN, content=message)
await self._threads.add_message(thread_id, human_msg)
runner = await self._registry.get_runner(thread.agent_name)
start = time.monotonic()
logger.info("[thread=%s][agent=%s] Stream started", thread_id, thread.agent_name)
full_response = []
chunk_count = 0
final_message = None
try:
async for chunk in runner.stream(thread_id, message):
chunk_count += 1
full_response.append(chunk)
yield chunk
async for event in runner.stream_with_message(thread_id, message):
if isinstance(event, str):
chunk_count += 1
yield event
elif isinstance(event, Message):
final_message = event
except Exception:
logger.exception(
"[thread=%s][agent=%s] Stream error after %d chunks", thread_id, thread.agent_name, chunk_count
)
raise
elapsed = time.monotonic() - start
ai_msg = Message(role=MessageRole.AI, content="".join(full_response))
try:
await self._threads.add_message(thread_id, ai_msg)
except Exception:
logger.exception(
"[thread=%s][agent=%s] Failed to persist AI message after stream", thread_id, thread.agent_name
)
if final_message is not None:
try:
await self._threads.add_message(thread_id, final_message)
except Exception:
logger.exception(
"[thread=%s][agent=%s] Failed to persist AI message after stream", thread_id, thread.agent_name
)
yield final_message
logger.info(
"[thread=%s][agent=%s] Stream complete, %d chunks, %d chars, elapsed=%.2fs",
"[thread=%s][agent=%s] Stream complete, %d chunks, elapsed=%.2fs, message=%s",
thread_id,
thread.agent_name,
chunk_count,
len(ai_msg.content),
elapsed,
"yielded" if final_message else "none",
)
5 changes: 5 additions & 0 deletions src/domain/ports/agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ async def stream(self, thread_id: str, message: str) -> AsyncIterator[str]:
"""Envoie un message et streame la reponse par chunks."""
...

@abstractmethod
async def stream_with_message(self, thread_id: str, message: str) -> AsyncIterator[str | Message]:
"""Streame les chunks puis yield le Message final complet."""
...

@abstractmethod
async def approve_hitl(self, thread_id: str, tool_call_id: str) -> Message:
"""Approuve une action HITL en attente."""
Expand Down
82 changes: 63 additions & 19 deletions src/infrastructure/deepagent/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def _build_config(self, thread_id: str) -> dict:

def _build_response(self, result: dict, config: dict) -> Message:
"""Build a Message from graph result, detecting interrupts and collecting tool_calls."""
messages = result["messages"]
messages = result.get("messages", [])
if not messages:
raise AgentError("Graph completed but no messages were found in the final state.")
last_message = messages[-1]

all_tool_calls = getattr(last_message, "tool_calls", None) or []
Expand Down Expand Up @@ -107,32 +109,74 @@ async def invoke(self, thread_id: str, message: str) -> Message:
logger.exception("[thread=%s] Agent execution error", thread_id)
raise AgentError(f"Agent execution error: {e}") from e

async def _yield_chunks(
self,
thread_id: str,
message: str,
config: dict,
stats: dict,
) -> AsyncIterator[str]:
"""Stream graph chunks and populate *stats* with timing."""
start = time.monotonic()
first_chunk = True
chunk_count = 0
async for chunk, _metadata in self._graph.astream(
{"messages": [{"role": "human", "content": message}]},
config=config,
stream_mode="messages",
):
if hasattr(chunk, "content") and chunk.content and chunk.type == "AIMessageChunk":
if first_chunk:
logger.info(
"[thread=%s] First chunk received, elapsed=%.2fs",
thread_id,
time.monotonic() - start,
)
first_chunk = False
chunk_count += 1
yield chunk.content
stats["chunk_count"] = chunk_count
stats["elapsed"] = time.monotonic() - start

async def stream(self, thread_id: str, message: str) -> AsyncIterator[str]:
config = self._build_config(thread_id)
logger.info("[thread=%s] Streaming agent response", thread_id)
try:
start = time.monotonic()
first_chunk = True
chunk_count = 0
async for chunk, _metadata in self._graph.astream(
{"messages": [{"role": "human", "content": message}]},
config=config,
stream_mode="messages",
):
if hasattr(chunk, "content") and chunk.content and chunk.type == "AIMessageChunk":
if first_chunk:
logger.info(
"[thread=%s] First chunk received, elapsed=%.2fs", thread_id, time.monotonic() - start
)
first_chunk = False
chunk_count += 1
yield chunk.content
stats: dict = {}
async for chunk in self._yield_chunks(thread_id, message, config, stats):
yield chunk
logger.info(
"[thread=%s] Stream complete, %d chunks, elapsed=%.2fs",
thread_id,
chunk_count,
time.monotonic() - start,
stats["chunk_count"],
stats["elapsed"],
)
except Exception as e:
logger.exception("[thread=%s] Streaming error", thread_id)
raise AgentError(f"Streaming error: {e}") from e

async def stream_with_message(self, thread_id: str, message: str) -> AsyncIterator[str | Message]:
config = self._build_config(thread_id)
logger.info("[thread=%s] Streaming agent response with final message", thread_id)
try:
stats: dict = {}
async for chunk in self._yield_chunks(thread_id, message, config, stats):
yield chunk
state = self._graph.get_state(config)
values = state.values if state and hasattr(state, "values") else {}
result = {
"messages": values.get("messages", []),
"structured_response": values.get("structured_response"),
}
response = self._build_response(result, config)
logger.info(
"[thread=%s] Stream with message complete, %d chunks, elapsed=%.2fs, status=%s",
thread_id,
stats["chunk_count"],
stats["elapsed"],
response.status,
)
yield response
except Exception as e:
logger.exception("[thread=%s] Streaming error", thread_id)
raise AgentError(f"Streaming error: {e}") from e
Expand Down
Loading
Loading