From e9d94eed4e5b381f43decc27d76250fa0919ecd8 Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Wed, 29 Apr 2026 00:31:38 +0800 Subject: [PATCH 1/4] feat(observability): add chat completion genai attributes --- src/config/entities/mod.rs | 12 +- src/config/entities/models.rs | 11 +- src/config/entities/providers.rs | 4 + src/gateway/providers/azure.rs | 23 +- src/gateway/providers/bedrock.rs | 21 +- src/gateway/providers/gemini.rs | 23 +- src/gateway/traits/chat_format.rs | 1 + src/gateway/traits/mod.rs | 2 +- src/gateway/traits/provider.rs | 34 +- src/proxy/handlers/chat_completions/mod.rs | 172 ++++-- .../span_attributes/message_attributes.rs | 364 +++++++++++++ .../chat_completions/span_attributes/mod.rs | 12 + .../span_attributes/stream_output.rs | 117 ++++ .../span_attributes/telemetry.rs | 504 ++++++++++++++++++ .../chat_completions/span_attributes/tests.rs | 251 +++++++++ src/proxy/handlers/embeddings/mod.rs | 14 +- src/proxy/handlers/messages/mod.rs | 11 +- src/proxy/hooks/authorization/mod.rs | 2 +- src/proxy/middlewares/auth.rs | 8 + src/proxy/middlewares/trace.rs | 54 +- src/proxy/provider.rs | 21 +- src/utils/future.rs | 28 +- src/utils/observability/mod.rs | 2 +- 23 files changed, 1594 insertions(+), 97 deletions(-) create mode 100644 src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs create mode 100644 src/proxy/handlers/chat_completions/span_attributes/mod.rs create mode 100644 src/proxy/handlers/chat_completions/span_attributes/stream_output.rs create mode 100644 src/proxy/handlers/chat_completions/span_attributes/telemetry.rs create mode 100644 src/proxy/handlers/chat_completions/span_attributes/tests.rs diff --git a/src/config/entities/mod.rs b/src/config/entities/mod.rs index c58f41b..7b26f3b 100644 --- a/src/config/entities/mod.rs +++ b/src/config/entities/mod.rs @@ -165,7 +165,7 @@ impl ResourceStore { deleted } - #[cfg(test)] + /// Get an entry by its primary key fn get(&self, key: &str) -> Option> { self.data.load().primary.get(key).cloned() } @@ -295,16 +295,16 @@ impl EntityStore { Self { store } } + /// Get a snapshot of all entries + pub fn list(&self) -> Arc>> { + self.store.primary_snapshot() + } + /// Get the value of the specified key - #[cfg(test)] pub fn get(&self, key: &str) -> Option> { self.store.get(key) } - pub fn list(&self) -> Arc>> { - self.store.primary_snapshot() - } - /// Get an entry via a secondary index fn get_by_secondary(&self, index: &str, sec_key: &str) -> Option> { self.store.get_by_secondary(index, sec_key) diff --git a/src/config/entities/models.rs b/src/config/entities/models.rs index 07e772b..bd019ff 100644 --- a/src/config/entities/models.rs +++ b/src/config/entities/models.rs @@ -8,7 +8,10 @@ use utoipa::ToSchema; use super::{ConfigProvider, EntityStore, IndexFn, ResourceEntry}; use crate::{ - config::entities::types::{HasRateLimit, RateLimit, RateLimitMetric}, + config::entities::{ + Provider, ResourceRegistry, + types::{HasRateLimit, RateLimit, RateLimitMetric}, + }, utils::jsonschema::format_evaluation_error, }; @@ -32,6 +35,12 @@ pub struct Model { pub rate_limit: Option, } +impl Model { + pub fn provider(&self, resources: &ResourceRegistry) -> Option> { + resources.providers.get_by_id(&self.provider_id) + } +} + impl HasRateLimit for ResourceEntry { fn rate_limit(&self) -> Option { self.rate_limit.clone() diff --git a/src/config/entities/providers.rs b/src/config/entities/providers.rs index 28889b0..a3fda66 100644 --- a/src/config/entities/providers.rs +++ b/src/config/entities/providers.rs @@ -98,6 +98,10 @@ impl ProvidersStore { pub fn list(&self) -> Arc>> { self.store.list() } + + pub fn get_by_id(&self, id: &str) -> Option> { + self.store.get(id) + } } #[cfg(test)] diff --git a/src/gateway/providers/azure.rs b/src/gateway/providers/azure.rs index c9aa4dc..310a990 100644 --- a/src/gateway/providers/azure.rs +++ b/src/gateway/providers/azure.rs @@ -9,7 +9,7 @@ use crate::gateway::{ provider_instance::ProviderAuth, traits::{ ChatTransform, CompatQuirks, EmbedTransform, ProviderCapabilities, ProviderMeta, - provider::encode_path_segment, + ProviderSemanticConventions, provider::encode_path_segment, }, types::{ embed::{EmbedRequestBody, EmbeddingRequest}, @@ -51,6 +51,14 @@ impl ProviderMeta for AzureDef { DEFAULT_BASE_URL } + fn semantic_conventions(&self) -> ProviderSemanticConventions { + ProviderSemanticConventions { + gen_ai_provider_name: "azure.ai.openai", + llm_system: "openai", + llm_provider: Some("azure"), + } + } + fn chat_endpoint_path(&self, model: &str) -> Cow<'static, str> { Cow::Owned(format!( "/openai/deployments/{}/chat/completions", @@ -124,7 +132,10 @@ mod tests { use super::{AzureDef, DEFAULT_API_VERSION}; use crate::gateway::{ provider_instance::ProviderAuth, - traits::{ChatTransform, EmbedTransform, ProviderCapabilities, ProviderMeta}, + traits::{ + ChatTransform, EmbedTransform, ProviderCapabilities, ProviderMeta, + ProviderSemanticConventions, + }, types::{embed::EmbedRequestBody, openai::ChatCompletionRequest}, }; @@ -171,6 +182,14 @@ mod tests { ); assert_eq!(embed_url.query(), Some("api-version=v1")); assert!(provider.as_embed_transform().is_some()); + assert_eq!( + provider.semantic_conventions(), + ProviderSemanticConventions { + gen_ai_provider_name: "azure.ai.openai", + llm_system: "openai", + llm_provider: Some("azure"), + } + ); } #[test] diff --git a/src/gateway/providers/bedrock.rs b/src/gateway/providers/bedrock.rs index a62e9f3..8bef8ba 100644 --- a/src/gateway/providers/bedrock.rs +++ b/src/gateway/providers/bedrock.rs @@ -17,7 +17,7 @@ use crate::gateway::{ provider_instance::ProviderAuth, traits::{ ChatStreamState, ChatTransform, PreparedRequest, ProviderCapabilities, ProviderMeta, - StreamReaderKind, provider::encode_path_segment, + ProviderSemanticConventions, StreamReaderKind, provider::encode_path_segment, }, types::openai::{ChatCompletionChunk, ChatCompletionRequest, ChatCompletionResponse}, }; @@ -77,6 +77,14 @@ impl ProviderMeta for BedrockDef { DEFAULT_BASE_URL } + fn semantic_conventions(&self) -> ProviderSemanticConventions { + ProviderSemanticConventions { + gen_ai_provider_name: "aws.bedrock", + llm_system: "amazon", + llm_provider: Some("aws"), + } + } + fn chat_endpoint_path(&self, model: &str) -> Cow<'static, str> { Cow::Owned(format!("/model/{}/converse", encode_path_segment(model))) } @@ -217,7 +225,7 @@ mod tests { use super::{BedrockDef, BedrockProviderConfig}; use crate::gateway::{ provider_instance::ProviderAuth, - traits::{PreparedRequest, ProviderMeta}, + traits::{PreparedRequest, ProviderMeta, ProviderSemanticConventions}, }; #[test] @@ -262,6 +270,15 @@ mod tests { fn build_url_uses_overlap_handling_and_encodes_model_ids_with_slashes() { let provider = BedrockDef; + assert_eq!( + provider.semantic_conventions(), + ProviderSemanticConventions { + gen_ai_provider_name: "aws.bedrock", + llm_system: "amazon", + llm_provider: Some("aws"), + } + ); + let url = provider.build_url( "https://bedrock-runtime.us-east-1.amazonaws.com/model", "inference-profile/us.anthropic.claude-3-7-sonnet-20250219-v1:0", diff --git a/src/gateway/providers/gemini.rs b/src/gateway/providers/gemini.rs index 72a5cae..58723e8 100644 --- a/src/gateway/providers/gemini.rs +++ b/src/gateway/providers/gemini.rs @@ -6,7 +6,10 @@ use serde::{Deserialize, Serialize}; use crate::gateway::{ error::{GatewayError, Result}, provider_instance::ProviderAuth, - traits::{ChatTransform, EmbedTransform, ProviderCapabilities, ProviderMeta}, + traits::{ + ChatTransform, EmbedTransform, ProviderCapabilities, ProviderMeta, + ProviderSemanticConventions, + }, }; pub const IDENTIFIER: &str = "gemini"; @@ -30,6 +33,14 @@ impl ProviderMeta for GoogleDef { "https://generativelanguage.googleapis.com/v1beta/openai" } + fn semantic_conventions(&self) -> ProviderSemanticConventions { + ProviderSemanticConventions { + gen_ai_provider_name: "gcp.gemini", + llm_system: "gemini", + llm_provider: Some("google"), + } + } + fn chat_endpoint_path(&self, _model: &str) -> Cow<'static, str> { Cow::Borrowed("/chat/completions") } @@ -67,7 +78,7 @@ mod tests { use super::GoogleDef; use crate::gateway::{ provider_instance::ProviderAuth, - traits::{EmbedTransform, ProviderCapabilities, ProviderMeta}, + traits::{EmbedTransform, ProviderCapabilities, ProviderMeta, ProviderSemanticConventions}, }; #[test] @@ -92,5 +103,13 @@ mod tests { ); assert_eq!(headers["x-goog-api-key"], "gemini-key"); assert!(provider.as_embed_transform().is_some()); + assert_eq!( + provider.semantic_conventions(), + ProviderSemanticConventions { + gen_ai_provider_name: "gcp.gemini", + llm_system: "gemini", + llm_provider: Some("google"), + } + ); } } diff --git a/src/gateway/traits/chat_format.rs b/src/gateway/traits/chat_format.rs index 1d5a436..4e17c03 100644 --- a/src/gateway/traits/chat_format.rs +++ b/src/gateway/traits/chat_format.rs @@ -26,6 +26,7 @@ pub trait ChatFormat: Send + Sync + 'static { type NativeStreamState: Default + Send + Unpin; /// Stable format name used for logs and diagnostics. + #[allow(unused)] fn name() -> &'static str; /// Whether the request expects a streaming response. diff --git a/src/gateway/traits/mod.rs b/src/gateway/traits/mod.rs index 879c4bc..3e4ef80 100644 --- a/src/gateway/traits/mod.rs +++ b/src/gateway/traits/mod.rs @@ -10,7 +10,7 @@ pub use native::{ pub use native::{NativeOpenAIResponsesSupport, OpenAIResponsesNativeStreamState}; pub use provider::{ ChatTransform, CompatQuirks, EmbedTransform, PreparedRequest, ProviderCapabilities, - ProviderMeta, StreamReaderKind, + ProviderMeta, ProviderSemanticConventions, StreamReaderKind, }; #[allow(unused_imports)] pub use provider::{ImageGenTransform, SttTransform, TtsTransform}; diff --git a/src/gateway/traits/provider.rs b/src/gateway/traits/provider.rs index 874f506..b3392e7 100644 --- a/src/gateway/traits/provider.rs +++ b/src/gateway/traits/provider.rs @@ -45,11 +45,26 @@ pub(crate) fn encode_path_segment(segment: &str) -> String { utf8_percent_encode(segment, PATH_SEGMENT_ENCODE_SET).to_string() } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ProviderSemanticConventions { + pub gen_ai_provider_name: &'static str, + pub llm_system: &'static str, + pub llm_provider: Option<&'static str>, +} + /// Provider metadata with no data transformation logic. pub trait ProviderMeta: Send + Sync + 'static { fn name(&self) -> &'static str; fn default_base_url(&self) -> &'static str; + fn semantic_conventions(&self) -> ProviderSemanticConventions { + ProviderSemanticConventions { + gen_ai_provider_name: self.name(), + llm_system: self.name(), + llm_provider: None, + } + } + /// Chat endpoint path for the provider. Implementations may use `model` /// for providers whose route shape depends on the model name. fn chat_endpoint_path(&self, _model: &str) -> Cow<'static, str> { @@ -306,7 +321,10 @@ mod tests { use http::HeaderMap; use serde_json::json; - use super::{ChatTransform, CompatQuirks, EmbedTransform, ProviderMeta, StreamReaderKind}; + use super::{ + ChatTransform, CompatQuirks, EmbedTransform, ProviderMeta, + ProviderSemanticConventions, StreamReaderKind, + }; use crate::gateway::{ provider_instance::ProviderAuth, traits::chat_format::ChatStreamState, @@ -421,6 +439,20 @@ mod tests { assert_eq!(body["stream_options"]["include_usage"], true); } + #[test] + fn provider_meta_uses_name_based_default_semantic_conventions() { + let provider = DummyProvider; + + assert_eq!( + provider.semantic_conventions(), + ProviderSemanticConventions { + gen_ai_provider_name: "dummy", + llm_system: "dummy", + llm_provider: None, + } + ); + } + #[test] fn apply_to_request_skips_stream_usage_for_non_streaming_requests() { let quirks = CompatQuirks { diff --git a/src/proxy/handlers/chat_completions/mod.rs b/src/proxy/handlers/chat_completions/mod.rs index cf7dd1d..8865a55 100644 --- a/src/proxy/handlers/chat_completions/mod.rs +++ b/src/proxy/handlers/chat_completions/mod.rs @@ -1,10 +1,11 @@ +mod span_attributes; mod types; use std::{convert::Infallible, time::Duration}; use axum::{ Json, - extract::{Extension, State}, + extract::State, response::{ IntoResponse, Response, sse::{Event as SseEvent, Sse}, @@ -12,12 +13,17 @@ use axum::{ }; use fastrace::prelude::{Event as TraceEvent, *}; use log::error; -use tokio::sync::oneshot; +use span_attributes::{ + StreamOutputCollector, apply_span_properties, chunk_span_properties, request_span_properties, + response_span_properties, usage_span_properties, +}; +use tokio::sync::{oneshot, oneshot::error::TryRecvError}; pub use types::ChatCompletionError; use crate::{ config::entities::{Model, ResourceEntry}, gateway::{ + error::GatewayError, formats::OpenAIChatFormat, traits::ChatFormat, types::{ @@ -31,13 +37,11 @@ use crate::{ hooks::{self, RequestContext}, provider::create_provider_instance, }, - utils::future::maybe_timeout, + utils::future::{WithSpan, maybe_timeout}, }; -#[fastrace::trace] pub async fn chat_completions( State(state): State, - Extension(span_ctx): Extension, mut request_ctx: RequestContext, Json(mut request_data): Json, ) -> Result { @@ -62,27 +66,51 @@ pub async fn chat_completions( let gateway = state.gateway(); let resources = state.resources(); - let provider_instance = create_provider_instance(gateway.as_ref(), resources.as_ref(), &model)?; + let provider = model.provider(resources.as_ref()).ok_or_else(|| { + GatewayError::Internal(format!("provider {} not found", model.provider_id)) + })?; + let provider_instance = create_provider_instance(gateway.as_ref(), &provider)?; + let provider_base_url = provider_instance.effective_base_url().ok(); - match maybe_timeout( - timeout, - gateway.chat_completion(&request_data, &provider_instance), - ) - .await - { - Ok(response) => match response? { - ChatResponse::Complete { response, usage } => { - handle_regular_request(response, usage, &mut request_ctx).await - } - ChatResponse::Stream { stream, usage_rx } => { - handle_stream_request(stream, usage_rx, &mut request_ctx, span_ctx).await - } - }, - Err(err) => Err(ChatCompletionError::Timeout(err)), + let span = Span::enter_with_local_parent("aisix.llm.chat_completion"); + apply_span_properties( + &span, + request_span_properties( + &request_data, + provider_instance.def.as_ref(), + provider_base_url.as_ref(), + ), + ); + + let (response, span) = (WithSpan { + inner: maybe_timeout( + timeout, + gateway.chat_completion(&request_data, &provider_instance), + ), + span: Some(span), + }) + .await; + + match response { + Ok(Ok(ChatResponse::Complete { response, usage })) => { + LocalSpan::add_properties(|| response_span_properties(&response, &usage)); + handle_regular_request(response, usage, &mut request_ctx).await + } + Ok(Ok(ChatResponse::Stream { stream, usage_rx })) => { + LocalSpan::add_property(|| ("error.type", "no_stream")); + handle_stream_request(stream, usage_rx, &mut request_ctx, span).await + } + Ok(Err(err)) => { + LocalSpan::add_property(|| ("error.type", "gateway_error")); + Err(err.into()) + } + Err(err) => { + LocalSpan::add_property(|| ("error.type", "timeout")); + Err(ChatCompletionError::Timeout(err)) + } } } -#[fastrace::trace] async fn handle_regular_request( response: ChatCompletionResponse, usage: Usage, @@ -119,29 +147,36 @@ fn spawn_stream_usage_observer(request_ctx: RequestContext, usage_rx: oneshot::R }); } -#[fastrace::trace] async fn handle_stream_request( stream: ChatResponseStream, usage_rx: oneshot::Receiver, request_ctx: &mut RequestContext, - span_ctx: SpanContext, + span: Span, ) -> Result { use futures::stream::StreamExt; - spawn_stream_usage_observer(request_ctx.clone(), usage_rx); - let stream_request_ctx = request_ctx.clone(); - let stream_span = Span::root("sse_connection", span_ctx); let sse_stream = futures::stream::unfold( ( stream, - stream_span, + span, 0usize, stream_request_ctx, false, false, + Some(usage_rx), + StreamOutputCollector::default(), ), - |(mut stream, span, idx, mut request_ctx, done, saw_chunk)| async move { + |( + mut stream, + span, + idx, + mut request_ctx, + done, + saw_chunk, + mut usage_rx, + mut output_collector, + )| async move { if done { drop(span); return None; @@ -149,12 +184,22 @@ async fn handle_stream_request( match stream.next().await { Some(Ok(chunk)) => { + output_collector.record_chunk(&chunk); + if idx == 0 { hooks::observability::record_first_token_latency(&mut request_ctx).await; - span.add_event(TraceEvent::new(format!( - "{} first token arrived", - OpenAIChatFormat::name() - ))); + span.add_event(TraceEvent::new("first token arrived")); + span.add_properties(|| chunk_span_properties(&chunk)); + } else { + let properties = chunk_span_properties(&chunk); + properties + .iter() + .filter(|(key, _)| { + key == "gen_ai.response.finish_reasons" + || key == "llm.finish_reason" + || key == "llm.token_count.completion_details.reasoning" + }) + .for_each(|item| span.add_property(|| item.clone())); } let mut event = @@ -164,18 +209,75 @@ async fn handle_stream_request( } let event = Ok::(event); - Some((event, (stream, span, idx + 1, request_ctx, false, true))) + Some(( + event, + ( + stream, + span, + idx + 1, + request_ctx, + false, + true, + usage_rx, + output_collector, + ), + )) } Some(Err(err)) => { error!("Gateway stream error: {}", err); + span.add_property(|| ("error.type", "stream_error")); + span.add_properties(|| output_collector.output_message_span_properties()); + if let Some(usage_rx) = usage_rx.take() { + spawn_stream_usage_observer(request_ctx.clone(), usage_rx); + } drop(span); None } None => { + span.add_properties(|| output_collector.output_message_span_properties()); + + if let Some(mut usage_rx) = usage_rx.take() { + match usage_rx.try_recv() { + Ok(usage) => { + if let Err(err) = hooks::rate_limit::post_check_streaming( + &mut request_ctx, + &usage, + ) + .await + { + error!("Rate limit post_check_streaming error: {}", err); + } + hooks::observability::record_streaming_usage( + &mut request_ctx, + &usage, + ) + .await; + span.add_properties(|| usage_span_properties(&usage)); + } + Err(TryRecvError::Empty) => { + spawn_stream_usage_observer(request_ctx.clone(), usage_rx); + } + Err(TryRecvError::Closed) => { + error!( + "Failed to receive streaming usage from gateway: channel closed" + ); + } + } + } + if saw_chunk { Some(( Ok(SseEvent::default().data("[DONE]")), - (stream, span, idx + 1, request_ctx, true, saw_chunk), + ( + stream, + span, + idx + 1, + request_ctx, + true, + saw_chunk, + usage_rx, + output_collector, + ), )) } else { drop(span); diff --git a/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs b/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs new file mode 100644 index 0000000..3963c5f --- /dev/null +++ b/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs @@ -0,0 +1,364 @@ +use serde::Serialize; +use serde_json::{Map, Value}; + +use crate::gateway::types::openai::{ + ChatCompletionResponse, ChatMessage, ContentPart, MessageContent, Tool, +}; + +#[derive(Clone)] +pub(super) enum MessageContentView { + Text(String), + Parts(Vec), +} + +#[derive(Clone)] +pub(super) enum ContentPartView { + Text(String), + ImageUrl { url: String }, +} + +#[derive(Clone)] +pub(super) struct ToolCallView { + pub(super) id: Option, + pub(super) name: String, + pub(super) arguments: String, +} + +#[derive(Clone)] +pub(super) struct MessageView { + pub(super) role: String, + pub(super) content: Option, + pub(super) name: Option, + pub(super) tool_calls: Vec, + pub(super) tool_call_id: Option, +} + +#[derive(Clone)] +pub(super) struct OutputMessageView { + pub(super) message: MessageView, + pub(super) finish_reason: Option, +} + +pub(super) fn message_view_from_chat_message(message: &ChatMessage) -> MessageView { + MessageView { + role: message.role.clone(), + content: message + .content + .as_ref() + .map(message_content_view_from_message_content), + name: message.name.clone(), + tool_calls: message + .tool_calls + .as_deref() + .unwrap_or_default() + .iter() + .map(|tool_call| ToolCallView { + id: Some(tool_call.id.clone()), + name: tool_call.function.name.clone(), + arguments: tool_call.function.arguments.clone(), + }) + .collect(), + tool_call_id: message.tool_call_id.clone(), + } +} + +pub(super) fn response_output_message_views( + response: &ChatCompletionResponse, +) -> Vec { + response + .choices + .iter() + .map(|choice| OutputMessageView { + message: message_view_from_chat_message(&choice.message), + finish_reason: choice.finish_reason.clone(), + }) + .collect() +} + +pub(super) fn gen_ai_input_messages_json(messages: &[MessageView]) -> Option { + if messages.is_empty() { + return None; + } + + let values: Vec = messages + .iter() + .map(|message| { + let mut value = Map::new(); + value.insert("role".into(), Value::String(message.role.clone())); + value.insert("parts".into(), Value::Array(gen_ai_message_parts(message))); + + if let Some(name) = &message.name { + value.insert("name".into(), Value::String(name.clone())); + } + + Value::Object(value) + }) + .collect(); + + serialize_to_json_string(&values) +} + +pub(super) fn gen_ai_output_messages_json(messages: &[OutputMessageView]) -> Option { + if messages.is_empty() { + return None; + } + + let values: Vec = messages + .iter() + .map(|message| { + let mut value = Map::new(); + value.insert("role".into(), Value::String(message.message.role.clone())); + value.insert( + "parts".into(), + Value::Array(gen_ai_message_parts(&message.message)), + ); + value.insert( + "finish_reason".into(), + Value::String( + message + .finish_reason + .clone() + .unwrap_or_else(|| "unknown".into()), + ), + ); + + if let Some(name) = &message.message.name { + value.insert("name".into(), Value::String(name.clone())); + } + + Value::Object(value) + }) + .collect(); + + serialize_to_json_string(&values) +} + +pub(super) fn gen_ai_tool_definitions_json(tools: &[Tool]) -> Option { + if tools.is_empty() { + return None; + } + + let values: Vec = tools + .iter() + .map(|tool| { + let mut value = Map::new(); + value.insert("type".into(), Value::String(tool.r#type.clone())); + value.insert("name".into(), Value::String(tool.function.name.clone())); + + if let Some(description) = &tool.function.description { + value.insert("description".into(), Value::String(description.clone())); + } + + if let Some(parameters) = &tool.function.parameters { + value.insert("parameters".into(), parameters.clone()); + } + + Value::Object(value) + }) + .collect(); + + serialize_to_json_string(&values) +} + +pub(super) fn append_openinference_message_properties( + properties: &mut Vec<(String, String)>, + prefix: &str, + messages: &[MessageView], +) { + for (message_index, message) in messages.iter().enumerate() { + let prefix = format!("{prefix}.{message_index}.message"); + properties.push((format!("{prefix}.role"), message.role.clone())); + + if let Some(name) = &message.name { + properties.push((format!("{prefix}.name"), name.clone())); + } + + if let Some(tool_call_id) = &message.tool_call_id { + properties.push((format!("{prefix}.tool_call_id"), tool_call_id.clone())); + } + + match &message.content { + Some(MessageContentView::Text(text)) if !text.is_empty() => { + properties.push((format!("{prefix}.content"), text.clone())); + } + Some(MessageContentView::Parts(parts)) => { + for (content_index, part) in parts.iter().enumerate() { + let part_prefix = format!("{prefix}.contents.{content_index}.message_content"); + match part { + ContentPartView::Text(text) => { + properties.push((format!("{part_prefix}.type"), "text".into())); + properties.push((format!("{part_prefix}.text"), text.clone())); + } + ContentPartView::ImageUrl { url } => { + properties.push((format!("{part_prefix}.type"), "image".into())); + properties + .push((format!("{part_prefix}.image.image.url"), url.clone())); + } + } + } + } + _ => {} + } + + for (tool_call_index, tool_call) in message.tool_calls.iter().enumerate() { + let tool_call_prefix = format!("{prefix}.tool_calls.{tool_call_index}.tool_call"); + + if let Some(id) = &tool_call.id { + properties.push((format!("{tool_call_prefix}.id"), id.clone())); + } + + properties.push(( + format!("{tool_call_prefix}.function.name"), + tool_call.name.clone(), + )); + + if !tool_call.arguments.is_empty() { + properties.push(( + format!("{tool_call_prefix}.function.arguments"), + tool_call.arguments.clone(), + )); + } + } + } +} + +pub(super) fn append_openinference_output_message_properties( + properties: &mut Vec<(String, String)>, + prefix: &str, + messages: &[OutputMessageView], +) { + let message_views: Vec<_> = messages + .iter() + .map(|message| message.message.clone()) + .collect(); + append_openinference_message_properties(properties, prefix, &message_views); +} + +pub(super) fn append_openinference_tool_properties( + properties: &mut Vec<(String, String)>, + tools: &[Tool], +) { + for (tool_index, tool) in tools.iter().enumerate() { + let prefix = format!("llm.tools.{tool_index}.tool"); + properties.push((format!("{prefix}.name"), tool.function.name.clone())); + + if let Some(description) = &tool.function.description { + properties.push((format!("{prefix}.description"), description.clone())); + } + + if let Some(parameters) = &tool.function.parameters { + if let Some(value) = serialize_to_json_string(parameters) { + properties.push((format!("{prefix}.parameters"), value)); + } + } + + if let Some(value) = serialize_to_json_string(tool) { + properties.push((format!("{prefix}.json_schema"), value)); + } + } +} + +fn serialize_to_json_string(value: &impl Serialize) -> Option { + serde_json::to_string(value).ok() +} + +fn parse_json_or_string(raw: &str) -> Value { + serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_string())) +} + +fn content_part_view_from_content_part(part: &ContentPart) -> ContentPartView { + match part { + ContentPart::Text { text } => ContentPartView::Text(text.clone()), + ContentPart::ImageUrl { image_url } => ContentPartView::ImageUrl { + url: image_url.url.clone(), + }, + } +} + +fn message_content_view_from_message_content(content: &MessageContent) -> MessageContentView { + match content { + MessageContent::Text(text) => MessageContentView::Text(text.clone()), + MessageContent::Parts(parts) => MessageContentView::Parts( + parts + .iter() + .map(content_part_view_from_content_part) + .collect(), + ), + } +} + +fn gen_ai_message_parts(message: &MessageView) -> Vec { + if message.role == "tool" { + let mut part = Map::new(); + part.insert("type".into(), Value::String("tool_call_response".into())); + + if let Some(tool_call_id) = &message.tool_call_id { + part.insert("id".into(), Value::String(tool_call_id.clone())); + } + + let response = match &message.content { + Some(MessageContentView::Text(text)) => parse_json_or_string(text), + Some(MessageContentView::Parts(parts)) => { + Value::Array(parts.iter().map(gen_ai_content_part_value).collect()) + } + None => Value::Null, + }; + part.insert("response".into(), response); + + return vec![Value::Object(part)]; + } + + let mut parts = Vec::new(); + + match &message.content { + Some(MessageContentView::Text(text)) if !text.is_empty() => { + parts.push(gen_ai_text_part(text)); + } + Some(MessageContentView::Parts(content_parts)) => { + parts.extend(content_parts.iter().map(gen_ai_content_part_value)); + } + _ => {} + } + + for tool_call in &message.tool_calls { + let mut part = Map::new(); + part.insert("type".into(), Value::String("tool_call".into())); + + if let Some(id) = &tool_call.id { + part.insert("id".into(), Value::String(id.clone())); + } + + part.insert("name".into(), Value::String(tool_call.name.clone())); + + if !tool_call.arguments.is_empty() { + part.insert( + "arguments".into(), + parse_json_or_string(&tool_call.arguments), + ); + } + + parts.push(Value::Object(part)); + } + + parts +} + +fn gen_ai_text_part(text: &str) -> Value { + let mut part = Map::new(); + part.insert("type".into(), Value::String("text".into())); + part.insert("content".into(), Value::String(text.to_string())); + Value::Object(part) +} + +fn gen_ai_content_part_value(part: &ContentPartView) -> Value { + match part { + ContentPartView::Text(text) => gen_ai_text_part(text), + ContentPartView::ImageUrl { url } => { + let mut part = Map::new(); + part.insert("type".into(), Value::String("uri".into())); + part.insert("modality".into(), Value::String("image".into())); + part.insert("uri".into(), Value::String(url.clone())); + Value::Object(part) + } + } +} diff --git a/src/proxy/handlers/chat_completions/span_attributes/mod.rs b/src/proxy/handlers/chat_completions/span_attributes/mod.rs new file mode 100644 index 0000000..6b22c33 --- /dev/null +++ b/src/proxy/handlers/chat_completions/span_attributes/mod.rs @@ -0,0 +1,12 @@ +mod message_attributes; +mod stream_output; +mod telemetry; + +pub(super) use stream_output::StreamOutputCollector; +pub(super) use telemetry::{ + apply_span_properties, chunk_span_properties, request_span_properties, + response_span_properties, usage_span_properties, +}; + +#[cfg(test)] +mod tests; diff --git a/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs b/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs new file mode 100644 index 0000000..8c6b6e5 --- /dev/null +++ b/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs @@ -0,0 +1,117 @@ +use std::collections::BTreeMap; + +use super::message_attributes::{ + MessageContentView, MessageView, OutputMessageView, ToolCallView, + append_openinference_output_message_properties, gen_ai_output_messages_json, +}; +use crate::gateway::types::openai::ChatCompletionChunk; + +#[derive(Default)] +struct StreamOutputToolCall { + id: Option, + name: Option, + arguments: String, +} + +#[derive(Default)] +struct StreamOutputChoice { + role: Option, + content: String, + tool_calls: BTreeMap, + finish_reason: Option, +} + +#[derive(Default)] +pub(in crate::proxy::handlers::chat_completions) struct StreamOutputCollector { + choices: BTreeMap, +} + +impl StreamOutputCollector { + pub(in crate::proxy::handlers::chat_completions) fn record_chunk( + &mut self, + chunk: &ChatCompletionChunk, + ) { + for choice in &chunk.choices { + let output_choice = self.choices.entry(choice.index).or_default(); + + if let Some(role) = &choice.delta.role { + output_choice.role = Some(role.clone()); + } + + if let Some(content) = &choice.delta.content { + output_choice.content.push_str(content); + } + + if let Some(tool_calls) = &choice.delta.tool_calls { + for tool_call in tool_calls { + let output_tool_call = + output_choice.tool_calls.entry(tool_call.index).or_default(); + + if let Some(id) = &tool_call.id { + output_tool_call.id = Some(id.clone()); + } + + if let Some(function) = &tool_call.function { + if let Some(name) = &function.name { + output_tool_call.name = Some(name.clone()); + } + + if let Some(arguments) = &function.arguments { + output_tool_call.arguments.push_str(arguments); + } + } + } + } + + if let Some(finish_reason) = &choice.finish_reason { + output_choice.finish_reason = Some(finish_reason.clone()); + } + } + } + + pub(in crate::proxy::handlers::chat_completions) fn output_message_span_properties( + &self, + ) -> Vec<(String, String)> { + let output_messages = self.output_message_views(); + let mut properties = Vec::new(); + + append_openinference_output_message_properties( + &mut properties, + "llm.output_messages", + &output_messages, + ); + + if let Some(value) = gen_ai_output_messages_json(&output_messages) { + properties.push(("gen_ai.output.messages".into(), value)); + } + + properties + } + + fn output_message_views(&self) -> Vec { + self.choices + .values() + .map(|choice| OutputMessageView { + message: MessageView { + role: choice.role.clone().unwrap_or_else(|| "assistant".into()), + content: (!choice.content.is_empty()) + .then(|| MessageContentView::Text(choice.content.clone())), + name: None, + tool_calls: choice + .tool_calls + .values() + .filter_map(|tool_call| { + tool_call.name.clone().map(|name| ToolCallView { + id: tool_call.id.clone(), + name, + arguments: tool_call.arguments.clone(), + }) + }) + .collect(), + tool_call_id: None, + }, + finish_reason: choice.finish_reason.clone(), + }) + .collect() + } +} diff --git a/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs b/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs new file mode 100644 index 0000000..c6493e6 --- /dev/null +++ b/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs @@ -0,0 +1,504 @@ +use fastrace::prelude::Span; +use reqwest::Url; +use serde_json::{Map, Value}; + +use crate::{ + gateway::types::{ + common::Usage, + openai::{ + ChatCompletionChoice, ChatCompletionChunk, ChatCompletionChunkChoice, + ChatCompletionRequest, ChatCompletionResponse, ChatCompletionUsage, + ResponseFormat, StopCondition, + }, + }, + gateway::traits::ProviderCapabilities, +}; + +use super::{ + message_attributes::{ + append_openinference_message_properties, + append_openinference_output_message_properties, + append_openinference_tool_properties, gen_ai_input_messages_json, + gen_ai_output_messages_json, gen_ai_tool_definitions_json, + message_view_from_chat_message, response_output_message_views, + }, +}; + +pub(in crate::proxy::handlers::chat_completions) fn request_span_properties( + request: &ChatCompletionRequest, + provider: &dyn ProviderCapabilities, + base_url: Option<&Url>, +) -> Vec<(String, String)> { + let provider_semantics = provider.semantic_conventions(); + let input_messages: Vec<_> = request + .messages + .iter() + .map(message_view_from_chat_message) + .collect(); + let mut properties = vec![ + ("gen_ai.operation.name".into(), "chat".into()), + ("openinference.span.kind".into(), "LLM".into()), + ( + "gen_ai.provider.name".into(), + provider_semantics.gen_ai_provider_name.to_string(), + ), + ("llm.system".into(), provider_semantics.llm_system.to_string()), + ("gen_ai.request.model".into(), request.model.clone()), + ]; + + if let Some(llm_provider) = provider_semantics.llm_provider { + properties.push(("llm.provider".into(), llm_provider.to_string())); + } + + if let Some(choice_count) = request.n.filter(|count| *count != 1) { + properties.push(( + "gen_ai.request.choice.count".into(), + choice_count.to_string(), + )); + } + + if let Some(seed) = request.seed { + properties.push(("gen_ai.request.seed".into(), seed.to_string())); + } + + if let Some(max_tokens) = request.max_completion_tokens.or(request.max_tokens) { + properties.push(("gen_ai.request.max_tokens".into(), max_tokens.to_string())); + } + + if let Some(value) = request.frequency_penalty { + properties.push(("gen_ai.request.frequency_penalty".into(), value.to_string())); + } + + if let Some(value) = request.presence_penalty { + properties.push(("gen_ai.request.presence_penalty".into(), value.to_string())); + } + + if let Some(value) = request.temperature { + properties.push(("gen_ai.request.temperature".into(), value.to_string())); + } + + if let Some(value) = request.top_p { + properties.push(("gen_ai.request.top_p".into(), value.to_string())); + } + + if let Some(value) = numeric_extra_to_string(request.extra.get("top_k")) { + properties.push(("gen_ai.request.top_k".into(), value)); + } + + if let Some(value) = stop_sequences_json(request.stop.as_ref()) { + properties.push(("gen_ai.request.stop_sequences".into(), value)); + } + + if let Some(value) = response_format_output_type(request.response_format.as_ref()) { + properties.push(("gen_ai.output.type".into(), value.to_string())); + } + + if let Some(value) = request_invocation_parameters(request) { + properties.push(("llm.invocation_parameters".into(), value)); + } + + if let Some(user_id) = request.user.as_ref().filter(|user_id| !user_id.is_empty()) { + properties.push(("user.id".into(), user_id.clone())); + } + + append_openinference_message_properties( + &mut properties, + "llm.input_messages", + &input_messages, + ); + + if let Some(value) = gen_ai_input_messages_json(&input_messages) { + properties.push(("gen_ai.input.messages".into(), value)); + } + + if let Some(tools) = request.tools.as_deref() { + append_openinference_tool_properties(&mut properties, tools); + + if let Some(value) = gen_ai_tool_definitions_json(tools) { + properties.push(("gen_ai.tool.definitions".into(), value)); + } + } + + if let Some(base_url) = base_url { + if let Some(address) = base_url.host_str() { + properties.push(("server.address".into(), address.to_string())); + } + if let Some(port) = base_url.port_or_known_default() { + properties.push(("server.port".into(), port.to_string())); + } + } + + properties +} + +pub(in crate::proxy::handlers::chat_completions) fn response_span_properties( + response: &ChatCompletionResponse, + usage: &Usage, +) -> Vec<(String, String)> { + let output_messages = response_output_message_views(response); + let mut properties = vec![ + ("gen_ai.response.id".into(), response.id.clone()), + ("gen_ai.response.model".into(), response.model.clone()), + ("llm.model_name".into(), response.model.clone()), + ]; + + append_finish_reason_properties( + &mut properties, + collect_finish_reasons(response.choices.iter().map(choice_finish_reason)), + ); + append_response_usage_properties(&mut properties, usage, response.usage.as_ref()); + append_openinference_output_message_properties( + &mut properties, + "llm.output_messages", + &output_messages, + ); + + if let Some(value) = gen_ai_output_messages_json(&output_messages) { + properties.push(("gen_ai.output.messages".into(), value)); + } + + properties +} + +pub(in crate::proxy::handlers::chat_completions) fn chunk_span_properties( + chunk: &ChatCompletionChunk, +) -> Vec<(String, String)> { + let mut properties = Vec::new(); + + if !chunk.id.is_empty() { + properties.push(("gen_ai.response.id".into(), chunk.id.clone())); + } + + if !chunk.model.is_empty() { + properties.push(("gen_ai.response.model".into(), chunk.model.clone())); + properties.push(("llm.model_name".into(), chunk.model.clone())); + } + + append_finish_reason_properties( + &mut properties, + collect_finish_reasons(chunk.choices.iter().map(chunk_choice_finish_reason)), + ); + append_chunk_usage_properties(&mut properties, chunk.usage.as_ref()); + + properties +} + +pub(in crate::proxy::handlers::chat_completions) fn usage_span_properties( + usage: &Usage, +) -> Vec<(String, String)> { + let mut properties = Vec::new(); + append_usage_properties(&mut properties, usage); + properties +} + +pub(in crate::proxy::handlers::chat_completions) fn apply_span_properties( + span: &Span, + properties: Vec<(String, String)>, +) { + if properties.is_empty() { + return; + } + + span.add_properties(move || properties); +} + +fn response_format_output_type(response_format: Option<&ResponseFormat>) -> Option<&'static str> { + match response_format?.r#type.as_str() { + "json_object" | "json_schema" => Some("json"), + "text" => Some("text"), + "image" => Some("image"), + "speech" => Some("speech"), + _ => None, + } +} + +fn numeric_extra_to_string(value: Option<&Value>) -> Option { + let value = value?; + + if let Some(integer) = value.as_i64() { + return Some(integer.to_string()); + } + + if let Some(integer) = value.as_u64() { + return Some(integer.to_string()); + } + + value.as_f64().map(|float| float.to_string()) +} + +fn stop_sequences_json(stop: Option<&StopCondition>) -> Option { + let stop = stop?; + let value = match stop { + StopCondition::Single(value) => Value::Array(vec![Value::String(value.clone())]), + StopCondition::Multiple(values) => { + Value::Array(values.iter().cloned().map(Value::String).collect()) + } + }; + + serde_json::to_string(&value).ok() +} + +fn request_invocation_parameters(request: &ChatCompletionRequest) -> Option { + let mut params = Map::new(); + + if let Some(value) = request.frequency_penalty { + params.insert("frequency_penalty".into(), Value::from(value)); + } + if let Some(value) = request.logprobs { + params.insert("logprobs".into(), Value::from(value)); + } + if let Some(value) = request.top_logprobs { + params.insert("top_logprobs".into(), Value::from(value)); + } + if let Some(value) = request.max_tokens { + params.insert("max_tokens".into(), Value::from(value)); + } + if let Some(value) = request.max_completion_tokens { + params.insert("max_completion_tokens".into(), Value::from(value)); + } + if let Some(value) = request.n { + params.insert("n".into(), Value::from(value)); + } + if let Some(value) = request.presence_penalty { + params.insert("presence_penalty".into(), Value::from(value)); + } + if let Some(value) = request.seed { + params.insert("seed".into(), Value::from(value)); + } + if let Some(value) = stop_sequences_json(request.stop.as_ref()) { + if let Ok(parsed) = serde_json::from_str(&value) { + params.insert("stop".into(), parsed); + } + } + if let Some(value) = request.stream { + params.insert("stream".into(), Value::from(value)); + } + if let Some(value) = request.temperature { + params.insert("temperature".into(), Value::from(value)); + } + if let Some(value) = request.top_p { + params.insert("top_p".into(), Value::from(value)); + } + if let Some(value) = request.parallel_tool_calls { + params.insert("parallel_tool_calls".into(), Value::from(value)); + } + if let Some(value) = request.response_format.as_ref() { + if let Ok(value) = serde_json::to_value(value) { + params.insert("response_format".into(), value); + } + } + if let Some(value) = request.stream_options.as_ref() { + if let Ok(value) = serde_json::to_value(value) { + params.insert("stream_options".into(), value); + } + } + if let Some(value) = request.tool_choice.as_ref() { + if let Ok(value) = serde_json::to_value(value) { + params.insert("tool_choice".into(), value); + } + } + + if params.is_empty() { + return None; + } + + serde_json::to_string(&Value::Object(params)).ok() +} + +fn collect_finish_reasons(finish_reasons: I) -> Vec +where + I: IntoIterator>, +{ + let mut values = Vec::new(); + + for finish_reason in finish_reasons.into_iter().flatten() { + if !values.iter().any(|value| value == &finish_reason) { + values.push(finish_reason); + } + } + + values +} + +fn append_finish_reason_properties( + properties: &mut Vec<(String, String)>, + finish_reasons: Vec, +) { + if finish_reasons.is_empty() { + return; + } + + properties.push(( + "gen_ai.response.finish_reasons".into(), + serde_json::to_string(&finish_reasons).unwrap_or_default(), + )); + + if let Some(finish_reason) = finish_reasons.first() { + properties.push(("llm.finish_reason".into(), finish_reason.clone())); + } +} + +fn append_usage_properties(properties: &mut Vec<(String, String)>, usage: &Usage) { + if let Some(input_tokens) = usage.input_tokens { + let input_tokens = input_tokens.to_string(); + properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone())); + properties.push(("llm.token_count.prompt".into(), input_tokens)); + } + + if let Some(output_tokens) = usage.output_tokens { + let output_tokens = output_tokens.to_string(); + properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone())); + properties.push(("llm.token_count.completion".into(), output_tokens)); + } + + if let Some(total_tokens) = usage.resolved_total_tokens() { + properties.push(("llm.token_count.total".into(), total_tokens.to_string())); + } + + if let Some(cache_creation_input_tokens) = usage.cache_creation_input_tokens { + let cache_creation_input_tokens = cache_creation_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_creation.input_tokens".into(), + cache_creation_input_tokens.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_write".into(), + cache_creation_input_tokens, + )); + } + + if let Some(cache_read_input_tokens) = usage.cache_read_input_tokens { + let cache_read_input_tokens = cache_read_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_read.input_tokens".into(), + cache_read_input_tokens.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_read".into(), + cache_read_input_tokens, + )); + } + + if let Some(input_audio_tokens) = usage.input_audio_tokens { + properties.push(( + "llm.token_count.prompt_details.audio".into(), + input_audio_tokens.to_string(), + )); + } + + if let Some(output_audio_tokens) = usage.output_audio_tokens { + properties.push(( + "llm.token_count.completion_details.audio".into(), + output_audio_tokens.to_string(), + )); + } +} + +fn append_response_usage_properties( + properties: &mut Vec<(String, String)>, + usage: &Usage, + raw_usage: Option<&ChatCompletionUsage>, +) { + append_usage_properties(properties, usage); + + let Some(raw_usage) = raw_usage else { + return; + }; + + if usage.input_tokens.is_none() { + let input_tokens = raw_usage.prompt_tokens.to_string(); + properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone())); + properties.push(("llm.token_count.prompt".into(), input_tokens)); + } + + if usage.output_tokens.is_none() { + let output_tokens = raw_usage.completion_tokens.to_string(); + properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone())); + properties.push(("llm.token_count.completion".into(), output_tokens)); + } + + if usage.resolved_total_tokens().is_none() { + properties.push(( + "llm.token_count.total".into(), + raw_usage.total_tokens.to_string(), + )); + } + + if usage.cache_read_input_tokens.is_none() { + if let Some(cached_tokens) = raw_usage + .prompt_tokens_details + .as_ref() + .and_then(|details| details.cached_tokens) + { + let cached_tokens = cached_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_read.input_tokens".into(), + cached_tokens.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_read".into(), + cached_tokens, + )); + } + } + + if usage.input_audio_tokens.is_none() { + if let Some(audio_tokens) = raw_usage + .prompt_tokens_details + .as_ref() + .and_then(|details| details.audio_tokens) + { + properties.push(( + "llm.token_count.prompt_details.audio".into(), + audio_tokens.to_string(), + )); + } + } + + if let Some(reasoning_tokens) = raw_usage + .completion_tokens_details + .as_ref() + .and_then(|details| details.reasoning_tokens) + { + properties.push(( + "llm.token_count.completion_details.reasoning".into(), + reasoning_tokens.to_string(), + )); + } + + if usage.output_audio_tokens.is_none() { + if let Some(audio_tokens) = raw_usage + .completion_tokens_details + .as_ref() + .and_then(|details| details.audio_tokens) + { + properties.push(( + "llm.token_count.completion_details.audio".into(), + audio_tokens.to_string(), + )); + } + } +} + +fn append_chunk_usage_properties( + properties: &mut Vec<(String, String)>, + raw_usage: Option<&ChatCompletionUsage>, +) { + if let Some(reasoning_tokens) = raw_usage + .and_then(|usage| usage.completion_tokens_details.as_ref()) + .and_then(|details| details.reasoning_tokens) + { + properties.push(( + "llm.token_count.completion_details.reasoning".into(), + reasoning_tokens.to_string(), + )); + } +} + +fn choice_finish_reason(choice: &ChatCompletionChoice) -> Option { + choice.finish_reason.clone() +} + +fn chunk_choice_finish_reason(choice: &ChatCompletionChunkChoice) -> Option { + choice.finish_reason.clone() +} diff --git a/src/proxy/handlers/chat_completions/span_attributes/tests.rs b/src/proxy/handlers/chat_completions/span_attributes/tests.rs new file mode 100644 index 0000000..f3270c5 --- /dev/null +++ b/src/proxy/handlers/chat_completions/span_attributes/tests.rs @@ -0,0 +1,251 @@ +use serde_json::{Value, json}; + +use super::{StreamOutputCollector, request_span_properties, response_span_properties}; +use crate::gateway::{ + providers::openai::OpenAIDef, + types::{ + common::Usage, + openai::{ + ChatCompletionChoice, ChatCompletionChunk, ChatCompletionChunkChoice, + ChatCompletionChunkDelta, ChatCompletionRequest, ChatCompletionResponse, + ChatCompletionUsage, ChatMessage, ChunkFunctionCall, ChunkToolCall, + CompletionTokensDetails, ContentPart, FunctionCall, FunctionDefinition, ImageUrl, + MessageContent, Tool, + }, + }, +}; + +fn property_value<'a>(properties: &'a [(String, String)], key: &str) -> Option<&'a str> { + properties + .iter() + .find(|(property_key, _)| property_key == key) + .map(|(_, value)| value.as_str()) +} + +#[test] +fn request_span_properties_include_message_tool_and_user_attributes() { + let mut request = ChatCompletionRequest { + model: "gpt-4o".into(), + messages: vec![ + ChatMessage { + role: "system".into(), + content: Some(MessageContent::Text("Be concise".into())), + name: None, + tool_calls: None, + tool_call_id: None, + }, + ChatMessage { + role: "user".into(), + content: Some(MessageContent::Parts(vec![ + ContentPart::Text { + text: "Describe this image".into(), + }, + ContentPart::ImageUrl { + image_url: ImageUrl { + url: "https://example.com/cat.png".into(), + detail: None, + }, + }, + ])), + name: None, + tool_calls: None, + tool_call_id: None, + }, + ], + tools: Some(vec![Tool { + r#type: "function".into(), + function: FunctionDefinition { + name: "get_weather".into(), + description: Some("Get current weather".into()), + parameters: Some(json!({ + "type": "object", + "properties": {"city": {"type": "string"}} + })), + strict: Some(true), + }, + }]), + user: Some("user-123".into()), + ..Default::default() + }; + request.extra.insert("top_k".into(), json!(5)); + let provider = OpenAIDef; + + let properties = request_span_properties(&request, &provider, None); + + assert_eq!(property_value(&properties, "user.id"), Some("user-123")); + assert_eq!( + property_value(&properties, "gen_ai.request.top_k"), + Some("5") + ); + assert_eq!( + property_value(&properties, "llm.input_messages.0.message.role"), + Some("system") + ); + assert_eq!( + property_value( + &properties, + "llm.input_messages.1.message.contents.1.message_content.image.image.url", + ), + Some("https://example.com/cat.png"), + ); + assert_eq!( + property_value(&properties, "llm.tools.0.tool.name"), + Some("get_weather"), + ); + + let input_messages: Value = + serde_json::from_str(property_value(&properties, "gen_ai.input.messages").unwrap()) + .unwrap(); + assert_eq!(input_messages[0]["role"], "system"); + assert_eq!(input_messages[1]["parts"][1]["type"], "uri"); + + let tool_definitions: Value = + serde_json::from_str(property_value(&properties, "gen_ai.tool.definitions").unwrap()) + .unwrap(); + assert_eq!(tool_definitions[0]["name"], "get_weather"); +} + +#[test] +fn response_span_properties_include_output_messages_and_reasoning_tokens() { + let response = ChatCompletionResponse { + id: "chatcmpl-1".into(), + object: "chat.completion".into(), + created: 0, + model: "gpt-4o".into(), + choices: vec![ChatCompletionChoice { + index: 0, + message: ChatMessage { + role: "assistant".into(), + content: Some(MessageContent::Text("I'll check.".into())), + name: None, + tool_calls: Some(vec![crate::gateway::types::openai::ToolCall { + id: "call_1".into(), + r#type: "function".into(), + function: FunctionCall { + name: "get_weather".into(), + arguments: r#"{"city":"SF"}"#.into(), + }, + }]), + tool_call_id: None, + }, + finish_reason: Some("tool_calls".into()), + }], + usage: Some(ChatCompletionUsage { + prompt_tokens: 9, + completion_tokens: 7, + total_tokens: 16, + prompt_tokens_details: None, + completion_tokens_details: Some(CompletionTokensDetails { + reasoning_tokens: Some(3), + audio_tokens: None, + }), + }), + system_fingerprint: None, + }; + let usage = Usage { + input_tokens: Some(9), + output_tokens: Some(7), + total_tokens: Some(16), + ..Default::default() + }; + + let properties = response_span_properties(&response, &usage); + + assert_eq!( + property_value(&properties, "llm.output_messages.0.message.role"), + Some("assistant") + ); + assert_eq!( + property_value( + &properties, + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name", + ), + Some("get_weather"), + ); + assert_eq!( + property_value(&properties, "llm.token_count.completion_details.reasoning"), + Some("3") + ); + + let output_messages: Value = + serde_json::from_str(property_value(&properties, "gen_ai.output.messages").unwrap()) + .unwrap(); + assert_eq!(output_messages[0]["finish_reason"], "tool_calls"); + assert_eq!(output_messages[0]["parts"][1]["type"], "tool_call"); +} + +#[test] +fn stream_output_collector_accumulates_chunks_into_output_messages() { + let mut collector = StreamOutputCollector::default(); + + collector.record_chunk(&ChatCompletionChunk { + id: "chatcmpl-1".into(), + object: "chat.completion.chunk".into(), + created: 0, + model: "gpt-4o".into(), + choices: vec![ChatCompletionChunkChoice { + index: 0, + delta: ChatCompletionChunkDelta { + role: Some("assistant".into()), + content: Some("Hello ".into()), + tool_calls: Some(vec![ChunkToolCall { + index: 0, + id: Some("call_1".into()), + r#type: Some("function".into()), + function: Some(ChunkFunctionCall { + name: Some("get_weather".into()), + arguments: Some(r#"{"city":""#.into()), + }), + }]), + }, + finish_reason: None, + }], + usage: None, + system_fingerprint: None, + }); + + collector.record_chunk(&ChatCompletionChunk { + id: "chatcmpl-1".into(), + object: "chat.completion.chunk".into(), + created: 0, + model: "gpt-4o".into(), + choices: vec![ChatCompletionChunkChoice { + index: 0, + delta: ChatCompletionChunkDelta { + role: None, + content: Some("world".into()), + tool_calls: Some(vec![ChunkToolCall { + index: 0, + id: None, + r#type: None, + function: Some(ChunkFunctionCall { + name: None, + arguments: Some(r#"SF"}"#.into()), + }), + }]), + }, + finish_reason: Some("tool_calls".into()), + }], + usage: None, + system_fingerprint: None, + }); + + let properties = collector.output_message_span_properties(); + + assert_eq!( + property_value(&properties, "llm.output_messages.0.message.content"), + Some("Hello world") + ); + assert_eq!( + property_value( + &properties, + "llm.output_messages.0.message.tool_calls.0.tool_call.function.arguments", + ), + Some(r#"{"city":"SF"}"#), + ); + + let output_messages: Value = + serde_json::from_str(property_value(&properties, "gen_ai.output.messages").unwrap()) + .unwrap(); + assert_eq!(output_messages[0]["parts"][1]["arguments"]["city"], "SF"); +} diff --git a/src/proxy/handlers/embeddings/mod.rs b/src/proxy/handlers/embeddings/mod.rs index a2a43a1..bdd8e1a 100644 --- a/src/proxy/handlers/embeddings/mod.rs +++ b/src/proxy/handlers/embeddings/mod.rs @@ -12,9 +12,12 @@ pub use types::EmbeddingError; use crate::{ config::entities::{Model, ResourceEntry}, - gateway::types::{ - common::Usage, - embed::{EmbeddingRequest, EmbeddingResponse}, + gateway::{ + error::GatewayError, + types::{ + common::Usage, + embed::{EmbeddingRequest, EmbeddingResponse}, + }, }, proxy::{ AppState, @@ -54,7 +57,10 @@ pub async fn embeddings( let gateway = state.gateway(); let resources = state.resources(); - let provider_instance = create_provider_instance(gateway.as_ref(), resources.as_ref(), &model)?; + let provider = model.provider(resources.as_ref()).ok_or_else(|| { + GatewayError::Internal(format!("provider {} not found", model.provider_id)) + })?; + let provider_instance = create_provider_instance(gateway.as_ref(), &provider)?; let timeout = model.timeout.map(Duration::from_millis); // Replace request model name with real model name diff --git a/src/proxy/handlers/messages/mod.rs b/src/proxy/handlers/messages/mod.rs index 902eccf..25221bf 100644 --- a/src/proxy/handlers/messages/mod.rs +++ b/src/proxy/handlers/messages/mod.rs @@ -18,6 +18,7 @@ pub use types::MessagesError; use crate::{ config::entities::{Model, ResourceEntry}, gateway::{ + error::GatewayError, formats::AnthropicMessagesFormat, traits::ChatFormat, types::{ @@ -69,7 +70,10 @@ pub async fn messages( let gateway = state.gateway(); let resources = state.resources(); - let provider_instance = create_provider_instance(gateway.as_ref(), resources.as_ref(), &model)?; + let provider = model.provider(resources.as_ref()).ok_or_else(|| { + GatewayError::Internal(format!("provider {} not found", model.provider_id)) + })?; + let provider_instance = create_provider_instance(gateway.as_ref(), &provider)?; match maybe_timeout(timeout, gateway.messages(&request_data, &provider_instance)).await { Ok(response) => match response? { @@ -146,10 +150,7 @@ async fn handle_stream_request( Some(Ok(event)) => { if idx == 0 { hooks::observability::record_first_token_latency(&mut request_ctx).await; - span.add_event(TraceEvent::new(format!( - "{} first token arrived", - AnthropicMessagesFormat::name() - ))); + span.add_event(TraceEvent::new("first token arrived")); } let sse_event = Ok::(serialize_stream_event(&event)); diff --git a/src/proxy/hooks/authorization/mod.rs b/src/proxy/hooks/authorization/mod.rs index a024b47..a053efb 100644 --- a/src/proxy/hooks/authorization/mod.rs +++ b/src/proxy/hooks/authorization/mod.rs @@ -57,7 +57,7 @@ impl IntoResponse for AuthorizationError { } } -#[fastrace::trace] +#[fastrace::trace(name = "aisix.proxy.hook.authz")] pub async fn check(ctx: &mut RequestContext, model_name: String) -> Result<(), AuthorizationError> { let model = match ctx.app_state().resources().models.get_by_name(&model_name) { Some(model) => model, diff --git a/src/proxy/middlewares/auth.rs b/src/proxy/middlewares/auth.rs index 1f9af99..83c0c25 100644 --- a/src/proxy/middlewares/auth.rs +++ b/src/proxy/middlewares/auth.rs @@ -4,6 +4,7 @@ use axum::{ middleware::Next, response::{IntoResponse, Response}, }; +use fastrace::Span; use serde_json::json; use crate::{ @@ -53,6 +54,8 @@ pub async fn auth( mut req: Request, next: Next, ) -> Result { + let span = Span::enter_with_local_parent("aisix.proxy.middleware.authn"); + let api_key = if let Some(value) = req.headers().get(http::header::AUTHORIZATION) { let header = value.to_str().unwrap_or(""); let (prefix, rest) = header.split_at(7.min(header.len())); @@ -73,8 +76,13 @@ pub async fn auth( return Err(AuthError::InvalidApiKey); } }; + + span.add_property(|| ("aisix.apikey_id", api_key.id.clone())); + req.extensions_mut() .insert::>(api_key); + drop(span); + Ok(next.run(req).await) } diff --git a/src/proxy/middlewares/trace.rs b/src/proxy/middlewares/trace.rs index 315af05..d7fd2b2 100644 --- a/src/proxy/middlewares/trace.rs +++ b/src/proxy/middlewares/trace.rs @@ -25,6 +25,8 @@ use opentelemetry_semantic_conventions::trace::{ HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, URL_PATH, }; +use crate::utils::future::WithSpan; + pub const TRACEPARENT_HEADER: &str = "traceparent"; pub struct TimedBody { @@ -34,6 +36,7 @@ pub struct TimedBody { metric_endpoint: String, metric_status_code: u16, latency_recorded: bool, + span: Option, } impl http_body::Body for TimedBody { @@ -47,10 +50,19 @@ impl http_body::Body for TimedBody { ) -> Poll, Self::Error>>> { let poll = Pin::new(&mut self.inner).poll_frame(cx); - if let Poll::Ready(None) = &poll { + match &poll { + Poll::Ready(Some(Ok(frame))) => { + self.on_body_chunk(frame); + } // At this moment, all frames have been consumed by hyper, but it remains uncertain whether // the data has been written to the kernel TCP buffer or sent to the client. - self.record_latency(); + Poll::Ready(None) => { + self.on_eos(); + } + Poll::Ready(Some(Err(_))) => { + self.on_eos(); + } + Poll::Pending => {} } poll @@ -75,6 +87,13 @@ impl Drop for TimedBody { } impl TimedBody { + fn on_body_chunk(&mut self, _frame: &Frame) {} + + fn on_eos(&mut self) { + self.record_latency(); + self.span.take(); + } + fn record_latency(&mut self) { if self.latency_recorded { return; @@ -107,13 +126,13 @@ pub async fn trace(mut req: Request, next: Next) -> Response { .to_string(); let path = req.uri().path().to_string(); - let (span, span_ctx) = generate_span(&req); + let (root_span, span_ctx) = generate_span(&req); // Inject span context to facilitate generate new root spans throughout the project. // A typical use case is span recording for SSE streams. req.extensions_mut().insert(span_ctx); - span.add_properties(|| { + root_span.add_properties(|| { [ (HTTP_REQUEST_METHOD, method.to_string()), (URL_PATH, path.clone()), @@ -121,20 +140,22 @@ pub async fn trace(mut req: Request, next: Next) -> Response { }); if let Some(ref route) = matched_path { - span.add_property(|| (HTTP_ROUTE, route.as_str().to_string())); + root_span.add_property(|| (HTTP_ROUTE, route.as_str().to_string())); } - let response = async { - let response = next.run(req).await; - LocalSpan::add_property(|| { - ( - HTTP_RESPONSE_STATUS_CODE, - response.status().as_u16().to_string(), - ) - }); - response - } - .in_span(span) + let (response, root_span) = (WithSpan { + inner: async { + let response = next.run(req).await; + LocalSpan::add_property(|| { + ( + HTTP_RESPONSE_STATUS_CODE, + response.status().as_u16().to_string(), + ) + }); + response + }, + span: Some(root_span), + }) .await; let (parts, body) = response.into_parts(); @@ -156,6 +177,7 @@ pub async fn trace(mut req: Request, next: Next) -> Response { metric_endpoint: metric_endpoint.clone(), metric_status_code, latency_recorded: false, + span: Some(root_span), }, ); diff --git a/src/proxy/provider.rs b/src/proxy/provider.rs index bdffbf0..d6b2723 100644 --- a/src/proxy/provider.rs +++ b/src/proxy/provider.rs @@ -2,9 +2,7 @@ use http::HeaderMap; use reqwest::Url; use crate::{ - config::entities::{ - Model, Provider, ResourceEntry, ResourceRegistry, providers::ProviderConfig, - }, + config::entities::{Provider, ResourceEntry, providers::ProviderConfig}, gateway::{ Gateway, error::{GatewayError, Result}, @@ -13,13 +11,10 @@ use crate::{ }; /// Creates a gateway provider instance for the given model using the gateway registry. -#[fastrace::trace] pub fn create_provider_instance( gateway: &Gateway, - resources: &ResourceRegistry, - model: &ResourceEntry, + provider: &ResourceEntry, ) -> Result { - let provider = resolve_provider(resources, &model.provider_id)?; let provider_name = provider.provider_type(); let def = gateway.registry().get(provider_name).ok_or_else(|| { GatewayError::Internal(format!( @@ -38,18 +33,6 @@ pub fn create_provider_instance( }) } -fn resolve_provider( - resources: &ResourceRegistry, - provider_id: &str, -) -> Result> { - resources - .providers - .list() - .get(provider_id) - .cloned() - .ok_or_else(|| GatewayError::Internal(format!("provider {} not found", provider_id))) -} - fn provider_auth_and_base_url(config: &ProviderConfig) -> Result<(ProviderAuth, Option)> { let (auth, base_url_override) = match config { ProviderConfig::Anthropic(config) => ( diff --git a/src/utils/future.rs b/src/utils/future.rs index f59714e..4923104 100644 --- a/src/utils/future.rs +++ b/src/utils/future.rs @@ -1,5 +1,10 @@ -use std::time::Duration; +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use fastrace::Span; use tokio::time::error::Elapsed; pub async fn maybe_timeout(dur: Option, fut: F) -> Result @@ -12,3 +17,24 @@ where None => Ok(fut.await), } } + +#[pin_project::pin_project] +pub struct WithSpan { + #[pin] + pub inner: F, + pub span: Option, +} + +impl Future for WithSpan { + type Output = (F::Output, Span); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let _guard = this.span.as_ref().map(|s| s.set_local_parent()); + + match this.inner.poll(cx) { + Poll::Ready(val) => Poll::Ready((val, this.span.take().unwrap())), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/utils/observability/mod.rs b/src/utils/observability/mod.rs index 2fa9dfe..1147d83 100644 --- a/src/utils/observability/mod.rs +++ b/src/utils/observability/mod.rs @@ -19,7 +19,7 @@ use opentelemetry_sdk::{ metrics::{PeriodicReader, SdkMeterProvider}, }; use tokio::{sync::oneshot, task::JoinHandle}; -pub use trace::*; +pub use trace::{BoxedSpanExporter, DynSpanExporter}; use crate::utils; From 669d1c98060be5903e16a91e611ac3d68367f793 Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Wed, 29 Apr 2026 01:02:28 +0800 Subject: [PATCH 2/4] fix comments --- src/config/entities/models.rs | 1 + src/gateway/traits/provider.rs | 7 +++++-- src/proxy/handlers/chat_completions/mod.rs | 1 - src/proxy/provider.rs | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/config/entities/models.rs b/src/config/entities/models.rs index bd019ff..5e984ac 100644 --- a/src/config/entities/models.rs +++ b/src/config/entities/models.rs @@ -36,6 +36,7 @@ pub struct Model { } impl Model { + /// Get provider of current model pub fn provider(&self, resources: &ResourceRegistry) -> Option> { resources.providers.get_by_id(&self.provider_id) } diff --git a/src/gateway/traits/provider.rs b/src/gateway/traits/provider.rs index b3392e7..96e56a3 100644 --- a/src/gateway/traits/provider.rs +++ b/src/gateway/traits/provider.rs @@ -45,6 +45,7 @@ pub(crate) fn encode_path_segment(segment: &str) -> String { utf8_percent_encode(segment, PATH_SEGMENT_ENCODE_SET).to_string() } +/// OpenTelemetry and OpenInference semantic conventions for the provider. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ProviderSemanticConventions { pub gen_ai_provider_name: &'static str, @@ -57,6 +58,8 @@ pub trait ProviderMeta: Send + Sync + 'static { fn name(&self) -> &'static str; fn default_base_url(&self) -> &'static str; + /// Get the provider's semantic conventions. + /// Used for OpenTelemetry and OpenInference semantic conventions. fn semantic_conventions(&self) -> ProviderSemanticConventions { ProviderSemanticConventions { gen_ai_provider_name: self.name(), @@ -322,8 +325,8 @@ mod tests { use serde_json::json; use super::{ - ChatTransform, CompatQuirks, EmbedTransform, ProviderMeta, - ProviderSemanticConventions, StreamReaderKind, + ChatTransform, CompatQuirks, EmbedTransform, ProviderMeta, ProviderSemanticConventions, + StreamReaderKind, }; use crate::gateway::{ provider_instance::ProviderAuth, diff --git a/src/proxy/handlers/chat_completions/mod.rs b/src/proxy/handlers/chat_completions/mod.rs index 8865a55..fa8f381 100644 --- a/src/proxy/handlers/chat_completions/mod.rs +++ b/src/proxy/handlers/chat_completions/mod.rs @@ -97,7 +97,6 @@ pub async fn chat_completions( handle_regular_request(response, usage, &mut request_ctx).await } Ok(Ok(ChatResponse::Stream { stream, usage_rx })) => { - LocalSpan::add_property(|| ("error.type", "no_stream")); handle_stream_request(stream, usage_rx, &mut request_ctx, span).await } Ok(Err(err)) => { diff --git a/src/proxy/provider.rs b/src/proxy/provider.rs index d6b2723..9601147 100644 --- a/src/proxy/provider.rs +++ b/src/proxy/provider.rs @@ -10,7 +10,7 @@ use crate::{ }, }; -/// Creates a gateway provider instance for the given model using the gateway registry. +/// Creates a gateway provider instance for the given provider using the gateway registry. pub fn create_provider_instance( gateway: &Gateway, provider: &ResourceEntry, From 08c81920d89f4700a74614dd6e92fba9183685b4 Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Wed, 29 Apr 2026 11:14:17 +0800 Subject: [PATCH 3/4] first token event --- src/proxy/handlers/chat_completions/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/proxy/handlers/chat_completions/mod.rs b/src/proxy/handlers/chat_completions/mod.rs index fa8f381..2dd6ccc 100644 --- a/src/proxy/handlers/chat_completions/mod.rs +++ b/src/proxy/handlers/chat_completions/mod.rs @@ -187,7 +187,10 @@ async fn handle_stream_request( if idx == 0 { hooks::observability::record_first_token_latency(&mut request_ctx).await; - span.add_event(TraceEvent::new("first token arrived")); + span.add_event( + TraceEvent::new("first token arrived") + .with_property(|| ("kind", "first_token_arrived")), + ); span.add_properties(|| chunk_span_properties(&chunk)); } else { let properties = chunk_span_properties(&chunk); From 75d1070f70456b5bbd090d80499ca26075da1a4f Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Wed, 29 Apr 2026 11:24:51 +0800 Subject: [PATCH 4/4] fix comment --- src/proxy/handlers/chat_completions/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/proxy/handlers/chat_completions/mod.rs b/src/proxy/handlers/chat_completions/mod.rs index 2dd6ccc..ddb2065 100644 --- a/src/proxy/handlers/chat_completions/mod.rs +++ b/src/proxy/handlers/chat_completions/mod.rs @@ -93,18 +93,18 @@ pub async fn chat_completions( match response { Ok(Ok(ChatResponse::Complete { response, usage })) => { - LocalSpan::add_properties(|| response_span_properties(&response, &usage)); + span.add_properties(|| response_span_properties(&response, &usage)); handle_regular_request(response, usage, &mut request_ctx).await } Ok(Ok(ChatResponse::Stream { stream, usage_rx })) => { handle_stream_request(stream, usage_rx, &mut request_ctx, span).await } Ok(Err(err)) => { - LocalSpan::add_property(|| ("error.type", "gateway_error")); + span.add_property(|| ("error.type", "gateway_error")); Err(err.into()) } Err(err) => { - LocalSpan::add_property(|| ("error.type", "timeout")); + span.add_property(|| ("error.type", "timeout")); Err(ChatCompletionError::Timeout(err)) } }