Skip to content

fix: crashing while exiting loop agent in web #566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
8 changes: 6 additions & 2 deletions src/google/adk/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ async def run_async(
Yields:
Event: the events generated by the agent.
"""

with tracer.start_as_current_span(f'agent_run [{self.name}]'):
span = tracer.start_span(
f'agent_run [{self.name}]'
) # Start the span explicitly
try:
ctx = self._create_invocation_context(parent_context)

if event := await self.__handle_before_agent_callback(ctx):
Expand All @@ -152,6 +154,8 @@ async def run_async(

if event := await self.__handle_after_agent_callback(ctx):
yield event
finally:
span.end() # End the span explicitly

@final
async def run_live(
Expand Down
25 changes: 23 additions & 2 deletions src/google/adk/agents/loop_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

from google.genai import types
from typing import AsyncGenerator
from typing import Optional

Expand Down Expand Up @@ -48,9 +49,29 @@ async def _run_async_impl(
while not self.max_iterations or times_looped < self.max_iterations:
for sub_agent in self.sub_agents:
async for event in sub_agent.run_async(ctx):
yield event
yield event # Yield the event immediately
if event.actions.escalate:
return
# Ensure the escalation message is processed immediately
if event.content and event.content.parts:
for part in event.content.parts:
if part.function_response.response:
# Yield the escalation message as a new event
yield Event(
invocation_id=ctx.invocation_id,
author=sub_agent.name,
branch=ctx.branch,
content=types.Content(
role='assistant',
parts=[
types.Part(
text=part.function_response.response.get(
'message'
),
)
],
),
)
return # Exit the loop after escalation
times_looped += 1
return

Expand Down
5 changes: 4 additions & 1 deletion src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ async def _call_llm_async(

# Calls the LLM.
llm = self.__get_llm(invocation_context)
with tracer.start_as_current_span('call_llm'):
span = tracer.start_span(f'call_llm') # Start the span explicitly
try:
if invocation_context.run_config.support_cfc:
invocation_context.live_request_queue = LiveRequestQueue()
async for llm_response in self.run_live(invocation_context):
Expand Down Expand Up @@ -507,6 +508,8 @@ async def _call_llm_async(
llm_response = altered_llm_response

yield llm_response
finally:
span.end() # End the span explicitly

async def _handle_before_model_callback(
self,
Expand Down
6 changes: 5 additions & 1 deletion src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ async def run_async(
Yields:
The events generated by the agent.
"""
with tracer.start_as_current_span('invocation'):
span = tracer.start_span('invocation') # Start the span explicitly
try:
session = self.session_service.get_session(
app_name=self.app_name, user_id=user_id, session_id=session_id
)
Expand All @@ -196,6 +197,9 @@ async def run_async(
if not event.partial:
self.session_service.append_event(session=session, event=event)
yield event
finally:
# Ensure the span is finished when the function exits
span.end()

async def _append_new_message_to_session(
self,
Expand Down
10 changes: 5 additions & 5 deletions src/google/adk/sessions/database_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,11 @@ def append_event(self, session: Session, event: Event) -> Event:

if storage_session.update_time.timestamp() > session.last_update_time:
raise ValueError(
f"Session last_update_time "
f"{datetime.fromtimestamp(session.last_update_time):%Y-%m-%d %H:%M:%S} "
f"is later than the update_time in storage "
f"{storage_session.update_time:%Y-%m-%d %H:%M:%S}"
)
"Session last_update_time"
f" {datetime.fromtimestamp(session.last_update_time):%Y-%m-%d %H:%M:%S} is"
" later than the update_time in storage"
f" {storage_session.update_time:%Y-%m-%d %H:%M:%S}"
)

# Fetch states from storage
storage_app_state = sessionFactory.get(
Expand Down