feat(observability): add messages genai span attributes #68
Conversation
Caution: Review failed. Pull request was closed or merged during review.

📝 Walkthrough

Centralizes message-view and span-property helpers.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Handler as Messages Handler
    participant Span as Span (fastrace)
    participant Gateway
    participant Collector as StreamOutputCollector

    Client->>Handler: POST /v1/messages
    Handler->>Span: create & enter span "aisix.llm.messages"
    Handler->>Span: apply request span properties (span_attributes)
    alt Non-streaming response
        Handler->>Gateway: call gateway
        Gateway-->>Handler: AnthropicMessagesResponse
        Handler->>Span: build output views -> apply response & usage properties
        Handler-->>Client: HTTP response
    else Streaming response
        Handler->>Gateway: stream gateway
        Gateway-->>Handler: stream events
        loop For each event
            Handler->>Collector: record event (MessageStart/ContentBlockDelta/MessageDelta)
            Handler->>Span: apply chunk span properties
        end
        alt Stream completed
            Handler->>Collector: build output_message_views
            Handler->>Span: apply message-level properties
            Handler->>Span: apply usage properties (sync or spawn observer)
        else Stream error
            Handler->>Span: set error.type=stream_error
            Handler->>Span: flush collector properties
        end
        Handler-->>Client: SSE stream
    end
    Span->>Span: exit & record properties
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs
Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error, 1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/proxy/handlers/messages/mod.rs`:
- Around line 215-221: The stream-error branch currently hands off usage_rx to
spawn_stream_usage_observer and drops the span, losing usage attributes if usage
is already ready; modify the Some(Err(err)) arm to mirror the EOF fast-path by
calling try_recv() on the usage_rx (the same receiver variable), and if it
yields Some(usage) attach span.add_property(|| usage_span_properties(&usage))
(or add_properties with usage_span_properties()) and also call
span.add_properties(|| output_collector.output_message_span_properties()) before
deciding whether to spawn_stream_usage_observer(request_ctx.clone(), usage_rx) —
only move the receiver into spawn_stream_usage_observer when try_recv() returns
None. Ensure you reference the same usage_rx, span, spawn_stream_usage_observer,
usage_span_properties(), and output_collector.output_message_span_properties()
symbols.
- Around line 190-195: The first-token logic currently fires on idx == 0 which
triggers on the initial SSE "message_start" event; change the condition so
record_first_token_latency(&mut request_ctx).await and the TraceEvent "first
token arrived" are only executed for the first content-bearing SSE event (i.e.,
when idx == 0 AND the event contains actual output text/content rather than a
"message_start" control event). Locate the block around idx == 0 and update the
guard to check the event payload/type (skip events labeled "message_start" or
missing content) before calling hooks::observability::record_first_token_latency
and adding the TraceEvent.
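The guard the prompt describes can be sketched as follows. Names like `SseEvent` and `should_record_first_token` are hypothetical stand-ins; the real handler works on the Anthropic SSE event types and calls `record_first_token_latency`:

```rust
// Illustrative stand-in for the handler's SSE event enum; the real types
// live in the messages handler and cover more variants.
enum SseEvent {
    MessageStart,                       // control frame: not a token
    ContentBlockDelta { text: String }, // carries actual output text
    MessageDelta,                       // usage / stop-reason updates
}

// Returns true exactly once: for the first event that carries non-empty
// output text. Control events such as "message_start" never trigger it.
fn should_record_first_token(first_seen: &mut bool, event: &SseEvent) -> bool {
    if *first_seen {
        return false;
    }
    if let SseEvent::ContentBlockDelta { text } = event {
        if !text.is_empty() {
            *first_seen = true;
            return true;
        }
    }
    false
}
```

A flag tracked across the event loop is sturdier than `idx == 0`, since the first SSE frame from a streaming Messages response is a `message_start` control event rather than output text.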
In `@src/proxy/handlers/messages/span_attributes/message_attributes.rs`:
- Around line 81-86: The function image_source_to_data_url currently returns
full data URIs for ImageSource entries with type "base64", which can leak large
payloads into spans; change image_source_to_data_url (and any callers expecting
the full string) to avoid embedding raw base64 bodies and instead return
metadata only — e.g., include source.media_type plus either the byte length or a
short checksum/hex hash of source.data (or a sentinel like
"inline-base64-omitted") so callers (gen_ai.* message builders / OpenInference
fields) log media_type and size/hash but never the full payload. Ensure the
function still handles non-base64 sources by returning source.data unchanged and
update any downstream uses that assumed the full data URI to consume the
metadata token instead.
In `@src/proxy/handlers/messages/span_attributes/tests.rs`:
- Around line 1-2: Add the pretty_assertions assert_eq import to the test module
so failures show diffs: in src/proxy/handlers/messages/span_attributes/tests.rs
add `use pretty_assertions::assert_eq;` alongside the existing `use
serde_json::{Value, json};` import (or replace any local shadowed assert_eq
imports), then run the tests to ensure all existing `assert_eq!` calls in this
module now use pretty_assertions' assert_eq for clearer output.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 760da4f6-f870-4eab-a272-ffbe58cbf633
📒 Files selected for processing (15)
- src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs
- src/proxy/handlers/chat_completions/span_attributes/mod.rs
- src/proxy/handlers/chat_completions/span_attributes/stream_output.rs
- src/proxy/handlers/chat_completions/span_attributes/telemetry.rs
- src/proxy/handlers/messages/mod.rs
- src/proxy/handlers/messages/span_attributes/message_attributes.rs
- src/proxy/handlers/messages/span_attributes/mod.rs
- src/proxy/handlers/messages/span_attributes/stream_output.rs
- src/proxy/handlers/messages/span_attributes/telemetry.rs
- src/proxy/handlers/messages/span_attributes/tests.rs
- src/proxy/mod.rs
- src/proxy/utils/mod.rs
- src/proxy/utils/trace/mod.rs
- src/proxy/utils/trace/span_attributes.rs
- src/proxy/utils/trace/span_message_attributes.rs
```rust
Some(Err(err)) => {
    error!("Gateway stream error: {}", err);
    span.add_property(|| ("error.type", "stream_error"));
    span.add_properties(|| output_collector.output_message_span_properties());
    if let Some(usage_rx) = usage_rx.take() {
        spawn_stream_usage_observer(request_ctx.clone(), usage_rx);
    }
```
Mirror the EOF usage fast-path on stream errors.
The EOF branch does a `try_recv()` and attaches `usage_span_properties()`, but this branch immediately hands the receiver to `spawn_stream_usage_observer()` and then drops the span. If usage is already available, failed streams still lose all usage attributes in tracing. Reuse the same `try_recv()` logic here before giving up the span.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/proxy/handlers/messages/mod.rs` around lines 215 - 221, The stream-error
branch currently hands off usage_rx to spawn_stream_usage_observer and drops the
span, losing usage attributes if usage is already ready; modify the
Some(Err(err)) arm to mirror the EOF fast-path by calling try_recv() on the
usage_rx (the same receiver variable), and if it yields Some(usage) attach
span.add_property(|| usage_span_properties(&usage)) (or add_properties with
usage_span_properties()) and also call span.add_properties(||
output_collector.output_message_span_properties()) before deciding whether to
spawn_stream_usage_observer(request_ctx.clone(), usage_rx) — only move the
receiver into spawn_stream_usage_observer when try_recv() returns None. Ensure
you reference the same usage_rx, span, spawn_stream_usage_observer,
usage_span_properties(), and output_collector.output_message_span_properties()
symbols.
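The fast-path this prompt asks for can be sketched with a std `mpsc` channel standing in for the handler's async usage channel (the real code presumably uses an async receiver whose closed-channel error is `Closed` rather than `Disconnected`; the function and property names below are illustrative, not the project's actual symbols):

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

// If usage has already arrived, record it synchronously and consume the
// receiver; otherwise hand the receiver back so the caller can move it
// into a background observer task.
fn attach_or_defer(
    usage_rx: Receiver<u64>,
    span_props: &mut Vec<(String, String)>,
) -> Option<Receiver<u64>> {
    match usage_rx.try_recv() {
        Ok(usage) => {
            // Property name is illustrative only.
            span_props.push(("llm.usage.total_tokens".into(), usage.to_string()));
            None
        }
        // Nothing yet: defer to the spawned observer.
        Err(TryRecvError::Empty) => Some(usage_rx),
        // Sender dropped without sending: nothing to record.
        Err(TryRecvError::Disconnected) => None,
    }
}
```

In the handler this decision would sit in the `Some(Err(err))` arm, right before the span is dropped, mirroring the EOF branch.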
```rust
pub(super) fn image_source_to_data_url(source: &ImageSource) -> String {
    if source.r#type == "base64" {
        format!("data:{};base64,{}", source.media_type, source.data)
    } else {
        source.data.clone()
    }
}
```
Avoid tracing raw base64 image payloads.
For base64 sources this builds a full data: URI, and that value is later copied into gen_ai.*.messages and OpenInference message content fields. A single inline image can blow span-size limits and leak the full image payload into telemetry. Record metadata only (for example media type, size, or a hash) or skip inline-image bodies entirely.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/proxy/handlers/messages/span_attributes/message_attributes.rs` around
lines 81 - 86, The function image_source_to_data_url currently returns full data
URIs for ImageSource entries with type "base64", which can leak large payloads
into spans; change image_source_to_data_url (and any callers expecting the full
string) to avoid embedding raw base64 bodies and instead return metadata only —
e.g., include source.media_type plus either the byte length or a short
checksum/hex hash of source.data (or a sentinel like "inline-base64-omitted") so
callers (gen_ai.* message builders / OpenInference fields) log media_type and
size/hash but never the full payload. Ensure the function still handles
non-base64 sources by returning source.data unchanged and update any downstream
uses that assumed the full data URI to consume the metadata token instead.
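A minimal sketch of the metadata-only variant described above. The struct is trimmed to the fields the review mentions, and the sentinel format is an assumption, not the project's actual choice:

```rust
// Trimmed stand-in for the handler's ImageSource type.
struct ImageSource {
    r#type: String,     // "base64" or "url"
    media_type: String, // e.g. "image/png"
    data: String,       // raw base64 body, or a URL
}

// Keep the media type and payload size, but never the base64 body itself,
// so span attributes stay small and carry no image content.
fn image_source_to_span_value(source: &ImageSource) -> String {
    if source.r#type == "base64" {
        format!(
            "data:{};base64,<omitted {} bytes>",
            source.media_type,
            source.data.len()
        )
    } else {
        // URL sources pass through unchanged, as in the current code.
        source.data.clone()
    }
}
```

A short hash of `source.data` could replace the byte count if callers need to correlate identical images across spans.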
Create an issue for this.
🧩 Analysis chain
🏁 Script executed:
```shell
gh issue create \
  --repo api7/aisix \
  --title "fix(observability): avoid embedding raw base64 image payloads in span attributes" \
  --body "## Problem

In \`src/proxy/handlers/messages/span_attributes/message_attributes.rs\`, the function \`image_source_to_data_url\` currently constructs a full \`data:<media_type>;base64,<data>\` URI for \`ImageSource\` entries with type \`\"base64\"\`. This value is propagated into \`gen_ai.*\` message JSON and OpenInference message content span properties.

This has two risks:

1. **Span size limits** — A single inline image can easily exceed span size limits in observability backends (e.g., Jaeger, OTLP collectors), causing spans to be dropped or truncated.
2. **Data leakage** — Full image payloads are recorded verbatim into telemetry, which may not be appropriate.

## Expected Behaviour

\`image_source_to_data_url\` (and any callers) should record metadata only — e.g., \`media_type\` plus either the byte length or a short checksum/hash of \`source.data\`, or a sentinel like \`\"inline-base64-omitted\"\`. Non-base64 sources (\`url\` type) should continue to return \`source.data\` unchanged. Downstream consumers (\`content_parts_from_blocks\`, stream output collector, etc.) should be updated to handle the metadata token.

## References

- PR: https://github.com/api7/aisix/pull/68
- Review comment: https://github.com/api7/aisix/pull/68#discussion_r3159393407

/cc \`@bzp2010\`" \
  --assignee bzp2010
```
Length of output: 40
@bzp2010, I've created the follow-up issue: #69
It tracks the concern about `image_source_to_data_url` embedding full base64 data URIs into span attributes, covering both the span-size and data-leakage risks, and is assigned to you.
🧠 Learnings used
Learnt from: bzp2010
Repo: api7/aisix PR: 34
File: src/proxy/hooks2/authorization/mod.rs:1-1
Timestamp: 2026-04-10T09:35:05.745Z
Learning: In Rust code using the `anyhow` crate, treat `use anyhow::Result;` as potentially used even if the code uses non-default error types like `Result<T, SomeOtherError>` (i.e., `Result<T, E>` where `E` is not `anyhow::Error`). Because `anyhow::Result` is an alias for `std::result::Result<T, E=anyhow::Error>`, the import is referenced by the `Result<..., ...>` syntax. Do not flag `use anyhow::Result;` as unused solely due to the presence of `Result<T, SomeOtherError>` usages.
Learnt from: AlinsRan
Repo: api7/aisix PR: 46
File: src/lib.rs:122-126
Timestamp: 2026-04-17T09:32:54.079Z
Learning: Avoid adding `#[fastrace::trace]` to top-level process-lifetime entrypoints in `src/lib.rs`—specifically `run()`, `run_with_config()`, and `run_with_provider()`. These functions cover the entire server lifetime (startup → shutdown), while Fastrace spans should be used for request-scoped boundaries (e.g., HTTP/RPC handler functions) rather than global process entrypoints.
♻️ Duplicate comments (1)
src/proxy/handlers/messages/mod.rs (1)
215-233: ⚠️ Potential issue | 🟡 Minor

Mirror the EOF `try_recv()` pattern for stream errors.

The error branch spawns the usage observer unconditionally, but the EOF branch (lines 238-264) first attempts `try_recv()` to capture usage synchronously before the span is dropped. If usage is already available when an error occurs, it should be attached to the span immediately rather than lost.

🔧 Suggested fix

```diff
 Some(Err(err)) => {
     error!("Gateway stream error: {}", err);
     span.add_property(|| ("error.type", "stream_error"));
     span.add_properties(|| output_collector.output_message_span_properties());
-    if let Some(usage_rx) = usage_rx.take() {
-        spawn_stream_usage_observer(request_ctx.clone(), usage_rx);
+    if let Some(mut usage_rx) = usage_rx.take() {
+        match usage_rx.try_recv() {
+            Ok(usage) => {
+                span.add_properties(|| usage_span_properties(&usage));
+            }
+            Err(TryRecvError::Empty) => {
+                spawn_stream_usage_observer(request_ctx.clone(), usage_rx);
+            }
+            Err(TryRecvError::Closed) => {
+                error!("Failed to receive streaming usage from gateway: channel closed");
+            }
+        }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/proxy/handlers/messages/mod.rs` around lines 215-233: The stream-error branch currently unconditionally spawns the usage observer and drops the span without first attempting to synchronously capture outstanding usage like the EOF branch does; update the Some(Err(err)) arm to mirror the EOF try_recv() pattern: if let Some(mut usage_rx) = usage_rx.take() then first attempt to capture usage synchronously via try_recv() (attach any returned usage to the span using span.add_property/add_properties and output_collector.output_message_span_properties()), and only call spawn_stream_usage_observer(request_ctx.clone(), usage_rx) if try_recv() did not yield usage; keep the existing error event creation (anthropic_error_sse_event(err.to_string())) and other returned tuple elements unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/proxy/handlers/messages/mod.rs`:
- Around line 215-233: The stream-error branch currently unconditionally spawns
the usage observer and drops the span without first attempting to synchronously
capture outstanding usage like the EOF branch does; update the Some(Err(err))
arm to mirror the EOF try_recv() pattern: if let Some(mut usage_rx) =
usage_rx.take() then first attempt to capture usage synchronously via try_recv()
(attach any returned usage to the span using span.add_property/add_properties
and output_collector.output_message_span_properties()), and only call
spawn_stream_usage_observer(request_ctx.clone(), usage_rx) if try_recv() did not
yield usage; keep the existing error event creation
(anthropic_error_sse_event(err.to_string())) and other returned tuple elements
unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f89fd655-3cb4-41f0-9f3b-d6d4e297acce
📒 Files selected for processing (2)
- src/proxy/handlers/messages/mod.rs
- src/proxy/handlers/messages/span_attributes/tests.rs
Summary by CodeRabbit
New Features
Refactor
Tests