Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ target-version = "py311"
[tool.ruff.lint]
# https://docs.astral.sh/ruff/rules/
# https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
select = ["A", "B0", "C4", "D2", "D4", "E", "F", "I"]
select = ["A", "B0", "C4", "D2", "D4", "E", "F", "FURB", "I", "PYI", "UP"]
ignore = ["D203", "D206", "D213", "D400", "D401", "D413", "D415", "E1", "E501"]

[tool.eva]
Expand Down
6 changes: 3 additions & 3 deletions scripts/run_text_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import os
import shutil
import sys
from datetime import datetime, timezone
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -514,7 +514,7 @@ async def run_record(
user_message = record.user_goal["starting_utterance"]
end_reason = "max_turns"
turn_count = 0
started_at = datetime.now(timezone.utc)
started_at = datetime.now(UTC)

for turn in range(max_turns):
turn_count = turn + 1
Expand Down Expand Up @@ -546,7 +546,7 @@ async def run_record(
else:
end_reason = "max_turns"

ended_at = datetime.now(timezone.utc)
ended_at = datetime.now(UTC)
duration = (ended_at - started_at).total_seconds()
logger.info(f"Conversation ended: {end_reason} ({turn_count} turns, {duration:.1f}s)")

Expand Down
5 changes: 3 additions & 2 deletions src/eva/assistant/agentic/audio_llm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
audio so the model has full conversational context across turns.
"""

from collections.abc import AsyncGenerator
from pathlib import Path
from typing import Any, AsyncGenerator, Optional
from typing import Any

from eva.assistant.agentic.audit_log import AuditLog
from eva.assistant.agentic.system import AgenticSystem
Expand Down Expand Up @@ -37,7 +38,7 @@ def __init__(
tool_handler: ToolExecutor,
audit_log: AuditLog,
alm_client: ALMvLLMClient,
output_dir: Optional[Path] = None,
output_dir: Path | None = None,
):
super().__init__(
current_date_time=current_date_time,
Expand Down
38 changes: 19 additions & 19 deletions src/eva/assistant/agentic/audit_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from enum import StrEnum
from pathlib import Path
from typing import Any, Optional
from typing import Any

from pydantic import BaseModel

Expand Down Expand Up @@ -32,11 +32,11 @@ class ConversationMessage(BaseModel):

role: MessageRole
content: str
tool_calls: Optional[list[dict[str, Any]]] = None
tool_call_id: Optional[str] = None
name: Optional[str] = None # For tool messages
turn_id: Optional[int] = None # For associating transcription updates
reasoning: Optional[str] = None # For model reasoning (e.g., from OpenAI o1)
tool_calls: list[dict[str, Any]] | None = None
tool_call_id: str | None = None
name: str | None = None # For tool messages
turn_id: int | None = None # For associating transcription updates
reasoning: str | None = None # For model reasoning (e.g., from OpenAI o1)

def to_dict(self) -> dict[str, Any]:
"""Convert to a plain dict, excluding None fields and internal tracking fields."""
Expand All @@ -47,17 +47,17 @@ class LLMCall(BaseModel):
"""Record of an LLM call."""

messages: list[dict]
tools: Optional[list[dict]] = None
response: Optional[ConversationMessage] = None
tools: list[dict] | None = None
response: ConversationMessage | None = None
duration_seconds: float = 0.0
start_time: str = ""
end_time: str = ""
status: str = "success"
model: Optional[str] = None
model: str | None = None
# New fields for enhanced tracking (optional for backward compatibility)
latency_ms: Optional[float] = None
error_type: Optional[str] = None
error_source: Optional[str] = None
latency_ms: float | None = None
error_type: str | None = None
error_source: str | None = None
retry_attempt: int = 0


Expand All @@ -75,13 +75,13 @@ def __init__(self):
self.conversation_messages: list[ConversationMessage] = [] # Full message sequence for LLM context
self._tool_calls_count = 0
self._tools_called: list[str] = []
self._last_tool_call: Optional[str] = None # Track last tool called for matching responses
self._last_tool_call: str | None = None # Track last tool called for matching responses

def append_user_input(
self,
content: str,
timestamp_ms: Optional[str] = None,
turn_id: Optional[int] = None,
timestamp_ms: str | None = None,
turn_id: int | None = None,
) -> None:
"""Record user input.

Expand Down Expand Up @@ -189,7 +189,7 @@ def append_assistant_output(
self,
content: str,
tool_calls: list[dict[str, Any]] | None = None,
timestamp_ms: Optional[str] = None,
timestamp_ms: str | None = None,
) -> None:
"""Record assistant output.

Expand Down Expand Up @@ -236,7 +236,7 @@ def append_tool_message(self, tool_call_id: str, content: str) -> None:
)
logger.debug(f"Audit: tool message for call_id {tool_call_id}")

def append_llm_call(self, llm_call: LLMCall, agent_name: Optional[str] = None) -> None:
def append_llm_call(self, llm_call: LLMCall, agent_name: str | None = None) -> None:
"""Record an LLM call."""
response_content = llm_call.response.content if llm_call.response else ""
response_dict = llm_call.response.to_dict() if llm_call.response else None
Expand Down Expand Up @@ -276,7 +276,7 @@ def append_tool_call(
self,
tool_name: str,
parameters: dict[str, Any],
response: Optional[dict[str, Any]] = None,
response: dict[str, Any] | None = None,
) -> None:
"""Record a tool call and its response."""
# Record tool call in transcript
Expand Down Expand Up @@ -342,7 +342,7 @@ def append_realtime_tool_call(

def get_conversation_messages(
self,
max_messages: Optional[int] = None,
max_messages: int | None = None,
) -> list[ConversationMessage]:
"""Get conversation messages for LLM context.

Expand Down
5 changes: 3 additions & 2 deletions src/eva/assistant/agentic/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import json
import time
import warnings
from collections.abc import AsyncGenerator
from pathlib import Path
from typing import Any, AsyncGenerator
from typing import Any

from eva.assistant.agentic.audit_log import (
AuditLog,
Expand Down Expand Up @@ -220,7 +221,7 @@ async def _run_tool_loop(
llm_call_response = ConversationMessage(
role=MessageRole.ASSISTANT,
content=response_content,
tool_calls=tool_calls_dicts if tool_calls_dicts else None,
tool_calls=tool_calls_dicts or None,
reasoning=llm_stats.get("reasoning"),
)

Expand Down
6 changes: 3 additions & 3 deletions src/eva/assistant/pipeline/alm_vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import struct
import time
import wave
from typing import Any, Optional
from typing import Any

from openai import AsyncOpenAI

Expand Down Expand Up @@ -163,7 +163,7 @@ def build_audio_user_message(
async def complete(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict]] = None,
tools: list[dict] | None = None,
) -> tuple[Any, dict[str, Any]]:
"""Chat completion with audio and tool support.

Expand All @@ -188,7 +188,7 @@ async def complete(
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"

last_exception: Optional[Exception] = None
last_exception: Exception | None = None
for attempt in range(self.max_retries + 1):
try:
start_time = time.time()
Expand Down
20 changes: 9 additions & 11 deletions src/eva/assistant/pipeline/audio_llm_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import wave
from collections.abc import Awaitable
from pathlib import Path
from typing import Any, Optional
from typing import Any

from openai import AsyncOpenAI
from pipecat.frames.frames import (
Expand Down Expand Up @@ -81,7 +81,7 @@ def __init__(
self,
context,
user_context_aggregator,
pre_speech_secs: Optional[float] = None,
pre_speech_secs: float | None = None,
**kwargs,
):
super().__init__(**kwargs)
Expand Down Expand Up @@ -176,7 +176,7 @@ def __init__(
audit_log: AuditLog,
alm_client: ALMvLLMClient,
audio_collector: AudioLLMUserAudioCollector,
output_dir: Optional[Path] = None,
output_dir: Path | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -195,11 +195,11 @@ def __init__(
)

# State tracking (mirrors BenchmarkAgentProcessor)
self._current_query_task: Optional[asyncio.Task] = None
self._current_query_task: asyncio.Task | None = None
self._interrupted = asyncio.Event()

# Optional callback for transcript saving (set by server.py)
self.on_assistant_response: Optional[Awaitable] = None
self.on_assistant_response: Awaitable | None = None

async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
if isinstance(frame, (EndFrame, CancelFrame)):
Expand Down Expand Up @@ -410,7 +410,7 @@ def __init__(
audio_collector: AudioLLMUserAudioCollector,
model: str = "",
params: dict[str, Any] = None,
system_prompt: Optional[str] = None,
system_prompt: str | None = None,
sample_rate: int = PIPELINE_SAMPLE_RATE,
**kwargs,
):
Expand All @@ -426,7 +426,7 @@ def __init__(
self._client: AsyncOpenAI = AsyncOpenAI(api_key=self._api_key, base_url=base_url)

# Callback for when transcription is ready (set by server.py)
self.on_transcription: Optional[Any] = None
self.on_transcription: Any | None = None

# Track background transcription tasks so they can complete even during interruptions
self._transcription_tasks: list[asyncio.Task] = []
Expand Down Expand Up @@ -463,7 +463,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
# Clean up completed tasks
self._transcription_tasks = [t for t in self._transcription_tasks if not t.done()]

async def transcribe(self, timestamp: str, turn_id: Optional[int] = None) -> Optional[str]:
async def transcribe(self, timestamp: str, turn_id: int | None = None) -> str | None:
"""Transcribe audio from the collector using chat completions.

This method can be called directly from event handlers or via frame processing.
Expand All @@ -479,9 +479,7 @@ async def transcribe(self, timestamp: str, turn_id: Optional[int] = None) -> Opt
audio_data = self._audio_collector.peek_buffered_audio()
return await self._transcribe_audio(audio_data, timestamp, turn_id)

async def _transcribe_audio(
self, audio_data: bytes, timestamp: str, turn_id: Optional[int] = None
) -> Optional[str]:
async def _transcribe_audio(self, audio_data: bytes, timestamp: str, turn_id: int | None = None) -> str | None:
"""Transcribe pre-captured audio data using chat completions.

This method takes audio data directly instead of reading from the collector,
Expand Down
10 changes: 5 additions & 5 deletions src/eva/assistant/pipeline/nvidia_stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import json
import ssl
import time
from typing import AsyncGenerator, Optional
from collections.abc import AsyncGenerator

import websockets
from loguru import logger
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(
self,
*,
url: str = "ws://localhost:8080",
api_key: Optional[str] = None,
api_key: str | None = None,
sample_rate: int = 16000,
verify: bool = True,
**kwargs,
Expand All @@ -58,7 +58,7 @@ def __init__(
self._api_key = api_key
self._verify = verify
self._websocket = None
self._receive_task: Optional[asyncio.Task] = None
self._receive_task: asyncio.Task | None = None
self._ready = False

def can_generate_metrics(self) -> bool:
Expand Down Expand Up @@ -135,7 +135,7 @@ async def _connect_websocket(self):
self._websocket = await websockets.connect(
self._url,
ssl=ssl_context,
additional_headers=extra_headers if extra_headers else None,
additional_headers=extra_headers or None,
)
self._ready = False

Expand All @@ -149,7 +149,7 @@ async def _connect_websocket(self):
else:
logger.warning(f"{self} unexpected initial message: {data}")
self._ready = True
except asyncio.TimeoutError:
except TimeoutError:
logger.warning(f"{self} timeout waiting for ready, proceeding")
self._ready = True

Expand Down
10 changes: 5 additions & 5 deletions src/eva/assistant/pipeline/realtime_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import struct
import time
from dataclasses import dataclass
from typing import Any, Optional
from typing import Any

from pipecat.frames.frames import Frame, InputAudioRawFrame, VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame
from pipecat.processors.frame_processor import FrameDirection
Expand Down Expand Up @@ -75,14 +75,14 @@ def __init__(self, *, audit_log: AuditLog, **kwargs: Any) -> None:

# Assistant response accumulation (across audio_transcript_delta events)
self._current_assistant_transcript_parts: list[str] = []
self._assistant_response_start_wall_ms: Optional[str] = None
self._assistant_response_start_wall_ms: str | None = None

# Track whether we're mid-assistant-response (for interruption flushing)
self._assistant_responding: bool = False

# Track audio frame timing for VAD delay calculation
self._last_audio_frame_time: Optional[float] = None
self._vad_delay_ms: Optional[int] = None
self._last_audio_frame_time: float | None = None
self._vad_delay_ms: int | None = None

async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Track audio frame timing before passing to parent.
Expand Down Expand Up @@ -295,7 +295,7 @@ def _reset_assistant_state(self) -> None:
self._assistant_responding = False

@property
def last_vad_delay_ms(self) -> Optional[int]:
def last_vad_delay_ms(self) -> int | None:
"""Return the most recent VAD delay in milliseconds.

This is the time between when audio frames stopped arriving and when
Expand Down
Loading
Loading