Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
SpeechCreatedEvent,
UserInputTranscribedEvent,
UserStateChangedEvent,
UserTranscriptionTimeoutEvent,
UserTurnExceededEvent,
avatar,
io,
Expand Down Expand Up @@ -180,6 +181,7 @@ def __getattr__(name: str) -> typing.Any:
"AgentFalseInterruptionEvent",
"UserInputTranscribedEvent",
"UserStateChangedEvent",
"UserTranscriptionTimeoutEvent",
"SpeechCreatedEvent",
"MetricsCollectedEvent",
"SessionUsageUpdatedEvent",
Expand Down
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/voice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
SpeechCreatedEvent,
UserInputTranscribedEvent,
UserStateChangedEvent,
UserTranscriptionTimeoutEvent,
UserTurnExceededEvent,
)
from .remote_session import RemoteSession
Expand Down Expand Up @@ -46,6 +47,7 @@
"CloseEvent",
"CloseReason",
"UserStateChangedEvent",
"UserTranscriptionTimeoutEvent",
"AgentStateChangedEvent",
"FunctionToolsExecutedEvent",
"AgentFalseInterruptionEvent",
Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
SessionUsageUpdatedEvent,
SpeechCreatedEvent,
UserInputTranscribedEvent,
UserTranscriptionTimeoutEvent,
UserTurnExceededEvent,
_AgentBackchannelOpportunityEvent,
)
Expand Down Expand Up @@ -1916,6 +1917,15 @@ def on_end_of_speech(self, ev: vad.VADEvent | None) -> None:
if self._paused_speech:
self._start_false_interruption_timer(self._paused_speech.timeout)

def on_transcription_timeout(self, *, speech_duration: float, turn_start: float) -> None:
self._session.emit(
"user_transcription_timeout",
UserTranscriptionTimeoutEvent(
speech_duration=speech_duration,
vad_speech_started_at=turn_start,
),
)

def on_vad_inference_done(self, ev: vad.VADEvent) -> None:
if self._turn_detection in ("manual", "realtime_llm"):
# ignore vad inference done event if turn_detection is manual or realtime_llm
Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class AgentSessionOptions:
"""sparse endpointing keys the user provided explicitly"""
max_tool_steps: int
user_away_timeout: float | None
transcription_timeout: float | None
min_consecutive_speech_delay: float
use_tts_aligned_transcript: bool | None
tts_text_transforms: Sequence[TextTransforms] | None
Expand Down Expand Up @@ -245,6 +246,7 @@ def __init__(
aec_warmup_duration: float | None = 3.0,
ivr_detection: bool = False,
user_away_timeout: float | None = 15.0,
transcription_timeout: float | None = 5.0,
session_close_transcript_timeout: float = 2.0,
# Runtime settings
conn_options: NotGivenOr[SessionConnectOptions] = NOT_GIVEN,
Expand Down Expand Up @@ -317,6 +319,13 @@ def __init__(
user_away_timeout (float, optional): If set, set the user state as
"away" after this amount of time after user and agent are silent.
Defaults to ``15.0`` s, set to ``None`` to disable.
transcription_timeout (float, optional): If set, emit a
``user_transcription_timeout`` event when VAD detects user speech
during the user's turn but no transcript arrives within this amount
of time after the speech ends. Useful to recover from STT failures
by prompting the user to repeat themselves. Relies on VAD speech
detection (the bundled default VAD works); inert only if VAD is
disabled. Defaults to ``5.0`` s, set to ``None`` to disable.
aec_warmup_duration (float, optional): The duration in seconds that the agent
will ignore user's audio interruptions after the agent starts speaking.
This is useful to prevent the agent from being interrupted by echo before AEC is ready.
Expand Down Expand Up @@ -384,6 +393,7 @@ def __init__(
endpointing_overrides=endpointing_overrides,
max_tool_steps=max_tool_steps,
user_away_timeout=user_away_timeout,
transcription_timeout=transcription_timeout,
min_consecutive_speech_delay=min_consecutive_speech_delay,
tts_text_transforms=(
tts_text_transforms
Expand Down
59 changes: 59 additions & 0 deletions livekit-agents/livekit/agents/voice/audio_recognition.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def on_vad_inference_done(self, ev: vad.VADEvent) -> None: ...
def on_end_of_speech(self, ev: vad.VADEvent | None) -> None: ...
def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -> None: ...
def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = None) -> None: ...
def on_transcription_timeout(self, *, speech_duration: float, turn_start: float) -> None: ...
def on_end_of_turn(self, info: _EndOfTurnInfo) -> bool: ...
def on_eot_prediction(self, ev: EotPredictionEvent) -> None: ...
def on_agent_backchannel_opportunity(self, ev: _AgentBackchannelOpportunityEvent) -> None: ...
Expand Down Expand Up @@ -282,6 +283,11 @@ def __init__(

self._vad_speech_started: bool = False

# transcription timeout: VAD heard speech but STT produced no transcript
self._transcription_timeout_handle: asyncio.TimerHandle | None = None
self._turn_speech_duration: float = 0.0
self._turn_transcript_received: bool = False

# user turn limit tracking — accumulates across turns until agent speaks
self._turn_tracker = _UserTurnTracker()
self._word_tokenizer = tokenize.basic.WordTokenizer()
Expand Down Expand Up @@ -699,6 +705,8 @@ async def aclose(self) -> None:
self._backchannel_boundary_timer = None
self.backchannel_boundary_callback = None

self._cancel_transcription_timeout()

def update_stt(self, stt: io.STTNode | None, *, pipeline: _STTPipeline | None = None) -> None:
self._stt = stt
if pipeline is None and stt is not None:
Expand All @@ -718,6 +726,8 @@ def update_stt(self, stt: io.STTNode | None, *, pipeline: _STTPipeline | None =
self._ignore_user_transcript_until = NOT_GIVEN
self._input_started_at = None
else:
self._cancel_transcription_timeout()

if self._stt_consumer_atask is not None:
task = asyncio.create_task(aio.cancel_and_wait(self._stt_consumer_atask))
task.add_done_callback(lambda _: self._tasks.discard(task))
Expand Down Expand Up @@ -1052,6 +1062,8 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None:
if not transcript:
return

self._mark_turn_transcribed()

self._hooks.on_final_transcript(
ev,
speaking=self._speaking
Expand Down Expand Up @@ -1124,6 +1136,8 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None:
if not transcript:
return

self._mark_turn_transcribed()

logger.debug(
"received user preflight transcript",
extra={"user_transcript": transcript, "language": self._last_language},
Expand Down Expand Up @@ -1158,6 +1172,7 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None:
else None,
)
self._audio_interim_transcript = ev.alternatives[0].text
# interim transcripts don't cancel the timeout: STT may drop them without a final

elif ev.type == stt.SpeechEventType.END_OF_SPEECH and self._turn_detection_mode == "stt":
with trace.use_span(self._ensure_user_turn_span()):
Expand Down Expand Up @@ -1219,6 +1234,8 @@ async def _on_vad_event(self, ev: vad.VADEvent) -> None:
self._speech_start_time = speech_start_time
self._vad_speech_started = True

self._cancel_transcription_timeout()

with trace.use_span(self._ensure_user_turn_span(start_time=speech_start_time)):
self._hooks.on_start_of_speech(ev, speech_start_time=speech_start_time)

Expand Down Expand Up @@ -1269,6 +1286,9 @@ async def _on_vad_event(self, ev: vad.VADEvent) -> None:
self._user_speaking_event.clear()
self._last_speaking_time = time.time() - ev.silence_duration - ev.inference_duration

if self._stt_pipeline is not None:
self._arm_transcription_timeout(ev.speech_duration)

if self._vad_base_turn_detection or (
self._turn_detection_mode == "stt" and self._user_turn_committed
):
Expand Down Expand Up @@ -1564,6 +1584,9 @@ async def _bounce_eou_task(
self._user_turn_span = None
self._user_turn_start = None
self._stt_request_ids = []
self._cancel_transcription_timeout()
self._turn_speech_duration = 0.0
self._turn_transcript_received = False

# clear the transcript if the user turn was committed
self._audio_transcript = ""
Expand Down Expand Up @@ -1756,6 +1779,42 @@ async def _forward() -> None:
await aio.cancel_and_wait(forward_task)
await stream.aclose()

# region: user transcription timeout
def _cancel_transcription_timeout(self) -> None:
if self._transcription_timeout_handle is not None:
self._transcription_timeout_handle.cancel()
self._transcription_timeout_handle = None

def _mark_turn_transcribed(self) -> None:
self._turn_transcript_received = True
self._cancel_transcription_timeout()

def _arm_transcription_timeout(self, speech_duration: float) -> None:
self._turn_speech_duration += speech_duration

timeout = self._session.options.transcription_timeout
if timeout is None or self._turn_transcript_received:
return

self._cancel_transcription_timeout()
self._transcription_timeout_handle = asyncio.get_running_loop().call_later(
timeout, self._on_transcription_timeout
)

def _on_transcription_timeout(self) -> None:
self._transcription_timeout_handle = None
if self._user_turn_start is None or self._turn_transcript_received:
return

if self._agent_speaking:
return

self._hooks.on_transcription_timeout(
speech_duration=self._turn_speech_duration, turn_start=self._user_turn_start
)
Comment on lines +1804 to +1814

@devin-ai-integration devin-ai-integration Bot Jun 22, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Transcription timeout suppressed during agent speech with no re-arm

When _on_transcription_timeout fires while the agent is speaking (audio_recognition.py:1809), the callback returns silently without emitting the event and without scheduling a retry. This means if VAD detects user speech that produces no transcript, and the timeout happens to fire while the agent is still speaking (e.g. the user spoke over the agent near the end of its turn), the timeout event is permanently lost for that speech burst. The rationale is likely that speech detected during agent output is often echo/noise, but with AEC enabled it could be genuine user speech. Whether this is acceptable depends on the use case.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is no user content, we should still emit the signal so the agent can check.


# endregion

def _ensure_user_turn_span(self, start_time: float | None = None) -> trace.Span:
if self._user_turn_span and self._user_turn_span.is_recording():
return self._user_turn_span
Expand Down
11 changes: 11 additions & 0 deletions livekit-agents/livekit/agents/voice/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def _make_update_pair(
"user_state_changed",
"agent_state_changed",
"user_input_transcribed",
"user_transcription_timeout",
"conversation_item_added",
"agent_false_interruption",
"overlapping_speech",
Expand Down Expand Up @@ -320,6 +321,15 @@ class UserInputTranscribedEvent(BaseModel):
created_at: float = Field(default_factory=time.time)


class UserTranscriptionTimeoutEvent(BaseModel):
type: Literal["user_transcription_timeout"] = "user_transcription_timeout"
speech_duration: float
"""Total VAD-detected speech (s) in the turn that produced no transcript."""
vad_speech_started_at: float
"""When VAD first detected speech for this (untranscribed) turn."""
created_at: float = Field(default_factory=time.time)


class EotPredictionEvent(BaseModel):
type: Literal["eot_prediction"] = "eot_prediction"
probability: float
Expand Down Expand Up @@ -509,6 +519,7 @@ class CloseEvent(BaseModel):

AgentEvent = Annotated[
UserInputTranscribedEvent
| UserTranscriptionTimeoutEvent
| UserStateChangedEvent
| AgentStateChangedEvent
| AgentFalseInterruptionEvent
Expand Down
1 change: 1 addition & 0 deletions livekit-agents/livekit/agents/voice/remote_session.py

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 New event not forwarded in SessionHost remote transport

The SessionHost in remote_session.py registers handlers for specific events (lines 371-379) and forwards them over the transport. The new user_transcription_timeout event is not registered or forwarded. This means remote sessions won't receive this event. This is likely acceptable as a first iteration (not all events need remote transport support immediately), but it's an inconsistency with the event being in EventTypes and AgentEvent.

(Refers to lines 366-379)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def _serialize_options(opts: AgentSessionOptions) -> dict[str, str]:
"interruption": str(dict(opts.interruption)),
"max_tool_steps": str(opts.max_tool_steps),
"user_away_timeout": str(opts.user_away_timeout),
"transcription_timeout": str(opts.transcription_timeout),
"preemptive_generation": str(dict(opts.preemptive_generation)),
"min_consecutive_speech_delay": str(opts.min_consecutive_speech_delay),
"use_tts_aligned_transcript": str(opts.use_tts_aligned_transcript),
Expand Down
19 changes: 14 additions & 5 deletions tests/fake_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def create_session(
turn_handling: TurnHandlingOptions | None = None,
extra_kwargs: dict[str, Any] | None = None,
can_pause_audio: bool = False,
with_stt: bool = True,
) -> AgentSession:
user_speeches = actions.get_user_speeches(speed_factor=speed_factor)
llm_responses = actions.get_llm_responses(speed_factor=speed_factor)
Expand Down Expand Up @@ -63,7 +64,7 @@ def create_session(
**{**default_interruption, **turn_handling.get("interruption", {})}
)

stt = FakeSTT(fake_user_speeches=user_speeches)
stt = FakeSTT(fake_user_speeches=user_speeches) if with_stt else None

if "aec_warmup_duration" not in extra:
extra["aec_warmup_duration"] = None # disable aec warmup by default
Expand Down Expand Up @@ -101,7 +102,6 @@ def create_session(
async def run_session(session: AgentSession, agent: Agent, *, drain_delay: float = 5) -> float:
stt = session.stt
audio_input = session.input.audio
assert isinstance(stt, FakeSTT)
assert isinstance(audio_input, FakeAudioInput)

transcription_sync: TranscriptSynchronizer | None = None
Expand All @@ -114,8 +114,10 @@ async def run_session(session: AgentSession, agent: Agent, *, drain_delay: float
t_origin = time.time()
audio_input.push(0.1)

# wait for the user speeches to be processed
await stt.fake_user_speeches_done
# wait for the user speeches to be processed (no STT: rely on drain_delay for the
# VAD timeline to play out)
if isinstance(stt, FakeSTT):
await stt.fake_user_speeches_done

await asyncio.sleep(drain_delay)
with contextlib.suppress(RuntimeError):
Expand All @@ -133,14 +135,21 @@ def __init__(self) -> None:
self._items: list[FakeUserSpeech | FakeLLMResponse | FakeTTSResponse] = []

def add_user_speech(
self, start_time: float, end_time: float, transcript: str, *, stt_delay: float = 0.2
self,
start_time: float,
end_time: float,
transcript: str,
*,
stt_delay: float = 0.2,
final: bool = True,
) -> None:
self._items.append(
FakeUserSpeech(
start_time=start_time,
end_time=end_time,
transcript=transcript,
stt_delay=stt_delay,
final=final,
)
)

Expand Down
4 changes: 4 additions & 0 deletions tests/fake_stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class FakeUserSpeech(BaseModel):
end_time: float
transcript: str # empty string fires VAD SOS/EOS only — no STT events
stt_delay: float
final: bool = True # when False, only an interim transcript is emitted (no final)

def speed_up(self, factor: float) -> FakeUserSpeech:
obj = copy.deepcopy(self)
Expand Down Expand Up @@ -229,6 +230,9 @@ def curr_time() -> float:
final_transcript_time = fake_speech.end_time + fake_speech.stt_delay
if curr_time() < final_transcript_time:
await asyncio.sleep(final_transcript_time - curr_time())
if not fake_speech.final:
# interim only: STT dropped the utterance without a final transcript
continue
self.send_fake_transcript(fake_speech.transcript, is_final=True)

with contextlib.suppress(asyncio.InvalidStateError):
Expand Down
1 change: 1 addition & 0 deletions tests/test_audio_recognition_aclose.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def _create_audio_recognition(self) -> AudioRecognition:
audio_recognition._commit_user_turn_atask = None
audio_recognition._end_of_turn_task = None
audio_recognition._backchannel_boundary_timer = None
audio_recognition._transcription_timeout_handle = None

return audio_recognition

Expand Down
2 changes: 2 additions & 0 deletions tests/test_audio_recognition_turn_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def _make_full_recognition_for_eou() -> AudioRecognition:
ar._hooks = MagicMock()
ar._hooks.on_end_of_turn.return_value = False # don't commit
ar._stt = None
ar._stt_pipeline = None # no STT pipeline => EOS skips arming the transcription timeout
ar._transcription_timeout_handle = None # SOS/commit paths cancel the timeout unconditionally
ar._audio_transcript = ""
ar._turn_detection_mode = "vad"

Expand Down
7 changes: 7 additions & 0 deletions tests/test_speech_start_time_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,23 @@ def _create_audio_recognition(self) -> AudioRecognition:
audio_recognition._vad_base_turn_detection = False
audio_recognition._turn_detection_mode = None
audio_recognition._stt = None
audio_recognition._stt_pipeline = None
audio_recognition._stt_model = None
audio_recognition._stt_provider = None
audio_recognition._audio_transcript = ""
audio_recognition._audio_interim_transcript = ""
audio_recognition._last_speaking_time = None
# transcription timeout state touched by the SOS/EOS branches
audio_recognition._transcription_timeout_handle = None
audio_recognition._turn_speech_duration = 0.0

# collaborators
audio_recognition._hooks = MagicMock()
audio_recognition._session = MagicMock()
audio_recognition._session.amd = None
audio_recognition._session._room_io = None
# disable the transcription timeout — these tests cover VAD state only
audio_recognition._session.options.transcription_timeout = None

return audio_recognition

Expand Down
Loading
Loading