Commit 20ffbf7
[SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join
### What changes were proposed in this pull request?
This PR introduces the initial implementation of Storage-Partitioned Join ([SPIP](https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE)).
Changes:
- `org.apache.spark.sql.connector.read.partitioning.Partitioning` currently is very limited (as mentioned in the SPIP), and cannot be extended to handle join cases. This PR completely replace it following the catalyst `Partitioning` interface, and added two concrete sub-classes: `KeyGroupedPartitioning` and `UnknownPartitioning`. This allows a V2 data source to report to Spark it's partition transform expressions, via `SupportsReportPartitioning` interface.
- with the above change, `org.apache.spark.sql.connector.read.partitioning.Distribution` and `org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution` now are replaced by classes with the same name in `org.apache.spark.sql.connector.distributions` package. Therefore, this PR marks the former two as deprecated.
- `DataSourcePartitioning` used to be in `org.apache.spark.sql.execution.datasources.v2`. This moves it into package `org.apache.spark.sql.catalyst.plans.physical` and renames it to `KeyGroupedPartitioning`, so that it can be extended for more non-V2 use cases, such as Hive bucketing. In addition, it is also changed to accommodate the Storage-Partitioned Join feature.
- a new expression type: `TransformExpression`, is introduced to bind syntactic partition transforms with their semantic meaning, represented by a V2 function. This expression is un-evaluable for now, and is used later in `EnsureRequirements` to check whether join children are compatible with each other.
- a new optimizer rule: `V2ScanPartitioning`, is added to recognize `Scan`s implement `SupportsReportPartitioning`. If they do, this rule converts V2 partition transform expressions into their counterparts in catalyst, and annotate `DataSourceV2ScanRelation` with the result. These are later propagated into `DataSourceV2ScanExecBase`.
- changes are made in `DataSourceV2ScanExecBase` to create `KeyGroupedPartitioning` for scan if 1) the scan is annotated with catalyst partition transform expressions, and 2) if all input splits implement `HasPartitionKey`.
- A new config: `spark.sql.sources.v2.bucketing.enabled` is introduced to turn on or off the behavior. By default it is false.
### Why are the changes needed?
Spark currently support bucketing in DataSource V1, but not in V2. This is the first step to support bucket join, and is general form, storage-partitioned join, for V2 data sources. In addition, the work here can potentially used to support Hive bucketing as well. Please check the SPIP for details.
### Does this PR introduce _any_ user-facing change?
With the changes, a user can now:
- have V2 data sources to report distribution and ordering to Spark on read path
- Spark will recognize the distribution property and eliminate shuffle in join/aggregate/window, etc, when the source distribution matches the required distribution from these.
- a new config `spark.sql.sources.v2.bucketing.enabled` is introduced to turn on/off the above behavior
### How was this patch tested?
- Added a new test suite `KeyGroupedPartitioningSuite` covers end-to-end tests on the new feature
- Extended `EnsureRequirementsSuite` to cover `DataSourcePartitioning`
- Some existing test classes, such as `InMemoryTable` are extended to cover the changes
Closes apache#35657 from sunchao/SPARK-37377-partitioning.
Lead-authored-by: Chao Sun <sunchao@apple.com>
Co-authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>1 parent 608f70d commit 20ffbf7
45 files changed
Lines changed: 1839 additions & 338 deletions
File tree
- connector
- avro/src/test/scala/org/apache/spark/sql/avro
- docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2
- core/src/main/scala/org/apache/spark/util/collection
- project
- sql
- catalyst/src
- main
- java/org/apache/spark/sql/connector/read/partitioning
- scala-2.12/org/apache/spark/sql/catalyst/util
- scala-2.13/org/apache/spark/sql/catalyst/util
- scala/org/apache/spark/sql
- catalyst
- expressions
- plans/physical
- connector/catalog
- execution/datasources/v2
- internal
- test/scala/org/apache/spark/sql/connector/catalog
- core/src
- main/scala/org/apache/spark/sql
- execution
- datasources/v2
- dynamicpruning
- exchange
- test
- java/test/org/apache/spark/sql/connector
- scala/org/apache/spark/sql
- connector
- catalog/functions
- execution
- datasources
- orc
- parquet
- exchange
Some content is hidden
Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 1 addition & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
59 | 59 | | |
60 | 60 | | |
61 | 61 | | |
62 | | - | |
| 62 | + | |
63 | 63 | | |
64 | 64 | | |
65 | 65 | | |
| |||
Lines changed: 3 additions & 3 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
2335 | 2335 | | |
2336 | 2336 | | |
2337 | 2337 | | |
2338 | | - | |
| 2338 | + | |
2339 | 2339 | | |
2340 | 2340 | | |
2341 | 2341 | | |
| |||
2368 | 2368 | | |
2369 | 2369 | | |
2370 | 2370 | | |
2371 | | - | |
| 2371 | + | |
2372 | 2372 | | |
2373 | 2373 | | |
2374 | 2374 | | |
| |||
2449 | 2449 | | |
2450 | 2450 | | |
2451 | 2451 | | |
2452 | | - | |
| 2452 | + | |
2453 | 2453 | | |
2454 | 2454 | | |
2455 | 2455 | | |
| |||
Lines changed: 1 addition & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
372 | 372 | | |
373 | 373 | | |
374 | 374 | | |
375 | | - | |
| 375 | + | |
376 | 376 | | |
377 | 377 | | |
378 | 378 | | |
| |||
Lines changed: 9 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
36 | 36 | | |
37 | 37 | | |
38 | 38 | | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
39 | 48 | | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
64 | 64 | | |
65 | 65 | | |
66 | 66 | | |
67 | | - | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
68 | 74 | | |
69 | 75 | | |
70 | 76 | | |
| |||
Lines changed: 0 additions & 44 deletions
This file was deleted.
Lines changed: 55 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
Lines changed: 9 additions & 17 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
18 | 18 | | |
19 | 19 | | |
20 | 20 | | |
21 | | - | |
22 | 21 | | |
23 | 22 | | |
24 | 23 | | |
25 | 24 | | |
26 | | - | |
27 | | - | |
28 | | - | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
29 | 33 | | |
30 | 34 | | |
31 | 35 | | |
32 | 36 | | |
33 | 37 | | |
34 | | - | |
35 | 38 | | |
36 | | - | |
| 39 | + | |
37 | 40 | | |
38 | 41 | | |
39 | | - | |
40 | | - | |
41 | | - | |
42 | | - | |
43 | | - | |
44 | | - | |
45 | | - | |
46 | | - | |
47 | | - | |
48 | | - | |
49 | | - | |
50 | 42 | | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
18 | 18 | | |
19 | 19 | | |
20 | 20 | | |
21 | | - | |
22 | 21 | | |
23 | 22 | | |
24 | | - | |
25 | | - | |
26 | | - | |
| 23 | + | |
27 | 24 | | |
28 | | - | |
| 25 | + | |
29 | 26 | | |
30 | 27 | | |
31 | | - | |
| 28 | + | |
| 29 | + | |
32 | 30 | | |
33 | | - | |
34 | | - | |
35 | | - | |
36 | | - | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
37 | 34 | | |
38 | | - | |
39 | | - | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
40 | 38 | | |
41 | 39 | | |
Lines changed: 65 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
0 commit comments