
Commit 65d2e81

refactor: emit LLM token / request metrics via RunHooks
Splits metric emission by what each layer can see:

- agentex.lib.core.observability.llm_metrics_hooks.LLMMetricsHooks (a
  RunHooks subclass) emits agentex.llm.requests plus the four token counters
  in on_llm_end. Works for any RunHooks-aware path.
- TemporalStreamingHooks now inherits from LLMMetricsHooks, so the async path
  picks up the same metrics automatically.
- TemporalStreamingModel keeps only the streaming-only metrics (ttft, ttat,
  tps) — those need the per-chunk visibility that hooks can't provide. Its
  failure path uses the new record_llm_failure helper.

This makes adding the sync ACP path trivial later: pass LLMMetricsHooks() to
Runner.run from services/adk/providers/openai.py and it'll emit the same
metrics with no double-counting.

Tests cover:

- classify_status branches (rate_limit / timeout / server_error /
  network_error / client_error / other_error / success)
- get_llm_metrics singleton behavior + instrument presence
- LLMMetricsHooks.on_llm_end emits requests + token counters with the right
  model attribute
- Both the hooks path and record_llm_failure swallow exporter exceptions, so
  callers don't break when metrics fail
1 parent 1935aa9 commit 65d2e81
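
For context, the "trivial later" sync wiring described above would look roughly like the sketch below. None of it ships in this commit; `agent` and `user_input` are placeholders, and the only assumed API is the `hooks=` parameter of `Runner.run` in the openai-agents SDK.

    # Hypothetical future wiring for services/adk/providers/openai.py; not
    # part of this commit. `agent` and `user_input` are placeholders.
    from agents import Agent, Runner

    from agentex.lib.core.observability.llm_metrics_hooks import LLMMetricsHooks


    async def run_with_metrics(agent: Agent, user_input: str):
        # Passing the hooks here emits agentex.llm.requests plus the token
        # counters without double-counting against the streaming path.
        return await Runner.run(agent, user_input, hooks=LLMMetricsHooks())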

6 files changed: 279 additions & 31 deletions

File tree

src/agentex/lib/core/observability/llm_metrics_hooks.py (new)
src/agentex/lib/core/observability/tests/__init__.py (new)
src/agentex/lib/core/observability/tests/test_llm_metrics.py (new)
src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py (new)
src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py
src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py

src/agentex/lib/core/observability/llm_metrics_hooks.py

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
"""``RunHooks`` adapter that emits per-call LLM metrics.

Used by the sync ACP path and as a base class for ``TemporalStreamingHooks``
on the async path, so token / request / cache metrics emit consistently
across both. Streaming-only metrics (ttft, ttat, tps) are emitted from the
streaming model itself, not here — hooks don't see individual chunks.
"""

from __future__ import annotations

from typing import Any

from agents import Agent, RunHooks, ModelResponse, RunContextWrapper

from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics


class LLMMetricsHooks(RunHooks):
    """Emits ``agentex.llm.requests`` + token counters on every LLM call."""

    async def on_llm_end(
        self,
        context: RunContextWrapper[Any],
        agent: Agent[Any],
        response: ModelResponse,
    ) -> None:
        del context  # part of the RunHooks contract; unused here
        m = get_llm_metrics()
        attrs = {"model": str(agent.model) if agent.model else "unknown"}
        try:
            usage = response.usage
            m.requests.add(1, {**attrs, "status": "success"})
            m.input_tokens.add(usage.input_tokens or 0, attrs)
            m.output_tokens.add(usage.output_tokens or 0, attrs)
            m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs)
            m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs)
        except Exception:
            pass


def record_llm_failure(model: str, exc: BaseException) -> None:
    """Best-effort counter bump for an LLM call that raised before ``on_llm_end``."""
    try:
        get_llm_metrics().requests.add(1, {"model": model, "status": classify_status(exc)})
    except Exception:
        pass
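
llm_metrics.py itself is untouched by this commit, so LLMMetrics and get_llm_metrics appear here only as imports. Below is a hypothetical sketch of their shape, inferred from the call sites above and the tests that follow; the meter name and every instrument name except agentex.llm.requests are assumptions, and the OpenTelemetry metrics API is assumed because the removed comments in temporal_streaming_model.py mention a MeterProvider.

    # Hypothetical reconstruction of llm_metrics.py for illustration only;
    # the real module is not part of this diff and may differ.
    from opentelemetry import metrics


    class LLMMetrics:
        """Bundles the instruments the hooks and streaming model emit to."""

        def __init__(self) -> None:
            meter = metrics.get_meter("agentex.llm")  # meter name is an assumption
            self.requests = meter.create_counter("agentex.llm.requests")
            self.input_tokens = meter.create_counter("agentex.llm.input_tokens")
            self.output_tokens = meter.create_counter("agentex.llm.output_tokens")
            self.cached_input_tokens = meter.create_counter("agentex.llm.cached_input_tokens")
            self.reasoning_tokens = meter.create_counter("agentex.llm.reasoning_tokens")
            self.ttft_ms = meter.create_histogram("agentex.llm.ttft_ms", unit="ms")
            self.ttat_ms = meter.create_histogram("agentex.llm.ttat_ms", unit="ms")
            self.tps = meter.create_histogram("agentex.llm.tps")


    _llm_metrics: LLMMetrics | None = None


    def get_llm_metrics() -> LLMMetrics:
        """Lazily build the singleton the tests reset via ``_llm_metrics``."""
        global _llm_metrics
        if _llm_metrics is None:
            _llm_metrics = LLMMetrics()
        return _llm_metrics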

src/agentex/lib/core/observability/tests/__init__.py

Whitespace-only changes.
src/agentex/lib/core/observability/tests/test_llm_metrics.py

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
"""Tests for ``agentex.lib.core.observability.llm_metrics``."""

from __future__ import annotations

import agentex.lib.core.observability.llm_metrics as llm_metrics
from agentex.lib.core.observability.llm_metrics import (
    LLMMetrics,
    classify_status,
    get_llm_metrics,
)


class TestClassifyStatus:
    def test_none_is_success(self):
        assert classify_status(None) == "success"

    def test_rate_limit(self):
        class RateLimitError(Exception):
            pass

        assert classify_status(RateLimitError()) == "rate_limit"

    def test_timeout(self):
        class APITimeoutError(Exception):
            pass

        assert classify_status(APITimeoutError()) == "timeout"

    def test_server_error(self):
        class InternalServerError(Exception):
            pass

        assert classify_status(InternalServerError()) == "server_error"

        class ServiceUnavailable(Exception):
            pass

        assert classify_status(ServiceUnavailable()) == "server_error"

    def test_network_error(self):
        class APIConnectionError(Exception):
            pass

        assert classify_status(APIConnectionError()) == "network_error"

    def test_client_error(self):
        for cls_name in ("BadRequestError", "AuthenticationError", "PermissionError"):
            cls = type(cls_name, (Exception,), {})
            assert classify_status(cls()) == "client_error"

    def test_unknown_falls_back(self):
        class WeirdProviderException(Exception):
            pass

        assert classify_status(WeirdProviderException()) == "other_error"


class TestGetLLMMetrics:
    def test_returns_llm_metrics_instance(self, monkeypatch):
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
        m = get_llm_metrics()
        assert isinstance(m, LLMMetrics)

    def test_singleton_returns_same_instance(self, monkeypatch):
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
        first = get_llm_metrics()
        second = get_llm_metrics()
        assert first is second

    def test_instruments_exist(self, monkeypatch):
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
        m = get_llm_metrics()
        for name in (
            "requests",
            "ttft_ms",
            "ttat_ms",
            "tps",
            "input_tokens",
            "output_tokens",
            "cached_input_tokens",
            "reasoning_tokens",
        ):
            assert hasattr(m, name), f"missing instrument: {name}"
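
classify_status also lives in the untouched llm_metrics.py, so its body isn't shown in this diff. The tests above do pin down its branch behavior, though. Here is a minimal sketch consistent with them, assuming classification by exception class name; the exact substrings are guesses, not the real implementation.

    # Sketch only: satisfies the test expectations above, not necessarily
    # the real implementation in llm_metrics.py.
    def classify_status(exc: BaseException | None) -> str:
        if exc is None:
            return "success"
        name = type(exc).__name__.lower()
        if "ratelimit" in name:
            return "rate_limit"
        if "timeout" in name:
            return "timeout"
        if "internalserver" in name or "serviceunavailable" in name:
            return "server_error"
        if "connection" in name:
            return "network_error"
        if any(s in name for s in ("badrequest", "authentication", "permission")):
            return "client_error"
        return "other_error"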
src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
"""Tests for ``agentex.lib.core.observability.llm_metrics_hooks``."""

from __future__ import annotations

from unittest.mock import MagicMock

import pytest

import agentex.lib.core.observability.llm_metrics_hooks as hooks_module
from agentex.lib.core.observability.llm_metrics_hooks import (
    LLMMetricsHooks,
    record_llm_failure,
)


def _mock_response(
    *,
    input_tokens: int = 100,
    output_tokens: int = 50,
    cached_tokens: int = 30,
    reasoning_tokens: int = 10,
) -> MagicMock:
    response = MagicMock()
    response.usage.input_tokens = input_tokens
    response.usage.output_tokens = output_tokens
    response.usage.input_tokens_details.cached_tokens = cached_tokens
    response.usage.output_tokens_details.reasoning_tokens = reasoning_tokens
    return response


def _mock_agent(model: str = "gpt-5") -> MagicMock:
    agent = MagicMock()
    agent.model = model
    return agent


class TestLLMMetricsHooksOnLLMEnd:
    @pytest.mark.asyncio
    async def test_emits_success_request_counter(self, monkeypatch):
        m = MagicMock()
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        await LLMMetricsHooks().on_llm_end(
            context=MagicMock(),
            agent=_mock_agent("gpt-5"),
            response=_mock_response(),
        )

        m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"})

    @pytest.mark.asyncio
    async def test_emits_token_counters(self, monkeypatch):
        m = MagicMock()
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        await LLMMetricsHooks().on_llm_end(
            context=MagicMock(),
            agent=_mock_agent("gpt-5"),
            response=_mock_response(
                input_tokens=200,
                output_tokens=75,
                cached_tokens=50,
                reasoning_tokens=20,
            ),
        )

        attrs = {"model": "gpt-5"}
        m.input_tokens.add.assert_called_once_with(200, attrs)
        m.output_tokens.add.assert_called_once_with(75, attrs)
        m.cached_input_tokens.add.assert_called_once_with(50, attrs)
        m.reasoning_tokens.add.assert_called_once_with(20, attrs)

    @pytest.mark.asyncio
    async def test_zero_tokens_emit_zero_not_skip(self, monkeypatch):
        m = MagicMock()
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        await LLMMetricsHooks().on_llm_end(
            context=MagicMock(),
            agent=_mock_agent(),
            response=_mock_response(input_tokens=0, output_tokens=0, cached_tokens=0, reasoning_tokens=0),
        )

        m.input_tokens.add.assert_called_once_with(0, {"model": "gpt-5"})
        m.output_tokens.add.assert_called_once_with(0, {"model": "gpt-5"})

    @pytest.mark.asyncio
    async def test_unknown_model_falls_back(self, monkeypatch):
        m = MagicMock()
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        agent = MagicMock()
        agent.model = None

        await LLMMetricsHooks().on_llm_end(
            context=MagicMock(),
            agent=agent,
            response=_mock_response(),
        )

        m.requests.add.assert_called_once_with(1, {"model": "unknown", "status": "success"})

    @pytest.mark.asyncio
    async def test_swallows_exporter_failure(self, monkeypatch):
        m = MagicMock()
        m.requests.add.side_effect = RuntimeError("exporter exploded")
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        # Should not raise — caller's flow must not break on metric failure.
        await LLMMetricsHooks().on_llm_end(
            context=MagicMock(),
            agent=_mock_agent(),
            response=_mock_response(),
        )


class TestRecordLLMFailure:
    def test_emits_classified_status(self, monkeypatch):
        m = MagicMock()
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        class RateLimitError(Exception):
            pass

        record_llm_failure("gpt-5", RateLimitError())

        m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "rate_limit"})

    def test_swallows_exporter_failure(self, monkeypatch):
        m = MagicMock()
        m.requests.add.side_effect = RuntimeError("exporter exploded")
        monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

        # Should not raise.
        record_llm_failure("gpt-5", Exception("upstream"))

src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py

Lines changed: 3 additions & 2 deletions
@@ -8,18 +8,19 @@
 from typing import Any, override
 from datetime import timedelta

-from agents import Tool, Agent, RunHooks, RunContextWrapper
+from agents import Tool, Agent, RunContextWrapper
 from temporalio import workflow
 from agents.tool_context import ToolContext

 from agentex.types.text_content import TextContent
 from agentex.types.task_message_content import ToolRequestContent, ToolResponseContent
+from agentex.lib.core.observability.llm_metrics_hooks import LLMMetricsHooks
 from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content

 logger = logging.getLogger(__name__)


-class TemporalStreamingHooks(RunHooks):
+class TemporalStreamingHooks(LLMMetricsHooks):
     """Convenience hooks class for streaming OpenAI Agent lifecycle events to the AgentEx UI.

     This class automatically streams agent lifecycle events (tool calls, handoffs) to the

src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py

Lines changed: 12 additions & 29 deletions
@@ -32,7 +32,8 @@
 # Re-export the canonical StreamingMode literal from the streaming service so
 # all layers share a single definition.
 from agentex.lib.core.services.adk.streaming import StreamingMode as StreamingMode
-from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics
+from agentex.lib.core.observability.llm_metrics import get_llm_metrics
+from agentex.lib.core.observability.llm_metrics_hooks import record_llm_failure

 try:
     from agents.tool import ShellTool  # type: ignore[attr-defined]
@@ -1026,34 +1027,24 @@ async def get_response(

             span.output = output_data

-            # Emit LLM metrics derived from the captured stream. The meter is a
-            # no-op if the application hasn't configured a MeterProvider, so this
-            # is safe to do unconditionally. We only emit ttft / tps when their
-            # input data is actually meaningful (got a content delta, got tokens).
+            # Streaming-only metrics. Token counters and the success request
+            # counter are emitted by LLMMetricsHooks.on_llm_end so they fire
+            # consistently across streaming and non-streaming paths.
             m = get_llm_metrics()
             metric_attrs = {"model": self.model_name}
-            m.requests.add(1, {**metric_attrs, "status": "success"})
-            m.input_tokens.add(usage.input_tokens or 0, metric_attrs)
-            m.output_tokens.add(usage.output_tokens or 0, metric_attrs)
-            m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
-            m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
             if first_token_at is not None:
                 m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
             if first_answer_at is not None:
                 m.ttat_ms.record((first_answer_at - stream_start_perf) * 1000, metric_attrs)
-            # tps denominator is the generation window (first→last delta), not
-            # total stream wall time — see LLMMetrics for rationale. Single-token
-            # responses (where first_token_at == last_token_at, e.g. a one-token
-            # tool-result acknowledgement) collapse the window to 0 and are
-            # intentionally skipped — TPS is undefined in that case.
+            # Single-token responses collapse the generation window to 0; tps
+            # is undefined and skipped.
             if (
                 first_token_at is not None
                 and last_token_at is not None
                 and last_token_at > first_token_at
                 and (usage.output_tokens or 0) > 0
             ):
-                generation_window_s = last_token_at - first_token_at
-                m.tps.record(usage.output_tokens / generation_window_s, metric_attrs)
+                m.tps.record(usage.output_tokens / (last_token_at - first_token_at), metric_attrs)

             # Return the response. response_id is the server-issued id from
             # ResponseCompletedEvent.response.id, or None when the stream ended
@@ -1070,18 +1061,10 @@ async def get_response(

         except Exception as e:
             logger.error(f"Error using Responses API: {e}")
-            # Emit a request-counter event so 429s, 5xxs, timeouts, etc. are
-            # observable on the SDK side. Status histograms / token counters
-            # only fire on successful completion above. Wrapped in a bare
-            # try/except so a misbehaving exporter can't shadow the original
-            # LLM exception — callers (retry logic, circuit breakers) need
-            # to see the typed RateLimitError / APITimeoutError / etc.
-            try:
-                get_llm_metrics().requests.add(
-                    1, {"model": self.model_name, "status": classify_status(e)}
-                )
-            except Exception:
-                pass
+            # LLMMetricsHooks.on_llm_end doesn't fire on error, so emit the
+            # failure counter here. Best-effort so the typed LLM exception
+            # always propagates intact for retry / circuit-breaker logic.
+            record_llm_failure(self.model_name, e)
             raise

         # The _get_response_with_responses_api method has been merged into get_response above
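
One deployment note carried over from the removed comment: every instrument here is a no-op until the application installs a MeterProvider. A minimal setup sketch, assuming the standard opentelemetry-sdk and OTLP gRPC exporter packages; no such configuration ships in this commit.

    # Hypothetical application-side bootstrap so the agentex.llm.* metrics
    # actually export; requires opentelemetry-sdk and
    # opentelemetry-exporter-otlp-proto-grpc to be installed.
    from opentelemetry import metrics
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
    from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

    reader = PeriodicExportingMetricReader(OTLPMetricExporter())
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))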
