
Commit da85d7b

review (stas): extract llm metrics to core/observability + add request counter
Two follow-up changes from the PR review:

1. Move the LLM metric instruments from _StreamingMetrics in temporal_streaming_model.py to a new module: agentex.lib.core.observability.llm_metrics. Public API: get_llm_metrics() returns a singleton LLMMetrics with the same six instruments (ttft, tps, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens) plus a new requests counter. This makes the temporal+openai_agents plugin one of several future call sites — the sync ACP path and the Claude SDK plugin can record to the same instruments without redefining names, units, or descriptions. Keeps cross-provider naming consistent.

2. Add an agentex.llm.requests counter with a status label so 429s, 5xxs, timeouts, and other failures are observable on the SDK side without scraping logs. classify_status() maps exception types to a small fixed set (success / rate_limit / server_error / client_error / timeout / network_error / other_error) by class name, so it works across OpenAI, Anthropic, and other provider SDKs that use similar exception naming. It is recorded in two places: the success path (alongside the token counters) and the existing get_response except handler (so terminal failures emit a counter event before re-raising).

Cardinality remains bounded — model + status (7 values) on the counter; all other metrics keep just `model`.
1 parent 6209b20 commit da85d7b
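To make the classify_status() mapping in point 2 concrete, here is a small illustrative sketch (not part of the commit); the exception classes below are local stand-ins whose names mirror common provider SDK exceptions, not imports from OpenAI or Anthropic:

from agentex.lib.core.observability.llm_metrics import classify_status

# Stand-in classes defined locally for illustration; real call sites pass
# whatever exception the provider SDK actually raised.
class RateLimitError(Exception): ...
class APITimeoutError(Exception): ...
class InternalServerError(Exception): ...
class APIConnectionError(Exception): ...

assert classify_status(None) == "success"
assert classify_status(RateLimitError()) == "rate_limit"
assert classify_status(APITimeoutError()) == "timeout"
assert classify_status(InternalServerError()) == "server_error"
assert classify_status(APIConnectionError()) == "network_error"
assert classify_status(ValueError("anything else")) == "other_error"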

3 files changed

Lines changed: 128 additions & 61 deletions

File tree

src/agentex/lib/core/observability/__init__.py
src/agentex/lib/core/observability/llm_metrics.py
src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py

src/agentex/lib/core/observability/__init__.py

Whitespace-only changes.
src/agentex/lib/core/observability/llm_metrics.py

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+"""OTel metrics for LLM calls.
+
+Single source of truth for LLM-call instrumentation across all agentex code
+paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK
+plugin in future PRs. Centralizing the instrument definitions here means
+those follow-ups don't need to redefine the metric names, units, or
+description strings; they import ``get_llm_metrics()`` and record values.
+
+The meter is no-op when the application hasn't configured a ``MeterProvider``,
+so importing this module is safe for runtimes that don't use OTel. Instruments
+are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider``
+configured *after* this module is imported still binds correctly.
+
+Cardinality is bounded:
+- All metrics carry only ``model`` (the LLM model name).
+- ``requests`` additionally carries ``status``, drawn from a small fixed set
+  (see ``classify_status``).
+
+Resource attributes (``service.name``, ``k8s.*``, etc.) come from the
+application's OTel resource configuration and are added to every series
+automatically.
+"""
+
+from __future__ import annotations
+
+from typing import Optional
+
+from opentelemetry import metrics
+
+
+class LLMMetrics:
+    """Lazily-created OTel instruments for LLM call telemetry."""
+
+    def __init__(self) -> None:
+        meter = metrics.get_meter("agentex.llm")
+        self.requests = meter.create_counter(
+            name="agentex.llm.requests",
+            unit="1",
+            description=(
+                "LLM call count tagged with status (success / rate_limit / "
+                "server_error / client_error / timeout / network_error / "
+                "other_error). Use to alert on 429s, 5xxs, etc."
+            ),
+        )
+        self.ttft_ms = meter.create_histogram(
+            name="agentex.llm.ttft",
+            unit="ms",
+            description="Time from request submission to first content token (ms)",
+        )
+        # Note: TPS denominator is the model-generation window
+        # (last_token_time - first_token_time), not total stream wall time.
+        # This isolates raw model throughput from event-loop / tool-call latency.
+        self.tps = meter.create_histogram(
+            name="agentex.llm.tps",
+            unit="tokens/s",
+            description="Output tokens per second over the generation window",
+        )
+        self.input_tokens = meter.create_counter(
+            name="agentex.llm.input_tokens",
+            unit="tokens",
+            description="Total input tokens sent to the LLM",
+        )
+        self.output_tokens = meter.create_counter(
+            name="agentex.llm.output_tokens",
+            unit="tokens",
+            description="Total output tokens returned by the LLM",
+        )
+        self.cached_input_tokens = meter.create_counter(
+            name="agentex.llm.cached_input_tokens",
+            unit="tokens",
+            description="Subset of input tokens served from prompt cache",
+        )
+        self.reasoning_tokens = meter.create_counter(
+            name="agentex.llm.reasoning_tokens",
+            unit="tokens",
+            description="Output tokens spent on reasoning (subset of output_tokens)",
+        )
+
+
+_llm_metrics: Optional[LLMMetrics] = None
+
+
+def get_llm_metrics() -> LLMMetrics:
+    """Return the LLM metrics singleton, creating it on first use."""
+    global _llm_metrics
+    if _llm_metrics is None:
+        _llm_metrics = LLMMetrics()
+    return _llm_metrics
+
+
+def classify_status(exc: Optional[BaseException]) -> str:
+    """Categorize an LLM call's outcome into a small fixed set of status labels.
+
+    A successful call returns ``"success"``. Exceptions are mapped by type name
+    so we don't depend on a specific provider SDK's exception class hierarchy:
+    OpenAI, Anthropic, and other providers all use names like ``RateLimitError``,
+    ``APITimeoutError``, ``InternalServerError``, etc.
+    """
+    if exc is None:
+        return "success"
+    name = type(exc).__name__
+    if "RateLimit" in name:
+        return "rate_limit"
+    if "Timeout" in name:
+        return "timeout"
+    if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")):
+        return "server_error"
+    if "Connection" in name:
+        return "network_error"
+    if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")):
+        return "client_error"
+    return "other_error"

src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py

Lines changed: 16 additions & 61 deletions
@@ -27,12 +27,12 @@
     CodeInterpreterTool,
     ImageGenerationTool,
 )
-from opentelemetry import metrics
 from agents.computer import Computer, AsyncComputer
 
 # Re-export the canonical StreamingMode literal from the streaming service so
 # all layers share a single definition.
 from agentex.lib.core.services.adk.streaming import StreamingMode as StreamingMode
+from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics
 
 try:
     from agents.tool import ShellTool  # type: ignore[attr-defined]
@@ -80,61 +80,9 @@
 logger = make_logger("agentex.temporal.streaming")
 
 
-# OTel metrics for LLM streaming behavior. Instruments are created lazily on
-# first use so the meter resolves to whatever MeterProvider the application
-# eventually configures, even if that happens after this module is imported.
-# All metrics carry only a ``model`` attribute to keep cardinality bounded;
-# resource attributes (service.name, k8s.*, etc.) come from the application's
-# OTel resource configuration.
-class _StreamingMetrics:
-    """Lazily-created OTel instruments for streaming LLM telemetry."""
-
-    def __init__(self) -> None:
-        meter = metrics.get_meter("agentex.openai_agents.streaming")
-        self.ttft_ms = meter.create_histogram(
-            name="agentex.llm.ttft",
-            unit="ms",
-            description="Time from request submission to first content token (ms)",
-        )
-        # Note: TPS denominator is the model-generation window
-        # (last_token_time - first_token_time), not total stream wall time.
-        # This isolates raw model throughput from event-loop / tool-call latency.
-        self.tps = meter.create_histogram(
-            name="agentex.llm.tps",
-            unit="tokens/s",
-            description="Output tokens per second over the generation window",
-        )
-        self.input_tokens = meter.create_counter(
-            name="agentex.llm.input_tokens",
-            unit="tokens",
-            description="Total input tokens sent to the LLM",
-        )
-        self.output_tokens = meter.create_counter(
-            name="agentex.llm.output_tokens",
-            unit="tokens",
-            description="Total output tokens returned by the LLM",
-        )
-        self.cached_input_tokens = meter.create_counter(
-            name="agentex.llm.cached_input_tokens",
-            unit="tokens",
-            description="Subset of input tokens served from prompt cache",
-        )
-        self.reasoning_tokens = meter.create_counter(
-            name="agentex.llm.reasoning_tokens",
-            unit="tokens",
-            description="Output tokens spent on reasoning (subset of output_tokens)",
-        )
-
-
-_streaming_metrics: Optional[_StreamingMetrics] = None
-
-
-def _get_streaming_metrics() -> _StreamingMetrics:
-    """Return the streaming metrics singleton, creating it on first use."""
-    global _streaming_metrics
-    if _streaming_metrics is None:
-        _streaming_metrics = _StreamingMetrics()
-    return _streaming_metrics
+# LLM metrics live in agentex.lib.core.observability.llm_metrics so other
+# code paths (sync ACP, Claude SDK plugin, future provider integrations)
+# can share the same instrument definitions without redefining names.
 
 
 def _serialize_item(item: Any) -> dict[str, Any]:
@@ -1070,19 +1018,20 @@ async def get_response(
             # no-op if the application hasn't configured a MeterProvider, so this
             # is safe to do unconditionally. We only emit ttft / tps when their
             # input data is actually meaningful (got a content delta, got tokens).
-            m = _get_streaming_metrics()
+            m = get_llm_metrics()
             metric_attrs = {"model": self.model_name}
+            m.requests.add(1, {**metric_attrs, "status": "success"})
             m.input_tokens.add(usage.input_tokens or 0, metric_attrs)
             m.output_tokens.add(usage.output_tokens or 0, metric_attrs)
             m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
             m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
             if first_token_at is not None:
                 m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
             # tps denominator is the generation window (first→last delta), not
-            # total stream wall time — see _StreamingMetrics for rationale.
-            # Note: single-token responses (where first_token_at == last_token_at,
-            # e.g. a one-token tool-result acknowledgement) collapse the window
-            # to 0 and are intentionally skipped — TPS is undefined in that case.
+            # total stream wall time — see LLMMetrics for rationale. Single-token
+            # responses (where first_token_at == last_token_at, e.g. a one-token
+            # tool-result acknowledgement) collapse the window to 0 and are
+            # intentionally skipped — TPS is undefined in that case.
             if (
                 first_token_at is not None
                 and last_token_at is not None
@@ -1107,6 +1056,12 @@ async def get_response(
 
         except Exception as e:
             logger.error(f"Error using Responses API: {e}")
+            # Emit a request-counter event so 429s, 5xxs, timeouts, etc. are
+            # observable on the SDK side. Status histograms / token counters
+            # only fire on successful completion above.
+            get_llm_metrics().requests.add(
+                1, {"model": self.model_name, "status": classify_status(e)}
+            )
             raise
 
     # The _get_response_with_responses_api method has been merged into get_response above
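The two recording sites above (success path plus except handler) are the pattern the commit message says future call sites should mirror. A hypothetical sketch of that pattern for a non-streaming call site; the wrapper name, the injected call, and the result-dict shape are assumptions, not agentex APIs:

from typing import Awaitable, Callable

from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics


async def call_with_llm_metrics(model: str, call: Callable[[], Awaitable[dict]]) -> dict:
    """Run call() and record agentex.llm.* request/token metrics around it (sketch only)."""
    m = get_llm_metrics()
    attrs = {"model": model}
    try:
        result = await call()
    except Exception as e:
        # Mirror the get_response except handler: count the failure, then re-raise.
        m.requests.add(1, {**attrs, "status": classify_status(e)})
        raise
    # Mirror the success path: one request event plus token counters.
    m.requests.add(1, {**attrs, "status": "success"})
    m.input_tokens.add(result.get("input_tokens", 0) or 0, attrs)
    m.output_tokens.add(result.get("output_tokens", 0) or 0, attrs)
    return result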
