Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions livekit-agents/livekit/agents/voice/room_io/_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def __init__(

self._room.on("track_subscribed", self._on_track_available)
self._room.on("track_unpublished", self._on_track_unavailable)
self._room.on("token_refreshed", self._on_token_refreshed)

self._processor = processor
self._processor_owned = False
Expand Down Expand Up @@ -131,7 +130,6 @@ async def aclose(self) -> None:

self._room.off("track_subscribed", self._on_track_available)
self._room.off("track_unpublished", self._on_track_unavailable)
self._room.off("token_refreshed", self._on_token_refreshed)
self._data_ch.close()

@log_exceptions(logger=logger)
Expand Down Expand Up @@ -204,16 +202,6 @@ def _on_track_available(
self._close_stream()
self._stream = self._create_stream(track, participant)
self._publication = publication
if self._processor:
self._processor._on_stream_info_updated(
room_name=self._room.name,
participant_identity=participant.identity,
publication_sid=publication.sid,
)
if self._room._token is not None and self._room._server_url is not None:
self._processor._on_credentials_updated(
token=self._room._token, url=self._room._server_url
)
self._forward_atask = asyncio.create_task(
self._forward_task(self._forward_atask, self._stream, publication, participant)
)
Expand All @@ -238,16 +226,6 @@ def _on_track_unavailable(
if self._on_track_available(publication.track, publication, participant):
return

def _on_token_refreshed(self) -> None:
if (
self._processor is not None
and self._room._token is not None
and self._room._server_url is not None
):
self._processor._on_credentials_updated(
token=self._room._token, url=self._room._server_url
)


class _ParticipantAudioInputStream(_ParticipantInputStream[rtc.AudioFrame], AudioInput):
def __init__(
Expand Down Expand Up @@ -304,8 +282,9 @@ def _create_stream(self, track: rtc.Track, participant: rtc.Participant) -> rtc.
track=track,
sample_rate=self._sample_rate,
num_channels=self._num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=self._frame_size_ms,
noise_cancellation=noise_cancellation,
auto_close_noise_cancellation=False,
)
Comment on lines 282 to 288

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 auto_close_noise_cancellation=False applies unconditionally to all noise_cancellation types

The auto_close_noise_cancellation=False flag is passed unconditionally at _input.py:287, regardless of whether noise_cancellation is a FrameProcessor (agent-managed), NoiseCancellationOptions (SDK-managed config), or None. For the FrameProcessor case this is clearly correct (prevents double-close since the agent manages the lifecycle). For NoiseCancellationOptions, this depends on how the client SDK (livekit==1.1.12) internally handles the flag — if it creates an internal processor from the options, it needs to know whether to auto-close it. Since this is a coordinated release (uploaded same day per uv.lock), the SDK likely handles this correctly, but it's worth confirming that NoiseCancellationOptions-based NC is still properly cleaned up when the stream closes.

(Refers to lines 281-288)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine / by design, so ignoring this


@override
Expand Down
21 changes: 1 addition & 20 deletions tests/test_room_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,11 @@ async def test_participant_input_stream_aclose_unregisters_track_unpublished() -

assert room.listener_count("track_subscribed") == 1
assert room.listener_count("track_unpublished") == 1
assert room.listener_count("token_refreshed") == 1

await stream.aclose()

assert room.listener_count("track_subscribed") == 0
assert room.listener_count("track_unpublished") == 0
assert room.listener_count("token_refreshed") == 0


@pytest.mark.asyncio
Expand Down Expand Up @@ -249,16 +247,12 @@ async def test_direct_processor_lifecycle() -> None:

assert stream._processor is processor
assert processor.close_calls == 0
assert len(processor.stream_info_calls) == 1
assert len(processor.credentials_calls) == 1

# track switch — processor must survive
stream._on_track_available(track2, pub2, participant)

assert stream._processor is processor
assert processor.close_calls == 0
assert len(processor.stream_info_calls) == 2
assert len(processor.credentials_calls) == 2

# final teardown closes the processor exactly once
await stream.aclose()
Expand Down Expand Up @@ -290,17 +284,13 @@ def selector(_params: NoiseCancellationParams) -> _MockFrameProcessor:

assert len(processors) == 1
assert stream._processor is processors[0]
assert len(processors[0].stream_info_calls) == 1
assert len(processors[0].credentials_calls) == 1

# track switch — old processor closed, new one receives lifecycle calls
stream._on_track_available(track2, pub2, participant)

assert len(processors) == 2
assert processors[0].close_calls == 1
assert stream._processor is processors[1]
assert len(processors[1].stream_info_calls) == 1
assert len(processors[1].credentials_calls) == 1

# final teardown closes the active processor
await stream.aclose()
Expand All @@ -309,8 +299,7 @@ def selector(_params: NoiseCancellationParams) -> _MockFrameProcessor:

@pytest.mark.asyncio
async def test_selector_processor_track_disappears() -> None:
"""When a track vanishes with no replacement, the selector-created processor
is closed and subsequent token refreshes don't touch it."""
"""When a track vanishes with no replacement, the selector-created processor is closed."""
room = _FakeRoom()
processor = _MockFrameProcessor()
stream = _make_audio_input_stream(room, noise_cancellation=lambda _params: processor)
Expand All @@ -322,21 +311,13 @@ async def test_selector_processor_track_disappears() -> None:
stream._on_track_available(track, publication, participant)

assert stream._processor is processor
assert len(processor.credentials_calls) == 1

# track unpublished with no replacement
stream._on_track_unavailable(publication, participant)

assert processor.close_calls == 1
assert stream._processor is None

# token refresh must not reach the closed processor
room._token = "refreshed-token"
room._server_url = "wss://refreshed.livekit.cloud"
stream._on_token_refreshed()

assert len(processor.credentials_calls) == 1

await stream.aclose()


Expand Down
Loading