Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/metric_context.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ Counts and flags computed during benchmark execution.
- `"transfer"`: Assistant transferred to live agent
- `"error"`: An error occurred
- **`duration_seconds: float`** - Total duration of the conversation in seconds.
- **`is_audio_native: bool`** - Whether this conversation used an audio-native architecture. Metrics should check this flag to adjust behavior (e.g., audio-native uses intended user text in conversation_trace).
- **`pipeline_type: PipelineType`** - The pipeline architecture used (`CASCADE`, `AUDIO_LLM`, or `S2S`). Access `context.is_audio_native` for a convenience boolean that returns `True` for both `AUDIO_LLM` and `S2S`.
- **`response_speed_latencies: list[float]`** - List of response latencies in seconds (time from user speech end to assistant speech start).

### File Paths
Expand Down Expand Up @@ -212,11 +212,11 @@ The LLM processes **transcribed text**, so `transcribed_user_turns` reflects wha

The model processes **raw audio**. The audit log may contain a transcript from the service's own secondary STT, but this is **not what the model used** — it's just for reference. This is why `transcribed_user_turns` is unreliable for audio-native models and `intended_user_turns` should be used instead.

Check `context.is_audio_native` (audio-native) to determine which mode was used.
Check `context.pipeline_type` to determine which mode was used, or `context.is_audio_native` for a boolean grouping of `S2S` and `AUDIO_LLM`.

### Writing Audio-Native-Aware Metrics

If your metric needs user text directly (rather than via `conversation_trace`, which handles this automatically), branch on `context.is_audio_native` (audio-native):
If your metric needs user text directly (rather than via `conversation_trace`, which handles this automatically), branch on `context.is_audio_native`:

```python
async def compute(self, context: MetricContext) -> MetricScore:
Expand Down
9 changes: 7 additions & 2 deletions src/eva/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
resolve_turn_id,
validate_rating,
)
from eva.models.config import PipelineType
from eva.models.results import MetricScore
from eva.utils.llm_client import LLMClient
from eva.utils.logging import get_logger
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(
response_speed_latencies: list[float] | None = None,
assistant_interrupted_turns: set[int] | None = None,
user_interrupted_turns: set[int] | None = None,
is_audio_native: bool = False,
pipeline_type: PipelineType = PipelineType.CASCADE,
):
self.record_id = record_id

Expand Down Expand Up @@ -134,7 +135,11 @@ def __init__(
self.response_speed_latencies = response_speed_latencies or []
self.assistant_interrupted_turns = assistant_interrupted_turns or set()
self.user_interrupted_turns = user_interrupted_turns or set()
self.is_audio_native = is_audio_native
self.pipeline_type = pipeline_type

@property
def is_audio_native(self) -> bool:
    """Whether this conversation used an audio-native pipeline.

    Convenience boolean grouping ``PipelineType.S2S`` and
    ``PipelineType.AUDIO_LLM``; ``CASCADE`` returns ``False``.
    """
    audio_native_types = {PipelineType.S2S, PipelineType.AUDIO_LLM}
    return self.pipeline_type in audio_native_types

def to_dict(self) -> dict[str, Any]:
"""Convert MetricContext to a serializable dictionary."""
Expand Down
74 changes: 49 additions & 25 deletions src/eva/metrics/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path

from eva.assistant.agentic.system import GENERIC_ERROR
from eva.models.config import PipelineType
from eva.models.results import ConversationResult
from eva.utils.log_processing import (
AnnotationLabel,
Expand Down Expand Up @@ -138,7 +139,7 @@ def _process_user_speech(
state: _TurnExtractionState,
context: "_ProcessorContext",
conversation_trace: list[dict],
is_audio_native: bool,
pipeline_type: PipelineType,
) -> None:
"""Process a single user_speech event into intended_user_turns (and audio-native trace)."""
turn_idx = state.last_user_audio_turn
Expand All @@ -150,7 +151,7 @@ def _process_user_speech(
append_turn_text(context.intended_user_turns, turn_idx, user_text, sep)
state.user_speech_in_session = True
# For audio-native models, use intended user text in the conversation trace
if is_audio_native:
if pipeline_type in (PipelineType.S2S, PipelineType.AUDIO_LLM):
trace_entry = {
"role": "user",
"content": user_text,
Expand Down Expand Up @@ -186,7 +187,7 @@ def _handle_audit_log_event(
state: "_TurnExtractionState",
context: "_ProcessorContext",
conversation_trace: list[dict],
is_audio_native: bool,
pipeline_type: PipelineType,
) -> None:
"""Process a single audit_log source event into turn variables and conversation trace."""
if event["event_type"] == "user":
Expand All @@ -212,12 +213,14 @@ def _handle_audit_log_event(
entry["content"] = f"{AnnotationLabel.USER_INTERRUPTS} {entry['content']}"
state.pending_user_interrupts_label = False
# For audio-native models, user trace entries come from ElevenLabs user_speech instead
if not is_audio_native:
if pipeline_type == PipelineType.CASCADE:
conversation_trace.append(entry)
sep = _user_transcript_separator(existing, turn, state)
append_turn_text(context.transcribed_user_turns, turn, entry["content"], sep)

elif event["event_type"] == "assistant":
if pipeline_type == PipelineType.S2S:
return
turn = state.turn_num
content = event["data"]
# Apply interruption prefix if this is the first assistant entry in a turn where assistant barged in
Expand All @@ -226,7 +229,7 @@ def _handle_audit_log_event(
has_prior = any(e.get("role") == "assistant" and e.get("turn_id") == turn for e in conversation_trace)
if not has_prior:
content = f"{AnnotationLabel.ASSISTANT_INTERRUPTS} {content}"
user_entry_type = "intended" if is_audio_native else "transcribed"
user_entry_type = "transcribed" if pipeline_type == PipelineType.CASCADE else "intended"
annotate_last_entry(
conversation_trace, turn, "user", user_entry_type, AnnotationLabel.CUT_OFF_BY_ASSISTANT
)
Expand Down Expand Up @@ -303,7 +306,7 @@ def _handle_audio_start(
state: "_TurnExtractionState",
context: "_ProcessorContext",
conversation_trace: list[dict],
is_audio_native: bool,
pipeline_type: PipelineType,
) -> None:
"""Process an ElevenLabs audio_start event, advancing the turn counter if needed."""
role = event["data"]["user"]
Expand Down Expand Up @@ -345,7 +348,7 @@ def _handle_audio_start(
# Replay any buffered user_speech that arrived before this audio_start — now we know the correct turn.
if state.buffered_user_speech:
for buffered in state.buffered_user_speech:
_process_user_speech(buffered, state, context, conversation_trace, is_audio_native)
_process_user_speech(buffered, state, context, conversation_trace, pipeline_type)
state.buffered_user_speech.clear()

elif role == "pipecat_agent":
Expand Down Expand Up @@ -394,7 +397,7 @@ def _handle_elevenlabs_event(
state: "_TurnExtractionState",
context: "_ProcessorContext",
conversation_trace: list[dict],
is_audio_native: bool,
pipeline_type: PipelineType,
) -> bool:
"""Process a single elevenlabs source event. Returns True if the caller should continue."""
if event["event_type"] == "assistant_speech":
Expand All @@ -403,14 +406,25 @@ def _handle_elevenlabs_event(
turn = state.last_assistant_audio_turn
# Only mark "assistant spoke" if the speech belongs to the current turn; late transcripts from a
# previous turn must not trigger a spurious turn advance.
if turn == state.turn_num:
if turn == state.turn_num or pipeline_type == PipelineType.S2S:
state.assistant_spoke_in_turn = True
existing = context.transcribed_assistant_turns.get(turn, "")
sep = _assistant_speech_separator(existing, turn, state)
text = event["data"]["data"]["text"]
if not existing and turn in state.assistant_interrupted_turns:
text = f"{AnnotationLabel.ASSISTANT_INTERRUPTS} {text}"
append_turn_text(context.transcribed_assistant_turns, turn, text, sep)
# For S2S, assistant trace entries come from EL (audit log assistant entries are skipped)
if pipeline_type == PipelineType.S2S:
conversation_trace.append(
{
"role": "assistant",
"content": text,
"timestamp": event["timestamp_ms"],
"type": "transcribed",
"turn_id": turn,
}
)

elif event["event_type"] == "user_speech":
# Buffer user_speech when it cannot be paired with the current user audio session. This happens when:
Expand All @@ -431,10 +445,10 @@ def _handle_elevenlabs_event(
if raw_text in state.buffered_user_speech_texts:
state.buffered_user_speech_texts.discard(raw_text)
return False
_process_user_speech(event, state, context, conversation_trace, is_audio_native)
_process_user_speech(event, state, context, conversation_trace, pipeline_type)

elif event["event_type"] == "audio_start":
_handle_audio_start(event, state, context, conversation_trace, is_audio_native)
_handle_audio_start(event, state, context, conversation_trace, pipeline_type)

elif event["event_type"] == "audio_end":
_handle_audio_end(event, state)
Expand Down Expand Up @@ -550,7 +564,10 @@ def _finalize_extraction(
context.tool_params, context.tool_responses = extract_tool_params_and_responses(conversation_trace)
context.tool_called = [t["tool_name"].lower() for t in context.tool_params]
context.num_tool_calls = len(context.tool_params)
context.num_assistant_turns = len(context.intended_assistant_turns)
if context.pipeline_type == PipelineType.S2S:
context.num_assistant_turns = len(context.transcribed_assistant_turns)
else:
context.num_assistant_turns = len(context.intended_assistant_turns)
context.num_user_turns = len(context.transcribed_user_turns)

_warn_turn_misalignment(context)
Expand Down Expand Up @@ -592,11 +609,12 @@ def _ensure_greeting_is_first(context: "_ProcessorContext") -> None:
if greeting_idx is not None:
greeting = context.conversation_trace.pop(greeting_idx)
else:
# Cascade: greeting not in audit log — create from pipecat text.
# Greeting not in audit log — create from pipecat text (cascade) or transcribed text (S2S).
greeting_text = context.intended_assistant_turns.get(0) or context.transcribed_assistant_turns.get(0)
greeting = {
"role": "assistant",
"content": context.intended_assistant_turns.get(0),
"type": "intended",
"content": greeting_text,
"type": "intended" if context.intended_assistant_turns.get(0) else "transcribed",
"turn_id": 0,
}
context.conversation_trace.insert(0, greeting)
Expand Down Expand Up @@ -639,8 +657,9 @@ def _label_trailing_assistant_turn(context: "_ProcessorContext", last_entry: dic
{"role": "assistant", "content": labeled, "type": "intended", "turn_id": trailing_turn_id}
)

# Sync intended + transcribed
context.intended_assistant_turns[trailing_turn_id] = labeled
# Sync intended + transcribed (skip intended for S2S — no intended text exists)
if context.pipeline_type != PipelineType.S2S:
context.intended_assistant_turns[trailing_turn_id] = labeled
if not context.transcribed_assistant_turns.get(trailing_turn_id):
context.transcribed_assistant_turns[trailing_turn_id] = labeled
else:
Expand Down Expand Up @@ -687,7 +706,7 @@ def __init__(self):
# Conversation metadata
self.conversation_finished: bool = False
self.conversation_ended_reason: str | None = None
self.is_audio_native: bool = False
self.pipeline_type: PipelineType = PipelineType.CASCADE

# Response latencies from Pipecat's UserBotLatencyObserver
self.response_speed_latencies: list[float] = []
Expand All @@ -703,14 +722,14 @@ def process_record(
self,
result: ConversationResult,
output_dir: Path,
is_audio_native: bool = False,
pipeline_type: PipelineType = PipelineType.CASCADE,
) -> _ProcessorContext | None:
"""Process a single conversation record to create metric context.

Args:
result: ConversationResult object
output_dir: Path to the output directory containing logs
is_audio_native: Whether the model is audio-native
pipeline_type: The type of voice pipeline used

Returns:
_ProcessorContext object with all processed variables, or None if processing failed
Expand All @@ -720,7 +739,7 @@ def process_record(
context.audio_assistant_path = result.audio_assistant_path
context.audio_user_path = result.audio_user_path
context.audio_mixed_path = result.audio_mixed_path
context.is_audio_native = is_audio_native
context.pipeline_type = pipeline_type

try:
self._build_history(context, output_dir, result)
Expand Down Expand Up @@ -824,7 +843,8 @@ def _build_history(
Each entry: {timestamp_ms, source, event_type, data}.
"""
history = self._load_audit_log_transcript(output_dir)
history.extend(self._load_pipecat_logs(result.pipecat_logs_path))
if context.pipeline_type != PipelineType.S2S:
history.extend(self._load_pipecat_logs(result.pipecat_logs_path))
history.extend(self._load_elevenlabs_logs(result.elevenlabs_logs_path))

history.sort(key=lambda e: e["timestamp_ms"])
Expand Down Expand Up @@ -859,18 +879,22 @@ def _extract_turns_from_history(context: _ProcessorContext) -> None:
conversation_trace: list[dict] = []
for event in context.history:
if event["source"] == "audit_log":
_handle_audit_log_event(event, state, context, conversation_trace, context.is_audio_native)
_handle_audit_log_event(event, state, context, conversation_trace, context.pipeline_type)
elif event["source"] == "pipecat":
_handle_pipecat_event(event, state, context, conversation_trace)
elif event["source"] == "elevenlabs":
if _handle_elevenlabs_event(event, state, context, conversation_trace, context.is_audio_native):
if _handle_elevenlabs_event(event, state, context, conversation_trace, context.pipeline_type):
continue

if not state.session_end_ts:
state.session_end_ts = context.history[-1].get("timestamp_ms") / 1000.0

_pair_audio_segments(state, context)
validated_trace = _validate_conversation_trace(conversation_trace, context)
if context.pipeline_type == PipelineType.S2S:
# S2S has no pipecat segments to validate against — trace entries come from EL directly
validated_trace = conversation_trace
else:
validated_trace = _validate_conversation_trace(conversation_trace, context)
context.conversation_trace = group_consecutive_turns(validated_trace)
_fix_interruption_labels(context, state)
_finalize_extraction(context, state, conversation_trace)
Expand Down
8 changes: 3 additions & 5 deletions src/eva/metrics/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from eva.metrics.base import BaseMetric, MetricContext
from eva.metrics.processor import MetricsContextProcessor
from eva.metrics.registry import MetricRegistry, get_global_registry
from eva.models.config import is_audio_native_pipeline
from eva.models.config import PipelineType, get_pipeline_type
from eva.models.record import EvaluationRecord
from eva.models.results import ConversationResult, MetricScore, PassAtKResult, RecordMetrics
from eva.utils.hash_utils import get_dict_hash
Expand Down Expand Up @@ -130,7 +130,7 @@ def _load_agent_config(self) -> dict[str, Any]:

# Determine pipeline type from config (fallback to False for legacy runs)
model_data = config_data.get("model", {})
self._is_audio_native = is_audio_native_pipeline(model_data) if model_data else False
self._pipeline_type = get_pipeline_type(model_data) if model_data else PipelineType.CASCADE

agent_config_path = config_data.get("agent_config_path")

Expand Down Expand Up @@ -429,9 +429,7 @@ def _load_context(self, record_id: str, record_dir: Path) -> MetricContext:
result = ConversationResult(**result_data)

# Use postprocessor to process logs and create enriched context
metrics_context = self.metrics_processor.process_record(
result, record_dir, is_audio_native=self._is_audio_native
)
metrics_context = self.metrics_processor.process_record(result, record_dir, pipeline_type=self._pipeline_type)

# Get agent instructions and tools from config
agent_instructions = self._agent_config["instructions"]
Expand Down
24 changes: 17 additions & 7 deletions src/eva/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import copy
import logging
from datetime import UTC, datetime
from enum import StrEnum
from pathlib import Path
from typing import Annotated, Any, ClassVar, Literal

Expand Down Expand Up @@ -171,6 +172,14 @@ def pipeline_parts(self) -> dict[str, str]:
_AUDIO_LLM_FIELDS = {"audio_llm", "audio_llm_params", "tts", "tts_params"}


class PipelineType(StrEnum):
    """Type of voice pipeline.

    ``AUDIO_LLM`` and ``S2S`` are the audio-native architectures (the model
    consumes audio directly); ``CASCADE`` is the transcription-based pipeline.
    Being a ``StrEnum``, members compare equal to their string values, so
    they serialize cleanly in configs and logs.
    """

    CASCADE = "cascade"
    AUDIO_LLM = "audio_llm"
    S2S = "s2s"


def _model_config_discriminator(data: Any) -> str:
"""Discriminate which pipeline config type to use based on unique fields."""
if isinstance(data, dict):
Expand All @@ -186,21 +195,22 @@ def _model_config_discriminator(data: Any) -> str:
return "pipeline"


def get_pipeline_type(model_data: dict | Any) -> PipelineType:
    """Return the pipeline type for the given model config.

    Works with both raw dicts (e.g. from config.json) and parsed model config
    objects. Also handles legacy configs where ``realtime_model`` was stored
    alongside ``llm_model`` in a flat dict (before the discriminated-union
    refactor).

    Args:
        model_data: Raw ``model`` config dict or a parsed model config object.

    Returns:
        The matching ``PipelineType``; ``PipelineType.CASCADE`` when no
        audio-native discriminating fields are present.
    """
    mode = _model_config_discriminator(model_data)
    if mode == "s2s":
        return PipelineType.S2S
    if mode == "audio_llm":
        return PipelineType.AUDIO_LLM
    # Legacy: realtime_model was a sibling of llm_model before the union split
    # and implies a speech-to-speech pipeline.
    if isinstance(model_data, dict) and model_data.get("realtime_model"):
        return PipelineType.S2S
    return PipelineType.CASCADE


def _strip_other_mode_fields(data: dict) -> dict:
Expand Down
Loading
Loading