Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
PaginatedDatasetRuns,
)
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
from langfuse.api.resources.ingestion.types.trace_body import TraceBody
from langfuse.api.resources.prompts.types import (
CreatePromptRequest_Chat,
CreatePromptRequest_Text,
Expand Down Expand Up @@ -2098,6 +2099,39 @@ def create_score(
f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}"
)

def _create_trace_tags_via_ingestion(
self,
*,
trace_id: str,
tags: List[str],
) -> None:
"""Private helper to enqueue trace tag updates via ingestion API events."""
if not self._tracing_enabled:
return

if len(tags) == 0:
return

try:
new_body = TraceBody(
id=trace_id,
tags=tags,
)

event = {
"id": self.create_trace_id(),
"type": "trace-create",
"timestamp": _get_timestamp(),
"body": new_body,
}

if self._resources is not None:
self._resources.add_trace_task(event)
except Exception as e:
langfuse_logger.exception(
f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}"
)

@overload
def score_current_span(
self,
Expand Down Expand Up @@ -3115,8 +3149,10 @@ def run_batched_evaluation(
max_retries: int = 3,
evaluators: List[EvaluatorFunction],
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
max_concurrency: int = 50,
max_concurrency: int = 5,
metadata: Optional[Dict[str, Any]] = None,
_add_observation_scores_to_trace: bool = False,
_additional_trace_tags: Optional[List[str]] = None,
resume_from: Optional[BatchEvaluationResumeToken] = None,
verbose: bool = False,
) -> BatchEvaluationResult:
Expand Down Expand Up @@ -3158,7 +3194,7 @@ def run_batched_evaluation(
items matching the filter. Useful for testing or limiting evaluation runs.
Default: None (process all).
max_concurrency: Maximum number of items to evaluate concurrently. Controls
parallelism and resource usage. Default: 50.
parallelism and resource usage. Default: 5.
composite_evaluator: Optional function that creates a composite score from
item-level evaluations. Receives the original item and its evaluations,
returns a single Evaluation. Useful for weighted averages or combined metrics.
Expand Down Expand Up @@ -3327,6 +3363,8 @@ def composite_evaluator(*, item, evaluations):
max_concurrency=max_concurrency,
composite_evaluator=composite_evaluator,
metadata=metadata,
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
_additional_trace_tags=_additional_trace_tags,
max_retries=max_retries,
verbose=verbose,
resume_from=resume_from,
Expand Down
23 changes: 23 additions & 0 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,29 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:

return

def add_trace_task(
    self,
    event: dict,
) -> None:
    """Enqueue a trace ingestion event on the score ingestion queue.

    Never raises to the caller: when the queue is full or any unexpected
    error occurs, the event is dropped and the failure is logged.

    Args:
        event: Ingestion event dict with at least "type" and "body" keys;
            "body" is expected to expose an ``id`` attribute.
    """
    try:
        body_id = event["body"].id
        langfuse_logger.debug(
            f"Trace: Enqueuing event type={event['type']} for trace_id={body_id}"
        )
        # Non-blocking put: drop rather than stall the caller under load.
        self._score_ingestion_queue.put(event, block=False)
    except Full:
        langfuse_logger.warning(
            "System overload: Trace ingestion queue has reached capacity (100,000 items). Trace update will be dropped. Consider increasing flush frequency or decreasing event volume."
        )
    except Exception as e:
        langfuse_logger.error(
            f"Unexpected error: Failed to process trace event. The trace update will be dropped. Error details: {e}"
        )

    return

@property
def tracer(self) -> Optional[Tracer]:
return self._otel_tracer
Expand Down
82 changes: 76 additions & 6 deletions langfuse/batch_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
List,
Optional,
Protocol,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -849,9 +850,11 @@ async def run_async(
fetch_batch_size: int = 50,
fetch_trace_fields: Optional[str] = None,
max_items: Optional[int] = None,
max_concurrency: int = 50,
max_concurrency: int = 5,
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
metadata: Optional[Dict[str, Any]] = None,
_add_observation_scores_to_trace: bool = False,
_additional_trace_tags: Optional[List[str]] = None,
max_retries: int = 3,
verbose: bool = False,
resume_from: Optional[BatchEvaluationResumeToken] = None,
Expand All @@ -873,6 +876,10 @@ async def run_async(
max_concurrency: Maximum number of concurrent evaluations.
composite_evaluator: Optional function to create composite scores.
metadata: Metadata to add to all created scores.
_add_observation_scores_to_trace: Private option to duplicate
observation-level scores onto the parent trace.
_additional_trace_tags: Private option to add tags on traces via
ingestion trace-create events.
max_retries: Maximum retries for failed batch fetches.
verbose: If True, log progress to console.
resume_from: Resume token from a previous failed run.
Expand Down Expand Up @@ -903,6 +910,12 @@ async def run_async(

# Handle resume token by modifying filter
effective_filter = self._build_timestamp_filter(filter, resume_from)
normalized_additional_trace_tags = (
self._dedupe_tags(_additional_trace_tags)
if _additional_trace_tags is not None
else []
)
updated_trace_ids: Set[str] = set()

# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(max_concurrency)
Expand Down Expand Up @@ -1011,6 +1024,7 @@ async def process_item(
evaluators=evaluators,
composite_evaluator=composite_evaluator,
metadata=metadata,
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
evaluator_stats_dict=evaluator_stats_dict,
)
return (item_id, result)
Expand Down Expand Up @@ -1043,6 +1057,20 @@ async def process_item(
# Store evaluations for this item
item_evaluations[item_id] = evaluations

if normalized_additional_trace_tags:
trace_id = (
item_id
if scope == "traces"
else cast(ObservationsView, item).trace_id
)

if trace_id and trace_id not in updated_trace_ids:
self.client._create_trace_tags_via_ingestion(
trace_id=trace_id,
tags=normalized_additional_trace_tags,
)
updated_trace_ids.add(trace_id)

# Update last processed tracking
last_item_timestamp = self._get_item_timestamp(item, scope)
last_item_id = item_id
Expand Down Expand Up @@ -1168,6 +1196,7 @@ async def _process_batch_evaluation_item(
evaluators: List[EvaluatorFunction],
composite_evaluator: Optional[CompositeEvaluatorFunction],
metadata: Optional[Dict[str, Any]],
_add_observation_scores_to_trace: bool,
evaluator_stats_dict: Dict[str, EvaluatorStats],
) -> Tuple[int, int, int, List[Evaluation]]:
"""Process a single item: map, evaluate, create scores.
Expand All @@ -1179,6 +1208,8 @@ async def _process_batch_evaluation_item(
evaluators: List of evaluator functions.
composite_evaluator: Optional composite evaluator function.
metadata: Additional metadata to add to scores.
_add_observation_scores_to_trace: Whether to duplicate
observation-level scores at trace level.
evaluator_stats_dict: Dictionary tracking evaluator statistics.

Returns:
Expand Down Expand Up @@ -1226,16 +1257,16 @@ async def _process_batch_evaluation_item(
# Create scores for item-level evaluations
item_id = self._get_item_id(item, scope)
for evaluation in evaluations:
self._create_score_for_scope(
scores_created += self._create_score_for_scope(
scope=scope,
item_id=item_id,
trace_id=cast(ObservationsView, item).trace_id
if scope == "observations"
else None,
evaluation=evaluation,
additional_metadata=metadata,
add_observation_score_to_trace=_add_observation_scores_to_trace,
)
scores_created += 1

# Run composite evaluator if provided and we have evaluations
if composite_evaluator and evaluations:
Expand All @@ -1251,16 +1282,16 @@ async def _process_batch_evaluation_item(

# Create scores for all composite evaluations
for composite_eval in composite_evals:
self._create_score_for_scope(
composite_scores_created += self._create_score_for_scope(
scope=scope,
item_id=item_id,
trace_id=cast(ObservationsView, item).trace_id
if scope == "observations"
else None,
evaluation=composite_eval,
additional_metadata=metadata,
add_observation_score_to_trace=_add_observation_scores_to_trace,
)
composite_scores_created += 1

# Add composite evaluations to the list
evaluations.extend(composite_evals)
Expand Down Expand Up @@ -1382,7 +1413,8 @@ def _create_score_for_scope(
trace_id: Optional[str] = None,
evaluation: Evaluation,
additional_metadata: Optional[Dict[str, Any]],
) -> None:
add_observation_score_to_trace: bool = False,
) -> int:
"""Create a score linked to the appropriate entity based on scope.

Args:
Expand All @@ -1391,6 +1423,11 @@ def _create_score_for_scope(
trace_id: The trace ID of the entity; required if scope=observations
evaluation: The evaluation result to create a score from.
additional_metadata: Additional metadata to merge with evaluation metadata.
add_observation_score_to_trace: Whether to duplicate observation
score on parent trace as well.

Returns:
Number of score events created.
"""
# Merge metadata
score_metadata = {
Expand All @@ -1408,6 +1445,7 @@ def _create_score_for_scope(
data_type=evaluation.data_type, # type: ignore[arg-type]
config_id=evaluation.config_id,
)
return 1
elif scope == "observations":
self.client.create_score(
observation_id=item_id,
Expand All @@ -1419,6 +1457,23 @@ def _create_score_for_scope(
data_type=evaluation.data_type, # type: ignore[arg-type]
config_id=evaluation.config_id,
)
score_count = 1

if add_observation_score_to_trace and trace_id:
self.client.create_score(
trace_id=trace_id,
name=evaluation.name,
value=evaluation.value, # type: ignore
comment=evaluation.comment,
metadata=score_metadata,
data_type=evaluation.data_type, # type: ignore[arg-type]
config_id=evaluation.config_id,
)
score_count += 1

return score_count

return 0

def _build_timestamp_filter(
self,
Expand Down Expand Up @@ -1519,6 +1574,21 @@ def _get_timestamp_field_for_scope(scope: str) -> str:
return "start_time"
return "timestamp" # Default

@staticmethod
def _dedupe_tags(tags: Optional[List[str]]) -> List[str]:
"""Deduplicate tags while preserving order."""
if tags is None:
return []

deduped: List[str] = []
seen = set()
for tag in tags:
if tag not in seen:
deduped.append(tag)
seen.add(tag)

return deduped

def _build_result(
self,
total_items_fetched: int,
Expand Down
Loading