
Commit 404098d

instrument cua, fixes and tests to litellm responses (#187)
* instrument cua, fixes and tests to litellm responses

* Update tests/test_litellm_openai.py

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Update src/lmnr/opentelemetry_lib/opentelemetry/instrumentation/cua_agent/__init__.py

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Update src/lmnr/opentelemetry_lib/opentelemetry/instrumentation/cua_computer/__init__.py

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* add ruff config python 310

* tool span, dont record screenshot, rename step span, skip empty step span

* fix type hint

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
1 parent 5622871 commit 404098d

File tree

14 files changed: +1038 -79 lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -144,3 +144,6 @@ build-backend = "uv_build"
 
 [tool.uv.workspace]
 members = ["examples/fastapi-app"]
+
+[tool.ruff]
+target-version = "py310"
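For context, `target-version = "py310"` tells ruff to lint against a Python 3.10 baseline, which matches the modern annotations used elsewhere in this commit. A small illustrative snippet (not from the commit) of what that baseline permits:

def first_output(step: dict[str, list[object]] | None) -> list[object]:
    # PEP 604 unions (X | None) require Python 3.10; builtin generics 3.9+
    return step.get("output", []) if step else []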

src/lmnr/opentelemetry_lib/decorators/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -218,7 +218,7 @@ def wrap(*args, **kwargs):
     except Exception as e:
         _process_exception(span, e)
         _cleanup_span(span, wrapper)
-        raise e
+        raise
     finally:
         # Always restore global context
         context_api.detach(ctx_token)
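The `raise e` → bare `raise` change is more than style: bare `raise` re-raises the active exception with its original traceback intact, while `raise e` appends the re-raise site as an extra traceback frame. A minimal standalone sketch (not from the repo) showing the difference:

import traceback

def reraise_with_e():
    try:
        1 / 0
    except Exception as e:
        raise e  # traceback gains a frame pointing at this line

def reraise_bare():
    try:
        1 / 0
    except Exception:
        raise  # traceback still points only at the original `1 / 0`

for fn in (reraise_with_e, reraise_bare):
    try:
        fn()
    except ZeroDivisionError:
        print(f"--- {fn.__name__} ---")
        traceback.print_exc()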

src/lmnr/opentelemetry_lib/litellm/__init__.py

Lines changed: 107 additions & 25 deletions
@@ -3,10 +3,12 @@
 import json
 from datetime import datetime
 
+from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_PROMPT
 from opentelemetry.trace import SpanKind, Status, StatusCode, Tracer
 from lmnr.opentelemetry_lib.decorators import json_dumps
 from lmnr.opentelemetry_lib.litellm.utils import (
     get_tool_definition,
+    is_validator_iterator,
     model_as_dict,
     set_span_attribute,
 )
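The newly imported GEN_AI_PROMPT constant is just the attribute-key prefix from the incubating OpenTelemetry GenAI semantic conventions, so swapping it in for the hard-coded "gen_ai.prompt" strings leaves the emitted keys unchanged. A quick check (assuming the constant's value as defined by the semconv package):

from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_PROMPT

assert GEN_AI_PROMPT == "gen_ai.prompt"  # f"{GEN_AI_PROMPT}.0.role" == "gen_ai.prompt.0.role"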
@@ -245,35 +247,107 @@ def _process_input_messages(self, span, messages):
         if not isinstance(messages, list):
             return
 
-        for i, message in enumerate(messages):
-            message_dict = model_as_dict(message)
-            role = message_dict.get("role", "unknown")
-            set_span_attribute(span, f"gen_ai.prompt.{i}.role", role)
-
-            tool_calls = message_dict.get("tool_calls", [])
-            self._process_tool_calls(span, tool_calls, i, is_response=False)
+        prompt_index = 0
+        for item in messages:
+            block_dict = model_as_dict(item)
+            if block_dict.get("type", "message") == "message":
+                tool_calls = block_dict.get("tool_calls", [])
+                self._process_tool_calls(
+                    span, tool_calls, prompt_index, is_response=False
+                )
+                content = block_dict.get("content")
+                if is_validator_iterator(content):
+                    # Have not been able to catch this in the wild, but keeping
+                    # just in case, as raw OpenAI responses do that
+                    content = [self._process_content_part(part) for part in content]
+                try:
+                    stringified_content = (
+                        content if isinstance(content, str) else json_dumps(content)
+                    )
+                except Exception:
+                    stringified_content = (
+                        str(content) if content is not None else ""
+                    )
+                set_span_attribute(
+                    span,
+                    f"{GEN_AI_PROMPT}.{prompt_index}.content",
+                    stringified_content,
+                )
+                set_span_attribute(
+                    span,
+                    f"{GEN_AI_PROMPT}.{prompt_index}.role",
+                    block_dict.get("role"),
+                )
+                prompt_index += 1
 
-            content = message_dict.get("content", "")
-            if content is None:
-                continue
-            if isinstance(content, str):
-                set_span_attribute(span, f"gen_ai.prompt.{i}.content", content)
-            elif isinstance(content, list):
+            elif block_dict.get("type") == "computer_call_output":
                 set_span_attribute(
-                    span, f"gen_ai.prompt.{i}.content", json.dumps(content)
+                    span,
+                    f"{GEN_AI_PROMPT}.{prompt_index}.role",
+                    "computer_call_output",
                 )
-            else:
+                output_image_url = block_dict.get("output", {}).get("image_url")
+                if output_image_url:
+                    set_span_attribute(
+                        span,
+                        f"{GEN_AI_PROMPT}.{prompt_index}.content",
+                        json.dumps(
+                            [
+                                {
+                                    "type": "image_url",
+                                    "image_url": {"url": output_image_url},
+                                }
+                            ]
+                        ),
+                    )
+                prompt_index += 1
+            elif block_dict.get("type") == "computer_call":
+                set_span_attribute(
+                    span, f"{GEN_AI_PROMPT}.{prompt_index}.role", "assistant"
+                )
+                call_content = {}
+                if block_dict.get("id"):
+                    call_content["id"] = block_dict.get("id")
+                if block_dict.get("action"):
+                    call_content["action"] = block_dict.get("action")
                 set_span_attribute(
                     span,
-                    f"gen_ai.prompt.{i}.content",
-                    json.dumps(model_as_dict(content)),
+                    f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.arguments",
+                    json.dumps(call_content),
                 )
-            if role == "tool":
                 set_span_attribute(
                     span,
-                    f"gen_ai.prompt.{i}.tool_call_id",
-                    message_dict.get("tool_call_id"),
+                    f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.id",
+                    block_dict.get("call_id"),
                 )
+                set_span_attribute(
+                    span,
+                    f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.name",
+                    "computer_call",
+                )
+                prompt_index += 1
+            elif block_dict.get("type") == "reasoning":
+                reasoning_summary = block_dict.get("summary")
+                if reasoning_summary and isinstance(reasoning_summary, list):
+                    processed_chunks = [
+                        {"type": "text", "text": chunk.get("text")}
+                        for chunk in reasoning_summary
+                        if isinstance(chunk, dict)
+                        and chunk.get("type") == "summary_text"
+                    ]
+                    set_span_attribute(
+                        span,
+                        f"{GEN_AI_PROMPT}.{prompt_index}.reasoning",
+                        json_dumps(processed_chunks),
+                    )
+                set_span_attribute(
+                    span,
+                    f"{GEN_AI_PROMPT}.{prompt_index}.role",
+                    "assistant",
+                )
+                # reasoning is followed by other content parts in the same message,
+                # so we don't increment the prompt index
+            # TODO: handle other block types
 
     def _process_request_tool_definitions(self, span, tools):
         """Process and set tool definitions attributes on the span"""
@@ -493,11 +567,19 @@ def _process_response_output(self, span, output):
                 )
                 tool_call_index += 1
             elif block_dict.get("type") == "reasoning":
-                set_span_attribute(
-                    span,
-                    "gen_ai.completion.0.reasoning",
-                    block_dict.get("summary"),
-                )
+                reasoning_summary = block_dict.get("summary")
+                if reasoning_summary and isinstance(reasoning_summary, list):
+                    processed_chunks = [
+                        {"type": "text", "text": chunk.get("text")}
+                        for chunk in reasoning_summary
+                        if isinstance(chunk, dict)
+                        and chunk.get("type") == "summary_text"
+                    ]
+                    set_span_attribute(
+                        span,
+                        "gen_ai.completion.0.reasoning",
+                        json_dumps(processed_chunks),
+                    )
             # TODO: handle other block types, in particular other calls
 
     def _process_success_response(self, span, response_obj):
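To make the flattening above concrete, here is a hedged illustration (a hypothetical input block, not taken from the repo's tests) of how a computer_call item would land on the span per the _process_input_messages branch above:

block = {
    "type": "computer_call",
    "id": "cu_123",        # hypothetical ids
    "call_id": "call_abc",
    "action": {"type": "click", "x": 100, "y": 200},
}
# Expected span attributes for prompt_index == 0:
#   gen_ai.prompt.0.role                   = "assistant"
#   gen_ai.prompt.0.tool_calls.0.arguments = '{"id": "cu_123", "action": {"type": "click", "x": 100, "y": 200}}'
#   gen_ai.prompt.0.tool_calls.0.id        = "call_abc"
#   gen_ai.prompt.0.tool_calls.0.name      = "computer_call"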

src/lmnr/opentelemetry_lib/litellm/utils.py

Lines changed: 12 additions & 0 deletions
@@ -1,3 +1,4 @@
+import re
 from pydantic import BaseModel
 from opentelemetry.sdk.trace import Span
 from opentelemetry.util.types import AttributeValue

@@ -80,3 +81,14 @@ def get_tool_definition(tool: dict) -> ToolDefinition:
         description=description,
         parameters=parameters,
     )
+
+
+def is_validator_iterator(content):
+    """
+    Some OpenAI objects contain fields typed as Iterable, which pydantic
+    internally converts to a ValidatorIterator, and they cannot be trivially
+    serialized without consuming the iterator to, for example, a list.
+
+    See: https://github.com/pydantic/pydantic/issues/9541#issuecomment-2189045051
+    """
+    return re.search(r"pydantic.*ValidatorIterator'>$", str(type(content)))
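A short usage sketch of the new helper (assuming pydantic v2 behavior, where Iterable fields are validated lazily, per the issue linked in the docstring):

from typing import Iterable
from pydantic import BaseModel

class Msg(BaseModel):
    parts: Iterable[str]

m = Msg(parts=iter(["a", "b"]))
print(type(m.parts))                            # pydantic-core ValidatorIterator
print(bool(is_validator_iterator(m.parts)))     # True -> listify before serializing
print(bool(is_validator_iterator(["a", "b"])))  # False for a plain list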
src/lmnr/opentelemetry_lib/opentelemetry/instrumentation/cua_agent/__init__.py

Lines changed: 100 additions & 0 deletions

@@ -0,0 +1,100 @@
+"""OpenTelemetry CUA instrumentation"""
+
+import logging
+from typing import Any, AsyncGenerator, Collection
+
+from lmnr.opentelemetry_lib.decorators import json_dumps
+from lmnr import Laminar
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.instrumentation.utils import unwrap
+
+from opentelemetry.trace import Span
+from opentelemetry.trace.status import Status, StatusCode
+from wrapt import wrap_function_wrapper
+
+logger = logging.getLogger(__name__)
+
+_instruments = ("cua-agent >= 0.4.0",)
+
+
+def _wrap_run(
+    wrapped,
+    instance,
+    args,
+    kwargs,
+):
+    parent_span = Laminar.start_span("ComputerAgent.run")
+    instance._lmnr_parent_span = parent_span
+
+    try:
+        result: AsyncGenerator[dict[str, Any], None] = wrapped(*args, **kwargs)
+        return _abuild_from_streaming_response(parent_span, result)
+    except Exception as e:
+        if parent_span.is_recording():
+            parent_span.set_status(Status(StatusCode.ERROR))
+            parent_span.record_exception(e)
+        parent_span.end()
+        raise
+
+
+async def _abuild_from_streaming_response(
+    parent_span: Span, response: AsyncGenerator[dict[str, Any], None]
+) -> AsyncGenerator[dict[str, Any], None]:
+    with Laminar.use_span(parent_span, end_on_exit=True):
+        response_iter = aiter(response)
+        while True:
+            step = None
+            step_span = Laminar.start_span("ComputerAgent.step")
+            with Laminar.use_span(step_span):
+                try:
+                    step = await anext(response_iter)
+                    step_span.set_attribute("lmnr.span.output", json_dumps(step))
+                    try:
+                        # When processing tool calls, each output item is processed separately;
+                        # if the output is a message, agent.step returns an empty array
+                        # https://github.com/trycua/cua/blob/17d670962970a1d1774daaec029ebf92f1f9235e/libs/python/agent/agent/agent.py#L459
+                        if len(step.get("output", [])) == 0:
+                            continue
+                    except Exception:
+                        pass
+                    if step_span.is_recording():
+                        step_span.end()
+                except StopAsyncIteration:
+                    # deliberately not ended; there is no iteration step here
+                    break
+
+            if step is not None:
+                yield step
+
+
+class CuaAgentInstrumentor(BaseInstrumentor):
+    def __init__(self):
+        super().__init__()
+
+    def instrumentation_dependencies(self) -> Collection[str]:
+        return _instruments
+
+    def _instrument(self, **kwargs):
+        wrap_package = "agent.agent"
+        wrap_object = "ComputerAgent"
+        wrap_method = "run"
+        try:
+            wrap_function_wrapper(
+                wrap_package,
+                f"{wrap_object}.{wrap_method}",
+                _wrap_run,
+            )
+        except ModuleNotFoundError:
+            pass  # that's ok, we don't want to fail if some methods do not exist
+
+    def _uninstrument(self, **kwargs):
+        wrap_package = "agent.agent"
+        wrap_object = "ComputerAgent"
+        wrap_method = "run"
+        try:
+            unwrap(
+                f"{wrap_package}.{wrap_object}",
+                wrap_method,
+            )
+        except ModuleNotFoundError:
+            pass  # that's ok, we don't want to fail if some methods do not exist
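A hedged usage sketch of the new instrumentor (the import path follows the file location in this commit; enabling it manually like this assumes it is not already registered by Laminar's auto-instrumentation):

from lmnr.opentelemetry_lib.opentelemetry.instrumentation.cua_agent import (
    CuaAgentInstrumentor,
)

CuaAgentInstrumentor().instrument()
# From here on, each ComputerAgent.run(...) call is traced as a
# "ComputerAgent.run" span, with one "ComputerAgent.step" child span per
# non-empty step yielded by the underlying async generator.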
