Auron 1807 #1844
base: master
Pull request overview
This pull request implements optimizations and fixes for broadcast joins with empty partitions and adds support for ExistenceJoin. The changes include special handling for Semi/Anti joins when the probed side has no partitions, and infrastructure to properly execute ExistenceJoin operations when one side is empty.
Key changes:
- Added empty partition optimization for LeftAnti and LeftSemi broadcast joins
- Implemented special handling for ExistenceJoin with empty probed side using EmptyPartitionsExecNode
- Simplified null key handling logic in the native semi-join implementation
- Added ExistenceJoinSuite test integration
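For reference, the result each of these join types is expected to produce when one input is empty can be stated with a small in-memory model. The Rust sketch below is illustrative only: `JoinKind`, `Row`, and `reference_join` are hypothetical names, not part of Auron or DataFusion, and it models plain equi-join semantics on non-null integer keys rather than the PR's partition-level handling.

```rust
use std::collections::HashSet;

/// Join types named in the overview (hypothetical enum, for illustration only).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinKind {
    LeftSemi,
    LeftAnti,
    Existence,
}

/// One output row: the left key and, for Existence only, the `exists` flag.
pub type Row = (i32, Option<bool>);

/// Reference semantics for an equi-join on non-null integer keys.
/// This is a model of the expected results, not the PR's implementation.
pub fn reference_join(kind: JoinKind, left: &[i32], right: &[i32]) -> Vec<Row> {
    let right_keys: HashSet<i32> = right.iter().copied().collect();
    left.iter()
        .filter_map(|&key| {
            let matched = right_keys.contains(&key);
            match kind {
                // Semi: keep left rows that have at least one match.
                JoinKind::LeftSemi => matched.then_some((key, None)),
                // Anti: keep left rows that have no match.
                JoinKind::LeftAnti => (!matched).then_some((key, None)),
                // Existence: keep every left row and report whether it matched.
                JoinKind::Existence => Some((key, Some(matched))),
            }
        })
        .collect()
}
```

In this model, an empty right side makes `LeftSemi` yield no rows, `LeftAnti` yield every left row, and `Existence` yield every left row with the flag set to false. An empty left side makes all three yield nothing.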
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| NativeBroadcastJoinBase.scala | Added empty partition optimizations and ExistenceJoin special case handling for broadcast joins |
| test.rs | Removed test for anti-join with null keys |
| semi_join.rs | Simplified null key validation logic by removing special case for Anti joins |
| AuronExistenceJoinSuite.scala | Added new test suite for ExistenceJoin operations |
| AuronSparkTestSettings.scala | Enabled the new AuronExistenceJoinSuite |
Comments suppressed due to low confidence (1)
native-engine/datafusion-ext-plans/src/joins/test.rs:649
- This test was removed, but it was testing important null key behavior in Anti joins. The test verified that null keys are handled correctly in different join implementations (BHJ left probed, SHJ left probed had different expected output than SMJ/BHJ right probed/SHJ right probed). The removal of this test along with the logic change in semi_join.rs suggests the behavior change may not be fully validated. If the behavior change is intentional, there should be a replacement test that validates the new behavior.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_with_duplicated_column_names() -> Result<()> {
    for test_type in ALL_TEST_TYPE {
        let left = build_table(
            ("a", &vec![1, 2, 3]),
            ("b", &vec![4, 5, 7]),
            ("c", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a", &vec![10, 20, 30]),
            ("b", &vec![1, 2, 7]),
            ("c", &vec![70, 80, 90]),
        );
        let on: JoinOn = vec![(
            // join on a=b so there are duplicate column names on unjoined columns
            Arc::new(Column::new_with_schema("a", &left.schema())?),
            Arc::new(Column::new_with_schema("b", &right.schema())?),
        )];
        let (_, batches) = join_collect(test_type, left, right, on, Inner).await?;
        let expected = vec![
            "+---+---+---+----+---+----+",
            "| a | b | c | a  | b | c  |",
            "+---+---+---+----+---+----+",
            "| 1 | 4 | 7 | 10 | 1 | 70 |",
            "| 2 | 5 | 8 | 20 | 2 | 80 |",
            "+---+---+---+----+---+----+",
        ];
        // The output order is important as SMJ preserves sortedness
        assert_batches_sorted_eq!(expected, &batches);
    }
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_date32() -> Result<()> {
    for test_type in ALL_TEST_TYPE {
        let left = build_date_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![19107, 19108, 19108]), // this has a repetition
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_date_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![19107, 19108, 19109]),
            ("c2", &vec![70, 80, 90]),
        );
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
native-engine/datafusion-ext-plans/src/joins/test.rs:653
- A test case has been removed that was verifying join behavior with NULL keys. Removing this test reduces test coverage for an important edge case in join operations. If this test is no longer valid, it should be replaced with an equivalent test rather than simply deleted.
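As a starting point for a replacement test, the baseline expectation for a non-null-aware left anti join can be written down independently of any join implementation. The sketch below assumes standard SQL equality, under which a NULL key never equals anything, so a left row with a NULL key has no match and is kept. `left_anti_nullable` is a hypothetical helper, not an Auron or DataFusion API, and whether the simplified logic in semi_join.rs matches this for every join implementation is exactly what a replacement test would need to confirm.

```rust
use std::collections::HashSet;

/// Left anti join on nullable keys under standard SQL equality (assumption):
/// `None` models NULL, and NULL never matches anything, including another NULL.
/// Hypothetical reference helper, not the engine's implementation.
pub fn left_anti_nullable(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<Option<i32>> {
    // NULL keys on the right can never produce a match, so only non-null keys are indexed.
    let right_keys: HashSet<i32> = right.iter().flatten().copied().collect();
    left.iter()
        .copied()
        .filter(|key| match key {
            // A non-null left key is kept only when no right key equals it.
            Some(value) => !right_keys.contains(value),
            // A NULL left key has no match by definition, so the row is kept.
            None => true,
        })
        .collect()
}
```

A null-aware anti join (as used for `NOT IN` subqueries) behaves differently and is not modeled here.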
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.
Which issue does this PR close?
Closes #<issue_number>
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?