From 3539208f4d7006e0541b54f6e729829bb6dacd12 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 23 Feb 2026 11:11:20 +0100 Subject: [PATCH 1/2] feat(batch-evaluation): add trace tags and roll up scores to trace --- langfuse/_client/client.py | 46 ++++++++++++- langfuse/_client/resource_manager.py | 99 ++++++++++++++++++++-------- langfuse/batch_evaluation.py | 82 +++++++++++++++++++++-- 3 files changed, 191 insertions(+), 36 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index ccb5e3cbf..8a2750eaa 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -86,6 +86,7 @@ PaginatedDatasetRuns, ) from langfuse.api.resources.ingestion.types.score_body import ScoreBody +from langfuse.api.resources.ingestion.types.trace_body import TraceBody from langfuse.api.resources.prompts.types import ( CreatePromptRequest_Chat, CreatePromptRequest_Text, @@ -2098,6 +2099,43 @@ def create_score( f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}" ) + def _create_trace_tags_via_ingestion( + self, + *, + trace_id: str, + tags: List[str], + ) -> None: + """Private helper to enqueue trace tag updates via ingestion API events.""" + if not self._tracing_enabled: + return + + if len(tags) == 0: + return + + try: + new_body = TraceBody( + id=trace_id, + tags=tags, + ) + + event = { + "id": self.create_trace_id(), + "type": "trace-create", + "timestamp": _get_timestamp(), + "body": new_body, + } + + if self._resources is not None: + self._resources.add_trace_task( + event, + trace_id=trace_id, + force_sample=True, + ) + except Exception as e: + langfuse_logger.exception( + f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}" + ) + @overload def score_current_span( self, @@ -3115,8 +3153,10 @@ def run_batched_evaluation( max_retries: int = 3, evaluators: List[EvaluatorFunction], composite_evaluator: Optional[CompositeEvaluatorFunction] = None, - max_concurrency: int = 50, + max_concurrency: int = 5, metadata: Optional[Dict[str, Any]] = None, + _add_observation_scores_to_trace: bool = False, + _additional_trace_tags: Optional[List[str]] = None, resume_from: Optional[BatchEvaluationResumeToken] = None, verbose: bool = False, ) -> BatchEvaluationResult: @@ -3158,7 +3198,7 @@ def run_batched_evaluation( items matching the filter. Useful for testing or limiting evaluation runs. Default: None (process all). max_concurrency: Maximum number of items to evaluate concurrently. Controls - parallelism and resource usage. Default: 50. + parallelism and resource usage. Default: 5. composite_evaluator: Optional function that creates a composite score from item-level evaluations. Receives the original item and its evaluations, returns a single Evaluation. Useful for weighted averages or combined metrics. @@ -3327,6 +3367,8 @@ def composite_evaluator(*, item, evaluations): max_concurrency=max_concurrency, composite_evaluator=composite_evaluator, metadata=metadata, + _add_observation_scores_to_trace=_add_observation_scores_to_trace, + _additional_trace_tags=_additional_trace_tags, max_retries=max_retries, verbose=verbose, resume_from=resume_from, diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 08c008234..a8639692f 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -306,37 +306,49 @@ def reset(cls) -> None: cls._instances.clear() + def _enqueue_ingestion_task( + self, + *, + event: dict, + trace_id: Optional[str], + sampling_name: str = "ingestion", + force_sample: bool = False, + ) -> None: + """Enqueue ingestion event with trace sampling aligned to the OTel sampler.""" + # Sample ingestion events with the same sampler that is used for tracing + tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) + should_sample = ( + force_sample + or isinstance( + tracer_provider, otel_trace_api.ProxyTracerProvider + ) # default to in-sample if otel sampler is not available + or ( + tracer_provider.sampler.should_sample( + parent_context=None, + trace_id=int(trace_id, 16), + name=sampling_name, + ).decision + == Decision.RECORD_AND_SAMPLE + if trace_id is not None + else True + ) + ) + + if should_sample: + self._score_ingestion_queue.put(event, block=False) + def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: try: - # Sample scores with the same sampler that is used for tracing - tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) - should_sample = ( - force_sample - or isinstance( - tracer_provider, otel_trace_api.ProxyTracerProvider - ) # default to in-sample if otel sampler is not available - or ( - ( - tracer_provider.sampler.should_sample( - parent_context=None, - trace_id=int(event["body"].trace_id, 16), - name="score", - ).decision - == Decision.RECORD_AND_SAMPLE - if hasattr(event["body"], "trace_id") - else True - ) - if event["body"].trace_id - is not None # do not sample out session / dataset run scores - else True - ) + trace_id = event["body"].trace_id + langfuse_logger.debug( + f"Score: Enqueuing event type={event['type']} for trace_id={trace_id} name={event['body'].name} value={event['body'].value}" + ) + self._enqueue_ingestion_task( + event=event, + trace_id=trace_id, + sampling_name="score", + force_sample=force_sample, ) - - if should_sample: - langfuse_logger.debug( - f"Score: Enqueuing event type={event['type']} for trace_id={event['body'].trace_id} name={event['body'].name} value={event['body'].value}" - ) - self._score_ingestion_queue.put(event, block=False) except Full: langfuse_logger.warning( @@ -351,6 +363,37 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: return + def add_trace_task( + self, + event: dict, + *, + trace_id: Optional[str], + force_sample: bool = False, + ) -> None: + try: + langfuse_logger.debug( + f"Trace: Enqueuing event type={event['type']} for trace_id={trace_id}" + ) + self._enqueue_ingestion_task( + event=event, + trace_id=trace_id, + sampling_name="trace", + force_sample=force_sample, + ) + + except Full: + langfuse_logger.warning( + "System overload: Trace ingestion queue has reached capacity (100,000 items). Trace update will be dropped. Consider increasing flush frequency or decreasing event volume." + ) + + return + except Exception as e: + langfuse_logger.error( + f"Unexpected error: Failed to process trace event. The trace update will be dropped. Error details: {e}" + ) + + return + @property def tracer(self) -> Optional[Tracer]: return self._otel_tracer diff --git a/langfuse/batch_evaluation.py b/langfuse/batch_evaluation.py index 7783955dd..6de480e67 100644 --- a/langfuse/batch_evaluation.py +++ b/langfuse/batch_evaluation.py @@ -18,6 +18,7 @@ List, Optional, Protocol, + Set, Tuple, Union, cast, @@ -849,9 +850,11 @@ async def run_async( fetch_batch_size: int = 50, fetch_trace_fields: Optional[str] = None, max_items: Optional[int] = None, - max_concurrency: int = 50, + max_concurrency: int = 5, composite_evaluator: Optional[CompositeEvaluatorFunction] = None, metadata: Optional[Dict[str, Any]] = None, + _add_observation_scores_to_trace: bool = False, + _additional_trace_tags: Optional[List[str]] = None, max_retries: int = 3, verbose: bool = False, resume_from: Optional[BatchEvaluationResumeToken] = None, @@ -873,6 +876,10 @@ async def run_async( max_concurrency: Maximum number of concurrent evaluations. composite_evaluator: Optional function to create composite scores. metadata: Metadata to add to all created scores. + _add_observation_scores_to_trace: Private option to duplicate + observation-level scores onto the parent trace. + _additional_trace_tags: Private option to add tags on traces via + ingestion trace-create events. max_retries: Maximum retries for failed batch fetches. verbose: If True, log progress to console. resume_from: Resume token from a previous failed run. @@ -903,6 +910,12 @@ async def run_async( # Handle resume token by modifying filter effective_filter = self._build_timestamp_filter(filter, resume_from) + normalized_additional_trace_tags = ( + self._dedupe_tags(_additional_trace_tags) + if _additional_trace_tags is not None + else [] + ) + updated_trace_ids: Set[str] = set() # Create semaphore for concurrency control semaphore = asyncio.Semaphore(max_concurrency) @@ -1011,6 +1024,7 @@ async def process_item( evaluators=evaluators, composite_evaluator=composite_evaluator, metadata=metadata, + _add_observation_scores_to_trace=_add_observation_scores_to_trace, evaluator_stats_dict=evaluator_stats_dict, ) return (item_id, result) @@ -1043,6 +1057,20 @@ async def process_item( # Store evaluations for this item item_evaluations[item_id] = evaluations + if normalized_additional_trace_tags: + trace_id = ( + item_id + if scope == "traces" + else cast(ObservationsView, item).trace_id + ) + + if trace_id and trace_id not in updated_trace_ids: + self.client._create_trace_tags_via_ingestion( + trace_id=trace_id, + tags=normalized_additional_trace_tags, + ) + updated_trace_ids.add(trace_id) + # Update last processed tracking last_item_timestamp = self._get_item_timestamp(item, scope) last_item_id = item_id @@ -1168,6 +1196,7 @@ async def _process_batch_evaluation_item( evaluators: List[EvaluatorFunction], composite_evaluator: Optional[CompositeEvaluatorFunction], metadata: Optional[Dict[str, Any]], + _add_observation_scores_to_trace: bool, evaluator_stats_dict: Dict[str, EvaluatorStats], ) -> Tuple[int, int, int, List[Evaluation]]: """Process a single item: map, evaluate, create scores. @@ -1179,6 +1208,8 @@ async def _process_batch_evaluation_item( evaluators: List of evaluator functions. composite_evaluator: Optional composite evaluator function. metadata: Additional metadata to add to scores. + _add_observation_scores_to_trace: Whether to duplicate + observation-level scores at trace level. evaluator_stats_dict: Dictionary tracking evaluator statistics. Returns: @@ -1226,7 +1257,7 @@ async def _process_batch_evaluation_item( # Create scores for item-level evaluations item_id = self._get_item_id(item, scope) for evaluation in evaluations: - self._create_score_for_scope( + scores_created += self._create_score_for_scope( scope=scope, item_id=item_id, trace_id=cast(ObservationsView, item).trace_id @@ -1234,8 +1265,8 @@ async def _process_batch_evaluation_item( else None, evaluation=evaluation, additional_metadata=metadata, + add_observation_score_to_trace=_add_observation_scores_to_trace, ) - scores_created += 1 # Run composite evaluator if provided and we have evaluations if composite_evaluator and evaluations: @@ -1251,7 +1282,7 @@ async def _process_batch_evaluation_item( # Create scores for all composite evaluations for composite_eval in composite_evals: - self._create_score_for_scope( + composite_scores_created += self._create_score_for_scope( scope=scope, item_id=item_id, trace_id=cast(ObservationsView, item).trace_id @@ -1259,8 +1290,8 @@ async def _process_batch_evaluation_item( else None, evaluation=composite_eval, additional_metadata=metadata, + add_observation_score_to_trace=_add_observation_scores_to_trace, ) - composite_scores_created += 1 # Add composite evaluations to the list evaluations.extend(composite_evals) @@ -1382,7 +1413,8 @@ def _create_score_for_scope( trace_id: Optional[str] = None, evaluation: Evaluation, additional_metadata: Optional[Dict[str, Any]], - ) -> None: + add_observation_score_to_trace: bool = False, + ) -> int: """Create a score linked to the appropriate entity based on scope. Args: @@ -1391,6 +1423,11 @@ def _create_score_for_scope( trace_id: The trace ID of the entity; required if scope=observations evaluation: The evaluation result to create a score from. additional_metadata: Additional metadata to merge with evaluation metadata. + add_observation_score_to_trace: Whether to duplicate observation + score on parent trace as well. + + Returns: + Number of score events created. """ # Merge metadata score_metadata = { @@ -1408,6 +1445,7 @@ def _create_score_for_scope( data_type=evaluation.data_type, # type: ignore[arg-type] config_id=evaluation.config_id, ) + return 1 elif scope == "observations": self.client.create_score( observation_id=item_id, @@ -1419,6 +1457,23 @@ def _create_score_for_scope( data_type=evaluation.data_type, # type: ignore[arg-type] config_id=evaluation.config_id, ) + score_count = 1 + + if add_observation_score_to_trace and trace_id: + self.client.create_score( + trace_id=trace_id, + name=evaluation.name, + value=evaluation.value, # type: ignore + comment=evaluation.comment, + metadata=score_metadata, + data_type=evaluation.data_type, # type: ignore[arg-type] + config_id=evaluation.config_id, + ) + score_count += 1 + + return score_count + + return 0 def _build_timestamp_filter( self, @@ -1519,6 +1574,21 @@ def _get_timestamp_field_for_scope(scope: str) -> str: return "start_time" return "timestamp" # Default + @staticmethod + def _dedupe_tags(tags: Optional[List[str]]) -> List[str]: + """Deduplicate tags while preserving order.""" + if tags is None: + return [] + + deduped: List[str] = [] + seen = set() + for tag in tags: + if tag not in seen: + deduped.append(tag) + seen.add(tag) + + return deduped + def _build_result( self, total_items_fetched: int, From 8afdc2e34b8401821ac50651adf19c54f7b7c475 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 23 Feb 2026 11:31:03 +0100 Subject: [PATCH 2/2] push --- langfuse/_client/client.py | 6 +-- langfuse/_client/resource_manager.py | 80 +++++++++++----------------- 2 files changed, 31 insertions(+), 55 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 8a2750eaa..3787280c5 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -2126,11 +2126,7 @@ def _create_trace_tags_via_ingestion( } if self._resources is not None: - self._resources.add_trace_task( - event, - trace_id=trace_id, - force_sample=True, - ) + self._resources.add_trace_task(event) except Exception as e: langfuse_logger.exception( f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}" diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index a8639692f..e1b3aa7a9 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -306,50 +306,38 @@ def reset(cls) -> None: cls._instances.clear() - def _enqueue_ingestion_task( - self, - *, - event: dict, - trace_id: Optional[str], - sampling_name: str = "ingestion", - force_sample: bool = False, - ) -> None: - """Enqueue ingestion event with trace sampling aligned to the OTel sampler.""" - # Sample ingestion events with the same sampler that is used for tracing - tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) - should_sample = ( - force_sample - or isinstance( - tracer_provider, otel_trace_api.ProxyTracerProvider - ) # default to in-sample if otel sampler is not available - or ( - tracer_provider.sampler.should_sample( - parent_context=None, - trace_id=int(trace_id, 16), - name=sampling_name, - ).decision - == Decision.RECORD_AND_SAMPLE - if trace_id is not None - else True - ) - ) - - if should_sample: - self._score_ingestion_queue.put(event, block=False) - def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: try: - trace_id = event["body"].trace_id - langfuse_logger.debug( - f"Score: Enqueuing event type={event['type']} for trace_id={trace_id} name={event['body'].name} value={event['body'].value}" - ) - self._enqueue_ingestion_task( - event=event, - trace_id=trace_id, - sampling_name="score", - force_sample=force_sample, + # Sample scores with the same sampler that is used for tracing + tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) + should_sample = ( + force_sample + or isinstance( + tracer_provider, otel_trace_api.ProxyTracerProvider + ) # default to in-sample if otel sampler is not available + or ( + ( + tracer_provider.sampler.should_sample( + parent_context=None, + trace_id=int(event["body"].trace_id, 16), + name="score", + ).decision + == Decision.RECORD_AND_SAMPLE + if hasattr(event["body"], "trace_id") + else True + ) + if event["body"].trace_id + is not None # do not sample out session / dataset run scores + else True + ) ) + if should_sample: + langfuse_logger.debug( + f"Score: Enqueuing event type={event['type']} for trace_id={event['body'].trace_id} name={event['body'].name} value={event['body'].value}" + ) + self._score_ingestion_queue.put(event, block=False) + except Full: langfuse_logger.warning( "System overload: Score ingestion queue has reached capacity (100,000 items). Score will be dropped. Consider increasing flush frequency or decreasing event volume." @@ -366,20 +354,12 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: def add_trace_task( self, event: dict, - *, - trace_id: Optional[str], - force_sample: bool = False, ) -> None: try: langfuse_logger.debug( - f"Trace: Enqueuing event type={event['type']} for trace_id={trace_id}" - ) - self._enqueue_ingestion_task( - event=event, - trace_id=trace_id, - sampling_name="trace", - force_sample=force_sample, + f"Trace: Enqueuing event type={event['type']} for trace_id={event['body'].id}" ) + self._score_ingestion_queue.put(event, block=False) except Full: langfuse_logger.warning(