Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,24 +444,38 @@ curl -N -X POST http://localhost:8000/api/v1/chat/a1b2c3d4-e5f6-7890-abcd-ef1234
Response (Server-Sent Events):

```
data: Lines
data: of
data: code
data: align
data: ...
event: message
data: {"role":"ai","content":"Lines of code align...","timestamp":"2025-04-24T10:30:05.000000Z","tool_calls":null,"status":"completed","structured_response":null}
data: {"type":"thinking","data":"Hmm, a haiku needs 5-7-5 syllables..."}

event: done
data:
data: {"type":"content","data":"Lines"}

data: {"type":"content","data":" of"}

data: {"type":"content","data":" code"}

data: {"type":"content","data":" align"}

data: {"type":"content","data":"..."}

data: {"type":"message","data":"{\"role\":\"ai\",\"content\":\"Lines of code align...\",\"timestamp\":\"2025-04-24T10:30:05.000000Z\",\"tool_calls\":null,\"status\":\"completed\",\"structured_response\":null,\"thinking\":\"Hmm, a haiku needs 5-7-5 syllables...\"}"}

data: [DONE]
```

The stream emits:
1. **`data: <chunk>`** — text chunks as they are generated (keepalive for proxies like Cloudflare)
2. **`event: message`** — the complete `Message` JSON with all fields (role, content, timestamp, tool_calls, status, structured_response), identical in format to the synchronous `POST /chat/{thread_id}` response
3. **`event: done`** — signals the stream is complete
The stream emits **typed `StreamEvent` JSON objects** over SSE:

This design prevents Cloudflare timeout issues (~100s on idle connections) because chunks and SSE pings (every 15s) keep the connection active. Clients that need the full structured response can read the `event: message` data.
| `type` | Description | Persisted? |
|---|---|---|
| `thinking` | Reasoning / chain-of-thought tokens from extended-thinking models (e.g., Claude reasoning). | Yes — saved in `Message.thinking` |
| `content` | Response text / markdown tokens as they are generated. | Yes — aggregated into `Message.content` |
| `message` | The final complete `Message` JSON with all fields (`role`, `content`, `timestamp`, `tool_calls`, `status`, `structured_response`, `thinking`). Identical in format to the synchronous `POST /chat/{thread_id}` response. | Yes — persisted as the AI turn in the thread |

The stream ends with `data: [DONE]`.

This design prevents Cloudflare timeout issues (~100s on idle connections) because chunks and SSE pings (every 15s) keep the connection active. Clients can switch rendering based on `type`:

- Render `thinking` events in a collapsible reasoning panel.
- Append `content` events directly to the chat bubble.
- Wait for the `message` event to finalize metadata (status, structure, tool calls).

### 7. List All Threads

Expand Down Expand Up @@ -711,20 +725,18 @@ ws.onopen = () => ws.send(JSON.stringify({ message: "Hello" }));
ws.onmessage = (event) => {
if (event.data === "[END]") {
console.log("Response complete");
} else {
try {
const data = JSON.parse(event.data);
if (data.type === "message") {
console.log("Final message:", data);
}
} catch {
process.stdout.write(event.data);
}
return;
}
const data = JSON.parse(event.data);
switch (data.type) {
case "thinking": console.log("[Thinking]", data.data); break;
case "content": process.stdout.write(data.data); break;
case "message": console.log("Final message:", data.data); break;
}
};
```

The WebSocket stream emits text chunks, then a JSON object with `type: "message"` containing the full `Message` fields, followed by `[END]`.
The WebSocket stream emits typed `StreamEvent` JSON objects: `thinking` (reasoning tokens), `content` (response text), `message` (final full `Message` JSON), then `[END]`.

---

Expand Down
26 changes: 26 additions & 0 deletions src/alembic/versions/004_add_thinking_column.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Add thinking column to messages.

Revision ID: 004
Revises: 003
Create Date: 2026-04-29
"""

from collections.abc import Sequence

from sqlalchemy import Column as saColumn
from sqlalchemy import Text

from alembic import op

revision: str = "004"
down_revision: str | None = "003"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
op.add_column("messages", saColumn("thinking", Text, nullable=True))


def downgrade() -> None:
op.drop_column("messages", "thinking")
2 changes: 1 addition & 1 deletion src/application/requests/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pydantic import BaseModel, Field, model_validator

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class ChatRequest(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion src/application/routes/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from src.domain.entities.agent_config import AgentConfig
from src.domain.entities.agent_config_metadata import AgentConfigMetadata

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/agents", tags=["agents"])

Expand Down
17 changes: 10 additions & 7 deletions src/application/routes/chat.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
from typing import Annotated

Expand All @@ -14,8 +15,9 @@
get_stream_message_use_case,
)
from src.domain.entities.message import Message
from src.domain.entities.stream_event import StreamEvent, StreamEventType

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/chat", tags=["chat"])

Expand Down Expand Up @@ -46,15 +48,16 @@ async def event_generator():
chunk_count = 0
try:
async for event in use_case.execute(thread_id, body.message):
if isinstance(event, str):
if event.type in (StreamEventType.THINKING, StreamEventType.CONTENT):
chunk_count += 1
yield {"data": event}
elif isinstance(event, Message):
yield {"data": event.model_dump_json()}
yield {"data": event.model_dump_json()}
yield {"data": "[DONE]"}
logger.info("[thread=%s] Stream complete, %d chunks", thread_id, chunk_count)
except Exception:
except asyncio.CancelledError:
raise
except Exception as exc:
logger.exception("[thread=%s] Stream error after %d chunks", thread_id, chunk_count)
yield {"event": "error", "data": "stream_error"}
error_event = StreamEvent(type=StreamEventType.ERROR, data=str(exc))
yield {"data": error_event.model_dump_json()}

return EventSourceResponse(event_generator(), sep="\r\n", ping=15)
2 changes: 1 addition & 1 deletion src/application/routes/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from src.dependencies import get_prompt_manager
from src.domain.ports.prompt_manager import PromptManager

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/prompts", tags=["prompts"])

Expand Down
2 changes: 1 addition & 1 deletion src/application/routes/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from src.domain.entities.thread import Thread

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/threads", tags=["threads"])

Expand Down
15 changes: 7 additions & 8 deletions src/application/routes/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

from src.application.use_cases.stream_message import StreamMessageUseCase
from src.dependencies import get_stream_message_use_case
from src.domain.entities.message import Message
from src.domain.entities.stream_event import StreamEvent, StreamEventType

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)

router = APIRouter(tags=["websocket"])

Expand All @@ -34,16 +34,15 @@ async def websocket_chat(
chunk_count = 0
try:
async for event in use_case.execute(thread_id, message):
if isinstance(event, str):
if event.type in (StreamEventType.THINKING, StreamEventType.CONTENT):
chunk_count += 1
await websocket.send_text(event)
elif isinstance(event, Message):
await websocket.send_text(json.dumps({"type": "message", **event.model_dump(mode="json")}))
await websocket.send_text(event.model_dump_json())
await websocket.send_text("[END]")
logger.info("[thread=%s] WS stream complete, %d chunks", thread_id, chunk_count)
except Exception:
except Exception as exc:
logger.exception("[thread=%s] WS stream error after %d chunks", thread_id, chunk_count)
await websocket.send_text(json.dumps({"error": "Agent execution error"}))
error_event = StreamEvent(type=StreamEventType.ERROR, data=str(exc))
await websocket.send_text(error_event.model_dump_json())
except WebSocketDisconnect:
logger.info("[thread=%s] WebSocket disconnected", thread_id)
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/create_agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from src.domain.ports.agent_config_repository import AgentConfigRepository
from src.domain.ports.agent_config_store import AgentConfigStore

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class CreateAgentConfigUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/create_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from src.domain.entities.prompt import PromptVersion
from src.domain.ports.prompt_manager import PromptManager

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class CreatePromptUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/delete_agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from src.domain.ports.agent_config_store import AgentConfigStore
from src.domain.ports.agent_registry import AgentRegistry

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class DeleteAgentConfigUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/get_agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from src.domain.ports.agent_config_loader import AgentConfigLoader
from src.domain.ports.agent_config_store import AgentConfigStore

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class GetAgentConfigUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/get_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from src.domain.entities.prompt import Prompt
from src.domain.ports.prompt_manager import PromptManager

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class GetPromptUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/list_agent_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from src.domain.entities.agent_config_metadata import AgentConfigMetadata
from src.domain.ports.agent_config_repository import AgentConfigRepository

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class ListAgentConfigsUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/send_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from src.domain.ports.agent_registry import AgentRegistry
from src.domain.ports.thread_repository import ThreadRepository

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class SendMessageUseCase:
Expand Down
37 changes: 22 additions & 15 deletions src/application/use_cases/stream_message.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import time
import json
from collections.abc import AsyncGenerator

from src.domain.entities.message import Message, MessageRole
from src.domain.entities.stream_event import StreamEvent, StreamEventType
from src.domain.ports.agent_registry import AgentRegistry
from src.domain.ports.thread_repository import ThreadRepository

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class StreamMessageUseCase:
Expand All @@ -16,7 +18,7 @@ def __init__(self, registry: AgentRegistry, threads: ThreadRepository):
self._registry = registry
self._threads = threads

async def execute(self, thread_id: str, message: str) -> AsyncGenerator[str | Message, None]:
async def execute(self, thread_id: str, message: str) -> AsyncGenerator[StreamEvent, None]:
thread = await self._threads.get(thread_id)
human_msg = Message(role=MessageRole.HUMAN, content=message)
await self._threads.add_message(thread_id, human_msg)
Expand All @@ -27,11 +29,17 @@ async def execute(self, thread_id: str, message: str) -> AsyncGenerator[str | Me
final_message = None
try:
async for event in runner.stream_with_message(thread_id, message):
if isinstance(event, str):
if event.type in (StreamEventType.THINKING, StreamEventType.CONTENT):
chunk_count += 1
yield event
elif isinstance(event, Message):
final_message = event
elif event.type == StreamEventType.MESSAGE:
final_message = Message.model_validate_json(event.data)
if final_message and final_message.structured_response is not None:
yield StreamEvent(
type=StreamEventType.STRUCTURED,
data=json.dumps(final_message.structured_response)
)
yield event
except Exception:
logger.exception(
"[thread=%s][agent=%s] Stream error after %d chunks", thread_id, thread.agent_name, chunk_count
Expand All @@ -41,16 +49,15 @@ async def execute(self, thread_id: str, message: str) -> AsyncGenerator[str | Me
if final_message is not None:
try:
await self._threads.add_message(thread_id, final_message)
except Exception:
logger.info(
"[thread=%s][agent=%s] Stream complete, %d chunks, elapsed=%.2fs, message=persisted",
thread_id,
thread.agent_name,
chunk_count,
elapsed,
)
except Exception as exc:
logger.exception(
"[thread=%s][agent=%s] Failed to persist AI message after stream", thread_id, thread.agent_name
)
yield final_message
logger.info(
"[thread=%s][agent=%s] Stream complete, %d chunks, elapsed=%.2fs, message=%s",
thread_id,
thread.agent_name,
chunk_count,
elapsed,
"yielded" if final_message else "none",
)
raise RuntimeError(f"Failed to persist AI message after stream: {exc}") from exc
2 changes: 1 addition & 1 deletion src/application/use_cases/thread_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from src.domain.ports.agent_registry import AgentRegistry
from src.domain.ports.thread_repository import ThreadRepository

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class CreateThreadUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/update_agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from src.domain.ports.agent_config_store import AgentConfigStore
from src.domain.ports.agent_registry import AgentRegistry

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class UpdateAgentConfigUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/application/use_cases/update_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from src.domain.ports.prompt_manager import PromptManager

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class UpdatePromptUseCase:
Expand Down
2 changes: 1 addition & 1 deletion src/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from src.infrastructure.prompt_management.phoenix_prompt_adapter import PhoenixPromptManagerProvider
from src.infrastructure.yaml_config.adapter import YamlAgentConfigLoader

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)

# ============= CONFIG =============

Expand Down
2 changes: 1 addition & 1 deletion src/domain/entities/agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from src.domain.entities.mcp_server_config import McpServerConfig

logger = logging.getLogger("composable-agents")
logger = logging.getLogger(__name__)


class MiddlewareType(StrEnum):
Expand Down
Loading
Loading