diff --git a/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs b/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs index 3963c5f..058efd5 100644 --- a/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs +++ b/src/proxy/handlers/chat_completions/span_attributes/message_attributes.rs @@ -1,43 +1,16 @@ -use serde::Serialize; use serde_json::{Map, Value}; -use crate::gateway::types::openai::{ - ChatCompletionResponse, ChatMessage, ContentPart, MessageContent, Tool, +pub(super) use crate::proxy::utils::trace::span_message_attributes::{ + ContentPartView, MessageContentView, MessageView, OutputMessageView, ToolCallView, + append_openinference_message_properties, append_openinference_output_message_properties, + gen_ai_input_messages_json, gen_ai_output_messages_json, +}; +use crate::{ + gateway::types::openai::{ + ChatCompletionResponse, ChatMessage, ContentPart, MessageContent, Tool, + }, + proxy::utils::trace::span_message_attributes::serialize_to_json_string, }; - -#[derive(Clone)] -pub(super) enum MessageContentView { - Text(String), - Parts(Vec), -} - -#[derive(Clone)] -pub(super) enum ContentPartView { - Text(String), - ImageUrl { url: String }, -} - -#[derive(Clone)] -pub(super) struct ToolCallView { - pub(super) id: Option, - pub(super) name: String, - pub(super) arguments: String, -} - -#[derive(Clone)] -pub(super) struct MessageView { - pub(super) role: String, - pub(super) content: Option, - pub(super) name: Option, - pub(super) tool_calls: Vec, - pub(super) tool_call_id: Option, -} - -#[derive(Clone)] -pub(super) struct OutputMessageView { - pub(super) message: MessageView, - pub(super) finish_reason: Option, -} pub(super) fn message_view_from_chat_message(message: &ChatMessage) -> MessageView { MessageView { @@ -75,64 +48,6 @@ pub(super) fn response_output_message_views( .collect() } -pub(super) fn gen_ai_input_messages_json(messages: &[MessageView]) -> Option { - if messages.is_empty() { - return None; - } - - let values: Vec = messages - .iter() - .map(|message| { - let mut value = Map::new(); - value.insert("role".into(), Value::String(message.role.clone())); - value.insert("parts".into(), Value::Array(gen_ai_message_parts(message))); - - if let Some(name) = &message.name { - value.insert("name".into(), Value::String(name.clone())); - } - - Value::Object(value) - }) - .collect(); - - serialize_to_json_string(&values) -} - -pub(super) fn gen_ai_output_messages_json(messages: &[OutputMessageView]) -> Option { - if messages.is_empty() { - return None; - } - - let values: Vec = messages - .iter() - .map(|message| { - let mut value = Map::new(); - value.insert("role".into(), Value::String(message.message.role.clone())); - value.insert( - "parts".into(), - Value::Array(gen_ai_message_parts(&message.message)), - ); - value.insert( - "finish_reason".into(), - Value::String( - message - .finish_reason - .clone() - .unwrap_or_else(|| "unknown".into()), - ), - ); - - if let Some(name) = &message.message.name { - value.insert("name".into(), Value::String(name.clone())); - } - - Value::Object(value) - }) - .collect(); - - serialize_to_json_string(&values) -} - pub(super) fn gen_ai_tool_definitions_json(tools: &[Tool]) -> Option { if tools.is_empty() { return None; @@ -160,80 +75,6 @@ pub(super) fn gen_ai_tool_definitions_json(tools: &[Tool]) -> Option { serialize_to_json_string(&values) } -pub(super) fn append_openinference_message_properties( - properties: &mut Vec<(String, String)>, - prefix: &str, - messages: &[MessageView], -) { - for (message_index, message) in messages.iter().enumerate() { - let prefix = format!("{prefix}.{message_index}.message"); - properties.push((format!("{prefix}.role"), message.role.clone())); - - if let Some(name) = &message.name { - properties.push((format!("{prefix}.name"), name.clone())); - } - - if let Some(tool_call_id) = &message.tool_call_id { - properties.push((format!("{prefix}.tool_call_id"), tool_call_id.clone())); - } - - match &message.content { - Some(MessageContentView::Text(text)) if !text.is_empty() => { - properties.push((format!("{prefix}.content"), text.clone())); - } - Some(MessageContentView::Parts(parts)) => { - for (content_index, part) in parts.iter().enumerate() { - let part_prefix = format!("{prefix}.contents.{content_index}.message_content"); - match part { - ContentPartView::Text(text) => { - properties.push((format!("{part_prefix}.type"), "text".into())); - properties.push((format!("{part_prefix}.text"), text.clone())); - } - ContentPartView::ImageUrl { url } => { - properties.push((format!("{part_prefix}.type"), "image".into())); - properties - .push((format!("{part_prefix}.image.image.url"), url.clone())); - } - } - } - } - _ => {} - } - - for (tool_call_index, tool_call) in message.tool_calls.iter().enumerate() { - let tool_call_prefix = format!("{prefix}.tool_calls.{tool_call_index}.tool_call"); - - if let Some(id) = &tool_call.id { - properties.push((format!("{tool_call_prefix}.id"), id.clone())); - } - - properties.push(( - format!("{tool_call_prefix}.function.name"), - tool_call.name.clone(), - )); - - if !tool_call.arguments.is_empty() { - properties.push(( - format!("{tool_call_prefix}.function.arguments"), - tool_call.arguments.clone(), - )); - } - } - } -} - -pub(super) fn append_openinference_output_message_properties( - properties: &mut Vec<(String, String)>, - prefix: &str, - messages: &[OutputMessageView], -) { - let message_views: Vec<_> = messages - .iter() - .map(|message| message.message.clone()) - .collect(); - append_openinference_message_properties(properties, prefix, &message_views); -} - pub(super) fn append_openinference_tool_properties( properties: &mut Vec<(String, String)>, tools: &[Tool], @@ -258,14 +99,6 @@ pub(super) fn append_openinference_tool_properties( } } -fn serialize_to_json_string(value: &impl Serialize) -> Option { - serde_json::to_string(value).ok() -} - -fn parse_json_or_string(raw: &str) -> Value { - serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_string())) -} - fn content_part_view_from_content_part(part: &ContentPart) -> ContentPartView { match part { ContentPart::Text { text } => ContentPartView::Text(text.clone()), @@ -286,79 +119,3 @@ fn message_content_view_from_message_content(content: &MessageContent) -> Messag ), } } - -fn gen_ai_message_parts(message: &MessageView) -> Vec { - if message.role == "tool" { - let mut part = Map::new(); - part.insert("type".into(), Value::String("tool_call_response".into())); - - if let Some(tool_call_id) = &message.tool_call_id { - part.insert("id".into(), Value::String(tool_call_id.clone())); - } - - let response = match &message.content { - Some(MessageContentView::Text(text)) => parse_json_or_string(text), - Some(MessageContentView::Parts(parts)) => { - Value::Array(parts.iter().map(gen_ai_content_part_value).collect()) - } - None => Value::Null, - }; - part.insert("response".into(), response); - - return vec![Value::Object(part)]; - } - - let mut parts = Vec::new(); - - match &message.content { - Some(MessageContentView::Text(text)) if !text.is_empty() => { - parts.push(gen_ai_text_part(text)); - } - Some(MessageContentView::Parts(content_parts)) => { - parts.extend(content_parts.iter().map(gen_ai_content_part_value)); - } - _ => {} - } - - for tool_call in &message.tool_calls { - let mut part = Map::new(); - part.insert("type".into(), Value::String("tool_call".into())); - - if let Some(id) = &tool_call.id { - part.insert("id".into(), Value::String(id.clone())); - } - - part.insert("name".into(), Value::String(tool_call.name.clone())); - - if !tool_call.arguments.is_empty() { - part.insert( - "arguments".into(), - parse_json_or_string(&tool_call.arguments), - ); - } - - parts.push(Value::Object(part)); - } - - parts -} - -fn gen_ai_text_part(text: &str) -> Value { - let mut part = Map::new(); - part.insert("type".into(), Value::String("text".into())); - part.insert("content".into(), Value::String(text.to_string())); - Value::Object(part) -} - -fn gen_ai_content_part_value(part: &ContentPartView) -> Value { - match part { - ContentPartView::Text(text) => gen_ai_text_part(text), - ContentPartView::ImageUrl { url } => { - let mut part = Map::new(); - part.insert("type".into(), Value::String("uri".into())); - part.insert("modality".into(), Value::String("image".into())); - part.insert("uri".into(), Value::String(url.clone())); - Value::Object(part) - } - } -} diff --git a/src/proxy/handlers/chat_completions/span_attributes/mod.rs b/src/proxy/handlers/chat_completions/span_attributes/mod.rs index 6b22c33..4ee6282 100644 --- a/src/proxy/handlers/chat_completions/span_attributes/mod.rs +++ b/src/proxy/handlers/chat_completions/span_attributes/mod.rs @@ -4,8 +4,11 @@ mod telemetry; pub(super) use stream_output::StreamOutputCollector; pub(super) use telemetry::{ - apply_span_properties, chunk_span_properties, request_span_properties, - response_span_properties, usage_span_properties, + chunk_span_properties, request_span_properties, response_span_properties, +}; + +pub(super) use crate::proxy::utils::trace::span_attributes::{ + apply_span_properties, usage_span_properties, }; #[cfg(test)] diff --git a/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs b/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs index 8c6b6e5..a51360d 100644 --- a/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs +++ b/src/proxy/handlers/chat_completions/span_attributes/stream_output.rs @@ -1,10 +1,10 @@ use std::collections::BTreeMap; -use super::message_attributes::{ - MessageContentView, MessageView, OutputMessageView, ToolCallView, - append_openinference_output_message_properties, gen_ai_output_messages_json, +use super::message_attributes::{MessageContentView, MessageView, OutputMessageView, ToolCallView}; +use crate::{ + gateway::types::openai::ChatCompletionChunk, + proxy::utils::trace::span_message_attributes::output_message_span_properties, }; -use crate::gateway::types::openai::ChatCompletionChunk; #[derive(Default)] struct StreamOutputToolCall { @@ -72,20 +72,7 @@ impl StreamOutputCollector { pub(in crate::proxy::handlers::chat_completions) fn output_message_span_properties( &self, ) -> Vec<(String, String)> { - let output_messages = self.output_message_views(); - let mut properties = Vec::new(); - - append_openinference_output_message_properties( - &mut properties, - "llm.output_messages", - &output_messages, - ); - - if let Some(value) = gen_ai_output_messages_json(&output_messages) { - properties.push(("gen_ai.output.messages".into(), value)); - } - - properties + output_message_span_properties(&self.output_message_views()) } fn output_message_views(&self) -> Vec { diff --git a/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs b/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs index c6493e6..e40171e 100644 --- a/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs +++ b/src/proxy/handlers/chat_completions/span_attributes/telemetry.rs @@ -1,26 +1,25 @@ -use fastrace::prelude::Span; use reqwest::Url; use serde_json::{Map, Value}; +use super::message_attributes::{ + append_openinference_message_properties, append_openinference_output_message_properties, + append_openinference_tool_properties, gen_ai_input_messages_json, gen_ai_output_messages_json, + gen_ai_tool_definitions_json, message_view_from_chat_message, response_output_message_views, +}; use crate::{ - gateway::types::{ - common::Usage, - openai::{ - ChatCompletionChoice, ChatCompletionChunk, ChatCompletionChunkChoice, - ChatCompletionRequest, ChatCompletionResponse, ChatCompletionUsage, - ResponseFormat, StopCondition, + gateway::{ + traits::ProviderCapabilities, + types::{ + common::Usage, + openai::{ + ChatCompletionChoice, ChatCompletionChunk, ChatCompletionChunkChoice, + ChatCompletionRequest, ChatCompletionResponse, ChatCompletionUsage, ResponseFormat, + StopCondition, + }, }, }, - gateway::traits::ProviderCapabilities, -}; - -use super::{ - message_attributes::{ - append_openinference_message_properties, - append_openinference_output_message_properties, - append_openinference_tool_properties, gen_ai_input_messages_json, - gen_ai_output_messages_json, gen_ai_tool_definitions_json, - message_view_from_chat_message, response_output_message_views, + proxy::utils::trace::span_attributes::{ + append_finish_reason_properties, append_usage_properties, collect_finish_reasons, }, }; @@ -42,7 +41,10 @@ pub(in crate::proxy::handlers::chat_completions) fn request_span_properties( "gen_ai.provider.name".into(), provider_semantics.gen_ai_provider_name.to_string(), ), - ("llm.system".into(), provider_semantics.llm_system.to_string()), + ( + "llm.system".into(), + provider_semantics.llm_system.to_string(), + ), ("gen_ai.request.model".into(), request.model.clone()), ]; @@ -101,11 +103,7 @@ pub(in crate::proxy::handlers::chat_completions) fn request_span_properties( properties.push(("user.id".into(), user_id.clone())); } - append_openinference_message_properties( - &mut properties, - "llm.input_messages", - &input_messages, - ); + append_openinference_message_properties(&mut properties, "llm.input_messages", &input_messages); if let Some(value) = gen_ai_input_messages_json(&input_messages) { properties.push(("gen_ai.input.messages".into(), value)); @@ -183,25 +181,6 @@ pub(in crate::proxy::handlers::chat_completions) fn chunk_span_properties( properties } -pub(in crate::proxy::handlers::chat_completions) fn usage_span_properties( - usage: &Usage, -) -> Vec<(String, String)> { - let mut properties = Vec::new(); - append_usage_properties(&mut properties, usage); - properties -} - -pub(in crate::proxy::handlers::chat_completions) fn apply_span_properties( - span: &Span, - properties: Vec<(String, String)>, -) { - if properties.is_empty() { - return; - } - - span.add_properties(move || properties); -} - fn response_format_output_type(response_format: Option<&ResponseFormat>) -> Option<&'static str> { match response_format?.r#type.as_str() { "json_object" | "json_schema" => Some("json"), @@ -305,95 +284,6 @@ fn request_invocation_parameters(request: &ChatCompletionRequest) -> Option(finish_reasons: I) -> Vec -where - I: IntoIterator>, -{ - let mut values = Vec::new(); - - for finish_reason in finish_reasons.into_iter().flatten() { - if !values.iter().any(|value| value == &finish_reason) { - values.push(finish_reason); - } - } - - values -} - -fn append_finish_reason_properties( - properties: &mut Vec<(String, String)>, - finish_reasons: Vec, -) { - if finish_reasons.is_empty() { - return; - } - - properties.push(( - "gen_ai.response.finish_reasons".into(), - serde_json::to_string(&finish_reasons).unwrap_or_default(), - )); - - if let Some(finish_reason) = finish_reasons.first() { - properties.push(("llm.finish_reason".into(), finish_reason.clone())); - } -} - -fn append_usage_properties(properties: &mut Vec<(String, String)>, usage: &Usage) { - if let Some(input_tokens) = usage.input_tokens { - let input_tokens = input_tokens.to_string(); - properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone())); - properties.push(("llm.token_count.prompt".into(), input_tokens)); - } - - if let Some(output_tokens) = usage.output_tokens { - let output_tokens = output_tokens.to_string(); - properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone())); - properties.push(("llm.token_count.completion".into(), output_tokens)); - } - - if let Some(total_tokens) = usage.resolved_total_tokens() { - properties.push(("llm.token_count.total".into(), total_tokens.to_string())); - } - - if let Some(cache_creation_input_tokens) = usage.cache_creation_input_tokens { - let cache_creation_input_tokens = cache_creation_input_tokens.to_string(); - properties.push(( - "gen_ai.usage.cache_creation.input_tokens".into(), - cache_creation_input_tokens.clone(), - )); - properties.push(( - "llm.token_count.prompt_details.cache_write".into(), - cache_creation_input_tokens, - )); - } - - if let Some(cache_read_input_tokens) = usage.cache_read_input_tokens { - let cache_read_input_tokens = cache_read_input_tokens.to_string(); - properties.push(( - "gen_ai.usage.cache_read.input_tokens".into(), - cache_read_input_tokens.clone(), - )); - properties.push(( - "llm.token_count.prompt_details.cache_read".into(), - cache_read_input_tokens, - )); - } - - if let Some(input_audio_tokens) = usage.input_audio_tokens { - properties.push(( - "llm.token_count.prompt_details.audio".into(), - input_audio_tokens.to_string(), - )); - } - - if let Some(output_audio_tokens) = usage.output_audio_tokens { - properties.push(( - "llm.token_count.completion_details.audio".into(), - output_audio_tokens.to_string(), - )); - } -} - fn append_response_usage_properties( properties: &mut Vec<(String, String)>, usage: &Usage, diff --git a/src/proxy/handlers/messages/mod.rs b/src/proxy/handlers/messages/mod.rs index 25221bf..565a37d 100644 --- a/src/proxy/handlers/messages/mod.rs +++ b/src/proxy/handlers/messages/mod.rs @@ -1,10 +1,11 @@ +mod span_attributes; mod types; use std::{convert::Infallible, time::Duration}; use axum::{ Json, - extract::{Extension, State}, + extract::State, response::{ IntoResponse, Response, sse::{Event as SseEvent, Sse}, @@ -12,7 +13,11 @@ use axum::{ }; use fastrace::prelude::{Event as TraceEvent, *}; use log::error; -use tokio::sync::oneshot; +use span_attributes::{ + StreamOutputCollector, apply_span_properties, chunk_span_properties, request_span_properties, + response_span_properties, usage_span_properties, +}; +use tokio::sync::{oneshot, oneshot::error::TryRecvError}; pub use types::MessagesError; use crate::{ @@ -34,7 +39,7 @@ use crate::{ hooks::{self, RequestContext}, provider::create_provider_instance, }, - utils::future::maybe_timeout, + utils::future::{WithSpan, maybe_timeout}, }; /// Handles Anthropic Messages API requests on `/v1/messages`. @@ -43,10 +48,8 @@ use crate::{ /// context from `AppState`, `SpanContext`, and `RequestContext`, and returns /// either a complete Anthropic Messages JSON response or an SSE stream of /// Anthropic stream events. Failures are mapped into `MessagesError`. -#[fastrace::trace] pub async fn messages( State(state): State, - Extension(span_ctx): Extension, mut request_ctx: RequestContext, Json(mut request_data): Json, ) -> Result { @@ -74,21 +77,43 @@ pub async fn messages( GatewayError::Internal(format!("provider {} not found", model.provider_id)) })?; let provider_instance = create_provider_instance(gateway.as_ref(), &provider)?; + let provider_base_url = provider_instance.effective_base_url().ok(); - match maybe_timeout(timeout, gateway.messages(&request_data, &provider_instance)).await { - Ok(response) => match response? { - ChatResponse::Complete { response, usage } => { - handle_regular_request(response, usage, &mut request_ctx).await - } - ChatResponse::Stream { stream, usage_rx } => { - handle_stream_request(stream, usage_rx, &mut request_ctx, span_ctx).await - } - }, - Err(err) => Err(MessagesError::Timeout(err)), + let span = Span::enter_with_local_parent("aisix.llm.messages"); + apply_span_properties( + &span, + request_span_properties( + &request_data, + provider_instance.def.as_ref(), + provider_base_url.as_ref(), + ), + ); + + let (response, span) = (WithSpan { + inner: maybe_timeout(timeout, gateway.messages(&request_data, &provider_instance)), + span: Some(span), + }) + .await; + + match response { + Ok(Ok(ChatResponse::Complete { response, usage })) => { + span.add_properties(|| response_span_properties(&response, &usage)); + handle_regular_request(response, usage, &mut request_ctx).await + } + Ok(Ok(ChatResponse::Stream { stream, usage_rx })) => { + handle_stream_request(stream, usage_rx, &mut request_ctx, span).await + } + Ok(Err(err)) => { + span.add_property(|| ("error.type", "gateway_error")); + Err(err.into()) + } + Err(err) => { + span.add_property(|| ("error.type", "timeout")); + Err(MessagesError::Timeout(err)) + } } } -#[fastrace::trace] async fn handle_regular_request( response: AnthropicMessagesResponse, usage: Usage, @@ -125,22 +150,36 @@ fn spawn_stream_usage_observer(request_ctx: RequestContext, usage_rx: oneshot::R }); } -#[fastrace::trace] async fn handle_stream_request( stream: ChatResponseStream, usage_rx: oneshot::Receiver, request_ctx: &mut RequestContext, - span_ctx: SpanContext, + span: Span, ) -> Result { use futures::stream::StreamExt; - spawn_stream_usage_observer(request_ctx.clone(), usage_rx); - let stream_request_ctx = request_ctx.clone(); - let stream_span = Span::root("messages_sse_connection", span_ctx); let sse_stream = futures::stream::unfold( - (stream, stream_span, 0usize, stream_request_ctx, false), - |(mut stream, span, idx, mut request_ctx, should_terminate)| async move { + ( + stream, + span, + 0usize, + stream_request_ctx, + false, + Some(usage_rx), + StreamOutputCollector::default(), + false, + ), + |( + mut stream, + span, + idx, + mut request_ctx, + should_terminate, + mut usage_rx, + mut output_collector, + mut first_token_arrived, + )| async move { if should_terminate { drop(span); return None; @@ -148,23 +187,90 @@ async fn handle_stream_request( match stream.next().await { Some(Ok(event)) => { - if idx == 0 { + output_collector.record_event(&event); + + if let AnthropicStreamEvent::ContentBlockStart { .. } = event + && !first_token_arrived + { + first_token_arrived = true; hooks::observability::record_first_token_latency(&mut request_ctx).await; - span.add_event(TraceEvent::new("first token arrived")); + span.add_event( + TraceEvent::new("first token arrived") + .with_property(|| ("kind", "first_token_arrived")), + ); } + span.add_properties(|| chunk_span_properties(&event)); + let sse_event = Ok::(serialize_stream_event(&event)); - Some((sse_event, (stream, span, idx + 1, request_ctx, false))) + Some(( + sse_event, + ( + stream, + span, + idx + 1, + request_ctx, + false, + usage_rx, + output_collector, + first_token_arrived, + ), + )) } Some(Err(err)) => { error!("Gateway stream error: {}", err); + span.add_property(|| ("error.type", "stream_error")); + span.add_properties(|| output_collector.output_message_span_properties()); + if let Some(usage_rx) = usage_rx.take() { + spawn_stream_usage_observer(request_ctx.clone(), usage_rx); + } Some(( Ok(anthropic_error_sse_event(err.to_string())), - (stream, span, idx + 1, request_ctx, true), + ( + stream, + span, + idx + 1, + request_ctx, + true, + usage_rx, + output_collector, + first_token_arrived, + ), )) } None => { + span.add_properties(|| output_collector.output_message_span_properties()); + + if let Some(mut usage_rx) = usage_rx.take() { + match usage_rx.try_recv() { + Ok(usage) => { + if let Err(err) = hooks::rate_limit::post_check_streaming( + &mut request_ctx, + &usage, + ) + .await + { + error!("Rate limit post_check_streaming error: {}", err); + } + hooks::observability::record_streaming_usage( + &mut request_ctx, + &usage, + ) + .await; + span.add_properties(|| usage_span_properties(&usage)); + } + Err(TryRecvError::Empty) => { + spawn_stream_usage_observer(request_ctx.clone(), usage_rx); + } + Err(TryRecvError::Closed) => { + error!( + "Failed to receive streaming usage from gateway: channel closed" + ); + } + } + } + drop(span); None } diff --git a/src/proxy/handlers/messages/span_attributes/message_attributes.rs b/src/proxy/handlers/messages/span_attributes/message_attributes.rs new file mode 100644 index 0000000..6053e19 --- /dev/null +++ b/src/proxy/handlers/messages/span_attributes/message_attributes.rs @@ -0,0 +1,245 @@ +use serde_json::{Map, Value}; + +pub(super) use crate::proxy::utils::trace::span_message_attributes::{ + ContentPartView, MessageContentView, MessageView, OutputMessageView, ToolCallView, + append_openinference_message_properties, append_openinference_output_message_properties, + gen_ai_input_messages_json, gen_ai_output_messages_json, + message_content_view_from_content_parts, +}; +use crate::{ + gateway::types::anthropic::{ + AnthropicContent, AnthropicContentBlock, AnthropicMessage, AnthropicMessagesRequest, + AnthropicMessagesResponse, AnthropicTool, ImageSource, SystemPrompt, + }, + proxy::utils::trace::span_message_attributes::serialize_to_json_string, +}; + +pub(super) fn request_input_message_views(request: &AnthropicMessagesRequest) -> Vec { + let mut messages = system_prompt_message_views(request.system.as_ref()); + + for message in &request.messages { + messages.extend(message_views_from_anthropic_message(message)); + } + + messages +} + +pub(super) fn response_output_message_views( + response: &AnthropicMessagesResponse, +) -> Vec { + vec![OutputMessageView { + message: message_view_from_blocks(&response.role, &response.content), + finish_reason: response.stop_reason.clone(), + }] +} + +pub(super) fn gen_ai_tool_definitions_json(tools: &[AnthropicTool]) -> Option { + if tools.is_empty() { + return None; + } + + let values: Vec = tools.iter().map(anthropic_tool_to_json).collect(); + + serialize_to_json_string(&values) +} + +pub(super) fn append_openinference_tool_properties( + properties: &mut Vec<(String, String)>, + tools: &[AnthropicTool], +) { + for (tool_index, tool) in tools.iter().enumerate() { + let prefix = format!("llm.tools.{tool_index}.tool"); + properties.push((format!("{prefix}.name"), tool.name.clone())); + + if let Some(description) = &tool.description { + properties.push((format!("{prefix}.description"), description.clone())); + } + + if let Some(value) = serialize_to_json_string(&tool.input_schema) { + properties.push((format!("{prefix}.parameters"), value)); + } + + if let Some(value) = serialize_to_json_string(&anthropic_tool_to_json(tool)) { + properties.push((format!("{prefix}.json_schema"), value)); + } + } +} + +pub(super) fn message_view_from_blocks( + role: &str, + blocks: &[AnthropicContentBlock], +) -> MessageView { + MessageView { + role: role.to_string(), + content: message_content_view_from_content_parts(content_parts_from_blocks(blocks)), + name: None, + tool_calls: tool_calls_from_blocks(blocks), + tool_call_id: None, + } +} + +pub(super) fn image_source_to_data_url(source: &ImageSource) -> String { + if source.r#type == "base64" { + format!("data:{};base64,{}", source.media_type, source.data) + } else { + source.data.clone() + } +} + +fn anthropic_tool_to_json(tool: &AnthropicTool) -> Value { + let mut value = Map::new(); + value.insert("type".into(), Value::String("function".into())); + value.insert("name".into(), Value::String(tool.name.clone())); + + if let Some(description) = &tool.description { + value.insert("description".into(), Value::String(description.clone())); + } + + value.insert("parameters".into(), tool.input_schema.clone()); + Value::Object(value) +} + +fn system_prompt_message_views(system: Option<&SystemPrompt>) -> Vec { + match system { + None => vec![], + Some(SystemPrompt::Text(text)) => vec![MessageView { + role: "system".into(), + content: Some(MessageContentView::Text(text.clone())), + name: None, + tool_calls: Vec::new(), + tool_call_id: None, + }], + Some(SystemPrompt::Blocks(blocks)) => blocks + .iter() + .map(|block| MessageView { + role: "system".into(), + content: Some(MessageContentView::Text(block.text.clone())), + name: None, + tool_calls: Vec::new(), + tool_call_id: None, + }) + .collect(), + } +} + +fn message_views_from_anthropic_message(message: &AnthropicMessage) -> Vec { + match &message.content { + AnthropicContent::Text(text) => vec![MessageView { + role: message.role.clone(), + content: Some(MessageContentView::Text(text.clone())), + name: None, + tool_calls: Vec::new(), + tool_call_id: None, + }], + AnthropicContent::Blocks(blocks) => { + if message.role == "user" { + return user_message_views_from_blocks(blocks); + } + + vec![message_view_from_blocks(&message.role, blocks)] + } + } +} + +fn user_message_views_from_blocks(blocks: &[AnthropicContentBlock]) -> Vec { + let mut messages = Vec::new(); + let mut pending_blocks = Vec::new(); + + for block in blocks { + match block { + AnthropicContentBlock::Text { .. } + | AnthropicContentBlock::Image { .. } + | AnthropicContentBlock::ToolUse { .. } => pending_blocks.push(block.clone()), + AnthropicContentBlock::ToolResult { + tool_use_id, + content, + .. + } => { + if let Some(message) = message_view_from_pending_user_blocks(&pending_blocks) { + messages.push(message); + pending_blocks.clear(); + } + + messages.push(MessageView { + role: "tool".into(), + content: anthropic_optional_content_to_message_content_view(content.as_ref()), + name: None, + tool_calls: Vec::new(), + tool_call_id: Some(tool_use_id.clone()), + }); + } + } + } + + if let Some(message) = message_view_from_pending_user_blocks(&pending_blocks) { + messages.push(message); + } + + if messages.is_empty() { + messages.push(MessageView { + role: "user".into(), + content: Some(MessageContentView::Text(String::new())), + name: None, + tool_calls: Vec::new(), + tool_call_id: None, + }); + } + + messages +} + +fn message_view_from_pending_user_blocks(blocks: &[AnthropicContentBlock]) -> Option { + if blocks.is_empty() { + return None; + } + + Some(message_view_from_blocks("user", blocks)) +} + +fn anthropic_optional_content_to_message_content_view( + content: Option<&AnthropicContent>, +) -> Option { + content.and_then(anthropic_content_to_message_content_view) +} + +fn anthropic_content_to_message_content_view( + content: &AnthropicContent, +) -> Option { + match content { + AnthropicContent::Text(text) => Some(MessageContentView::Text(text.clone())), + AnthropicContent::Blocks(blocks) => { + message_content_view_from_content_parts(content_parts_from_blocks(blocks)) + } + } +} + +fn content_parts_from_blocks(blocks: &[AnthropicContentBlock]) -> Vec { + blocks + .iter() + .filter_map(|block| match block { + AnthropicContentBlock::Text { text, .. } => Some(ContentPartView::Text(text.clone())), + AnthropicContentBlock::Image { source, .. } => Some(ContentPartView::ImageUrl { + url: image_source_to_data_url(source), + }), + AnthropicContentBlock::ToolUse { .. } | AnthropicContentBlock::ToolResult { .. } => { + None + } + }) + .collect() +} + +fn tool_calls_from_blocks(blocks: &[AnthropicContentBlock]) -> Vec { + blocks + .iter() + .filter_map(|block| match block { + AnthropicContentBlock::ToolUse { + id, name, input, .. + } => Some(ToolCallView { + id: Some(id.clone()), + name: name.clone(), + arguments: input.to_string(), + }), + _ => None, + }) + .collect() +} diff --git a/src/proxy/handlers/messages/span_attributes/mod.rs b/src/proxy/handlers/messages/span_attributes/mod.rs new file mode 100644 index 0000000..4ee6282 --- /dev/null +++ b/src/proxy/handlers/messages/span_attributes/mod.rs @@ -0,0 +1,15 @@ +mod message_attributes; +mod stream_output; +mod telemetry; + +pub(super) use stream_output::StreamOutputCollector; +pub(super) use telemetry::{ + chunk_span_properties, request_span_properties, response_span_properties, +}; + +pub(super) use crate::proxy::utils::trace::span_attributes::{ + apply_span_properties, usage_span_properties, +}; + +#[cfg(test)] +mod tests; diff --git a/src/proxy/handlers/messages/span_attributes/stream_output.rs b/src/proxy/handlers/messages/span_attributes/stream_output.rs new file mode 100644 index 0000000..f0c701a --- /dev/null +++ b/src/proxy/handlers/messages/span_attributes/stream_output.rs @@ -0,0 +1,155 @@ +use std::collections::BTreeMap; + +use super::message_attributes::{ + ContentPartView, MessageView, OutputMessageView, ToolCallView, image_source_to_data_url, + message_content_view_from_content_parts, +}; +use crate::{ + gateway::types::anthropic::{AnthropicContentBlock, AnthropicStreamEvent, ContentDelta}, + proxy::utils::trace::span_message_attributes::output_message_span_properties, +}; + +enum StreamOutputBlock { + Text(String), + ImageUrl { + url: String, + }, + ToolUse { + id: Option, + name: String, + arguments: String, + }, +} + +#[derive(Default)] +pub(in crate::proxy::handlers::messages) struct StreamOutputCollector { + role: Option, + blocks: BTreeMap, + finish_reason: Option, +} + +impl StreamOutputCollector { + pub(in crate::proxy::handlers::messages) fn record_event( + &mut self, + event: &AnthropicStreamEvent, + ) { + match event { + AnthropicStreamEvent::MessageStart { message } => { + self.role = Some(message.role.clone()); + } + AnthropicStreamEvent::ContentBlockStart { + index, + content_block, + } => match content_block { + AnthropicContentBlock::Text { text, .. } => { + self.blocks + .insert(*index, StreamOutputBlock::Text(text.clone())); + } + AnthropicContentBlock::Image { source, .. } => { + self.blocks.insert( + *index, + StreamOutputBlock::ImageUrl { + url: image_source_to_data_url(source), + }, + ); + } + AnthropicContentBlock::ToolUse { + id, name, input, .. + } => { + self.blocks.insert( + *index, + StreamOutputBlock::ToolUse { + id: Some(id.clone()), + name: name.clone(), + arguments: match input { + serde_json::Value::Object(map) if map.is_empty() => String::new(), + _ => input.to_string(), + }, + }, + ); + } + AnthropicContentBlock::ToolResult { .. } => {} + }, + AnthropicStreamEvent::ContentBlockDelta { index, delta } => match delta { + ContentDelta::TextDelta { text } => match self.blocks.get_mut(index) { + Some(StreamOutputBlock::Text(existing_text)) => existing_text.push_str(text), + _ => { + self.blocks + .insert(*index, StreamOutputBlock::Text(text.clone())); + } + }, + ContentDelta::InputJsonDelta { partial_json } => match self.blocks.get_mut(index) { + Some(StreamOutputBlock::ToolUse { arguments, .. }) => { + arguments.push_str(partial_json); + } + _ => { + self.blocks.insert( + *index, + StreamOutputBlock::ToolUse { + id: None, + name: String::new(), + arguments: partial_json.clone(), + }, + ); + } + }, + }, + AnthropicStreamEvent::MessageDelta { delta, .. } => { + self.finish_reason = delta.stop_reason.clone(); + } + AnthropicStreamEvent::ContentBlockStop { .. } + | AnthropicStreamEvent::MessageStop + | AnthropicStreamEvent::Ping + | AnthropicStreamEvent::Error { .. } => {} + } + } + + pub(in crate::proxy::handlers::messages) fn output_message_span_properties( + &self, + ) -> Vec<(String, String)> { + output_message_span_properties(&self.output_message_views()) + } + + fn output_message_views(&self) -> Vec { + if self.role.is_none() && self.blocks.is_empty() && self.finish_reason.is_none() { + return Vec::new(); + } + + let mut content_parts = Vec::new(); + let mut tool_calls = Vec::new(); + + for block in self.blocks.values() { + match block { + StreamOutputBlock::Text(text) if !text.is_empty() => { + content_parts.push(ContentPartView::Text(text.clone())); + } + StreamOutputBlock::ImageUrl { url } => { + content_parts.push(ContentPartView::ImageUrl { url: url.clone() }); + } + StreamOutputBlock::ToolUse { + id, + name, + arguments, + } => { + tool_calls.push(ToolCallView { + id: id.clone(), + name: name.clone(), + arguments: arguments.clone(), + }); + } + StreamOutputBlock::Text(_) => {} + } + } + + vec![OutputMessageView { + message: MessageView { + role: self.role.clone().unwrap_or_else(|| "assistant".into()), + content: message_content_view_from_content_parts(content_parts), + name: None, + tool_calls, + tool_call_id: None, + }, + finish_reason: self.finish_reason.clone(), + }] + } +} diff --git a/src/proxy/handlers/messages/span_attributes/telemetry.rs b/src/proxy/handlers/messages/span_attributes/telemetry.rs new file mode 100644 index 0000000..9700877 --- /dev/null +++ b/src/proxy/handlers/messages/span_attributes/telemetry.rs @@ -0,0 +1,349 @@ +use reqwest::Url; +use serde_json::{Map, Value}; + +use super::message_attributes::{ + append_openinference_message_properties, append_openinference_output_message_properties, + append_openinference_tool_properties, gen_ai_input_messages_json, gen_ai_output_messages_json, + gen_ai_tool_definitions_json, request_input_message_views, response_output_message_views, +}; +use crate::{ + gateway::{ + traits::ProviderCapabilities, + types::{ + anthropic::{ + AnthropicMessagesRequest, AnthropicMessagesResponse, AnthropicStreamEvent, + AnthropicToolChoice, AnthropicUsage, DeltaUsage, MessageStartUsage, + }, + common::Usage, + }, + }, + proxy::utils::trace::span_attributes::{ + append_finish_reason_properties, append_usage_properties, collect_finish_reasons, + }, +}; + +pub(in crate::proxy::handlers::messages) fn request_span_properties( + request: &AnthropicMessagesRequest, + provider: &dyn ProviderCapabilities, + base_url: Option<&Url>, +) -> Vec<(String, String)> { + let provider_semantics = provider.semantic_conventions(); + let input_messages = request_input_message_views(request); + let mut properties = vec![ + ("gen_ai.operation.name".into(), "chat".into()), + ("openinference.span.kind".into(), "LLM".into()), + ( + "gen_ai.provider.name".into(), + provider_semantics.gen_ai_provider_name.to_string(), + ), + ( + "llm.system".into(), + provider_semantics.llm_system.to_string(), + ), + ("gen_ai.request.model".into(), request.model.clone()), + ( + "gen_ai.request.max_tokens".into(), + request.max_tokens.to_string(), + ), + ]; + + if let Some(llm_provider) = provider_semantics.llm_provider { + properties.push(("llm.provider".into(), llm_provider.to_string())); + } + + if let Some(value) = request.temperature { + properties.push(("gen_ai.request.temperature".into(), value.to_string())); + } + + if let Some(value) = request.top_p { + properties.push(("gen_ai.request.top_p".into(), value.to_string())); + } + + if let Some(value) = request.top_k { + properties.push(("gen_ai.request.top_k".into(), value.to_string())); + } + + if let Some(value) = stop_sequences_json(request.stop_sequences.as_deref()) { + properties.push(("gen_ai.request.stop_sequences".into(), value)); + } + + if let Some(value) = request_invocation_parameters(request) { + properties.push(("llm.invocation_parameters".into(), value)); + } + + if let Some(user_id) = request + .metadata + .as_ref() + .and_then(|metadata| metadata.user_id.as_ref()) + .filter(|user_id| !user_id.is_empty()) + { + properties.push(("user.id".into(), user_id.clone())); + } + + append_openinference_message_properties(&mut properties, "llm.input_messages", &input_messages); + + if let Some(value) = gen_ai_input_messages_json(&input_messages) { + properties.push(("gen_ai.input.messages".into(), value)); + } + + if let Some(tools) = request.tools.as_deref() { + append_openinference_tool_properties(&mut properties, tools); + + if let Some(value) = gen_ai_tool_definitions_json(tools) { + properties.push(("gen_ai.tool.definitions".into(), value)); + } + } + + if let Some(base_url) = base_url { + if let Some(address) = base_url.host_str() { + properties.push(("server.address".into(), address.to_string())); + } + if let Some(port) = base_url.port_or_known_default() { + properties.push(("server.port".into(), port.to_string())); + } + } + + properties +} + +pub(in crate::proxy::handlers::messages) fn response_span_properties( + response: &AnthropicMessagesResponse, + usage: &Usage, +) -> Vec<(String, String)> { + let output_messages = response_output_message_views(response); + let mut properties = vec![ + ("gen_ai.response.id".into(), response.id.clone()), + ("gen_ai.response.model".into(), response.model.clone()), + ("llm.model_name".into(), response.model.clone()), + ]; + + append_finish_reason_properties( + &mut properties, + collect_finish_reasons(std::iter::once(response.stop_reason.clone())), + ); + append_response_usage_properties(&mut properties, usage, &response.usage); + append_openinference_output_message_properties( + &mut properties, + "llm.output_messages", + &output_messages, + ); + + if let Some(value) = gen_ai_output_messages_json(&output_messages) { + properties.push(("gen_ai.output.messages".into(), value)); + } + + properties +} + +pub(in crate::proxy::handlers::messages) fn chunk_span_properties( + event: &AnthropicStreamEvent, +) -> Vec<(String, String)> { + let mut properties = Vec::new(); + + match event { + AnthropicStreamEvent::MessageStart { message } => { + properties.push(("gen_ai.response.id".into(), message.id.clone())); + properties.push(("gen_ai.response.model".into(), message.model.clone())); + properties.push(("llm.model_name".into(), message.model.clone())); + append_message_start_usage_properties(&mut properties, &message.usage); + } + AnthropicStreamEvent::MessageDelta { delta, usage } => { + append_finish_reason_properties( + &mut properties, + collect_finish_reasons(std::iter::once(delta.stop_reason.clone())), + ); + append_delta_usage_properties(&mut properties, usage); + } + AnthropicStreamEvent::ContentBlockStart { .. } + | AnthropicStreamEvent::ContentBlockDelta { .. } + | AnthropicStreamEvent::ContentBlockStop { .. } + | AnthropicStreamEvent::MessageStop + | AnthropicStreamEvent::Ping + | AnthropicStreamEvent::Error { .. } => {} + } + + properties +} + +fn stop_sequences_json(stop_sequences: Option<&[String]>) -> Option { + let stop_sequences = stop_sequences?; + + serde_json::to_string(stop_sequences).ok() +} + +fn request_invocation_parameters(request: &AnthropicMessagesRequest) -> Option { + let mut params = Map::new(); + params.insert("max_tokens".into(), Value::from(request.max_tokens)); + + if let Some(value) = request.temperature { + params.insert("temperature".into(), Value::from(value)); + } + if let Some(value) = request.top_p { + params.insert("top_p".into(), Value::from(value)); + } + if let Some(value) = request.top_k { + params.insert("top_k".into(), Value::from(value)); + } + if let Some(value) = request.stream { + params.insert("stream".into(), Value::from(value)); + } + if let Some(stop_sequences) = request.stop_sequences.as_ref() { + params.insert( + "stop_sequences".into(), + Value::Array(stop_sequences.iter().cloned().map(Value::String).collect()), + ); + } + if let Some(value) = request.cache_control.as_ref() + && let Ok(value) = serde_json::to_value(value) + { + params.insert("cache_control".into(), value); + } + if let Some(value) = request.tool_choice.as_ref() + && let Some(value) = anthropic_tool_choice_to_value(value) + { + params.insert("tool_choice".into(), value); + } + + serde_json::to_string(&Value::Object(params)).ok() +} + +fn anthropic_tool_choice_to_value(tool_choice: &AnthropicToolChoice) -> Option { + serde_json::to_value(tool_choice).ok() +} + +fn append_response_usage_properties( + properties: &mut Vec<(String, String)>, + usage: &Usage, + raw_usage: &AnthropicUsage, +) { + append_usage_properties(properties, usage); + + let raw_input_tokens = raw_usage.input_tokens + + raw_usage.cache_creation_input_tokens + + raw_usage.cache_read_input_tokens; + + if usage.input_tokens.is_none() { + let input_tokens = raw_input_tokens.to_string(); + properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone())); + properties.push(("llm.token_count.prompt".into(), input_tokens)); + } + + if usage.output_tokens.is_none() { + let output_tokens = raw_usage.output_tokens.to_string(); + properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone())); + properties.push(("llm.token_count.completion".into(), output_tokens)); + } + + if usage.resolved_total_tokens().is_none() { + properties.push(( + "llm.token_count.total".into(), + (raw_input_tokens + raw_usage.output_tokens).to_string(), + )); + } + + if usage.cache_creation_input_tokens.is_none() && raw_usage.cache_creation_input_tokens > 0 { + let cache_creation = raw_usage.cache_creation_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_creation.input_tokens".into(), + cache_creation.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_write".into(), + cache_creation, + )); + } + + if usage.cache_read_input_tokens.is_none() && raw_usage.cache_read_input_tokens > 0 { + let cache_read = raw_usage.cache_read_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_read.input_tokens".into(), + cache_read.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_read".into(), + cache_read, + )); + } +} + +fn append_message_start_usage_properties( + properties: &mut Vec<(String, String)>, + usage: &MessageStartUsage, +) { + append_message_usage_values( + properties, + usage.input_tokens, + usage.output_tokens, + usage.cache_creation_input_tokens, + usage.cache_read_input_tokens, + ); +} + +fn append_delta_usage_properties(properties: &mut Vec<(String, String)>, usage: &DeltaUsage) { + append_message_usage_values( + properties, + usage.input_tokens, + usage.output_tokens, + usage.cache_creation_input_tokens, + usage.cache_read_input_tokens, + ); +} + +fn append_message_usage_values( + properties: &mut Vec<(String, String)>, + input_tokens: Option, + output_tokens: Option, + cache_creation_input_tokens: Option, + cache_read_input_tokens: Option, +) { + let input_tokens = input_tokens.map(|input_tokens| { + input_tokens + + cache_creation_input_tokens.unwrap_or(0) + + cache_read_input_tokens.unwrap_or(0) + }); + + if let Some(input_tokens) = input_tokens { + let input_tokens = input_tokens.to_string(); + properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone())); + properties.push(("llm.token_count.prompt".into(), input_tokens)); + } + + if let Some(output_tokens) = output_tokens { + let output_tokens = output_tokens.to_string(); + properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone())); + properties.push(("llm.token_count.completion".into(), output_tokens)); + } + + if let Some(input_tokens) = input_tokens + && let Some(output_tokens) = output_tokens + { + properties.push(( + "llm.token_count.total".into(), + (input_tokens + output_tokens).to_string(), + )); + } + + if let Some(cache_creation_input_tokens) = cache_creation_input_tokens { + let cache_creation = cache_creation_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_creation.input_tokens".into(), + cache_creation.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_write".into(), + cache_creation, + )); + } + + if let Some(cache_read_input_tokens) = cache_read_input_tokens { + let cache_read = cache_read_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_read.input_tokens".into(), + cache_read.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_read".into(), + cache_read, + )); + } +} diff --git a/src/proxy/handlers/messages/span_attributes/tests.rs b/src/proxy/handlers/messages/span_attributes/tests.rs new file mode 100644 index 0000000..ce9e787 --- /dev/null +++ b/src/proxy/handlers/messages/span_attributes/tests.rs @@ -0,0 +1,287 @@ +use pretty_assertions::assert_eq; +use serde_json::{Value, json}; + +use super::{ + StreamOutputCollector, chunk_span_properties, request_span_properties, response_span_properties, +}; +use crate::gateway::{ + providers::AnthropicDef, + types::{ + anthropic::{ + AnthropicContent, AnthropicContentBlock, AnthropicMessage, AnthropicMessagesRequest, + AnthropicMessagesResponse, AnthropicMetadata, AnthropicStreamEvent, AnthropicTool, + AnthropicToolChoice, AnthropicUsage, ContentDelta, DeltaUsage, MessageDelta, + MessageStartPayload, MessageStartUsage, SystemPrompt, + }, + common::Usage, + }, +}; + +fn property_value<'a>(properties: &'a [(String, String)], key: &str) -> Option<&'a str> { + properties + .iter() + .find(|(property_key, _)| property_key == key) + .map(|(_, value)| value.as_str()) +} + +#[test] +fn request_span_properties_include_system_tool_and_user_attributes() { + let request = AnthropicMessagesRequest { + model: "claude-3-5-sonnet-20241022".into(), + messages: vec![AnthropicMessage { + role: "user".into(), + content: AnthropicContent::Blocks(vec![ + AnthropicContentBlock::Text { + text: "What's the weather?".into(), + cache_control: None, + }, + AnthropicContentBlock::ToolResult { + tool_use_id: "tool_1".into(), + content: Some(AnthropicContent::Text("72F and sunny".into())), + is_error: None, + cache_control: None, + }, + ]), + }], + max_tokens: 1024, + cache_control: None, + system: Some(SystemPrompt::Text("You are helpful.".into())), + temperature: Some(0.2), + top_p: Some(0.9), + top_k: Some(5), + stop_sequences: Some(vec!["DONE".into()]), + stream: Some(true), + metadata: Some(AnthropicMetadata { + user_id: Some("user-123".into()), + }), + tools: Some(vec![AnthropicTool { + name: "get_weather".into(), + description: Some("Get current weather".into()), + input_schema: json!({ + "type": "object", + "properties": {"city": {"type": "string"}} + }), + }]), + tool_choice: Some(AnthropicToolChoice::Auto), + }; + let provider = AnthropicDef; + + let properties = request_span_properties(&request, &provider, None); + + assert_eq!(property_value(&properties, "user.id"), Some("user-123")); + assert_eq!( + property_value(&properties, "gen_ai.request.top_k"), + Some("5") + ); + assert_eq!( + property_value(&properties, "llm.input_messages.0.message.role"), + Some("system") + ); + assert_eq!( + property_value(&properties, "llm.input_messages.1.message.role"), + Some("user") + ); + assert_eq!( + property_value(&properties, "llm.input_messages.2.message.role"), + Some("tool") + ); + assert_eq!( + property_value(&properties, "llm.input_messages.2.message.tool_call_id"), + Some("tool_1") + ); + assert_eq!( + property_value(&properties, "llm.tools.0.tool.name"), + Some("get_weather") + ); + + let input_messages: Value = + serde_json::from_str(property_value(&properties, "gen_ai.input.messages").unwrap()) + .unwrap(); + assert_eq!(input_messages[0]["role"], "system"); + assert_eq!(input_messages[2]["role"], "tool"); + assert_eq!(input_messages[2]["parts"][0]["type"], "tool_call_response"); + + let tool_definitions: Value = + serde_json::from_str(property_value(&properties, "gen_ai.tool.definitions").unwrap()) + .unwrap(); + assert_eq!(tool_definitions[0]["name"], "get_weather"); +} + +#[test] +fn response_span_properties_include_output_messages_and_usage() { + let response = AnthropicMessagesResponse { + id: "msg_123".into(), + r#type: "message".into(), + role: "assistant".into(), + content: vec![ + AnthropicContentBlock::Text { + text: "Let me check.".into(), + cache_control: None, + }, + AnthropicContentBlock::ToolUse { + id: "tool_1".into(), + name: "get_weather".into(), + input: json!({"city": "SF"}), + cache_control: None, + }, + ], + model: "claude-3-5-sonnet-20241022".into(), + stop_reason: Some("tool_use".into()), + stop_sequence: None, + usage: AnthropicUsage { + input_tokens: 10, + output_tokens: 20, + cache_creation_input_tokens: 3, + cache_read_input_tokens: 2, + cache_creation: None, + }, + }; + + let properties = response_span_properties(&response, &Usage::default()); + + assert_eq!( + property_value(&properties, "llm.output_messages.0.message.role"), + Some("assistant") + ); + assert_eq!( + property_value( + &properties, + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name", + ), + Some("get_weather") + ); + assert_eq!( + property_value(&properties, "gen_ai.usage.input_tokens"), + Some("15") + ); + assert_eq!( + property_value(&properties, "llm.token_count.total"), + Some("35") + ); + + let output_messages: Value = + serde_json::from_str(property_value(&properties, "gen_ai.output.messages").unwrap()) + .unwrap(); + assert_eq!(output_messages[0]["finish_reason"], "tool_use"); + assert_eq!(output_messages[0]["parts"][1]["type"], "tool_call"); +} + +#[test] +fn chunk_span_properties_include_message_start_and_stop_reason() { + let message_start = AnthropicStreamEvent::MessageStart { + message: MessageStartPayload { + id: "msg_123".into(), + r#type: "message".into(), + role: "assistant".into(), + model: "claude-3-5-sonnet-20241022".into(), + usage: MessageStartUsage { + input_tokens: Some(7), + output_tokens: Some(1), + cache_creation_input_tokens: Some(3), + cache_read_input_tokens: Some(2), + cache_creation: None, + }, + }, + }; + let message_delta = AnthropicStreamEvent::MessageDelta { + delta: MessageDelta { + stop_reason: Some("end_turn".into()), + stop_sequence: None, + }, + usage: DeltaUsage { + output_tokens: Some(11), + input_tokens: Some(7), + cache_creation_input_tokens: None, + cache_read_input_tokens: None, + }, + }; + + let start_properties = chunk_span_properties(&message_start); + let delta_properties = chunk_span_properties(&message_delta); + + assert_eq!( + property_value(&start_properties, "gen_ai.response.id"), + Some("msg_123") + ); + assert_eq!( + property_value(&start_properties, "gen_ai.usage.input_tokens"), + Some("12") + ); + assert_eq!( + property_value(&delta_properties, "llm.finish_reason"), + Some("end_turn") + ); + assert_eq!( + property_value(&delta_properties, "gen_ai.usage.output_tokens"), + Some("11") + ); +} + +#[test] +fn stream_output_collector_accumulates_events_into_output_messages() { + let mut collector = StreamOutputCollector::default(); + + collector.record_event(&AnthropicStreamEvent::MessageStart { + message: MessageStartPayload { + id: "msg_123".into(), + r#type: "message".into(), + role: "assistant".into(), + model: "claude-3-5-sonnet-20241022".into(), + usage: MessageStartUsage::default(), + }, + }); + collector.record_event(&AnthropicStreamEvent::ContentBlockStart { + index: 0, + content_block: AnthropicContentBlock::Text { + text: String::new(), + cache_control: None, + }, + }); + collector.record_event(&AnthropicStreamEvent::ContentBlockDelta { + index: 0, + delta: ContentDelta::TextDelta { + text: "Hello".into(), + }, + }); + collector.record_event(&AnthropicStreamEvent::ContentBlockStart { + index: 1, + content_block: AnthropicContentBlock::ToolUse { + id: "tool_1".into(), + name: "get_weather".into(), + input: json!({}), + cache_control: None, + }, + }); + collector.record_event(&AnthropicStreamEvent::ContentBlockDelta { + index: 1, + delta: ContentDelta::InputJsonDelta { + partial_json: r#"{"city":"SF"}"#.into(), + }, + }); + collector.record_event(&AnthropicStreamEvent::MessageDelta { + delta: MessageDelta { + stop_reason: Some("tool_use".into()), + stop_sequence: None, + }, + usage: DeltaUsage::default(), + }); + + let properties = collector.output_message_span_properties(); + + assert_eq!( + property_value(&properties, "llm.output_messages.0.message.content"), + Some("Hello") + ); + assert_eq!( + property_value( + &properties, + "llm.output_messages.0.message.tool_calls.0.tool_call.function.arguments", + ), + Some(r#"{"city":"SF"}"#), + ); + + let output_messages: Value = + serde_json::from_str(property_value(&properties, "gen_ai.output.messages").unwrap()) + .unwrap(); + assert_eq!(output_messages[0]["parts"][1]["arguments"]["city"], "SF"); +} diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index af2a882..4ca128e 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -2,6 +2,7 @@ mod handlers; mod hooks; mod middlewares; mod provider; +mod utils; use std::sync::Arc; diff --git a/src/proxy/utils/mod.rs b/src/proxy/utils/mod.rs new file mode 100644 index 0000000..11ec916 --- /dev/null +++ b/src/proxy/utils/mod.rs @@ -0,0 +1 @@ +pub mod trace; diff --git a/src/proxy/utils/trace/mod.rs b/src/proxy/utils/trace/mod.rs new file mode 100644 index 0000000..809bb10 --- /dev/null +++ b/src/proxy/utils/trace/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod span_attributes; +pub(crate) mod span_message_attributes; diff --git a/src/proxy/utils/trace/span_attributes.rs b/src/proxy/utils/trace/span_attributes.rs new file mode 100644 index 0000000..4a73e59 --- /dev/null +++ b/src/proxy/utils/trace/span_attributes.rs @@ -0,0 +1,106 @@ +use fastrace::prelude::Span; + +use crate::gateway::types::common::Usage; + +pub(crate) fn collect_finish_reasons(finish_reasons: I) -> Vec +where + I: IntoIterator>, +{ + let mut values = Vec::new(); + + for finish_reason in finish_reasons.into_iter().flatten() { + if !values.iter().any(|value| value == &finish_reason) { + values.push(finish_reason); + } + } + + values +} + +pub(crate) fn append_finish_reason_properties( + properties: &mut Vec<(String, String)>, + finish_reasons: Vec, +) { + if finish_reasons.is_empty() { + return; + } + + properties.push(( + "gen_ai.response.finish_reasons".into(), + serde_json::to_string(&finish_reasons).unwrap_or_default(), + )); + + if let Some(finish_reason) = finish_reasons.first() { + properties.push(("llm.finish_reason".into(), finish_reason.clone())); + } +} + +pub(crate) fn append_usage_properties(properties: &mut Vec<(String, String)>, usage: &Usage) { + if let Some(input_tokens) = usage.input_tokens { + let input_tokens = input_tokens.to_string(); + properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone())); + properties.push(("llm.token_count.prompt".into(), input_tokens)); + } + + if let Some(output_tokens) = usage.output_tokens { + let output_tokens = output_tokens.to_string(); + properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone())); + properties.push(("llm.token_count.completion".into(), output_tokens)); + } + + if let Some(total_tokens) = usage.resolved_total_tokens() { + properties.push(("llm.token_count.total".into(), total_tokens.to_string())); + } + + if let Some(cache_creation_input_tokens) = usage.cache_creation_input_tokens { + let cache_creation_input_tokens = cache_creation_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_creation.input_tokens".into(), + cache_creation_input_tokens.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_write".into(), + cache_creation_input_tokens, + )); + } + + if let Some(cache_read_input_tokens) = usage.cache_read_input_tokens { + let cache_read_input_tokens = cache_read_input_tokens.to_string(); + properties.push(( + "gen_ai.usage.cache_read.input_tokens".into(), + cache_read_input_tokens.clone(), + )); + properties.push(( + "llm.token_count.prompt_details.cache_read".into(), + cache_read_input_tokens, + )); + } + + if let Some(input_audio_tokens) = usage.input_audio_tokens { + properties.push(( + "llm.token_count.prompt_details.audio".into(), + input_audio_tokens.to_string(), + )); + } + + if let Some(output_audio_tokens) = usage.output_audio_tokens { + properties.push(( + "llm.token_count.completion_details.audio".into(), + output_audio_tokens.to_string(), + )); + } +} + +pub(crate) fn usage_span_properties(usage: &Usage) -> Vec<(String, String)> { + let mut properties = Vec::new(); + append_usage_properties(&mut properties, usage); + properties +} + +pub(crate) fn apply_span_properties(span: &Span, properties: Vec<(String, String)>) { + if properties.is_empty() { + return; + } + + span.add_properties(move || properties); +} diff --git a/src/proxy/utils/trace/span_message_attributes.rs b/src/proxy/utils/trace/span_message_attributes.rs new file mode 100644 index 0000000..c9bda70 --- /dev/null +++ b/src/proxy/utils/trace/span_message_attributes.rs @@ -0,0 +1,284 @@ +use serde::Serialize; +use serde_json::{Map, Value}; + +#[derive(Clone)] +pub(crate) enum MessageContentView { + Text(String), + Parts(Vec), +} + +#[derive(Clone)] +pub(crate) enum ContentPartView { + Text(String), + ImageUrl { url: String }, +} + +#[derive(Clone)] +pub(crate) struct ToolCallView { + pub(crate) id: Option, + pub(crate) name: String, + pub(crate) arguments: String, +} + +#[derive(Clone)] +pub(crate) struct MessageView { + pub(crate) role: String, + pub(crate) content: Option, + pub(crate) name: Option, + pub(crate) tool_calls: Vec, + pub(crate) tool_call_id: Option, +} + +#[derive(Clone)] +pub(crate) struct OutputMessageView { + pub(crate) message: MessageView, + pub(crate) finish_reason: Option, +} + +pub(crate) fn gen_ai_input_messages_json(messages: &[MessageView]) -> Option { + if messages.is_empty() { + return None; + } + + let values: Vec = messages + .iter() + .map(|message| { + let mut value = Map::new(); + value.insert("role".into(), Value::String(message.role.clone())); + value.insert("parts".into(), Value::Array(gen_ai_message_parts(message))); + + if let Some(name) = &message.name { + value.insert("name".into(), Value::String(name.clone())); + } + + Value::Object(value) + }) + .collect(); + + serialize_to_json_string(&values) +} + +pub(crate) fn gen_ai_output_messages_json(messages: &[OutputMessageView]) -> Option { + if messages.is_empty() { + return None; + } + + let values: Vec = messages + .iter() + .map(|message| { + let mut value = Map::new(); + value.insert("role".into(), Value::String(message.message.role.clone())); + value.insert( + "parts".into(), + Value::Array(gen_ai_message_parts(&message.message)), + ); + value.insert( + "finish_reason".into(), + Value::String( + message + .finish_reason + .clone() + .unwrap_or_else(|| "unknown".into()), + ), + ); + + if let Some(name) = &message.message.name { + value.insert("name".into(), Value::String(name.clone())); + } + + Value::Object(value) + }) + .collect(); + + serialize_to_json_string(&values) +} + +pub(crate) fn append_openinference_message_properties( + properties: &mut Vec<(String, String)>, + prefix: &str, + messages: &[MessageView], +) { + for (message_index, message) in messages.iter().enumerate() { + let prefix = format!("{prefix}.{message_index}.message"); + properties.push((format!("{prefix}.role"), message.role.clone())); + + if let Some(name) = &message.name { + properties.push((format!("{prefix}.name"), name.clone())); + } + + if let Some(tool_call_id) = &message.tool_call_id { + properties.push((format!("{prefix}.tool_call_id"), tool_call_id.clone())); + } + + match &message.content { + Some(MessageContentView::Text(text)) if !text.is_empty() => { + properties.push((format!("{prefix}.content"), text.clone())); + } + Some(MessageContentView::Parts(parts)) => { + for (content_index, part) in parts.iter().enumerate() { + let part_prefix = format!("{prefix}.contents.{content_index}.message_content"); + match part { + ContentPartView::Text(text) => { + properties.push((format!("{part_prefix}.type"), "text".into())); + properties.push((format!("{part_prefix}.text"), text.clone())); + } + ContentPartView::ImageUrl { url } => { + properties.push((format!("{part_prefix}.type"), "image".into())); + properties + .push((format!("{part_prefix}.image.image.url"), url.clone())); + } + } + } + } + _ => {} + } + + for (tool_call_index, tool_call) in message.tool_calls.iter().enumerate() { + let tool_call_prefix = format!("{prefix}.tool_calls.{tool_call_index}.tool_call"); + + if let Some(id) = &tool_call.id { + properties.push((format!("{tool_call_prefix}.id"), id.clone())); + } + + properties.push(( + format!("{tool_call_prefix}.function.name"), + tool_call.name.clone(), + )); + + if !tool_call.arguments.is_empty() { + properties.push(( + format!("{tool_call_prefix}.function.arguments"), + tool_call.arguments.clone(), + )); + } + } + } +} + +pub(crate) fn append_openinference_output_message_properties( + properties: &mut Vec<(String, String)>, + prefix: &str, + messages: &[OutputMessageView], +) { + let message_views: Vec<_> = messages + .iter() + .map(|message| message.message.clone()) + .collect(); + append_openinference_message_properties(properties, prefix, &message_views); +} + +pub(crate) fn output_message_span_properties( + output_messages: &[OutputMessageView], +) -> Vec<(String, String)> { + if output_messages.is_empty() { + return Vec::new(); + } + + let mut properties = Vec::new(); + + append_openinference_output_message_properties( + &mut properties, + "llm.output_messages", + output_messages, + ); + + if let Some(value) = gen_ai_output_messages_json(output_messages) { + properties.push(("gen_ai.output.messages".into(), value)); + } + + properties +} + +pub(crate) fn message_content_view_from_content_parts( + parts: Vec, +) -> Option { + match parts.as_slice() { + [] => None, + [ContentPartView::Text(text)] => Some(MessageContentView::Text(text.clone())), + _ => Some(MessageContentView::Parts(parts)), + } +} + +pub(crate) fn serialize_to_json_string(value: &impl Serialize) -> Option { + serde_json::to_string(value).ok() +} + +fn parse_json_or_string(raw: &str) -> Value { + serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_string())) +} + +fn gen_ai_message_parts(message: &MessageView) -> Vec { + if message.role == "tool" { + let mut part = Map::new(); + part.insert("type".into(), Value::String("tool_call_response".into())); + + if let Some(tool_call_id) = &message.tool_call_id { + part.insert("id".into(), Value::String(tool_call_id.clone())); + } + + let response = match &message.content { + Some(MessageContentView::Text(text)) => parse_json_or_string(text), + Some(MessageContentView::Parts(parts)) => { + Value::Array(parts.iter().map(gen_ai_content_part_value).collect()) + } + None => Value::Null, + }; + part.insert("response".into(), response); + + return vec![Value::Object(part)]; + } + + let mut parts = Vec::new(); + + match &message.content { + Some(MessageContentView::Text(text)) if !text.is_empty() => { + parts.push(gen_ai_text_part(text)); + } + Some(MessageContentView::Parts(content_parts)) => { + parts.extend(content_parts.iter().map(gen_ai_content_part_value)); + } + _ => {} + } + + for tool_call in &message.tool_calls { + let mut part = Map::new(); + part.insert("type".into(), Value::String("tool_call".into())); + + if let Some(id) = &tool_call.id { + part.insert("id".into(), Value::String(id.clone())); + } + + part.insert("name".into(), Value::String(tool_call.name.clone())); + + if !tool_call.arguments.is_empty() { + part.insert( + "arguments".into(), + parse_json_or_string(&tool_call.arguments), + ); + } + + parts.push(Value::Object(part)); + } + + parts +} + +fn gen_ai_text_part(text: &str) -> Value { + let mut part = Map::new(); + part.insert("type".into(), Value::String("text".into())); + part.insert("content".into(), Value::String(text.to_string())); + Value::Object(part) +} + +fn gen_ai_content_part_value(part: &ContentPartView) -> Value { + match part { + ContentPartView::Text(text) => gen_ai_text_part(text), + ContentPartView::ImageUrl { url } => { + let mut part = Map::new(); + part.insert("type".into(), Value::String("uri".into())); + part.insert("modality".into(), Value::String("image".into())); + part.insert("uri".into(), Value::String(url.clone())); + Value::Object(part) + } + } +}