@cxzl25 cxzl25 commented Jan 7, 2026

Which issue does this PR close?

Closes #<issue_number>

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

How was this patch tested?

Copilot AI left a comment

Pull request overview

This pull request implements optimizations and fixes for broadcast joins with empty partitions and adds support for ExistenceJoin. The changes include special handling for Semi/Anti joins when the probed side has no partitions, and infrastructure to properly execute ExistenceJoin operations when one side is empty.

Key changes:

  • Added empty partition optimization for LeftAnti and LeftSemi broadcast joins
  • Implemented special handling for ExistenceJoin with empty probed side using EmptyPartitionsExecNode
  • Simplified null key handling logic in the native semi-join implementation
  • Added ExistenceJoinSuite test integration
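
To make the empty-side cases in the list above concrete, here is a minimal Rust sketch of the reference semantics for LeftSemi, LeftAnti, and ExistenceJoin on non-null keys. It is an illustration only: `JoinKind` and `join_keys` are hypothetical names, not code from this PR or from Auron, and the sketch does not model partitions, broadcasting, or `EmptyPartitionsExecNode`.

```rust
use std::collections::HashSet;

/// Join flavours named in the overview (hypothetical enum for illustration).
/// `Existence` keeps every left row and attaches a boolean "exists" flag
/// instead of filtering rows out.
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinKind {
    LeftSemi,
    LeftAnti,
    Existence,
}

/// Reference semantics for an equi-join on non-null i32 keys.
/// Returns `(left_key, exists)` pairs so that all three join kinds share one
/// return type; for LeftSemi/LeftAnti the flag is implied by the join kind.
fn join_keys(kind: JoinKind, left: &[i32], right: &[i32]) -> Vec<(i32, bool)> {
    // Build a lookup set from the right side.
    let build: HashSet<i32> = right.iter().copied().collect();
    left.iter()
        .map(|&key| (key, build.contains(&key)))
        .filter(|&(_, exists)| match kind {
            JoinKind::LeftSemi => exists,   // keep left rows that have a match
            JoinKind::LeftAnti => !exists,  // keep left rows that have no match
            JoinKind::Existence => true,    // keep every left row, flag carries the result
        })
        .collect()
}
```

Under these semantics an empty left side yields no rows for any join kind. An empty right side yields no rows for LeftSemi, every left row for LeftAnti, and every left row flagged `false` for ExistenceJoin. Which side the engine treats as the probed side in each case is decided by the PR's code, not by this sketch.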

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
| File | Description |
| --- | --- |
| NativeBroadcastJoinBase.scala | Added empty partition optimizations and ExistenceJoin special case handling for broadcast joins |
| test.rs | Removed test for anti-join with null keys |
| semi_join.rs | Simplified null key validation logic by removing special case for Anti joins |
| AuronExistenceJoinSuite.scala | Added new test suite for ExistenceJoin operations |
| AuronSparkTestSettings.scala | Enabled the new AuronExistenceJoinSuite |

Comments suppressed due to low confidence (1)

native-engine/datafusion-ext-plans/src/joins/test.rs:649

  • This test was removed, but it was testing important null key behavior in Anti joins. The test verified that null keys are handled correctly in different join implementations (BHJ left probed, SHJ left probed had different expected output than SMJ/BHJ right probed/SHJ right probed). The removal of this test along with the logic change in semi_join.rs suggests the behavior change may not be fully validated. If the behavior change is intentional, there should be a replacement test that validates the new behavior.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn join_with_duplicated_column_names() -> Result<()> {
        for test_type in ALL_TEST_TYPE {
            let left = build_table(
                ("a", &vec![1, 2, 3]),
                ("b", &vec![4, 5, 7]),
                ("c", &vec![7, 8, 9]),
            );
            let right = build_table(
                ("a", &vec![10, 20, 30]),
                ("b", &vec![1, 2, 7]),
                ("c", &vec![70, 80, 90]),
            );
            let on: JoinOn = vec![(
                // join on a=b so there are duplicate column names on unjoined columns
                Arc::new(Column::new_with_schema("a", &left.schema())?),
                Arc::new(Column::new_with_schema("b", &right.schema())?),
            )];

            let (_, batches) = join_collect(test_type, left, right, on, Inner).await?;
            let expected = vec![
                "+---+---+---+----+---+----+",
                "| a | b | c | a  | b | c  |",
                "+---+---+---+----+---+----+",
                "| 1 | 4 | 7 | 10 | 1 | 70 |",
                "| 2 | 5 | 8 | 20 | 2 | 80 |",
                "+---+---+---+----+---+----+",
            ];
            // The output order is important as SMJ preserves sortedness
            assert_batches_sorted_eq!(expected, &batches);
        }
        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn join_date32() -> Result<()> {
        for test_type in ALL_TEST_TYPE {
            let left = build_date_table(
                ("a1", &vec![1, 2, 3]),
                ("b1", &vec![19107, 19108, 19108]), // this has a repetition
                ("c1", &vec![7, 8, 9]),
            );
            let right = build_date_table(
                ("a2", &vec![10, 20, 30]),
                ("b1", &vec![19107, 19108, 19109]),
                ("c2", &vec![70, 80, 90]),
            );

Copilot AI left a comment

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Comments suppressed due to low confidence (1)

native-engine/datafusion-ext-plans/src/joins/test.rs:653

  • A test case has been removed that was verifying join behavior with NULL keys. Removing this test reduces test coverage for an important edge case in join operations. If this test is no longer valid, it should be replaced with an equivalent test rather than simply deleted.
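
As a starting point for the replacement test requested above, the following Rust sketch shows what a left anti join returns for NULL keys under plain SQL equality semantics, where NULL never equals anything. It is an assumption-laden illustration, not the removed test and not the logic in `semi_join.rs`: `left_anti` is a hypothetical helper, and null-aware anti joins (the `NOT IN` form) follow different rules that this sketch does not cover.

```rust
use std::collections::HashSet;

/// Left anti join on nullable keys under plain SQL equality semantics
/// (hypothetical helper for illustration). `NULL = x` is never true, so a
/// left row with a NULL key can never find a match and is always kept.
fn left_anti(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<Option<i32>> {
    // NULL keys on the right side are dropped: they cannot match any left key.
    let build: HashSet<i32> = right.iter().flatten().copied().collect();
    left.iter()
        .copied()
        .filter(|key| match key {
            Some(k) => !build.contains(k), // keep only unmatched non-null keys
            None => true,                  // NULL key: no match possible, row is kept
        })
        .collect()
}
```

A real replacement test would run the same inputs through every join implementation under test and assert that the outputs agree with whatever null-key behavior the PR intends.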

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.


Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.

