[python][ray] Add no-shuffle bucket join for co-bucketed tables#8397

Draft
XiaoHongbo-Hope wants to merge 6 commits into
apache:masterfrom
XiaoHongbo-Hope:support_bucket_join

@XiaoHongbo-Hope

Contributor

Purpose

Tests

Add pypaimon.ray.bucket_join: a no-shuffle join for two fixed-bucket (HASH_FIXED)
tables that share a bucket count and bucket-key and are joined on that key. Each
bucket is read and joined locally in its own Ray task. Because equal keys always
land in the same bucket, no global shuffle is needed. This is an alternative to
ray.data.join for joining a co-bucketed url->row_id locator table against a
per-task input table. Data-evolution main tables are bucket-unaware, so bucketing
is borrowed from the lightweight locator table.

Validates that both tables have the same bucket count and bucket-key and that the
join key equals the bucket-key; any mismatch is rejected.
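
The three checks can be sketched roughly as below. This is only an illustration, not the code in this PR: `BucketSpec` and `validate_co_bucketed` are hypothetical names standing in for whatever metadata pypaimon exposes for a table's bucket count and bucket-key.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple


@dataclass(frozen=True)
class BucketSpec:
    """Hypothetical stand-in for one table's bucketing metadata."""
    num_buckets: int
    bucket_keys: Tuple[str, ...]


def validate_co_bucketed(left: BucketSpec, right: BucketSpec, on: Sequence[str]) -> None:
    """Raise ValueError unless a per-bucket local join on `on` is safe."""
    if left.num_buckets != right.num_buckets:
        raise ValueError(
            f"bucket count mismatch: {left.num_buckets} vs {right.num_buckets}"
        )
    if left.bucket_keys != right.bucket_keys:
        raise ValueError(
            f"bucket-key mismatch: {left.bucket_keys} vs {right.bucket_keys}"
        )
    if tuple(on) != left.bucket_keys:
        # Rows with equal join keys only share a bucket when the join key
        # is exactly the key that was hashed to pick the bucket.
        raise ValueError(
            f"join key {tuple(on)} is not the bucket-key {left.bucket_keys}"
        )


# Passes silently: same bucket count, same bucket-key, joined on that key.
validate_co_bucketed(BucketSpec(4, ("url",)), BucketSpec(4, ("url",)), ["url"])
```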
…lits

Plan each side's manifest a single time in the driver and group splits by bucket,
then hand each bucket task only its own splits (splits pickle to Ray cleanly),
instead of re-planning the full manifest inside every bucket task. Only buckets
present on both sides are dispatched (inner join).
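
A minimal sketch of the plan-once, group-by-bucket step described above. The split objects here are plain `(bucket, file)` tuples and `bucket_of` is a hypothetical accessor; the real split type and how it exposes its bucket are not shown in this PR text.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple, TypeVar

S = TypeVar("S")


def group_by_bucket(splits: Iterable[S], bucket_of: Callable[[S], int]) -> Dict[int, List[S]]:
    """Group already-planned splits by their bucket id."""
    grouped: Dict[int, List[S]] = defaultdict(list)
    for split in splits:
        grouped[bucket_of(split)].append(split)
    return dict(grouped)


def plan_bucket_tasks(
    left_splits: Iterable[S],
    right_splits: Iterable[S],
    bucket_of: Callable[[S], int],
) -> List[Tuple[int, List[S], List[S]]]:
    """Return one (bucket, left splits, right splits) entry per shared bucket."""
    left = group_by_bucket(left_splits, bucket_of)
    right = group_by_bucket(right_splits, bucket_of)
    shared = sorted(left.keys() & right.keys())  # inner join: both sides present
    return [(b, left[b], right[b]) for b in shared]


# Hypothetical splits: (bucket id, data file name).
left_splits = [(0, "l-0a"), (0, "l-0b"), (1, "l-1"), (3, "l-3")]
right_splits = [(0, "r-0"), (2, "r-2"), (3, "r-3")]
tasks = plan_bucket_tasks(left_splits, right_splits, bucket_of=lambda s: s[0])
# Buckets 1 and 2 exist on one side only, so only buckets 0 and 3 are dispatched.
```

Each entry in `tasks` carries only that bucket's splits, so a task never needs to read the manifest itself.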

Cover the remaining reject paths besides bucket-count: a different bucket-key, and
a join key that is not the bucket-key.
…r join

Avoid driver OOM at scale: return ray.data.from_arrow_refs(refs) so each bucket's
join output stays a distributed object ref, instead of ray.get-ing every bucket
into the driver and rebuilding with from_arrow (which would pull hundreds of GB
of matched rows onto one node).
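
The difference can be sketched as follows, assuming Ray and PyArrow are installed. `join_one_bucket` and `bucket_join_dataset` are illustrative names, and the per-bucket inputs are plain row lists rather than Paimon splits; only `ray.remote`, `ray.data.from_arrow_refs`, and `pa.Table.from_pylist` are real library calls.

```python
from typing import Dict, List, Tuple

Rows = List[dict]


def join_one_bucket(left_rows: Rows, right_rows: Rows, key: str) -> Rows:
    """Inner hash join of two bucket-local row lists on `key`."""
    index: Dict[object, Rows] = {}
    for row in right_rows:
        index.setdefault(row[key], []).append(row)
    # One output row per matching pair, so one key can fan out to many rows.
    return [{**l, **r} for l in left_rows for r in index.get(l[key], [])]


def bucket_join_dataset(bucket_inputs: List[Tuple[Rows, Rows]], key: str, schema):
    """Run one Ray task per bucket and return a Dataset built from object refs.

    `schema` is a pyarrow.Schema for the joined rows, so that empty buckets
    still produce a table with the right columns.
    """
    import pyarrow as pa
    import ray

    @ray.remote
    def task(left_rows: Rows, right_rows: Rows):
        rows = join_one_bucket(left_rows, right_rows, key)
        return pa.Table.from_pylist(rows, schema=schema)

    refs = [task.remote(l, r) for l, r in bucket_inputs]
    # No ray.get here: the joined tables stay in the object store on the
    # workers, and the driver only holds the references.
    return ray.data.from_arrow_refs(refs)
```

The driver-side cost is then proportional to the number of buckets, not to the number of matched rows.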

Also restrict join_type to 'inner': the per-bucket intersection is only correct
for inner joins (an outer join needs the union of buckets), so reject others with
a clear error instead of silently returning wrong results.
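
A small illustration of why the bucket intersection breaks outer joins. The bucket contents and the `shared_buckets` helper are made up for this example and are not the PR's code.

```python
from typing import Dict, List


def shared_buckets(
    left: Dict[int, List[str]],
    right: Dict[int, List[str]],
    join_type: str = "inner",
) -> List[int]:
    """Return the buckets to dispatch; only inner joins are supported."""
    if join_type != "inner":
        raise ValueError(
            f"bucket join supports only join_type='inner', got {join_type!r}"
        )
    return sorted(left.keys() & right.keys())


# Keys per bucket on each side; bucket 1 exists only on the left.
left = {0: ["a"], 1: ["b"]}
right = {0: ["a"]}

dispatched = shared_buckets(left, right)
# Left keys that would never reach any task. An inner join does not need
# them, but a left outer join would have to emit them with null right columns.
skipped_left_keys = [k for b, keys in left.items() if b not in dispatched for k in keys]
```

Here `skipped_left_keys` contains `"b"`, which is exactly the row a left outer join would silently lose if the intersection were reused.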
…er worker

- No shared bucket now returns an empty Dataset that keeps the join schema
  (join two empty reads) instead of a schema-less from_items([]).
- Per-process table cache reused across a worker's bucket reads, so the
  catalog is not reloaded per bucket; planning still loads a fresh table.
- Document the no-overlapping-column assumption in the docstring; note that
  planning is driver-side metadata only.
- Tests: fan-out (one url -> many row_ids) and empty-result-keeps-schema.
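
The per-process cache could look roughly like this. It is a sketch using `functools.lru_cache`; `load_table`, `cached_table`, and `read_bucket` are hypothetical names, and the real loader would go through the Paimon catalog instead of returning a dict.

```python
import functools
from typing import Dict, List, Tuple

load_calls: List[str] = []  # records real loads, for demonstration only


def load_table(catalog_options: Tuple[Tuple[str, str], ...], table_name: str) -> dict:
    """Hypothetical expensive load (stands in for catalog + table lookup)."""
    load_calls.append(table_name)
    return {"name": table_name, "options": dict(catalog_options)}


@functools.lru_cache(maxsize=None)
def cached_table(catalog_options: Tuple[Tuple[str, str], ...], table_name: str) -> dict:
    """Load each (options, table) pair at most once per worker process."""
    return load_table(catalog_options, table_name)


def read_bucket(catalog_options: Dict[str, str], table_name: str, bucket: int) -> str:
    """Read one bucket, reusing the cached table handle."""
    # Dicts are not hashable, so the cache key is a sorted tuple of items.
    key = tuple(sorted(catalog_options.items()))
    table = cached_table(key, table_name)
    return f"{table['name']}#bucket-{bucket}"


options = {"warehouse": "/tmp/warehouse"}
results = [read_bucket(options, "db.locator", b) for b in range(3)]
# Three bucket reads in the same process trigger a single load.
```

Because the cache lives at module level, it is scoped to one worker process; the driver-side planning path would call `load_table` directly to get a fresh table.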