Skip to content

Commit d95d78d

Browse files
fix(pydantic-ai): Set system instructions after instructions are set on the request object
1 parent f590e32 commit d95d78d

File tree

8 files changed

+513
-220
lines changed

8 files changed

+513
-220
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
import sys
2+
import itertools
3+
from collections.abc import Sequence
4+
from functools import wraps
5+
6+
import sentry_sdk
7+
from sentry_sdk.integrations import DidNotEnable
8+
from sentry_sdk.integrations.pydantic_ai.utils import _should_send_prompts
9+
from sentry_sdk.utils import capture_internal_exceptions, reraise
10+
from sentry_sdk.consts import SPANDATA
11+
from sentry_sdk.ai import set_data_normalized
12+
13+
from ..spans import invoke_agent_span, update_invoke_agent_span
14+
from ..utils import _capture_exception, pop_agent, push_agent
15+
16+
from typing import TYPE_CHECKING
17+
18+
try:
19+
from pydantic_ai.agent import Agent # type: ignore
20+
except ImportError:
21+
raise DidNotEnable("pydantic-ai not installed")
22+
23+
if TYPE_CHECKING:
24+
from typing import Any, Callable, Optional, Union
25+
26+
from pydantic_ai.agent.abstract import Instructions
27+
from pydantic_ai.tools import SystemPromptFunc
28+
29+
from sentry_sdk._types import TextPart
30+
31+
32+
def _get_current_system_instructions(
33+
agent_creation_time_instructions: "list[Union[str, SystemPromptFunc]]",
34+
system_instructions: "Optional[Instructions[Any]]",
35+
) -> "list[str]":
36+
instruction_texts: "list[str]" = []
37+
38+
instruction_texts += [
39+
instruction
40+
for instruction in agent_creation_time_instructions
41+
if isinstance(instruction, str)
42+
]
43+
44+
if isinstance(system_instructions, str):
45+
instruction_texts.append(system_instructions)
46+
47+
elif isinstance(system_instructions, Sequence):
48+
instruction_texts += [
49+
instruction
50+
for instruction in system_instructions
51+
if isinstance(instruction, str)
52+
]
53+
54+
return instruction_texts
55+
56+
57+
def _transform_system_instructions(
58+
permanent_instructions: "list[str]", current_instructions: "list[str]"
59+
) -> "list[TextPart]":
60+
return [
61+
{
62+
"type": "text",
63+
"content": instruction,
64+
}
65+
for instruction in itertools.chain(permanent_instructions, current_instructions)
66+
]
67+
68+
69+
class _StreamingContextManagerWrapper:
    """Wrapper for streaming methods that return async context managers.

    Ties a Sentry isolation scope and an ``invoke_agent`` span to the
    lifetime of the wrapped context manager, and keeps the agent
    contextvar stack balanced via push_agent/pop_agent.
    """

    def __init__(
        self,
        agent: "Any",
        original_ctx_manager: "Any",
        user_prompt: "Any",
        model: "Any",
        model_settings: "Any",
        is_streaming: bool = True,
    ) -> None:
        # Inputs captured for span creation in __aenter__.
        self.agent = agent
        self.original_ctx_manager = original_ctx_manager
        self.user_prompt = user_prompt
        self.model = model
        self.model_settings = model_settings
        self.is_streaming = is_streaming
        # Populated in __aenter__, torn down in __aexit__.
        self._isolation_scope: "Any" = None
        self._span: "Optional[sentry_sdk.tracing.Span]" = None
        self._result: "Any" = None

    async def __aenter__(self) -> "Any":
        # Isolate this workflow so that concurrently running agents don't
        # touch each other's scopes.
        self._isolation_scope = sentry_sdk.isolation_scope()
        self._isolation_scope.__enter__()

        # Create invoke_agent span (will be closed in __aexit__)
        self._span = invoke_agent_span(
            self.user_prompt,
            self.agent,
            self.model,
            self.model_settings,
            self.is_streaming,
        )
        self._span.__enter__()

        # Push agent to contextvar stack after span is successfully created and entered
        # This ensures proper pairing with pop_agent() in __aexit__ even if exceptions occur
        push_agent(self.agent, self.is_streaming)

        # Enter the original context manager last; keep its result so
        # __aexit__ can enrich the span with it.
        result = await self.original_ctx_manager.__aenter__()
        self._result = result
        return result

    async def __aexit__(self, exc_type: "Any", exc_val: "Any", exc_tb: "Any") -> bool:
        # BUGFIX: the wrapped manager's __aexit__ return value must be
        # forwarded — a truthy return means it suppressed the in-flight
        # exception. Discarding it (as before) re-raised exceptions the
        # inner manager had deliberately swallowed.
        suppressed = False
        try:
            # Exit the original context manager first.
            suppressed = bool(
                await self.original_ctx_manager.__aexit__(exc_type, exc_val, exc_tb)
            )

            # Update span with result if successful. Compare with `is not
            # None` so falsy-but-valid results are not silently skipped.
            if (
                exc_type is None
                and self._result is not None
                and self._span is not None
            ):
                update_invoke_agent_span(self._span, self._result)
        finally:
            # Pop agent from contextvar stack (paired with push_agent above).
            pop_agent()

            # If the inner manager suppressed the exception, the caller will
            # never see it — close span and scope as a clean exit.
            if suppressed:
                exc_type = exc_val = exc_tb = None

            # Clean up invoke span
            if self._span:
                self._span.__exit__(exc_type, exc_val, exc_tb)

            # Clean up isolation scope
            if self._isolation_scope:
                self._isolation_scope.__exit__(exc_type, exc_val, exc_tb)

        return suppressed
134+
135+
136+
def _create_run_wrapper(
    original_func: "Callable[..., Any]", is_streaming: bool = False
) -> "Callable[..., Any]":
    """
    Wraps the Agent.run method to create an invoke_agent span.

    The wrapper isolates each run in its own Sentry scope, records the
    agent's system instructions on the span when prompt-sending is
    enabled, captures any exception raised by the run, and keeps the
    agent contextvar stack balanced (push_agent before the run,
    pop_agent in the finally block).

    Args:
        original_func: The original run method
        is_streaming: Whether this is a streaming method (for future use)
    """

    @wraps(original_func)
    async def wrapper(self: "Agent", *args: "Any", **kwargs: "Any") -> "Any":
        # Isolate each workflow so that when agents are run in asyncio tasks they
        # don't touch each other's scopes
        with sentry_sdk.isolation_scope():
            # Extract parameters for the span.
            # NOTE(review): only keyword arguments are inspected for model /
            # model_settings / instructions; positional forms (if Agent.run
            # accepts them) would be missed — confirm against Agent.run's
            # signature. A falsy user_prompt kwarg also falls back to args[0].
            user_prompt = kwargs.get("user_prompt") or (args[0] if args else None)
            model = kwargs.get("model")
            model_settings = kwargs.get("model_settings")

            # Agent-creation-time system prompts (private pydantic-ai attribute).
            permanent_instructions = list(self._system_prompts)

            instructions: "Optional[Instructions[Any]]" = kwargs.get("instructions")
            current_instructions = _get_current_system_instructions(
                self._instructions, instructions
            )

            # Create invoke_agent span
            with invoke_agent_span(
                user_prompt, self, model, model_settings, is_streaming
            ) as span:
                # Only attach instruction text when the integration is allowed
                # to send prompts and there is something to attach.
                if _should_send_prompts() and (
                    len(permanent_instructions) > 0 or len(current_instructions) > 0
                ):
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
                        _transform_system_instructions(
                            permanent_instructions, current_instructions
                        ),
                        unpack=False,
                    )

                # Push agent to contextvar stack after span is successfully created and entered
                # This ensures proper pairing with pop_agent() in finally even if exceptions occur
                push_agent(self, is_streaming)

                try:
                    result = await original_func(self, *args, **kwargs)

                    # Update span with result
                    update_invoke_agent_span(span, result)

                    return result
                except Exception as exc:
                    # Report to Sentry, then re-raise with the original
                    # traceback so caller-visible behavior is unchanged.
                    exc_info = sys.exc_info()
                    with capture_internal_exceptions():
                        _capture_exception(exc)
                    reraise(*exc_info)
                finally:
                    # Pop agent from contextvar stack
                    pop_agent()

    return wrapper
201+
202+
203+
def _create_streaming_wrapper(
    original_func: "Callable[..., Any]",
) -> "Callable[..., Any]":
    """
    Wrap ``run_stream``, which returns an async context manager.

    The original context manager is replaced with an instrumented wrapper
    that opens an invoke_agent span for its lifetime.
    """

    @wraps(original_func)
    def wrapper(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        # Pull span parameters out of the call; a falsy user_prompt kwarg
        # falls back to the first positional argument.
        prompt = kwargs.get("user_prompt")
        if not prompt:
            prompt = args[0] if args else None

        # Obtain the original async context manager, then hand it back
        # wrapped in our instrumented one.
        inner_ctx_manager = original_func(self, *args, **kwargs)
        return _StreamingContextManagerWrapper(
            agent=self,
            original_ctx_manager=inner_ctx_manager,
            user_prompt=prompt,
            model=kwargs.get("model"),
            model_settings=kwargs.get("model_settings"),
            is_streaming=True,
        )

    return wrapper
231+
232+
233+
def _create_streaming_events_wrapper(
234+
original_func: "Callable[..., Any]",
235+
) -> "Callable[..., Any]":
236+
"""
237+
Wraps run_stream_events method - no span needed as it delegates to run().
238+
239+
Note: run_stream_events internally calls self.run() with an event_stream_handler,
240+
so the invoke_agent span will be created by the run() wrapper.
241+
"""
242+
243+
@wraps(original_func)
244+
async def wrapper(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
245+
# Just call the original generator - it will call run() which has the instrumentation
246+
try:
247+
async for event in original_func(self, *args, **kwargs):
248+
yield event
249+
except Exception as exc:
250+
exc_info = sys.exc_info()
251+
with capture_internal_exceptions():
252+
_capture_exception(exc)
253+
reraise(*exc_info)
254+
255+
return wrapper
256+
257+
258+
def _patch_agent_run() -> None:
    """
    Patches the Agent run methods to create spans for agent execution.

    Patches the non-streaming ``run`` method and the streaming
    ``run_stream`` / ``run_stream_events`` methods.

    NOTE(review): ``run_sync`` is NOT patched here — presumably it
    delegates to ``run`` internally and is therefore covered by the
    ``run`` wrapper; confirm against the pydantic-ai Agent implementation.
    """

    # Store original methods
    original_run = Agent.run
    original_run_stream = Agent.run_stream
    original_run_stream_events = Agent.run_stream_events

    # Wrap and apply patches for non-streaming methods
    Agent.run = _create_run_wrapper(original_run, is_streaming=False)

    # Wrap and apply patches for streaming methods
    Agent.run_stream = _create_streaming_wrapper(original_run_stream)
    Agent.run_stream_events = _create_streaming_events_wrapper(
        original_run_stream_events
    )

sentry_sdk/integrations/pydantic_ai/patches/graph_nodes.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
ai_client_span,
88
update_ai_client_span,
99
)
10+
from ..utils import _set_input_messages
1011

1112
try:
1213
from pydantic_ai._agent_graph import ModelRequestNode # type: ignore
@@ -59,9 +60,12 @@ def _patch_graph_nodes() -> None:
5960
async def wrapped_model_request_run(self: "Any", ctx: "Any") -> "Any":
6061
messages, model, model_settings = _extract_span_data(self, ctx)
6162

62-
with ai_client_span(messages, None, model, model_settings) as span:
63+
with ai_client_span(None, model, model_settings) as span:
6364
result = await original_model_request_run(self, ctx)
6465

66+
if messages:
67+
_set_input_messages(span, messages)
68+
6569
# Extract response from result if available
6670
model_response = None
6771
if hasattr(result, "model_response"):
@@ -86,7 +90,10 @@ async def wrapped_model_request_stream(self: "Any", ctx: "Any") -> "Any":
8690
messages, model, model_settings = _extract_span_data(self, ctx)
8791

8892
# Create chat span for streaming request
89-
with ai_client_span(messages, None, model, model_settings) as span:
93+
with ai_client_span(None, model, model_settings) as span:
94+
if messages:
95+
_set_input_messages(span, messages)
96+
9097
# Call the original stream method
9198
async with original_stream_method(self, ctx) as stream:
9299
yield stream

sentry_sdk/integrations/pydantic_ai/patches/model_request.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
from sentry_sdk.integrations import DidNotEnable
55

6+
from ..utils import _set_input_messages
7+
68
try:
79
from pydantic_ai import models # type: ignore
810
except ImportError:
@@ -32,7 +34,10 @@ async def wrapped_request(
3234
self: "Any", messages: "Any", *args: "Any", **kwargs: "Any"
3335
) -> "Any":
3436
# Pass all messages (full conversation history)
35-
with ai_client_span(messages, None, self, None) as span:
37+
with ai_client_span(None, self, None) as span:
38+
if messages:
39+
_set_input_messages(span, messages)
40+
3641
result = await original_request(self, messages, *args, **kwargs)
3742
update_ai_client_span(span, result)
3843
return result

0 commit comments

Comments
 (0)