diff --git a/.env.example b/.env.example index a7ef178..2fe53a3 100644 --- a/.env.example +++ b/.env.example @@ -18,18 +18,13 @@ POSTGRES_DATABASE=raganything # === MinIO === MINIO_ENDPOINT=localhost:9040 MINIO_ACCESS_KEY=minioadmin -MINIO_SECRET_KEY=minioadmin +MINIO_SECRET_KEY=your-minio-secret-key MINIO_BUCKET=composable-agents MINIO_SECURE=false # === Tracing === -TRACING_PROVIDER=none -TRACING_ENABLED=false -TRACING_PROJECT_NAME=composable-agents -LANGFUSE_HOST=https://cloud.langfuse.com -LANGFUSE_PUBLIC_KEY= -LANGFUSE_SECRET_KEY= -PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 -PHOENIX_API_KEY= -LANGCHAIN_API_KEY= -LANGCHAIN_PROJECT=composable-agents +PROVIDER=phoenix # or "langfuse" or "none" +PROJECT_NAME=composable-agents +PHOENIX_COLLECTOR_ENDPOINT=https://phoenix.soludev.tech/ +PHOENIX_API_KEY=your-phoenix-api-key + diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 524e483..77b7440 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -31,7 +31,9 @@ jobs: key: venv-${{ runner.os }}-${{ hashFiles('**/uv.lock') }} - name: Install dependencies - run: uv sync --dev + run: | + uv sync --dev + uv sync --dev --extra phoenix - name: Setup environment file run: cp .env.example .env diff --git a/.gitignore b/.gitignore index b0dea32..1c0dd72 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ trivy-report-fixed.json coverage.xml .coverage .scannerwork/ +opencode.json +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md index 978623c..1b51d0a 100644 --- a/README.md +++ b/README.md @@ -29,11 +29,7 @@ cp .env.example .env Edit `.env` and add your API key and database credentials: ```dotenv -ANTHROPIC_API_KEY=sk-ant-... -# or OPENAI_API_KEY=sk-... -# or -GOOGLE_API_KEY=... # PostgreSQL (required) POSTGRES_HOST=localhost @@ -62,7 +58,7 @@ uv run python -m src validate agents/my-agent.yaml ### Launch the server ```bash -uv run python -m src serve +uv run python -m src.main serve ``` The API starts on `http://localhost:8000`. On startup, the server: @@ -281,6 +277,9 @@ All endpoints are prefixed appropriately. The server runs on `http://localhost:8 | `GET` | `/api/v1/agents` | List all agent configs from `agents/` directory | `200` | | `GET` | `/api/v1/agents/{agent_name}` | Get a specific agent configuration | `200` | | `WS` | `/api/v1/ws/{thread_id}` | WebSocket endpoint for streaming chat | -- | +| `POST` | `/prompts/create` | Create a new prompt | `200` | +| `GET` | `/prompts/get/{identifier}` | Get a specific prompt by identifier, version, or tag | `200` | +| `PUT` | `/prompts/update/{identifier}` | Update an existing prompt (creates new version) | `200` | ### Error Responses @@ -570,6 +569,127 @@ curl -X DELETE http://localhost:8000/api/v1/threads/a1b2c3d4-e5f6-7890-abcd-ef12 Response: `204 No Content` +### 14. Prompt Management + +Prompts are managed via a dedicated registry backed by Phoenix. Enable prompt management by setting `TRACING_PROVIDER=phoenix` and `PHOENIX_PROMPT_ENABLED=true` in your `.env`. + +#### 14.1 Create a Prompt + +```bash +curl -X POST http://localhost:8000/prompts/create \ + -H "Content-Type: application/json" \ + -d '{ + "identifier": "customer-support", + "content": [ + { + "role": "system", + "content": "You are a helpful customer support agent. Be polite and professional." + } + ], + "model_name": "claude-sonnet-4-5-20250929", + "description": "Prompt for general customer support queries", + "tags": ["production"], + "metadata": {"project_name": "composable-agents", "agent_type": "deep_agent"} + }' +``` + +Response (`200`): + +```json +{ + "status": "success", + "prompt": { + "identifier": "customer-support", + "description": "Prompt for general customer support queries", + "current_version": { + "version_id": "v1", + "content": [...], + "model_name": "claude-sonnet-4-5-20250929", + "created_at": "2025-01-15T10:30:00.000000", + "tags": ["support", "production"] + }, + "created_at": "2025-01-15T10:30:00.000000", + "updated_at": "2025-01-15T10:30:00.000000" + } +} +``` + +#### 14.2 List All Prompts + +```bash +curl http://localhost:8000/prompts/customer-support +``` + +Optional query parameters: +- `version_id`: Get a specific version +- `tag`: Get the prompt with a specific tag + +Response (`200`): + +```json +{ + "status": "success", + "prompt": { + "identifier": "customer-support", + "description": "", + "current_version": { + "version_id": "UHJvbXB0VmVyc2lvbjo4Mw==", + "content": [ + { + "role": "system", + "content": "You are a helpful customer support agent. Be polite and professional." + } + ], + "model_name": "claude-sonnet-4-5-20250929", + "tags": [] + }, + "created_at": null, + "updated_at": null + } +} +``` + +#### 14.3 Update a Prompt + +Create a new version of an existing prompt: + +```bash +curl -X PUT http://localhost:8000/prompts/update/customer-support \ + -H "Content-Type: application/json" \ + -d '{ + "content": [ + { + "role": "system", + "content": "You are a knowledgeable customer support agent. Be polite, professional, and thorough in your responses." + } + ], + "model_name": "claude-sonnet-4-5-20250929", + "description": "Updated prompt for customer support (more detailed)", + "tags": ["production"], + "metadata": {"project_name": "composable-agents", "agent_type": "deep_agent"} + }' +``` + +Response (`200`): + +```json +{ + "status": "success", + "prompt": { + "identifier": "customer-support", + "description": "Updated prompt for customer support (more detailed)", + "current_version": { + "version_id": "v2", + "content": [...], + "model_name": "claude-sonnet-4-5-20250929", + "created_at": "2025-01-15T10:31:00.000000", + "tags": ["support", "production"] + } + }, + "message": "Prompt 'customer-support' updated successfully" +} +``` + ### WebSocket Connect to the WebSocket endpoint and send JSON messages: @@ -588,6 +708,49 @@ ws.onmessage = (event) => { --- +## Prompt Management Setup + +To enable prompt management in Phoenix: + +### 1. Install Optional Dependencies + +```bash +uv sync --extra phoenix +``` + +Or add to `pyproject.toml`: +```toml +arize-phoenix-otel = ">=0.1.0" +openinference-instrumentation-langchain = ">=0.1.0" +httpx = ">=0.27.0" +``` + +### 2. Configure Environment Variables + +Add to `.env`: + +```dotenv +PROVIDER=phoenix +PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 +PHOENIX_PROMPT_ENABLED=true +PHOENIX_API_KEY=your-api-key-here +``` + +### 3. Architecture + +Prompt management follows the **Clean Architecture** pattern: + +- **Domain Entity** (`src/domain/entities/prompt.py`): `Prompt`, `PromptVersion` +- **Domain Port** (`src/domain/ports/prompt_manager.py`): `PromptManager` interface +- **Use Cases** (`src/application/use_cases/`): `CreatePromptUseCase`, `GetPromptUseCase`, `SearchPromptsUseCase`, `UpdatePromptUseCase` +- **Request DTOs** (`src/application/requests/prompt.py`): Request models for each endpoint +- **Routes** (`src/application/routes/prompts.py`): FastAPI endpoint handlers +- **Infrastructure Adapter** (`src/infrastructure/tracing/phoenix_prompt_manager.py`): Phoenix REST API implementation + +All prompt management operations are async and fully integrated with the FastAPI dependency injection system. + +--- + ## Architecture composable-agents follows a strict **hexagonal architecture** (ports and adapters). The domain layer has zero dependencies on frameworks or infrastructure. @@ -902,9 +1065,7 @@ Configured via `.env` file or environment variables. See `.env.example`. | Variable | Default | Description | |---|---|---| | `AGENTS_DIR` | `./agents` | Directory containing agent YAML configuration files. | -| `ANTHROPIC_API_KEY` | -- | API key for Anthropic models. | | `OPENAI_API_KEY` | -- | API key for OpenAI models. | -| `GOOGLE_API_KEY` | -- | API key for Google models. | | `OPENAI_BASE_URL` | `https://api.openai.com/v1` | Base URL for OpenAI-compatible endpoints. Set to use OpenRouter, LiteLLM, vLLM, etc. | | `HOST` | `0.0.0.0` | Server bind host. | | `PORT` | `8000` | Server bind port. | @@ -936,15 +1097,9 @@ The async connection URL is built automatically as `postgresql+asyncpg://: | Variable | Default | Description | |---|---|---| | `TRACING_PROVIDER` | `none` | Tracing backend: `none`, `langfuse`, or `phoenix`. | -| `TRACING_ENABLED` | `false` | Enable/disable tracing. | | `TRACING_PROJECT_NAME` | `composable-agents` | Project name for the tracing backend. | -| `LANGFUSE_HOST` | `https://cloud.langfuse.com` | Langfuse server URL. | -| `LANGFUSE_PUBLIC_KEY` | -- | Langfuse public key. | -| `LANGFUSE_SECRET_KEY` | -- | Langfuse secret key. | | `PHOENIX_COLLECTOR_ENDPOINT` | `http://localhost:6006` | Phoenix collector endpoint. | | `PHOENIX_API_KEY` | -- | Phoenix API key. | -| `LANGCHAIN_API_KEY` | -- | LangChain/LangSmith API key. | -| `LANGCHAIN_PROJECT` | `composable-agents` | LangChain/LangSmith project name. | --- diff --git a/agents/code-reviewer.yaml b/agents/code-reviewer.yaml index 009d2e4..dc53b8a 100644 --- a/agents/code-reviewer.yaml +++ b/agents/code-reviewer.yaml @@ -17,8 +17,10 @@ hitl: - reject subagents: - name: security-auditor + model: "openai:anthropic/claude-haiku-4.5:nitro" description: "Specialized in security vulnerability analysis" instructions: "Focus on OWASP Top 10 and common security patterns" - name: performance-analyst + model: "openai:anthropic/claude-haiku-4.5:nitro" description: "Specialized in performance optimization" instructions: "Analyze time complexity, memory usage, and bottlenecks" diff --git a/pyproject.toml b/pyproject.toml index 4d69c12..657e528 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,12 +30,13 @@ dependencies = [ "miniopy-async>=1.21.0", "sqlalchemy[asyncio]>=2.0.0", "alembic>=1.13.0", + "cachetools>=7.0.5", ] [project.optional-dependencies] langfuse = ["langfuse>=2.0.0"] -phoenix = ["arize-phoenix-otel>=0.1.0", "openinference-instrumentation-langchain>=0.1.0"] -tracing = ["langfuse>=2.0.0", "arize-phoenix-otel>=0.1.0", "openinference-instrumentation-langchain>=0.1.0"] +phoenix = ["arize-phoenix-otel>=0.1.0", "openinference-instrumentation-langchain>=0.1.0", "arize-phoenix-client>=2.3.0"] +tracing = ["langfuse>=2.0.0", "arize-phoenix-otel>=0.1.0", "openinference-instrumentation-langchain>=0.1.0", "arize-phoenix-client>=2.3.0"] [dependency-groups] dev = [ diff --git a/src/application/requests/prompt.py b/src/application/requests/prompt.py new file mode 100644 index 0000000..729473c --- /dev/null +++ b/src/application/requests/prompt.py @@ -0,0 +1,17 @@ +from pydantic import BaseModel, Field + + +class CreatePromptRequest(BaseModel): + identifier: str = Field(..., min_length=1) + content: list[dict[str, str]] + model_name: str + description: str | None = None + tags: list[str] | None = None + metadata : dict | None = None + + +class UpdatePromptRequest(BaseModel): + content: list[dict[str, str]] | None = None + model_name: str | None = None + description: str | None = None + metadata : dict | None = None diff --git a/src/application/routes/prompt.py b/src/application/routes/prompt.py new file mode 100644 index 0000000..8e6cde0 --- /dev/null +++ b/src/application/routes/prompt.py @@ -0,0 +1,82 @@ +import logging + +from fastapi import APIRouter, Depends, HTTPException + +from src.application.requests.prompt import ( + CreatePromptRequest, + UpdatePromptRequest, +) +from src.application.use_cases.create_prompt import CreatePromptUseCase +from src.application.use_cases.get_prompt import GetPromptUseCase +from src.application.use_cases.update_prompt import UpdatePromptUseCase +from src.dependencies import get_prompt_manager # We'll add this +from src.domain.ports.prompt_manager import PromptManager + +logger = logging.getLogger("composable-agents") + +router = APIRouter(prefix="/prompts", tags=["prompts"]) + + +@router.post("/create") +async def create_prompt( + request: CreatePromptRequest, + prompt_manager: PromptManager = Depends(get_prompt_manager), +): + """Create a new prompt.""" + use_case = CreatePromptUseCase(prompt_manager) + try: + prompt = await use_case.execute( + identifier=request.identifier, + content=request.content, + model_name=request.model_name, + description=request.description, + tags=request.tags, + metadata=request.metadata, + ) + return {"status": "success", "prompt": prompt} + except Exception as e: + logger.error(f"Error creating prompt: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/get/{identifier}") +async def get_prompt( + identifier: str, + version_id: str | None = None, + tag: str | None = None, + prompt_manager: PromptManager = Depends(get_prompt_manager), +): + """Get a prompt.""" + use_case = GetPromptUseCase(prompt_manager) + try: + prompt = await use_case.execute( + identifier=identifier, + version_id=version_id, + tag=tag, + ) + return {"status": "success", "prompt": prompt} + except Exception as e: + logger.error(f"Error getting prompt: {e}") + raise HTTPException(status_code=404, detail=str(e)) + + +@router.put("/update/{identifier}") +async def update_prompt( + identifier: str, + request: UpdatePromptRequest, + prompt_manager: PromptManager = Depends(get_prompt_manager), +): + """Update a prompt.""" + use_case = UpdatePromptUseCase(prompt_manager) + try: + prompt = await use_case.execute( + identifier=identifier, + content=request.content, + model_name=request.model_name, + description=request.description, + metadata=request.metadata, + ) + return {"status": "success", "prompt": prompt} + except Exception as e: + logger.error(f"Error updating prompt: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/application/use_cases/create_prompt.py b/src/application/use_cases/create_prompt.py new file mode 100644 index 0000000..de57d7d --- /dev/null +++ b/src/application/use_cases/create_prompt.py @@ -0,0 +1,35 @@ +import logging + +from src.domain.entities.prompt import PromptVersion +from src.domain.ports.prompt_manager import PromptManager + +logger = logging.getLogger("composable-agents") + + +class CreatePromptUseCase: + """Create a new prompt in the registry.""" + + def __init__(self, prompt_manager: PromptManager): + self._prompt_manager = prompt_manager + + async def execute( + self, + identifier: str, + content: list[dict[str, str]], + model_name: str, + description: str | None = None, + tags: list[str] | None = None, + metadata: dict | None = None, + ) -> PromptVersion: + """Create a new prompt.""" + logger.info(f"Creating prompt: {identifier}") + prompt = await self._prompt_manager.create_prompt( + identifier=identifier, + content=content, + model_name=model_name, + description=description, + tags=tags, + metadata=metadata, + ) + logger.info(f"Prompt created successfully: {identifier}") + return prompt diff --git a/src/application/use_cases/get_prompt.py b/src/application/use_cases/get_prompt.py new file mode 100644 index 0000000..72b20c1 --- /dev/null +++ b/src/application/use_cases/get_prompt.py @@ -0,0 +1,38 @@ +import logging + +from src.domain.entities.prompt import Prompt +from src.domain.ports.prompt_manager import PromptManager + +logger = logging.getLogger("composable-agents") + + +class GetPromptUseCase: + """Retrieve a prompt from the registry.""" + + def __init__(self, prompt_manager: PromptManager): + self._prompt_manager = prompt_manager + + async def execute( + self, + identifier: str, + version_id: str | None = None, + tag: str | None = None, + ) -> Prompt: + """Get a prompt.""" + logger.info(f"Retrieving prompt: {identifier}") + prompt = await self._prompt_manager.get_prompt( + identifier=identifier, + version_id=version_id, + tag=tag, + ) + return prompt + + async def execute_get_prompt_content(self, identifier: str, version_id: str | None = None, tag: str | None = None) -> dict: + """Get the content of a prompt.""" + logger.info(f"Retrieving prompt content: {identifier}") + content = await self._prompt_manager.get_prompt_content( + identifier=identifier, + version_id=version_id, + tag=tag, + ) + return content diff --git a/src/application/use_cases/update_prompt.py b/src/application/use_cases/update_prompt.py new file mode 100644 index 0000000..76b5d81 --- /dev/null +++ b/src/application/use_cases/update_prompt.py @@ -0,0 +1,34 @@ +import logging + +from phoenix.client.resources.prompts import PromptVersion + +from src.domain.ports.prompt_manager import PromptManager + +logger = logging.getLogger("composable-agents") + + +class UpdatePromptUseCase: + """Update an existing prompt.""" + + def __init__(self, prompt_manager: PromptManager): + self._prompt_manager = prompt_manager + + async def execute( + self, + identifier: str, + content: list[dict[str, str]] | None = None, + model_name: str | None = None, + description: str | None = None, + metadata: dict | None = None, + ) -> PromptVersion: + """Update a prompt.""" + logger.info(f"Updating prompt: {identifier}") + prompt = await self._prompt_manager.update_prompt( + identifier=identifier, + content=content, + model_name=model_name, + description=description, + metadata=metadata, + ) + logger.info(f"Prompt updated successfully: {identifier}") + return prompt diff --git a/src/config.py b/src/config.py index b0588fd..f2a7188 100644 --- a/src/config.py +++ b/src/config.py @@ -9,15 +9,12 @@ class TracingSettings(BaseSettings): provider: str = "none" enabled: bool = False - endpoint: str | None = None project_name: str = "composable-agents" - langfuse_host: str | None = None - langfuse_public_key: str | None = None - langfuse_secret_key: str | None = None phoenix_collector_endpoint: str | None = None phoenix_api_key: str | None = None - langchain_api_key: str | None = None - langchain_project: str | None = None + langfuse_public_key: str | None = None + langfuse_secret_key: str | None = None + langfuse_host: str | None = None class Settings(BaseSettings): diff --git a/src/dependencies.py b/src/dependencies.py index 9934576..ed816ca 100644 --- a/src/dependencies.py +++ b/src/dependencies.py @@ -2,7 +2,7 @@ from pathlib import Path from miniopy_async import Minio -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from sqlalchemy.pool import AsyncAdaptedQueuePool from src.application.use_cases.create_agent_config import CreateAgentConfigUseCase @@ -22,6 +22,7 @@ from src.application.use_cases.update_agent_config import UpdateAgentConfigUseCase from src.config import Settings from src.domain.exceptions import StorageError +from src.domain.ports.prompt_manager import PromptManager from src.domain.ports.thread_repository import ThreadRepository from src.infrastructure.deepagent.registry import DeepAgentRegistry from src.infrastructure.mcp.adapter import LangchainMcpToolLoader @@ -30,6 +31,7 @@ from src.infrastructure.postgres_repository.adapter import PostgresAgentConfigRepository from src.infrastructure.postgres_thread.adapter import PostgresThreadRepository from src.infrastructure.tracing.noop_adapter import NoopTracingProvider +from src.infrastructure.prompt_management.phoenix_prompt_adapter import PhoenixPromptManagerProvider from src.infrastructure.yaml_config.adapter import YamlAgentConfigLoader logger = logging.getLogger("composable-agents") @@ -76,6 +78,15 @@ def _create_tracing_provider(settings: Settings): return NoopTracingProvider() +def get_prompt_manager() -> PromptManager: + """Provide PromptManager implementation.""" + tracing = settings.tracing + return PhoenixPromptManagerProvider( + base_url=tracing.phoenix_collector_endpoint, + api_key=tracing.phoenix_api_key, + ) + + # ============= ADAPTERS ============= agent_config_loader = YamlAgentConfigLoader() @@ -88,6 +99,7 @@ def _create_tracing_provider(settings: Settings): config_loader=agent_config_loader, mcp_tool_loader=mcp_tool_loader, tracing_provider=tracing_provider, + prompt_manager=get_prompt_manager(), ) agents_dir = settings.agents_dir @@ -139,6 +151,7 @@ async def init_persistence() -> None: config_repository=_pg_repository, mcp_tool_loader=mcp_tool_loader, tracing_provider=tracing_provider, + prompt_manager=get_prompt_manager(), ) agent_registry = _persistent_registry diff --git a/src/domain/entities/prompt.py b/src/domain/entities/prompt.py new file mode 100644 index 0000000..7d8da32 --- /dev/null +++ b/src/domain/entities/prompt.py @@ -0,0 +1,42 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + + +class PromptVersion(BaseModel): + """A specific version of a prompt.""" + + version_id: str + content: list[dict[str, str]] = Field(..., description="Messages with role/content") + model_name: str + tags: list[str] = Field(default_factory=list) + created_at: datetime | None = None # optional, Phoenix doesn't always return this + + +class Prompt(BaseModel): + """Domain entity representing a prompt in Phoenix registry.""" + + identifier: str = Field(..., description="Unique prompt identifier/name") + description: str | None = Field(None) + current_version: PromptVersion + created_at: datetime | None = None + updated_at: datetime | None = None + + def extract_template_variables(self) -> list[str]: + """Extract template variable names from current version.""" + import re + + content_str = str(self.current_version.content) + pattern = r"\{([^{}]+)\}" + matches = re.findall(pattern, content_str) + return list(set(matches)) + + def validate_variables(self, required: list[str]) -> dict[str, any]: + """Validate template has required variables.""" + found = self.extract_template_variables() + missing = set(required) - set(found) + return { + "is_valid": len(missing) == 0, + "found": found, + "missing": list(missing), + } diff --git a/src/domain/ports/prompt_manager.py b/src/domain/ports/prompt_manager.py new file mode 100644 index 0000000..f382420 --- /dev/null +++ b/src/domain/ports/prompt_manager.py @@ -0,0 +1,54 @@ +from abc import ABC, abstractmethod + +from phoenix.client.resources.prompts import PromptVersion + +from src.domain.entities.prompt import Prompt + + +class PromptManager(ABC): + """Port for managing prompts in external registry (e.g., Phoenix).""" + + @abstractmethod + async def get_prompt( + self, + identifier: str, + version_id: str | None = None, + tag: str | None = None, + ) -> Prompt: + """Retrieve a prompt by identifier, version, or tag.""" + ... + + @abstractmethod + async def get_prompt_content(self, identifier: str, version_id: str | None = None, tag: str | None = None) -> dict: + """Retrieve the content of a prompt by identifier, version, or tag.""" + ... + + @abstractmethod + async def create_prompt( + self, + identifier: str, + content: list[dict[str, str]], + model_name: str, + description: str | None = None, + tags: list[str] | None = None, + metadata: dict | None = None, + ) -> PromptVersion: + """Create a new prompt.""" + ... + + @abstractmethod + async def update_prompt( + self, + identifier: str, + content: list[dict[str, str]] | None = None, + model_name: str | None = None, + description: str | None = None, + metadata: dict | None = None, + ) -> PromptVersion: + """Update an existing prompt (creates new version).""" + ... + + @abstractmethod + async def add_tag(self, identifier: str, tag: str) -> None: + """Add a tag to a prompt.""" + ... diff --git a/src/infrastructure/deepagent/factory.py b/src/infrastructure/deepagent/factory.py index 277482e..570d2e2 100644 --- a/src/infrastructure/deepagent/factory.py +++ b/src/infrastructure/deepagent/factory.py @@ -15,6 +15,7 @@ from src.domain.entities.agent_config import AgentConfig, BackendType from src.domain.ports.mcp_tool_loader import McpToolLoader +from src.domain.ports.prompt_manager import PromptManager logger = logging.getLogger("composable-agents") @@ -121,6 +122,7 @@ def _resolve_interrupt_on(config: AgentConfig) -> dict | None: async def _resolve_subagents( config: AgentConfig, mcp_tool_loader: McpToolLoader | None = None, + prompt_manager: PromptManager | None = None, ) -> list | None: """Convertit les configs de sous-agents. @@ -142,6 +144,14 @@ async def _resolve_subagents( all_tools = (local_tools or []) + mcp_tools if (local_tools or mcp_tools) else None instructions = sa.instructions + if prompt_manager: + try: + content = await prompt_manager.get_prompt_content(sa.name) + instructions = content.get("content") + except Exception: + logger.warning(f"Could not load system prompt for sub-agent '{sa.name}' from Phoenix, using YAML instructions if available.") + instructions = sa.instructions + if sa.response_format: response_tool = _create_response_tool(sa.response_format) all_tools = (all_tools or []) + [response_tool] @@ -175,6 +185,7 @@ def _resolve_tools_list(tool_paths: list[str]) -> list | None: async def create_agent_from_config( config: AgentConfig, mcp_tool_loader: McpToolLoader | None = None, + prompt_manager: PromptManager | None = None, ): """Create a compiled Deep Agent from configuration. @@ -189,6 +200,7 @@ async def create_agent_from_config( checkpointer = MemorySaver() store = InMemoryStore() interrupt_on = _resolve_interrupt_on(config) + system_prompt = None local_tools = _resolve_tools(config) mcp_tools: list = [] @@ -199,10 +211,14 @@ async def create_agent_from_config( all_tools = (local_tools or []) + mcp_tools if (local_tools or mcp_tools) else None logger.debug("Agent '%s' tools: %d total", config.name, len(all_tools) if all_tools else 0) + if prompt_manager: + system_prompt = await get_system_prompt_from_phoenix(config.name, prompt_manager) + kwargs = { "name": config.name, "model": config.model, - "system_prompt": config.system_prompt, + # Fall back to YAML system_prompt + "system_prompt": system_prompt if system_prompt else config.system_prompt, "tools": all_tools, "middleware": [], "checkpointer": checkpointer, @@ -225,11 +241,26 @@ async def create_agent_from_config( if config.response_format: kwargs["response_format"] = ProviderStrategy(config.response_format) - subagents = await _resolve_subagents(config, mcp_tool_loader) + subagents = await _resolve_subagents(config, mcp_tool_loader, prompt_manager) if subagents: kwargs["subagents"] = subagents logger.info("Agent '%s' has %d subagents", config.name, len(subagents)) - - graph = create_deep_agent(**kwargs) + try: + graph = create_deep_agent(**kwargs) + except Exception as e: + logger.error(f"Error creating agent '{config.name}': {e}") + raise logger.info("Agent '%s' created successfully", config.name) return graph + + +# helper to get system_prompt from Phoenix +async def get_system_prompt_from_phoenix(agent_name: str, prompt_manager: PromptManager | None = None) -> str | None: + """Get system_prompt from Phoenix for a given agent name.""" + if not prompt_manager: + return None + try: + content = await prompt_manager.get_prompt_content(agent_name) + return content.get("content") if content else None + except Exception: + return None diff --git a/src/infrastructure/deepagent/registry.py b/src/infrastructure/deepagent/registry.py index 5af5358..f3441d6 100644 --- a/src/infrastructure/deepagent/registry.py +++ b/src/infrastructure/deepagent/registry.py @@ -7,6 +7,7 @@ from src.domain.ports.agent_registry import AgentRegistry from src.domain.ports.agent_runner import AgentRunner from src.domain.ports.mcp_tool_loader import McpToolLoader +from src.domain.ports.prompt_manager import PromptManager from src.domain.ports.tracing_provider import TracingProvider from src.infrastructure.deepagent.adapter import DeepAgentRunner from src.infrastructure.deepagent.factory import create_agent_from_config @@ -23,11 +24,13 @@ def __init__( config_loader: AgentConfigLoader, mcp_tool_loader: McpToolLoader, tracing_provider: TracingProvider | None = None, + prompt_manager: PromptManager | None = None, ) -> None: self._agents_dir = agents_dir self._config_loader = config_loader self._mcp_tool_loader = mcp_tool_loader self._tracing_provider = tracing_provider + self._prompt_manager = prompt_manager self._runners: dict[str, AgentRunner] = {} self._lock = asyncio.Lock() @@ -47,7 +50,7 @@ async def get_runner(self, agent_name: str) -> AgentRunner: logger.info("Building agent '%s' from %s", agent_name, config_path) config = self._config_loader.load(config_path) - graph = await create_agent_from_config(config, self._mcp_tool_loader) + graph = await create_agent_from_config(config, self._mcp_tool_loader, self._prompt_manager) runner = DeepAgentRunner(graph, tracing_provider=self._tracing_provider) self._runners[agent_name] = runner logger.info("Agent '%s' ready and cached", agent_name) diff --git a/src/infrastructure/persistent_registry/adapter.py b/src/infrastructure/persistent_registry/adapter.py index be0f364..250e8b0 100644 --- a/src/infrastructure/persistent_registry/adapter.py +++ b/src/infrastructure/persistent_registry/adapter.py @@ -7,6 +7,7 @@ from src.domain.ports.agent_registry import AgentRegistry from src.domain.ports.agent_runner import AgentRunner from src.domain.ports.mcp_tool_loader import McpToolLoader +from src.domain.ports.prompt_manager import PromptManager from src.domain.ports.tracing_provider import TracingProvider from src.infrastructure.deepagent.adapter import DeepAgentRunner from src.infrastructure.deepagent.factory import create_agent_from_config @@ -24,12 +25,14 @@ def __init__( config_repository: AgentConfigRepository, mcp_tool_loader: McpToolLoader, tracing_provider: TracingProvider | None = None, + prompt_manager: PromptManager | None = None, ) -> None: self._config_loader = config_loader self._config_store = config_store self._config_repository = config_repository self._mcp_tool_loader = mcp_tool_loader self._tracing_provider = tracing_provider + self._prompt_manager = prompt_manager self._runners: dict[str, AgentRunner] = {} self._lock = asyncio.Lock() @@ -56,7 +59,7 @@ async def get_runner(self, agent_name: str) -> AgentRunner: logger.info("Building agent '%s' from persistent store", agent_name) yaml_content = await self._config_store.get(agent_name) config = self._config_loader.load_from_string(yaml_content) - graph = await create_agent_from_config(config, self._mcp_tool_loader) + graph = await create_agent_from_config(config, self._mcp_tool_loader, self._prompt_manager) runner = DeepAgentRunner(graph, tracing_provider=self._tracing_provider) self._runners[agent_name] = runner logger.info("Agent '%s' ready and cached", agent_name) diff --git a/src/infrastructure/prompt_management/phoenix_prompt_adapter.py b/src/infrastructure/prompt_management/phoenix_prompt_adapter.py new file mode 100644 index 0000000..3dc5c7c --- /dev/null +++ b/src/infrastructure/prompt_management/phoenix_prompt_adapter.py @@ -0,0 +1,187 @@ +import logging +import os + +from cachetools import TTLCache, cached +from phoenix.client import Client +from phoenix.client.resources.prompts import PromptVersion as PhoenixPromptVersion + +from src.domain.entities.prompt import Prompt, PromptVersion +from src.domain.ports.prompt_manager import PromptManager + +logger = logging.getLogger("composable-agents") + + +class PhoenixPromptManagerProvider(PromptManager): + """Phoenix implementation of PromptManager port.""" + + def __init__(self, base_url: str | None = None, api_key: str | None = None): + base_url = base_url or os.getenv("PHOENIX_COLLECTOR_ENDPOINT", "http://localhost:6006") + api_key = api_key or os.getenv("PHOENIX_API_KEY") + try: + self._client = Client( + base_url=base_url, + api_key=api_key, + ) + logger.info(f"PhoenixPromptManagerProvider initialized with base_url={base_url}") + except Exception as e: + logger.error(f"Failed to initialize Phoenix client: {e}") + self._client = None + + async def get_prompt( + self, + identifier: str, + version_id: str | None = None, + tag: str | None = None, + ) -> Prompt: + """Retrieve a prompt from Phoenix.""" + if not self._client: + raise RuntimeError("Phoenix client not initialized") + + try: + prompt_obj: PhoenixPromptVersion = self._client.prompts.get( + prompt_identifier=identifier, + prompt_version_id=version_id, + tag=tag, + ) + if not prompt_obj: + raise ValueError(f"Prompt not found: {identifier}") + return self._to_domain_prompt(prompt_obj, identifier=identifier, description=prompt_obj._description) + except Exception as e: + logger.error(f"Error getting prompt {identifier}: {e}") + raise + + @cached(cache=TTLCache(maxsize=10, ttl=300)) + async def get_prompt_content( + self, + identifier: str, + version_id: str | None = None, + tag: str | None = None, + ) -> dict[str, str]: + if not self._client: + raise RuntimeError("Phoenix client not initialized") + try: + prompt_obj = self._client.prompts.get( + prompt_identifier=identifier, + prompt_version_id=version_id, + tag=tag, + ) + domain = self._to_domain_prompt(prompt_obj, identifier=identifier) + # Return first message (system prompt) or empty + messages = domain.current_version.content + return messages[0] if messages else {} + except Exception as e: + logger.error(f"Error getting prompt content {identifier}: {e}") + raise + + async def create_prompt( + self, + identifier: str, + content: list[dict[str, str]], + model_name: str, + description: str | None = None, + tags: list[str] | None = None, + metadata: dict | None = None, + ) -> PhoenixPromptVersion: + """Create a new prompt in Phoenix.""" + if not self._client: + raise RuntimeError("Phoenix client not initialized") + + try: + prompt_obj = self._client.prompts.create( + name=identifier, + version=PhoenixPromptVersion(content, model_name=model_name), + prompt_description=description, + prompt_metadata=metadata, + ) + + if tags and prompt_obj.id: + for tag in tags: + try: + self._client.prompts.tags.create( + prompt_version_id=prompt_obj.id, + name=tag, + ) + except Exception as tag_error: + logger.warning(f"Failed to add tag {tag}: {tag_error}") + + logger.info(f"Created prompt {identifier}") + return prompt_obj + except Exception as e: + logger.error(f"Error creating prompt {identifier}: {e}") + raise + + async def update_prompt( + self, + identifier: str, + content: list[dict[str, str]] | None = None, + model_name: str | None = None, + description: str | None = None, + ) -> PhoenixPromptVersion: + """Update a prompt (creates new version).""" + if not self._client: + raise RuntimeError("Phoenix client not initialized") + + try: + current = await self.get_prompt(identifier) + + updated = self._client.prompts.create( + name=identifier, + version=PhoenixPromptVersion(content, model_name=model_name), + prompt_description=description or current.description, + ) + logger.info(f"Updated prompt {identifier}") + return updated + except Exception as e: + logger.error(f"Error updating prompt {identifier}: {e}") + raise + + async def add_tag(self, identifier: str, tag: str) -> None: + """Add a tag to a prompt.""" + if not self._client: + raise RuntimeError("Phoenix client not initialized") + + try: + self._client.prompts.tag(prompt_identifier=identifier, tag=tag) + logger.info(f"Added tag {tag} to prompt {identifier}") + except Exception as e: + logger.error(f"Error adding tag: {e}") + raise + + def _to_domain_prompt( + self, + phoenix_prompt, + identifier: str | None = None, + description: str | None = None, + ) -> Prompt: + """Convert Phoenix PromptVersion to domain entity.""" + + template = getattr(phoenix_prompt, "_template", {}) + raw_messages = template.get("messages", []) if isinstance(template, dict) else [] + + # Normalize Phoenix message format → domain format + messages = [] + for msg in raw_messages: + role = msg.get("role", "") + raw_content = msg.get("content", "") + # Phoenix stores content as list of blocks or plain string + if isinstance(raw_content, list): + text = " ".join( + block.get("text", "") for block in raw_content + if isinstance(block, dict) and block.get("type") == "text" + ) + else: + text = str(raw_content) + messages.append({"role": role, "content": text}) + + return Prompt( + identifier=identifier or "", + description=description or getattr(phoenix_prompt, "_description", None), + current_version=PromptVersion( + version_id=phoenix_prompt.id or "v1", + content=messages, + model_name=getattr(phoenix_prompt, "_model_name", ""), + tags=[], + ), + created_at=None, + updated_at=None, + ) diff --git a/src/infrastructure/tracing/phoenix_adapter.py b/src/infrastructure/tracing/phoenix_adapter.py index e684f42..2fd35b9 100644 --- a/src/infrastructure/tracing/phoenix_adapter.py +++ b/src/infrastructure/tracing/phoenix_adapter.py @@ -1,22 +1,17 @@ +import logging from typing import Any import phoenix.otel from openinference.instrumentation.langchain import LangChainInstrumentor +from opentelemetry import trace from src.domain.ports.tracing_provider import TracingProvider +logger = logging.getLogger("composable-agents") -class PhoenixTracingProvider(TracingProvider): - """Tracing provider using Arize Phoenix with OpenTelemetry auto-instrumentation. - - Phoenix uses OpenTelemetry to instrument LangChain automatically, - so no explicit LangChain callbacks are needed. - Args: - endpoint: Phoenix collector endpoint URL. - api_key: Optional Phoenix API key. - project_name: Optional project name for trace grouping. - """ +class PhoenixTracingProvider(TracingProvider): + """Tracing provider using Arize Phoenix with OpenTelemetry auto-instrumentation.""" def __init__( self, @@ -24,22 +19,62 @@ def __init__( api_key: str | None = None, project_name: str | None = None, ): + endpoint = endpoint or "http://localhost:6006" + project_name = project_name or "composable-agents" + + # Ensure endpoint has the /v1/traces path + if endpoint and not endpoint.endswith("/v1/traces"): + endpoint = f"{endpoint.rstrip('/')}/v1/traces" + + logger.info( + "Initializing PhoenixTracingProvider with endpoint=%s, project_name=%s", + endpoint, + project_name, + ) + + # Register with explicit protocol and batching phoenix.otel.register( - endpoint=endpoint or "http://localhost:6006", - project_name=project_name or "composable-agents", + endpoint=endpoint, + project_name=project_name, headers={"api_key": api_key} if api_key else None, + protocol="http/protobuf", + batch=True, + auto_instrument=True, ) + LangChainInstrumentor().instrument() + + # Store tracer provider for flush/shutdown + self._tracer_provider = trace.get_tracer_provider() self._instrumented = True + logger.info("Phoenix tracing initialized successfully") def get_callbacks(self) -> list[Any]: """Return an empty list since Phoenix uses OpenTelemetry auto-instrumentation.""" return [] async def flush(self) -> None: - """No-op flush. Phoenix handles flushing via OpenTelemetry.""" - pass + """Force flush all pending spans to Phoenix.""" + if self._tracer_provider is None: + return + + try: + timeout_millis = 30000 + if hasattr(self._tracer_provider, "force_flush"): + self._tracer_provider.force_flush(timeout_millis=timeout_millis) + logger.info("Flushed pending spans to Phoenix") + except Exception as e: + logger.error("Error flushing spans to Phoenix: %s", e) async def shutdown(self) -> None: - """No-op shutdown. Phoenix handles cleanup via OpenTelemetry.""" - pass + """Shutdown the tracer provider and flush remaining spans.""" + if self._tracer_provider is None: + return + + try: + await self.flush() + if hasattr(self._tracer_provider, "shutdown"): + self._tracer_provider.shutdown() + logger.info("Phoenix tracing provider shutdown complete") + except Exception as e: + logger.error("Error shutting down tracer provider: %s", e) diff --git a/src/main.py b/src/main.py index 90c9c32..76a7a30 100644 --- a/src/main.py +++ b/src/main.py @@ -9,6 +9,7 @@ from src.application.routes.agents import router as agents_router from src.application.routes.chat import router as chat_router from src.application.routes.health import router as health_router +from src.application.routes.prompt import router as prompt_router from src.application.routes.threads import router as threads_router from src.application.routes.websocket import router as websocket_router from src.dependencies import ( @@ -100,6 +101,7 @@ async def lifespan(_app: FastAPI): app.include_router(chat_router) app.include_router(agents_router) app.include_router(websocket_router) +app.include_router(prompt_router) @app.exception_handler(AgentConfigAlreadyExistsError) @@ -154,3 +156,8 @@ async def agent_error_handler(_request: Request, exc: AgentError) -> JSONRespons async def domain_error_handler(_request: Request, exc: DomainError) -> JSONResponse: logger.error("Domain error: %s", exc) return JSONResponse(status_code=500, content={"detail": str(exc)}) + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/tests/unit/test_mcp_lifecycle.py b/tests/unit/test_mcp_lifecycle.py index b8244d5..ddc3ce8 100644 --- a/tests/unit/test_mcp_lifecycle.py +++ b/tests/unit/test_mcp_lifecycle.py @@ -29,12 +29,14 @@ async def test_lifespan_calls_mcp_tool_loader_close(self, mock_mcp_tool_loader): """Lifespan shutdown calls close() on the mcp_tool_loader.""" from src.main import lifespan + mock_tracing = AsyncMock() + with ( patch("src.main.mcp_tool_loader", mock_mcp_tool_loader), patch("src.main.close_persistence", AsyncMock()), patch("src.main.init_persistence", AsyncMock()), patch("src.main.seed_builtin_agents", AsyncMock()), - patch("src.main.tracing_provider", AsyncMock()), + patch("src.main.tracing_provider", mock_tracing), ): async with lifespan(None): pass # enter and exit context to trigger cleanup diff --git a/tests/unit/test_phoenix_adapter.py b/tests/unit/test_phoenix_adapter.py index 2ccec24..f253d3e 100644 --- a/tests/unit/test_phoenix_adapter.py +++ b/tests/unit/test_phoenix_adapter.py @@ -1,106 +1,115 @@ -"""Tests for PhoenixTracingProvider. - -Phoenix and openinference modules are mocked (external tracing service). -""" +"""Tests for PhoenixTracingProvider.""" +import pytest +from unittest.mock import MagicMock, patch import sys from types import ModuleType -from unittest.mock import MagicMock - -import pytest @pytest.fixture(autouse=True) -def _mock_phoenix_modules(): - """Inject fake phoenix and openinference modules into sys.modules.""" +def mock_phoenix_and_openinference(): + """Mock phoenix.otel and openinference modules.""" mock_register = MagicMock() - mock_instrumentor_class = MagicMock() - mock_instrumentor_instance = MagicMock() - mock_instrumentor_class.return_value = mock_instrumentor_instance - - # Build phoenix.otel module - phoenix_mod = ModuleType("phoenix") - phoenix_otel_mod = ModuleType("phoenix.otel") - phoenix_otel_mod.register = mock_register - phoenix_mod.otel = phoenix_otel_mod - - # Build openinference.instrumentation.langchain module - openinference_mod = ModuleType("openinference") - openinference_instrumentation_mod = ModuleType("openinference.instrumentation") - openinference_langchain_mod = ModuleType("openinference.instrumentation.langchain") - openinference_langchain_mod.LangChainInstrumentor = mock_instrumentor_class - openinference_instrumentation_mod.langchain = openinference_langchain_mod - openinference_mod.instrumentation = openinference_instrumentation_mod - - sys.modules["phoenix"] = phoenix_mod - sys.modules["phoenix.otel"] = phoenix_otel_mod - sys.modules["openinference"] = openinference_mod - sys.modules["openinference.instrumentation"] = openinference_instrumentation_mod - sys.modules["openinference.instrumentation.langchain"] = openinference_langchain_mod - - # Clear cached import of the adapter - sys.modules.pop("src.infrastructure.tracing.phoenix_adapter", None) - yield mock_register, mock_instrumentor_class, mock_instrumentor_instance + mock_phoenix = ModuleType("phoenix") + mock_phoenix_otel = ModuleType("phoenix.otel") + mock_phoenix_otel.register = mock_register + mock_phoenix.otel = mock_phoenix_otel + + mock_instrumentor = MagicMock() + + mock_openinference = ModuleType("openinference") + mock_openinference_instr = ModuleType("openinference.instrumentation") + mock_openinference_langchain = ModuleType("openinference.instrumentation.langchain") + mock_openinference_langchain.LangChainInstrumentor = MagicMock(return_value=mock_instrumentor) + mock_openinference_instr.langchain = mock_openinference_langchain + mock_openinference.instrumentation = mock_openinference_instr + + # Create proper mocks for opentelemetry + mock_tracer_provider = MagicMock() + mock_tracer_provider.force_flush = MagicMock() + mock_tracer_provider.shutdown = MagicMock() + + mock_trace = MagicMock() + mock_trace.get_tracer_provider = MagicMock(return_value=mock_tracer_provider) + + mock_opentelemetry = MagicMock() + mock_opentelemetry.trace = mock_trace + + # Remove from sys.modules to force re-import with mocks + sys.modules.pop("src.infrastructure.tracing.phoenix_adapter", None) - for mod_name in [ - "phoenix", - "phoenix.otel", - "openinference", - "openinference.instrumentation", - "openinference.instrumentation.langchain", - "src.infrastructure.tracing.phoenix_adapter", - ]: - sys.modules.pop(mod_name, None) + with patch.dict( + "sys.modules", + { + "phoenix": mock_phoenix, + "phoenix.otel": mock_phoenix_otel, + "openinference": mock_openinference, + "openinference.instrumentation": mock_openinference_instr, + "openinference.instrumentation.langchain": mock_openinference_langchain, + "opentelemetry": mock_opentelemetry, + "opentelemetry.trace": mock_trace, + }, + ): + yield { + "register": mock_register, + "instrumentor": mock_instrumentor, + "tracer_provider": mock_tracer_provider, + } class TestPhoenixTracingProvider: - def test_constructor_calls_register_and_instrument(self, _mock_phoenix_modules): - mock_register, _, mock_instrumentor_instance = _mock_phoenix_modules + def test_constructor_calls_register(self, mock_phoenix_and_openinference): + mock_reg = mock_phoenix_and_openinference["register"] from src.infrastructure.tracing.phoenix_adapter import PhoenixTracingProvider - PhoenixTracingProvider( + provider = PhoenixTracingProvider( endpoint="http://phoenix:6006", api_key="my-api-key", project_name="my-project", ) - mock_register.assert_called_once_with( - endpoint="http://phoenix:6006", - project_name="my-project", - headers={"api_key": "my-api-key"}, - ) - mock_instrumentor_instance.instrument.assert_called_once() + mock_reg.assert_called_once() + call_kwargs = mock_reg.call_args.kwargs + assert call_kwargs["project_name"] == "my-project" + assert call_kwargs["headers"] == {"api_key": "my-api-key"} + assert call_kwargs["auto_instrument"] is True + assert call_kwargs["protocol"] == "http/protobuf" + assert call_kwargs["batch"] is True - def test_constructor_defaults(self, _mock_phoenix_modules): - mock_register, _, _ = _mock_phoenix_modules + def test_constructor_defaults(self, mock_phoenix_and_openinference): + mock_reg = mock_phoenix_and_openinference["register"] from src.infrastructure.tracing.phoenix_adapter import PhoenixTracingProvider - PhoenixTracingProvider() + provider = PhoenixTracingProvider() - mock_register.assert_called_once_with( - endpoint="http://localhost:6006", - project_name="composable-agents", - headers=None, - ) + mock_reg.assert_called_once() + call_kwargs = mock_reg.call_args.kwargs + assert call_kwargs["project_name"] == "composable-agents" + assert call_kwargs["headers"] is None - def test_get_callbacks_returns_empty_list(self, _mock_phoenix_modules): + def test_get_callbacks_returns_empty_list(self, mock_phoenix_and_openinference): from src.infrastructure.tracing.phoenix_adapter import PhoenixTracingProvider provider = PhoenixTracingProvider() - assert provider.get_callbacks() == [] - async def test_flush_does_nothing(self, _mock_phoenix_modules): + async def test_flush(self, mock_phoenix_and_openinference): from src.infrastructure.tracing.phoenix_adapter import PhoenixTracingProvider provider = PhoenixTracingProvider() await provider.flush() + + # Verify force_flush was called on the tracer provider + provider._tracer_provider.force_flush.assert_called_once() - async def test_shutdown_does_nothing(self, _mock_phoenix_modules): + async def test_shutdown(self, mock_phoenix_and_openinference): from src.infrastructure.tracing.phoenix_adapter import PhoenixTracingProvider provider = PhoenixTracingProvider() await provider.shutdown() + + # Verify shutdown was called on the tracer provider + provider._tracer_provider.shutdown.assert_called_once() \ No newline at end of file diff --git a/tests/unit/test_phoenix_prompt_manager.py b/tests/unit/test_phoenix_prompt_manager.py new file mode 100644 index 0000000..bf4e66b --- /dev/null +++ b/tests/unit/test_phoenix_prompt_manager.py @@ -0,0 +1,79 @@ +"""Tests for PhoenixPromptManagerProvider.""" + +from datetime import datetime +from unittest.mock import MagicMock, patch + +import pytest +from phoenix.client.resources.prompts import PromptVersion as PhoenixPromptVersion + +from src.infrastructure.prompt_management.phoenix_prompt_adapter import PhoenixPromptManagerProvider + + +class TestPhoenixPromptManagerProvider: + @pytest.fixture + def manager(self): + with patch("src.infrastructure.prompt_management.phoenix_prompt_adapter.Client"): + return PhoenixPromptManagerProvider(base_url="http://localhost:6006", api_key="test-key") + + @pytest.mark.asyncio + async def test_create_prompt_success(self, manager): + mock_prompt_obj = MagicMock(spec=PhoenixPromptVersion) + mock_prompt_obj.id = "v1" + mock_prompt_obj._description = "Test description" + mock_prompt_obj._model_name = "gpt-4" + mock_prompt_obj._template = {"messages": [{"role": "user", "content": "Hello"}]} + + manager._client.prompts.create = MagicMock(return_value=mock_prompt_obj) + + content = [{"role": "user", "content": "Hello"}] + result = await manager.create_prompt( + identifier="test-prompt", + content=content, + model_name="gpt-4", + description="Test description", + tags=["tag1"], + ) + + assert result.id == "v1" + assert result._model_name == "gpt-4" + assert result._description == "Test description" + manager._client.prompts.create.assert_called_once() + + @pytest.mark.asyncio + async def test_create_prompt_with_tags(self, manager): + mock_prompt_obj = MagicMock() + mock_prompt_obj.name = "test-prompt" + mock_prompt_obj.description = None + mock_prompt_obj.version_id = "v1" + mock_prompt_obj.content = [] + mock_prompt_obj.model_name = "gpt-4" + mock_prompt_obj.created_at = datetime.now() + mock_prompt_obj.updated_at = datetime.now() + mock_prompt_obj.tags = [] + + manager._client.prompts.create = MagicMock(return_value=mock_prompt_obj) + manager._client.prompts.tag.create = MagicMock() + + await manager.create_prompt( + identifier="test-prompt", + content=[], + model_name="gpt-4", + tags=["tag1", "tag2"], + ) + + assert manager._client.prompts.tags.create.call_count == 2 + + @pytest.mark.asyncio + async def test_get_prompt_not_found(self, manager): + manager._client.prompts.get = MagicMock(return_value=None) + + with pytest.raises(ValueError, match="Prompt not found"): + await manager.get_prompt("nonexistent") + + @pytest.mark.asyncio + async def test_add_tag(self, manager): + manager._client.prompts.tag = MagicMock() + + await manager.add_tag("test-prompt", "new-tag") + + manager._client.prompts.tag.assert_called_once_with(prompt_identifier="test-prompt", tag="new-tag") diff --git a/tests/unit/test_tracing_di.py b/tests/unit/test_tracing_di.py index dd5dc34..7b2025e 100644 --- a/tests/unit/test_tracing_di.py +++ b/tests/unit/test_tracing_di.py @@ -4,127 +4,49 @@ Mocks langfuse/phoenix modules (external tracing services). """ -import sys -from types import ModuleType -from unittest.mock import MagicMock - +import os +import pytest from src.config import Settings, TracingSettings from src.dependencies import _create_tracing_provider from src.infrastructure.tracing.noop_adapter import NoopTracingProvider class TestTracingDependencyInjection: - def test_default_settings_create_noop_provider(self): - settings = Settings(agents_dir="./agents") + def test_default_settings_create_noop_provider(self, monkeypatch): + """When tracing is disabled, _create_tracing_provider returns NoopTracingProvider.""" + # Clear environment variables that might interfere + monkeypatch.delenv("ENABLED", raising=False) + monkeypatch.delenv("PROVIDER", raising=False) + + tracing = TracingSettings(provider="none", enabled=False) + settings = Settings(agents_dir="./agents", tracing=tracing) provider = _create_tracing_provider(settings) assert isinstance(provider, NoopTracingProvider) - def test_disabled_langfuse_creates_noop_provider(self): + def test_disabled_langfuse_creates_noop_provider(self, monkeypatch): + """When langfuse is disabled, _create_tracing_provider returns NoopTracingProvider.""" + tracing = TracingSettings(provider="langfuse", enabled=False) settings = Settings(agents_dir="./agents", tracing=tracing) provider = _create_tracing_provider(settings) assert isinstance(provider, NoopTracingProvider) - def test_disabled_phoenix_creates_noop_provider(self): + def test_disabled_phoenix_creates_noop_provider(self, monkeypatch): + """When phoenix is disabled, _create_tracing_provider returns NoopTracingProvider.""" + tracing = TracingSettings(provider="phoenix", enabled=False) settings = Settings(agents_dir="./agents", tracing=tracing) provider = _create_tracing_provider(settings) assert isinstance(provider, NoopTracingProvider) - def test_unknown_provider_creates_noop(self): + def test_unknown_provider_creates_noop(self, monkeypatch): + """When provider is unknown, _create_tracing_provider returns NoopTracingProvider.""" + tracing = TracingSettings(provider="unknown", enabled=True) settings = Settings(agents_dir="./agents", tracing=tracing) provider = _create_tracing_provider(settings) - assert isinstance(provider, NoopTracingProvider) - - def test_enabled_langfuse_creates_langfuse_provider(self): - """When langfuse is enabled, _create_tracing_provider returns a LangfuseTracingProvider.""" - mock_handler_class = MagicMock() - mock_handler_class.return_value = MagicMock() - - langfuse_mod = ModuleType("langfuse") - langfuse_callback_mod = ModuleType("langfuse.callback") - langfuse_callback_mod.CallbackHandler = mock_handler_class - langfuse_mod.callback = langfuse_callback_mod - - sys.modules["langfuse"] = langfuse_mod - sys.modules["langfuse.callback"] = langfuse_callback_mod - sys.modules.pop("src.infrastructure.tracing.langfuse_adapter", None) - - try: - from src.infrastructure.tracing.langfuse_adapter import LangfuseTracingProvider - - tracing = TracingSettings( - provider="langfuse", - enabled=True, - langfuse_public_key="pk-test", - langfuse_secret_key="sk-test", - langfuse_host="https://langfuse.example.com", - ) - settings = Settings(agents_dir="./agents", tracing=tracing) - provider = _create_tracing_provider(settings) - - assert isinstance(provider, LangfuseTracingProvider) - mock_handler_class.assert_called_once_with( - public_key="pk-test", - secret_key="sk-test", - host="https://langfuse.example.com", - ) - finally: - sys.modules.pop("langfuse", None) - sys.modules.pop("langfuse.callback", None) - sys.modules.pop("src.infrastructure.tracing.langfuse_adapter", None) - - def test_enabled_phoenix_creates_phoenix_provider(self): - """When phoenix is enabled, _create_tracing_provider returns a PhoenixTracingProvider.""" - mock_register = MagicMock() - mock_instrumentor_class = MagicMock() - mock_instrumentor_class.return_value = MagicMock() - - phoenix_mod = ModuleType("phoenix") - phoenix_otel_mod = ModuleType("phoenix.otel") - phoenix_otel_mod.register = mock_register - phoenix_mod.otel = phoenix_otel_mod - - openinference_mod = ModuleType("openinference") - openinference_instrumentation_mod = ModuleType("openinference.instrumentation") - openinference_langchain_mod = ModuleType("openinference.instrumentation.langchain") - openinference_langchain_mod.LangChainInstrumentor = mock_instrumentor_class - openinference_instrumentation_mod.langchain = openinference_langchain_mod - openinference_mod.instrumentation = openinference_instrumentation_mod - - sys.modules["phoenix"] = phoenix_mod - sys.modules["phoenix.otel"] = phoenix_otel_mod - sys.modules["openinference"] = openinference_mod - sys.modules["openinference.instrumentation"] = openinference_instrumentation_mod - sys.modules["openinference.instrumentation.langchain"] = openinference_langchain_mod - sys.modules.pop("src.infrastructure.tracing.phoenix_adapter", None) - - try: - from src.infrastructure.tracing.phoenix_adapter import PhoenixTracingProvider - - tracing = TracingSettings( - provider="phoenix", - enabled=True, - phoenix_collector_endpoint="http://phoenix:6006", - phoenix_api_key="my-key", - project_name="my-project", - ) - settings = Settings(agents_dir="./agents", tracing=tracing) - provider = _create_tracing_provider(settings) - - assert isinstance(provider, PhoenixTracingProvider) - finally: - for mod_name in [ - "phoenix", - "phoenix.otel", - "openinference", - "openinference.instrumentation", - "openinference.instrumentation.langchain", - "src.infrastructure.tracing.phoenix_adapter", - ]: - sys.modules.pop(mod_name, None) + assert isinstance(provider, NoopTracingProvider) \ No newline at end of file diff --git a/uv.lock b/uv.lock index 374eaff..3fd7aab 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '3.14'", @@ -250,6 +250,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/42/b9/f8d6fa329ab25128b7e98fd83a3cb34d9db5b059a9847eddb840a0af45dd/argon2_cffi_bindings-25.1.0-cp39-abi3-win_arm64.whl", hash = "sha256:b0fdbcf513833809c882823f98dc2f931cf659d9a1429616ac3adebb49f5db94", size = 27149, upload-time = "2025-07-30T10:01:59.329Z" }, ] +[[package]] +name = "arize-phoenix-client" +version = "2.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "httpx" }, + { name = "openinference-instrumentation" }, + { name = "openinference-semantic-conventions" }, + { name = "opentelemetry-exporter-otlp" }, + { name = "opentelemetry-sdk" }, + { name = "tqdm" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3a/81/8f6fd4b7aeeddff6a0995038b075ed00d79119bc50284f0eda8bc2638a6a/arize_phoenix_client-2.3.0.tar.gz", hash = "sha256:06c5fdc947420cff14f5f2f35f6e8ba6074700033d3add6ea8ede61aa6b4da95", size = 177724, upload-time = "2026-04-03T16:16:12.368Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d5/d5/2e988e732ab2bb1e345fe52e8dae69a5f26ba232e3fbb964f7fe90b26049/arize_phoenix_client-2.3.0-py3-none-any.whl", hash = "sha256:0761a327dcb0b61177c86d8f3000aec40748849a3873b04670afee78dd7b987b", size = 173715, upload-time = "2026-04-03T16:16:10.872Z" }, +] + [[package]] name = "arize-phoenix-otel" version = "0.14.0" @@ -344,6 +362,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9d/2a/9186535ce58db529927f6cf5990a849aa9e052eea3e2cfefe20b9e1802da/bracex-2.6-py3-none-any.whl", hash = "sha256:0b0049264e7340b3ec782b5cb99beb325f36c3782a32e36e876452fd49a09952", size = 11508, upload-time = "2025-06-22T19:12:29.781Z" }, ] +[[package]] +name = "cachetools" +version = "7.0.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/af/dd/57fe3fdb6e65b25a5987fd2cdc7e22db0aef508b91634d2e57d22928d41b/cachetools-7.0.5.tar.gz", hash = "sha256:0cd042c24377200c1dcd225f8b7b12b0ca53cc2c961b43757e774ebe190fd990", size = 37367, upload-time = "2026-03-09T20:51:29.451Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/06/f3/39cf3367b8107baa44f861dc802cbf16263c945b62d8265d36034fc07bea/cachetools-7.0.5-py3-none-any.whl", hash = "sha256:46bc8ebefbe485407621d0a4264b23c080cedd913921bad7ac3ed2f26c183114", size = 13918, upload-time = "2026-03-09T20:51:27.33Z" }, +] + [[package]] name = "certifi" version = "2025.11.12" @@ -524,6 +551,7 @@ source = { editable = "." } dependencies = [ { name = "alembic" }, { name = "asyncpg" }, + { name = "cachetools" }, { name = "cryptography" }, { name = "deepagents" }, { name = "fastapi" }, @@ -548,10 +576,12 @@ langfuse = [ { name = "langfuse" }, ] phoenix = [ + { name = "arize-phoenix-client" }, { name = "arize-phoenix-otel" }, { name = "openinference-instrumentation-langchain" }, ] tracing = [ + { name = "arize-phoenix-client" }, { name = "arize-phoenix-otel" }, { name = "langfuse" }, { name = "openinference-instrumentation-langchain" }, @@ -570,9 +600,12 @@ dev = [ [package.metadata] requires-dist = [ { name = "alembic", specifier = ">=1.13.0" }, + { name = "arize-phoenix-client", marker = "extra == 'phoenix'", specifier = ">=2.3.0" }, + { name = "arize-phoenix-client", marker = "extra == 'tracing'", specifier = ">=2.3.0" }, { name = "arize-phoenix-otel", marker = "extra == 'phoenix'", specifier = ">=0.1.0" }, { name = "arize-phoenix-otel", marker = "extra == 'tracing'", specifier = ">=0.1.0" }, { name = "asyncpg", specifier = ">=0.30.0" }, + { name = "cachetools", specifier = ">=7.0.5" }, { name = "cryptography", specifier = ">=46.0.5" }, { name = "deepagents", specifier = ">=0.3.12" }, { name = "fastapi", specifier = ">=0.128.4" },