diff --git a/langfuse/_client/propagation.py b/langfuse/_client/propagation.py
index e0c68db00..28f2ccab3 100644
--- a/langfuse/_client/propagation.py
+++ b/langfuse/_client/propagation.py
@@ -17,6 +17,7 @@
 from opentelemetry import (
     trace as otel_trace_api,
 )
+from opentelemetry.context import _RUNTIME_CONTEXT
 from opentelemetry.util._decorator import (
     _AgnosticContextManager,
     _agnosticcontextmanager,
@@ -272,7 +273,13 @@ def _propagate_attributes(
         yield
 
     finally:
-        otel_context_api.detach(token)
+        try:
+            # Bypass the public detach() which logs an ERROR when the token was
+            # created in a different async task/thread (common in async frameworks).
+            # The span data is already captured; the failed detach is harmless.
+            _RUNTIME_CONTEXT.detach(token)
+        except Exception:
+            pass
 
 
 def _get_propagated_attributes_from_context(
diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py
index 0cd4dd133..7c9725caa 100644
--- a/langfuse/langchain/CallbackHandler.py
+++ b/langfuse/langchain/CallbackHandler.py
@@ -1,3 +1,4 @@
+import json
 from contextvars import Token
 from typing import (
     Any,
@@ -35,6 +36,187 @@
 from langfuse.logger import langfuse_logger
 from langfuse.types import TraceContext
 
+
+def _to_langfuse_tool(tool: Any) -> List[Any]:
+    """Normalize a tool definition into a list of OpenAI-format tool dicts.
+
+    Returns a list because Google / Vertex AI's ``function_declarations`` format
+    bundles multiple tools into a single object, so one input can expand to many.
+
+    LangChain providers serialize tools differently depending on the backend:
+    - OpenAI / LiteLLM / Ollama: {type: "function", function: {name, description, parameters}}
+    - Anthropic (ChatAnthropic): {name, description, input_schema}
+    - Google / Vertex AI: {function_declarations: [{name, description, parameters}, ...]}
+    - BaseTool / StructuredTool objects: LangChain objects not yet converted to dict
+
+    All formats are normalized to the canonical OpenAI shape so Langfuse's
+    ``extractToolsFromObservation`` (which uses ``OpenAIToolSchema``) can parse them.
+    Inputs in an unrecognized shape are returned unchanged as a single-item list.
+    """
+    if not isinstance(tool, dict):
+        # BaseTool / StructuredTool objects — passed without dict conversion (langfuse#11850).
+        # Extract via duck typing to avoid a hard langchain-core dependency.
+        if hasattr(tool, "name") and hasattr(tool, "description"):
+            try:
+                parameters: Dict[str, Any] = {}
+                args_schema = getattr(tool, "args_schema", None)
+                if args_schema is not None:
+                    if isinstance(args_schema, dict):  # already a JSON schema
+                        schema = args_schema
+                    elif hasattr(args_schema, "model_json_schema"):  # Pydantic v2
+                        schema = args_schema.model_json_schema()
+                    elif hasattr(args_schema, "schema"):  # Pydantic v1
+                        schema = args_schema.schema()
+                    else:
+                        schema = {}
+                    parameters = {k: v for k, v in schema.items() if k != "title"}
+                return [
+                    {
+                        "type": "function",
+                        "function": {
+                            "name": tool.name,
+                            "description": tool.description or "",
+                            "parameters": parameters,
+                        },
+                    }
+                ]
+            except Exception:
+                langfuse_logger.debug(
+                    "Failed to convert BaseTool object to dict: %s", tool
+                )
+        return [tool]
+
+    # Already in OpenAI format: {type: "function", function: {name, description, parameters}}
+    if tool.get("type") == "function" and "function" in tool:
+        return [tool]
+
+    # Anthropic format: {name, description, input_schema} -> OpenAI format
+    if "name" in tool and "input_schema" in tool:
+        return [
+            {
+                "type": "function",
+                "function": {
+                    "name": tool["name"],
+                    "description": tool.get("description", ""),
+                    "parameters": tool["input_schema"],
+                },
+            }
+        ]
+
+    # Google / Vertex AI format: {function_declarations: [{name, description, parameters}, ...]}
+    # One object bundles multiple tool definitions — expand to individual OpenAI-format tools.
+    if "function_declarations" in tool:
+        result = []
+        for decl in tool.get("function_declarations") or []:
+            if not isinstance(decl, dict):
+                continue
+            result.append(
+                {
+                    "type": "function",
+                    "function": {
+                        "name": decl.get("name", ""),
+                        "description": decl.get("description", ""),
+                        "parameters": decl.get("parameters", {}),
+                    },
+                }
+            )
+        return result if result else [tool]
+
+    return [tool]
+
+
+def _normalize_anthropic_content_blocks(
+    content: List[Any], tool_calls: List[Dict[str, Any]]
+) -> List[Any]:
+    """Remove streaming artifacts from Anthropic content blocks.
+
+    Anthropic streaming leaves tool_use blocks with ``input: {}`` and
+    streaming-specific fields (``index``, ``partial_json``). The actual
+    arguments are already reconstructed in ``message.tool_calls``. This
+    helper fills the empty ``input`` from the normalized tool_calls and
+    strips the streaming-only keys so the block looks like a proper
+    Anthropic content block.
+
+    A tool call is matched to a block by ``id`` first; the tool ``name`` is
+    only consulted when no tool call carries the block's id.
+    """
+    if not tool_calls:
+        return content
+    tc_by_id: Dict[str, Any] = {
+        tc["id"]: tc.get("args", {})
+        for tc in tool_calls
+        if isinstance(tc, dict) and "id" in tc
+    }
+    tc_by_name: Dict[str, Any] = {
+        tc["name"]: tc.get("args", {})
+        for tc in tool_calls
+        if isinstance(tc, dict) and "name" in tc
+    }
+    result = []
+    for block in content:
+        if isinstance(block, dict) and block.get("type") == "tool_use":
+            block_input = block.get("input") or {}
+            if not block_input:
+                block_id = block.get("id", "")
+                if block_id in tc_by_id:
+                    # An id match is authoritative even when its args are empty;
+                    # falling back to the name could borrow the args of another
+                    # call to the same tool.
+                    block_input = tc_by_id[block_id] or {}
+                else:
+                    block_input = tc_by_name.get(block.get("name", "")) or {}
+            normalized = {
+                k: v for k, v in block.items() if k not in ("index", "partial_json")
+            }
+            normalized["input"] = block_input
+            result.append(normalized)
+        else:
+            result.append(block)
+    return result
+
+
+def _convert_tool_call(tc: Any, include_error: bool = False) -> Optional[Dict[str, Any]]:
+    """Convert a single tool call dict to Langfuse's canonical format.
+
+    Handles LangChain format {name, args, id}, Anthropic streaming format
+    {type: "tool_use", name, input, id}, and OpenAI's raw format
+    {id, type: "function", function: {name, arguments}}.
+
+    Arguments that are already a string (OpenAI's pre-serialized JSON, or the
+    unparseable text of an invalid tool call) are kept verbatim rather than
+    JSON-encoded a second time. Missing or None id/name/error become "".
+
+    Returns None (and logs a debug message) if tc is not a dict.
+    Set include_error=True for invalid_tool_calls entries.
+    """
+    if not isinstance(tc, dict):
+        langfuse_logger.debug("Skipping tool_call entry that is not a dict: %s", tc)
+        return None
+    # OpenAI's raw format nests name/arguments under "function"
+    function = tc.get("function")
+    if not isinstance(function, dict):
+        function = {}
+    # Anthropic streaming uses "input" instead of "args"
+    args = tc.get("args") or tc.get("input") or function.get("arguments") or {}
+    if isinstance(args, str):
+        arguments = args
+    else:
+        try:
+            arguments = json.dumps(args)
+        except (TypeError, ValueError) as e:
+            langfuse_logger.debug("Failed to serialize tool call args to JSON: %s", e)
+            arguments = "{}"
+    result: Dict[str, Any] = {
+        "id": tc.get("id") or "",
+        "type": "function",
+        "name": tc.get("name") or function.get("name") or "",
+        "arguments": arguments,
+    }
+    if include_error:
+        result["error"] = tc.get("error") or ""
+    return result
+
+
 try:
     import langchain
 
@@ -841,9 +1023,27 @@ def __on_llm_action(
         self._child_to_parent_run_id_map[run_id] = parent_run_id
 
         try:
-            tools = kwargs.get("invocation_params", {}).get("tools", None)
+            observation_input: Any = prompts
+            invocation_params = kwargs.get("invocation_params") or {}
+            langfuse_logger.debug(
+                "LLM action invocation_params keys: %s", list(invocation_params.keys())
+            )
+            tools = invocation_params.get("tools", None)
+            langfuse_logger.debug(
+                "LLM action tools from invocation_params: %s", tools
+            )
             if tools and isinstance(tools, list):
-                prompts.extend([{"role": "tool", "content": tool} for tool in tools])
+                # Structure input as {messages, tools} so extractToolsFromObservation
+                # can find tool definitions at the top-level tools key — the canonical
+                # format expected by the backend's LLMToolDefinitionSchema.
+                normalized_tools = [n for t in tools for n in _to_langfuse_tool(t)]
+                langfuse_logger.debug(
+                    "LLM action normalized tools: %s", normalized_tools
+                )
+                observation_input = {
+                    "messages": prompts,
+                    "tools": normalized_tools,
+                }
 
             model_name = self._parse_model_and_log_errors(
                 serialized=serialized, metadata=metadata, kwargs=kwargs
@@ -868,7 +1068,7 @@ def __on_llm_action(
 
             content = {
                 "name": self.get_langchain_run_name(serialized, **kwargs),
-                "input": prompts,
+                "input": observation_input,
                 "metadata": self.__join_tags_and_metadata(
                     tags,
                     metadata,
@@ -1049,21 +1249,56 @@ def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
         if isinstance(message, HumanMessage):
             message_dict: Dict[str, Any] = {"role": "user", "content": message.content}
         elif isinstance(message, AIMessage):
-            message_dict = {"role": "assistant", "content": message.content}
-
-            if (
+            # Normalize Anthropic content blocks: fill empty tool_use input from
+            # message.tool_calls and strip streaming artifacts (index, partial_json).
+            content = message.content
+            langfuse_logger.debug(
+                "AIMessage content type=%s value=%s", type(content).__name__, content
+            )
+            lc_tool_calls = (
+                list(message.tool_calls)
+                if hasattr(message, "tool_calls") and message.tool_calls
+                else []
+            )
+            langfuse_logger.debug(
+                "AIMessage tool_calls=%s additional_kwargs=%s",
+                lc_tool_calls,
+                message.additional_kwargs,
+            )
+            if isinstance(content, list) and lc_tool_calls:
+                content = _normalize_anthropic_content_blocks(content, lc_tool_calls)
+                langfuse_logger.debug("AIMessage normalized content=%s", content)
+            message_dict = {"role": "assistant", "content": content}
+
+            # Resolve tool_calls: prefer LangChain's normalized {name, args, id}
+            # format; fall back to additional_kwargs["tool_calls"], which holds the
+            # provider's raw payload (OpenAI {id, function: {name, arguments}} or
+            # Anthropic {type: "tool_use", name, input, id}) if tool_calls is empty.
+            raw_tool_calls = message.tool_calls if (
                 hasattr(message, "tool_calls")
                 and message.tool_calls is not None
                 and len(message.tool_calls) > 0
-            ):
-                message_dict["tool_calls"] = message.tool_calls
+            ) else message.additional_kwargs.get("tool_calls") or []
+
+            if raw_tool_calls:
+                converted_tool_calls = [
+                    r for tc in raw_tool_calls if (r := _convert_tool_call(tc)) is not None
+                ]
+                if converted_tool_calls:
+                    message_dict["tool_calls"] = converted_tool_calls
 
             if (
                 hasattr(message, "invalid_tool_calls")
                 and message.invalid_tool_calls is not None
                 and len(message.invalid_tool_calls) > 0
             ):
-                message_dict["invalid_tool_calls"] = message.invalid_tool_calls
+                converted_invalid_tool_calls = [
+                    r
+                    for tc in message.invalid_tool_calls
+                    if (r := _convert_tool_call(tc, include_error=True)) is not None
+                ]
+                if converted_invalid_tool_calls:
+                    message_dict["invalid_tool_calls"] = converted_invalid_tool_calls
 
         elif isinstance(message, SystemMessage):
             message_dict = {"role": "system", "content": message.content}
diff --git a/tests/test_langchain.py b/tests/test_langchain.py
index 575cbf150..6d6be8c79 100644
--- a/tests/test_langchain.py
+++ b/tests/test_langchain.py
@@ -18,9 +18,292 @@
 
 from langfuse._client.client import Langfuse
 from langfuse.langchain import CallbackHandler
+from langfuse.langchain.CallbackHandler import (
+    _convert_tool_call,
+    _normalize_anthropic_content_blocks,
+    _to_langfuse_tool,
+)
 from tests.utils import create_uuid, encode_file_to_base64, get_api
 
 
+# --- Unit tests for _to_langfuse_tool ---
+
+
+def test_to_langfuse_tool_openai_format():
+    tool = {
+        "type": "function",
+        "function": {
+            "name": "get_weather",
+            "description": "Get the weather",
+            "parameters": {"type": "object", "properties": {}},
+        },
+    }
+    # OpenAI format is already canonical — returned as single-item list
+    assert _to_langfuse_tool(tool) == [tool]
+
+
+def test_to_langfuse_tool_anthropic_format():
+    tool = {
+        "name": "get_weather",
+        "description": "Get the weather",
+        "input_schema": {"type": "object", "properties": {}},
+    }
+    assert _to_langfuse_tool(tool) == [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_weather",
+                "description": "Get the weather",
+                "parameters": {"type": "object", "properties": {}},
+            },
+        }
+    ]
+
+
+def test_to_langfuse_tool_google_format_single():
+    # Google / Vertex AI bundles tools in a function_declarations object
+    tool = {
+        "function_declarations": [
+            {
+                "name": "get_weather",
+                "description": "Get the weather",
+                "parameters": {"type": "object", "properties": {"city": {"type": "string"}}},
+            }
+        ]
+    }
+    assert _to_langfuse_tool(tool) == [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_weather",
+                "description": "Get the weather",
+                "parameters": {"type": "object", "properties": {"city": {"type": "string"}}},
+            },
+        }
+    ]
+
+
+def test_to_langfuse_tool_google_format_multiple():
+    # Multiple tools in one function_declarations object expand to multiple items
+    tool = {
+        "function_declarations": [
+            {"name": "get_weather", "description": "Get weather", "parameters": {}},
+            {"name": "get_time", "description": "Get time", "parameters": {}},
+        ]
+    }
+    result = _to_langfuse_tool(tool)
+    assert len(result) == 2
+    assert result[0]["function"]["name"] == "get_weather"
+    assert result[1]["function"]["name"] == "get_time"
+
+
+def test_to_langfuse_tool_base_tool_object():
+    # BaseTool / StructuredTool objects passed without dict conversion (langfuse#11850)
+    class MockSchema:
+        def model_json_schema(self):
+            return {
+                "title": "GetWeather",
+                "type": "object",
+                "properties": {"city": {"type": "string"}},
+                "required": ["city"],
+            }
+
+    class MockTool:
+        name = "get_weather"
+        description = "Get the weather for a city"
+        args_schema = MockSchema()
+
+    assert _to_langfuse_tool(MockTool()) == [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_weather",
+                "description": "Get the weather for a city",
+                "parameters": {
+                    "type": "object",
+                    "properties": {"city": {"type": "string"}},
+                    "required": ["city"],
+                },
+            },
+        }
+    ]
+
+
+def test_to_langfuse_tool_base_tool_object_with_dict_schema():
+    # args_schema may already be a JSON-schema dict rather than a Pydantic model
+    class MockTool:
+        name = "get_weather"
+        description = "Get the weather for a city"
+        args_schema = {
+            "title": "GetWeather",
+            "type": "object",
+            "properties": {"city": {"type": "string"}},
+        }
+
+    assert _to_langfuse_tool(MockTool()) == [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_weather",
+                "description": "Get the weather for a city",
+                "parameters": {
+                    "type": "object",
+                    "properties": {"city": {"type": "string"}},
+                },
+            },
+        }
+    ]
+
+
+def test_to_langfuse_tool_passthrough_unknown_dict():
+    tool = {"name": "my_tool", "custom_field": "value"}
+    assert _to_langfuse_tool(tool) == [tool]
+
+
+def test_to_langfuse_tool_passthrough_non_dict():
+    assert _to_langfuse_tool("not a dict") == ["not a dict"]
+
+
+# --- Unit tests for _convert_tool_call ---
+
+
+def test_convert_tool_call_langchain_format():
+    tc = {"name": "get_weather", "args": {"city": "Berlin"}, "id": "call_1"}
+    result = _convert_tool_call(tc)
+    assert result == {
+        "id": "call_1",
+        "type": "function",
+        "name": "get_weather",
+        "arguments": '{"city": "Berlin"}',
+    }
+
+
+def test_convert_tool_call_anthropic_streaming_format():
+    # Anthropic streaming uses "input" instead of "args"
+    tc = {"type": "tool_use", "name": "get_weather", "input": {"city": "Berlin"}, "id": "toolu_1"}
+    result = _convert_tool_call(tc)
+    assert result == {
+        "id": "toolu_1",
+        "type": "function",
+        "name": "get_weather",
+        "arguments": '{"city": "Berlin"}',
+    }
+
+
+def test_convert_tool_call_include_error():
+    tc = {"name": "bad_tool", "args": {}, "id": "call_2", "error": "invalid input"}
+    result = _convert_tool_call(tc, include_error=True)
+    assert result == {
+        "id": "call_2",
+        "type": "function",
+        "name": "bad_tool",
+        "arguments": "{}",
+        "error": "invalid input",
+    }
+
+
+def test_convert_tool_call_openai_raw_format():
+    # OpenAI's raw format nests name and pre-serialized arguments under "function"
+    tc = {
+        "id": "call_3",
+        "type": "function",
+        "function": {"name": "get_weather", "arguments": '{"city": "Berlin"}'},
+    }
+    result = _convert_tool_call(tc)
+    assert result == {
+        "id": "call_3",
+        "type": "function",
+        "name": "get_weather",
+        "arguments": '{"city": "Berlin"}',
+    }
+
+
+def test_convert_tool_call_invalid_entry_with_string_args():
+    # invalid_tool_calls carry raw argument text and may hold None fields
+    tc = {"name": None, "args": '{"city": ', "id": None, "error": None}
+    result = _convert_tool_call(tc, include_error=True)
+    assert result == {
+        "id": "",
+        "type": "function",
+        "name": "",
+        "arguments": '{"city": ',
+        "error": "",
+    }
+
+
+def test_convert_tool_call_non_dict_returns_none():
+    assert _convert_tool_call("not a dict") is None
+
+
+# --- Unit tests for _normalize_anthropic_content_blocks ---
+
+
+def test_normalize_anthropic_content_blocks_fills_empty_input():
+    """Streaming leaves input:{} and partial_json artifacts — should be filled from tool_calls."""
+    content = [
+        {
+            "type": "tool_use",
+            "id": "toolu_abc",
+            "name": "get_weather",
+            "input": {},
+            "index": 0,
+            "partial_json": ['{"city": "Berlin"}'],
+        }
+    ]
+    tool_calls = [{"id": "toolu_abc", "name": "get_weather", "args": {"city": "Berlin"}}]
+    result = _normalize_anthropic_content_blocks(content, tool_calls)
+    assert result == [
+        {"type": "tool_use", "id": "toolu_abc", "name": "get_weather", "input": {"city": "Berlin"}}
+    ]
+
+
+def test_normalize_anthropic_content_blocks_preserves_non_empty_input():
+    """If input is already populated, keep it and still strip streaming keys."""
+    content = [
+        {
+            "type": "tool_use",
+            "id": "toolu_abc",
+            "name": "get_weather",
+            "input": {"city": "Berlin"},
+            "index": 0,
+        }
+    ]
+    tool_calls = [{"id": "toolu_abc", "name": "get_weather", "args": {"city": "Paris"}}]
+    result = _normalize_anthropic_content_blocks(content, tool_calls)
+    assert result == [
+        {"type": "tool_use", "id": "toolu_abc", "name": "get_weather", "input": {"city": "Berlin"}}
+    ]
+
+
+def test_normalize_anthropic_content_blocks_id_match_wins_over_name():
+    """An id match with empty args must not borrow another call's args by name."""
+    content = [{"type": "tool_use", "id": "toolu_a", "name": "ping", "input": {}}]
+    tool_calls = [
+        {"id": "toolu_a", "name": "ping", "args": {}},
+        {"id": "toolu_b", "name": "ping", "args": {"host": "example.com"}},
+    ]
+    result = _normalize_anthropic_content_blocks(content, tool_calls)
+    assert result == [
+        {"type": "tool_use", "id": "toolu_a", "name": "ping", "input": {}}
+    ]
+
+
+def test_normalize_anthropic_content_blocks_ignores_non_tool_use():
+    """Text content blocks should pass through unchanged."""
+    content = [{"type": "text", "text": "hello", "index": 0}]
+    result = _normalize_anthropic_content_blocks(content, [])
+    assert result == content
+
+
+def test_normalize_anthropic_content_blocks_no_tool_calls_passthrough():
+    """Without tool_calls to match against, return content unchanged."""
+    content = [{"type": "tool_use", "id": "x", "name": "f", "input": {}}]
+    assert _normalize_anthropic_content_blocks(content, []) is content
+
+
+# --- End unit tests ---
+
+
 def test_callback_generated_from_trace_chat():
     langfuse = Langfuse()
 
@@ -762,15 +1045,19 @@ class GetWeather(BaseModel):
 
     for generation in generations:
         assert generation.input is not None
-        tool_messages = [msg for msg in generation.input if msg["role"] == "tool"]
-        assert len(tool_messages) == 2
-        assert any(
-            "standardize_address" == msg["content"]["function"]["name"]
-            for msg in tool_messages
-        )
-        assert any(
-            "get_weather" == msg["content"]["function"]["name"] for msg in tool_messages
-        )
+        # Input is structured as {messages, tools} for extractToolsFromObservation
+        assert "messages" in generation.input
+        assert "tools" in generation.input
+        # Each tool must conform to OpenAI format (what extractTools parses)
+        for t in generation.input["tools"]:
+            assert t.get("type") == "function"
+            assert "function" in t
+            assert "name" in t["function"]
+            assert "description" in t["function"]
+            assert "parameters" in t["function"]
+        tool_names = [t["function"]["name"] for t in generation.input["tools"]]
+        assert "standardize_address" in tool_names
+        assert "get_weather" in tool_names
         assert generation.output is not None
 
 