diff --git a/.release-please-manifest.json b/.release-please-manifest.json index 2cfadde6a..819ecdd75 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "0.11.7" + ".": "0.11.8" } diff --git a/CHANGELOG.md b/CHANGELOG.md index 8621632f8..28b4b9eeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## Unreleased + +### Features + +* **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``. + +## 0.11.8 (2026-06-01) + +Full Changelog: [v0.11.7...v0.11.8](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.7...v0.11.8) + +### Features + +* **cli:** add Temporal + LangGraph agent template and example ([#383](https://github.com/scaleapi/scale-agentex-python/issues/383)) ([bbc9e02](https://github.com/scaleapi/scale-agentex-python/commit/bbc9e02d2a2b063a3e509a07ffca8ca4bf459e57)) +* **tracing:** OTel span queue and export telemetry (SGPINF-1863) ([#373](https://github.com/scaleapi/scale-agentex-python/issues/373)) ([6669012](https://github.com/scaleapi/scale-agentex-python/commit/6669012638481a63bdd7629582818796ca31bdf3)) + ## 0.11.7 (2026-06-01) Full Changelog: [v0.11.6...v0.11.7](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.6...v0.11.7) diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore b/examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore new file mode 100644 index 000000000..c4f7a8b4b --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp 
+*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/.env.example b/examples/tutorials/10_async/10_temporal/130_langgraph/.env.example new file mode 100644 index 000000000..ab1a5790f --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/.env.example @@ -0,0 +1,13 @@ +# at130-langgraph - Environment Variables +# Copy this file to .env and fill in the values + +# API key for your LLM provider +LITELLM_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile b/examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile new file mode 100644 index 000000000..8a125ac72 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/Dockerfile @@ -0,0 +1,43 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 10_async/10_temporal/130_langgraph/pyproject.toml /app/130_langgraph/pyproject.toml +COPY 10_async/10_temporal/130_langgraph/README.md /app/130_langgraph/README.md + +WORKDIR /app/130_langgraph + +COPY 10_async/10_temporal/130_langgraph/project /app/130_langgraph/project +COPY 10_async/10_temporal/130_langgraph/tests /app/130_langgraph/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV 
AGENT_NAME=at130-langgraph + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "project.run_worker"] diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/README.md b/examples/tutorials/10_async/10_temporal/130_langgraph/README.md new file mode 100644 index 000000000..61ccaf66a --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/README.md @@ -0,0 +1,58 @@ +# at130-langgraph — AgentEx Temporal + LangGraph + +A minimal Temporal-backed [LangGraph](https://langchain-ai.github.io/langgraph/) +agent. It uses the official [`temporalio.contrib.langgraph`](https://docs.temporal.io/develop/python/integrations/langgraph) +plugin so each LangGraph node runs as a durable **Temporal activity** (the LLM +`agent` node) or inline in the **workflow** (the `tools` node) — set per node +with `execute_in`. *Temporal is the runtime; LangGraph is the agent framework.* + +> The Temporal LangGraph plugin is currently **experimental**. + +## The graph + +``` +START → agent → (tool calls?) → tools → agent + → (no tool calls?) → END +``` + +- `agent` (`execute_in="activity"`): the LLM call — a retried, observable Temporal activity. +- `tools` (`execute_in="workflow"`): runs the tool calls inline in the workflow. + +The router and tools are `async` so LangGraph awaits them directly (a sync +callable is offloaded via `run_in_executor`, which Temporal workflows forbid). 
+ +## Project structure + +``` +130_langgraph/ +├── project/ +│ ├── acp.py # Thin async ACP server; registers the LangGraphPlugin +│ ├── workflow.py # Runs the graph each turn; keeps multi-turn memory +│ ├── graph.py # LangGraph graph; nodes tagged execute_in activity/workflow +│ ├── tools.py # Async tool(s) +│ └── run_worker.py # Temporal worker entry point +``` + +## Running + +```bash +agentex agents run --manifest manifest.yaml +``` + +Open the Temporal UI at http://localhost:8080 to watch the workflow and the +`agent` activity execute. Use `dev.ipynb` to create a task and send messages. + +## Adding tools + +Define an **async** `@tool` in `project/tools.py` and add it to `TOOLS`. The +model is bound with `TOOLS` and the tool node runs them by name. + +For a fuller version with human-in-the-loop approval and graph-introspection +queries, scaffold the `temporal-langgraph` template via `agentex init`. + +## Tests + +- `tests/test_graph_temporal.py` — hermetic ReAct-loop test with a stub model, + plus a live end-to-end run through the real Temporal plugin (skipped unless + `LITELLM_API_KEY` is set). +- `tests/test_agent.py` — live integration against a running agent. 
diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb b/examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb new file mode 100644 index 000000000..5320daac7 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/dev.ipynb @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"at130-langgraph\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as 
DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml b/examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml new file mode 100644 index 000000000..d54d8e5ff --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/environments.yaml @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. 
+# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived separately. However, this can be used to override the derived +# # namespace and deploy it within the same namespace that already exists for a separate agent. 
+# namespace: "team-at130-langgraph" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml b/examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml new file mode 100644 index 000000000..d1f5960b1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/manifest.yaml @@ -0,0 +1,128 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. The image is pushed to a registry and used to run your agent +build: + context: + # Build from the tutorials root so shared test_utils are available. 
+ root: ../../../ + include_paths: + - 10_async/10_temporal/130_langgraph + - test_utils + dockerfile: 10_async/10_temporal/130_langgraph/Dockerfile + dockerignore: 10_async/10_temporal/130_langgraph/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + # Path to temporal worker file + # Examples: + # project/run_worker.py (standard) + # workers/temporal.py (custom structure) + # ../shared/worker.py (shared across projects) + worker: project/run_worker.py + + +# Agent Configuration +# ----------------- +agent: + # Type of agent - either sync or async + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: at130-langgraph + + # Description of what your agent does + # Helps with documentation and discovery + description: "A Temporal-backed LangGraph agent whose nodes run as Temporal activities" + + # Temporal workflow configuration + # This enables your agent to run as a Temporal workflow for long-running tasks + temporal: + enabled: true + workflows: + # Name of the workflow class + # Must match the @workflow.defn name in your workflow.py + - name: at130-langgraph + + # Queue name for task distribution + # Used by Temporal to route tasks to your agent + # Convention: <agent_name>_queue + queue_name: at130_langgraph_queue + + # Optional: Health check port for temporal worker + # Defaults to 80 if not specified + # health_check_port: 80 + + # 
Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + # - env_var_name: LITELLM_API_KEY + # secret_name: litellm-api-key + # secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: {} + # LITELLM_API_KEY: "" + # OPENAI_BASE_URL: "" + # OPENAI_ORG_ID: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret name + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/__init__.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py new file mode 100644 index 000000000..c01f8831c --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/acp.py @@ -0,0 +1,42 @@ +"""ACP server for the Temporal LangGraph agent. + +This file is intentionally thin. 
When ``acp_type="async"`` is combined with +``TemporalACPConfig(type="temporal", ...)``, FastACP auto-wires: + + HTTP task/create → @workflow.run on the workflow class + HTTP task/event/send → @workflow.signal(SignalName.RECEIVE_EVENT) + HTTP task/cancel → workflow cancellation via the Temporal client + +so we don't define any handlers here. The agent logic lives in +``project/workflow.py`` (the runtime) and ``project/graph.py`` (the LangGraph +graph whose nodes run as Temporal activities), executed by the Temporal worker +(``project/run_worker.py``), not by this HTTP process. + +The ``LangGraphPlugin`` is registered here too so the Temporal client started +by FastACP shares the same graph registry as the worker. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + # When deployed to the cluster, the Temporal address is set automatically. + # Locally we point at the Temporal service from docker compose. + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ), +) \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py new file mode 100644 index 000000000..8d8de92d1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py @@ -0,0 +1,79 @@ +"""LangGraph graph for at130-langgraph — nodes run as Temporal activities. 
+ +The ``temporalio.contrib.langgraph`` plugin runs each node where its +``execute_in`` metadata says: the LLM ``agent`` node as a durable Temporal +**activity**, the ``tools`` node inline in the **workflow**. + + START → agent → (tool calls?) → tools → agent + → (no tool calls?) → END + +The router and tools are ``async`` so LangGraph awaits them directly — a sync +callable would be offloaded via ``run_in_executor``, which Temporal's workflow +event loop does not support. +""" + +from __future__ import annotations + +import os +from typing import Any, Annotated +from datetime import datetime, timedelta + +_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key: + os.environ.setdefault("OPENAI_API_KEY", _litellm_key) + +from typing_extensions import TypedDict + +from langgraph.graph import END, START, StateGraph +from langchain_openai import ChatOpenAI +from langgraph.prebuilt import ToolNode +from langchain_core.messages import SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS + +# Name this graph is registered under in the LangGraphPlugin (acp.py / run_worker.py). +GRAPH_NAME = "at130-langgraph" +MODEL_NAME = "gpt-4o" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Be concise and use tools when they help answer the question.""" + + +class AgentState(TypedDict): + messages: Annotated[list[Any], add_messages] + + +async def agent_node(state: AgentState) -> dict[str, Any]: + """The 'agent' node — one LLM call. 
Runs as a durable Temporal activity.""" + llm = ChatOpenAI(model=MODEL_NAME).bind_tools(TOOLS) + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system = SystemMessage( + content=SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + messages = [system, *messages] + return {"messages": [await llm.ainvoke(messages)]} + + +async def route_after_agent(state: AgentState) -> str: + """Go to the tools node if the model requested tools, else finish (async router).""" + last = state["messages"][-1] + return "tools" if getattr(last, "tool_calls", None) else END + + +def build_graph() -> StateGraph: + """Build the agent graph; the LLM node runs as an activity, tools in the workflow.""" + builder = StateGraph(AgentState) + builder.add_node( + "agent", + agent_node, + metadata={"execute_in": "activity", "start_to_close_timeout": timedelta(minutes=5)}, + ) + builder.add_node("tools", ToolNode(TOOLS), metadata={"execute_in": "workflow"}) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", route_after_agent, {"tools": "tools", END: END}) + builder.add_edge("tools", "agent") + return builder diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py new file mode 100644 index 000000000..7040f560b --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/run_worker.py @@ -0,0 +1,50 @@ +"""Temporal worker for at130-langgraph. + +Run as a separate long-lived process alongside the ACP HTTP server. The +worker polls Temporal for workflow + activity tasks and executes them. + +The ``LangGraphPlugin`` is given the graph registry (``{ GRAPH_NAME: graph }``). +At runtime it turns the graph's ``execute_in="activity"`` nodes into Temporal +activities and registers them on the worker automatically — so we don't have +to enumerate node activities by hand. 
+""" + +import asyncio + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from project.workflow import At130LanggraphWorkflow +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +environment_variables = EnvironmentVariables.refresh() +logger = make_logger(__name__) + + +async def main(): + setup_debug_if_enabled() + + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + # AgentexWorker runs workflows with an unsandboxed runner, so importing + # langchain/langgraph inside the workflow + nodes is fine. The LangGraph + # plugin registers the graph's activity-nodes for us. + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ) + + await worker.run( + activities=get_all_activities(), + workflow=At130LanggraphWorkflow, + ) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py new file mode 100644 index 000000000..20b7185ee --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/tools.py @@ -0,0 +1,20 @@ +"""Tools for the LangGraph agent. + +Tools are ``async`` so the in-workflow tool node can await them directly +(a sync tool would be offloaded via ``run_in_executor``, which Temporal's +workflow event loop does not allow). 
+""" + +from __future__ import annotations + +from langchain_core.tools import tool + + +@tool +async def get_weather(city: str) -> str: + """Get the current weather for a city.""" + # TODO: replace with a real weather API call. + return f"The weather in {city} is sunny and 72°F" + + +TOOLS = [get_weather] diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py new file mode 100644 index 000000000..a50670251 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py @@ -0,0 +1,83 @@ +"""Temporal workflow for at130-langgraph — Temporal as the LangGraph runtime. + +Each turn the workflow runs the LangGraph graph (``project/graph.py``) via the +``temporalio.contrib.langgraph`` plugin. The plugin runs the LLM ``agent`` node +as a durable Temporal activity and the ``tools`` node inline in the workflow. + +Multi-turn memory is kept on the workflow instance (``self._messages``) — it's +durable and replay-safe for free, so no checkpoint database is needed. 
+""" + +from __future__ import annotations + +import json +from typing import Any + +from temporalio import workflow +from temporalio.contrib.langgraph import graph as lg_graph + +from agentex.lib import adk +from project.graph import GRAPH_NAME +from agentex.lib.adk import emit_langgraph_messages +from agentex.protocol.acp import SendEventParams, CreateTaskParams +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class At130LanggraphWorkflow(BaseWorkflow): + """Runs the LangGraph agent each turn; its nodes run as Temporal activities.""" + + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._messages: list[Any] = [] + self._emitted = 0 + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + """Echo the user's message, run the graph, surface the new messages.""" + await adk.messages.create(task_id=params.task.id, content=params.event.content) + self._messages.append({"role": "user", "content": params.event.content.content}) + + compiled = lg_graph(GRAPH_NAME).compile() + result = await compiled.ainvoke({"messages": self._messages}) + self._messages = result["messages"] + + # Surface the messages this turn produced (tool calls, results, final + # text) to the AgentEx UI. 
The SDK helper does the LangGraph→AgentEx + # message conversion. + await emit_langgraph_messages(self._messages[self._emitted:], params.task.id) + self._emitted = len(self._messages) + + @workflow.signal + async def complete_task_signal(self) -> None: + self._complete_task = True + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> str: + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" + "Send me a message and I'll respond using a LangGraph agent whose nodes " + "run as durable Temporal activities." + ), + ), + ) + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + return "Task completed" diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml b/examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml new file mode 100644 index 000000000..e22905de4 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/pyproject.toml @@ -0,0 +1,42 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "at130-langgraph" +version = "0.1.0" +description = "A Temporal-backed LangGraph agent whose nodes run as Temporal activities" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + # Temporal with the LangGraph plugin (temporalio.contrib.langgraph), + # which runs LangGraph nodes as Temporal activities. Needs >=1.27.0. 
+ "temporalio[langgraph]>=1.27.0", + "langchain-openai", + "langchain-core", + "grandalf", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py new file mode 100644 index 000000000..b798f568f --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_agent.py @@ -0,0 +1,127 @@ +"""Integration tests for the Temporal + LangGraph agent (live agent required). + +These drive a *running* agent over the AgentEx API and verify that: +- the agent sends a welcome message on task creation, +- a weather question triggers a tool_request / tool_response round-trip + (proving the LLM node ran as a Temporal activity and the tool node ran), +- the final answer reflects the tool output. + +For fast, network-free coverage of the graph + human-in-the-loop logic, see +``test_graph_temporal.py``. + +To run: +1. Start the agent (worker + ACP server): ``agentex agents run --manifest manifest.yaml`` +2. Set AGENTEX_API_BASE_URL if not using the default +3. 
``pytest tests/test_agent.py -v`` +""" + +import os +import uuid + +import pytest +import pytest_asyncio +from test_utils.async_utils import ( + poll_messages, + send_event_and_poll_yielding, +) + +from agentex import AsyncAgentex +from agentex.types.task_message import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "at130-langgraph") + + +@pytest_asyncio.fixture +async def client(): + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + """The Temporal-backed LangGraph agent responds and uses tools.""" + + @pytest.mark.asyncio + async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): + """Create a task, ask about weather, verify the tool round-trip.""" + task_response = await client.agents.create_task( + agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex) + ) + task = task_response.result + assert task is not None + + # Wait for the welcome message from on_task_create + task_creation_found = False + async for message in poll_messages( + client=client, task_id=task.id, timeout=30, sleep_interval=1.0 + ): + assert isinstance(message, TaskMessage) + if ( + message.content + and message.content.type == "text" + and message.content.author == "agent" + ): + task_creation_found = True + break + assert task_creation_found, "Task creation welcome message not found" + + # Ask about weather — the agent (LangGraph node, as a Temporal activity) + # should call get_weather. 
+ seen_tool_request = False + seen_tool_response = False + final_message = None + async for message in send_event_and_poll_yielding( + client=client, + agent_id=agent_id, + task_id=task.id, + user_message="What is the weather in San Francisco? Use your tool.", + timeout=60, + sleep_interval=1.0, + ): + assert isinstance(message, TaskMessage) + + if message.content and message.content.type == "tool_request": + seen_tool_request = True + if message.content and message.content.type == "tool_response": + seen_tool_response = True + + if ( + message.content + and message.content.type == "text" + and message.content.author == "agent" + ): + final_message = message + content_length = len(getattr(message.content, "content", "") or "") + if getattr(message, "streaming_status", None) in (None, "DONE") and content_length > 0: + if seen_tool_response: + break + + assert seen_tool_request, "Expected a tool_request (agent calling get_weather)" + assert seen_tool_response, "Expected a tool_response (get_weather result)" + assert final_message is not None, "Expected a final agent text message" + final_text = ( + getattr(final_message.content, "content", None) if final_message.content else None + ) + assert isinstance(final_text, str) and len(final_text) > 0 + # get_weather always returns "72°F" — the response should mention it. + assert "72" in final_text, "Expected weather response to mention 72°F" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py new file mode 100644 index 000000000..485b896f6 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/tests/test_graph_temporal.py @@ -0,0 +1,105 @@ +"""Tests for the Temporal + LangGraph agent's graph. + +Two layers: + +1. ``TestGraphLogic`` — hermetic, no network. 
Compiles the actual shipped + graph (``project/graph.py``) with a deterministic stub model and runs the + ReAct loop (agent → tools → agent) to completion. + +2. ``TestTemporalPlugin`` — end-to-end through the real Temporal LangGraph + plugin on a local Temporal server, proving the LLM node runs as an activity + and the tool node in the workflow. Needs a real model, so it is skipped + unless ``LITELLM_API_KEY`` (or ``OPENAI_API_KEY``) is set. + +Run from the agent's own (uv) environment: pytest tests/test_graph_temporal.py -v +""" + +from __future__ import annotations + +import os +import uuid + +import pytest + +pytest.importorskip("langgraph") +pytest.importorskip("temporalio.contrib.langgraph") + +import project.graph as graph_module +from temporalio import workflow +from project.graph import GRAPH_NAME, build_graph +from langchain_core.messages import AIMessage, ToolMessage +from temporalio.contrib.langgraph import graph as lg_graph + + +@workflow.defn +class _DriverWorkflow: + """Module-level driver workflow (Temporal forbids local workflow classes).""" + + @workflow.run + async def run(self, message: str) -> str: + compiled = lg_graph(GRAPH_NAME).compile() + result = await compiled.ainvoke({"messages": [{"role": "user", "content": message}]}) + return result["messages"][-1].content + + +class _StubModel: + """Deterministic stand-in for ``ChatOpenAI(...).bind_tools(...)``. + + First call → emit a tool call for ``get_weather``; once a ToolMessage is in + the history → emit a plain text answer. Drives the full ReAct loop offline. 
+ """ + + def bind_tools(self, _tools): + return self + + async def ainvoke(self, messages): + if any(isinstance(m, ToolMessage) for m in messages): + return AIMessage(content="All done — the tool has run.") + return AIMessage( + content="", + tool_calls=[{"id": "call_1", "name": "get_weather", "args": {"city": "Denver"}}], + ) + + +class TestGraphLogic: + """Hermetic test of the ReAct loop, no network.""" + + @pytest.mark.asyncio + async def test_react_loop_runs_tool(self, monkeypatch): + monkeypatch.setattr(graph_module, "ChatOpenAI", lambda *_a, **_k: _StubModel()) + compiled = build_graph().compile() + result = await compiled.ainvoke({"messages": [{"role": "user", "content": "go"}]}) + + tool_outputs = [m.content for m in result["messages"] if isinstance(m, ToolMessage)] + assert any("sunny" in o for o in tool_outputs) + assert "done" in result["messages"][-1].content.lower() + + +@pytest.mark.skipif( + not (os.environ.get("LITELLM_API_KEY") or os.environ.get("OPENAI_API_KEY")), + reason="needs a real model (set LITELLM_API_KEY) for the live Temporal run", +) +class TestTemporalPlugin: + """End-to-end through the real Temporal LangGraph plugin on a local server.""" + + @pytest.mark.asyncio + async def test_nodes_run_as_activities_via_plugin(self): + from temporalio.worker import Worker, UnsandboxedWorkflowRunner + from temporalio.testing import WorkflowEnvironment + from temporalio.contrib.langgraph import LangGraphPlugin + + plugin = LangGraphPlugin(graphs={GRAPH_NAME: build_graph()}) + async with await WorkflowEnvironment.start_local(plugins=[plugin]) as env: + async with Worker( + env.client, + task_queue="tq", + workflows=[_DriverWorkflow], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + out = await env.client.execute_workflow( + _DriverWorkflow.run, + "What's the weather in Denver? 
Use the get_weather tool.", + id=f"wf-{uuid.uuid4()}", + task_queue="tq", + ) + assert "denver" in out.lower() diff --git a/pyproject.toml b/pyproject.toml index 95e53106c..f0572a941 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agentex-sdk" -version = "0.11.7" +version = "0.11.8" description = "The official Python library for the agentex API" dynamic = ["readme"] license = "Apache-2.0" diff --git a/src/agentex/_version.py b/src/agentex/_version.py index 27c5900af..f06fd51b0 100644 --- a/src/agentex/_version.py +++ b/src/agentex/_version.py @@ -1,4 +1,4 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. __title__ = "agentex" -__version__ = "0.11.7" # x-release-please-version +__version__ = "0.11.8" # x-release-please-version diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index cbff5a3fe..a08131260 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -8,6 +8,7 @@ from agentex.lib.adk._modules.checkpointer import create_checkpointer from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events +from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events @@ -47,6 +48,7 @@ "create_checkpointer", "create_langgraph_tracing_handler", "stream_langgraph_events", + "emit_langgraph_messages", "convert_langgraph_to_agentex_events", # Pydantic AI "stream_pydantic_ai_events", diff --git a/src/agentex/lib/adk/_modules/_langgraph_messages.py b/src/agentex/lib/adk/_modules/_langgraph_messages.py new file mode 100644 index 000000000..c8856755b --- /dev/null +++ 
b/src/agentex/lib/adk/_modules/_langgraph_messages.py @@ -0,0 +1,85 @@ +"""Emit finished LangGraph messages as Agentex task messages. + +This is the non-streaming counterpart to ``stream_langgraph_events``. Use it +when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed +agent using the LangGraph plugin, where streaming deltas aren't available) and +want to surface the resulting messages to the Agentex UI after the fact. + +It maps LangGraph/LangChain message objects to Agentex content types: + +- ``AIMessage`` tool calls → ``ToolRequestContent`` (one per call) +- ``AIMessage`` text content → ``TextContent`` +- ``ToolMessage`` → ``ToolResponseContent`` + +Pass only the messages produced this turn (e.g. ``messages[already_emitted:]``) +so each message is surfaced exactly once across a multi-turn conversation. +""" + +from __future__ import annotations + +from typing import Any + + +async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str: + """Create Agentex messages for a list of LangGraph messages. + + Args: + messages: LangGraph/LangChain message objects to surface — typically + the new messages a turn produced. + task_id: The Agentex task to create messages on. + + Returns: + The last assistant text emitted (useful as a span/turn output), or "". + """ + # Lazy imports so langchain isn't required at module load time. 
+ from langchain_core.messages import AIMessage, ToolMessage + + from agentex.lib import adk + from agentex.types.text_content import TextContent + from agentex.types.tool_request_content import ToolRequestContent + from agentex.types.tool_response_content import ToolResponseContent + + final_text = "" + for message in messages: + if isinstance(message, AIMessage): + for tool_call in message.tool_calls or []: + await adk.messages.create( + task_id=task_id, + content=ToolRequestContent( + author="agent", + tool_call_id=tool_call["id"], + name=tool_call["name"], + arguments=tool_call["args"], + ), + ) + # ``content`` may be a plain string (OpenAI) or a list of content + # blocks (Anthropic/Claude via LangChain, e.g. + # ``[{"type": "text", "text": "..."}]``). Extract and join the text + # so the response is visible regardless of the underlying model. + if isinstance(message.content, str): + text = message.content + else: + text = "".join( + block.get("text", "") if isinstance(block, dict) else str(block) + for block in message.content + if not isinstance(block, dict) or block.get("type") == "text" + ) + if text: + final_text = text + await adk.messages.create( + task_id=task_id, + content=TextContent(author="agent", content=text, format="markdown"), + ) + elif isinstance(message, ToolMessage): + await adk.messages.create( + task_id=task_id, + content=ToolResponseContent( + author="agent", + tool_call_id=message.tool_call_id, + name=message.name or "unknown", + content=message.content + if isinstance(message.content, str) + else str(message.content), + ), + ) + return final_text diff --git a/src/agentex/lib/cli/commands/init.py b/src/agentex/lib/cli/commands/init.py index 69b18e8e7..307a5d0e8 100644 --- a/src/agentex/lib/cli/commands/init.py +++ b/src/agentex/lib/cli/commands/init.py @@ -25,6 +25,7 @@ class TemplateType(str, Enum): TEMPORAL = "temporal" TEMPORAL_OPENAI_AGENTS = "temporal-openai-agents" TEMPORAL_PYDANTIC_AI = "temporal-pydantic-ai" + TEMPORAL_LANGGRAPH 
= "temporal-langgraph" DEFAULT = "default" DEFAULT_LANGGRAPH = "default-langgraph" DEFAULT_PYDANTIC_AI = "default-pydantic-ai" @@ -64,6 +65,7 @@ def create_project_structure( TemplateType.TEMPORAL: ["acp.py", "workflow.py", "run_worker.py"], TemplateType.TEMPORAL_OPENAI_AGENTS: ["acp.py", "workflow.py", "run_worker.py", "activities.py"], TemplateType.TEMPORAL_PYDANTIC_AI: ["acp.py", "workflow.py", "run_worker.py", "agent.py", "tools.py"], + TemplateType.TEMPORAL_LANGGRAPH: ["acp.py", "workflow.py", "run_worker.py", "graph.py", "tools.py"], TemplateType.DEFAULT: ["acp.py"], TemplateType.DEFAULT_LANGGRAPH: ["acp.py", "graph.py", "tools.py"], TemplateType.DEFAULT_PYDANTIC_AI: ["acp.py", "agent.py", "tools.py"], @@ -195,6 +197,7 @@ def validate_agent_name(text: str) -> bool | str: {"name": "Basic Temporal", "value": TemplateType.TEMPORAL}, {"name": "Temporal + OpenAI Agents SDK (Recommended)", "value": TemplateType.TEMPORAL_OPENAI_AGENTS}, {"name": "Temporal + Pydantic AI", "value": TemplateType.TEMPORAL_PYDANTIC_AI}, + {"name": "Temporal + LangGraph", "value": TemplateType.TEMPORAL_LANGGRAPH}, ], ).ask() if not template_type: diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 new file mode 100644 index 000000000..015f49ef7 --- 
/dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/.env.example.j2 @@ -0,0 +1,13 @@ +# {{ agent_name }} - Environment Variables +# Copy this file to .env and fill in the values + +# API key for your LLM provider +LITELLM_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 new file mode 100644 index 000000000..2a3f1108b --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile-uv.j2 @@ -0,0 +1,55 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +ENV UV_COMPILE_BYTECODE=1 +ENV UV_LINK_MODE=copy +ENV UV_HTTP_TIMEOUT=1000 + +WORKDIR /app/{{ project_path_from_build_root }} + +# Copy dependency files for layer caching +COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./ + +# Install dependencies (without project itself, for layer caching) +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-install-project --no-dev + +# Copy the project code +COPY {{ project_path_from_build_root }}/project ./project + +# Install the project +RUN 
--mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-dev + +ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH" + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 new file mode 100644 index 000000000..ba47485a9 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/Dockerfile.j2 @@ -0,0 +1,48 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the requirements file to optimize caching +COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages +RUN uv pip install --system -r requirements.txt + +# Copy the project code +COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", 
"8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 new file mode 100644 index 000000000..e8af5a90b --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/README.md.j2 @@ -0,0 +1,121 @@ +# {{ agent_name }} — AgentEx Temporal + LangGraph + +A starter template for building AI agents with AgentEx, [LangGraph](https://langchain-ai.github.io/langgraph/), +and Temporal — where **Temporal is the runtime and LangGraph is the agent framework**. + +It uses the official [`temporalio.contrib.langgraph`](https://docs.temporal.io/develop/python/integrations/langgraph) +plugin: each LangGraph node runs either as a durable **Temporal activity** or +inline in the **workflow**, configured per node with `execute_in`. You get +per-node durability, automatic retries, and full visibility in the Temporal UI +— without LangGraph's own runtime or an external checkpoint database. + +> The Temporal LangGraph plugin is currently **experimental**; its API may change. + +## What's in the box + +- **Nodes as activities** — the LLM (`agent`) node runs as a retried, observable + Temporal activity; the `tools` node runs in the workflow (see below). +- **Human-in-the-loop** — approval-gated tools raise a LangGraph `interrupt`; + the workflow pauses on a Temporal signal (`provide_approval`) until a human + approves or rejects, then resumes. +- **Live introspection via Temporal queries** — `get_status`, + `get_pending_approval`, `get_graph_state`, and `get_graph_mermaid` / + `get_graph_ascii` to render the agent graph while it runs. +- **Multi-turn memory** — the running message list is kept on the workflow + instance, durable for free. +- **Tracing/observability** — a per-turn span shipped to SGP/AgentEx. 
+ +## The agent graph + +``` +START --> agent --> (tool calls?) --> tools --> agent + --> (no tool calls?) --> END +``` + +`project/graph.py` defines this graph. The `agent` node is marked +`execute_in="activity"`; the `tools` node is `execute_in="workflow"`. Query +`get_graph_mermaid` at runtime to see it rendered. + +### Why the tools node runs in the workflow + +The `tools` node runs inline in the workflow (not as an activity) for two +reasons: the `AIMessage` with tool calls stays intact without crossing an +activity boundary, and LangGraph `interrupt` (used for human approval) must run +where the workflow can pause on a Temporal signal. For long-running or heavily +side-effecting tools, move that work into its own `execute_in="activity"` node. +The router and tools are `async` so LangGraph awaits them directly (sync +callables are offloaded via `run_in_executor`, which Temporal workflows forbid). + +## Project structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ ├── acp.py # Thin async ACP server; registers the LangGraphPlugin +│ ├── workflow.py # Temporal runtime: runs the graph, HIL, queries, memory +│ ├── graph.py # LangGraph graph; nodes tagged execute_in activity/workflow +│ ├── tools.py # Async tool definitions + approval set +│ └── run_worker.py # Temporal worker; registers the LangGraphPlugin +├── Dockerfile +├── manifest.yaml +├── environments.yaml +├── dev.ipynb +{% if use_uv %}└── pyproject.toml{% else %}└── requirements.txt{% endif %} +``` + +## Running the agent + +```bash +{% if use_uv %}agentex uv sync +source .venv/bin/activate{% else %}pip install -r requirements.txt{% endif %} + +# Start the agent (ACP server + Temporal worker) +agentex agents run --manifest manifest.yaml +``` + +The agent starts on port 8000. Open the Temporal UI at http://localhost:8080 to +watch workflows and activities execute. Use `dev.ipynb` to create a task and +send messages. 
+ +## Human-in-the-loop + +Tools listed in `TOOLS_REQUIRING_APPROVAL` (in `project/tools.py`) raise a +LangGraph `interrupt` before they run. The workflow surfaces the pending call +(queryable via `get_pending_approval`) and waits — durably, for as long as it +takes — for a `provide_approval` signal carrying the decision: + +```python +# decision: {"approved": true, "approver": "daniel", "reason": "looks good"} +``` + +If rejected, the rejection is fed back to the model so it can adjust. + +## Adding tools + +1. Define an **async** `@tool` function in `project/tools.py` and add it to `TOOLS`. +2. (Optional) add its name to `TOOLS_REQUIRING_APPROVAL` to gate it behind + human approval. + +The model is bound with `TOOLS` and the tool node looks them up by name, so no +other wiring is needed. + +## Configuration + +Tune the model in `project/graph.py` (`MODEL_NAME`) and the system prompt +(`SYSTEM_PROMPT`). Per-node activity timeouts and retry policies live in the +node `metadata` in `build_graph()`. + +## Environment variables + +Create a `.env` file (see `.env.example`): + +```bash +LITELLM_API_KEY=your-litellm-key # copied to OPENAI_API_KEY automatically +# OPENAI_BASE_URL= # optional: point at a different provider +# SGP_API_KEY= # optional: tracing +# SGP_ACCOUNT_ID= # optional: tracing +# SGP_CLIENT_BASE_URL= # optional: tracing +``` + +Happy building with Temporal + LangGraph! 
diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 new file mode 100644 index 000000000..d3a68303f --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/dev.ipynb.j2 @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as 
DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 new file mode 100644 index 000000000..a3df5e228 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/environments.yaml.j2 @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. 
+# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived separately. However, this can be used to override the derived +# # namespace and deploy it within the same namespace that already exists for a separate agent. 
+# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 new file mode 100644 index 000000000..18cffd54a --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/manifest.yaml.j2 @@ -0,0 +1,140 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. 
The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + # Path to temporal worker file + # Examples: + # project/run_worker.py (standard) + # workers/temporal.py (custom structure) + # ../shared/worker.py (shared across projects) + worker: project/run_worker.py + + +# Agent Configuration +# ----------------- +agent: + # Type of agent - either sync or async + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and discovery + description: "{{ description 
}}" + + # Temporal workflow configuration + # This enables your agent to run as a Temporal workflow for long-running tasks + temporal: + enabled: true + workflows: + # Name of the workflow class + # Must match the @workflow.defn name in your workflow.py + - name: {{ workflow_name }} + + # Queue name for task distribution + # Used by Temporal to route tasks to your agent + # Convention: <agent_name>_task_queue + queue_name: {{ queue_name }} + + # Optional: Health check port for temporal worker + # Defaults to 80 if not specified + # health_check_port: 80 + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + # - env_var_name: LITELLM_API_KEY + # secret_name: litellm-api-key + # secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: {} + # LITELLM_API_KEY: "" + # OPENAI_BASE_URL: "" + # OPENAI_ORG_ID: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret name + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 new file mode 100644 index 000000000..c01f8831c 
--- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/acp.py.j2 @@ -0,0 +1,42 @@ +"""ACP server for the Temporal LangGraph agent. + +This file is intentionally thin. When ``acp_type="async"`` is combined with +``TemporalACPConfig(type="temporal", ...)``, FastACP auto-wires: + + HTTP task/create → @workflow.run on the workflow class + HTTP task/event/send → @workflow.signal(SignalName.RECEIVE_EVENT) + HTTP task/cancel → workflow cancellation via the Temporal client + +so we don't define any handlers here. The agent logic lives in +``project/workflow.py`` (the runtime) and ``project/graph.py`` (the LangGraph +graph whose nodes run as Temporal activities), executed by the Temporal worker +(``project/run_worker.py``), not by this HTTP process. + +The ``LangGraphPlugin`` is registered here too so the Temporal client started +by FastACP shares the same graph registry as the worker. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + # When deployed to the cluster, the Temporal address is set automatically. + # Locally we point at the Temporal service from docker compose. 
+ type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ), +) \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 new file mode 100644 index 000000000..feb8051bb --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/graph.py.j2 @@ -0,0 +1,165 @@ +"""LangGraph graph for {{ agent_name }} — nodes run as Temporal activities. + +This is the LangGraph half of the integration. The ``temporalio.contrib.langgraph`` +plugin executes this graph's nodes durably: each node's ``execute_in`` metadata +says whether it runs as a Temporal **activity** or inline in the **workflow**. + + START → agent → (tool calls?) → tools → agent + → (no tool calls?) → END + +- ``agent`` (``execute_in="activity"``): the LLM call. Runs as its own durable, + retried Temporal activity — visible in the Temporal UI. +- ``tools`` (``execute_in="workflow"``): executes tool calls and hosts the + human-in-the-loop gate. It runs inline in the workflow because (a) the + ``AIMessage`` with tool calls stays intact without crossing an activity + boundary, and (b) LangGraph ``interrupt`` (used for approvals) needs to run + where the workflow can pause on a Temporal signal. + +Why these shapes: +- The router (``route_after_agent``) and tools are **async** so LangGraph + awaits them directly; sync callables would be offloaded via + ``run_in_executor``, which Temporal's workflow event loop does not support. +- Tool execution as a workflow node keeps things simple for this template. + For long-running or heavily side-effecting tools, move that work into its + own activity (e.g. mark a dedicated tool node ``execute_in="activity"``). 
+""" + +from __future__ import annotations + +import os +from typing import Any, Annotated +from datetime import datetime, timedelta + +# Copy the LiteLLM proxy key to OPENAI_API_KEY so langchain-openai authenticates +# against the Scale LiteLLM proxy when one is configured. This runs in the +# worker process (where the agent activity executes). +_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key: + os.environ.setdefault("OPENAI_API_KEY", _litellm_key) + +from typing_extensions import TypedDict + +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from langchain_openai import ChatOpenAI +from temporalio.common import RetryPolicy +from langchain_core.messages import ToolMessage, SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS, TOOLS_BY_NAME, TOOLS_REQUIRING_APPROVAL + +# The name this graph is registered under in the LangGraphPlugin. The workflow +# retrieves it with ``graph(GRAPH_NAME)``; acp.py and run_worker.py register it. +GRAPH_NAME = "{{ agent_name }}" + +# Swap for any LangChain-supported chat model id, e.g. "gpt-4o", "o3-mini". +MODEL_NAME = "gpt-4o" + +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + + messages: Annotated[list[Any], add_messages] + + +async def agent_node(state: AgentState) -> dict[str, Any]: + """The 'agent' node — one LLM call. 
Runs as a durable Temporal activity.""" + llm = ChatOpenAI(model=MODEL_NAME).bind_tools(TOOLS) + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system = SystemMessage( + content=SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + messages = [system, *messages] + return {"messages": [await llm.ainvoke(messages)]} + + +async def tools_node(state: AgentState) -> dict[str, Any]: + """The 'tools' node — executes tool calls, with a human-approval gate. + + Runs inline in the workflow. For tools in ``TOOLS_REQUIRING_APPROVAL`` it + raises a LangGraph ``interrupt`` carrying the pending call; the workflow + pauses on a Temporal signal until a human approves or rejects, then resumes. + """ + last_message = state["messages"][-1] + tool_messages: list[ToolMessage] = [] + + for tool_call in last_message.tool_calls: + name = tool_call["name"] + + tool = TOOLS_BY_NAME.get(name) + if tool is None: + # The model hallucinated a tool that isn't registered — tell it so + # it can recover, rather than crashing the workflow. + tool_messages.append( + ToolMessage(content=f"Error: unknown tool {name!r}", tool_call_id=tool_call["id"], name=name) + ) + continue + + if name in TOOLS_REQUIRING_APPROVAL: + # interrupt() pauses the graph; the workflow resumes it with the + # human's decision via Command(resume=...). Durable: it can wait + # minutes, hours, or days and survive worker restarts. 
+ decision = interrupt( + {"tool_call_id": tool_call["id"], "name": name, "args": tool_call["args"]} + ) + if not decision.get("approved"): + rejection = ( + f"Tool call rejected by {decision.get('approver', 'human')}: " + f"{decision.get('reason', 'no reason given')}" + ) + tool_messages.append( + ToolMessage(content=rejection, tool_call_id=tool_call["id"], name=name) + ) + continue + + result = await tool.ainvoke(tool_call["args"]) + tool_messages.append( + ToolMessage(content=str(result), tool_call_id=tool_call["id"], name=name) + ) + + return {"messages": tool_messages} + + +async def route_after_agent(state: AgentState) -> str: + """Route to the tools node when the model requested tools, else finish. + + Async so LangGraph awaits it directly in the workflow (a sync router would + be offloaded via run_in_executor, unsupported in Temporal workflows). + """ + last_message = state["messages"][-1] + return "tools" if getattr(last_message, "tool_calls", None) else END + + +def build_graph() -> StateGraph: + """Build the agent graph with per-node Temporal execution metadata. + + Registered with the ``LangGraphPlugin`` in acp.py / run_worker.py, and used + by the workflow's visualization queries. 
+ """ + builder = StateGraph(AgentState) + builder.add_node( + "agent", + agent_node, + metadata={ + "execute_in": "activity", + "start_to_close_timeout": timedelta(minutes=5), + "retry_policy": RetryPolicy(maximum_attempts=3), + }, + ) + builder.add_node("tools", tools_node, metadata={"execute_in": "workflow"}) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", route_after_agent, {"tools": "tools", END: END}) + builder.add_edge("tools", "agent") + return builder \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 new file mode 100644 index 000000000..9dc45a4a0 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/run_worker.py.j2 @@ -0,0 +1,50 @@ +"""Temporal worker for {{ agent_name }}. + +Run as a separate long-lived process alongside the ACP HTTP server. The +worker polls Temporal for workflow + activity tasks and executes them. + +The ``LangGraphPlugin`` is given the graph registry (``{ GRAPH_NAME: graph }``). +At runtime it turns the graph's ``execute_in="activity"`` nodes into Temporal +activities and registers them on the worker automatically — so we don't have +to enumerate node activities by hand. 
+""" + +import asyncio + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from project.workflow import {{ workflow_class }} +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +environment_variables = EnvironmentVariables.refresh() +logger = make_logger(__name__) + + +async def main(): + setup_debug_if_enabled() + + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + # AgentexWorker runs workflows with an unsandboxed runner, so importing + # langchain/langgraph inside the workflow + nodes is fine. The LangGraph + # plugin registers the graph's activity-nodes for us. + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ) + + await worker.run( + activities=get_all_activities(), + workflow={{ workflow_class }}, + ) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 new file mode 100644 index 000000000..35660ad9b --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/tools.py.j2 @@ -0,0 +1,57 @@ +"""Tool definitions for the LangGraph + Temporal agent. + +Each tool is an async LangChain ``@tool``. They're run by the ``tools`` node +(see ``project/graph.py``), which the Temporal LangGraph plugin executes +inside the workflow. 
Tools must be ``async`` so the in-workflow node awaits +them directly rather than offloading to a thread executor (which Temporal's +workflow event loop does not allow). + +``TOOLS`` is the single source of truth: it's bound to the model (so the LLM +knows the schemas) and looked up by name when the tool node runs. + +``TOOLS_REQUIRING_APPROVAL`` marks tools that pause for human approval before +they run — the tool node raises a LangGraph ``interrupt`` for those, which the +workflow surfaces and resolves via a Temporal signal (human-in-the-loop). +""" + +from __future__ import annotations + +from langchain_core.tools import tool + + +@tool +async def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + # TODO: Replace with a real weather API call. + return f"The weather in {city} is sunny and 72°F" + + +@tool +async def send_notification(recipient: str, message: str) -> str: + """Send a notification to a recipient. Requires human approval before sending. + + Args: + recipient: Who to notify. + message: The message body to send. + + Returns: + A confirmation string. + """ + # TODO: Replace with a real side-effecting integration (email, Slack, ...). + return f"Notification sent to {recipient}: {message!r}" + + +# All tools available to the agent. Bound to the model and looked up by name +# when the tool node runs. +TOOLS = [get_weather, send_notification] +TOOLS_BY_NAME = {t.name: t for t in TOOLS} + +# Tools in this set pause for a human-approval signal before they run. 
+TOOLS_REQUIRING_APPROVAL = {"send_notification"} diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 new file mode 100644 index 000000000..d1621fb8c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/project/workflow.py.j2 @@ -0,0 +1,259 @@ +"""Temporal workflow for {{ agent_name }} — Temporal as the LangGraph runtime. + +*Temporal replaces the runtime; LangGraph is the agent framework.* This +workflow is that runtime. Each turn it runs the LangGraph graph defined in +``project/graph.py`` via the ``temporalio.contrib.langgraph`` plugin, which +executes the graph's nodes as durable Temporal activities (the ``agent``/LLM +node) or inline in the workflow (the ``tools`` node). + +Showcased here: + +- **Nodes as activities** — the plugin runs the LLM node as a retried, + observable Temporal activity (see ``execute_in`` metadata in graph.py). +- **Human-in-the-loop** — when the graph raises a LangGraph ``interrupt`` for + an approval-gated tool, the workflow pauses on a Temporal signal + (``provide_approval``) and resumes with the human's decision. +- **Live introspection via Temporal queries** — status, the pending approval, + and a Mermaid/ASCII rendering of the agent graph, queryable while it runs. +- **Multi-turn memory** — the running message list is kept on the workflow + instance; durable and replay-safe for free, so no checkpoint DB is needed. +- **Tracing** — a per-turn span shipped to SGP/AgentEx. +""" + +from __future__ import annotations + +import os +import json +from typing import Any + +# LangGraph plugin helper: retrieves the graph registered under GRAPH_NAME. 
+import langgraph.checkpoint.memory +from temporalio import workflow +from langgraph.types import Command +from temporalio.contrib.langgraph import graph as lg_graph + +from agentex.lib import adk +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.adk import emit_langgraph_messages +from agentex.protocol.acp import SendEventParams, CreateTaskParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +# Register the SGP tracing exporter (spans also reach the AgentEx backend via +# the default processor that is lazy-initialised on first span). +SGP_API_KEY = os.environ.get("SGP_API_KEY", "") +SGP_ACCOUNT_ID = os.environ.get("SGP_ACCOUNT_ID", "") +if SGP_API_KEY and SGP_ACCOUNT_ID: + add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=SGP_API_KEY, + sgp_account_id=SGP_ACCOUNT_ID, + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) + ) + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class {{ workflow_class }}(BaseWorkflow): + """Durable runtime that runs the LangGraph agent via the Temporal plugin.""" + + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._turn_number = 0 + # Running conversation, as 
LangGraph message objects. Durable: Temporal + # replays the activity results that produced it, so it survives crashes. + self._messages: list[Any] = [] + # How many messages have already been surfaced to the AgentEx UI. + self._emitted = 0 + self._status = "idle" + self._pending_approval: dict[str, Any] | None = None + self._approval_response: dict[str, Any] | None = None + self._viz_graph: Any = None + + # ------------------------------------------------------------------ # + # Signals + # ------------------------------------------------------------------ # + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + """Handle a new user message: echo it, then run the agent graph durably.""" + logger.info(f"Received task event for task {params.task.id}") + self._turn_number += 1 + user_text = params.event.content.content + + # Echo the user's message so it shows up as a chat bubble. + await adk.messages.create(task_id=params.task.id, content=params.event.content) + self._messages.append({"role": "user", "content": user_text}) + + async with adk.tracing.span( + trace_id=params.task.id, + task_id=params.task.id, + name=f"Turn {self._turn_number}", + input={"message": user_text}, + ) as span: + final_text = await self._run_graph(params.task.id) + if span: + span.output = {"final_output": final_text} + + @workflow.signal + async def provide_approval(self, response: dict[str, Any]) -> None: + """Provide a human approval decision for a pending tool call. + + Args: + response: ``{"approved": bool, "reason": str, "approver": str}``. 
+ """ + logger.info(f"Received approval response: {response}") + self._approval_response = response + + @workflow.signal + async def complete_task_signal(self) -> None: + """Gracefully end the task/workflow.""" + logger.info("Received complete_task signal") + self._complete_task = True + + # ------------------------------------------------------------------ # + # Agent turn — run the LangGraph graph, pausing for approvals. + # ------------------------------------------------------------------ # + + async def _run_graph(self, task_id: str) -> str: + """Run one turn of the graph, handling any human-approval interrupts.""" + # A fresh in-memory checkpointer per turn: it only needs to persist the + # interrupt/resume state within this turn. Temporal provides durability; + # cross-turn memory lives in self._messages. + compiled = lg_graph(GRAPH_NAME).compile( + checkpointer=langgraph.checkpoint.memory.InMemorySaver() + ) + config = {"configurable": {"thread_id": f"{task_id}-{self._turn_number}"}} + + self._status = "processing" + result = await compiled.ainvoke({"messages": self._messages}, config=config) + + # The graph pauses (interrupt) whenever an approval-gated tool is called. + while result.get("__interrupt__"): + interrupt_value = result["__interrupt__"][0].value + decision = await self._await_human_approval(task_id, interrupt_value) + result = await compiled.ainvoke(Command(resume=decision), config=config) + + self._messages = result["messages"] + # Surface the messages this turn produced (tool calls, results, final + # text) to the AgentEx UI. The SDK helper does the LangGraph→AgentEx + # message conversion and returns the final assistant text. 
+ final_text = await emit_langgraph_messages(self._messages[self._emitted:], task_id) + self._emitted = len(self._messages) + self._status = "completed" + return final_text + + async def _await_human_approval(self, task_id: str, pending: dict[str, Any]) -> dict[str, Any]: + """Pause until a ``provide_approval`` signal arrives, then return the decision.""" + self._pending_approval = pending + self._approval_response = None + self._status = "waiting_for_approval" + + await adk.messages.create( + task_id=task_id, + content=TextContent( + author="agent", + content=( + f"⏸️ Waiting for human approval to run **{pending.get('name')}** " + f"with `{json.dumps(pending.get('args', {}))}`.\n\n" + "Send a `provide_approval` signal, e.g. " + '`{"approved": true, "approver": "you"}`.' + ), + ), + ) + + await workflow.wait_condition(lambda: self._approval_response is not None) + + decision = self._approval_response or {"approved": False} + self._pending_approval = None + self._approval_response = None + self._status = "processing" + return decision + + # ------------------------------------------------------------------ # + # Queries — inspect the running agent live from the Temporal UI/client. 
+ # ------------------------------------------------------------------ # + + @workflow.query + def get_status(self) -> str: + """Current status: idle | processing | waiting_for_approval | completed.""" + return self._status + + @workflow.query + def get_pending_approval(self) -> dict[str, Any] | None: + """The tool call currently awaiting human approval, if any.""" + return self._pending_approval + + @workflow.query + def get_graph_state(self) -> dict[str, Any]: + """A snapshot of the agent loop's progress.""" + return { + "turn_number": self._turn_number, + "message_count": len(self._messages), + "status": self._status, + "pending_approval": self._pending_approval, + "completed": self._complete_task, + } + + @workflow.query + def get_graph_mermaid(self) -> str: + """Mermaid diagram of the agent graph (renders in GitHub/Notion).""" + try: + return self._visualization_graph().get_graph().draw_mermaid() + except Exception as exc: # pragma: no cover - visualization is best-effort + return f"Could not render graph: {exc}" + + @workflow.query + def get_graph_ascii(self) -> str: + """ASCII-art diagram of the agent graph (requires the `grandalf` package).""" + try: + return self._visualization_graph().get_graph().draw_ascii() + except ImportError: + return "ASCII rendering requires the 'grandalf' package. Try get_graph_mermaid instead." 
+ except Exception as exc: # pragma: no cover - visualization is best-effort + return f"Could not render graph: {exc}" + + def _visualization_graph(self): + """Lazily build + cache the compiled graph used purely for rendering.""" + if self._viz_graph is None: + self._viz_graph = build_graph().compile() + return self._viz_graph + + # ------------------------------------------------------------------ # + # Entry point + # ------------------------------------------------------------------ # + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> str: + """Keep the conversation alive, handling incoming message/approval signals.""" + logger.info(f"Task created: {params.task.id}") + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" + "Send me a message and I'll respond using a LangGraph agent whose nodes " + "run as durable Temporal activities." + ), + ), + ) + + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + return "Task completed" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 new file mode 100644 index 000000000..125ce704c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/pyproject.toml.j2 @@ -0,0 +1,42 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "{{ project_name }}" +version = "0.1.0" +description = "{{ description }}" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + # Temporal with the LangGraph plugin (temporalio.contrib.langgraph), + # which runs LangGraph nodes as Temporal activities. Needs >=1.27.0. 
+ "temporalio[langgraph]>=1.27.0", + "langchain-openai", + "langchain-core", + "grandalf", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 new file mode 100644 index 000000000..a499fc17c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/requirements.txt.j2 @@ -0,0 +1,18 @@ +# Agentex SDK +agentex-sdk + +# Scale GenAI Platform Python SDK +scale-gp + +# Temporal with the LangGraph plugin (temporalio.contrib.langgraph). +# The plugin runs LangGraph nodes as Temporal activities; needs >=1.27.0. +temporalio[langgraph]>=1.27.0 + +# LangChain model + tools +langchain-openai +langchain-core + +# Optional: enables get_graph_ascii() ASCII graph rendering +grandalf + +python-dotenv diff --git a/src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 b/src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 new file mode 100644 index 000000000..2d28e44d4 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-langgraph/test_agent.py.j2 @@ -0,0 +1,147 @@ +""" +Sample tests for AgentEx ACP agent. + +This test suite demonstrates how to test the main AgentEx API functions: +- Non-streaming event sending and polling +- Streaming event sending + +To run these tests: +1. Make sure the agent is running (via docker-compose or `agentex agents run`) +2. Set the AGENTEX_API_BASE_URL environment variable if not using default +3. 
Run: pytest test_agent.py -v + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: {{ agent_name }}) +""" + +import os +import uuid +import asyncio +import pytest +import pytest_asyncio +from agentex import AsyncAgentex +from agentex.types import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest +from agentex.types.text_content_param import TextContentParam +from test_utils.async_utils import ( + poll_for_agent_response, + send_event_and_poll_yielding, + stream_agent_response, + validate_text_in_response, + poll_messages, +) + + +# Configuration from environment variables +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "{{ agent_name }}") + + +@pytest_asyncio.fixture +async def client(): + """Create an AsyncAgentex client instance for testing.""" + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + """Return the agent name for testing.""" + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + """Retrieve the agent ID based on the agent name.""" + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + """Test non-streaming event sending and polling.""" + + @pytest.mark.asyncio + async def test_send_event_and_poll(self, client: AsyncAgentex, agent_name: str, agent_id: str): + """Test sending an event and polling for the response.""" + # TODO: Create a task for this conversation + # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + # task = task_response.result + # assert task is not None + + # TODO: Poll for the initial 
task creation message (if your agent sends one) + # async for message in poll_messages( + # client=client, + # task_id=task.id, + # timeout=30, + # sleep_interval=1.0, + # ): + # assert isinstance(message, TaskMessage) + # if message.content and message.content.type == "text" and message.content.author == "agent": + # # Check for your expected initial message + # assert "expected initial text" in message.content.content + # break + + # TODO: Send an event and poll for response using the yielding helper function + # user_message = "Your test message here" + # async for message in send_event_and_poll_yielding( + # client=client, + # agent_id=agent_id, + # task_id=task.id, + # user_message=user_message, + # timeout=30, + # sleep_interval=1.0, + # ): + # assert isinstance(message, TaskMessage) + # if message.content and message.content.type == "text" and message.content.author == "agent": + # # Check for your expected response + # assert "expected response text" in message.content.content + # break + pass + + +class TestStreamingEvents: + """Test streaming event sending.""" + + @pytest.mark.asyncio + async def test_send_event_and_stream(self, client: AsyncAgentex, agent_name: str, agent_id: str): + """Test sending an event and streaming the response.""" + # TODO: Create a task for this conversation + # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + # task = task_response.result + # assert task is not None + + # user_message = "Your test message here" + + # # Collect events from stream + # all_events = [] + + # async def collect_stream_events(): + # async for event in stream_agent_response( + # client=client, + # task_id=task.id, + # timeout=30, + # ): + # all_events.append(event) + + # # Start streaming task + # stream_task = asyncio.create_task(collect_stream_events()) + + # # Send the event + # event_content = TextContentParam(type="text", author="user", content=user_message) + # await 
client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) + + # # Wait for streaming to complete + # await stream_task + + # # TODO: Add your validation here + # assert len(all_events) > 0, "No events received in streaming response" + pass + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/src/agentex/lib/core/observability/tests/test_tracing_metrics.py b/src/agentex/lib/core/observability/tests/test_tracing_metrics.py new file mode 100644 index 000000000..aab4fbfed --- /dev/null +++ b/src/agentex/lib/core/observability/tests/test_tracing_metrics.py @@ -0,0 +1,100 @@ +"""Tests for ``agentex.lib.core.observability.tracing_metrics``.""" + +from __future__ import annotations + +import agentex.lib.core.observability.tracing_metrics as tracing_metrics +from agentex.lib.core.observability.tracing_metrics import ( + TracingMetrics, + processor_label, + get_tracing_metrics, + classify_export_error, +) + + +class TestClassifyExportError: + def test_scale_gp_authentication_error(self): + class AuthenticationError(Exception): + pass + + exc = AuthenticationError("Error code: 401 - {'message': 'Not authorized to access Account'}") + assert classify_export_error(exc) == ("authentication", "401") + + def test_rate_limit_code(self): + class APIError(Exception): + pass + + exc = APIError("Error code: 429 - rate limited") + assert classify_export_error(exc) == ("rate_limit", "429") + + def test_server_error_code(self): + class APIError(Exception): + pass + + exc = APIError("Error code: 503 - unavailable") + assert classify_export_error(exc) == ("server_error", "5xx") + + def test_out_of_range_code_uses_bounded_label(self): + class APIError(Exception): + pass + + exc = APIError("Error code: 100 - continue") + assert classify_export_error(exc) == ("other_error", "other") + + def test_timeout_by_name(self): + class APITimeoutError(Exception): + pass + + assert classify_export_error(APITimeoutError("slow")) == 
("timeout", "timeout") + + def test_unknown_error(self): + class WeirdError(Exception): + pass + + assert classify_export_error(WeirdError("boom")) == ("other_error", "unknown") + + +class TestProcessorLabel: + def test_sgp_async_processor(self): + class SGPAsyncTracingProcessor: + pass + + assert processor_label(SGPAsyncTracingProcessor()) == "sgp" + + def test_other_processor(self): + class AgentexAsyncTracingProcessor: + pass + + assert processor_label(AgentexAsyncTracingProcessor()) == "other" + + +class TestGetTracingMetrics: + def test_returns_tracing_metrics_instance(self, monkeypatch): + monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None) + m = get_tracing_metrics() + assert isinstance(m, TracingMetrics) + + def test_singleton_returns_same_instance(self, monkeypatch): + monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None) + first = get_tracing_metrics() + second = get_tracing_metrics() + assert first is second + + def test_instruments_exist(self, monkeypatch): + monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None) + m = get_tracing_metrics() + for name in ( + "span_events_enqueued", + "span_events_dropped", + "queue_depth", + "queue_lag", + "batch_items", + "batch_size", + "batch_drain_duration", + "export_batches", + "export_spans", + "export_batch_failures", + "export_span_failures", + "shutdown_timeouts", + "shutdown_remaining_items", + ): + assert hasattr(m, name), f"missing instrument: {name}" diff --git a/src/agentex/lib/core/observability/tests/test_tracing_metrics_recording.py b/src/agentex/lib/core/observability/tests/test_tracing_metrics_recording.py new file mode 100644 index 000000000..6c50c599f --- /dev/null +++ b/src/agentex/lib/core/observability/tests/test_tracing_metrics_recording.py @@ -0,0 +1,143 @@ +"""Tests for ``agentex.lib.core.observability.tracing_metrics_recording``.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import 
agentex.lib.core.observability.tracing_metrics_recording as recording + + +class _Item: + def __init__(self, enqueued_at: float | None) -> None: + self.enqueued_at = enqueued_at + + +class TestIsMetricsEnabled: + def setup_method(self) -> None: + recording._metrics_enabled = None + recording._tracing = None + + def test_enabled_by_default(self, monkeypatch): + monkeypatch.delenv("AGENTEX_TRACING_METRICS", raising=False) + assert recording.is_metrics_enabled() is True + + def test_disabled_by_zero(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0") + recording._metrics_enabled = None + assert recording.is_metrics_enabled() is False + + +class TestRecordingHelpers: + def setup_method(self) -> None: + recording._metrics_enabled = None + recording._tracing = None + + def test_record_span_enqueued_when_disabled_does_not_load_metrics(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0") + recording._metrics_enabled = None + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics" + ) as mock_get: + recording.record_span_enqueued("start") + mock_get.assert_not_called() + + def test_record_span_enqueued_when_enabled(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + recording._metrics_enabled = None + mock_metrics = MagicMock() + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + recording.record_span_enqueued("end") + mock_metrics.span_events_enqueued.add.assert_called_once_with(1, {"event_type": "end"}) + + def test_monotonic_if_enabled_respects_kill_switch(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0") + recording._metrics_enabled = None + assert recording.monotonic_if_enabled() is None + + def test_record_batch_coalesced_records_lag(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + recording._metrics_enabled = None + mock_metrics = MagicMock() + with patch( + 
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ), patch("agentex.lib.core.observability.tracing_metrics_recording.time.monotonic", return_value=10.0): + recording.record_batch_coalesced( + queue_depth=3, + batch_items=[_Item(9.5), _Item(9.0)], + ) + mock_metrics.queue_depth.record.assert_called_once_with(3) + mock_metrics.batch_items.record.assert_called_once_with(2) + mock_metrics.queue_lag.record.assert_called_once_with(1000.0) + + def test_record_export_failure(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + recording._metrics_enabled = None + mock_metrics = MagicMock() + + class AuthenticationError(Exception): + pass + + exc = AuthenticationError("Error code: 401 - denied") + processor = type("SGPAsyncTracingProcessor", (), {})() + + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + recording.record_export_failure( + processor=processor, + event_type="start", + span_count=5, + exc=exc, + ) + + mock_metrics.export_batch_failures.add.assert_called_once() + mock_metrics.export_span_failures.add.assert_called_once_with( + 5, + { + "processor": "sgp", + "event_type": "start", + "http_code": "401", + "error_class": "authentication", + }, + ) + + def test_record_export_success(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + recording._metrics_enabled = None + mock_metrics = MagicMock() + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + recording.record_export_success(event_type="end", span_count=12, processor="sgp") + + mock_metrics.export_batches.add.assert_called_once_with( + 1, + {"processor": "sgp", "event_type": "end"}, + ) + mock_metrics.export_spans.add.assert_called_once_with( + 12, + {"processor": "sgp", "event_type": "end"}, + ) + + def test_record_export_success_accepts_processor_label(self, monkeypatch): + 
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + recording._metrics_enabled = None + mock_metrics = MagicMock() + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + recording.record_export_success( + event_type="start", span_count=3, processor="other" + ) + + mock_metrics.export_batches.add.assert_called_once_with( + 1, + {"processor": "other", "event_type": "start"}, + ) diff --git a/src/agentex/lib/core/observability/tracing_metrics.py b/src/agentex/lib/core/observability/tracing_metrics.py new file mode 100644 index 000000000..74960cc4a --- /dev/null +++ b/src/agentex/lib/core/observability/tracing_metrics.py @@ -0,0 +1,164 @@ +"""OTel metrics for async span queue and SGP export telemetry. + +Single source of truth for span-queue / export instrumentation. Import +``get_tracing_metrics()`` or the ``record_*`` helpers in +``tracing_metrics_recording`` from hot paths — never configure a +``MeterProvider`` here. + +The meter is no-op when the application has not configured a +``MeterProvider``. Set ``AGENTEX_TRACING_METRICS=0`` to skip recording +entirely (see ``tracing_metrics_recording.is_metrics_enabled``). + +Cardinality is bounded: +- ``event_type``: ``start`` | ``end`` +- ``processor``: ``sgp`` | ``other`` +- ``http_code``: small fixed set from ``classify_export_error`` (failure counters only) +- ``error_class``: small fixed set from ``classify_export_error`` (failure counters only) +- ``reason``: ``shutdown`` | ``queue_full`` (drops only) +- ``phase``: ``start`` | ``end`` (batch drain histograms) + +Resource attributes (``service.name``, ``k8s.*``, etc.) come from the +host application's OTel resource configuration. 
+""" + +from __future__ import annotations + +import re +from typing import Optional + +from opentelemetry import metrics + +_HTTP_CODE_RE = re.compile(r"Error code:\s*(\d+)") + + +class TracingMetrics: + """Lazily-created OTel instruments for span queue + export telemetry.""" + + def __init__(self) -> None: + meter = metrics.get_meter("agentex.tracing") + self.span_events_enqueued = meter.create_counter( + name="agentex.tracing.span_events.enqueued", + unit="1", + description="Span queue START/END events accepted by enqueue()", + ) + self.span_events_dropped = meter.create_counter( + name="agentex.tracing.span_events.dropped", + unit="1", + description="Span queue events dropped (e.g. shutdown)", + ) + self.queue_depth = meter.create_histogram( + name="agentex.tracing.queue.depth", + unit="1", + description="asyncio queue depth at the start of a drain batch", + ) + self.queue_lag = meter.create_histogram( + name="agentex.tracing.queue.lag", + unit="ms", + description="Max time from enqueue to drain-batch start for items in the batch", + ) + self.batch_items = meter.create_histogram( + name="agentex.tracing.batch.items", + unit="1", + description="Total span events coalesced in one linger/drain batch", + ) + self.batch_size = meter.create_histogram( + name="agentex.tracing.batch.size", + unit="1", + description="Span events in one START or END dispatch phase", + ) + self.batch_drain_duration = meter.create_histogram( + name="agentex.tracing.batch.drain_duration", + unit="ms", + description="Wall time for one START or END _process_items dispatch", + ) + self.export_batches = meter.create_counter( + name="agentex.tracing.export.batches", + unit="1", + description="Successful HTTP export batches by processor and event type", + ) + self.export_spans = meter.create_counter( + name="agentex.tracing.export.spans", + unit="1", + description="Spans in successful HTTP export batches by processor and event type", + ) + self.export_batch_failures = meter.create_counter( + 
name="agentex.tracing.export.batch_failures", + unit="1", + description="Failed HTTP export batches by processor and HTTP status", + ) + self.export_span_failures = meter.create_counter( + name="agentex.tracing.export.span_failures", + unit="1", + description="Spans in failed HTTP export batches by processor and HTTP status", + ) + self.shutdown_timeouts = meter.create_counter( + name="agentex.tracing.shutdown.timeouts", + unit="1", + description="Span queue shutdown calls that hit the join timeout", + ) + self.shutdown_remaining_items = meter.create_histogram( + name="agentex.tracing.shutdown.remaining_items", + unit="1", + description="Queue depth when span queue shutdown times out", + ) + + +_tracing_metrics: Optional[TracingMetrics] = None + + +def get_tracing_metrics() -> TracingMetrics: + """Return the tracing metrics singleton, creating it on first use.""" + global _tracing_metrics + if _tracing_metrics is None: + _tracing_metrics = TracingMetrics() + return _tracing_metrics + + +def processor_label(processor: object) -> str: + """Map a tracing processor instance to a low-cardinality label.""" + if type(processor).__name__ == "SGPAsyncTracingProcessor": + return "sgp" + return "other" + + +def classify_export_error(exc: BaseException) -> tuple[str, str]: + """Categorize an export failure into (error_class, http_code_label). + + ``http_code_label`` is a small fixed set suitable for Prometheus labels. 
+ """ + name = type(exc).__name__ + message = str(exc) + + if "Timeout" in name: + return "timeout", "timeout" + if "Connection" in name or "Connect" in name: + return "network_error", "network" + + match = _HTTP_CODE_RE.search(message) + if match: + code = int(match.group(1)) + if code == 401: + return "authentication", "401" + if code == 403: + return "authentication", "403" + if code == 429: + return "rate_limit", "429" + if 400 <= code < 500: + return "client_error", "4xx" + if 500 <= code < 600: + return "server_error", "5xx" + return "other_error", "other" + + if any(s in name for s in ("Authentication", "Permission")): + return "authentication", "unknown" + if "RateLimit" in name: + return "rate_limit", "429" + if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")): + return "server_error", "5xx" + if any( + s in name + for s in ("BadRequest", "NotFound", "Conflict", "UnprocessableEntity") + ): + return "client_error", "4xx" + + return "other_error", "unknown" diff --git a/src/agentex/lib/core/observability/tracing_metrics_recording.py b/src/agentex/lib/core/observability/tracing_metrics_recording.py new file mode 100644 index 000000000..4fd8632b0 --- /dev/null +++ b/src/agentex/lib/core/observability/tracing_metrics_recording.py @@ -0,0 +1,153 @@ +"""Best-effort recording helpers for span queue / export OTel metrics. + +This module intentionally does **not** import OpenTelemetry — hot paths can +import it without pulling in the OTel SDK. Instruments are created lazily on +first record when ``is_metrics_enabled()`` is true. 
+""" + +from __future__ import annotations + +import os +import time +from typing import Protocol, Sequence + + +class _HasEnqueuedAt(Protocol): + enqueued_at: float | None + + +_metrics_enabled: bool | None = None +_tracing = None # lazy-loaded tracing_metrics module (loads OTel on first use) + + +def is_metrics_enabled() -> bool: + """Return whether SDK span-queue metrics recording is enabled.""" + global _metrics_enabled + if _metrics_enabled is None: + raw = os.environ.get("AGENTEX_TRACING_METRICS", "1").strip().lower() + _metrics_enabled = raw not in ("0", "false", "no", "off") + return _metrics_enabled + + +def _tracing_module(): + """Return lazy-loaded ``tracing_metrics`` module (loads OTel on first use).""" + global _tracing + if _tracing is None: + from agentex.lib.core.observability import tracing_metrics + + _tracing = tracing_metrics + return _tracing + + +def monotonic_if_enabled() -> float | None: + """Return ``time.monotonic()`` when metrics are enabled, else ``None``.""" + if not is_metrics_enabled(): + return None + return time.monotonic() + + +def record_span_enqueued(event_type: str) -> None: + if not is_metrics_enabled(): + return + try: + _tracing_module().get_tracing_metrics().span_events_enqueued.add( + 1, {"event_type": event_type} + ) + except Exception: + pass + + +def record_span_dropped(reason: str, count: int = 1) -> None: + if count <= 0 or not is_metrics_enabled(): + return + try: + _tracing_module().get_tracing_metrics().span_events_dropped.add( + count, {"reason": reason} + ) + except Exception: + pass + + +def record_batch_coalesced( + *, + queue_depth: int, + batch_items: Sequence[_HasEnqueuedAt], +) -> None: + if not is_metrics_enabled(): + return + try: + metrics = _tracing_module().get_tracing_metrics() + metrics.queue_depth.record(max(queue_depth, 0)) + metrics.batch_items.record(len(batch_items)) + + now = time.monotonic() + lag_ms = 0.0 + for item in batch_items: + if item.enqueued_at is None: + continue + lag_ms = 
max(lag_ms, (now - item.enqueued_at) * 1000.0) + if lag_ms > 0: + metrics.queue_lag.record(lag_ms) + except Exception: + pass + + +def record_batch_phase(*, phase: str, size: int, duration_ms: float) -> None: + if not is_metrics_enabled(): + return + try: + attrs = {"phase": phase} + metrics = _tracing_module().get_tracing_metrics() + metrics.batch_size.record(size, attrs) + metrics.batch_drain_duration.record(duration_ms, attrs) + except Exception: + pass + + +def record_export_success(*, event_type: str, span_count: int, processor: str) -> None: + if not is_metrics_enabled(): + return + try: + attrs = {"processor": processor, "event_type": event_type} + metrics = _tracing_module().get_tracing_metrics() + metrics.export_batches.add(1, attrs) + metrics.export_spans.add(span_count, attrs) + except Exception: + pass + + +def record_export_failure( + *, + processor: object, + event_type: str, + span_count: int, + exc: BaseException, +) -> None: + if not is_metrics_enabled(): + return + try: + tm = _tracing_module() + error_class, http_code = tm.classify_export_error(exc) + proc = tm.processor_label(processor) + attrs = { + "processor": proc, + "event_type": event_type, + "http_code": http_code, + "error_class": error_class, + } + metrics = tm.get_tracing_metrics() + metrics.export_batch_failures.add(1, attrs) + metrics.export_span_failures.add(span_count, attrs) + except Exception: + pass + + +def record_shutdown_timeout(*, remaining_items: int) -> None: + if not is_metrics_enabled(): + return + try: + metrics = _tracing_module().get_tracing_metrics() + metrics.shutdown_timeouts.add(1) + metrics.shutdown_remaining_items.record(max(remaining_items, 0)) + except Exception: + pass diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index a21eff7d3..ced4c5d2c 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ 
b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -12,6 +12,7 @@ from agentex.types.span import Span from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger +from agentex.lib.core.observability import tracing_metrics_recording as _metrics from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.tracing.processors.tracing_processor_interface import ( SyncTracingProcessor, @@ -159,6 +160,9 @@ async def on_spans_start(self, spans: list[Span]) -> None: sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans] await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans]) + _metrics.record_export_success( + event_type="start", span_count=len(spans), processor="sgp" + ) @override async def on_spans_end(self, spans: list[Span]) -> None: @@ -175,6 +179,9 @@ async def on_spans_end(self, spans: list[Span]) -> None: sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr] sgp_spans.append(sgp_span) await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans]) + _metrics.record_export_success( + event_type="end", span_count=len(spans), processor="sgp" + ) @override async def shutdown(self) -> None: diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index 5afb8c44b..5d77e3440 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -1,12 +1,14 @@ from __future__ import annotations import os +import time import asyncio from enum import Enum from dataclasses import dataclass from agentex.types.span import Span from agentex.lib.utils.logging import make_logger +from agentex.lib.core.observability import tracing_metrics_recording as _metrics from agentex.lib.core.tracing.processors.tracing_processor_interface import ( AsyncTracingProcessor, ) @@ -73,6 +75,7 @@ class _SpanQueueItem: event_type: SpanEventType 
span: Span processors: list[AsyncTracingProcessor] + enqueued_at: float | None = None # Number of times this item has already been dispatched. Used to bound # re-enqueue on transient failures. attempts: int = 0 @@ -160,6 +163,10 @@ def _record_drop(self, count: int, reason: str) -> None: if count <= 0: return self._dropped_spans += count + if "shutting down" in reason: + _metrics.record_span_dropped("shutdown", count) + elif "queue full" in reason: + _metrics.record_span_dropped("queue_full", count) # Warn on the first drop and then sparsely, so a drop storm is visible # without flooding the log. if self._dropped_spans == count or self._dropped_spans % 100 < count: @@ -181,7 +188,15 @@ def enqueue( return self._ensure_drain_running() try: - self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors)) + self._queue.put_nowait( + _SpanQueueItem( + event_type=event_type, + span=span, + processors=processors, + enqueued_at=_metrics.monotonic_if_enabled(), + ) + ) + _metrics.record_span_enqueued(event_type.value) except asyncio.QueueFull: self._record_drop(1, "queue full") @@ -227,6 +242,11 @@ async def _drain_loop(self) -> None: except asyncio.QueueEmpty: break + _metrics.record_batch_coalesced( + queue_depth=self._queue.qsize() + len(batch), + batch_items=batch, + ) + # Separate START and END events and dispatch each as its own send # task. Dispatching STARTs first (so they are registered before the # END snapshot) guarantees an END never outruns a START of the same @@ -262,7 +282,14 @@ async def _run_send(self, items: list[_SpanQueueItem], barrier: tuple[asyncio.Ta # Wait for the START sends this END batch depends on. Their # exceptions are irrelevant here — we only need them finished. 
await asyncio.gather(*barrier, return_exceptions=True) + phase_start = time.perf_counter() await self._process_items(items) + if items: + _metrics.record_batch_phase( + phase=items[0].event_type.value, + size=len(items), + duration_ms=(time.perf_counter() - phase_start) * 1000.0, + ) finally: # Mark every item done so shutdown's queue.join() can complete only # once all sends (and their retries) have finished. @@ -323,6 +350,12 @@ def _handle_failure( exhausted = len(items) - len(retriable) if exhausted: self._record_drop(exhausted, f"{type(p).__name__} retries exhausted during {event_type.value}") + _metrics.record_export_failure( + processor=p, + event_type=event_type.value, + span_count=exhausted, + exc=exc, + ) for item in retriable: self._reenqueue(item, p) if retriable: @@ -343,6 +376,12 @@ def _handle_failure( len(items), event_type.value, ) + _metrics.record_export_failure( + processor=p, + event_type=event_type.value, + span_count=len(items), + exc=exc, + ) def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: """Put a single failed item back on the queue, scoped to the processor @@ -360,6 +399,7 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: event_type=item.event_type, span=item.span, processors=[p], + enqueued_at=item.enqueued_at, attempts=item.attempts + 1, ) ) @@ -383,9 +423,11 @@ async def shutdown(self, timeout: float = 30.0) -> None: await asyncio.wait_for(self._queue.join(), timeout=timeout) except asyncio.TimeoutError: timed_out = True + remaining = self._queue.qsize() logger.warning( - "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize() + "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, remaining ) + _metrics.record_shutdown_timeout(remaining_items=remaining) if self._drain_task is not None and not self._drain_task.done(): self._drain_task.cancel() diff --git a/tests/lib/cli/test_init_templates.py 
b/tests/lib/cli/test_init_templates.py new file mode 100644 index 000000000..ec809cbbf --- /dev/null +++ b/tests/lib/cli/test_init_templates.py @@ -0,0 +1,139 @@ +"""Tests for the `agentex init` project templates. + +These render the Jinja templates the way the CLI does and assert that: + +- every template type's declared project files exist and render, +- rendered Python parses (catches `.j2` syntax/templating regressions), +- the agent-specific context (names, workflow class) is substituted in, +- the Temporal + LangGraph template is fully wired (enum, file map, root files). + +The Temporal + LangGraph template is the focus, but the parametrized smoke +test covers every template so a broken `.j2` anywhere is caught early. +""" + +from __future__ import annotations + +import ast +from pathlib import Path + +import pytest + +from agentex.lib.cli.commands.init import ( + TemplateType, + get_project_context, + create_project_structure, +) + + +def _context(template_type: TemplateType, use_uv: bool = True) -> dict: + """Build the same render context the CLI assembles from user answers.""" + answers = { + "template_type": template_type, + "project_path": ".", + "agent_name": "my-agent", + "agent_directory_name": "my-agent", + "description": "An Agentex agent", + "use_uv": use_uv, + } + context = get_project_context(answers, Path("."), Path("../../")) + context["template_type"] = template_type.value + context["use_uv"] = use_uv + return context + + +def _render_project(tmp_path: Path, template_type: TemplateType, use_uv: bool = True) -> Path: + context = _context(template_type, use_uv=use_uv) + create_project_structure(tmp_path, context, template_type, use_uv=use_uv) + return tmp_path / context["project_name"] + + +@pytest.mark.parametrize("template_type", list(TemplateType)) +def test_all_templates_render_to_valid_python(tmp_path: Path, template_type: TemplateType): + """Every template renders, and every rendered .py file is syntactically valid.""" + project_dir = 
_render_project(tmp_path, template_type) + + py_files = list(project_dir.rglob("*.py")) + assert py_files, f"{template_type.value} produced no Python files" + + for py_file in py_files: + source = py_file.read_text() + # Raises SyntaxError if a rendered template is broken. + ast.parse(source, filename=str(py_file)) + + +class TestTemporalLangGraphTemplate: + """Focused coverage for the new Temporal + LangGraph template.""" + + template_type = TemplateType.TEMPORAL_LANGGRAPH + + def test_enum_and_value(self): + assert TemplateType.TEMPORAL_LANGGRAPH.value == "temporal-langgraph" + + def test_expected_project_files_exist(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + project_pkg = project_dir / "project" + for filename in ( + "acp.py", + "workflow.py", + "run_worker.py", + "graph.py", + "tools.py", + "__init__.py", + ): + assert (project_pkg / filename).is_file(), f"missing project/{filename}" + + def test_expected_root_files_exist(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + for filename in ( + "manifest.yaml", + "README.md", + "environments.yaml", + ".env.example", + ".dockerignore", + "Dockerfile", + "dev.ipynb", + "pyproject.toml", + ): + assert (project_dir / filename).is_file(), f"missing {filename}" + + def test_workflow_class_substituted(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + workflow_src = (project_dir / "project" / "workflow.py").read_text() + # agent_name "my-agent" -> workflow class "MyAgentWorkflow" + assert "class MyAgentWorkflow(BaseWorkflow):" in workflow_src + assert "{{" not in workflow_src, "unrendered Jinja left in workflow.py" + + def test_nodes_run_via_langgraph_plugin(self, tmp_path: Path): + """The defining trait: nodes run as Temporal activities via the plugin.""" + project_dir = _render_project(tmp_path, self.template_type) + graph_src = (project_dir / "project" / "graph.py").read_text() + # The agent (LLM) node 
is an activity; the tools node runs in-workflow. + assert '"execute_in": "activity"' in graph_src + assert '"execute_in": "workflow"' in graph_src + + # Both the worker and the ACP register the LangGraph plugin. + worker_src = (project_dir / "project" / "run_worker.py").read_text() + acp_src = (project_dir / "project" / "acp.py").read_text() + assert "LangGraphPlugin" in worker_src + assert "LangGraphPlugin" in acp_src + + def test_human_in_the_loop_and_queries_present(self, tmp_path: Path): + project_dir = _render_project(tmp_path, self.template_type) + workflow_src = (project_dir / "project" / "workflow.py").read_text() + graph_src = (project_dir / "project" / "graph.py").read_text() + # HIL: graph raises a langgraph interrupt; workflow resumes via signal + Command. + assert "interrupt(" in graph_src + assert "TOOLS_REQUIRING_APPROVAL" in graph_src + assert "def provide_approval" in workflow_src + assert "Command(resume=" in workflow_src + assert "wait_condition" in workflow_src + # Graph-visualization / introspection queries + for query in ("get_status", "get_graph_mermaid", "get_graph_ascii", "get_graph_state"): + assert query in workflow_src, f"missing query {query}" + + def test_requirements_include_langgraph_plugin_and_temporal(self, tmp_path: Path): + # requirements.txt only renders in the non-uv variant + project_dir = _render_project(tmp_path, self.template_type, use_uv=False) + requirements = (project_dir / "requirements.txt").read_text() + assert "temporalio[langgraph]>=1.27.0" in requirements + assert "langchain-openai" in requirements diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 090cd9a09..41efcea5a 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -222,6 +222,33 @@ async def test_on_spans_start_sends_single_upsert_for_batch(self): items = 
mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n + async def test_on_spans_start_records_export_success_metrics(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + import agentex.lib.core.observability.tracing_metrics_recording as recording + + recording._metrics_enabled = None + recording._tracing = None + processor, _, mock_client = self._make_processor() + mock_metrics = MagicMock() + + n = 4 + spans = [_make_span() for _ in range(n)] + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()), patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + await processor.on_spans_start(spans) + + mock_metrics.export_batches.add.assert_called_once_with( + 1, + {"processor": "sgp", "event_type": "start"}, + ) + mock_metrics.export_spans.add.assert_called_once_with( + n, + {"processor": "sgp", "event_type": "start"}, + ) + assert mock_client.spans.upsert_batch.call_count == 1 + async def test_get_client_caches_per_event_loop(self): """The processor must keep one client per event loop, and reuse it across calls within the same loop. 
This is what enables connection diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index eee513ac9..d2452d619 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -401,6 +401,7 @@ async def capture_starts(spans: list[Span]) -> None: assert sum(len(b) for b in received) == 7 + class _FakeHTTPError(Exception): """Mimics an SGP/httpx status error: carries a ``status_code`` attribute.""" @@ -743,3 +744,118 @@ async def capture_end(span: Span) -> None: # END should still carry output and end_time assert end_spans[0].output is not None assert end_spans[0].end_time is not None + + +class TestAsyncSpanQueueMetrics: + async def test_batch_coalesced_records_depth_including_batch(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + import agentex.lib.core.observability.tracing_metrics_recording as recording + + recording._metrics_enabled = None + proc = _make_processor() + queue = AsyncSpanQueue(linger_ms=0) + recorded_depths: list[int] = [] + + def capture_coalesced(*, queue_depth: int, batch_items: object) -> None: + recorded_depths.append(queue_depth) + + with patch.object(recording, "record_batch_coalesced", side_effect=capture_coalesced): + for _ in range(3): + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + await asyncio.sleep(0.05) + await queue.shutdown() + + assert recorded_depths, "expected at least one coalesced batch" + assert recorded_depths[0] >= 3, ( + f"queue_depth should include batch items removed from queue, got {recorded_depths[0]}" + ) + + async def test_enqueue_records_enqueued_metric(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + import agentex.lib.core.observability.tracing_metrics_recording as recording + + recording._metrics_enabled = None + recording._tracing = None + mock_metrics = MagicMock() + proc = _make_processor() + queue = AsyncSpanQueue() + + with patch( + 
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + await asyncio.sleep(0.05) + await queue.shutdown() + + mock_metrics.span_events_enqueued.add.assert_any_call(1, {"event_type": "start"}) + + async def test_enqueue_during_shutdown_records_dropped_metric(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + import agentex.lib.core.observability.tracing_metrics_recording as recording + + recording._metrics_enabled = None + recording._tracing = None + mock_metrics = MagicMock() + proc = _make_processor() + queue = AsyncSpanQueue(linger_ms=0) + + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + await asyncio.sleep(0.05) + queue._stopping = True + queue.enqueue(SpanEventType.END, _make_span(), [proc]) + await queue.shutdown() + + mock_metrics.span_events_dropped.add.assert_any_call(1, {"reason": "shutdown"}) + + async def test_processor_failure_records_export_failure(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + import agentex.lib.core.observability.tracing_metrics_recording as recording + + recording._metrics_enabled = None + recording._tracing = None + mock_metrics = MagicMock() + + class ExportError(Exception): + pass + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=ExportError("Error code: 401 - denied")) + proc.on_spans_end = AsyncMock() + queue = AsyncSpanQueue() + + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics", + return_value=mock_metrics, + ): + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + await asyncio.sleep(0.05) + await queue.shutdown() + + mock_metrics.export_batch_failures.add.assert_called_once() + mock_metrics.export_span_failures.add.assert_called_once() + + async def 
test_enqueue_overhead_with_metrics_disabled(self, monkeypatch): + monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0") + import agentex.lib.core.observability.tracing_metrics_recording as recording + + recording._metrics_enabled = None + recording._tracing = None + proc = _make_processor() + queue = AsyncSpanQueue() + + with patch( + "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics" + ) as mock_get: + start = time.monotonic() + for _ in range(200): + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + elapsed = time.monotonic() - start + await queue.shutdown() + + assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s" + mock_get.assert_not_called()