
Refactor DecorrelateWhereExists and add back Distinct if needed #5345


Merged
merged 9 commits into apache:main from ygf11:where-exists-distinct on Mar 11, 2023

Conversation

ygf11
Contributor

@ygf11 ygf11 commented Feb 20, 2023

Which issue does this PR close?

Closes #5344.

Rationale for this change

In #5264, we added support for rewriting more where-exists subqueries to joins, but did not add back the Distinct.

For the query:

SELECT t1.t1_id
FROM   t1
WHERE  EXISTS(SELECT DISTINCT t2_int
              FROM   t2
              WHERE  t2.t2_id > t1.t1_id); 

# current logical plan:
Projection: t1.t1_id
  LeftSemi Join:  Filter: t2.t2_id > t1.t1_id
    TableScan: t1 projection=[t1_id]
    Projection: t2.t2_id # DISTINCT is not added back
      TableScan: t2 projection=[t2_id]

Although the execution result is correct, we should add back the DISTINCT.
In some cases, DISTINCT can filter out many rows before the join, so we should let the user choose.

What changes are included in this PR?

Refactor optimize_exists and add optimize_subquery.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core (Core DataFusion crate) and optimizer (Optimizer rules) labels on Feb 20, 2023
@mingmwang
Contributor

I would like to add a rule that replaces Distinct with Aggregate at an early stage of the logical optimization phase. I think this will simplify the subsequent logical rules; sometimes they handle Aggregate but forget to handle Distinct.
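
As a rough illustration of the idea (not code from this PR): a Distinct is semantically just an Aggregate that groups by all of its output columns, so such a rule only rewrites the plan node without changing query results. In SQL terms (hypothetical table t with columns a and b):

-- The two queries below are equivalent; the rule would normalize the first
-- form into the second, so later optimizer rules only ever see Aggregate.
SELECT DISTINCT a, b FROM t;

SELECT a, b FROM t GROUP BY a, b;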

@jackwener
Member

I would like to add a rule that replaces Distinct with Aggregate at an early stage of the logical optimization phase. I think this will simplify the subsequent logical rules; sometimes they handle Aggregate but forget to handle Distinct.

It's a good idea.

@mingmwang
Contributor

#5348

@github-actions github-actions bot added the logical-expr (Logical plan and expressions) label on Feb 22, 2023
@ygf11 ygf11 marked this pull request as ready for review February 22, 2023 08:29
@ygf11 ygf11 marked this pull request as draft February 27, 2023 10:39
@mingmwang
Contributor

@ygf11
Just ping me if this PR is ready for review.

@github-actions github-actions bot removed the logical-expr (Logical plan and expressions) label on Mar 3, 2023
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

assert_plan_eq(&plan, expected)
}
Contributor Author

I find that the behaviors of Distinct in Postgres and Spark are not the same.

  • For Postgres, it will not add back the distinct in the optimized result.
  • For Spark, it will add back the distinct and keep the unused projection exprs (sq.b + sq.c in the above).

I think sq.b + sq.c is not used any more, so I remove it.

I am not sure whether the approach in this PR is appropriate.
I want to hear your advice @mingmwang @alamb @jackwener

Contributor

For Postgres, it will not add back the distinct in the optimized result.

I think it likely depends on how the join operators are implemented

# current logical plan:
Projection: t1.t1_id
  LeftSemi Join:  Filter: t2.t2_id > t1.t1_id
    TableScan: t1 projection=[t1_id]
    Projection: t2.t2_id # DISTINCT is not added back
      TableScan: t2 projection=[t2_id]

If the LeftSemiJoin is going to read its build side into (effectively) a HashSet, then it doesn't really matter whether the input is deduplicated beforehand.

However, since there is no equality predicate (the predicate is >), I am not sure what our LeftSemiJoin will do (maybe it will buffer the entire build side?), in which case it might help performance to put a DISTINCT on the output of the Projection to reduce the cardinality that the join buffers.
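
For illustration only (not necessarily the exact plan this PR produces), putting the dedup back on the join's build side would look roughly like:

Projection: t1.t1_id
  LeftSemi Join:  Filter: t2.t2_id > t1.t1_id
    TableScan: t1 projection=[t1_id]
    Distinct:
      Projection: t2.t2_id
        TableScan: t2 projection=[t2_id]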

Member

I think sq.b + sq.c is not used any more, so I remove it.

This is a good job.

For Postgres, it will not add back the distinct in the optimized result.

I'm also not sure about it

Contributor

I find that the behaviors of Distinct in Postgres and Spark are not the same.

  • For Postgres, it will not add back the distinct in the optimized result.
  • For Spark, it will add back the distinct and keep the unused projection exprs (sq.b + sq.c in the above).

@ygf11 @jackwener @alamb
I guess the reason Postgres removes the Distinct in the final plan may be related to how it implements NAAJ (Null Aware Anti Join), to make sure the result is correct.

You can try these SQLs in Postgres:

CREATE TABLE t1 (id integer,name varchar(100));
CREATE TABLE t2 (id integer,name varchar(100));

explain  
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT distinct * FROM t2 WHERE t2.id = t1.id);
QUERY PLAN
Hash Join  (cost=18.50..34.32 rows=160 width=222)
  Hash Cond: (t1.id = t2.id)
  ->  Seq Scan on t1  (cost=0.00..13.20 rows=320 width=222)
  ->  Hash  (cost=16.00..16.00 rows=200 width=4)
        ->  HashAggregate  (cost=14.00..16.00 rows=200 width=4)
              Group Key: t2.id
              ->  Seq Scan on t2  (cost=0.00..13.20 rows=320 width=4)

explain  
SELECT t1.id, t1.name FROM t1 WHERE NOT EXISTS (SELECT distinct t2.name FROM t2 WHERE t2.id = t1.id);
QUERY PLAN
--
Hash Anti Join  (cost=17.20..35.82 rows=160 width=222)
  Hash Cond: (t1.id = t2.id)
  ->  Seq Scan on t1  (cost=0.00..13.20 rows=320 width=222)
  ->  Hash  (cost=13.20..13.20 rows=320 width=4)
        ->  Seq Scan on t2  (cost=0.00..13.20 rows=320 width=4)

explain  
SELECT t1.id, t1.name FROM t1 WHERE t1.id in (SELECT distinct t2.id FROM t2);
QUERY PLAN
--
Hash Join  (cost=20.50..34.56 rows=320 width=222)
  Hash Cond: (t1.id = t2.id)
  ->  Seq Scan on t1  (cost=0.00..13.20 rows=320 width=222)
  ->  Hash  (cost=18.00..18.00 rows=200 width=4)
        ->  HashAggregate  (cost=14.00..16.00 rows=200 width=4)
              Group Key: t2.id
              ->  Seq Scan on t2  (cost=0.00..13.20 rows=320 width=4)

explain  
SELECT t1.id, t1.name FROM t1 WHERE t1.id not in (SELECT distinct t2.id FROM t2);
QUERY PLAN
--
Seq Scan on t1  (cost=16.50..30.50 rows=160 width=222)
  Filter: (NOT (hashed SubPlan 1))
  SubPlan 1
    ->  HashAggregate  (cost=14.00..16.00 rows=200 width=4)
          Group Key: t2.id
          ->  Seq Scan on t2  (cost=0.00..13.20 rows=320 width=4)

You can see that for IN/EXISTS subqueries, there are Aggregates. But for NOT EXISTS, the Aggregate is removed, and for NOT IN, the subquery is kept and not decorrelated.

In DataFusion, I like the implementation in this PR, and the generated plan is more consistent. For the correctness of NAAJ, we have another ticket to track that, and we can make the Hash Join itself null-aware.

@ygf11 ygf11 marked this pull request as ready for review March 3, 2023 08:16
@ygf11
Contributor Author

ygf11 commented Mar 3, 2023

This PR is ready for review.

)
});
Ok(optimized_plan)
}
Contributor Author

I am not sure the behavior of Distinct is correct, so I do not handle Aggregate here.

Contributor

@mingmwang mingmwang Mar 8, 2023

I am not sure the behavior of Distinct is correct, so I do not handle Aggregate here.

I will take a closer look at this PR tomorrow. If you do not know how to handle Aggregate here, you can just leave it as is and only handle the Distinct case.

The simplest approach is to check whether there are outer references used by the Aggregate expressions; if there are, return an Err, otherwise add the correlated columns to the group-by columns. (Only allow outer-reference columns referred to in Filter expressions or Join expressions; this limits the supported subquery cases, but it is safer.)
For example, if the original inner aggregate groups by inner_a and there is a correlation condition like outer_b = inner_b, then add inner_b to the group-by columns. It is not a perfect solution and might sometimes cause bugs.
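
For illustration (hypothetical tables outer_t and inner_t, not from this PR), the rewrite described above would take a correlated aggregate subquery such as:

SELECT o.outer_a
FROM   outer_t o
WHERE  EXISTS (SELECT sum(i.inner_c)
               FROM   inner_t i
               WHERE  i.inner_b = o.outer_b
               GROUP  BY i.inner_a);

and extend the inner aggregate to GROUP BY i.inner_a, i.inner_b, so the correlation predicate i.inner_b = o.outer_b can be evaluated as a join condition on top of the now-uncorrelated aggregate.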

SparkSQL has similar logic in the rule pullOutCorrelatedPredicates:

      case a @ Aggregate(grouping, expressions, child) =>
        val referencesToAdd = missingReferences(a)
        if (referencesToAdd.nonEmpty) {
          Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
        } else {
          a
        }

The latest SparkSQL implementation also has a bug here (it needs to distinguish whether the original Aggregate is a scalar aggregate or a vector aggregate).

SQL to reproduce the bug; you can try it on both PostgreSQL and SparkSQL:

CREATE TABLE t1 (id INT,name String);
CREATE TABLE t2 (id INT,name String);
insert into t1 values (11, "a"), (11, "a"), (22, "b"), (33, "c"), (44, "d"), (null, "e");
insert into t2 values (111, "z"), (111, "z"), (222, "y"), (444, "x"), (555, "w"), (null, "v");
-- Should output  all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (select 0);

-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t2.id = t1.id);

-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t2.id = t1.id having count(*) = 0);

PostgreSQL does not support decorrelating subqueries that include Aggregates. SparkSQL supports some of the cases but not all, and its implementation has bugs.

I'm going to implement a general subquery decorrelation rule based on the following two well-known papers.
#5492

Orthogonal Optimization of Subqueries and Aggregation
https://dl.acm.org/doi/10.1145/375663.375748

Unnesting Arbitrary Queries
https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf

Member

I'm going to implement a general subquery decorrelation rule based on the following two well-known papers.

Great job!

It may be related to #5368.

@mingmwang
Contributor

@ygf11
I think the handling of Distinct in this PR is correct.
Actually, the generated plan is better than what Postgres, SparkSQL, and even Hyper generate.

@mingmwang
Contributor

LGTM

Comment on lines +154 to +156
\n Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\
\n SubqueryAlias: t2\
\n TableScan: test projection=[col_int32]";
Member

I'm not sure whether this change is always beneficial.

Because both Agg and LeftSemi can dedup, this adds a new operator, which may cause a performance loss.

What do you think? @mingmwang

Contributor

@mingmwang mingmwang Mar 10, 2023

Most of the time, adding Distinct/Aggregate is better; it can reduce the data volume before the Join (and reduce the shuffle data volume before the Join in a distributed engine).
Regarding the relative positions of the Join and the Aggregation, that is another topic; some engines leverage CBO rules to decide whether to push down or pull up the Aggregation through the Join.
Snowflake implements a feature called adaptive aggregate placement; the basic idea is that if the Aggregate added by the optimizer cannot dedup, it just skips that Aggregate.

@jackwener
Member

jackwener commented Mar 9, 2023

I reviewed this PR carefully. It looks great to me, except that I'm not sure about one possible trade-off -- adding the Distinct.
Thanks @ygf11 ! ❤️

@jackwener
Member

Thanks @alamb @mingmwang for the review!
Can you resolve the conflict? @ygf11
I am preparing to merge this PR, thank you! @ygf11

@ygf11
Contributor Author

ygf11 commented Mar 11, 2023

Thanks for your ideas, learned a lot! @mingmwang @alamb @jackwener

And I fixed the merge conflict.

@jackwener jackwener merged commit 9587339 into apache:main Mar 11, 2023
@ursabot

ursabot commented Mar 11, 2023

Benchmark runs are scheduled for baseline = 860918d and contender = 9587339. 9587339 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2
Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm
Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x
Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ygf11 ygf11 deleted the where-exists-distinct branch March 15, 2023 07:31
Labels: core (Core DataFusion crate), optimizer (Optimizer rules)
Projects: None yet
Development: Successfully merging this pull request may close these issues: Add back Distinct for where-exists if subquery is a DISTINCT (#5344)
5 participants