Refactor DecorrelateWhereExists and add back Distinct if needs #5345
Conversation
I would like to add a rule to replace the
It's a good idea.
@ygf11
\n         TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

        assert_plan_eq(&plan, expected)
    }
I find the behaviors of Distinct in postgres and spark are not the same.
- For postgres, it will not add back the distinct to the optimized result.
- For spark, it will add back the distinct, and will keep the unused projection exprs (sq.b + sq.c in the above).
I think sq.b + sq.c is not used any more, so I removed it.
I am not sure whether the approach of this PR is appropriate.
I want to hear your advice @mingmwang @alamb @jackwener
For postgres, it will not add back the distinct to the optimized result.
I think it likely depends on how the join operators are implemented:
# current logical plan:
Projection: t1.t1_id
LeftSemi Join: Filter: t2.t2_id > t1.t1_id
TableScan: t1 projection=[t1_id]
Projection: t2.t2_id # DISTINCT is not added back
TableScan: t2 projection=[t2_id]
If the LeftSemiJoin is going to read its build side into (effectively) a HashSet, then it doesn't really matter whether the input is deduplicated beforehand.
However, since there is no equality predicate (the predicate is >), I am not sure what our LeftSemiJoin will do (maybe it will buffer the entire build side?), in which case it might help performance to put a DISTINCT on the output of the Projection to reduce the cardinality that the join buffers.
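The reasoning above can be illustrated with a toy model (hypothetical code, not DataFusion's actual join implementation): deduplicating the build side never changes the semi-join result, but it shrinks whatever the join has to buffer for a non-equality predicate.

```python
# Toy left semi join: keep each probe row that matches at least one
# build row under an arbitrary (here, non-equality) predicate.
def left_semi_join(probe, build, pred):
    return [p for p in probe if any(pred(p, b) for b in build)]

t1_ids = [1, 2, 3]
t2_ids = [2, 2, 2, 5, 5]        # duplicates inflate the buffered build side
deduped = sorted(set(t2_ids))   # the DISTINCT the optimizer could add back

pred = lambda t1_id, t2_id: t2_id > t1_id   # mirrors t2.t2_id > t1.t1_id
assert left_semi_join(t1_ids, t2_ids, pred) == left_semi_join(t1_ids, deduped, pred)
```

With an equality predicate and a hash build side the dedup is implicit; it is the buffered non-equi case where the extra DISTINCT could pay off.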
I think sq.b + sq.c is not used any more, so remove it.
This is good work.
For postgres, it will not add back the distinct to the optimized result.
I'm also not sure about it.
I find the behaviors of Distinct in postgres and spark are not the same.
- For postgres, it will not add back the distinct to the optimized result.
- For spark, it will add back the distinct, and will keep the unused projection exprs (sq.b + sq.c in the above).
@ygf11 @jackwener @alamb
I guess the reason Postgres removes the Distinct in the final result may be related to how it implements NAAJ (Null-Aware Anti Join) and to making sure the result is correct.
You can try these SQLs in Postgres:
CREATE TABLE t1 (id integer,name varchar(100));
CREATE TABLE t2 (id integer,name varchar(100));
explain
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT distinct * FROM t2 WHERE t2.id = t1.id);
QUERY PLAN
--
Hash Join (cost=18.50..34.32 rows=160 width=222)
Hash Cond: (t1.id = t2.id)
-> Seq Scan on t1 (cost=0.00..13.20 rows=320 width=222)
-> Hash (cost=16.00..16.00 rows=200 width=4)
-> HashAggregate (cost=14.00..16.00 rows=200 width=4)
Group Key: t2.id
-> Seq Scan on t2 (cost=0.00..13.20 rows=320 width=4)
explain
SELECT t1.id, t1.name FROM t1 WHERE NOT EXISTS (SELECT distinct t2.name FROM t2 WHERE t2.id = t1.id);
QUERY PLAN
--
Hash Anti Join (cost=17.20..35.82 rows=160 width=222)
Hash Cond: (t1.id = t2.id)
-> Seq Scan on t1 (cost=0.00..13.20 rows=320 width=222)
-> Hash (cost=13.20..13.20 rows=320 width=4)
-> Seq Scan on t2 (cost=0.00..13.20 rows=320 width=4)
explain
SELECT t1.id, t1.name FROM t1 WHERE t1.id in (SELECT distinct t2.id FROM t2);
QUERY PLAN
--
Hash Join (cost=20.50..34.56 rows=320 width=222)
Hash Cond: (t1.id = t2.id)
-> Seq Scan on t1 (cost=0.00..13.20 rows=320 width=222)
-> Hash (cost=18.00..18.00 rows=200 width=4)
-> HashAggregate (cost=14.00..16.00 rows=200 width=4)
Group Key: t2.id
-> Seq Scan on t2 (cost=0.00..13.20 rows=320 width=4)
explain
SELECT t1.id, t1.name FROM t1 WHERE t1.id not in (SELECT distinct t2.id FROM t2);
QUERY PLAN
--
Seq Scan on t1 (cost=16.50..30.50 rows=160 width=222)
Filter: (NOT (hashed SubPlan 1))
SubPlan 1
-> HashAggregate (cost=14.00..16.00 rows=200 width=4)
Group Key: t2.id
-> Seq Scan on t2 (cost=0.00..13.20 rows=320 width=4)
You can see that for IN/EXISTS subqueries, there are Aggregates. But for NOT EXISTS, the Aggregate is removed, and for NOT IN, the subquery is kept and not decorrelated.
In DataFusion, I like the implementation in this PR, and the generated plan is more consistent. For the correctness of NAAJ, we have another ticket to track it, and we can make the Hash Join itself null-aware.
This PR is ready for review.
        )
    });
    Ok(optimized_plan)
}
I am not sure the behavior of Distinct is correct, so I do not handle Aggregate here.
I am not sure the behavior of Distinct is correct, so I do not handle Aggregate here.

I will take a closer look at this PR tomorrow. If you do not know how to handle Aggregate here, you can just leave it and only handle the Distinct case.
The simplest approach is to check whether there are outer references used by Aggregate expressions: if there are, return Err; otherwise, add the correlated columns to the group-by columns. (Only allow outer-reference columns referred to in Filter or Join expressions; this limits the supported subquery cases, but is safer.)
For example, if the original inner aggregate is group by inner_a and there is a correlation condition like outer_b = inner_b, then add inner_b to the group-by columns. It is not a perfect solution and might sometimes cause bugs.
SparkSQL has similar logic in the rule pullOutCorrelatedPredicates:
case a @ Aggregate(grouping, expressions, child) =>
val referencesToAdd = missingReferences(a)
if (referencesToAdd.nonEmpty) {
Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
} else {
a
}
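The group-by extension can also be imitated at the SQL level (a hedged sketch using sqlite3 only as a convenient stand-in engine; the table and column names outer_t, inner_t, a, b are invented for illustration): the correlated column inner_t.b is pulled into the inner GROUP BY, after which the correlation becomes an ordinary IN/join predicate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE outer_t (b INT);
CREATE TABLE inner_t (a INT, b INT);
INSERT INTO outer_t VALUES (1), (2);
INSERT INTO inner_t VALUES (10, 1), (10, 1), (20, 2);
""")

# Original correlated form: the inner aggregate references o.b.
correlated = conn.execute("""
SELECT o.b FROM outer_t o
WHERE EXISTS (SELECT i.a FROM inner_t i WHERE i.b = o.b GROUP BY i.a)
ORDER BY o.b
""").fetchall()

# Decorrelated form: i.b is added to the GROUP BY, and the correlation
# surfaces as a plain IN predicate on b.
decorrelated = conn.execute("""
SELECT o.b FROM outer_t o
WHERE o.b IN (SELECT i.b FROM inner_t i GROUP BY i.a, i.b)
ORDER BY o.b
""").fetchall()

assert correlated == decorrelated
```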
The latest SparkSQL implementation also has a bug here (it needs to distinguish whether the original Aggregate is a scalar aggregate or a vector aggregate).
SQL to reproduce the bug; you can try it on both PostgreSQL and SparkSQL:
CREATE TABLE t1 (id INT,name String);
CREATE TABLE t2 (id INT,name String);
insert into t1 values (11, "a"), (11, "a"), (22, "b"), (33, "c"), (44, "d"), (null, "e");
insert into t2 values (111, "z"), (111, "z"), (222, "y"), (444, "x"), (555, "w"), (null, "v");
-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (select 0);
-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t2.id = t1.id);
-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t2.id = t1.id having count(*) = 0);
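The "should output all t1" cases hinge on a scalar aggregate always producing exactly one row. A quick check with sqlite3 (used here only as a reference engine, not any of the systems under discussion) shows why naively decorrelating the second query into a semi-join on t2.id = t1.id would be wrong:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE t1 (id INT, name TEXT);
CREATE TABLE t2 (id INT, name TEXT);
INSERT INTO t1 VALUES (11, 'a'), (22, 'b'), (NULL, 'e');
INSERT INTO t2 VALUES (111, 'z');
""")

# count(*) with no GROUP BY yields exactly one row (possibly 0),
# so EXISTS is true for every t1 row, matches or not.
rows = conn.execute("""
SELECT t1.id FROM t1
WHERE EXISTS (SELECT count(*) FROM t2 WHERE t2.id = t1.id)
""").fetchall()
assert len(rows) == 3   # all of t1 survives, even with zero matches in t2
```

A semi-join on the correlation key would instead return no rows here, which is exactly the scalar-vs-vector aggregate distinction the comment points at.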
PostgreSQL does not support decorrelating subqueries which include Aggregates. SparkSQL supports some cases, but not all, and the implementation has bugs.
I'm going to implement a general subquery decorrelation rule based on two well-known papers.
#5492
- Orthogonal Optimization of Subqueries and Aggregation: https://dl.acm.org/doi/10.1145/375663.375748
- Unnesting Arbitrary Queries: https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf
I'm going to implement a general subquery decorrelation rule based on the two well known papers.
Great job!
This may be related to #5368.
@ygf11
LGTM
\n    Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\
\n      SubqueryAlias: t2\
\n        TableScan: test projection=[col_int32]";
I'm not sure whether this change is always good.
Because Agg and LeftSemi can both dedup, there will be a new operator, which may cause a performance loss.
What do you think? @mingmwang
Most of the time, adding a Distinct/Aggregate is better: it can reduce the data volume before the Join (and reduce the shuffle data volume before the Join in a distributed engine).
Regarding the positions of Join/Aggregation, that is another topic; some engines leverage CBO rules to decide whether to push down or pull up the Aggregation through the Join.
Snowflake implements a feature called adaptive aggregate placement: the basic idea is that if the Aggregate added by the optimizer cannot dedup, just skip the Aggregate.
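The adaptive idea can be sketched as a toy (the function name and thresholds are invented; this is not Snowflake's actual mechanism): keep the optimizer-added dedup only if a sample of the input shows it is actually reducing cardinality, otherwise pass rows through unchanged.

```python
def adaptive_distinct(rows, sample=1000, min_reduction=0.5):
    """Dedup rows, but bail out if a sample shows dedup is not paying off."""
    seen = set()
    out = []
    for i, r in enumerate(rows):
        if r not in seen:
            seen.add(r)
            out.append(r)
        # After `sample` rows, check whether dedup removed enough duplicates.
        if i + 1 == sample and len(out) > min_reduction * sample:
            out.extend(rows[i + 1:])   # skip the aggregate for the rest
            break
    return out
```

A real implementation would make this decision inside the streaming aggregate operator rather than on materialized lists, but the cost/benefit trade-off is the same.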
I reviewed this PR carefully. It's great work, except I'm not sure about one (maybe) trade-off: adding the distinct.
Thanks @alamb @mingmwang for the review!
Thanks for your ideas, I learned a lot! @mingmwang @alamb @jackwener And I fixed the merge conflict.
Benchmark runs are scheduled for baseline = 860918d and contender = 9587339. 9587339 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #5344.
Rationale for this change
In #5264, we support rewriting more where-exists to join, but do not add back the Distinct. For query:
Although the execution result is correct, we should add back the DISTINCT. In some cases, DISTINCT can filter a lot of rows before the join; we should let the user choose.
What changes are included in this PR?
Refactor optimize_exists and add optimize_subquery.
Are these changes tested?
Are there any user-facing changes?