Skip to content

Commit c0af6b0

Browse files
adaltonclaudeAmbient Code Botjeremyeder
authored
feat: add optional Langfuse tracing for MCP tool calls (#37)
* feat: add optional Langfuse tracing for MCP tool calls and HTTP requests Add observability to the mcp-acp server via Langfuse integration. Every MCP tool call produces a trace with tool name, filtered args, duration, and success/error status. HTTP requests to the ACP public API appear as nested child spans with method, path, and status code. Tracing is opt-in and silently no-ops when unconfigured: - Reads LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_BASE_URL - MCP_ACP_TRACING_ENABLED env var (default: true) serves as kill switch - SDK init failures are caught and logged, never break tool execution - tracing.py: Langfuse v3 SDK client singleton, trace_tool_call and trace_http_request async context managers, no-op fallbacks, lifecycle - server.py: wrap call_tool() dispatch in trace_tool_call, flush on exit - client.py: wrap _request() and _request_text() in trace_http_request - settings.py: add missing _acpctl_config_path() (pre-existing bug fix) - test_tracing.py: 25 unit tests covering all tracing paths - scripts/test_tracing_e2e.py: e2e test against live ACP cluster Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * test: add comprehensive E2E tests for MCP server Add test_server_e2e.py with full coverage of all 26 MCP tools: - Session management (list, get, create, delete, restart, clone, update) - Observability (logs, transcript, metrics) - Labels (add, remove, list by label, bulk operations) - Bulk operations (delete, stop, restart - by name and by label) - Cluster management (list, whoami, switch, login) Tests use mocked HTTP transport to verify the complete flow from tool call through client to HTTP requests. Includes: - Success and error path testing - Dry-run mode verification - Confirmation requirement enforcement for destructive operations - Input validation testing - Complete workflow simulation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix ruff linting issues in test_tracing_e2e.py - Use single quotes for jsonpath argument containing double quotes - Remove unused f-string prefix Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix ruff linting issues in test_server_e2e.py Remove unused AsyncMock import, extra blank line, and fix multi-line call formatting. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix import path for pylogger in tracing.py Use mcp_acp.utils.pylogger instead of utils.pylogger to match other modules in the package. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor: enhance E2E tests with better mocking and edge cases Improvements to MockHTTPClient: - Move to module level class with comprehensive docstrings - Add get_calls_for() to retrieve calls matching method/path - Add assert_called_with() for HTTP call verification - Sort responses by path length for better specificity matching New test cases: - test_create_session_with_repos: verify repos parameter handling - test_invalid_template_name: validation for unknown templates - test_update_session_no_fields: error when no update fields provided - test_bulk_by_label_no_matches: graceful empty results handling - test_empty_sessions_list: empty list display - test_delete_verifies_http_call: HTTP call verification example Other improvements: - Move json import to module level (remove local import) - Add detailed docstrings with test categories Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix test failures and implementation bug - Fix test_client.py: Update test expectations to match actual API schema (use 'task' instead of 'initialPrompt', 'model' instead of 'llmConfig.model') - Fix clone_session: Add displayName to clone_data (was missing) - Fix test_tracing.py: Correct patch location for Langfuse module - Remove non-existent 'timeout' parameter from create_session tests Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix ruff formatting in test_server_e2e.py Combine multi-line statements that ruff prefers on single lines. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Ambient Code Bot <bot@ambient-code.local> Co-authored-by: Jeremy Eder <jeder@redhat.com>
1 parent 0bc57bf commit c0af6b0

8 files changed

Lines changed: 1164 additions & 286 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies = [
3333
"httpx>=0.27.0",
3434
"pyyaml>=6.0",
3535
"python-dateutil>=2.8.0",
36+
"langfuse>=2.0.0",
3637
]
3738

3839
[project.optional-dependencies]

scripts/test_tracing_e2e.py

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
#!/usr/bin/env python3
2+
"""End-to-end test for Langfuse tracing integration.
3+
4+
Spins up the mcp-acp MCP server as a subprocess, connects via stdio,
5+
exercises several tools against a live ACP cluster, and verifies that
6+
Langfuse traces are created (when credentials are configured).
7+
8+
Prerequisites:
9+
- A running ACP cluster (kind or remote) with public-api accessible
10+
- A clusters.yaml config OR env vars pointing to the cluster
11+
- Langfuse credentials sourced into env
12+
13+
Usage:
14+
# 1. Create a clusters.yaml if you don't have one:
15+
uv run python scripts/test_tracing_e2e.py --setup-config
16+
17+
# 2. Source Langfuse creds and run:
18+
source ../langfuse.env
19+
uv run python scripts/test_tracing_e2e.py
20+
21+
# 3. Verify kill switch disables tracing:
22+
MCP_ACP_TRACING_ENABLED=false uv run python scripts/test_tracing_e2e.py
23+
"""
24+
25+
import argparse
26+
import asyncio
27+
import os
28+
import subprocess
29+
import sys
30+
import textwrap
31+
from pathlib import Path
32+
33+
34+
def setup_kind_config() -> Path:
35+
"""Auto-generate a clusters.yaml for the local kind cluster."""
36+
config_dir = Path.home() / ".config" / "acp"
37+
config_path = config_dir / "clusters.yaml"
38+
39+
# Get node IP
40+
node_ip = subprocess.check_output(
41+
["kubectl", "get", "nodes", "-o", 'jsonpath={.items[0].status.addresses[?(@.type=="InternalIP")].address}'],
42+
text=True,
43+
).strip()
44+
45+
# Get NodePort for public-api-service
46+
node_port = subprocess.check_output(
47+
[
48+
"kubectl",
49+
"get",
50+
"svc",
51+
"public-api-service",
52+
"-n",
53+
"ambient-code",
54+
"-o",
55+
"jsonpath={.spec.ports[0].nodePort}",
56+
],
57+
text=True,
58+
).strip()
59+
60+
# Get test user token
61+
token_b64 = subprocess.check_output(
62+
["kubectl", "get", "secret", "test-user-token", "-n", "ambient-code", "-o", "jsonpath={.data.token}"],
63+
text=True,
64+
).strip()
65+
token = subprocess.check_output(["base64", "-d"], input=token_b64, text=True).strip()
66+
67+
config_content = textwrap.dedent(f"""\
68+
default_cluster: kind
69+
clusters:
70+
kind:
71+
server: http://{node_ip}:{node_port}
72+
default_project: ambient-code
73+
description: Local kind development cluster
74+
token: "{token}"
75+
""")
76+
77+
config_dir.mkdir(parents=True, exist_ok=True)
78+
config_path.write_text(config_content)
79+
print(f"Wrote clusters.yaml to {config_path}")
80+
print(f" server: http://{node_ip}:{node_port}")
81+
print(" project: ambient-code")
82+
return config_path
83+
84+
85+
async def run_e2e_test() -> None:
86+
"""Connect to the MCP server and exercise tools."""
87+
from mcp import ClientSession
88+
from mcp.client.stdio import StdioServerParameters, stdio_client
89+
90+
# Pass through current env (including Langfuse vars if set)
91+
env = os.environ.copy()
92+
env["PYTHONPATH"] = str(Path(__file__).resolve().parent.parent / "src")
93+
94+
# Also add the utils path (sibling to src/)
95+
utils_parent = Path(__file__).resolve().parent.parent
96+
if "PYTHONPATH" in env:
97+
env["PYTHONPATH"] = f"{utils_parent}:{env['PYTHONPATH']}"
98+
else:
99+
env["PYTHONPATH"] = str(utils_parent)
100+
101+
# Prefer .venv-3.12 (avoids Langfuse/Pydantic v1 incompatibility with Python 3.14)
102+
project_root = Path(__file__).resolve().parent.parent
103+
venv_312 = project_root / ".venv-3.12" / "bin" / "python"
104+
venv_default = project_root / ".venv" / "bin" / "python"
105+
python_bin = str(venv_312 if venv_312.exists() else venv_default)
106+
107+
server_params = StdioServerParameters(
108+
command=python_bin,
109+
args=["-m", "mcp_acp.server"],
110+
env=env,
111+
cwd=str(project_root),
112+
)
113+
114+
langfuse_configured = bool(os.getenv("LANGFUSE_PUBLIC_KEY") and os.getenv("LANGFUSE_SECRET_KEY"))
115+
tracing_enabled = os.getenv("MCP_ACP_TRACING_ENABLED", "true").lower() in ("true", "1", "yes")
116+
117+
print("=" * 60)
118+
print("MCP-ACP Tracing End-to-End Test")
119+
print("=" * 60)
120+
print(f" Langfuse credentials: {'configured' if langfuse_configured else 'NOT SET (silent no-op)'}")
121+
print(f" Tracing enabled: {tracing_enabled}")
122+
print(f" Kill switch: MCP_ACP_TRACING_ENABLED={os.getenv('MCP_ACP_TRACING_ENABLED', 'true')}")
123+
print()
124+
125+
results: list[dict] = []
126+
127+
async with stdio_client(server_params) as (read_stream, write_stream):
128+
async with ClientSession(read_stream, write_stream) as session:
129+
await session.initialize()
130+
print("[OK] MCP server started and initialized")
131+
132+
# 1. List tools
133+
tools_result = await session.list_tools()
134+
tool_names = [t.name for t in tools_result.tools]
135+
print(f"[OK] list_tools: {len(tool_names)} tools available")
136+
results.append({"test": "list_tools", "status": "pass", "tool_count": len(tool_names)})
137+
138+
# 2. acp_whoami — tests cluster auth
139+
print("\n--- Tool: acp_whoami ---")
140+
whoami = await session.call_tool("acp_whoami", {})
141+
whoami_text = whoami.content[0].text
142+
print(f" Response: {whoami_text[:200]}")
143+
results.append({"test": "acp_whoami", "status": "pass"})
144+
145+
# 3. acp_list_clusters
146+
print("\n--- Tool: acp_list_clusters ---")
147+
clusters = await session.call_tool("acp_list_clusters", {})
148+
clusters_text = clusters.content[0].text
149+
print(f" Response: {clusters_text[:200]}")
150+
results.append({"test": "acp_list_clusters", "status": "pass"})
151+
152+
# 4. acp_list_sessions — exercises HTTP call tracing
153+
print("\n--- Tool: acp_list_sessions ---")
154+
sessions_result = await session.call_tool("acp_list_sessions", {})
155+
sessions_text = sessions_result.content[0].text
156+
print(f" Response: {sessions_text[:200]}")
157+
results.append({"test": "acp_list_sessions", "status": "pass"})
158+
159+
# 5. acp_get_session with invalid session — exercises error tracing
160+
print("\n--- Tool: acp_get_session (expect error) ---")
161+
err_result = await session.call_tool("acp_get_session", {"session": "nonexistent-session-xyz"})
162+
err_text = err_result.content[0].text
163+
is_error = "error" in err_text.lower() or "not found" in err_text.lower()
164+
print(f" Response: {err_text[:200]}")
165+
print(f" Got expected error: {is_error}")
166+
results.append({"test": "acp_get_session_error", "status": "pass" if is_error else "warn"})
167+
168+
# 6. acp_create_session dry_run — exercises POST without side effects
169+
print("\n--- Tool: acp_create_session (dry_run) ---")
170+
create_result = await session.call_tool(
171+
"acp_create_session",
172+
{
173+
"initial_prompt": "Hello from tracing e2e test",
174+
"display_name": "tracing-e2e-test",
175+
"dry_run": True,
176+
},
177+
)
178+
create_text = create_result.content[0].text
179+
print(f" Response: {create_text[:200]}")
180+
results.append({"test": "acp_create_session_dry_run", "status": "pass"})
181+
182+
# 7. acp_bulk_delete_sessions without confirm — exercises validation error
183+
print("\n--- Tool: acp_bulk_delete_sessions (expect validation error) ---")
184+
bulk_result = await session.call_tool(
185+
"acp_bulk_delete_sessions",
186+
{"sessions": ["s1"]},
187+
)
188+
bulk_text = bulk_result.content[0].text
189+
is_validation = "confirm" in bulk_text.lower() or "validation" in bulk_text.lower()
190+
print(f" Response: {bulk_text[:200]}")
191+
print(f" Got expected validation error: {is_validation}")
192+
results.append({"test": "acp_bulk_validation_error", "status": "pass" if is_validation else "warn"})
193+
194+
# Summary
195+
print("\n" + "=" * 60)
196+
print("RESULTS SUMMARY")
197+
print("=" * 60)
198+
for r in results:
199+
icon = "PASS" if r["status"] == "pass" else "WARN"
200+
print(f" [{icon}] {r['test']}")
201+
202+
passed = sum(1 for r in results if r["status"] == "pass")
203+
print(f"\n {passed}/{len(results)} tests passed")
204+
205+
if langfuse_configured and tracing_enabled:
206+
print("\n Check your Langfuse dashboard for traces:")
207+
print(f" {os.getenv('LANGFUSE_BASE_URL', 'https://cloud.langfuse.com')}")
208+
print(" Expected traces:")
209+
print(" - acp_whoami")
210+
print(" - acp_list_clusters")
211+
print(" - acp_list_sessions (with child span: GET /v1/sessions)")
212+
print(" - acp_get_session (with error status)")
213+
print(" - acp_create_session (dry_run, no HTTP span)")
214+
print(" - acp_bulk_delete_sessions (validation_error, no HTTP span)")
215+
elif not langfuse_configured:
216+
print("\n Langfuse credentials not set — no traces were sent (silent no-op).")
217+
print(" To verify tracing, set LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_BASE_URL.")
218+
elif not tracing_enabled:
219+
print("\n Tracing disabled via MCP_ACP_TRACING_ENABLED=false — no traces sent.")
220+
221+
print()
222+
223+
224+
def main():
225+
parser = argparse.ArgumentParser(description="End-to-end test for Langfuse tracing in mcp-acp")
226+
parser.add_argument(
227+
"--setup-config",
228+
action="store_true",
229+
help="Generate ~/.config/acp/clusters.yaml from the local kind cluster, then exit",
230+
)
231+
args = parser.parse_args()
232+
233+
if args.setup_config:
234+
setup_kind_config()
235+
return
236+
237+
# Check clusters.yaml exists
238+
config_path = os.getenv("ACP_CLUSTER_CONFIG") or str(Path.home() / ".config" / "acp" / "clusters.yaml")
239+
if not Path(config_path).exists():
240+
print(f"ERROR: No clusters.yaml found at {config_path}")
241+
print("Run with --setup-config to auto-generate from kind cluster:")
242+
print(f" uv run python {sys.argv[0]} --setup-config")
243+
sys.exit(1)
244+
245+
asyncio.run(run_e2e_test())
246+
247+
248+
if __name__ == "__main__":
249+
main()

src/mcp_acp/client.py

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import httpx
1414

1515
from mcp_acp.settings import _acpctl_config_path, load_clusters_config, load_settings
16+
from mcp_acp.tracing import trace_http_request
1617
from mcp_acp.utils.pylogger import get_python_logger
1718

1819
logger = get_python_logger()
@@ -162,42 +163,44 @@ async def _request(
162163

163164
client = await self._get_http_client()
164165

165-
try:
166-
response = await client.request(
167-
method=method,
168-
url=url,
169-
headers=headers,
170-
json=json_data,
171-
params=params,
172-
)
173-
174-
if response.status_code >= 400:
175-
try:
176-
error_data = response.json()
177-
error_msg = error_data.get("error", f"HTTP {response.status_code}")
178-
except Exception:
179-
error_msg = f"HTTP {response.status_code}: {response.text}"
180-
181-
logger.warning(
182-
"api_request_failed",
166+
async with trace_http_request(method, path, params) as span_ctx:
167+
try:
168+
response = await client.request(
183169
method=method,
184-
path=path,
185-
status_code=response.status_code,
186-
error=error_msg,
170+
url=url,
171+
headers=headers,
172+
json=json_data,
173+
params=params,
187174
)
188-
raise ValueError(error_msg)
189-
190-
if response.status_code == 204:
191-
return {"success": True}
192-
193-
return response.json()
194-
195-
except httpx.TimeoutException as e:
196-
logger.error("api_request_timeout", method=method, path=path, error=str(e))
197-
raise TimeoutError(f"Request timed out: {path}") from e
198-
except httpx.RequestError as e:
199-
logger.error("api_request_error", method=method, path=path, error=str(e))
200-
raise ValueError(f"Request failed: {str(e)}") from e
175+
span_ctx.set_response(response.status_code)
176+
177+
if response.status_code >= 400:
178+
try:
179+
error_data = response.json()
180+
error_msg = error_data.get("error", f"HTTP {response.status_code}")
181+
except Exception:
182+
error_msg = f"HTTP {response.status_code}: {response.text}"
183+
184+
logger.warning(
185+
"api_request_failed",
186+
method=method,
187+
path=path,
188+
status_code=response.status_code,
189+
error=error_msg,
190+
)
191+
raise ValueError(error_msg)
192+
193+
if response.status_code == 204:
194+
return {"success": True}
195+
196+
return response.json()
197+
198+
except httpx.TimeoutException as e:
199+
logger.error("api_request_timeout", method=method, path=path, error=str(e))
200+
raise TimeoutError(f"Request timed out: {path}") from e
201+
except httpx.RequestError as e:
202+
logger.error("api_request_error", method=method, path=path, error=str(e))
203+
raise ValueError(f"Request failed: {str(e)}") from e
201204

202205
async def _request_text(
203206
self,
@@ -221,20 +224,22 @@ async def _request_text(
221224

222225
client = await self._get_http_client()
223226

224-
try:
225-
response = await client.request(method=method, url=url, headers=headers, params=params)
227+
async with trace_http_request(method, path, params) as span_ctx:
228+
try:
229+
response = await client.request(method=method, url=url, headers=headers, params=params)
230+
span_ctx.set_response(response.status_code)
226231

227-
if response.status_code >= 400:
228-
raise ValueError(f"HTTP {response.status_code}: {response.text}")
232+
if response.status_code >= 400:
233+
raise ValueError(f"HTTP {response.status_code}: {response.text}")
229234

230-
return response.text
235+
return response.text
231236

232-
except httpx.TimeoutException as e:
233-
logger.error("api_request_timeout", method=method, path=path, error=str(e))
234-
raise TimeoutError(f"Request timed out: {path}") from e
235-
except httpx.RequestError as e:
236-
logger.error("api_request_error", method=method, path=path, error=str(e))
237-
raise ValueError(f"Request failed: {str(e)}") from e
237+
except httpx.TimeoutException as e:
238+
logger.error("api_request_timeout", method=method, path=path, error=str(e))
239+
raise TimeoutError(f"Request timed out: {path}") from e
240+
except httpx.RequestError as e:
241+
logger.error("api_request_error", method=method, path=path, error=str(e))
242+
raise ValueError(f"Request failed: {str(e)}") from e
238243

239244
# ── Validation ───────────────────────────────────────────────────────
240245

@@ -544,6 +549,7 @@ async def clone_session(
544549
clone_data: dict[str, Any] = {
545550
"task": source.get("task", ""),
546551
"model": source.get("model", "claude-sonnet-4"),
552+
"displayName": new_display_name,
547553
}
548554

549555
if source.get("repos"):

0 commit comments

Comments
 (0)