fix: preserve score body aliases during ingestion #1581
@claude review
# convert pydantic models to dicts
if "body" in event and isinstance(event["body"], BaseModel):
    event["body"] = event["body"].model_dump(exclude_none=True)
    if isinstance(event["body"], UniversalBaseModel):
        event["body"] = event["body"].dict(exclude_none=True)
🔴 The new code calls event["body"].dict(exclude_none=True) for UniversalBaseModel instances, but UniversalBaseModel.dict() in Pydantic V2 hardcodes exclude_none=False after **kwargs, overriding the caller's exclude_none=True; explicitly-set None fields (e.g. ScoreBody(trace_id=None)) will appear in the serialized API payload as null rather than being omitted. This is a regression from the previous model_dump(exclude_none=True) which correctly stripped all None values, and may cause server-side validation errors for users who explicitly pass None for optional score body fields. The fix should call model_dump(exclude_none=True, by_alias=True) directly instead of relying on UniversalBaseModel.dict().
Extended reasoning...
What the bug is and how it manifests
The PR changes score_ingestion_consumer.py line 77 to call event["body"].dict(exclude_none=True) for UniversalBaseModel instances instead of the old model_dump(exclude_none=True). The intent is correct — using .dict() enables alias-aware serialization (camelCase keys). However, UniversalBaseModel.dict() in Pydantic V2 does not reliably honor the caller's exclude_none=True for fields that were explicitly set to None.
The specific code path that triggers it
In langfuse/api/core/pydantic_utilities.py lines 135–151, UniversalBaseModel.dict() builds two model dump calls for Pydantic V2:
kwargs_with_defaults_exclude_unset = {
    **kwargs,  # caller's exclude_none=True is placed here...
    "by_alias": True,
    "exclude_unset": True,
    "exclude_none": False,  # ...but then unconditionally overridden here
}
Because "exclude_none": False is a literal key placed after **kwargs, it always wins. The first model_dump therefore runs with exclude_none=False regardless of what the caller passed.
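The precedence rule behind this override can be checked in isolation. The sketch below uses plain dicts only; the two helper names are hypothetical and exist purely for illustration. The second helper shows that the opposite ordering would let the caller win. It is not a proposed patch to UniversalBaseModel.dict(), whose two-pass merge relies on the fixed values.

```python
def build_dump_kwargs(**kwargs):
    """Same ordering as the snippet above: literal keys come after **kwargs."""
    return {
        **kwargs,
        "by_alias": True,
        "exclude_unset": True,
        "exclude_none": False,  # later key wins, so the caller's value is lost
    }


def build_dump_kwargs_caller_last(**kwargs):
    """Hypothetical opposite ordering: defaults first, caller's kwargs last."""
    return {
        "by_alias": True,
        "exclude_unset": True,
        "exclude_none": False,
        **kwargs,  # later keys win, so the caller's value survives
    }


overridden = build_dump_kwargs(exclude_none=True)
respected = build_dump_kwargs_caller_last(exclude_none=True)

print(overridden["exclude_none"])  # False
print(respected["exclude_none"])   # True
```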
Why existing code doesn't prevent it
The deep-merge logic (deep_union_pydantic_dicts) then merges source (from the first dump, exclude_unset=True, exclude_none=False) into destination (from the second dump, exclude_none=True). For a field explicitly set to None: the first dump includes it (it was set, and exclude_none=False); the second dump omits it. The merge iterates over source keys and writes them into destination, so traceId: None ends up in the final output even though the caller asked for exclude_none=True.
Step-by-step proof
- User creates: ScoreBody(name="rating", value=1, trace_id=None)
- trace_id is now in the model's __fields_set__ (explicitly set).
- event["body"].dict(exclude_none=True) is called.
- First model_dump runs with {by_alias: True, exclude_unset: True, exclude_none: False} → returns {"name": "rating", "value": 1, "traceId": null}.
- Second model_dump runs with {by_alias: True, exclude_none: True, exclude_unset: False} → returns {"name": "rating", "value": 1} (traceId omitted).
- deep_union_pydantic_dicts(source, destination) copies traceId: None from source into destination.
- Final result: {"name": "rating", "value": 1, "traceId": null}, so the None field was not excluded.
- Old code: model_dump(exclude_none=True) would have returned {"name": "rating", "value": 1} with no traceId key at all.
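The merge step in the proof above can be reproduced without Pydantic. This is a minimal sketch under stated assumptions: deep_union is a simplified stand-in written for this illustration, not the real deep_union_pydantic_dicts, and the two dicts are hard-coded to match the dump results listed in the steps.

```python
def deep_union(source, destination):
    """Simplified stand-in: copy every key of source into destination,
    recursing when both sides hold a dict for the same key."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(destination.get(key), dict):
            deep_union(value, destination[key])
        else:
            destination[key] = value
    return destination


# First pass (exclude_unset=True, exclude_none=False): the explicit None survives.
first_dump = {"name": "rating", "value": 1, "traceId": None}
# Second pass (exclude_none=True, exclude_unset=False): the None is dropped.
second_dump = {"name": "rating", "value": 1}

# Merging the first pass into a copy of the second brings the None back.
merged = deep_union(first_dump, dict(second_dump))
print(merged)  # {'name': 'rating', 'value': 1, 'traceId': None}
```

Because the merge iterates over the keys of the first dump, any key present there is written into the result, whatever the second dump omitted.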
What the impact would be
When users explicitly set optional score body fields to None (a common pattern for clearing or defaulting fields), the null values are now included in the POST body sent to the Langfuse API. If the server performs strict schema validation and rejects unexpected null fields, these requests will fail, whereas the same calls succeeded under model_dump(exclude_none=True). The added tests only use non-None field values, so the new test suite does not catch this regression.
How to fix it
Replace event["body"].dict(exclude_none=True) with event["body"].model_dump(exclude_none=True, by_alias=True). This achieves both goals: alias-aware camelCase serialization and reliable exclusion of all None fields, bypassing the broken override in UniversalBaseModel.dict().
    ingestion_queue=ingestion_queue,
    identifier=0,
    client=Mock(),
    public_key="pk-test",
)

ingestion_queue.put(
    {
        "id": "event-1",
        "type": "score-create",
        "timestamp": "2026-03-25T16:10:45.793Z",
        "body": ScoreBody(
            id="score-1",
🟡 The new test test_score_ingestion_consumer_uses_aliases_for_langfuse_models blocks for ~1 second on each run because ScoreIngestionConsumer is constructed without flush_interval, so it defaults to 1.0s. Pass flush_interval=0.01 (or similar small value) to the constructor to avoid this unnecessary wait.
Extended reasoning...
What the bug is: test_score_ingestion_consumer_uses_aliases_for_langfuse_models constructs ScoreIngestionConsumer without specifying flush_interval, which defaults to 1 (one second, set by the line self._flush_interval = flush_interval or 1). As a result, every run of this test blocks for approximately one second.
The specific code path: In _next(), the while loop condition is len(events) < self._flush_at (default 15). After successfully dequeuing the one item placed in the queue, len(events) becomes 1, which is still less than 15. The loop continues and computes elapsed = time.monotonic() - start_time, which is near zero (a few milliseconds). Since elapsed < flush_interval (1.0), the break condition is not triggered. The loop then calls self._ingestion_queue.get(block=True, timeout=self._flush_interval - elapsed), which is approximately 1.0 - 0.001 ≈ 0.999 seconds. No second item ever arrives, so an Empty exception is raised after ~1 second, finally breaking the loop.
Why existing code doesn't prevent it: The flush_interval guard is designed for production use where waiting a bit before flushing is acceptable. The test just forgets to set a small flush_interval value appropriate for a unit test that only needs to drain a single pre-populated item.
Step-by-step proof:
- ScoreIngestionConsumer is instantiated without flush_interval → self._flush_interval = 1.0
- One item is put into the queue
- consumer._next() is called
- Loop iteration 1: elapsed ≈ 0, queue.get(timeout≈1.0) retrieves the item; len(events) = 1
- Loop iteration 2: 1 < 15 is True; elapsed ≈ 0.001; queue.get(timeout≈0.999) blocks
- After ~0.999s, Empty is raised → loop breaks
- Test passes but took ~1 second unnecessarily
Impact: Each run of this new test adds approximately 1 second of unnecessary latency to the test suite. At CI scale this compounds across multiple runs.
How to fix: Pass flush_interval=0.01 (or any small value like 0.001) to the constructor in the test:
consumer = ScoreIngestionConsumer(
    ingestion_queue=ingestion_queue,
    identifier=0,
    client=Mock(),
    public_key="pk-test",
    flush_interval=0.01,  # add this
)
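The blocking behaviour described in this comment can be reproduced with the standard library alone. The drain function below is a hypothetical stand-in that follows the loop as the comment describes it (flush_at, flush_interval, a blocking get with the remaining time as timeout); it is not the actual ScoreIngestionConsumer._next() implementation.

```python
import queue
import time


def drain(q, flush_at=15, flush_interval=1.0):
    """Collect up to flush_at items, waiting at most flush_interval seconds in total."""
    events = []
    start = time.monotonic()
    while len(events) < flush_at:
        elapsed = time.monotonic() - start
        if elapsed >= flush_interval:
            break
        try:
            # Blocks for the remaining time when the queue is empty.
            events.append(q.get(block=True, timeout=flush_interval - elapsed))
        except queue.Empty:
            break
    return events


q = queue.Queue()
q.put({"id": "event-1"})

started = time.monotonic()
events = drain(q, flush_interval=0.01)  # small interval, as suggested for the test
duration = time.monotonic() - started

print(len(events), f"{duration:.3f}s")
```

With the default flush_interval=1.0 the same call would return the single event only after the second get times out, roughly one second later.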
Summary
- sessionId even when SDK callers populate snake_case model fields
Testing
- POETRY_VIRTUALENVS_IN_PROJECT=true poetry run pytest tests/test_serializer.py
- POETRY_VIRTUALENVS_IN_PROJECT=true poetry run ruff check langfuse/_task_manager/score_ingestion_consumer.py langfuse/_utils/serializer.py tests/test_serializer.py
Fixes #12812
Disclaimer: Experimental PR review
Greptile Summary
This PR fixes a real serialisation bug where Langfuse ingestion models (e.g. ScoreBody) were serialised with Python snake_case field names (session_id, trace_id) instead of the camelCase API aliases (sessionId, traceId) expected by the server. The fix dispatches through UniversalBaseModel.dict(), Fern's Pydantic V1/V2-compatible override that always sets by_alias=True.
Key changes:
- EventSerializer.default() now calls obj.dict() instead of obj.model_dump() for UniversalBaseModel instances, preserving camelCase field names.
- ScoreIngestionConsumer._next() applies the same dispatch so event bodies are converted to alias-keyed dicts before upload.
- Two regression tests are added covering both the serialiser and the ingestion consumer.
Two minor behavioural side-effects to be aware of:
- In score_ingestion_consumer.py, the exclude_none=True kwarg is silently overridden in the first pass of Pydantic V2's UniversalBaseModel.dict(), so explicitly-set None fields could appear in the payload where the old model_dump(exclude_none=True) would have dropped them.
- In serializer.py, the switch from model_dump() to dict() means unset fields defaulting to None are now omitted from serialised output. This is likely desirable, but it is an implicit behaviour change.
Confidence Score: 5/5
Safe to merge; the fix correctly resolves alias loss during ingestion and all remaining findings are minor edge-case caveats.
All findings are P2: one notes that exclude_none=True is silently no-op'd in Pydantic V2's UniversalBaseModel.dict() for explicitly-set None values (an unlikely pattern in practice), and the other notes an implicit behaviour change in the serialiser for unset-None fields (likely desirable). Neither is a present defect on any real user path. The core fix is correct, well-tested, and the regression tests clearly demonstrate the intended behaviour.
No files require special attention beyond the P2 notes on score_ingestion_consumer.py and serializer.py.
Important Files Changed
Sequence Diagram
sequenceDiagram
    participant Caller
    participant ScoreIngestionConsumer
    participant UniversalBaseModel
    participant LangfuseClient
    Caller->>ScoreIngestionConsumer: enqueue event {body: ScoreBody(session_id="x")}
    Note over ScoreIngestionConsumer: _next() dequeues event
    ScoreIngestionConsumer->>ScoreIngestionConsumer: isinstance(body, UniversalBaseModel)?
    alt UniversalBaseModel (new path)
        ScoreIngestionConsumer->>UniversalBaseModel: body.dict(exclude_none=True)
        UniversalBaseModel-->>ScoreIngestionConsumer: {sessionId: "x", ...} camelCase aliases
    else plain BaseModel (old/fallback path)
        ScoreIngestionConsumer->>ScoreIngestionConsumer: body.model_dump(exclude_none=True)
        Note over ScoreIngestionConsumer: {session_id: "x", ...} snake_case
    end
    ScoreIngestionConsumer->>LangfuseClient: batch_post(batch)
Reviews (1): Last reviewed commit: "fix: preserve score body aliases during ..."