fix: preserve score body aliases during ingestion #1581

Closed

hassiebp wants to merge 1 commit into main from codex/lfe-9008-session-score-sessionid

Conversation

@hassiebp
Contributor

@hassiebp hassiebp commented Mar 27, 2026

Summary

  • serialize Langfuse ingestion models with alias-aware dict output in the score ingestion path
  • preserve camelCase API field names such as sessionId even when SDK callers populate snake_case model fields
  • add regression coverage for both the shared event serializer and the score ingestion consumer

Testing

  • POETRY_VIRTUALENVS_IN_PROJECT=true poetry run pytest tests/test_serializer.py
  • POETRY_VIRTUALENVS_IN_PROJECT=true poetry run ruff check langfuse/_task_manager/score_ingestion_consumer.py langfuse/_utils/serializer.py tests/test_serializer.py

Fixes #12812

Disclaimer: Experimental PR review

Greptile Summary

This PR fixes a real serialisation bug where Langfuse ingestion models (e.g. ScoreBody) were serialised with Python snake_case field names (session_id, trace_id) instead of the camelCase API aliases (sessionId, traceId) expected by the server. The fix dispatches through UniversalBaseModel.dict() — Fern's Pydantic V1/V2-compatible override that always sets by_alias=True.

Key changes:

  • EventSerializer.default() now calls obj.dict() instead of obj.model_dump() for UniversalBaseModel instances, preserving camelCase field names.
  • ScoreIngestionConsumer._next() applies the same dispatch so event bodies are converted to alias-keyed dicts before upload.
  • Two regression tests are added covering both the serialiser and the ingestion consumer.

Two minor behavioural side-effects to be aware of:

  • In score_ingestion_consumer.py, the exclude_none=True kwarg is silently overridden in Pydantic V2's UniversalBaseModel.dict() first pass, so explicitly-set None fields could appear in the payload where the old model_dump(exclude_none=True) would have dropped them.
  • In serializer.py, the switch from model_dump() to dict() means unset fields defaulting to None are now omitted from serialised output — likely desirable, but an implicit behaviour change.
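The alias behaviour at the heart of the fix can be illustrated with a minimal Pydantic V2 sketch. ScoreBodySketch below is a hypothetical stand-in for the Fern-generated ScoreBody, not the real model:

```python
from pydantic import BaseModel, ConfigDict, Field

class ScoreBodySketch(BaseModel):
    """Hypothetical stand-in for the Fern-generated ScoreBody model."""
    model_config = ConfigDict(populate_by_name=True)  # allow snake_case construction
    session_id: str = Field(alias="sessionId")

body = ScoreBodySketch(session_id="x")
# Plain model_dump() emits Python field names, which the server rejects:
print(body.model_dump())               # {'session_id': 'x'}
# by_alias=True (what UniversalBaseModel.dict() always sets) emits API names:
print(body.model_dump(by_alias=True))  # {'sessionId': 'x'}
```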

Confidence Score: 5/5

Safe to merge; the fix correctly resolves alias loss during ingestion and all remaining findings are minor edge-case caveats.

All findings are P2: one notes that exclude_none=True is silently no-op'd in Pydantic V2's UniversalBaseModel.dict() for explicitly-set None values (an unlikely pattern in practice), and the other notes an implicit behaviour change in the serialiser for unset-None fields (likely desirable). Neither is a present defect on any real user path. The core fix is correct, well-tested, and the regression tests clearly demonstrate the intended behaviour.

No files require special attention beyond the P2 notes on score_ingestion_consumer.py and serializer.py.

Important Files Changed

langfuse/_task_manager/score_ingestion_consumer.py
  Adds alias-aware dict serialisation for UniversalBaseModel bodies; the exclude_none=True kwarg passed to dict() is silently overridden by the Fern-generated V2 implementation, so explicitly-set None fields may not be excluded as intended.

langfuse/_utils/serializer.py
  EventSerializer now calls obj.dict() for UniversalBaseModel instances to preserve camelCase aliases; introduces a subtle behaviour change where unset-and-None fields are no longer included in the JSON output (previously model_dump() included them).

tests/test_serializer.py
  Adds two regression tests covering alias preservation for EventSerializer and ScoreIngestionConsumer; happy-path only — does not exercise explicitly-set None fields where the new dict() behaviour differs from model_dump(exclude_none=True).

Sequence Diagram

sequenceDiagram
    participant Caller
    participant ScoreIngestionConsumer
    participant UniversalBaseModel
    participant LangfuseClient

    Caller->>ScoreIngestionConsumer: enqueue event {body: ScoreBody(session_id="x")}
    Note over ScoreIngestionConsumer: _next() dequeues event
    ScoreIngestionConsumer->>ScoreIngestionConsumer: isinstance(body, UniversalBaseModel)?
    alt UniversalBaseModel (new path)
        ScoreIngestionConsumer->>UniversalBaseModel: body.dict(exclude_none=True)
        UniversalBaseModel-->>ScoreIngestionConsumer: {sessionId: "x", ...} camelCase aliases
    else plain BaseModel (old/fallback path)
        ScoreIngestionConsumer->>ScoreIngestionConsumer: body.model_dump(exclude_none=True)
        Note over ScoreIngestionConsumer: {session_id: "x", ...} snake_case
    end
    ScoreIngestionConsumer->>LangfuseClient: batch_post(batch)


@github-actions

@claude review

Comment on lines 75 to +78
  # convert pydantic models to dicts
  if "body" in event and isinstance(event["body"], BaseModel):
-     event["body"] = event["body"].model_dump(exclude_none=True)
+     if isinstance(event["body"], UniversalBaseModel):
+         event["body"] = event["body"].dict(exclude_none=True)
🔴 The new code calls event["body"].dict(exclude_none=True) for UniversalBaseModel instances, but UniversalBaseModel.dict() in Pydantic V2 hardcodes exclude_none=False after **kwargs, overriding the caller's exclude_none=True; explicitly-set None fields (e.g. ScoreBody(trace_id=None)) will appear in the serialized API payload as null rather than being omitted. This is a regression from the previous model_dump(exclude_none=True) which correctly stripped all None values, and may cause server-side validation errors for users who explicitly pass None for optional score body fields. The fix should call model_dump(exclude_none=True, by_alias=True) directly instead of relying on UniversalBaseModel.dict().

Extended reasoning...

What the bug is and how it manifests

The PR changes score_ingestion_consumer.py line 77 to call event["body"].dict(exclude_none=True) for UniversalBaseModel instances instead of the old model_dump(exclude_none=True). The intent is correct — using .dict() enables alias-aware serialization (camelCase keys). However, UniversalBaseModel.dict() in Pydantic V2 does not reliably honor the caller's exclude_none=True for fields that were explicitly set to None.

The specific code path that triggers it

In langfuse/api/core/pydantic_utilities.py lines 135–151, UniversalBaseModel.dict() builds two model dump calls for Pydantic V2:

kwargs_with_defaults_exclude_unset = {
    **kwargs,            # caller's exclude_none=True is placed here...
    "by_alias": True,
    "exclude_unset": True,
    "exclude_none": False,  # ...but then unconditionally overridden here
}

Because "exclude_none": False is a literal key placed after **kwargs, it always wins. The first model_dump therefore runs with exclude_none=False regardless of what the caller passed.
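The override is ordinary dict-literal semantics: when the same key appears twice, the later occurrence wins, so a literal key written after **kwargs always beats the caller's value.

```python
kwargs = {"exclude_none": True}  # what the caller passed to dict()

merged = {
    **kwargs,               # caller's exclude_none=True lands here first...
    "exclude_none": False,  # ...then the literal key silently overwrites it
}
print(merged["exclude_none"])  # False
```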

Why existing code doesn't prevent it

The deep-merge logic (deep_union_pydantic_dicts) then merges source (from the first dump, exclude_unset=True, exclude_none=False) into destination (from the second dump, exclude_none=True). For a field explicitly set to None: the first dump includes it (it was set, and exclude_none=False); the second dump omits it. The merge iterates over source keys and writes them into destination, so traceId: None ends up in the final output even though the caller asked for exclude_none=True.

Step-by-step proof

  1. User creates: ScoreBody(name="rating", value=1, trace_id=None)
  2. trace_id is now in the model's __fields_set__ (explicitly set).
  3. event["body"].dict(exclude_none=True) is called.
  4. First model_dump runs with {by_alias: True, exclude_unset: True, exclude_none: False} → returns {"name": "rating", "value": 1, "traceId": null}.
  5. Second model_dump runs with {by_alias: True, exclude_none: True, exclude_unset: False} → returns {"name": "rating", "value": 1} (traceId omitted).
  6. deep_union_pydantic_dicts(source, destination) copies traceId: None from source into destination.
  7. Final result: {"name": "rating", "value": 1, "traceId": null} — the None field was not excluded.
  8. Old code: model_dump(exclude_none=True) would have returned {"name": "rating", "value": 1} with no traceId key at all.
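The merge behaviour walked through above can be sketched as follows; deep_union is a simplified stand-in for Fern's deep_union_pydantic_dicts, not the actual implementation:

```python
def deep_union(source: dict, destination: dict) -> dict:
    """Simplified sketch: every key in source is written into destination."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(destination.get(key), dict):
            deep_union(value, destination[key])
        else:
            destination[key] = value
    return destination

# First dump (exclude_unset=True, exclude_none=False): keeps the explicit None.
source = {"name": "rating", "value": 1, "traceId": None}
# Second dump (exclude_none=True): drops it.
destination = {"name": "rating", "value": 1}

print(deep_union(source, destination))
# the explicitly-set None leaks back into the final payload
```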

What the impact would be

When users explicitly set optional score body fields to None (a common pattern for clearing/defaulting fields), the null values are now included in the POST body sent to the Langfuse API. If the server performs strict schema validation and rejects unexpected null fields, these requests will fail — a regression that previously worked fine with model_dump(exclude_none=True). The added tests only use non-None field values, so this regression is not caught by the new test suite.

How to fix it

Replace event["body"].dict(exclude_none=True) with event["body"].model_dump(exclude_none=True, by_alias=True). This achieves both goals: alias-aware camelCase serialization and reliable exclusion of all None fields, bypassing the broken override in UniversalBaseModel.dict().
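A minimal sketch of that suggested fix, again using a hypothetical ScoreBodySketch in place of the generated model:

```python
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field

class ScoreBodySketch(BaseModel):
    """Hypothetical stand-in for ScoreBody."""
    model_config = ConfigDict(populate_by_name=True)
    name: str
    trace_id: Optional[str] = Field(default=None, alias="traceId")

body = ScoreBodySketch(name="rating", trace_id=None)  # trace_id explicitly set to None
# Calling model_dump directly with both flags gives camelCase keys AND
# drops the None field, with no kwargs override in between:
print(body.model_dump(exclude_none=True, by_alias=True))  # {'name': 'rating'}
```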

Comment on lines +104 to +116
    ingestion_queue=ingestion_queue,
    identifier=0,
    client=Mock(),
    public_key="pk-test",
)

ingestion_queue.put(
    {
        "id": "event-1",
        "type": "score-create",
        "timestamp": "2026-03-25T16:10:45.793Z",
        "body": ScoreBody(
            id="score-1",
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The new test test_score_ingestion_consumer_uses_aliases_for_langfuse_models blocks for ~1 second on each run because ScoreIngestionConsumer is constructed without flush_interval, so it defaults to 1.0s. Pass flush_interval=0.01 (or similar small value) to the constructor to avoid this unnecessary wait.

Extended reasoning...

What the bug is: test_score_ingestion_consumer_uses_aliases_for_langfuse_models constructs ScoreIngestionConsumer without specifying flush_interval, which defaults to 1 second (via self._flush_interval = flush_interval or 1). This causes every run of this test to block for approximately one second.

The specific code path: In _next(), the while loop condition is len(events) < self._flush_at (default 15). After successfully dequeuing the one item placed in the queue, len(events) becomes 1, which is still less than 15. The loop continues and computes elapsed = time.monotonic() - start_time, which is near zero (a few milliseconds). Since elapsed < flush_interval (1.0), the break condition is not triggered. The loop then calls self._ingestion_queue.get(block=True, timeout=self._flush_interval - elapsed), which is approximately 1.0 - 0.001 ≈ 0.999 seconds. No second item ever arrives, so an Empty exception is raised after ~1 second, finally breaking the loop.

Why existing code doesn't prevent it: The flush_interval guard is designed for production use where waiting a bit before flushing is acceptable. The test just forgets to set a small flush_interval value appropriate for a unit test that only needs to drain a single pre-populated item.

Step-by-step proof:

  1. ScoreIngestionConsumer is instantiated without flush_interval, so self._flush_interval = 1.0
  2. One item is put into the queue
  3. consumer._next() is called
  4. Loop iteration 1: elapsed ≈ 0, queue.get(timeout≈1.0) retrieves the item; len(events) = 1
  5. Loop iteration 2: 1 < 15 is True; elapsed ≈ 0.001; queue.get(timeout≈0.999) blocks
  6. After ~0.999s, Empty is raised → loop breaks
  7. Test passes but took ~1 second unnecessarily

Impact: Each run of this new test adds approximately 1 second of unnecessary latency to the test suite. At CI scale this compounds across multiple runs.
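The timing described above can be reproduced with a simplified sketch of the batching loop; drain is a hypothetical reduction of _next(), with parameter names mirroring the real defaults:

```python
import queue
import time

def drain(q: "queue.Queue", flush_at: int = 15, flush_interval: float = 1.0) -> list:
    """Simplified sketch of the _next() batching loop described above."""
    events: list = []
    start = time.monotonic()
    while len(events) < flush_at:
        elapsed = time.monotonic() - start
        if elapsed >= flush_interval:
            break
        try:
            events.append(q.get(block=True, timeout=flush_interval - elapsed))
        except queue.Empty:
            break  # timed out waiting for a second event that never arrives
    return events

q = queue.Queue()
q.put("event-1")

t0 = time.monotonic()
events = drain(q, flush_interval=0.05)  # small interval, as the fix suggests
print(events)  # ['event-1'], after only a ~0.05s wait instead of ~1s
```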

How to fix: Pass flush_interval=0.01 (or any small value like 0.001) to the constructor in the test:

consumer = ScoreIngestionConsumer(
    ingestion_queue=ingestion_queue,
    identifier=0,
    client=Mock(),
    public_key="pk-test",
    flush_interval=0.01,  # add this
)

@hassiebp hassiebp closed this Mar 27, 2026