[python][ray] Add no-shuffle bucket join for co-bucketed tables#8397
Draft
XiaoHongbo-Hope wants to merge 6 commits into
Add pypaimon.ray.bucket_join: a no-shuffle join for two fixed-bucket (HASH_FIXED) tables that share a bucket count and bucket-key and are joined on that key. Each bucket is read and joined locally in its own Ray task. Because equal keys always land in the same bucket, no global shuffle is needed. This is the alternative to ray.data.join when joining a co-bucketed url->row_id locator table against a per-task input table (data-evolution main tables are bucket-unaware, so bucketing is borrowed via the lightweight locator table). The join validates that bucket count, bucket-key, and join key match, and rejects the inputs otherwise.
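The idea described above can be illustrated with a minimal pure-Python sketch. It is not the pypaimon implementation: `bucket_of`, `bucketize`, `join_one_bucket`, and `bucket_join_sketch` are hypothetical names, `zlib.crc32` is only a stand-in for the table's real bucket hash, and the per-bucket loop stands in for one Ray task per bucket.

```python
import zlib
from collections import defaultdict


def bucket_of(key: str, num_buckets: int) -> int:
    # Hypothetical stand-in for the table's real bucket function.
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows, key, num_buckets):
    # Group rows by the bucket their key hashes to.
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[key], num_buckets)].append(row)
    return buckets


def join_one_bucket(left_rows, right_rows, key):
    # Local inner hash join; in the PR this would run inside one Ray task.
    index = defaultdict(list)
    for r in right_rows:
        index[r[key]].append(r)
    return [{**l, **r} for l in left_rows for r in index.get(l[key], [])]


def bucket_join_sketch(left, right, key, left_buckets, right_buckets):
    # Reject tables that are not co-bucketed.
    if left_buckets != right_buckets:
        raise ValueError("bucket counts differ: %d vs %d" % (left_buckets, right_buckets))
    lb = bucketize(left, key, left_buckets)
    rb = bucketize(right, key, right_buckets)
    out = []
    # Equal keys hash to the same bucket, so only shared buckets can match.
    for b in sorted(lb.keys() & rb.keys()):
        out.extend(join_one_bucket(lb[b], rb[b], key))
    return out


locator = [
    {"url": "a", "row_id": 1},
    {"url": "a", "row_id": 2},
    {"url": "b", "row_id": 3},
]
inputs = [{"url": "a", "label": "x"}, {"url": "c", "label": "y"}]
joined = bucket_join_sketch(inputs, locator, "url", 4, 4)
```

Since both sides use the same hash and bucket count, no row ever needs to be compared with a row from a different bucket, which is what removes the shuffle.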
…lits Plan each side's manifest a single time in the driver and group splits by bucket, then hand each bucket task only its own splits (splits pickle to Ray cleanly), instead of re-planning the full manifest inside every bucket task. Only buckets present on both sides are dispatched (inner join).
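A rough sketch of the plan-once, group-by-bucket step might look like the following. The `Split` dataclass and the helper names are hypothetical and only model a split as a bucket id plus a file path; real splits carry more metadata.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Split:
    # Hypothetical, simplified split: just a bucket id and a data file path.
    bucket: int
    path: str


def group_by_bucket(splits):
    # Driver-side grouping, done once per table after a single planning pass.
    grouped = defaultdict(list)
    for s in splits:
        grouped[s.bucket].append(s)
    return dict(grouped)


def plan_bucket_tasks(left_splits, right_splits):
    left = group_by_bucket(left_splits)
    right = group_by_bucket(right_splits)
    # Inner join: only buckets present on both sides get a task.
    shared = sorted(left.keys() & right.keys())
    # Each task receives only its own bucket's splits, not the full plan.
    return [(b, left[b], right[b]) for b in shared]


left_splits = [Split(0, "l/0-a"), Split(0, "l/0-b"), Split(2, "l/2-a")]
right_splits = [Split(0, "r/0-a"), Split(1, "r/1-a")]
tasks = plan_bucket_tasks(left_splits, right_splits)
```

Here only bucket 0 is dispatched: bucket 2 exists only on the left and bucket 1 only on the right, so neither can contribute rows to an inner join.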
Cover the remaining reject paths besides bucket-count: different bucket-key, and a join key that is not the bucket-key.
…r join Avoid driver OOM at scale: return ray.data.from_arrow_refs(refs) so each bucket's join output stays a distributed object ref, instead of ray.get-ing every bucket into the driver and rebuilding with from_arrow (which would pull hundreds of GB of matched rows onto one node). Also restrict join_type to 'inner': the per-bucket intersection is only correct for inner joins (an outer join needs the union of buckets), so reject others with a clear error instead of silently returning wrong results.
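The join_type restriction can be sketched as below. This is an illustration under assumptions, not the PR's code: `check_join_type` and `buckets_to_dispatch` are hypothetical names, and buckets are modelled as plain integers.

```python
def check_join_type(join_type):
    # Only inner joins are correct when dispatching the bucket intersection.
    if join_type != "inner":
        raise ValueError(
            "bucket join supports only join_type='inner', got %r" % (join_type,)
        )
    return join_type


def buckets_to_dispatch(left_buckets, right_buckets, join_type="inner"):
    check_join_type(join_type)
    return sorted(set(left_buckets) & set(right_buckets))


left_buckets = [0, 1, 2]
right_buckets = [1, 2, 3]
dispatched = buckets_to_dispatch(left_buckets, right_buckets)

# Buckets that exist on one side only. An outer join would have to emit their
# rows (padded with nulls), so it needs the union of buckets, not the intersection.
left_only = sorted(set(left_buckets) - set(right_buckets))
right_only = sorted(set(right_buckets) - set(left_buckets))
```

Rows in `left_only` and `right_only` buckets never reach a task under the intersection plan, which is why a non-inner join would silently lose them.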
…er worker
- No shared bucket now returns an empty Dataset that keeps the join schema (join two empty reads) instead of a schema-less from_items([]).
- Per-process table cache reused across a worker's bucket reads, so the catalog is not reloaded per bucket; planning still loads a fresh table.
- Document the no-overlapping-column assumption in the docstring; note that planning is driver-side metadata only.
- Tests: fan-out (one url -> many row_ids) and empty-result-keeps-schema.
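One way to picture the per-process table cache is a memoized loader, sketched here with `functools.lru_cache`. Whether the PR uses `lru_cache` or a module-level dict is not stated; `_load_table_from_catalog`, `get_cached_table`, and `read_bucket` are hypothetical stand-ins, and the table is modelled as a plain dict.

```python
import functools

# Counts how often the (simulated) catalog is hit, for illustration only.
LOAD_COUNT = {"n": 0}


def _load_table_from_catalog(identifier):
    # Hypothetical stand-in for an expensive catalog lookup.
    LOAD_COUNT["n"] += 1
    return {"identifier": identifier}


@functools.lru_cache(maxsize=None)
def get_cached_table(identifier):
    # Cached per Python process, so every bucket read on the same worker
    # reuses the table loaded by the first read.
    return _load_table_from_catalog(identifier)


def read_bucket(identifier, bucket):
    table = get_cached_table(identifier)
    return (table["identifier"], bucket)


results = [read_bucket("db.locator", b) for b in range(4)]
```

Four bucket reads trigger a single catalog load in this sketch. Driver-side planning would bypass the cache and load a fresh table, as the commit message notes.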
Purpose
Tests