Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions examples/voice_agents/async_tool_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
AgentSession,
JobContext,
RunContext,
ToolExecutionUpdatedEvent,
cli,
inference,
llm,
)
from livekit.agents.utils import aio

logger = logging.getLogger("async-travel-helper")

Expand Down Expand Up @@ -118,6 +120,9 @@ async def book_flight(self, ctx: RunContext, origin: str, destination: str, date
)
self._user_email = email.email_address
logger.info(f"User's email address: {self._user_email}")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we should redact the email, at least part of it, in the log.

await ctx.update(
"Thanks for providing your email address, we are confirming the booking now."
)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more issue I noticed when testing:

It booked multiple flights for me because I interrupted one of its reply:

2026-06-24 14:31:17,772 - INFO async-travel-helper - Found airlines and prices, booking the flight... {"pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:17,772 - INFO async-travel-helper - Getting user's email address {"pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:17,773 - DEBUG livekit.agents - generate async tool reply {"speech_id": "speech_3809790b7599", "items": [["book_flight", "call_d8tto1um49bc73dpqdag_update_1"]], "updates_at_tail": true, "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:21,438 - DEBUG livekit.agents - received user transcript {"user_transcript": "No. That's it.", "language": "en", "transcript_delay": 1.2486140727996826, "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:21,442 - DEBUG livekit.agents - eot prediction {"probability": 0.544859766960144, "unlikely_threshold": 0.56, "endpointing_delay": 2.5, "language": "en", "trigger": "stt", "from_cache": true, "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:21,462 - DEBUG livekit.agents - conversation_item_added {"role": "assistant", "text": "I have", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:23,589 - DEBUG livekit.agents - user turn committed {"last_speaking_time": 1782307881.088463, "last_final_transcript_time": 1782307881.4394088, "speech_start_time": 1782307879.132936, "delay_completed": true, "source": "stt", "end_of_turn_probability": 0.544859766960144, "unlikely_threshold": 0.56, "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:23,617 - DEBUG livekit.agents - conversation_item_added {"role": "user", "text": "No. That's it.", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:24,699 - DEBUG livekit.agents - executing tool {"function": "book_flight", "arguments": "{\"date\": \"2026-06-25\", \"destination\": \"London\", \"lk_agents_confirm_duplicate\": true, \"origin\": \"Dublin\"}", "speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:24,718 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:25,676 - DEBUG livekit.agents - executing tool {"function": "book_flight", "arguments": "{\"date\": \"2026-06-25\", \"destination\": \"London\", \"lk_agents_confirm_duplicate\": true, \"origin\": \"Dublin\"}", "speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:25,787 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:26,342 - DEBUG livekit.agents - executing tool {"function": "lk_agents_get_running_tasks", "arguments": "{}", "speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:26,357 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:27,276 - DEBUG livekit.agents - executing tool {"function": "lk_agents_cancel_task", "arguments": "{\"call_id\": \"call_d8ttob6m49bc73dpqdb0\"}", "speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:27,281 - DEBUG livekit.agents - tool cancelled {"call_id": "call_d8ttob6m49bc73dpqdb0", "function": "book_flight", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:27,294 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:27,297 - WARNING livekit.agents - maximum number of function calls steps reached, generating final response with tool_choice='none' {"speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:29,036 - ERROR livekit.agents - received a tool call with tool_choice set to 'none', ignoring {"function": "lk_agents_cancel_task", "speech_id": "speech_7865bd740fc9", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:32,397 - DEBUG livekit.agents - conversation_item_added {"role": "assistant", "text": "We will need your email address to confirm the flight booking.", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:32,405 - DEBUG livekit.agents - reusing STT pipeline from previous activity {"pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:32,405 - DEBUG livekit.agents - reusing turn detector stream from previous activity {"pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:34,918 - DEBUG livekit.agents - conversation_item_added {"role": "assistant", "text": "Please provide your email address.", "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}
2026-06-24 14:31:42,102 - DEBUG livekit.agents - received user transcript {"user_transcript": "It's a b c at Google dot com.", "language": "en", "transcript_delay": 0.6659722328186035, "pid": 93851, "job_id": "AJ_JM9aU7VreFcV", "room": "console-e12dbf6f"}

@longcw longcw Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like it's interrupted before the first update complete so the first tool call is not in the chat context.

maybe we need a placeholder for any tool call before it's done, like discussed here https://live-kit.slack.com/archives/C07TWVC0W4A/p1779940585627339?thread_ts=1779811238.610529&cid=C07TWVC0W4A

I can work on it in a separate pr.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, something like that. The other pattern I noticed is that it said good bye to me before the final confirmation arrives, maybe a model issue, but maybe it could benefit from same placeholder design.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking the log closer, that's weird after your get email task (called from the book_flight task), another two book_flight tasks were called, and then one of it was cancelled... looks like a LLM hallucination...


await asyncio.sleep(40)

Expand Down Expand Up @@ -212,6 +217,8 @@ async def _summarize(

@server.rtc_session()
async def entrypoint(ctx: JobContext):
await ctx.connect()

session = AgentSession(
stt=inference.STT("deepgram/nova-3"),
# llm=inference.LLM("openai/gpt-5.3-chat-latest"),
Expand All @@ -221,6 +228,25 @@ async def entrypoint(ctx: JobContext):
turn_handling={"interruption": {"mode": "vad"}},
)

# stream tool execution (calls, ctx.update progress, reply lifecycle) to the frontend

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a todo item: we should drop this once Console shows them.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the meantime, should we log each event too so it is visible in the logs?

status_ch = aio.Chan[ToolExecutionUpdatedEvent]()

@session.on("tool_execution_updated")
def _on_tool_status(ev: ToolExecutionUpdatedEvent) -> None:
status_ch.send_nowait(ev)

async def _publish_status() -> None:
async for ev in status_ch:
await ctx.room.local_participant.publish_data(ev.model_dump_json(), topic="tool_status")

publish_task = asyncio.create_task(_publish_status())

async def _close_status() -> None:
status_ch.close()
await aio.cancel_and_wait(publish_task)

ctx.add_shutdown_callback(_close_status)

await session.start(agent=TravelAgent(), room=ctx.room)


Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
RunContext,
SessionUsageUpdatedEvent,
SpeechCreatedEvent,
ToolCallEnded,
ToolCallStarted,
ToolCallUpdated,
ToolExecutionUpdatedEvent,
ToolReplyUpdated,
UserInputTranscribedEvent,
UserStateChangedEvent,
UserTurnExceededEvent,
Expand Down Expand Up @@ -181,6 +186,11 @@ def __getattr__(name: str) -> typing.Any:
"UserInputTranscribedEvent",
"UserStateChangedEvent",
"SpeechCreatedEvent",
"ToolExecutionUpdatedEvent",
"ToolCallStarted",
"ToolCallUpdated",
"ToolCallEnded",
"ToolReplyUpdated",
"MetricsCollectedEvent",
"SessionUsageUpdatedEvent",
"FunctionToolsExecutedEvent",
Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/voice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
RunContext,
SessionUsageUpdatedEvent,
SpeechCreatedEvent,
ToolCallEnded,
ToolCallStarted,
ToolCallUpdated,
ToolExecutionUpdatedEvent,
ToolReplyUpdated,
UserInputTranscribedEvent,
UserStateChangedEvent,
UserTurnExceededEvent,
Expand Down Expand Up @@ -50,6 +55,11 @@
"FunctionToolsExecutedEvent",
"AgentFalseInterruptionEvent",
"RemoteSession",
"ToolExecutionUpdatedEvent",
"ToolCallStarted",
"ToolCallUpdated",
"ToolCallEnded",
"ToolReplyUpdated",
"UserTurnExceededEvent",
"TranscriptSynchronizer",
"io",
Expand Down
72 changes: 71 additions & 1 deletion livekit-agents/livekit/agents/voice/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ async def update(
for s in self._filler_schedulers:
s.reset_dwell()

# events carry the raw message, before the LLM-facing template wraps it
raw_message = message if isinstance(message, str) else str(message)

if isinstance(message, str):
if template is None:
if self._executor is not None:
Expand Down Expand Up @@ -213,7 +216,18 @@ async def update(
self._updates.append(pair)

if self._executor is None:
return # standalone — nothing else to do
return # standalone — no executor, so no tool lifecycle to report

self._session.emit(
"tool_execution_updated",
ToolExecutionUpdatedEvent(
update=ToolCallUpdated(
id=pair[0].call_id,
call_id=self.function_call.call_id,
message=raw_message,
)
),
)

assert self._first_update_fut is not None
if not self._first_update_fut.done():
Expand Down Expand Up @@ -288,6 +302,7 @@ def _make_update_pair(
"metrics_collected",
"session_usage_updated",
"speech_created",
"tool_execution_updated",
"error",
"close",
"debug_message",
Expand Down Expand Up @@ -453,6 +468,60 @@ class SpeechCreatedEvent(BaseModel):
created_at: float = Field(default_factory=time.time)


class ToolCallStarted(BaseModel):
"""A function tool call was dispatched."""

type: Literal["tool_call_started"] = "tool_call_started"
function_call: FunctionCall


class ToolCallUpdated(BaseModel):
"""A progress update emitted via ``ctx.update()`` while a tool call runs."""

type: Literal["tool_call_updated"] = "tool_call_updated"
id: str
"""Entry id: ``call_id`` inline, ``{call_id}_update_N`` when deferred."""
call_id: str
message: str


class ToolCallEnded(BaseModel):
"""A tool call's single terminal entry."""

type: Literal["tool_call_ended"] = "tool_call_ended"
id: str
"""Entry id: ``call_id`` inline, ``{call_id}_final`` when deferred."""
call_id: str
message: str | None = None
"""Result or error text; None when there is nothing to voice."""
status: Literal["done", "error", "cancelled"]


class ToolReplyUpdated(BaseModel):
"""Lifecycle of the deferred reply that voices buffered tool updates: ``scheduled``
when queued, then ``completed`` / ``interrupted`` / ``skipped``. One reply may cover
several calls; an inline first update never gets one."""

type: Literal["tool_reply_updated"] = "tool_reply_updated"
update_ids: list[str]
"""``ToolCallUpdated.id`` values this reply covers."""
status: Literal["scheduled", "completed", "interrupted", "skipped"]
speech_id: str
"""Id of the reply speech; ``speech_created`` carries its handle."""


class ToolExecutionUpdatedEvent(BaseModel):
"""One flat tool-lifecycle update. Discriminate on ``update.type``: ``tool_call_started``
→ ``tool_call_updated`` → ``tool_call_ended`` → ``tool_reply_updated``."""

type: Literal["tool_execution_updated"] = "tool_execution_updated"
update: Annotated[
ToolCallStarted | ToolCallUpdated | ToolCallEnded | ToolReplyUpdated,
Field(discriminator="type"),
]
created_at: float = Field(default_factory=time.time)


class UserTurnExceededEvent(BaseModel):
type: Literal["user_turn_exceeded"] = "user_turn_exceeded"
transcript: str
Expand Down Expand Up @@ -517,6 +586,7 @@ class CloseEvent(BaseModel):
| ConversationItemAddedEvent
| FunctionToolsExecutedEvent
| SpeechCreatedEvent
| ToolExecutionUpdatedEvent
| ErrorEvent
| CloseEvent
| OverlappingSpeechEvent,
Expand Down
69 changes: 69 additions & 0 deletions livekit-agents/livekit/agents/voice/remote_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
ErrorEvent,
FunctionToolsExecutedEvent,
SessionUsageUpdatedEvent,
ToolCallEnded,
ToolCallStarted,
ToolCallUpdated,
ToolExecutionUpdatedEvent,
ToolReplyUpdated,
UserInputTranscribedEvent,
UserState,
UserStateChangedEvent,
Expand Down Expand Up @@ -251,6 +256,19 @@ async def __anext__(self) -> agent_pb.AgentSessionMessage:
"e2e_latency",
)

_TOOL_CALL_STATUS_MAP: dict[str, agent_pb.ToolCallStatus] = {
"done": agent_pb.TC_DONE,
"error": agent_pb.TC_ERROR,
"cancelled": agent_pb.TC_CANCELLED,
}

_TOOL_REPLY_STATUS_MAP: dict[str, agent_pb.ToolReplyStatus] = {
"scheduled": agent_pb.TR_SCHEDULED,
"completed": agent_pb.TR_COMPLETED,
"interrupted": agent_pb.TR_INTERRUPTED,
"skipped": agent_pb.TR_SKIPPED,
}

_AMD_CATEGORY_MAP: dict[AMDCategory, agent_pb.AmdCategory] = {
AMDCategory.HUMAN: agent_pb.AmdCategory.AMD_HUMAN,
AMDCategory.MACHINE_IVR: agent_pb.AmdCategory.AMD_MACHINE_IVR,
Expand Down Expand Up @@ -372,6 +390,7 @@ def register_session(self, session: AgentSession) -> None:
session.on("conversation_item_added", self._on_conversation_item_added)
session.on("user_input_transcribed", self._on_user_input_transcribed)
session.on("function_tools_executed", self._on_function_tools_executed)
session.on("tool_execution_updated", self._on_tool_execution_updated)
session.on("session_usage_updated", self._on_session_usage_updated)
session.on("overlapping_speech", self._on_overlapping_speech)
session.on("error", self._on_error)
Expand All @@ -396,6 +415,7 @@ async def aclose(self) -> None:
self._session.off("conversation_item_added", self._on_conversation_item_added)
self._session.off("user_input_transcribed", self._on_user_input_transcribed)
self._session.off("function_tools_executed", self._on_function_tools_executed)
self._session.off("tool_execution_updated", self._on_tool_execution_updated)
self._session.off("session_usage_updated", self._on_session_usage_updated)
self._session.off("overlapping_speech", self._on_overlapping_speech)
self._session.off("error", self._on_error)
Expand Down Expand Up @@ -515,6 +535,54 @@ def _on_function_tools_executed(self, event: FunctionToolsExecutedEvent) -> None
)
)

def _on_tool_execution_updated(self, event: ToolExecutionUpdatedEvent) -> None:

@chenghao-mou chenghao-mou Jun 24, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rektdeckard I know you've been waiting for this for a while. Long has them now and they are much better than just the start and end states.

pb = agent_pb.AgentSessionEvent.ToolExecutionUpdated
updated: agent_pb.AgentSessionEvent.ToolExecutionUpdated
if isinstance(event.update, ToolCallStarted):
fc = event.update.function_call
updated = pb(
started=pb.Started(
function_call=agent_pb.FunctionCall(
id=fc.id,
call_id=fc.call_id,
name=fc.name,
arguments=fc.arguments,
)
)
)
elif isinstance(event.update, ToolCallUpdated):
updated = pb(
call_updated=pb.CallUpdated(
id=event.update.id,
call_id=event.update.call_id,
message=event.update.message,
)
)
elif isinstance(event.update, ToolCallEnded):
ended = pb.Ended(
id=event.update.id,
call_id=event.update.call_id,
status=_TOOL_CALL_STATUS_MAP[event.update.status],
)
if event.update.message is not None:
ended.message = event.update.message
updated = pb(ended=ended)
elif isinstance(event.update, ToolReplyUpdated):
updated = pb(
reply_updated=pb.ReplyUpdated(
update_ids=event.update.update_ids,
status=_TOOL_REPLY_STATUS_MAP[event.update.status],
speech_id=event.update.speech_id,
)
)
else:
return

self._send_event(
agent_pb.AgentSessionEvent(tool_execution_updated=updated),
created_at=event.created_at,
)

def _on_overlapping_speech(self, event: OverlappingSpeechEvent) -> None:
detected_at = Timestamp()
detected_at.FromNanoseconds(int(event.detected_at * 1e9))
Expand Down Expand Up @@ -931,6 +999,7 @@ def _session_usage_to_proto(usage: AgentSessionUsage) -> agent_pb.AgentSessionUs
"conversation_item_added",
"user_input_transcribed",
"function_tools_executed",
"tool_execution_updated",
"session_usage_updated",
"error",
]
Expand Down
Loading