-
Notifications
You must be signed in to change notification settings - Fork 3.3k
feat(voice): stream tool call status events #6100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3178fbc
ba50515
a2590ef
3f77a1b
7248498
f63ee13
2727c60
2660ea8
b1da658
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,10 +20,12 @@ | |
| AgentSession, | ||
| JobContext, | ||
| RunContext, | ||
| ToolExecutionUpdatedEvent, | ||
| cli, | ||
| inference, | ||
| llm, | ||
| ) | ||
| from livekit.agents.utils import aio | ||
|
|
||
| logger = logging.getLogger("async-travel-helper") | ||
|
|
||
|
|
@@ -118,6 +120,9 @@ async def book_flight(self, ctx: RunContext, origin: str, destination: str, date | |
| ) | ||
| self._user_email = email.email_address | ||
| logger.info(f"User's email address: {self._user_email}") | ||
| await ctx.update( | ||
| "Thanks for providing your email address, we are confirming the booking now." | ||
| ) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more issue I noticed when testing: It booked multiple flights for me because I interrupted one of its reply:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it seems like it's interrupted before the first update complete so the first tool call is not in the chat context. maybe we need a placeholder for any tool call before it's done, like discussed here https://live-kit.slack.com/archives/C07TWVC0W4A/p1779940585627339?thread_ts=1779811238.610529&cid=C07TWVC0W4A I can work on it in a separate pr.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, something like that. The other pattern I noticed is that it said good bye to me before the final confirmation arrives, maybe a model issue, but maybe it could benefit from same placeholder design.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checking the log closer, that's weird after your get email task (called from the book_flight task), another two |
||
|
|
||
| await asyncio.sleep(40) | ||
|
|
||
|
|
@@ -212,6 +217,8 @@ async def _summarize( | |
|
|
||
| @server.rtc_session() | ||
| async def entrypoint(ctx: JobContext): | ||
| await ctx.connect() | ||
|
|
||
| session = AgentSession( | ||
| stt=inference.STT("deepgram/nova-3"), | ||
| # llm=inference.LLM("openai/gpt-5.3-chat-latest"), | ||
|
|
@@ -221,6 +228,25 @@ async def entrypoint(ctx: JobContext): | |
| turn_handling={"interruption": {"mode": "vad"}}, | ||
| ) | ||
|
|
||
| # stream tool execution (calls, ctx.update progress, reply lifecycle) to the frontend | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe a todo item: we should drop this once Console shows them.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in the meantime, should we log each event too so it is visible in the logs? |
||
| status_ch = aio.Chan[ToolExecutionUpdatedEvent]() | ||
|
|
||
| @session.on("tool_execution_updated") | ||
| def _on_tool_status(ev: ToolExecutionUpdatedEvent) -> None: | ||
| status_ch.send_nowait(ev) | ||
|
|
||
| async def _publish_status() -> None: | ||
| async for ev in status_ch: | ||
| await ctx.room.local_participant.publish_data(ev.model_dump_json(), topic="tool_status") | ||
|
|
||
| publish_task = asyncio.create_task(_publish_status()) | ||
|
|
||
| async def _close_status() -> None: | ||
| status_ch.close() | ||
| await aio.cancel_and_wait(publish_task) | ||
|
|
||
| ctx.add_shutdown_callback(_close_status) | ||
|
|
||
| await session.start(agent=TravelAgent(), room=ctx.room) | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,11 @@ | |
| ErrorEvent, | ||
| FunctionToolsExecutedEvent, | ||
| SessionUsageUpdatedEvent, | ||
| ToolCallEnded, | ||
| ToolCallStarted, | ||
| ToolCallUpdated, | ||
| ToolExecutionUpdatedEvent, | ||
| ToolReplyUpdated, | ||
| UserInputTranscribedEvent, | ||
| UserState, | ||
| UserStateChangedEvent, | ||
|
|
@@ -251,6 +256,19 @@ async def __anext__(self) -> agent_pb.AgentSessionMessage: | |
| "e2e_latency", | ||
| ) | ||
|
|
||
| _TOOL_CALL_STATUS_MAP: dict[str, agent_pb.ToolCallStatus] = { | ||
| "done": agent_pb.TC_DONE, | ||
| "error": agent_pb.TC_ERROR, | ||
| "cancelled": agent_pb.TC_CANCELLED, | ||
| } | ||
|
|
||
| _TOOL_REPLY_STATUS_MAP: dict[str, agent_pb.ToolReplyStatus] = { | ||
| "scheduled": agent_pb.TR_SCHEDULED, | ||
| "completed": agent_pb.TR_COMPLETED, | ||
| "interrupted": agent_pb.TR_INTERRUPTED, | ||
| "skipped": agent_pb.TR_SKIPPED, | ||
| } | ||
|
|
||
| _AMD_CATEGORY_MAP: dict[AMDCategory, agent_pb.AmdCategory] = { | ||
| AMDCategory.HUMAN: agent_pb.AmdCategory.AMD_HUMAN, | ||
| AMDCategory.MACHINE_IVR: agent_pb.AmdCategory.AMD_MACHINE_IVR, | ||
|
|
@@ -372,6 +390,7 @@ def register_session(self, session: AgentSession) -> None: | |
| session.on("conversation_item_added", self._on_conversation_item_added) | ||
| session.on("user_input_transcribed", self._on_user_input_transcribed) | ||
| session.on("function_tools_executed", self._on_function_tools_executed) | ||
| session.on("tool_execution_updated", self._on_tool_execution_updated) | ||
| session.on("session_usage_updated", self._on_session_usage_updated) | ||
| session.on("overlapping_speech", self._on_overlapping_speech) | ||
| session.on("error", self._on_error) | ||
|
|
@@ -396,6 +415,7 @@ async def aclose(self) -> None: | |
| self._session.off("conversation_item_added", self._on_conversation_item_added) | ||
| self._session.off("user_input_transcribed", self._on_user_input_transcribed) | ||
| self._session.off("function_tools_executed", self._on_function_tools_executed) | ||
| self._session.off("tool_execution_updated", self._on_tool_execution_updated) | ||
| self._session.off("session_usage_updated", self._on_session_usage_updated) | ||
| self._session.off("overlapping_speech", self._on_overlapping_speech) | ||
| self._session.off("error", self._on_error) | ||
|
|
@@ -515,6 +535,54 @@ def _on_function_tools_executed(self, event: FunctionToolsExecutedEvent) -> None | |
| ) | ||
| ) | ||
|
|
||
| def _on_tool_execution_updated(self, event: ToolExecutionUpdatedEvent) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rektdeckard I know you've been waiting for this for a while. Long has them now and they are much better than just the start and end states. |
||
| pb = agent_pb.AgentSessionEvent.ToolExecutionUpdated | ||
| updated: agent_pb.AgentSessionEvent.ToolExecutionUpdated | ||
| if isinstance(event.update, ToolCallStarted): | ||
| fc = event.update.function_call | ||
| updated = pb( | ||
| started=pb.Started( | ||
| function_call=agent_pb.FunctionCall( | ||
| id=fc.id, | ||
| call_id=fc.call_id, | ||
| name=fc.name, | ||
| arguments=fc.arguments, | ||
| ) | ||
| ) | ||
| ) | ||
| elif isinstance(event.update, ToolCallUpdated): | ||
| updated = pb( | ||
| call_updated=pb.CallUpdated( | ||
| id=event.update.id, | ||
| call_id=event.update.call_id, | ||
| message=event.update.message, | ||
| ) | ||
| ) | ||
| elif isinstance(event.update, ToolCallEnded): | ||
| ended = pb.Ended( | ||
| id=event.update.id, | ||
| call_id=event.update.call_id, | ||
| status=_TOOL_CALL_STATUS_MAP[event.update.status], | ||
| ) | ||
| if event.update.message is not None: | ||
| ended.message = event.update.message | ||
| updated = pb(ended=ended) | ||
| elif isinstance(event.update, ToolReplyUpdated): | ||
| updated = pb( | ||
| reply_updated=pb.ReplyUpdated( | ||
| update_ids=event.update.update_ids, | ||
| status=_TOOL_REPLY_STATUS_MAP[event.update.status], | ||
| speech_id=event.update.speech_id, | ||
| ) | ||
| ) | ||
| else: | ||
| return | ||
|
|
||
| self._send_event( | ||
| agent_pb.AgentSessionEvent(tool_execution_updated=updated), | ||
| created_at=event.created_at, | ||
| ) | ||
|
|
||
| def _on_overlapping_speech(self, event: OverlappingSpeechEvent) -> None: | ||
| detected_at = Timestamp() | ||
| detected_at.FromNanoseconds(int(event.detected_at * 1e9)) | ||
|
|
@@ -931,6 +999,7 @@ def _session_usage_to_proto(usage: AgentSessionUsage) -> agent_pb.AgentSessionUs | |
| "conversation_item_added", | ||
| "user_input_transcribed", | ||
| "function_tools_executed", | ||
| "tool_execution_updated", | ||
| "session_usage_updated", | ||
| "error", | ||
| ] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe we should redact the email, at least part of it, in the log.