diff --git a/sentry_sdk/ai/utils.py b/sentry_sdk/ai/utils.py index 71f7544a1c..a4ebe96d99 100644 --- a/sentry_sdk/ai/utils.py +++ b/sentry_sdk/ai/utils.py @@ -72,6 +72,397 @@ def parse_data_uri(url: str) -> "Tuple[str, str]": return mime_type, content +def get_modality_from_mime_type(mime_type: str) -> str: + """ + Infer the content modality from a MIME type string. + + Args: + mime_type: A MIME type string (e.g., "image/jpeg", "audio/mp3") + + Returns: + One of: "image", "audio", "video", or "document" + Defaults to "image" for unknown or empty MIME types. + + Examples: + "image/jpeg" -> "image" + "audio/mp3" -> "audio" + "video/mp4" -> "video" + "application/pdf" -> "document" + "text/plain" -> "document" + """ + if not mime_type: + return "image" # Default fallback + + mime_lower = mime_type.lower() + if mime_lower.startswith("image/"): + return "image" + elif mime_lower.startswith("audio/"): + return "audio" + elif mime_lower.startswith("video/"): + return "video" + elif mime_lower.startswith("application/") or mime_lower.startswith("text/"): + return "document" + else: + return "image" # Default fallback for unknown types + + +def transform_openai_content_part( + content_part: "Dict[str, Any]", +) -> "Optional[Dict[str, Any]]": + """ + Transform an OpenAI/LiteLLM content part to Sentry's standardized format. + + This handles the OpenAI image_url format used by OpenAI and LiteLLM SDKs. + + Input format: + - {"type": "image_url", "image_url": {"url": "..."}} + - {"type": "image_url", "image_url": "..."} (string shorthand) + + Output format (one of): + - {"type": "blob", "modality": "image", "mime_type": "...", "content": "..."} + - {"type": "uri", "modality": "image", "mime_type": "", "uri": "..."} + + Args: + content_part: A dictionary representing a content part from OpenAI/LiteLLM + + Returns: + A transformed dictionary in standardized format, or None if the format + is not OpenAI image_url format or transformation fails. + """ + if not isinstance(content_part, dict): + return None + + block_type = content_part.get("type") + + if block_type != "image_url": + return None + + image_url_data = content_part.get("image_url") + if isinstance(image_url_data, str): + url = image_url_data + elif isinstance(image_url_data, dict): + url = image_url_data.get("url", "") + else: + return None + + if not url: + return None + + # Check if it's a data URI (base64 encoded) + if url.startswith("data:"): + try: + mime_type, content = parse_data_uri(url) + return { + "type": "blob", + "modality": get_modality_from_mime_type(mime_type), + "mime_type": mime_type, + "content": content, + } + except ValueError: + # If parsing fails, return as URI + return { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": url, + } + else: + # Regular URL + return { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": url, + } + + +def transform_anthropic_content_part( + content_part: "Dict[str, Any]", +) -> "Optional[Dict[str, Any]]": + """ + Transform an Anthropic content part to Sentry's standardized format. + + This handles the Anthropic image and document formats with source dictionaries. + + Input format: + - {"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}} + - {"type": "image", "source": {"type": "url", "media_type": "...", "url": "..."}} + - {"type": "image", "source": {"type": "file", "media_type": "...", "file_id": "..."}} + - {"type": "document", "source": {...}} (same source formats) + + Output format (one of): + - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."} + - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."} + - {"type": "file", "modality": "...", "mime_type": "...", "file_id": "..."} + + Args: + content_part: A dictionary representing a content part from Anthropic + + Returns: + A transformed dictionary in standardized format, or None if the format + is not Anthropic format or transformation fails. + """ + if not isinstance(content_part, dict): + return None + + block_type = content_part.get("type") + + if block_type not in ("image", "document") or "source" not in content_part: + return None + + source = content_part.get("source") + if not isinstance(source, dict): + return None + + source_type = source.get("type") + media_type = source.get("media_type", "") + modality = ( + "document" + if block_type == "document" + else get_modality_from_mime_type(media_type) + ) + + if source_type == "base64": + return { + "type": "blob", + "modality": modality, + "mime_type": media_type, + "content": source.get("data", ""), + } + elif source_type == "url": + return { + "type": "uri", + "modality": modality, + "mime_type": media_type, + "uri": source.get("url", ""), + } + elif source_type == "file": + return { + "type": "file", + "modality": modality, + "mime_type": media_type, + "file_id": source.get("file_id", ""), + } + + return None + + +def transform_google_content_part( + content_part: "Dict[str, Any]", +) -> "Optional[Dict[str, Any]]": + """ + Transform a Google GenAI content part to Sentry's standardized format. + + This handles the Google GenAI inline_data and file_data formats. + + Input format: + - {"inline_data": {"mime_type": "...", "data": "..."}} + - {"file_data": {"mime_type": "...", "file_uri": "..."}} + + Output format (one of): + - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."} + - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."} + + Args: + content_part: A dictionary representing a content part from Google GenAI + + Returns: + A transformed dictionary in standardized format, or None if the format + is not Google format or transformation fails. + """ + if not isinstance(content_part, dict): + return None + + # Handle Google inline_data format + if "inline_data" in content_part: + inline_data = content_part.get("inline_data") + if isinstance(inline_data, dict): + mime_type = inline_data.get("mime_type", "") + return { + "type": "blob", + "modality": get_modality_from_mime_type(mime_type), + "mime_type": mime_type, + "content": inline_data.get("data", ""), + } + return None + + # Handle Google file_data format + if "file_data" in content_part: + file_data = content_part.get("file_data") + if isinstance(file_data, dict): + mime_type = file_data.get("mime_type", "") + return { + "type": "uri", + "modality": get_modality_from_mime_type(mime_type), + "mime_type": mime_type, + "uri": file_data.get("file_uri", ""), + } + return None + + return None + + +def transform_generic_content_part( + content_part: "Dict[str, Any]", +) -> "Optional[Dict[str, Any]]": + """ + Transform a generic/LangChain-style content part to Sentry's standardized format. + + This handles generic formats where the type indicates the modality and + the data is provided via direct base64, url, or file_id fields. + + Input format: + - {"type": "image", "base64": "...", "mime_type": "..."} + - {"type": "audio", "url": "...", "mime_type": "..."} + - {"type": "video", "base64": "...", "mime_type": "..."} + - {"type": "file", "file_id": "...", "mime_type": "..."} + + Output format (one of): + - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."} + - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."} + - {"type": "file", "modality": "...", "mime_type": "...", "file_id": "..."} + + Args: + content_part: A dictionary representing a content part in generic format + + Returns: + A transformed dictionary in standardized format, or None if the format + is not generic format or transformation fails. + """ + if not isinstance(content_part, dict): + return None + + block_type = content_part.get("type") + + if block_type not in ("image", "audio", "video", "file"): + return None + + # Ensure it's not Anthropic format (which also uses type: "image") + if "source" in content_part: + return None + + mime_type = content_part.get("mime_type", "") + modality = block_type if block_type != "file" else "document" + + # Check for base64 encoded content + if "base64" in content_part: + return { + "type": "blob", + "modality": modality, + "mime_type": mime_type, + "content": content_part.get("base64", ""), + } + # Check for URL reference + elif "url" in content_part: + return { + "type": "uri", + "modality": modality, + "mime_type": mime_type, + "uri": content_part.get("url", ""), + } + # Check for file_id reference + elif "file_id" in content_part: + return { + "type": "file", + "modality": modality, + "mime_type": mime_type, + "file_id": content_part.get("file_id", ""), + } + + return None + + +def transform_content_part( + content_part: "Dict[str, Any]", +) -> "Optional[Dict[str, Any]]": + """ + Transform a content part from various AI SDK formats to Sentry's standardized format. + + This is a heuristic dispatcher that detects the format and delegates to the + appropriate SDK-specific transformer. For direct SDK integration, prefer using + the specific transformers directly: + - transform_openai_content_part() for OpenAI/LiteLLM + - transform_anthropic_content_part() for Anthropic + - transform_google_content_part() for Google GenAI + - transform_generic_content_part() for LangChain and other generic formats + + Detection order: + 1. OpenAI: type == "image_url" + 2. Google: "inline_data" or "file_data" keys present + 3. Anthropic: type in ("image", "document") with "source" key + 4. Generic: type in ("image", "audio", "video", "file") with base64/url/file_id + + Output format (one of): + - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."} + - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."} + - {"type": "file", "modality": "...", "mime_type": "...", "file_id": "..."} + + Args: + content_part: A dictionary representing a content part from an AI SDK + + Returns: + A transformed dictionary in standardized format, or None if the format + is unrecognized or transformation fails. + """ + if not isinstance(content_part, dict): + return None + + # Try OpenAI format first (most common, clear indicator) + result = transform_openai_content_part(content_part) + if result is not None: + return result + + # Try Google format (unique keys make it easy to detect) + result = transform_google_content_part(content_part) + if result is not None: + return result + + # Try Anthropic format (has "source" key) + result = transform_anthropic_content_part(content_part) + if result is not None: + return result + + # Try generic format as fallback + result = transform_generic_content_part(content_part) + if result is not None: + return result + + # Unrecognized format + return None + + +def transform_message_content(content: "Any") -> "Any": + """ + Transform message content, handling both string content and list of content blocks. + + For list content, each item is transformed using transform_content_part(). + Items that cannot be transformed (return None) are kept as-is. + + Args: + content: Message content - can be a string, list of content blocks, or other + + Returns: + - String content: returned as-is + - List content: list with each transformable item converted to standardized format + - Other: returned as-is + """ + if isinstance(content, str): + return content + + if isinstance(content, (list, tuple)): + transformed = [] + for item in content: + if isinstance(item, dict): + result = transform_content_part(item) + # If transformation succeeded, use the result; otherwise keep original + transformed.append(result if result is not None else item) + else: + transformed.append(item) + return transformed + + return content + + def _normalize_data(data: "Any", unpack: bool = True) -> "Any": # convert pydantic data (e.g. OpenAI v1+) to json compatible format if hasattr(data, "model_dump"): diff --git a/sentry_sdk/integrations/google_genai/utils.py b/sentry_sdk/integrations/google_genai/utils.py index 03423c385a..22b7a9f8ab 100644 --- a/sentry_sdk/integrations/google_genai/utils.py +++ b/sentry_sdk/integrations/google_genai/utils.py @@ -2,6 +2,7 @@ import inspect from functools import wraps from .consts import ORIGIN, TOOL_ATTRIBUTES_MAP, GEN_AI_SYSTEM +from sentry_sdk._types import BLOB_DATA_SUBSTITUTE from typing import ( cast, TYPE_CHECKING, @@ -12,6 +13,7 @@ Optional, Union, TypedDict, + Dict, ) import sentry_sdk @@ -19,6 +21,9 @@ set_data_normalized, truncate_and_annotate_messages, normalize_message_roles, + redact_blob_message_parts, + transform_google_content_part, + get_modality_from_mime_type, ) from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.scope import should_send_default_pii @@ -145,44 +150,301 @@ def get_model_name(model: "Union[str, Model]") -> str: return str(model) -def extract_contents_text(contents: "ContentListUnion") -> "Optional[str]": - """Extract text from contents parameter which can have various formats.""" +def extract_contents_messages(contents: "ContentListUnion") -> "List[Dict[str, Any]]": + """Extract messages from contents parameter which can have various formats. + + Returns a list of message dictionaries in the format: + - System: {"role": "system", "content": "string"} + - User/Assistant: {"role": "user"|"assistant", "content": [{"text": "...", "type": "text"}, ...]} + """ if contents is None: - return None + return [] - # Simple string case + messages = [] + + # Handle string case if isinstance(contents, str): - return contents + return [{"role": "user", "content": contents}] - # List of contents or parts + # Handle list case - process each item (non-recursive, flatten at top level) if isinstance(contents, list): - texts = [] for item in contents: - # Recursively extract text from each item - extracted = extract_contents_text(item) - if extracted: - texts.append(extracted) - return " ".join(texts) if texts else None + item_messages = extract_contents_messages(item) + messages.extend(item_messages) + return messages - # Dictionary case + # Handle dictionary case (ContentDict) if isinstance(contents, dict): - if "text" in contents: - return contents["text"] - # Try to extract from parts if present in dict - if "parts" in contents: - return extract_contents_text(contents["parts"]) + role = contents.get("role", "user") + parts = contents.get("parts") + + if parts: + content_parts = [] + tool_messages = [] + + for part in parts: + part_result = _extract_part_content(part) + if part_result is None: + continue + + if isinstance(part_result, dict) and part_result.get("role") == "tool": + # Tool message - add separately + tool_messages.append(part_result) + else: + # Regular content part + content_parts.append(part_result) + + # Add main message if we have content parts + if content_parts: + # Normalize role: "model" -> "assistant" + normalized_role = "assistant" if role == "model" else role or "user" + messages.append({"role": normalized_role, "content": content_parts}) + + # Add tool messages + messages.extend(tool_messages) + elif "text" in contents: + # Simple text in dict + messages.append( + { + "role": role or "user", + "content": [{"text": contents["text"], "type": "text"}], + } + ) + + return messages + + # Handle Content object + if hasattr(contents, "parts") and contents.parts: + role = getattr(contents, "role", None) or "user" + content_parts = [] + tool_messages = [] + + for part in contents.parts: + part_result = _extract_part_content(part) + if part_result is None: + continue + + if isinstance(part_result, dict) and part_result.get("role") == "tool": + tool_messages.append(part_result) + else: + content_parts.append(part_result) + + if content_parts: + normalized_role = "assistant" if role == "model" else role + messages.append({"role": normalized_role, "content": content_parts}) - # Content object with parts - recurse into parts - if getattr(contents, "parts", None): - return extract_contents_text(contents.parts) + messages.extend(tool_messages) + return messages + + # Handle Part object directly + part_result = _extract_part_content(contents) + if part_result: + if isinstance(part_result, dict) and part_result.get("role") == "tool": + return [part_result] + else: + return [{"role": "user", "content": [part_result]}] + + # Handle PIL.Image.Image + try: + from PIL import Image as PILImage # type: ignore[import-not-found] + + if isinstance(contents, PILImage.Image): + blob_part = _extract_pil_image(contents) + if blob_part: + return [{"role": "user", "content": [blob_part]}] + except ImportError: + pass + + # Handle File object + if hasattr(contents, "uri") and hasattr(contents, "mime_type"): + # File object + file_uri = getattr(contents, "uri", None) + mime_type = getattr(contents, "mime_type", None) + if file_uri and mime_type: + blob_part = { + "type": "blob", + "mime_type": mime_type, + "file_uri": file_uri, + } + return [{"role": "user", "content": [blob_part]}] + + # Handle direct text attribute + if hasattr(contents, "text") and contents.text: + return [ + {"role": "user", "content": [{"text": str(contents.text), "type": "text"}]} + ] + + return [] + + +def _extract_part_content(part: "Any") -> "Optional[dict[str, Any]]": + """Extract content from a Part object or dict. - # Direct text attribute - if hasattr(contents, "text"): - return contents.text + Returns: + - dict for content part (text/blob) or tool message + - None if part should be skipped + """ + if part is None: + return None + + # Handle dict Part + if isinstance(part, dict): + # Check for function_response first (tool message) + if "function_response" in part: + return _extract_tool_message_from_part(part) + + if part.get("text"): + return {"text": part["text"], "type": "text"} + + # Try using Google-specific transform for dict formats (inline_data, file_data) + result = transform_google_content_part(part) + if result is not None: + # For inline_data with bytes data, substitute the content + if "inline_data" in part: + inline_data = part["inline_data"] + if isinstance(inline_data, dict) and isinstance( + inline_data.get("data"), bytes + ): + result["content"] = BLOB_DATA_SUBSTITUTE + return result + + return None + + # Handle Part object + # Check for function_response (tool message) + if hasattr(part, "function_response") and part.function_response: + return _extract_tool_message_from_part(part) + + # Handle text + if hasattr(part, "text") and part.text: + return {"text": part.text, "type": "text"} + + # Handle file_data + if hasattr(part, "file_data") and part.file_data: + file_data = part.file_data + file_uri = getattr(file_data, "file_uri", None) + mime_type = getattr(file_data, "mime_type", None) + if file_uri and mime_type: + return { + "type": "blob", + "mime_type": mime_type, + "file_uri": file_uri, + } + + # Handle inline_data + if hasattr(part, "inline_data") and part.inline_data: + inline_data = part.inline_data + data = getattr(inline_data, "data", None) + mime_type = getattr(inline_data, "mime_type", None) + if data and mime_type: + if isinstance(data, bytes): + return { + "type": "blob", + "mime_type": mime_type, + "content": BLOB_DATA_SUBSTITUTE, + } return None +def _extract_tool_message_from_part(part: "Any") -> "Optional[dict[str, Any]]": + """Extract tool message from a Part with function_response. + + Returns: + {"role": "tool", "content": {"toolCallId": "...", "toolName": "...", "output": "..."}} + or None if not a valid tool message + """ + function_response = None + + if isinstance(part, dict): + function_response = part.get("function_response") + elif hasattr(part, "function_response"): + function_response = part.function_response + + if not function_response: + return None + + # Extract fields from function_response + tool_call_id = None + tool_name = None + output = None + + if isinstance(function_response, dict): + tool_call_id = function_response.get("id") + tool_name = function_response.get("name") + response_dict = function_response.get("response", {}) + # Prefer "output" key if present, otherwise use entire response + output = response_dict.get("output", response_dict) + else: + # FunctionResponse object + tool_call_id = getattr(function_response, "id", None) + tool_name = getattr(function_response, "name", None) + response_obj = getattr(function_response, "response", None) + if response_obj is None: + response_obj = {} + if isinstance(response_obj, dict): + output = response_obj.get("output", response_obj) + else: + output = response_obj + + if not tool_name: + return None + + return { + "role": "tool", + "content": { + "toolCallId": str(tool_call_id) if tool_call_id else None, + "toolName": str(tool_name), + "output": safe_serialize(output) if output is not None else None, + }, + } + + +def _extract_pil_image(image: "Any") -> "Optional[dict[str, Any]]": + """Extract blob part from PIL.Image.Image.""" + try: + from PIL import Image as PILImage + + if not isinstance(image, PILImage.Image): + return None + + # Get format, default to JPEG + format_str = image.format or "JPEG" + suffix = format_str.lower() + mime_type = f"image/{suffix}" + + return { + "type": "blob", + "mime_type": mime_type, + "content": BLOB_DATA_SUBSTITUTE, + } + except Exception: + return None + + +def extract_contents_text(contents: "ContentListUnion") -> "Optional[str]": + """Extract text from contents parameter which can have various formats. + + This is a compatibility function that extracts text from messages. + For new code, use extract_contents_messages instead. + """ + messages = extract_contents_messages(contents) + if not messages: + return None + + texts = [] + for message in messages: + content = message.get("content") + if isinstance(content, str): + texts.append(content) + elif isinstance(content, list): + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + texts.append(part.get("text", "")) + + return " ".join(texts) if texts else None + + def _format_tools_for_span( tools: "Iterable[Tool | Callable[..., Any]]", ) -> "Optional[List[dict[str, Any]]]": @@ -457,14 +719,28 @@ def set_span_data_for_request( if config and hasattr(config, "system_instruction"): system_instruction = config.system_instruction if system_instruction: - system_text = extract_contents_text(system_instruction) - if system_text: - messages.append({"role": "system", "content": system_text}) - - # Add user message - contents_text = extract_contents_text(contents) - if contents_text: - messages.append({"role": "user", "content": contents_text}) + system_messages = extract_contents_messages(system_instruction) + # System instruction should be a single system message + # Extract text from all messages and combine into one system message + system_texts = [] + for msg in system_messages: + content = msg.get("content") + if isinstance(content, list): + # Extract text from content parts + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + system_texts.append(part.get("text", "")) + elif isinstance(content, str): + system_texts.append(content) + + if system_texts: + messages.append( + {"role": "system", "content": " ".join(system_texts)} + ) + + # Extract messages from contents + contents_messages = extract_contents_messages(contents) + messages.extend(contents_messages) if messages: normalized_messages = normalize_message_roles(messages) diff --git a/tests/integrations/google_genai/test_google_genai.py b/tests/integrations/google_genai/test_google_genai.py index a49822f3d4..2557c1c2e5 100644 --- a/tests/integrations/google_genai/test_google_genai.py +++ b/tests/integrations/google_genai/test_google_genai.py @@ -6,8 +6,10 @@ from google.genai import types as genai_types from sentry_sdk import start_transaction +from sentry_sdk._types import BLOB_DATA_SUBSTITUTE from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations.google_genai import GoogleGenAIIntegration +from sentry_sdk.integrations.google_genai.utils import extract_contents_messages @pytest.fixture @@ -1417,3 +1419,321 @@ async def test_async_embed_content_span_origin( assert event["contexts"]["trace"]["origin"] == "manual" for span in event["spans"]: assert span["origin"] == "auto.ai.google_genai" + + +# Tests for extract_contents_messages function +def test_extract_contents_messages_none(): + """Test extract_contents_messages with None input""" + result = extract_contents_messages(None) + assert result == [] + + +def test_extract_contents_messages_string(): + """Test extract_contents_messages with string input""" + result = extract_contents_messages("Hello world") + assert result == [{"role": "user", "content": "Hello world"}] + + +def test_extract_contents_messages_content_object(): + """Test extract_contents_messages with Content object""" + content = genai_types.Content( + role="user", parts=[genai_types.Part(text="Test message")] + ) + result = extract_contents_messages(content) + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "Test message", "type": "text"}] + + +def test_extract_contents_messages_content_object_model_role(): + """Test extract_contents_messages with Content object having model role""" + content = genai_types.Content( + role="model", parts=[genai_types.Part(text="Assistant response")] + ) + result = extract_contents_messages(content) + assert len(result) == 1 + assert result[0]["role"] == "assistant" + assert result[0]["content"] == [{"text": "Assistant response", "type": "text"}] + + +def test_extract_contents_messages_content_object_no_role(): + """Test extract_contents_messages with Content object without role""" + content = genai_types.Content(parts=[genai_types.Part(text="No role message")]) + result = extract_contents_messages(content) + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "No role message", "type": "text"}] + + +def test_extract_contents_messages_part_object(): + """Test extract_contents_messages with Part object""" + part = genai_types.Part(text="Direct part") + result = extract_contents_messages(part) + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "Direct part", "type": "text"}] + + +def test_extract_contents_messages_file_data(): + """Test extract_contents_messages with file_data""" + file_data = genai_types.FileData( + file_uri="gs://bucket/file.jpg", mime_type="image/jpeg" + ) + part = genai_types.Part(file_data=file_data) + content = genai_types.Content(parts=[part]) + result = extract_contents_messages(content) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert len(result[0]["content"]) == 1 + blob_part = result[0]["content"][0] + assert blob_part["type"] == "blob" + assert blob_part["mime_type"] == "image/jpeg" + assert blob_part["file_uri"] == "gs://bucket/file.jpg" + + +def test_extract_contents_messages_inline_data(): + """Test extract_contents_messages with inline_data (binary)""" + # Create inline data with bytes + image_bytes = b"fake_image_data" + blob = genai_types.Blob(data=image_bytes, mime_type="image/png") + part = genai_types.Part(inline_data=blob) + content = genai_types.Content(parts=[part]) + result = extract_contents_messages(content) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert len(result[0]["content"]) == 1 + blob_part = result[0]["content"][0] + assert blob_part["type"] == "blob" + assert blob_part["mime_type"] == "image/png" + assert blob_part["content"] == BLOB_DATA_SUBSTITUTE + + +def test_extract_contents_messages_function_response(): + """Test extract_contents_messages with function_response (tool message)""" + function_response = genai_types.FunctionResponse( + id="call_123", name="get_weather", response={"output": "sunny"} + ) + part = genai_types.Part(function_response=function_response) + content = genai_types.Content(parts=[part]) + result = extract_contents_messages(content) + + assert len(result) == 1 + assert result[0]["role"] == "tool" + assert result[0]["content"]["toolCallId"] == "call_123" + assert result[0]["content"]["toolName"] == "get_weather" + assert result[0]["content"]["output"] == '"sunny"' + + +def test_extract_contents_messages_function_response_with_output_key(): + """Test extract_contents_messages with function_response that has output key""" + function_response = genai_types.FunctionResponse( + id="call_456", name="get_time", response={"output": "3:00 PM", "error": None} + ) + part = genai_types.Part(function_response=function_response) + content = genai_types.Content(parts=[part]) + result = extract_contents_messages(content) + + assert len(result) == 1 + assert result[0]["role"] == "tool" + assert result[0]["content"]["toolCallId"] == "call_456" + assert result[0]["content"]["toolName"] == "get_time" + # Should prefer "output" key + assert result[0]["content"]["output"] == '"3:00 PM"' + + +def test_extract_contents_messages_mixed_parts(): + """Test extract_contents_messages with mixed content parts""" + content = genai_types.Content( + role="user", + parts=[ + genai_types.Part(text="Text part"), + genai_types.Part( + file_data=genai_types.FileData( + file_uri="gs://bucket/image.jpg", mime_type="image/jpeg" + ) + ), + ], + ) + result = extract_contents_messages(content) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert len(result[0]["content"]) == 2 + assert result[0]["content"][0] == {"text": "Text part", "type": "text"} + assert result[0]["content"][1]["type"] == "blob" + assert result[0]["content"][1]["file_uri"] == "gs://bucket/image.jpg" + + +def test_extract_contents_messages_list(): + """Test extract_contents_messages with list input""" + contents = [ + "First message", + genai_types.Content( + role="user", parts=[genai_types.Part(text="Second message")] + ), + ] + result = extract_contents_messages(contents) + + assert len(result) == 2 + assert result[0] == {"role": "user", "content": "First message"} + assert result[1]["role"] == "user" + assert result[1]["content"] == [{"text": "Second message", "type": "text"}] + + +def test_extract_contents_messages_dict_content(): + """Test extract_contents_messages with dict (ContentDict)""" + content_dict = {"role": "user", "parts": [{"text": "Dict message"}]} + result = extract_contents_messages(content_dict) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "Dict message", "type": "text"}] + + +def test_extract_contents_messages_dict_with_text(): + """Test extract_contents_messages with dict containing text key""" + content_dict = {"role": "user", "text": "Simple text"} + result = extract_contents_messages(content_dict) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "Simple text", "type": "text"}] + + +def test_extract_contents_messages_file_object(): + """Test extract_contents_messages with File object""" + file_obj = genai_types.File( + name="files/123", uri="gs://bucket/file.pdf", mime_type="application/pdf" + ) + result = extract_contents_messages(file_obj) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert len(result[0]["content"]) == 1 + blob_part = result[0]["content"][0] + assert blob_part["type"] == "blob" + assert blob_part["mime_type"] == "application/pdf" + assert blob_part["file_uri"] == "gs://bucket/file.pdf" + + +@pytest.mark.skipif( + not hasattr(genai_types, "PIL_Image") or genai_types.PIL_Image is None, + reason="PIL not available", +) +def test_extract_contents_messages_pil_image(): + """Test extract_contents_messages with PIL.Image.Image""" + try: + from PIL import Image as PILImage + + # Create a simple test image + img = PILImage.new("RGB", (10, 10), color="red") + result = extract_contents_messages(img) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert len(result[0]["content"]) == 1 + blob_part = result[0]["content"][0] + assert blob_part["type"] == "blob" + assert blob_part["mime_type"].startswith("image/") + assert "content" in blob_part + # Binary content is substituted with placeholder for privacy + assert blob_part["content"] == "[Blob substitute]" + except ImportError: + pytest.skip("PIL not available") + + +def test_extract_contents_messages_tool_and_text(): + """Test extract_contents_messages with both tool message and text""" + content = genai_types.Content( + role="user", + parts=[ + genai_types.Part(text="User question"), + genai_types.Part( + function_response=genai_types.FunctionResponse( + id="call_789", name="search", response={"output": "results"} + ) + ), + ], + ) + result = extract_contents_messages(content) + + # Should have two messages: one user message and one tool message + assert len(result) == 2 + # First should be user message with text + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "User question", "type": "text"}] + # Second should be tool message + assert result[1]["role"] == "tool" + assert result[1]["content"]["toolCallId"] == "call_789" + assert result[1]["content"]["toolName"] == "search" + + +def test_extract_contents_messages_empty_parts(): + """Test extract_contents_messages with Content object with empty parts""" + content = genai_types.Content(role="user", parts=[]) + result = extract_contents_messages(content) + + assert result == [] + + +def test_extract_contents_messages_empty_list(): + """Test extract_contents_messages with empty list""" + result = extract_contents_messages([]) + assert result == [] + + +def test_extract_contents_messages_dict_inline_data(): + """Test extract_contents_messages with dict containing inline_data""" + content_dict = { + "role": "user", + "parts": [{"inline_data": {"data": b"binary_data", "mime_type": "image/gif"}}], + } + result = extract_contents_messages(content_dict) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert len(result[0]["content"]) == 1 + blob_part = result[0]["content"][0] + assert blob_part["type"] == "blob" + assert blob_part["mime_type"] == "image/gif" + assert blob_part["content"] == BLOB_DATA_SUBSTITUTE + + +def test_extract_contents_messages_dict_function_response(): + """Test extract_contents_messages with dict containing function_response""" + content_dict = { + "role": "user", + "parts": [ + { + "function_response": { + "id": "dict_call_1", + "name": "dict_tool", + "response": {"result": "success"}, + } + } + ], + } + result = extract_contents_messages(content_dict) + + assert len(result) == 1 + assert result[0]["role"] == "tool" + assert result[0]["content"]["toolCallId"] == "dict_call_1" + assert result[0]["content"]["toolName"] == "dict_tool" + assert result[0]["content"]["output"] == '{"result": "success"}' + + +def test_extract_contents_messages_object_with_text_attribute(): + """Test extract_contents_messages with object that has text attribute""" + + class TextObject: + def __init__(self): + self.text = "Object text" + + obj = TextObject() + result = extract_contents_messages(obj) + + assert len(result) == 1 + assert result[0]["role"] == "user" + assert result[0]["content"] == [{"text": "Object text", "type": "text"}] diff --git a/tests/test_ai_monitoring.py b/tests/test_ai_monitoring.py index 1ff354f473..f6852d54bb 100644 --- a/tests/test_ai_monitoring.py +++ b/tests/test_ai_monitoring.py @@ -19,6 +19,13 @@ _find_truncation_index, parse_data_uri, redact_blob_message_parts, + get_modality_from_mime_type, + transform_openai_content_part, + transform_anthropic_content_part, + transform_google_content_part, + transform_generic_content_part, + transform_content_part, + transform_message_content, ) from sentry_sdk.serializer import serialize from sentry_sdk.utils import safe_serialize @@ -842,3 +849,906 @@ def test_handles_uri_without_data_prefix(self): assert mime_type == "image/jpeg" assert content == "/9j/4AAQ" + + +class TestGetModalityFromMimeType: + def test_image_mime_types(self): + """Test that image MIME types return 'image' modality""" + assert get_modality_from_mime_type("image/jpeg") == "image" + assert get_modality_from_mime_type("image/png") == "image" + assert get_modality_from_mime_type("image/gif") == "image" + assert get_modality_from_mime_type("image/webp") == "image" + assert get_modality_from_mime_type("IMAGE/JPEG") == "image" # case insensitive + + def test_audio_mime_types(self): + """Test that audio MIME types return 'audio' modality""" + assert get_modality_from_mime_type("audio/mp3") == "audio" + assert get_modality_from_mime_type("audio/wav") == "audio" + assert get_modality_from_mime_type("audio/ogg") == "audio" + assert get_modality_from_mime_type("AUDIO/MP3") == "audio" # case insensitive + + def test_video_mime_types(self): + """Test that video MIME types return 'video' modality""" + assert get_modality_from_mime_type("video/mp4") == "video" + assert get_modality_from_mime_type("video/webm") == "video" + assert get_modality_from_mime_type("video/quicktime") == "video" + assert get_modality_from_mime_type("VIDEO/MP4") == "video" # case insensitive + + def test_document_mime_types(self): + """Test that application and text MIME types return 'document' modality""" + assert get_modality_from_mime_type("application/pdf") == "document" + assert get_modality_from_mime_type("application/json") == "document" + assert get_modality_from_mime_type("text/plain") == "document" + assert get_modality_from_mime_type("text/html") == "document" + + def test_empty_mime_type_returns_image(self): + """Test that empty MIME type defaults to 'image'""" + assert get_modality_from_mime_type("") == "image" + + def test_none_mime_type_returns_image(self): + """Test that None-like values default to 'image'""" + assert get_modality_from_mime_type(None) == "image" + + def test_unknown_mime_type_returns_image(self): + """Test that unknown MIME types default to 'image'""" + assert get_modality_from_mime_type("unknown/type") == "image" + assert get_modality_from_mime_type("custom/format") == "image" + + +class TestTransformOpenAIContentPart: + """Tests for the OpenAI-specific transform function.""" + + def test_image_url_with_data_uri(self): + """Test transforming OpenAI image_url with base64 data URI""" + content_part = { + "type": "image_url", + "image_url": {"url": ""}, + } + result = transform_openai_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_image_url_with_regular_url(self): + """Test transforming OpenAI image_url with regular URL""" + content_part = { + "type": "image_url", + "image_url": {"url": "https://example.com/image.jpg"}, + } + result = transform_openai_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "https://example.com/image.jpg", + } + + def test_image_url_string_format(self): + """Test transforming OpenAI image_url where image_url is a string""" + content_part = { + "type": "image_url", + "image_url": "https://example.com/image.jpg", + } + result = transform_openai_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "https://example.com/image.jpg", + } + + def test_image_url_invalid_data_uri(self): + """Test transforming OpenAI image_url with invalid data URI falls back to URI""" + content_part = { + "type": "image_url", + "image_url": {"url": "data:image/jpeg;base64"}, # Missing comma + } + result = transform_openai_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "data:image/jpeg;base64", + } + + def test_empty_url_returns_none(self): + """Test that image_url with empty URL returns None""" + content_part = {"type": "image_url", "image_url": {"url": ""}} + assert transform_openai_content_part(content_part) is None + + def test_non_image_url_type_returns_none(self): + """Test that non-image_url types return None""" + content_part = {"type": "text", "text": "Hello"} + assert transform_openai_content_part(content_part) is None + + def test_anthropic_format_returns_none(self): + """Test that Anthropic format returns None (not handled)""" + content_part = { + "type": "image", + "source": {"type": "base64", "media_type": "image/png", "data": "abc"}, + } + assert transform_openai_content_part(content_part) is None + + def test_google_format_returns_none(self): + """Test that Google format returns None (not handled)""" + content_part = {"inline_data": {"mime_type": "image/jpeg", "data": "abc"}} + assert transform_openai_content_part(content_part) is None + + def test_non_dict_returns_none(self): + """Test that non-dict input returns None""" + assert transform_openai_content_part("string") is None + assert transform_openai_content_part(123) is None + assert transform_openai_content_part(None) is None + + +class TestTransformAnthropicContentPart: + """Tests for the Anthropic-specific transform function.""" + + def test_image_base64(self): + """Test transforming Anthropic image with base64 source""" + content_part = { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": "iVBORw0KGgo=", + }, + } + result = transform_anthropic_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/png", + "content": "iVBORw0KGgo=", + } + + def test_image_url(self): + """Test transforming Anthropic image with URL source""" + content_part = { + "type": "image", + "source": { + "type": "url", + "media_type": "image/jpeg", + "url": "https://example.com/image.jpg", + }, + } + result = transform_anthropic_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "image/jpeg", + "uri": "https://example.com/image.jpg", + } + + def test_image_file(self): + """Test transforming Anthropic image with file source""" + content_part = { + "type": "image", + "source": { + "type": "file", + "media_type": "image/jpeg", + "file_id": "file_123", + }, + } + result = transform_anthropic_content_part(content_part) + + assert result == { + "type": "file", + "modality": "image", + "mime_type": "image/jpeg", + "file_id": "file_123", + } + + def test_document_base64(self): + """Test transforming Anthropic document with base64 source""" + content_part = { + "type": "document", + "source": { + "type": "base64", + "media_type": "application/pdf", + "data": "JVBERi0xLjQ=", + }, + } + result = transform_anthropic_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "document", + "mime_type": "application/pdf", + "content": "JVBERi0xLjQ=", + } + + def test_document_url(self): + """Test transforming Anthropic document with URL source""" + content_part = { + "type": "document", + "source": { + "type": "url", + "media_type": "application/pdf", + "url": "https://example.com/doc.pdf", + }, + } + result = transform_anthropic_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "document", + "mime_type": "application/pdf", + "uri": "https://example.com/doc.pdf", + } + + def test_invalid_source_returns_none(self): + """Test that Anthropic format with invalid source returns None""" + content_part = {"type": "image", "source": "not_a_dict"} + assert transform_anthropic_content_part(content_part) is None + + def test_unknown_source_type_returns_none(self): + """Test that Anthropic format with unknown source type returns None""" + content_part = { + "type": "image", + "source": {"type": "unknown", "data": "something"}, + } + assert transform_anthropic_content_part(content_part) is None + + def test_missing_source_returns_none(self): + """Test that Anthropic format without source returns None""" + content_part = {"type": "image", "data": "something"} + assert transform_anthropic_content_part(content_part) is None + + def test_openai_format_returns_none(self): + """Test that OpenAI format returns None (not handled)""" + content_part = { + "type": "image_url", + "image_url": {"url": "https://example.com"}, + } + assert transform_anthropic_content_part(content_part) is None + + def test_google_format_returns_none(self): + """Test that Google format returns None (not handled)""" + content_part = {"inline_data": {"mime_type": "image/jpeg", "data": "abc"}} + assert transform_anthropic_content_part(content_part) is None + + def test_non_dict_returns_none(self): + """Test that non-dict input returns None""" + assert transform_anthropic_content_part("string") is None + assert transform_anthropic_content_part(123) is None + assert transform_anthropic_content_part(None) is None + + +class TestTransformGoogleContentPart: + """Tests for the Google GenAI-specific transform function.""" + + def test_inline_data(self): + """Test transforming Google inline_data format""" + content_part = { + "inline_data": { + "mime_type": "image/jpeg", + "data": "/9j/4AAQSkZJRg==", + } + } + result = transform_google_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_file_data(self): + """Test transforming Google file_data format""" + content_part = { + "file_data": { + "mime_type": "video/mp4", + "file_uri": "gs://bucket/video.mp4", + } + } + result = transform_google_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "video", + "mime_type": "video/mp4", + "uri": "gs://bucket/video.mp4", + } + + def test_inline_data_audio(self): + """Test transforming Google inline_data with audio""" + content_part = { + "inline_data": { + "mime_type": "audio/wav", + "data": "UklGRiQA", + } + } + result = transform_google_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "audio", + "mime_type": "audio/wav", + "content": "UklGRiQA", + } + + def test_inline_data_not_dict_returns_none(self): + """Test that Google inline_data with non-dict value returns None""" + content_part = {"inline_data": "not_a_dict"} + assert transform_google_content_part(content_part) is None + + def test_file_data_not_dict_returns_none(self): + """Test that Google file_data with non-dict value returns None""" + content_part = {"file_data": "not_a_dict"} + assert transform_google_content_part(content_part) is None + + def test_openai_format_returns_none(self): + """Test that OpenAI format returns None (not handled)""" + content_part = { + "type": "image_url", + "image_url": {"url": "https://example.com"}, + } + assert transform_google_content_part(content_part) is None + + def test_anthropic_format_returns_none(self): + """Test that Anthropic format returns None (not handled)""" + content_part = { + "type": "image", + "source": {"type": "base64", "media_type": "image/png", "data": "abc"}, + } + assert transform_google_content_part(content_part) is None + + def test_non_dict_returns_none(self): + """Test that non-dict input returns None""" + assert transform_google_content_part("string") is None + assert transform_google_content_part(123) is None + assert transform_google_content_part(None) is None + + +class TestTransformGenericContentPart: + """Tests for the generic/LangChain-style transform function.""" + + def test_image_base64(self): + """Test transforming generic format with base64""" + content_part = { + "type": "image", + "base64": "/9j/4AAQSkZJRg==", + "mime_type": "image/jpeg", + } + result = transform_generic_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_audio_url(self): + """Test transforming generic format with URL""" + content_part = { + "type": "audio", + "url": "https://example.com/audio.mp3", + "mime_type": "audio/mp3", + } + result = transform_generic_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "audio", + "mime_type": "audio/mp3", + "uri": "https://example.com/audio.mp3", + } + + def test_file_with_file_id(self): + """Test transforming generic format with file_id""" + content_part = { + "type": "file", + "file_id": "file_456", + "mime_type": "application/pdf", + } + result = transform_generic_content_part(content_part) + + assert result == { + "type": "file", + "modality": "document", + "mime_type": "application/pdf", + "file_id": "file_456", + } + + def test_video_base64(self): + """Test transforming generic video format""" + content_part = { + "type": "video", + "base64": "AAAA", + "mime_type": "video/mp4", + } + result = transform_generic_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "video", + "mime_type": "video/mp4", + "content": "AAAA", + } + + def test_image_with_source_returns_none(self): + """Test that image with source key (Anthropic style) returns None""" + # This is Anthropic format, should NOT be handled by generic + content_part = { + "type": "image", + "source": {"type": "base64", "data": "abc"}, + } + assert transform_generic_content_part(content_part) is None + + def test_text_type_returns_none(self): + """Test that text type returns None""" + content_part = {"type": "text", "text": "Hello"} + assert transform_generic_content_part(content_part) is None + + def test_openai_format_returns_none(self): + """Test that OpenAI format returns None (not handled)""" + content_part = { + "type": "image_url", + "image_url": {"url": "https://example.com"}, + } + assert transform_generic_content_part(content_part) is None + + def test_google_format_returns_none(self): + """Test that Google format returns None (not handled)""" + content_part = {"inline_data": {"mime_type": "image/jpeg", "data": "abc"}} + assert transform_generic_content_part(content_part) is None + + def test_non_dict_returns_none(self): + """Test that non-dict input returns None""" + assert transform_generic_content_part("string") is None + assert transform_generic_content_part(123) is None + assert transform_generic_content_part(None) is None + + def test_missing_data_key_returns_none(self): + """Test that missing data key (base64/url/file_id) returns None""" + content_part = {"type": "image", "mime_type": "image/jpeg"} + assert transform_generic_content_part(content_part) is None + + +class TestTransformContentPart: + # OpenAI/LiteLLM format tests + def test_openai_image_url_with_data_uri(self): + """Test transforming OpenAI image_url with base64 data URI""" + content_part = { + "type": "image_url", + "image_url": {"url": ""}, + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_openai_image_url_with_regular_url(self): + """Test transforming OpenAI image_url with regular URL""" + content_part = { + "type": "image_url", + "image_url": {"url": "https://example.com/image.jpg"}, + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "https://example.com/image.jpg", + } + + def test_openai_image_url_string_format(self): + """Test transforming OpenAI image_url where image_url is a string""" + content_part = { + "type": "image_url", + "image_url": "https://example.com/image.jpg", + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "https://example.com/image.jpg", + } + + def test_openai_image_url_invalid_data_uri(self): + """Test transforming OpenAI image_url with invalid data URI falls back to URI""" + content_part = { + "type": "image_url", + "image_url": {"url": "data:image/jpeg;base64"}, # Missing comma + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "data:image/jpeg;base64", + } + + # Anthropic format tests + def test_anthropic_image_base64(self): + """Test transforming Anthropic image with base64 source""" + content_part = { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": "iVBORw0KGgo=", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/png", + "content": "iVBORw0KGgo=", + } + + def test_anthropic_image_url(self): + """Test transforming Anthropic image with URL source""" + content_part = { + "type": "image", + "source": { + "type": "url", + "media_type": "image/jpeg", + "url": "https://example.com/image.jpg", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "image/jpeg", + "uri": "https://example.com/image.jpg", + } + + def test_anthropic_image_file(self): + """Test transforming Anthropic image with file source""" + content_part = { + "type": "image", + "source": { + "type": "file", + "media_type": "image/jpeg", + "file_id": "file_123", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "file", + "modality": "image", + "mime_type": "image/jpeg", + "file_id": "file_123", + } + + def test_anthropic_document_base64(self): + """Test transforming Anthropic document with base64 source""" + content_part = { + "type": "document", + "source": { + "type": "base64", + "media_type": "application/pdf", + "data": "JVBERi0xLjQ=", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "document", + "mime_type": "application/pdf", + "content": "JVBERi0xLjQ=", + } + + def test_anthropic_document_url(self): + """Test transforming Anthropic document with URL source""" + content_part = { + "type": "document", + "source": { + "type": "url", + "media_type": "application/pdf", + "url": "https://example.com/doc.pdf", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "document", + "mime_type": "application/pdf", + "uri": "https://example.com/doc.pdf", + } + + # Google format tests + def test_google_inline_data(self): + """Test transforming Google inline_data format""" + content_part = { + "inline_data": { + "mime_type": "image/jpeg", + "data": "/9j/4AAQSkZJRg==", + } + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_google_file_data(self): + """Test transforming Google file_data format""" + content_part = { + "file_data": { + "mime_type": "video/mp4", + "file_uri": "gs://bucket/video.mp4", + } + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "video", + "mime_type": "video/mp4", + "uri": "gs://bucket/video.mp4", + } + + def test_google_inline_data_audio(self): + """Test transforming Google inline_data with audio""" + content_part = { + "inline_data": { + "mime_type": "audio/wav", + "data": "UklGRiQA", + } + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "audio", + "mime_type": "audio/wav", + "content": "UklGRiQA", + } + + # Generic format tests (LangChain style) + def test_generic_image_base64(self): + """Test transforming generic format with base64""" + content_part = { + "type": "image", + "base64": "/9j/4AAQSkZJRg==", + "mime_type": "image/jpeg", + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_generic_audio_url(self): + """Test transforming generic format with URL""" + content_part = { + "type": "audio", + "url": "https://example.com/audio.mp3", + "mime_type": "audio/mp3", + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "audio", + "mime_type": "audio/mp3", + "uri": "https://example.com/audio.mp3", + } + + def test_generic_file_with_file_id(self): + """Test transforming generic format with file_id""" + content_part = { + "type": "file", + "file_id": "file_456", + "mime_type": "application/pdf", + } + result = transform_content_part(content_part) + + assert result == { + "type": "file", + "modality": "document", + "mime_type": "application/pdf", + "file_id": "file_456", + } + + def test_generic_video_base64(self): + """Test transforming generic video format""" + content_part = { + "type": "video", + "base64": "AAAA", + "mime_type": "video/mp4", + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "video", + "mime_type": "video/mp4", + "content": "AAAA", + } + + # Edge cases and error handling + def test_text_block_returns_none(self): + """Test that text blocks return None (not transformed)""" + content_part = {"type": "text", "text": "Hello world"} + result = transform_content_part(content_part) + + assert result is None + + def test_non_dict_returns_none(self): + """Test that non-dict input returns None""" + assert transform_content_part("string") is None + assert transform_content_part(123) is None + assert transform_content_part(None) is None + assert transform_content_part([1, 2, 3]) is None + + def test_empty_dict_returns_none(self): + """Test that empty dict returns None""" + assert transform_content_part({}) is None + + def test_unknown_type_returns_none(self): + """Test that unknown type returns None""" + content_part = {"type": "unknown", "data": "something"} + assert transform_content_part(content_part) is None + + def test_openai_image_url_empty_url_returns_none(self): + """Test that image_url with empty URL returns None""" + content_part = {"type": "image_url", "image_url": {"url": ""}} + assert transform_content_part(content_part) is None + + def test_anthropic_invalid_source_returns_none(self): + """Test that Anthropic format with invalid source returns None""" + content_part = {"type": "image", "source": "not_a_dict"} + assert transform_content_part(content_part) is None + + def test_anthropic_unknown_source_type_returns_none(self): + """Test that Anthropic format with unknown source type returns None""" + content_part = { + "type": "image", + "source": {"type": "unknown", "data": "something"}, + } + assert transform_content_part(content_part) is None + + def test_google_inline_data_not_dict_returns_none(self): + """Test that Google inline_data with non-dict value returns None""" + content_part = {"inline_data": "not_a_dict"} + assert transform_content_part(content_part) is None + + def test_google_file_data_not_dict_returns_none(self): + """Test that Google file_data with non-dict value returns None""" + content_part = {"file_data": "not_a_dict"} + assert transform_content_part(content_part) is None + + +class TestTransformMessageContent: + def test_string_content_returned_as_is(self): + """Test that string content is returned unchanged""" + content = "Hello, world!" + result = transform_message_content(content) + + assert result == "Hello, world!" + + def test_list_with_transformable_items(self): + """Test transforming a list with transformable content parts""" + content = [ + {"type": "text", "text": "What's in this image?"}, + { + "type": "image_url", + "image_url": {"url": ""}, + }, + ] + result = transform_message_content(content) + + assert len(result) == 2 + # Text block should be unchanged (transform returns None, so original kept) + assert result[0] == {"type": "text", "text": "What's in this image?"} + # Image should be transformed + assert result[1] == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQ", + } + + def test_list_with_non_dict_items(self): + """Test that non-dict items in list are kept as-is""" + content = ["text string", 123, {"type": "text", "text": "hi"}] + result = transform_message_content(content) + + assert result == ["text string", 123, {"type": "text", "text": "hi"}] + + def test_tuple_content(self): + """Test that tuple content is also handled""" + content = ( + {"type": "text", "text": "Hello"}, + { + "type": "image_url", + "image_url": {"url": "https://example.com/img.jpg"}, + }, + ) + result = transform_message_content(content) + + assert len(result) == 2 + assert result[0] == {"type": "text", "text": "Hello"} + assert result[1] == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "https://example.com/img.jpg", + } + + def test_other_types_returned_as_is(self): + """Test that other types are returned unchanged""" + assert transform_message_content(123) == 123 + assert transform_message_content(None) is None + assert transform_message_content({"key": "value"}) == {"key": "value"} + + def test_mixed_content_types(self): + """Test transforming mixed content with multiple formats""" + content = [ + {"type": "text", "text": "Look at these:"}, + { + "type": "image_url", + "image_url": {"url": ""}, + }, + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/jpeg", + "data": "/9j/4AAQ", + }, + }, + {"inline_data": {"mime_type": "audio/wav", "data": "UklGRiQA"}}, + ] + result = transform_message_content(content) + + assert len(result) == 4 + assert result[0] == {"type": "text", "text": "Look at these:"} + assert result[1] == { + "type": "blob", + "modality": "image", + "mime_type": "image/png", + "content": "iVBORw0", + } + assert result[2] == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQ", + } + assert result[3] == { + "type": "blob", + "modality": "audio", + "mime_type": "audio/wav", + "content": "UklGRiQA", + } + + def test_empty_list(self): + """Test that empty list is returned as empty list""" + assert transform_message_content([]) == []