diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. 
+ + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst new file mode 100644 index 00000000..056ca427 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst @@ -0,0 +1,67 @@ +OpenTelemetry LiteLLM Instrumentation +====================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-litellm.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-litellm/ + +This library provides automatic instrumentation for the +`LiteLLM `_ library, which provides +a unified interface to 100+ LLM providers. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-litellm + +Configuration +------------- + +The instrumentation can be enabled/disabled using environment variables: + +* ``ARMS_LITELLM_INSTRUMENTATION_ENABLED``: Enable/disable instrumentation (default: true) + +Usage +----- + +.. code:: python + + from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + import litellm + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument() + + # Use LiteLLM as normal + response = litellm.completion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello!"}] + ) + +Features +-------- + +This instrumentation automatically captures: + +* LLM completion calls (sync and async) +* Streaming completions +* Embedding calls +* Image generation calls +* Retry mechanisms +* Tool/function calls +* Request and response metadata +* Token usage +* Model information + +The instrumentation follows OpenTelemetry semantic conventions for GenAI operations. + +References +---------- + +* `OpenTelemetry LiteLLM Instrumentation `_ +* `OpenTelemetry Project `_ +* `LiteLLM Documentation `_ + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/example.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/example.py new file mode 100644 index 00000000..a25fd07d --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/example.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Example usage of LiteLLM instrumentation. + +This example demonstrates how to use the OpenTelemetry instrumentation +for LiteLLM to automatically trace and meter LLM calls. 
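+
+One way to run it (the script falls back to placeholder API keys, so export a
+real DashScope key first, since the calls below use ``dashscope/...`` models)::
+
+    export DASHSCOPE_API_KEY=<your-key>
+    python example.py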
+""" + +import os +from opentelemetry import trace, metrics +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader + +# Set up environment variables +os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") +os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder") +os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + +# Set up OpenTelemetry +tracer_provider = TracerProvider() +tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) +trace.set_tracer_provider(tracer_provider) + +metric_reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=5000) +meter_provider = MeterProvider(metric_readers=[metric_reader]) +metrics.set_meter_provider(meter_provider) + +# Import and instrument LiteLLM +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +import litellm + +print("Instrumenting LiteLLM...") +LiteLLMInstrumentor().instrument() + +def example_basic_completion(): + """Example: Basic completion call""" + print("\n=== Example 1: Basic Completion ===") + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What is the capital of France? Answer in one word."} + ], + temperature=0.7 + ) + print(f"Response: {response.choices[0].message.content}") + + +def example_streaming_completion(): + """Example: Streaming completion""" + print("\n=== Example 2: Streaming Completion ===") + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Count from 1 to 5."} + ], + stream=True + ) + + print("Streaming response:") + for chunk in response: + if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: + print(chunk.choices[0].delta.content, end='', flush=True) + print() + + +def example_embedding(): + """Example: Embedding call""" + print("\n=== Example 3: Embedding ===") + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="Hello, this is a test." + ) + print(f"Embedding dimension: {len(response.data[0].embedding)}") + + +def example_with_tools(): + """Example: Tool calling""" + print("\n=== Example 4: Tool Calling ===") + + tools = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the weather in a location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. 
San Francisco, CA" + } + }, + "required": ["location"] + } + } + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[ + {"role": "user", "content": "What's the weather in Beijing?"} + ], + tools=tools + ) + + print(f"Response: {response.choices[0].message.content or 'Tool call requested'}") + + +def main(): + """Run all examples""" + try: + example_basic_completion() + example_streaming_completion() + example_embedding() + example_with_tools() + + print("\n=== All examples completed ===") + print("Check the console output for traces and metrics!") + + except Exception as e: + print(f"Error running examples: {e}") + import traceback + traceback.print_exc() + finally: + # Clean up + print("\nUninstrumenting LiteLLM...") + LiteLLMInstrumentor().uninstrument() + + # Force metric export + meter_provider.force_flush() + + +if __name__ == "__main__": + main() + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml new file mode 100644 index 00000000..ce4073b0 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml @@ -0,0 +1,55 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "opentelemetry-instrumentation-litellm" +dynamic = ["version"] +description = "OpenTelemetry LiteLLM instrumentation" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.8" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api <= 1.35.0", +] + +[project.optional-dependencies] +instruments = [ + "litellm >= 1.0.0", +] + +[project.entry-points.opentelemetry_instrumentor] +litellm = "opentelemetry.instrumentation.litellm:LiteLLMInstrumentor" + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-litellm" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/litellm/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py new file mode 100644 index 00000000..39f57893 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py @@ -0,0 +1,212 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenTelemetry LiteLLM Instrumentation +====================================== + +This library provides automatic instrumentation for the LiteLLM library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-litellm + +Configuration +------------- + +The instrumentation can be configured using environment variables: + +* ``ENABLE_LITELLM_INSTRUMENTOR``: Enable/disable instrumentation (default: true) +* ``ARMS_LITELLM_INSTRUMENTATION_ENABLED``: Alternative enable/disable flag (default: true) + +Usage +----- + +.. code:: python + + from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + import litellm + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument() + + # Use LiteLLM as normal + response = litellm.completion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello!"}] + ) + +API +--- +""" + +import logging +from typing import Collection, Any, Dict, Callable + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.litellm.package import _instruments +from opentelemetry.instrumentation.litellm.version import __version__ +from opentelemetry import trace, metrics + +logger = logging.getLogger(__name__) + +__all__ = ["LiteLLMInstrumentor"] + + +class LiteLLMInstrumentor(BaseInstrumentor): + """ + An instrumentor for the LiteLLM library. + + This class provides automatic instrumentation for LiteLLM, including: + - Chat completion calls (sync and async) + - Streaming completions + - Embedding calls + - Retry mechanisms + - Tool/function calls + """ + + def __init__(self): + super().__init__() + self._original_functions: Dict[str, Callable] = {} + self._tracer = None + self._meter = None + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs: Any): + """ + Instrument the LiteLLM library. + + This method sets up instrumentation for all LiteLLM functions, + including completion, embedding, and retry functions. 
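+
+        A minimal sketch of passing explicit providers (``tracer_provider``
+        and ``meter_provider`` are the keyword arguments read by this method;
+        when omitted, the globally registered providers are used)::
+
+            from opentelemetry.sdk.trace import TracerProvider
+            from opentelemetry.sdk.metrics import MeterProvider
+
+            LiteLLMInstrumentor().instrument(
+                tracer_provider=TracerProvider(),
+                meter_provider=MeterProvider(),
+            )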
+ """ + super()._instrument(**kwargs) + + try: + import litellm + except ImportError: + logger.warning("LiteLLM not found, skipping instrumentation") + return + + # Get tracer and meter providers + tracer_provider = kwargs.get("tracer_provider") or trace.get_tracer_provider() + meter_provider = kwargs.get("meter_provider") or metrics.get_meter_provider() + + # Create tracer and meter + self._tracer = tracer_provider.get_tracer(__name__, __version__) + self._meter = meter_provider.get_meter(__name__, __version__) + + # Import wrappers + from opentelemetry.instrumentation.litellm._wrapper import ( + CompletionWrapper, + AsyncCompletionWrapper, + ) + from opentelemetry.instrumentation.litellm._embedding_wrapper import ( + EmbeddingWrapper, + AsyncEmbeddingWrapper, + ) + + # Save original functions + functions_to_wrap = [ + "completion", + "acompletion", + "embedding", + "aembedding", + "completion_with_retries", + "acompletion_with_retries", + ] + + for func_name in functions_to_wrap: + if hasattr(litellm, func_name): + self._original_functions[func_name] = getattr(litellm, func_name) + + # Wrap functions + if "completion" in self._original_functions: + completion_wrapper = CompletionWrapper( + self._tracer, + self._meter, + self._original_functions["completion"] + ) + litellm.completion = completion_wrapper + + if "acompletion" in self._original_functions: + async_completion_wrapper = AsyncCompletionWrapper( + self._tracer, + self._meter, + self._original_functions["acompletion"] + ) + litellm.acompletion = async_completion_wrapper + + if "embedding" in self._original_functions: + litellm.embedding = EmbeddingWrapper( + self._tracer, + self._meter, + self._original_functions["embedding"] + ) + + if "aembedding" in self._original_functions: + litellm.aembedding = AsyncEmbeddingWrapper( + self._tracer, + self._meter, + self._original_functions["aembedding"] + ) + + # Wrap retry functions to use our wrapped completion functions + # Note: LiteLLM's retry functions internally reference the completion function at definition time, + # so we need to recreate them to use our wrapped versions + if "completion_with_retries" in self._original_functions: + # Create a new retry wrapper that calls our wrapped completion + def completion_with_retries_wrapper(*args, **kwargs): + # Use the wrapped completion function + return litellm.completion(*args, **kwargs) + litellm.completion_with_retries = completion_with_retries_wrapper + + if "acompletion_with_retries" in self._original_functions: + # Create a new async retry wrapper that calls our wrapped acompletion + async def acompletion_with_retries_wrapper(*args, **kwargs): + # Use the wrapped acompletion function + return await litellm.acompletion(*args, **kwargs) + litellm.acompletion_with_retries = acompletion_with_retries_wrapper + + logger.info("LiteLLM instrumentation enabled") + + def _uninstrument(self, **kwargs: Any): + """ + Uninstrument the LiteLLM library. + + This method removes all instrumentation and restores + original LiteLLM functions. 
+ """ + try: + import litellm + except ImportError: + logger.warning("LiteLLM not found, skipping uninstrumentation") + return + + # Restore original functions + for func_name, original_func in self._original_functions.items(): + if hasattr(litellm, func_name): + setattr(litellm, func_name, original_func) + + # Clear saved functions + self._original_functions.clear() + self._tracer = None + self._meter = None + + logger.info("LiteLLM instrumentation disabled") + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py new file mode 100644 index 00000000..04259985 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py @@ -0,0 +1,212 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Embedding wrapper for LiteLLM instrumentation. +""" + +import os +import time +import logging +from typing import Any, Callable +from opentelemetry import trace, context +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.metrics import Meter +from aliyun.sdk.extension.arms.self_monitor.self_monitor_decorator import hook_advice, async_hook_advice +from aliyun.sdk.extension.arms.semconv import _SUPPRESS_LLM_SDK_KEY + +# Import ARMS extension +from opentelemetry.instrumentation.litellm.arms_litellm_extension import ArmsLiteLLMExtension +from opentelemetry.instrumentation.litellm._utils import parse_provider_from_model + +# Import semantic conventions +from aliyun.semconv.trace_v2 import GenAiSpanKind + +logger = logging.getLogger(__name__) + + +def _is_instrumentation_enabled() -> bool: + """Check if instrumentation is enabled via environment variable.""" + enabled = os.getenv("ARMS_LITELLM_INSTRUMENTATION_ENABLED", "true").lower() + return enabled != "false" + + +class EmbeddingWrapper: + """Wrapper for litellm.embedding()""" + + def __init__(self, tracer: trace.Tracer, meter: Meter, original_func: Callable): + self.tracer = tracer + self.meter = meter + self.original_func = original_func + self.arms_extension = ArmsLiteLLMExtension(meter) + + @hook_advice(instrumentation_name="litellm", advice_method="embedding", throw_exception=True) + def __call__(self, *args, **kwargs): + """Wrap litellm.embedding()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed (e.g., called from higher-level framework like langchain) + if context.get_value(_SUPPRESS_LLM_SDK_KEY): + return self.original_func(*args, **kwargs) + + # Extract request parameters + model = 
kwargs.get("model", "unknown") + + # Parse provider from model + provider = parse_provider_from_model(model) or "unknown" + + # Create span + span_name = f"embedding {model}" + start_time = time.time() + start_time_ns = time.time_ns() + + with self.tracer.start_as_current_span(span_name) as span: + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation (e.g., OpenAI) + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(_SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + # If context setting fails, continue without suppression token + pass + + try: + # Set request attributes using ARMS extension + self.arms_extension.set_embedding_request_attributes(span, model, provider) + + # Call original function + response = self.original_func(*args, **kwargs) + + # Set response attributes + self.arms_extension.set_embedding_response_attributes(span, response) + + # Record metrics + duration = time.time() - start_time + + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + return response + + except Exception as e: + # Record error + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + # Record metrics even for failed calls + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + # Record error metrics + self.arms_extension.record_error_metrics(GenAiSpanKind.EMBEDDING.value) + + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + + +class AsyncEmbeddingWrapper: + """Wrapper for litellm.aembedding()""" + + def __init__(self, tracer: trace.Tracer, meter: Meter, original_func: Callable): + self.tracer = tracer + self.meter = meter + self.original_func = original_func + self.arms_extension = ArmsLiteLLMExtension(meter) + + @async_hook_advice(instrumentation_name="litellm", advice_method="aembedding", throw_exception=True) + async def __call__(self, *args, **kwargs): + """Wrap litellm.aembedding()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return await self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed (e.g., called from higher-level framework like langchain) + if context.get_value(_SUPPRESS_LLM_SDK_KEY): + return await self.original_func(*args, **kwargs) + + # Extract request parameters + model = kwargs.get("model", "unknown") + + # Parse provider from model + provider = parse_provider_from_model(model) or "unknown" + + # Create span + span_name = f"embedding {model}" + start_time = time.time() + start_time_ns = time.time_ns() + + with self.tracer.start_as_current_span(span_name) as span: + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation (e.g., OpenAI) + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(_SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + # If context setting fails, continue without suppression token + pass + + try: + # Set request attributes using ARMS extension + self.arms_extension.set_embedding_request_attributes(span, model, provider) + + # Call original function + response = await self.original_func(*args, **kwargs) + + # Set response attributes + self.arms_extension.set_embedding_response_attributes(span, response) + + # Record metrics + duration = time.time() - start_time + 
self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + return response + + except Exception as e: + # Record error + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + # Record metrics even for failed calls + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + # Record error metrics + self.arms_extension.record_error_metrics(GenAiSpanKind.EMBEDDING.value) + + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py new file mode 100644 index 00000000..09b56c39 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py @@ -0,0 +1,257 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Stream wrapper for LiteLLM streaming responses. +""" + +import time +import logging +import gc +from typing import Any, Iterator, Optional +from aliyun.semconv.trace_v2 import LLMAttributes +logger = logging.getLogger(__name__) + + +class StreamWrapper: + """ + Wrapper for synchronous streaming responses. + + Note: To avoid memory leaks, we only keep the last chunk instead of all chunks. + This is sufficient for extracting usage information which is typically in the last chunk. + + Supports context manager protocol for reliable cleanup. 
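+
+    A minimal consumption sketch (the wrapper is returned transparently by the
+    instrumented ``litellm.completion(..., stream=True)`` call; the model name
+    is illustrative)::
+
+        response = litellm.completion(
+            model="openai/gpt-4o-mini",
+            messages=[{"role": "user", "content": "Hi"}],
+            stream=True,
+        )
+        with response:  # ensures the span is finalized even on early exit
+            for chunk in response:
+                ...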
+ """ + + def __init__(self, stream: Iterator, span: Any, callback: callable): + self.stream = stream + self.span = span + self.callback = callback + self.first_chunk_time = None + self.start_time = time.time_ns() + self.last_chunk = None # Only keep last chunk to avoid memory leak + self.chunk_count = 0 + self._finalized = False + self.accumulated_content = [] # Accumulate content for output messages + self.accumulated_tool_calls = [] # Accumulate tool calls + + def __iter__(self): + return self + + def __next__(self): + try: + chunk = next(self.stream) + + # Record first chunk time for TTFT + if self.first_chunk_time is None: + self.first_chunk_time = time.time_ns() + + # Accumulate content from delta for output messages + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, 'delta'): + delta = choice.delta + # Accumulate text content + if hasattr(delta, 'content') and delta.content: + self.accumulated_content.append(delta.content) + # Accumulate tool calls + if hasattr(delta, 'tool_calls') and delta.tool_calls: + self.accumulated_tool_calls.extend(delta.tool_calls) + + # Only keep the last chunk (contains usage info) + self.last_chunk = chunk + self.chunk_count += 1 + + return chunk + except StopIteration: + # Stream ended normally, finalize span + self._finalize() + raise + except Exception as e: + # Error during streaming + logger.debug(f"Error during streaming: {e}") + self._finalize(error=e) + raise + + def __enter__(self): + """Support context manager protocol.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Ensure finalization on context exit.""" + if exc_type is not None: + # Exception occurred during iteration + self._finalize(error=exc_val) + else: + # Normal exit (may have completed or early terminated) + self._finalize() + return False + + def close(self): + """Explicitly close and finalize the stream.""" + self._finalize() + + def _finalize(self, error: Optional[Exception] = None): + """Finalize the span with data from last chunk.""" + if self._finalized: + return + + self._finalized = True + try: + # Calculate TTFT if we got at least one chunk + if self.first_chunk_time: + ttft = self.first_chunk_time - self.start_time + # Use semantic convention constant + from aliyun.semconv.trace_v2 import LLMAttributes + self.span.set_attribute(LLMAttributes.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft) + + # Call the callback with only the last chunk + if self.callback: + self.callback(self.span, self.last_chunk, error) + + # End the span now that streaming is complete + if self.span: + self.span.end() + + # Clear reference to avoid holding memory + self.last_chunk = None + except Exception as e: + logger.debug(f"Error finalizing stream: {e}") + + +class AsyncStreamWrapper: + """ + Wrapper for asynchronous streaming responses. + + Note: To avoid memory leaks, we only keep the last chunk instead of all chunks. + This is sufficient for extracting usage information which is typically in the last chunk. + + Important: AsyncStreamWrapper must be consumed within an async context that ensures + finalization, either by: + 1. Using as an async context manager: async with response: ... + 2. Explicitly calling close() after iteration + 3. 
Letting the wrapper detect stream exhaustion + """ + + def __init__(self, stream, span: Any, callback: callable): + self.stream = stream + self.span = span + self.callback = callback + self.first_chunk_time = None + self.start_time = time.time_ns() + self.last_chunk = None # Only keep last chunk to avoid memory leak + self.chunk_count = 0 + self._finalized = False + self._stream_exhausted = False + self.accumulated_content = [] # Accumulate content for output messages + self.accumulated_tool_calls = [] # Accumulate tool calls + + def __aiter__(self): + # Return an async generator that wraps the stream and ensures finalization + return self._wrapped_iteration() + + async def _wrapped_iteration(self): + """ + Async generator that wraps the underlying stream and ensures finalization. + This approach guarantees that _finalize() is called when: + 1. The stream is exhausted normally + 2. An exception occurs + 3. The generator is closed early (via aclose()) + """ + try: + async for chunk in self.stream: + # Record first chunk time for TTFT + if self.first_chunk_time is None: + self.first_chunk_time = time.time_ns() + + # Accumulate content from delta for output messages + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, 'delta'): + delta = choice.delta + # Accumulate text content + if hasattr(delta, 'content') and delta.content: + self.accumulated_content.append(delta.content) + # Accumulate tool calls + if hasattr(delta, 'tool_calls') and delta.tool_calls: + self.accumulated_tool_calls.extend(delta.tool_calls) + + # Only keep the last chunk (contains usage info) + self.last_chunk = chunk + self.chunk_count += 1 + + yield chunk + + # Stream exhausted normally + logger.debug(f"AsyncStreamWrapper: Stream completed (chunks: {self.chunk_count})") + except Exception as e: + # Error during streaming + logger.debug(f"AsyncStreamWrapper: Error during streaming: {e}") + self._finalize(error=e) + raise + finally: + # Always finalize, whether completed normally, with error, or closed early + self._finalize() + + async def __aenter__(self): + """Support async context manager protocol.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Ensure finalization on async context exit.""" + if exc_type is not None: + # Exception occurred during iteration + self._finalize(error=exc_val) + else: + # Normal exit (may have completed or early terminated) + self._finalize() + return False + + async def aclose(self): + """Explicitly close and finalize the async stream.""" + self._finalize() + + def close(self): + """Synchronous close method for compatibility.""" + self._finalize() + + def _finalize(self, error: Optional[Exception] = None): + """Finalize the span with data from last chunk.""" + if self._finalized: + return + + self._finalized = True + try: + # Calculate TTFT if we got at least one chunk + if self.first_chunk_time: + ttft = self.first_chunk_time - self.start_time + # Use semantic convention constant + self.span.set_attribute(LLMAttributes.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft) + + # Call the callback with only the last chunk + if self.callback: + try: + self.callback(self.span, self.last_chunk, error) + except Exception as callback_error: + logger.debug(f"Error in stream callback: {callback_error}") + + # End the span now that streaming is complete + if self.span: + self.span.end() + + # Clear reference to avoid holding memory + self.last_chunk = None + except Exception as e: + logger.debug(f"Error finalizing async stream: {e}") 
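+
+# A minimal async consumption sketch (hypothetical call site; the wrapper is
+# returned transparently by the instrumented ``litellm.acompletion(...,
+# stream=True)`` call):
+#
+#     response = await litellm.acompletion(
+#         model="openai/gpt-4o-mini", messages=msgs, stream=True
+#     )
+#     async with response:  # finalizes the span even if iteration stops early
+#         async for chunk in response:
+#             ...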
+ diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py new file mode 100644 index 00000000..308f06cb --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py @@ -0,0 +1,175 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utility functions for LiteLLM instrumentation. +""" + +import json +import logging +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +def convert_messages_to_structured_format(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Convert LiteLLM message format to structured format required by semantic conventions. + + Converts from: + {"role": "user", "content": "..."} + To: + {"role": "user", "parts": [{"type": "text", "content": "..."}]} + """ + if not isinstance(messages, list): + return [] + + structured_messages = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + role = msg.get("role", "") + structured_msg = {"role": role, "parts": []} + + # Handle text content + if "content" in msg and msg["content"]: + content = msg["content"] + if isinstance(content, str): + structured_msg["parts"].append({ + "type": "text", + "content": content + }) + elif isinstance(content, list): + # Handle multi-modal content + for item in content: + if isinstance(item, dict): + if item.get("type") == "text": + structured_msg["parts"].append({ + "type": "text", + "content": item.get("text", "") + }) + else: + structured_msg["parts"].append(item) + + # Handle tool calls + if "tool_calls" in msg and msg["tool_calls"]: + for tool_call in msg["tool_calls"]: + if not isinstance(tool_call, dict): + continue + + tool_part = {"type": "tool_call"} + if "id" in tool_call: + tool_part["id"] = tool_call["id"] + if "function" in tool_call: + func = tool_call["function"] + if isinstance(func, dict): + if "name" in func: + tool_part["name"] = func["name"] + if "arguments" in func: + try: + # Try to parse arguments if it's a JSON string + args_str = func["arguments"] + if isinstance(args_str, str): + tool_part["arguments"] = json.loads(args_str) + else: + tool_part["arguments"] = args_str + except: + tool_part["arguments"] = func.get("arguments", "") + + structured_msg["parts"].append(tool_part) + + # Handle tool call responses + if role == "tool" and "content" in msg: + tool_response_part = { + "type": "tool_call_response", + "response": msg["content"] + } + if "tool_call_id" in msg: + tool_response_part["id"] = msg["tool_call_id"] + structured_msg["parts"].append(tool_response_part) + + structured_messages.append(structured_msg) + + return structured_messages + + +def parse_provider_from_model(model: str) -> Optional[str]: + """ + Parse provider name from model string. 
+ + LiteLLM uses format like "openai/gpt-4", "dashscope/qwen-turbo", etc. + """ + if not model: + return None + + if "/" in model: + return model.split("/")[0] + + # Fallback: try to infer from model name patterns + if "gpt" in model.lower(): + return "openai" + elif "qwen" in model.lower(): + return "dashscope" + elif "claude" in model.lower(): + return "anthropic" + elif "gemini" in model.lower(): + return "google" + + return "unknown" + + +def parse_model_name(model: str) -> str: + """ + Parse model name by removing provider prefix. + + Examples: + "openai/gpt-4" -> "gpt-4" + "dashscope/qwen-turbo" -> "qwen-turbo" + "gpt-4" -> "gpt-4" + """ + if not model: + return "unknown" + + if "/" in model: + return model.split("/", 1)[1] + + return model + + +def safe_json_dumps(obj: Any, default: str = "{}") -> str: + """ + Safely serialize object to JSON string. + """ + try: + return json.dumps(obj, ensure_ascii=False) + except Exception as e: + logger.debug(f"Failed to serialize object to JSON: {e}") + return default + + +def convert_tool_definitions(tools: List[Dict[str, Any]]) -> str: + """ + Convert tool definitions to JSON string format. + """ + if not tools: + return "[]" + + try: + # Tools are typically in format: [{"type": "function", "function": {...}}] + return json.dumps(tools, ensure_ascii=False) + except Exception as e: + logger.debug(f"Failed to convert tool definitions: {e}") + return "[]" + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py new file mode 100644 index 00000000..6cffa41a --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py @@ -0,0 +1,502 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Wrapper functions for LiteLLM completion instrumentation. 
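+
+The module provides ``CompletionWrapper`` and ``AsyncCompletionWrapper``, which
+replace ``litellm.completion`` and ``litellm.acompletion`` while the
+instrumentor is active. For ``stream=True`` calls they return a
+``StreamWrapper``/``AsyncStreamWrapper``, so the span ends only once the stream
+has been fully consumed (or explicitly closed).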
+""" + +import os +import time +import logging +from typing import Any, Callable, Dict, Optional +from opentelemetry import trace, context +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.metrics import Meter +from aliyun.sdk.extension.arms.self_monitor.self_monitor_decorator import hook_advice, async_hook_advice +from aliyun.sdk.extension.arms.semconv import _SUPPRESS_LLM_SDK_KEY + +# Import ARMS extension +from opentelemetry.instrumentation.litellm.arms_litellm_extension import ArmsLiteLLMExtension +from opentelemetry.instrumentation.litellm._utils import ( + convert_messages_to_structured_format, + parse_provider_from_model, + safe_json_dumps, + convert_tool_definitions, +) +from opentelemetry.instrumentation.litellm._stream_wrapper import StreamWrapper, AsyncStreamWrapper + +# Import semantic conventions +from aliyun.semconv.trace_v2 import LLMAttributes, GenAiSpanKind + +logger = logging.getLogger(__name__) + +# Environment variable to control instrumentation +LITELLM_INSTRUMENTATION_ENABLED = "ARMS_LITELLM_INSTRUMENTATION_ENABLED" + + +def _is_instrumentation_enabled() -> bool: + """Check if instrumentation is enabled via environment variable.""" + enabled = os.getenv(LITELLM_INSTRUMENTATION_ENABLED, "true").lower() + return enabled != "false" + + +class CompletionWrapper: + """Wrapper for litellm.completion()""" + + def __init__(self, tracer: trace.Tracer, meter: Meter, original_func: Callable): + self.tracer = tracer + self.meter = meter + self.original_func = original_func + self.arms_extension = ArmsLiteLLMExtension(meter) + + @hook_advice(instrumentation_name="litellm", advice_method="completion", throw_exception=True) + def __call__(self, *args, **kwargs): + """Wrap litellm.completion()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed (e.g., called from higher-level framework like langchain) + if context.get_value(_SUPPRESS_LLM_SDK_KEY): + return self.original_func(*args, **kwargs) + + # Extract request parameters + model = kwargs.get("model", "unknown") + messages = kwargs.get("messages", []) + is_stream = kwargs.get("stream", False) + + # For streaming, enable usage tracking if not explicitly disabled + # This ensures we get token usage information in the final chunk + if is_stream and "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + + # Parse provider from model + provider = parse_provider_from_model(model) or "unknown" + + # Create span + span_name = f"chat {model}" + start_time = time.time() + start_time_ns = time.time_ns() + + # For streaming, we need to manually manage span lifecycle + # because attributes are set after all chunks are consumed + if is_stream: + span = self.tracer.start_span(span_name) + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation (e.g., OpenAI) + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(_SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + # If context setting fails, continue without suppression token + pass + + try: + # Set request attributes using ARMS extension + self.arms_extension.set_llm_request_attributes(span, model, provider, kwargs, messages) 
+ + # Set input messages + if messages: + structured_messages = convert_messages_to_structured_format(messages) + span.set_attribute(LLMAttributes.GEN_AI_INPUT_MESSAGES, safe_json_dumps(structured_messages)) + + # Set tool definitions + tools = kwargs.get("tools") + if tools: + span.set_attribute(LLMAttributes.GEN_AI_TOOL_DEFINITIONS, convert_tool_definitions(tools)) + + # Call original function + response = self.original_func(*args, **kwargs) + + # Wrap the streaming response - span will be ended by wrapper + stream_wrapper = StreamWrapper( + stream=response, + span=span, + callback=lambda s, last_chunk, error: self._handle_stream_end( + s, last_chunk, error, start_time, start_time_ns, stream_wrapper + ) + ) + response = stream_wrapper + + return response + except Exception as e: + # Record error + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + # Record metrics even for failed calls + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + # Record error metrics + self.arms_extension.record_error_metrics(GenAiSpanKind.LLM.value) + + span.end() + + raise + finally: + # Detach suppress context first, then span context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + context.detach(token) + else: + # Non-streaming: use context manager as normal + with self.tracer.start_as_current_span(span_name) as span: + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation (e.g., OpenAI) + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(_SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + # If context setting fails, continue without suppression token + pass + + try: + # Set request attributes using ARMS extension + self.arms_extension.set_llm_request_attributes(span, model, provider, kwargs, messages) + + # Set input messages + if messages: + structured_messages = convert_messages_to_structured_format(messages) + span.set_attribute(LLMAttributes.GEN_AI_INPUT_MESSAGES, safe_json_dumps(structured_messages)) + + # Set tool definitions + tools = kwargs.get("tools") + if tools: + span.set_attribute(LLMAttributes.GEN_AI_TOOL_DEFINITIONS, convert_tool_definitions(tools)) + + # Call original function + response = self.original_func(*args, **kwargs) + + # Set response attributes immediately + self.arms_extension.set_llm_response_attributes(span, response) + + # Set output messages + self._set_output_messages(span, response) + + # Record metrics + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + return response + + except Exception as e: + # Record error + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + # Record metrics even for failed calls + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + # Record error metrics + self.arms_extension.record_error_metrics(GenAiSpanKind.LLM.value) + + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + + def _set_output_messages(self, span: trace.Span, response: Any): + """Set output messages on span.""" + try: + if hasattr(response, "choices") and response.choices: + output_messages = [] + for choice in response.choices: + if hasattr(choice, "message"): + msg = choice.message + msg_dict = {"role": getattr(msg, "role", "assistant")} + + # Extract content + if 
hasattr(msg, "content") and msg.content: + msg_dict["content"] = msg.content + + # Extract tool calls + if hasattr(msg, "tool_calls") and msg.tool_calls: + msg_dict["tool_calls"] = [ + { + "id": getattr(tc, "id", ""), + "function": { + "name": getattr(tc.function, "name", ""), + "arguments": getattr(tc.function, "arguments", "") + } + } + for tc in msg.tool_calls + ] + + output_messages.append(msg_dict) + + if output_messages: + structured_output = convert_messages_to_structured_format(output_messages) + span.set_attribute(LLMAttributes.GEN_AI_OUTPUT_MESSAGES, safe_json_dumps(structured_output)) + + except Exception as e: + logger.debug(f"Error setting output messages: {e}") + + def _handle_stream_end( + self, + span: trace.Span, + last_chunk: Optional[Any], + error: Optional[Exception], + start_time: float, + start_time_ns: int, + stream_wrapper: Optional[Any] = None + ): + """Handle the end of a streaming response.""" + try: + if error: + span.record_exception(error) + span.set_status(Status(StatusCode.ERROR, str(error))) + self.arms_extension.record_error_metrics(GenAiSpanKind.LLM.value) + return + + # Use last chunk to extract final information + if last_chunk: + self.arms_extension.set_llm_response_attributes(span, last_chunk) + + # For streaming, construct output message from accumulated content + if stream_wrapper and hasattr(stream_wrapper, 'accumulated_content'): + full_content = ''.join(stream_wrapper.accumulated_content) + if full_content or stream_wrapper.accumulated_tool_calls: + # Create a synthetic message with accumulated content + output_msg = {"role": "assistant"} + if full_content: + output_msg["content"] = full_content + if stream_wrapper.accumulated_tool_calls: + output_msg["tool_calls"] = stream_wrapper.accumulated_tool_calls + + # Convert and set output messages + from opentelemetry.instrumentation.litellm._utils import ( + convert_messages_to_structured_format, + safe_json_dumps + ) + structured_output = convert_messages_to_structured_format([output_msg]) + span.set_attribute(LLMAttributes.GEN_AI_OUTPUT_MESSAGES, safe_json_dumps(structured_output)) + else: + # Fallback to non-streaming logic + self._set_output_messages(span, last_chunk) + + # Record metrics + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + except Exception as e: + logger.debug(f"Error handling stream end: {e}") + + +class AsyncCompletionWrapper: + """Wrapper for litellm.acompletion()""" + + def __init__(self, tracer: trace.Tracer, meter: Meter, original_func: Callable): + self.tracer = tracer + self.meter = meter + self.original_func = original_func + self.arms_extension = ArmsLiteLLMExtension(meter) + + @async_hook_advice(instrumentation_name="litellm", advice_method="acompletion", throw_exception=True) + async def __call__(self, *args, **kwargs): + """Wrap litellm.acompletion()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return await self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed (e.g., called from higher-level framework like langchain) + if context.get_value(_SUPPRESS_LLM_SDK_KEY): + return await self.original_func(*args, **kwargs) + + # Extract request parameters + model = kwargs.get("model", "unknown") + messages = kwargs.get("messages", []) + is_stream = kwargs.get("stream", False) + + # For streaming, enable usage tracking if 
not explicitly disabled + # This ensures we get token usage information in the final chunk + if is_stream and "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + + # Parse provider from model + provider = parse_provider_from_model(model) or "unknown" + + # Create span + span_name = f"chat {model}" + start_time = time.time() + start_time_ns = time.time_ns() + + # For streaming, we need to manually manage span lifecycle + # because attributes are set after all chunks are consumed + if is_stream: + span = self.tracer.start_span(span_name) + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation (e.g., OpenAI) + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(_SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + # If context setting fails, continue without suppression token + pass + + try: + # Set request attributes using ARMS extension + self.arms_extension.set_llm_request_attributes(span, model, provider, kwargs, messages) + + # Set input messages + if messages: + structured_messages = convert_messages_to_structured_format(messages) + span.set_attribute(LLMAttributes.GEN_AI_INPUT_MESSAGES, safe_json_dumps(structured_messages)) + + # Set tool definitions + tools = kwargs.get("tools") + if tools: + span.set_attribute(LLMAttributes.GEN_AI_TOOL_DEFINITIONS, convert_tool_definitions(tools)) + + # Call original function + response = await self.original_func(*args, **kwargs) + + # Wrap the async streaming response - span will be ended by wrapper + stream_wrapper = AsyncStreamWrapper( + stream=response, + span=span, + callback=lambda s, last_chunk, error: self._handle_stream_end( + s, last_chunk, error, start_time, start_time_ns, stream_wrapper + ) + ) + response = stream_wrapper + + return response + except Exception as e: + # Record error + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + # Record metrics even for failed calls + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + # Record error metrics + self.arms_extension.record_error_metrics(GenAiSpanKind.LLM.value) + + span.end() + + raise + finally: + # Detach suppress context first, then span context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + context.detach(token) + else: + # Non-streaming: use context manager as normal + with self.tracer.start_as_current_span(span_name) as span: + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation (e.g., OpenAI) + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(_SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + # If context setting fails, continue without suppression token + pass + + try: + # Set request attributes using ARMS extension + self.arms_extension.set_llm_request_attributes(span, model, provider, kwargs, messages) + + # Set input messages + if messages: + structured_messages = convert_messages_to_structured_format(messages) + span.set_attribute(LLMAttributes.GEN_AI_INPUT_MESSAGES, safe_json_dumps(structured_messages)) + + # Set tool definitions + tools = kwargs.get("tools") + if tools: + span.set_attribute(LLMAttributes.GEN_AI_TOOL_DEFINITIONS, convert_tool_definitions(tools)) + + # Call original function + response = await self.original_func(*args, **kwargs) + + # Set response attributes immediately + self.arms_extension.set_llm_response_attributes(span, 
response) + + # Set output messages (reuse sync logic) + completion_wrapper = CompletionWrapper(self.tracer, self.meter, None) + completion_wrapper._set_output_messages(span, response) + + # Record metrics + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + return response + + except Exception as e: + # Record error + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + # Record metrics even for failed calls + duration = time.time() - start_time + self.arms_extension.record_llm_metrics(span, duration, start_time_ns) + + # Record error metrics + self.arms_extension.record_error_metrics(GenAiSpanKind.LLM.value) + + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + + def _handle_stream_end( + self, + span: trace.Span, + last_chunk: Optional[Any], + error: Optional[Exception], + start_time: float, + start_time_ns: int, + stream_wrapper: Optional[Any] = None + ): + """Handle the end of an async streaming response.""" + # Reuse sync logic + completion_wrapper = CompletionWrapper(self.tracer, self.meter, None) + completion_wrapper.arms_extension = self.arms_extension + completion_wrapper._handle_stream_end(span, last_chunk, error, start_time, start_time_ns, stream_wrapper) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/arms_litellm_extension.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/arms_litellm_extension.py new file mode 100644 index 00000000..c513cf82 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/arms_litellm_extension.py @@ -0,0 +1,243 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ARMS extension for LiteLLM instrumentation. + +This module contains ARMS-specific logic including: +- Semantic conventions +- Metrics collection +- Attribute management +""" + +import time +import logging +from typing import Any, Dict, Optional +from opentelemetry import trace +from opentelemetry.metrics import Meter, get_meter + +# Import ARMS semantic conventions +from aliyun.semconv.trace import SpanAttributes, AliyunSpanKindValues +from aliyun.semconv.trace_v2 import LLMAttributes, EmbeddingAttributes, CommonAttributes, GenAiSpanKind +from aliyun.sdk.extension.arms.semconv.metrics import ArmsCommonServiceMetrics +from aliyun.sdk.extension.arms.common.utils.metrics_utils import get_llm_common_attributes +from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_SYSTEM + +logger = logging.getLogger(__name__) + + +class ArmsLiteLLMExtension: + """ + ARMS extension for LiteLLM instrumentation. + + Handles ARMS-specific semantic conventions, metrics, and attributes. 
+ """ + + def __init__(self, meter: Meter): + self.meter = meter + self._arms_metrics = None + + def set_llm_request_attributes( + self, + span: trace.Span, + model: str, + provider: str, + kwargs: Dict[str, Any], + messages: list + ) -> None: + """Set LLM request attributes following ARMS semantic conventions.""" + # Required attributes using semantic conventions constants + span.set_attribute(CommonAttributes.GEN_AI_SPAN_KIND, GenAiSpanKind.LLM.value) + span.set_attribute(GEN_AI_SYSTEM, provider) + # Store the model name as requested by user + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_MODEL, model) + + # Optional request parameters + if "temperature" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_TEMPERATURE, kwargs["temperature"]) + if "max_tokens" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_MAX_TOKENS, kwargs["max_tokens"]) + if "top_p" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_TOP_P, kwargs["top_p"]) + if "top_k" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_TOP_K, kwargs["top_k"]) + if "frequency_penalty" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_FREQUENCY_PENALTY, kwargs["frequency_penalty"]) + if "presence_penalty" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_PRESENCE_PENALTY, kwargs["presence_penalty"]) + if "seed" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_SEED, str(kwargs["seed"])) + if "stop" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_STOP_SEQUENCES, str(kwargs["stop"])) + if "stream" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_IS_STREAM, kwargs["stream"]) + if "n" in kwargs: + span.set_attribute(LLMAttributes.GEN_AI_REQUEST_CHOICE_COUNT, kwargs["n"]) + + def set_llm_response_attributes( + self, + span: trace.Span, + response: Any + ) -> None: + """Set LLM response attributes following ARMS semantic conventions.""" + try: + # Response model and ID + if hasattr(response, "model"): + span.set_attribute(LLMAttributes.GEN_AI_RESPONSE_MODEL, response.model) + if hasattr(response, "id"): + span.set_attribute(LLMAttributes.GEN_AI_RESPONSE_ID, response.id) + + # Token usage + if hasattr(response, "usage"): + usage = response.usage + if hasattr(usage, "prompt_tokens"): + span.set_attribute(LLMAttributes.GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens) + if hasattr(usage, "completion_tokens"): + span.set_attribute(LLMAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, usage.completion_tokens) + if hasattr(usage, "total_tokens"): + span.set_attribute(LLMAttributes.GEN_AI_USAGE_TOTAL_TOKENS, usage.total_tokens) + + # Finish reasons + if hasattr(response, "choices") and response.choices: + if hasattr(response.choices[0], "finish_reason"): + span.set_attribute(LLMAttributes.GEN_AI_RESPONSE_FINISH_REASONS, [response.choices[0].finish_reason]) + + except Exception as e: + logger.debug(f"Error setting LLM response attributes: {e}") + + def set_embedding_request_attributes( + self, + span: trace.Span, + model: str, + provider: str + ) -> None: + """Set embedding request attributes following ARMS semantic conventions.""" + # Import parse_model_name here to avoid circular imports + from opentelemetry.instrumentation.litellm._utils import parse_model_name + + span.set_attribute(CommonAttributes.GEN_AI_SPAN_KIND, GenAiSpanKind.EMBEDDING.value) + # Note: EmbeddingAttributes doesn't have GEN_AI_SYSTEM, use LLMAttributes + span.set_attribute(GEN_AI_SYSTEM, provider) + # Store model name without provider prefix + 
span.set_attribute(EmbeddingAttributes.GEN_AI_REQUEST_MODEL, parse_model_name(model)) + + def set_embedding_response_attributes( + self, + span: trace.Span, + response: Any + ) -> None: + """Set embedding response attributes following ARMS semantic conventions.""" + try: + # Token usage + if hasattr(response, "usage"): + usage = response.usage + if hasattr(usage, "prompt_tokens"): + span.set_attribute(EmbeddingAttributes.GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens) + # Note: EmbeddingAttributes doesn't have GEN_AI_USAGE_TOTAL_TOKENS, use LLMAttributes + if hasattr(usage, "total_tokens"): + span.set_attribute(LLMAttributes.GEN_AI_USAGE_TOTAL_TOKENS, usage.total_tokens) + + # Embedding dimension + if hasattr(response, "data") and response.data: + first_embedding = response.data[0] + # Support both dict and object access + embedding_vector = None + if hasattr(first_embedding, "embedding"): + embedding_vector = first_embedding.embedding + elif isinstance(first_embedding, dict) and "embedding" in first_embedding: + embedding_vector = first_embedding["embedding"] + + if embedding_vector and isinstance(embedding_vector, list): + span.set_attribute(EmbeddingAttributes.GEN_AI_EMBEDDINGS_DIMENSION_COUNT, len(embedding_vector)) + + except Exception as e: + logger.debug(f"Error setting embedding response attributes: {e}") + + def record_llm_metrics( + self, + span: trace.Span, + duration: float, + start_time_ns: int + ) -> None: + """ + Record LLM metrics following ARMS standards. + + Includes modelName in metrics for monitoring granularity. + """ + try: + # Get common attributes (includes service info, etc.) + metrics_attributes = get_llm_common_attributes() + + # Get span kind from span + span_kind = span.attributes.get(CommonAttributes.GEN_AI_SPAN_KIND) + if not span_kind: + return + + # Add spanKind to metrics attributes + metrics_attributes["spanKind"] = span_kind + + # Extract model name from span attributes + model_name = "UNSET" + if response_model := span.attributes.get(LLMAttributes.GEN_AI_RESPONSE_MODEL): + model_name = response_model + elif request_model := span.attributes.get(LLMAttributes.GEN_AI_REQUEST_MODEL): + model_name = request_model + metrics_attributes["modelName"] = model_name + + # Initialize metrics if not already done + if self._arms_metrics is None: + self._arms_metrics = ArmsCommonServiceMetrics(self.meter) + + # Record call count + self._arms_metrics.calls_count.add(1, attributes=metrics_attributes) + + # Record duration + self._arms_metrics.calls_duration_seconds.record(duration, attributes=metrics_attributes) + + # Record token usage for LLM spans only + if span_kind == GenAiSpanKind.LLM.value: + # Record input tokens + if input_tokens := span.attributes.get(LLMAttributes.GEN_AI_USAGE_INPUT_TOKENS): + # Create new dict instead of deepcopy to avoid overhead + input_attrs = dict(metrics_attributes) + input_attrs["usageType"] = "input" + self._arms_metrics.llm_usage_tokens.add(input_tokens, attributes=input_attrs) + + # Record output tokens + if output_tokens := span.attributes.get(LLMAttributes.GEN_AI_USAGE_OUTPUT_TOKENS): + # Create new dict instead of deepcopy to avoid overhead + output_attrs = dict(metrics_attributes) + output_attrs["usageType"] = "output" + self._arms_metrics.llm_usage_tokens.add(output_tokens, attributes=output_attrs) + + except Exception as e: + logger.debug(f"Error recording metrics: {e}") + + def record_error_metrics( + self, + span_kind: str + ) -> None: + """Record error metrics.""" + try: + metrics_attributes = get_llm_common_attributes() + 
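# Tag the error count with the span kind (e.g. LLM or EMBEDDING) passed by the caller so failures can be attributed per operation type +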
metrics_attributes["spanKind"] = span_kind + + if self._arms_metrics is None: + self._arms_metrics = ArmsCommonServiceMetrics(self.meter) + + self._arms_metrics.call_error_count.add(1, attributes=metrics_attributes) + + except Exception as e: + logger.debug(f"Error recording error metrics: {e}") + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py new file mode 100644 index 00000000..977adbd5 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py @@ -0,0 +1,18 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +_instruments = ("litellm >= 1.0.0",) + +_supports_metrics = True + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py new file mode 100644 index 00000000..19f19fdd --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.51b0" + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt new file mode 100644 index 00000000..4945578e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt @@ -0,0 +1,8 @@ +litellm>=1.79.0 +pytest +pytest-asyncio +openai +-e aliyun-semantic-conventions +-e util/opentelemetry-util-http +-e instrumentation/opentelemetry-instrumentation-litellm + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py new file mode 100644 index 00000000..f87ce79b --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py
new file mode 100644
index 00000000..6263796d
--- /dev/null
+++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py
@@ -0,0 +1,309 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test cases for LiteLLM embedding calls.
+
+This module tests embedding functionality using LiteLLM's embedding API,
+including both synchronous and asynchronous calls.
+"""
+
+import os
+import asyncio
+from opentelemetry.test.test_base import TestBase
+from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor
+
+
+class TestEmbedding(TestBase):
+ """
+ Test embedding calls with LiteLLM.
+ """
+
+ def setUp(self):
+ super().setUp()
+ # Set up environment variables for testing
+ os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder")
+ os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder")
+ os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+ os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+
+ # Instrument LiteLLM
+ LiteLLMInstrumentor().instrument(
+ tracer_provider=self.tracer_provider,
+ meter_provider=self.meter_provider
+ )
+
+ def tearDown(self):
+ super().tearDown()
+ # Uninstrument to avoid affecting other tests
+ LiteLLMInstrumentor().uninstrument()
+ from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta
+ SingletonMeta.reset()
+
+ def test_sync_embedding_single_text(self):
+ """
+ Test synchronous embedding with single text input.
+
+ This test performs a basic embedding request using LiteLLM with
+ a single text string. It verifies that the embedding vector is
+ generated and instrumentation captures all required information.
+ + The test verifies: + - A span is created with gen_ai.span.kind = "EMBEDDING" + - Required attributes: model, usage tokens, dimension count + - Input text is captured + - Embedding dimension is recorded + """ + import litellm + + # Business demo: Single text embedding + # This demo generates an embedding for a single text string using text-embedding-v1 model + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="The quick brown fox jumps over the lazy dog" + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'data')) + self.assertGreater(len(response.data), 0) + + # Verify embedding is a list of numbers + embedding = response.data[0].get("embedding") + self.assertIsInstance(embedding, list) + self.assertGreater(len(embedding), 0) + + # Get spans and verify instrumentation + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1, "Expected exactly one span for embedding call") + + span = spans[0] + + # Verify span kind + self.assertEqual( + span.attributes.get("gen_ai.span.kind"), + "EMBEDDING", + "Span kind should be EMBEDDING" + ) + + # Verify required attributes + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual(span.attributes.get("gen_ai.request.model"), "text-embedding-v1") + + # Verify token usage (required for embedding) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.total_tokens", span.attributes) + self.assertGreater(span.attributes.get("gen_ai.usage.input_tokens"), 0) + self.assertGreater(span.attributes.get("gen_ai.usage.total_tokens"), 0) + + # Verify embedding dimension count + self.assertIn("gen_ai.embeddings.dimension.count", span.attributes) + dimension = span.attributes.get("gen_ai.embeddings.dimension.count") + self.assertEqual(dimension, len(embedding)) + self.assertGreater(dimension, 0) + + # Verify metrics + metrics = self.get_sorted_metrics() + metric_names = [m.name for m in metrics] + + # Check for required metrics + self.assertIn("genai_calls_count", metric_names) + self.assertIn("genai_calls_duration_seconds", metric_names) + + # Verify genai_calls_count metric + calls_metric = next(m for m in metrics if m.name == "genai_calls_count") + self.assertEqual(len(calls_metric.data.data_points), 1) + data_point = calls_metric.data.data_points[0] + self.assertEqual(data_point.value, 1) + self.assertIn("spanKind", data_point.attributes) + self.assertEqual(data_point.attributes["spanKind"], "EMBEDDING") + + def test_sync_embedding_multiple_texts(self): + """ + Test synchronous embedding with multiple text inputs. + + This test performs an embedding request with a list of texts. + It verifies that all texts are embedded and the instrumentation + captures the batch operation correctly. + + The test verifies: + - Multiple embeddings are generated + - Span captures batch operation + - Token usage reflects multiple inputs + """ + import litellm + + # Business demo: Batch embedding + # This demo generates embeddings for multiple texts in a single call + texts = [ + "Hello, world!", + "Artificial intelligence is fascinating.", + "LiteLLM makes LLM integration easy." 
+ ] + + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input=texts + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'data')) + self.assertEqual(len(response.data), len(texts), "Should have embedding for each text") + + + # Verify each embedding + self.assertIsInstance(response.data[0].get("embedding"), list) + self.assertGreater(len(response.data[0].get("embedding")), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify span kind + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "EMBEDDING") + + # Verify token usage accounts for all inputs + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + input_tokens = span.attributes.get("gen_ai.usage.input_tokens") + self.assertGreater(input_tokens, 0) + + def test_async_embedding(self): + """ + Test asynchronous embedding call. + + This test performs an asynchronous embedding request using + litellm.aembedding(). It verifies that async operations are + properly instrumented. + + The test verifies: + - Async embedding works correctly + - Span is created for async call + - All required attributes are captured + """ + import litellm + + async def run_async_embedding(): + # Business demo: Asynchronous embedding + # This demo uses async API to generate embeddings without blocking + response = await litellm.aembedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="Asynchronous embedding test" + ) + return response + + # Run the async function + response = asyncio.run(run_async_embedding()) + + # Verify response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'data')) + self.assertGreater(len(response.data), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify span attributes + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "EMBEDDING") + self.assertIn("gen_ai.request.model", span.attributes) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.embeddings.dimension.count", span.attributes) + + def test_embedding_with_different_models(self): + """ + Test embedding with different model providers. + + This test tries embedding with different models to verify + that the instrumentation works across different providers + supported by LiteLLM. 
+ + The test verifies: + - Different models are handled correctly + - Model name is captured in attributes + - System/provider information is recorded + """ + import litellm + + # Business demo: Using different embedding models + # This demo tests embedding with text-embedding-v1 model + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="Testing different embedding models" + ) + + # Verify response + self.assertIsNotNone(response) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify model information + self.assertIn("gen_ai.request.model", span.attributes) + request_model = span.attributes.get("gen_ai.request.model") + self.assertEqual(request_model, "text-embedding-v1") + + # Verify system/provider is captured + self.assertIn("gen_ai.system", span.attributes) + system = span.attributes.get("gen_ai.system") + self.assertIsNotNone(system) + self.assertIsInstance(system, str) + + def test_embedding_empty_input(self): + """ + Test embedding with edge case inputs. + + This test verifies that the instrumentation handles edge cases + like very short texts correctly. + + The test verifies: + - Short/simple inputs are handled + - Instrumentation doesn't break on edge cases + """ + import litellm + + # Business demo: Embedding with minimal input + # This demo tests embedding with a very short text + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="Hi" + ) + + # Verify response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'data')) + self.assertGreater(len(response.data), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify basic attributes are still captured + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "EMBEDDING") + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py new file mode 100644 index 00000000..fc23d9e3 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py @@ -0,0 +1,413 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for LiteLLM error handling and edge cases. + +This module tests various error scenarios and edge cases to verify +that instrumentation handles failures gracefully. +""" + +import os +from opentelemetry.test.test_base import TestBase +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.trace import StatusCode + + +class TestErrorHandling(TestBase): + """ + Test error handling and edge cases with LiteLLM. 
+ """ + + def setUp(self): + super().setUp() + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta + SingletonMeta.reset() + + def test_authentication_failure(self): + """ + Test handling of authentication failures. + + This test intentionally provides invalid API credentials + to verify that authentication errors are properly captured + and instrumented. + + The test verifies: + - Span is created even on authentication failure + - Span status indicates error + - Error information is captured + - genai_calls_error_count metric is incremented + """ + import litellm + + # Business demo: Authentication failure + # This demo tests behavior when API key is invalid + # Temporarily set invalid credentials for dashscope + original_dashscope_key = os.environ.get("DASHSCOPE_API_KEY") + os.environ["DASHSCOPE_API_KEY"] = "invalid-key-12345" + + try: + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Hello"} + ] + ) + # If it somehow succeeds, fail the test + self.fail("Expected authentication error but call succeeded") + except Exception as e: + # Expected to fail + self.assertIsNotNone(e) + finally: + # Restore original key + if original_dashscope_key: + os.environ["DASHSCOPE_API_KEY"] = original_dashscope_key + else: + os.environ.pop("DASHSCOPE_API_KEY", None) + + # Get spans + spans = self.get_finished_spans() + + if len(spans) > 0: + span = spans[0] + + # Verify span status indicates error + self.assertEqual( + span.status.status_code, + StatusCode.ERROR, + "Span status should indicate error" + ) + + # Check if error metrics are recorded + metrics = self.get_sorted_metrics() + metric_names = [m.name for m in metrics] + + if "genai_calls_error_count" in metric_names: + error_metric = next(m for m in metrics if m.name == "genai_calls_error_count") + self.assertGreater(len(error_metric.data.data_points), 0) + + def test_invalid_model_name(self): + """ + Test handling of invalid model names. + + This test uses a non-existent model name to verify + that model not found errors are handled correctly. + + The test verifies: + - Error is properly raised + - Span captures the invalid model name + - Error status is recorded + """ + import litellm + + # Set up valid credentials + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Business demo: Invalid model name + # This demo tests behavior when model name doesn't exist + try: + response = litellm.completion( + model="non-existent-model-xyz-123", + messages=[ + {"role": "user", "content": "Hello"} + ] + ) + # If it somehow succeeds, that's also okay for this test + except Exception as e: + # Expected to fail with model not found + self.assertIsNotNone(e) + + # Get spans + spans = self.get_finished_spans() + + if len(spans) > 0: + span = spans[0] + + # Verify model name is still captured + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual( + span.attributes.get("gen_ai.request.model"), + "non-existent-model-xyz-123" + ) + + def test_network_timeout(self): + """ + Test handling of network timeouts. 
+ + This test sets a very short timeout to trigger a timeout error + and verifies that it's handled gracefully. + + The test verifies: + - Timeout errors are captured + - Span indicates error status + - Timeout parameter is recorded + """ + import litellm + + # Set up valid credentials + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Business demo: Network timeout + # This demo tests behavior with extremely short timeout + try: + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Tell me a long story"} + ], + timeout=0.001 # Very short timeout to trigger error + ) + # May or may not succeed depending on network speed + except Exception as e: + # Timeout is expected + self.assertIsNotNone(e) + + # Get spans + spans = self.get_finished_spans() + + if len(spans) > 0: + span = spans[0] + # Verify basic attributes are captured even on timeout + self.assertIn("gen_ai.request.model", span.attributes) + + def test_empty_messages(self): + """ + Test handling of empty message list. + + This test provides an empty messages list to verify + that input validation errors are handled. + + The test verifies: + - Empty input error is raised + - Instrumentation doesn't crash + """ + import litellm + + # Set up valid credentials + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Business demo: Empty messages + # This demo tests behavior with empty message list + try: + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[] # Empty messages + ) + # May raise validation error + except Exception as e: + # Expected to fail + self.assertIsNotNone(e) + + # Instrumentation should not crash + + def test_invalid_temperature(self): + """ + Test handling of invalid parameter values. + + This test provides invalid temperature value to verify + that parameter validation is handled. + + The test verifies: + - Invalid parameter errors are handled + - Parameters are still captured even if invalid + """ + import litellm + + # Set up valid credentials + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Business demo: Invalid temperature + # This demo tests behavior with out-of-range temperature + try: + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Hello"} + ], + temperature=5.0 # Invalid: temperature should be 0-2 + ) + # Some providers might accept this or clamp it + except Exception as e: + # May fail with validation error + self.assertIsNotNone(e) + + # Get spans + spans = self.get_finished_spans() + + if len(spans) > 0: + span = spans[0] + # Verify temperature is captured + if "gen_ai.request.temperature" in span.attributes: + self.assertEqual( + span.attributes.get("gen_ai.request.temperature"), + 5.0 + ) + + def test_max_tokens_exceeded(self): + """ + Test handling when max_tokens is exceeded. + + This test sets a very small max_tokens to trigger + truncation and verify finish_reason is captured. 
+
+ The test verifies:
+ - Response with length limit is handled
+ - finish_reason indicates truncation
+ - Output is still captured
+ """
+ import litellm
+
+ # Set up valid credentials
+ os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder")
+ os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+ os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder")
+ os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+
+ # Business demo: Max tokens limit
+ # This demo requests a long response with very small token limit
+ response = litellm.completion(
+ model="dashscope/qwen-turbo",
+ messages=[
+ {"role": "user", "content": "Write a detailed essay about artificial intelligence"}
+ ],
+ max_tokens=5 # Very small limit
+ )
+
+ # Should succeed but with truncated output
+ self.assertIsNotNone(response)
+
+ # Get spans
+ spans = self.get_finished_spans()
+ self.assertEqual(len(spans), 1)
+
+ span = spans[0]
+
+ # Verify finish_reason might indicate length limit
+ if "gen_ai.response.finish_reasons" in span.attributes:
+ finish_reason = span.attributes.get("gen_ai.response.finish_reasons")
+ # Could be "length" or other provider-specific value
+ self.assertIsNotNone(finish_reason)
+
+ def test_malformed_message_format(self):
+ """
+ Test handling of malformed message structures.
+
+ This test provides messages with invalid structure to verify
+ that the instrumentation handles it gracefully.
+
+ The test verifies:
+ - Malformed input doesn't crash instrumentation
+ - Error is propagated correctly
+ """
+ import litellm
+
+ # Set up valid credentials
+ os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder")
+ os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+
+ # Business demo: Malformed message
+ # This demo tests behavior with invalid message structure
+ try:
+ response = litellm.completion(
+ model="dashscope/qwen-turbo",
+ messages=[
+ {"invalid_key": "user", "wrong_field": "Hello"} # Wrong keys
+ ]
+ )
+ # Should fail with validation error
+ except Exception as e:
+ # Expected to fail
+ self.assertIsNotNone(e)
+
+ # Instrumentation should not crash
+ # Spans may or may not be created depending on when validation happens
+
+ def test_rate_limit_handling(self):
+ """
+ Test handling of rate limit errors.
+
+ Note: This test cannot reliably trigger rate limits without
+ making many requests, so it's more of a placeholder for
+ documenting expected behavior.
+
+ The test verifies:
+ - Rate limit errors should be captured in spans
+ - Error metrics should be incremented
+ - Status should indicate error
+ """
+ # This test is mainly documentation of expected behavior
+ # In real scenarios, rate limit errors should be handled gracefully
+ # and captured in instrumentation
+ pass
+
+ def test_streaming_interruption(self):
+ """
+ Test handling of interrupted streaming.
+
+ This test starts a stream but simulates interruption
+ to verify that partial data is handled correctly.
+
+ The test verifies:
+ - Partial stream data is captured
+ - Span is finalized even on interruption
+ """
+ import litellm
+
+ # Set up valid credentials
+ os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder")
+ os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+
+ # Business demo: Interrupted streaming
+ # This demo starts a stream but stops reading early
+ try:
+ response = litellm.completion(
+ model="dashscope/qwen-turbo",
+ messages=[
+ {"role": "user", "content": "Write a very long story"}
+ ],
+ stream=True,
+ max_tokens=200
+ )
+
+ # Read only first chunk then stop
+ first_chunk = next(response, None)
+ self.assertIsNotNone(first_chunk)
+ # Don't consume the rest of the stream
+
+ except Exception as e:
+ # May raise if stream is not properly closed
+ pass
+
+ # Get spans
+ spans = self.get_finished_spans()
+
+ # Should have created span even with incomplete stream
+ if len(spans) > 0:
+ span = spans[0]
+ self.assertEqual(span.attributes.get("gen_ai.request.is_stream"), True)
+
diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py
new file mode 100644
index 00000000..c8a0705f
--- /dev/null
+++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py
@@ -0,0 +1,279 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test cases for LiteLLM retry mechanisms.
+
+This module tests retry functionality in LiteLLM, including both
+completion_with_retries and acompletion_with_retries functions.
+"""
+
+import os
+import asyncio
+from opentelemetry.test.test_base import TestBase
+from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor
+
+
+class TestRetry(TestBase):
+ """
+ Test retry mechanisms with LiteLLM.
+ """
+
+ def setUp(self):
+ super().setUp()
+ # Set up environment variables for testing
+ os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder")
+ os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder")
+ os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+ os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+
+ # Instrument LiteLLM
+ LiteLLMInstrumentor().instrument(
+ tracer_provider=self.tracer_provider,
+ meter_provider=self.meter_provider
+ )
+
+ def tearDown(self):
+ # Force flush metrics before tearDown
+ if hasattr(self, 'meter_provider') and self.meter_provider:
+ try:
+ self.meter_provider.force_flush()
+ except Exception:
+ pass
+
+ super().tearDown()
+ # Uninstrument to avoid affecting other tests
+ LiteLLMInstrumentor().uninstrument()
+ from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta
+ SingletonMeta.reset()
+
+ def test_completion_with_retries_success(self):
+ """
+ Test successful completion with retry mechanism.
+ + This test uses litellm.completion_with_retries() which wraps + the standard completion call with automatic retry logic. + When the call succeeds on first try, it should behave like + a normal completion. + + The test verifies: + - Retry-wrapped completion succeeds + - Span is created correctly + - All standard attributes are captured + - No retry-specific attributes if first call succeeds + """ + import litellm + + # Business demo: Completion with retry wrapper (success case) + # This demo uses completion_with_retries which automatically retries on failures + # In this case, the call should succeed on first try + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What is 1+1? Answer briefly."} + ], + temperature=0.1 + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'choices')) + self.assertGreater(len(response.choices), 0) + + # Get spans + spans = self.get_finished_spans() + # Should have at least one span (may have more if retry logic creates child spans) + self.assertGreaterEqual(len(spans), 1) + + # Get the main span (usually the last one or the root span) + span = spans[-1] if len(spans) > 0 else spans[0] + + # Verify span kind + self.assertEqual( + span.attributes.get("gen_ai.span.kind"), + "LLM", + "Span kind should be LLM" + ) + + # Verify standard attributes + self.assertIn("gen_ai.request.model", span.attributes) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.output_tokens", span.attributes) + + def test_async_completion_with_retries(self): + """ + Test asynchronous completion with retry mechanism. + + This test uses litellm.acompletion_with_retries() for + asynchronous retry logic. + + The test verifies: + - Async retry wrapper works correctly + - Span is created for async retry call + - Standard attributes are captured + """ + import litellm + + async def run_async_retry(): + # Business demo: Async completion with retry wrapper + # This demo uses async version of completion_with_retries + response = await litellm.acompletion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Name a primary color."} + ], + temperature=0.0 + ) + return response + + # Run the async function + response = asyncio.run(run_async_retry()) + + # Verify response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'choices')) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1) + + # Find LLM span + llm_spans = [s for s in spans if s.attributes.get("gen_ai.span.kind") == "LLM"] + self.assertGreater(len(llm_spans), 0, "Should have at least one LLM span") + + span = llm_spans[0] + + # Verify attributes + self.assertIn("gen_ai.request.model", span.attributes) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + + def test_completion_with_custom_retry_config(self): + """ + Test completion with custom retry configuration. + + This test configures custom retry parameters like max retries + and verifies that the instrumentation handles them correctly. 
+ + The test verifies: + - Custom retry config is respected + - Instrumentation works with custom config + """ + import litellm + + # Business demo: Completion with custom retry configuration + # This demo sets custom retry parameters + # Note: LiteLLM's retry mechanism might use different parameter names + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What is the capital of China?"} + ], + num_retries=3, # Maximum number of retries + timeout=30 # Timeout in seconds + ) + + # Verify response + self.assertIsNotNone(response) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1) + + def test_retry_with_streaming(self): + """ + Test retry mechanism with streaming completion. + + This test combines retry logic with streaming to verify + that both features work together correctly. + + The test verifies: + - Retry works with streaming + - Stream is properly handled + - TTFT is captured + """ + import litellm + + # Business demo: Streaming completion with retry wrapper + # This demo uses retry wrapper with streaming enabled + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Count from 1 to 3."} + ], + stream=True, + temperature=0.0 + ) + + # Collect stream chunks + chunks = [] + for chunk in response: + chunks.append(chunk) + + # Verify we got chunks + self.assertGreater(len(chunks), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1) + + # Find streaming span + stream_spans = [ + s for s in spans + if s.attributes.get("gen_ai.request.is_stream") == True + ] + + if len(stream_spans) > 0: + span = stream_spans[0] + # Verify TTFT is recorded + self.assertIn("gen_ai.response.time_to_first_token", span.attributes) + + def test_completion_retry_metrics(self): + """ + Test that retry calls generate appropriate metrics. + + This test verifies that metrics are properly recorded + for completion calls with retry logic, including the + final successful call metrics. + + The test verifies: + - Metrics are generated correctly + - Call count reflects actual calls made + - Duration includes retry time + """ + import litellm + + # Business demo: Simple retry call to verify metrics + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Reply with OK."} + ] + ) + + # Verify response + self.assertIsNotNone(response) + + # Get metrics + metrics = self.get_sorted_metrics() + metric_names = [m.name for m in metrics] + + # Verify required metrics exist + self.assertIn("genai_calls_count", metric_names) + self.assertIn("genai_calls_duration_seconds", metric_names) + + # Verify call count + calls_metric = next(m for m in metrics if m.name == "genai_calls_count") + # Should have at least one call recorded + self.assertGreater(len(calls_metric.data.data_points), 0) + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py new file mode 100644 index 00000000..fc6b2db3 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py @@ -0,0 +1,309 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for streaming LiteLLM completion calls. + +This module tests streaming text generation functionality using LiteLLM's +streaming API, including both synchronous and asynchronous streaming. +""" + +import os +import json +import asyncio +from opentelemetry.test.test_base import TestBase +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + + +class TestStreamCompletion(TestBase): + """ + Test streaming completion calls with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta + SingletonMeta.reset() + + def test_sync_streaming_completion(self): + """ + Test synchronous streaming text generation. + + This test performs a streaming chat completion request using LiteLLM. + It iterates through the stream to collect chunks and verifies that + the complete response is assembled correctly. 
+ + The test verifies: + - A span is created for the streaming call + - Stream parameter is captured (gen_ai.request.is_stream = True) + - Time to first token (TTFT) is recorded + - Complete output is captured after stream ends + - All required span attributes are present + """ + import litellm + + # Business demo: Synchronous streaming completion + # This demo makes a streaming call to dashscope/qwen-turbo model and collects all chunks + chunks = [] + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Count from 1 to 5 with commas between numbers."} + ], + stream=True, + temperature=0.1 + ) + + # Collect all streaming chunks + for chunk in response: + chunks.append(chunk) + if hasattr(chunk, 'choices') and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + if hasattr(delta, 'content') and delta.content: + pass # Content is being streamed + + # Verify we received chunks + self.assertGreater(len(chunks), 0, "Should receive at least one chunk") + + # Get spans and verify instrumentation + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1, "Expected exactly one span for streaming call") + + span = spans[0] + + # Verify span kind + self.assertEqual( + span.attributes.get("gen_ai.span.kind"), + "LLM", + "Span kind should be LLM" + ) + + # Verify streaming flag + self.assertEqual( + span.attributes.get("gen_ai.request.is_stream"), + True, + "Should indicate streaming mode" + ) + + # Verify required attributes + self.assertIn("gen_ai.system", span.attributes) + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual(span.attributes.get("gen_ai.request.model"), "dashscope/qwen-turbo") + + # Verify token usage + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.output_tokens", span.attributes) + self.assertGreater(span.attributes.get("gen_ai.usage.input_tokens"), 0) + self.assertGreater(span.attributes.get("gen_ai.usage.output_tokens"), 0) + + # Verify TTFT (Time To First Token) is recorded + self.assertIn("gen_ai.response.time_to_first_token", span.attributes) + ttft = span.attributes.get("gen_ai.response.time_to_first_token") + self.assertGreater(ttft, 0, "TTFT should be greater than 0") + + # Verify input messages + self.assertIn("gen_ai.input.messages", span.attributes) + input_messages = json.loads(span.attributes.get("gen_ai.input.messages")) + self.assertIsInstance(input_messages, list) + self.assertGreater(len(input_messages), 0) + + # Verify output messages (should be assembled from stream) + self.assertIn("gen_ai.output.messages", span.attributes) + output_messages = json.loads(span.attributes.get("gen_ai.output.messages")) + self.assertIsInstance(output_messages, list) + self.assertGreater(len(output_messages), 0) + + # Verify metrics + metrics = self.get_sorted_metrics() + metric_names = [m.name for m in metrics] + + self.assertIn("genai_calls_count", metric_names) + self.assertIn("genai_calls_duration_seconds", metric_names) + + def test_async_streaming_completion(self): + """ + Test asynchronous streaming text generation. + + This test performs an asynchronous streaming chat completion request. + It uses async/await syntax to iterate through the stream asynchronously. 
+ + The test verifies: + - Async streaming works correctly + - All span attributes are captured for async calls + - TTFT is recorded for async streams + """ + import litellm + + async def run_async_stream(): + # Business demo: Asynchronous streaming completion + # This demo makes an async streaming call to dashscope/qwen-turbo model + chunks = [] + response = await litellm.acompletion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Say hello in 3 different languages."} + ], + stream=True, + temperature=0.3 + ) + + # Collect all streaming chunks + async for chunk in response: + chunks.append(chunk) + + # Explicitly close to ensure span finalization + if hasattr(response, 'close'): + response.close() + + return chunks + + # Run the async function + chunks = asyncio.run(run_async_stream()) + + # Verify we received chunks + self.assertGreater(len(chunks), 0, "Should receive at least one chunk") + + # Force flush to ensure spans are processed + if hasattr(self, 'tracer_provider') and self.tracer_provider: + self.tracer_provider.force_flush() + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify streaming attributes + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertEqual(span.attributes.get("gen_ai.request.is_stream"), True) + self.assertIn("gen_ai.response.time_to_first_token", span.attributes) + + # Verify token usage + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.output_tokens", span.attributes) + + def test_streaming_with_early_termination(self): + """ + Test streaming completion with early termination. + + This test starts a streaming call but stops reading after a few chunks. + It verifies that the instrumentation handles partial streams correctly. + + The test verifies: + - Partial stream reading is handled correctly + - Span is still created and finalized + - Available data is captured even if stream is not fully consumed + """ + import litellm + + # Business demo: Streaming with early termination + # This demo starts a stream but stops reading after 3 chunks + chunks_read = 0 + max_chunks = 3 + + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Write a long story about a cat."} + ], + stream=True, + max_tokens=200 + ) + + # Read only first few chunks + for chunk in response: + chunks_read += 1 + if chunks_read >= max_chunks: + break + + # Explicitly close the stream to finalize span + if hasattr(response, 'close'): + response.close() + + # Verify we read the expected number of chunks + self.assertEqual(chunks_read, max_chunks) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1, "Should have at least one span") + + span = spans[0] + + # Verify basic attributes are still captured + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertEqual(span.attributes.get("gen_ai.request.is_stream"), True) + self.assertIn("gen_ai.request.model", span.attributes) + + def test_streaming_multiple_choices(self): + """ + Test streaming completion with multiple choice outputs. + + This test requests multiple completion choices (n > 1) in streaming mode. + It verifies that all choices are properly captured. 
+ + The test verifies: + - Multiple choices are handled in streaming mode + - gen_ai.request.choice.count is set correctly + - All choices are captured in output + """ + import litellm + + # Business demo: Streaming with multiple choices + # This demo requests 2 different completions for the same prompt + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What color is the sky?"} + ], + stream=True, + n=2, + temperature=0.8 + ) + + # Collect all chunks + chunks = list(response) + self.assertGreater(len(chunks), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify choice count + self.assertEqual( + span.attributes.get("gen_ai.request.choice.count"), + 2, + "Should request 2 choices" + ) + + # Verify streaming flag + self.assertEqual(span.attributes.get("gen_ai.request.is_stream"), True) + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py new file mode 100644 index 00000000..7a14a6d5 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py @@ -0,0 +1,260 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for synchronous LiteLLM completion calls. + +This module tests basic synchronous text generation functionality using LiteLLM's +completion API with various models and configurations. +""" + +import os +import json +from opentelemetry.test.test_base import TestBase +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + + +class TestSyncCompletion(TestBase): + """ + Test synchronous completion calls with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta + SingletonMeta.reset() + + def test_basic_sync_completion(self): + """ + Test basic synchronous text generation. + + This test performs a simple chat completion request using LiteLLM with the + dashscope/qwen-turbo model. It sends a single user message and expects a text response. 
+ + The test verifies: + - A span is created with gen_ai.span.kind = "LLM" + - Required span attributes are present (model, system, tokens) + - Input and output messages are captured + - Metrics are recorded (calls count, duration, token usage) + """ + import litellm + + # Business demo: Simple chat completion with LiteLLM + # This demo makes a synchronous call to dashscope/qwen-turbo model with a simple question + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What is the capital of France? Answer in one word."} + ], + temperature=0.7, + max_tokens=50 + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'choices')) + self.assertGreater(len(response.choices), 0) + + # Get spans and verify instrumentation + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1, "Expected exactly one span for completion call") + + span = spans[0] + + # Verify span kind + self.assertEqual( + span.attributes.get("gen_ai.span.kind"), + "LLM", + "Span kind should be LLM" + ) + + # Verify required attributes + self.assertIn("gen_ai.system", span.attributes) + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual(span.attributes.get("gen_ai.request.model"), "dashscope/qwen-turbo") + + # Verify token usage (must be present and > 0) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.output_tokens", span.attributes) + self.assertIn("gen_ai.usage.total_tokens", span.attributes) + self.assertGreater(span.attributes.get("gen_ai.usage.input_tokens"), 0) + self.assertGreater(span.attributes.get("gen_ai.usage.output_tokens"), 0) + self.assertGreater(span.attributes.get("gen_ai.usage.total_tokens"), 0) + + # Verify input messages + self.assertIn("gen_ai.input.messages", span.attributes) + input_messages = json.loads(span.attributes.get("gen_ai.input.messages")) + self.assertIsInstance(input_messages, list) + self.assertGreater(len(input_messages), 0) + self.assertEqual(input_messages[0]["role"], "user") + + # Verify output messages + self.assertIn("gen_ai.output.messages", span.attributes) + output_messages = json.loads(span.attributes.get("gen_ai.output.messages")) + self.assertIsInstance(output_messages, list) + self.assertGreater(len(output_messages), 0) + + # Verify recommended attributes + self.assertIn("gen_ai.request.temperature", span.attributes) + self.assertIn("gen_ai.request.max_tokens", span.attributes) + self.assertIn("gen_ai.response.model", span.attributes) + self.assertIn("gen_ai.response.finish_reasons", span.attributes) + + # Verify metrics + metrics = self.get_sorted_metrics() + metric_names = [m.name for m in metrics] + + # Check for required metrics + self.assertIn("genai_calls_count", metric_names) + self.assertIn("genai_calls_duration_seconds", metric_names) + self.assertIn("genai_llm_usage_tokens", metric_names) + + # Verify genai_calls_count metric + calls_metric = next(m for m in metrics if m.name == "genai_calls_count") + self.assertEqual(len(calls_metric.data.data_points), 1) + data_point = calls_metric.data.data_points[0] + self.assertEqual(data_point.value, 1) + self.assertIn("modelName", data_point.attributes) + self.assertIn("spanKind", data_point.attributes) + self.assertEqual(data_point.attributes["spanKind"], "LLM") + + # Verify duration metric + duration_metric = next(m for m in metrics if m.name == "genai_calls_duration_seconds") + self.assertGreater(len(duration_metric.data.data_points), 0) + + # Verify token 
usage metric + token_metric = next(m for m in metrics if m.name == "genai_llm_usage_tokens") + token_data_points = token_metric.data.data_points + self.assertGreaterEqual(len(token_data_points), 2) # At least input and output + + # Verify usage types + usage_types = {dp.attributes.get("usageType") for dp in token_data_points} + self.assertIn("input", usage_types) + self.assertIn("output", usage_types) + + def test_sync_completion_with_multiple_messages(self): + """ + Test synchronous completion with conversation history. + + This test simulates a multi-turn conversation by providing system, user, + and assistant messages in the request. It verifies that all messages + are properly captured in the span attributes. + + The test verifies: + - Multiple messages are captured in input + - System message is properly handled + - Response includes proper assistant message + """ + import litellm + + # Business demo: Multi-turn conversation + # This demo simulates a conversation with system prompt and message history + messages = [ + {"role": "system", "content": "You are a helpful assistant that provides concise answers."}, + {"role": "user", "content": "What is 2+2?"}, + {"role": "assistant", "content": "4"}, + {"role": "user", "content": "What is 3+3?"} + ] + + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=messages, + temperature=0.1 + ) + + # Verify response + self.assertIsNotNone(response) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify input messages contain all provided messages + input_messages = json.loads(span.attributes.get("gen_ai.input.messages")) + self.assertEqual(len(input_messages), 4) + + # Verify message roles + self.assertEqual(input_messages[0]["role"], "system") + self.assertEqual(input_messages[1]["role"], "user") + self.assertEqual(input_messages[2]["role"], "assistant") + self.assertEqual(input_messages[3]["role"], "user") + + # Verify output + output_messages = json.loads(span.attributes.get("gen_ai.output.messages")) + self.assertGreater(len(output_messages), 0) + self.assertEqual(output_messages[0]["role"], "assistant") + + def test_sync_completion_with_parameters(self): + """ + Test synchronous completion with various LLM parameters. + + This test verifies that LLM parameters like temperature, top_p, max_tokens, + etc. are properly captured in the span attributes. + + The test verifies: + - All request parameters are captured + - Parameters are correctly recorded in span attributes + """ + import litellm + + # Business demo: Completion with various parameters + # This demo tests parameter capture including temperature, top_p, max_tokens, etc. 
+ response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Tell me a short joke."}], + temperature=0.9, + max_tokens=100, + top_p=0.95, + n=1, + stop=["END"], + seed=42 + ) + + # Verify response + self.assertIsNotNone(response) + + # Get span + spans = self.get_finished_spans() + span = spans[0] + + # Verify parameter attributes + self.assertEqual(span.attributes.get("gen_ai.request.temperature"), 0.9) + self.assertEqual(span.attributes.get("gen_ai.request.max_tokens"), 100) + self.assertEqual(span.attributes.get("gen_ai.request.top_p"), 0.95) + self.assertEqual(span.attributes.get("gen_ai.request.choice.count"), 1) + self.assertEqual(span.attributes.get("gen_ai.request.seed"), "42") + + # Verify stop sequences + self.assertIn("gen_ai.request.stop_sequences", span.attributes) + stop_sequences = span.attributes.get("gen_ai.request.stop_sequences") + self.assertIn("END", stop_sequences) + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py new file mode 100644 index 00000000..6b59f1d8 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py @@ -0,0 +1,416 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for LiteLLM tool/function calling. + +This module tests tool and function calling capabilities in LiteLLM, +including tool definitions, tool call requests, and tool call responses. +""" + +import os +import json +from opentelemetry.test.test_base import TestBase +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + + +class TestToolCalls(TestBase): + """ + Test tool and function calling with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "test-openai-key-placeholder") + os.environ["DASHSCOPE_API_KEY"] = os.getenv("DASHSCOPE_API_KEY", "test-dashscope-key-placeholder") + os.environ["OPENAI_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + os.environ["DASHSCOPE_API_BASE"] = "https://dashscope.aliyuncs.com/compatible-mode/v1" + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + from aliyun.sdk.extension.arms.semconv.metrics import SingletonMeta + SingletonMeta.reset() + + def test_completion_with_tool_definition(self): + """ + Test completion with tool definitions. + + This test provides tool definitions to the LLM and verifies + that the model can request tool calls. It checks that tool + definitions are properly captured in span attributes. 
+ + The test verifies: + - Tool definitions are provided to the model + - gen_ai.tool.definitions attribute is set + - Tool definitions are properly formatted in span + - Model can request tool calls + """ + import litellm + + # Business demo: LLM call with tool definitions + # This demo defines a get_weather tool and asks a weather-related question + tools = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit", + }, + }, + "required": ["location"], + }, + }, + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", # Use a model that supports tool calling + messages=[ + {"role": "user", "content": "What's the weather like in San Francisco?"} + ], + tools=tools, + tool_choice="auto" + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, 'choices')) + self.assertGreater(len(response.choices), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify span kind + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + + # Verify tool definitions are captured + self.assertIn("gen_ai.tool.definitions", span.attributes) + tool_defs = span.attributes.get("gen_ai.tool.definitions") + self.assertIsNotNone(tool_defs) + + # Parse and verify tool definitions + tool_defs_json = json.loads(tool_defs) + self.assertIsInstance(tool_defs_json, list) + self.assertGreater(len(tool_defs_json), 0) + self.assertEqual(tool_defs_json[0]["function"]["name"], "get_weather") + + # Check if model requested a tool call + choice = response.choices[0] + message = choice.message + + if hasattr(message, 'tool_calls') and message.tool_calls: + # Model requested tool call - verify it's captured in output + self.assertIn("gen_ai.output.messages", span.attributes) + output_messages = json.loads(span.attributes.get("gen_ai.output.messages")) + self.assertIsInstance(output_messages, list) + self.assertGreater(len(output_messages), 0) + + # Check if tool call is in the output + output_msg = output_messages[0] + self.assertIn("parts", output_msg) + + def test_completion_with_multiple_tools(self): + """ + Test completion with multiple tool definitions. + + This test provides multiple tools to the model and verifies + that all tool definitions are captured correctly. 
+ + The test verifies: + - Multiple tools can be defined + - All tool definitions are captured + - Model can choose appropriate tool + """ + import litellm + + # Business demo: Multiple tool definitions + # This demo defines multiple tools: get_weather and get_time + tools = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the current weather", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string"} + }, + "required": ["location"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_time", + "description": "Get the current time", + "parameters": { + "type": "object", + "properties": { + "timezone": {"type": "string"} + }, + "required": ["timezone"] + } + } + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[ + {"role": "user", "content": "What time is it in New York?"} + ], + tools=tools + ) + + # Verify response + self.assertIsNotNone(response) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify tool definitions + self.assertIn("gen_ai.tool.definitions", span.attributes) + tool_defs = json.loads(span.attributes.get("gen_ai.tool.definitions")) + self.assertEqual(len(tool_defs), 2) + + # Verify both tools are present + tool_names = [tool["function"]["name"] for tool in tool_defs] + self.assertIn("get_weather", tool_names) + self.assertIn("get_time", tool_names) + + def test_completion_with_tool_response(self): + """ + Test completion with tool call and response. + + This test simulates a complete tool call flow: + 1. Ask a question that requires a tool + 2. Model requests tool call + 3. Execute tool and provide response + 4. Model generates final answer + + The test verifies: + - Tool call flow is properly captured + - Tool responses are included in conversation + - Multiple spans might be created for the flow + """ + import litellm + + # Business demo: Complete tool call workflow + # Step 1: Initial request with tool definition + tools = [ + { + "type": "function", + "function": { + "name": "calculator", + "description": "Perform basic arithmetic operations", + "parameters": { + "type": "object", + "properties": { + "operation": { + "type": "string", + "enum": ["add", "subtract", "multiply", "divide"] + }, + "a": {"type": "number"}, + "b": {"type": "number"} + }, + "required": ["operation", "a", "b"] + } + } + } + ] + + messages = [ + {"role": "user", "content": "What is 15 multiplied by 7?"} + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=messages, + tools=tools + ) + + # Check if model requested tool call + if hasattr(response.choices[0].message, 'tool_calls') and response.choices[0].message.tool_calls: + tool_call = response.choices[0].message.tool_calls[0] + + # Step 2: Simulate tool execution + # In real scenario, we would execute the tool here + tool_result = "105" + + # Step 3: Send tool response back to model + messages.append(response.choices[0].message) + messages.append({ + "role": "tool", + "tool_call_id": tool_call.id, + "content": tool_result + }) + + # Get final response + final_response = litellm.completion( + model="dashscope/qwen-plus", + messages=messages, + tools=tools + ) + + # Verify final response + self.assertIsNotNone(final_response) + + # Get spans - should have 2 spans (initial + final) + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 2) + + # Verify both spans are LLM spans + llm_spans = [s for s in spans if 
s.attributes.get("gen_ai.span.kind") == "LLM"] + self.assertGreaterEqual(len(llm_spans), 2) + + def test_function_calling_with_streaming(self): + """ + Test function calling with streaming enabled. + + This test verifies that tool calls work correctly with + streaming responses. + + The test verifies: + - Tool calls work with streaming + - Tool call information is captured in stream + - Final assembled response includes tool calls + """ + import litellm + + # Business demo: Tool calling with streaming + # This demo tests tool definitions with streaming enabled + tools = [ + { + "type": "function", + "function": { + "name": "search", + "description": "Search for information", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string"} + }, + "required": ["query"] + } + } + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[ + {"role": "user", "content": "Search for latest AI news"} + ], + tools=tools, + stream=True + ) + + # Collect stream chunks + chunks = list(response) + self.assertGreater(len(chunks), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1) + + span = spans[0] + + # Verify streaming and tool definitions + self.assertEqual(span.attributes.get("gen_ai.request.is_stream"), True) + self.assertIn("gen_ai.tool.definitions", span.attributes) + + def test_tool_choice_parameter(self): + """ + Test different tool_choice parameter values. + + This test verifies that different tool_choice values + (auto, required, specific function) are handled correctly. + + The test verifies: + - tool_choice parameter is respected + - Instrumentation captures tool choice setting + """ + import litellm + + # Business demo: Tool choice parameter + # This demo tests forcing tool call with tool_choice + tools = [ + { + "type": "function", + "function": { + "name": "format_response", + "description": "Format the response in a specific way", + "parameters": { + "type": "object", + "properties": { + "format": { + "type": "string", + "enum": ["json", "xml", "plain"] + } + }, + "required": ["format"] + } + } + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[ + {"role": "user", "content": "Format this as JSON: Hello World"} + ], + tools=tools, + tool_choice="required" # Force tool call + ) + + # Verify response + self.assertIsNotNone(response) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify tool definitions are captured + self.assertIn("gen_ai.tool.definitions", span.attributes) + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py new file mode 100644 index 00000000..63b048c2 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py @@ -0,0 +1,87 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Test cases for utility functions in LiteLLM instrumentation. +""" + +import unittest + +from opentelemetry.instrumentation.litellm._utils import parse_provider_from_model + + +class TestParseProviderFromModel(unittest.TestCase): + """ + Test cases for parse_provider_from_model function. + """ + + def test_empty_model_returns_none(self): + """Test that empty string returns None.""" + self.assertIsNone(parse_provider_from_model("")) + + def test_none_model_returns_none(self): + """Test that None returns None.""" + self.assertIsNone(parse_provider_from_model(None)) # type: ignore[arg-type] + + def test_model_with_slash_returns_provider_prefix(self): + """Test that model with '/' returns the provider prefix.""" + self.assertEqual(parse_provider_from_model("openai/gpt-4"), "openai") + self.assertEqual(parse_provider_from_model("dashscope/qwen-turbo"), "dashscope") + self.assertEqual(parse_provider_from_model("anthropic/claude-3"), "anthropic") + self.assertEqual(parse_provider_from_model("google/gemini-pro"), "google") + self.assertEqual(parse_provider_from_model("custom-provider/some-model"), "custom-provider") + + def test_model_with_multiple_slashes_returns_first_part(self): + """Test that model with multiple '/' returns only the first part.""" + self.assertEqual(parse_provider_from_model("openai/gpt-4/turbo"), "openai") + self.assertEqual(parse_provider_from_model("provider/model/version/extra"), "provider") + + def test_gpt_model_inferred_as_openai(self): + """Test that model containing 'gpt' is inferred as openai.""" + self.assertEqual(parse_provider_from_model("gpt-4"), "openai") + self.assertEqual(parse_provider_from_model("gpt-3.5-turbo"), "openai") + self.assertEqual(parse_provider_from_model("GPT-4"), "openai") + self.assertEqual(parse_provider_from_model("GPT-4-turbo"), "openai") + + def test_qwen_model_inferred_as_dashscope(self): + """Test that model containing 'qwen' is inferred as dashscope.""" + self.assertEqual(parse_provider_from_model("qwen-turbo"), "dashscope") + self.assertEqual(parse_provider_from_model("qwen-plus"), "dashscope") + self.assertEqual(parse_provider_from_model("QWEN-max"), "dashscope") + self.assertEqual(parse_provider_from_model("Qwen-VL"), "dashscope") + + def test_claude_model_inferred_as_anthropic(self): + """Test that model containing 'claude' is inferred as anthropic.""" + self.assertEqual(parse_provider_from_model("claude-3"), "anthropic") + self.assertEqual(parse_provider_from_model("claude-3-opus"), "anthropic") + self.assertEqual(parse_provider_from_model("CLAUDE-instant"), "anthropic") + self.assertEqual(parse_provider_from_model("Claude-2"), "anthropic") + + def test_gemini_model_inferred_as_google(self): + """Test that model containing 'gemini' is inferred as google.""" + self.assertEqual(parse_provider_from_model("gemini-pro"), "google") + self.assertEqual(parse_provider_from_model("gemini-1.5-pro"), "google") + self.assertEqual(parse_provider_from_model("GEMINI-ultra"), "google") + self.assertEqual(parse_provider_from_model("Gemini-nano"), "google") + + def test_unknown_model_returns_unknown(self): + """Test that unrecognized model names return 'unknown'.""" + self.assertEqual(parse_provider_from_model("llama-2"), "unknown") + self.assertEqual(parse_provider_from_model("mistral-7b"), "unknown") + self.assertEqual(parse_provider_from_model("some-random-model"), "unknown") + self.assertEqual(parse_provider_from_model("custom-model"), "unknown") + + +if __name__ == "__main__": + unittest.main()
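
Note: the `parse_provider_from_model` helper exercised by tests/test_utils.py is not included in this diff. The sketch below is inferred solely from the behaviors those tests assert (an explicit "provider/model" prefix wins, well-known model-name keywords map to a provider, empty or None input yields None, and anything else yields "unknown"). It is an illustration of that contract, not the implementation shipped in opentelemetry.instrumentation.litellm._utils.

    # Illustrative sketch only, inferred from the assertions in test_utils.py;
    # the real _utils.py implementation may differ.
    from typing import Optional

    _KEYWORD_PROVIDERS = {
        "gpt": "openai",
        "qwen": "dashscope",
        "claude": "anthropic",
        "gemini": "google",
    }

    def parse_provider_from_model(model: str) -> Optional[str]:
        """Best-effort provider detection from a LiteLLM model string."""
        if not model:
            # Covers both "" and None inputs.
            return None
        # An explicit "provider/model" (or deeper) path: the first segment is the provider.
        if "/" in model:
            return model.split("/", 1)[0]
        # Otherwise infer the provider from well-known, case-insensitive model keywords.
        lowered = model.lower()
        for keyword, provider in _KEYWORD_PROVIDERS.items():
            if keyword in lowered:
                return provider
        return "unknown"

The tests do not cover a model name containing more than one keyword, so the lookup order above is an arbitrary choice in this sketch.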