fix: merge primary-key rows correctly when bucket files span multiple splits#374
Open
TheR1sing3un wants to merge 4 commits into
Open
fix: merge primary-key rows correctly when bucket files span multiple splits#374TheR1sing3un wants to merge 4 commits into
TheR1sing3un wants to merge 4 commits into
Conversation
plan_snapshot bin-packed a bucket's files purely by size, so files whose primary-key ranges overlap could land in different splits. Each split runs its own sort-merge reader, so the same key surfaced once per split instead of merging to a single row (reproducible with three commits of one key under source.split.target-size=1b). Port the Java MergeTreeSplitGenerator/IntervalPartition algorithm: sort files by decoded (min_key, max_key), group transitively overlapping files into sections, then bin-pack whole sections via pack_for_ordered. Keys are decoded with the trimmed-PK data types and compared through datum_cmp — BinaryRow stores fields little-endian, so raw byte comparison would order int 256 before int 1. Undecodable key ranges collapse the bucket into one section, trading parallelism for correctness. Append-only tables keep the file-level bin pack.
read_pk routed a split to the raw (non-merging) reader whenever it had no level-0 files. A split can still hold compacted files on different levels whose key ranges overlap — e.g. produced by Java/Spark compaction — and raw-reading those emits one row per version of a key. Extend the dispatch with split_requires_merge: any level-0 file or any key-range overlap among the split's files sends it through the sort-merge reader; only disjoint compacted files keep the raw fast path. Key ranges are compared with the same decoded-key comparator the split planner uses (KeyComparator::from_table_schema, extracted from plan_snapshot), and undecodable key ranges fall back to merging.
Reproduce the reviewer scenario from apache#340: a 1-byte source.split.target-size forces every data file into its own split candidate, so versions of one key used to surface once per split. Cover both deduplicate (three commits of one key read back as the latest row) and partial-update (per-column updates over three commits merge into one row). Both tests fail without the merge-tree split generator fix.
… tables
Deletion-vector tables resolve stale key versions through DVs, not
read-time merging, and KeyValueFileReader rejects splits with deletion
vectors — routing their key-overlapping compacted files to the
sort-merge reader broke reads of Spark-written DV tables
("KeyValueFileReader does not support deletion vectors").
Mirror Java MergeTreeSplitGenerator's fast path: when deletion vectors
are enabled or the merge engine is first-row, plan_snapshot keeps plain
size-based packing (no key-range sectioning), and read_pk keeps the
plain level-0 dispatch instead of the overlap-aware one.
Caught by test_read_primary_key_table_via_datafusion against the
Spark-provisioned warehouse.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Linked issue: close #373
PK tables could return unmerged (duplicate-key) rows in two ways sharing one
root cause — split planning and read dispatch were blind to key-range overlap:
plan_snapshotbin-packed a bucket's files purely by size, so filesholding versions of the same key could land in different splits; each
split runs its own sort-merge reader and emits its own version.
Reproducible with
source.split.target-size=1band three commits of onekey: SELECT returned 3 rows instead of 1.
read_pksent splits without level-0 files to the raw (non-merging)reader, but compacted files on different levels can still overlap on key
range (e.g. produced by Java/Spark compaction) and need merging.
Reported by @JingsongLi while reviewing #340; affects
deduplicateandpartial-updateon main, andaggregationonce #340 lands.Changes
table/merge_tree_split_generator.rs(new) — port of JavaMergeTreeSplitGenerator/IntervalPartition:KeyComparatordecodes serialized BinaryRow min/max keys with thetrimmed-PK data types and compares via
datum_cmp. BinaryRow storesfields little-endian, so raw byte comparison would order int 256 before
int 1 — decoding is mandatory for correctness.
interval_partitionsorts files by decoded(min_key, max_key)andgroups transitively overlapping files into sections; sections never
overlap each other.
pack_sectionsbin-packs whole sections into splits (reusingpack_for_ordered); a section is atomic and never separated.section — losing parallelism, never correctness.
table_scan.rs— PK tables route throughpack_sections(interval_partition(...))on the non-data-evolution path;append-only tables keep the existing file-level
split_for_batch.table_read.rs—read_pkdispatch is now overlap-aware(
split_requires_merge): any level-0 file or key-overlapping compactedfiles → sort-merge reader; only disjoint compacted files keep the raw
fast path.
MergeTreeSplitGenerator: DV tables resolve stale versions through DVs(and
KeyValueFileReaderrejects DV splits), and first-row tables skiplevel-0 at plan time, so both keep plain size-based packing and the plain
level-0 read dispatch.
Tests
grouping, running-bound chains, atomic packing, overlap detection,
undecodable-key degradation.
overlapping files share one split and read back merged; disjoint files keep
split parallelism; append tables keep file-level bin pack.
deduplicateand
partial-update. All new tests fail without the fix.(
make docker-up+cargo test -p paimon-datafusion --all-targets),which caught and now guards the deletion-vector interaction.
Out of scope
first-rowreads never go throughread_pk(pre-existing routing); thesame overlap consideration for its raw path can be tracked separately.
rawConvertibleflag plumbing throughDataSplit; this PR derivesthe same decision read-side from file metadata instead.