Add OuterReferenceColumn to Expr to represent correlated expression #5593
I will add more UTs to cover more complex cases:
Looks great to me. Thanks @mingmwang.
@@ -1919,14 +1947,11 @@ impl Join {
pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
👍 This field can help to check if it is a correlated subquery.
Yes.
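A minimal sketch of the check described above, assuming a toy `ToySubquery` type in place of the real `Subquery` struct. The `is_correlated` helper is hypothetical and does not appear in the PR; it only illustrates that a non-empty `outer_ref_columns` list marks a correlated subquery.

```rust
/// Toy stand-in for the `Subquery` struct in the diff above (not the real type).
struct ToySubquery {
    /// Outer references collected while planning the subquery,
    /// kept as plain strings here for brevity.
    outer_ref_columns: Vec<String>,
}

impl ToySubquery {
    /// Hypothetical helper: the subquery is correlated exactly when it
    /// references at least one column of an enclosing query.
    fn is_correlated(&self) -> bool {
        !self.outer_ref_columns.is_empty()
    }
}
```

With this shape, a subquery planned with `outer_ref_columns: vec![]` would report `false`, and one that recorded `t1.t1_id` would report `true`.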
planner_context.outer_query_schema = Some(input_schema.clone());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
Can we support the following subquery in the future?
# three level
# the innermost(`t3`) reference outermost(`t1`) column
SELECT *
FROM t1
WHERE EXISTS (
SELECT *
FROM t2
WHERE t1_id = t2_id
AND EXISTS (
SELECT *
FROM t3
WHERE t3_id > t1_id
)
);
I think the optimizer rule you plan to add (#5492) could also optimize it.
I'm afraid I will not support this in the near term. I think the method described in the "Unnesting Arbitrary Queries" paper can cover this, but there is additional complexity to deal with when the outer reference columns are ambiguous, because the schemas of outer plans at different levels might contain columns with the same name.
Looks good to me @mingmwang -- I left some suggestions about cleaning up the code in the planner, but the overall idea looks great to me. Thank you
@@ -81,6 +81,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
    }
    Expr::Literal(_)
    | Expr::Alias(_, _)
    | Expr::OuterReferenceColumn(_, _)
This idea makes a lot of sense to me 👍
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn support_agg_correlated_columns() -> Result<()> {
If possible, I would recommend adding these tests as sqllogictest
-- https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests
They are much easier to change / maintain, in my opinion.
If the test only concerns the plan tree, I think it's also OK to keep it here, because we can debug it easily and see the plan directly in the code.
#[tokio::test]
async fn support_join_correlated_columns2() -> Result<()> {
    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";
It is neat to see a correlation to the (second) layer of the query.
if filter.contains_outer() {
    joins.push(strip_outer_reference((*filter).clone()));
} else {
    // TODO remove the logic
I don't understand this TODO -- is it meant for this PR or for some other one?
The original decorrelation rules contained complex logic to find the join columns; this is unnecessary after this PR.
I will clean up the code in the next PR.
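To illustrate why explicit outer-reference markers make the join-column search unnecessary, here is a minimal sketch on a toy expression tree. `ToyExpr` and `split_filters` are assumptions made for illustration; `contains_outer` and `strip_outer_reference` reuse the names from the snippet above but are toy reimplementations, not the PR's code. Routing the remaining filters into a separate list is also an assumption.

```rust
/// Toy expression tree; not DataFusion's real `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    Column(String),
    OuterReferenceColumn(String),
    Eq(Box<ToyExpr>, Box<ToyExpr>),
    And(Box<ToyExpr>, Box<ToyExpr>),
}

/// Returns true if any node in the tree is an outer reference.
fn contains_outer(expr: &ToyExpr) -> bool {
    match expr {
        ToyExpr::Column(_) => false,
        ToyExpr::OuterReferenceColumn(_) => true,
        ToyExpr::Eq(l, r) | ToyExpr::And(l, r) => contains_outer(l) || contains_outer(r),
    }
}

/// Rewrites every outer reference into an ordinary column so the
/// expression can be used as a join condition.
fn strip_outer_reference(expr: ToyExpr) -> ToyExpr {
    match expr {
        ToyExpr::OuterReferenceColumn(name) => ToyExpr::Column(name),
        ToyExpr::Eq(l, r) => ToyExpr::Eq(
            Box::new(strip_outer_reference(*l)),
            Box::new(strip_outer_reference(*r)),
        ),
        ToyExpr::And(l, r) => ToyExpr::And(
            Box::new(strip_outer_reference(*l)),
            Box::new(strip_outer_reference(*r)),
        ),
        other => other,
    }
}

/// Hypothetical driver: correlated filters become join conditions,
/// everything else stays as a plain subquery filter.
fn split_filters(filters: Vec<ToyExpr>) -> (Vec<ToyExpr>, Vec<ToyExpr>) {
    let mut joins = Vec::new();
    let mut others = Vec::new();
    for filter in filters {
        if contains_outer(&filter) {
            joins.push(strip_outer_reference(filter));
        } else {
            others.push(filter);
        }
    }
    (joins, others)
}
```

Because the planner has already tagged the correlated columns, the rule only has to test for the tag; it no longer needs to compare column names against the outer schema.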
let outer_query_schema_opt =
    planner_context.outer_query_schema.clone();
if let Some(outer) = outer_query_schema_opt.as_ref() {
Is there a reason for this clone (of the entire schema)? Could it be more like
- let outer_query_schema_opt =
-     planner_context.outer_query_schema.clone();
- if let Some(outer) = outer_query_schema_opt.as_ref() {
+ let outer_query_schema_opt = planner_context.outer_query_schema.as_ref();
+ if let Some(outer) = outer_query_schema_opt {
The same thing applies to the other places in this file
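A minimal sketch of the borrow-instead-of-clone idea, assuming toy `ToySchema` and `ToyPlannerContext` types (both hypothetical). `ToySchema` deliberately does not implement `Clone`, which shows that a lookup only needs a shared reference obtained through `Option::as_ref`.

```rust
/// Toy schema; intentionally not `Clone` to show that no copy is needed.
struct ToySchema {
    fields: Vec<String>,
}

/// Toy planner context holding an optional outer query schema.
struct ToyPlannerContext {
    outer_query_schema: Option<ToySchema>,
}

/// Hypothetical lookup that only borrows the outer schema.
fn outer_has_field(ctx: &ToyPlannerContext, name: &str) -> bool {
    // `as_ref()` turns `&Option<ToySchema>` into `Option<&ToySchema>`,
    // so the schema itself is never copied.
    match ctx.outer_query_schema.as_ref() {
        Some(outer) => outer.fields.iter().any(|f| f.as_str() == name),
        None => false,
    }
}
```

Whether the same change compiles at every call site in the planner depends on how the context is borrowed there; the sketch only covers the read-only case.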
let outer_query_schema_opt =
    planner_context.outer_query_schema.clone();
if let Some(outer) = outer_query_schema_opt.as_ref() {
    match outer.field_with_qualified_name(&relation, &name) {
        Ok(field) => {
            // found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
            Ok(Expr::OuterReferenceColumn(
                field.data_type().clone(),
                Column {
                    relation: Some(relation),
                    name,
                },
            ))
        }
        Err(_) => Ok(Expr::Column(Column {
            relation: Some(relation),
            name,
        })),
    }
} else {
    // table.column identifier
    Ok(Expr::Column(Column {
        relation: Some(relation),
        name,
    }))
}
This pattern appears to be repeated several times -- maybe it could be refactored into a reusable function rather than being inlined in several places?
Sure, will do.
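One way the repeated pattern might be factored out is sketched below. All types are toy stand-ins, and `resolve_qualified_column` is a hypothetical name, not the function that was eventually added. The toy lookup returns `Option` rather than `Result` to keep the example short.

```rust
/// Toy expression; not DataFusion's real `Expr`.
#[derive(Debug, PartialEq)]
enum ToyExpr {
    Column { relation: String, name: String },
    OuterReferenceColumn { data_type: String, relation: String, name: String },
}

/// Toy schema field with a qualifier and a type name.
struct ToyField {
    relation: String,
    name: String,
    data_type: String,
}

struct ToySchema {
    fields: Vec<ToyField>,
}

impl ToySchema {
    /// Exact match on `relation.name`.
    fn field_with_qualified_name(&self, relation: &str, name: &str) -> Option<&ToyField> {
        self.fields
            .iter()
            .find(|f| f.relation.as_str() == relation && f.name.as_str() == name)
    }
}

/// Hypothetical shared helper: if the qualified name resolves in the outer
/// schema, emit an outer reference; otherwise emit an ordinary column.
fn resolve_qualified_column(
    outer: Option<&ToySchema>,
    relation: String,
    name: String,
) -> ToyExpr {
    let found = outer.and_then(|schema| schema.field_with_qualified_name(&relation, &name));
    match found {
        Some(field) => ToyExpr::OuterReferenceColumn {
            data_type: field.data_type.clone(),
            relation,
            name,
        },
        None => ToyExpr::Column { relation, name },
    }
}
```

Taking `Option<&ToySchema>` also means callers can pass `planner_context.outer_query_schema.as_ref()` directly, so the helper covers both the "no outer query" and "name not found" branches without a clone.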
} else {
    vec![]
};
let outer_query_schema = planner_context.outer_query_schema.clone();
Again, here I think we can avoid a clone.
Agreed. We can avoid a clone.
@@ -2635,7 +2635,7 @@ fn exists_subquery() {
 \n Filter: EXISTS (<subquery>)\
 \n Subquery:\
 \n Projection: person.first_name\
-\n Filter: person.last_name = p.last_name AND person.state = p.state\
+\n Filter: person.last_name = outer_ref(p.last_name) AND person.state = outer_ref(p.state)\
I find this actually much easier to read 👍
Nice job.
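The `outer_ref(...)` rendering in the expected plan can be sketched with a `Display` implementation on a toy expression type. `ToyExpr` is an assumption made for illustration; the real formatting code in the PR may differ.

```rust
use std::fmt;

/// Toy expression; not DataFusion's real `Expr`.
enum ToyExpr {
    Column(String),
    OuterReferenceColumn(String),
    Eq(Box<ToyExpr>, Box<ToyExpr>),
}

impl fmt::Display for ToyExpr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ToyExpr::Column(name) => write!(f, "{}", name),
            // Wrap correlated columns so they stand out in the printed plan.
            ToyExpr::OuterReferenceColumn(name) => write!(f, "outer_ref({})", name),
            ToyExpr::Eq(l, r) => write!(f, "{} = {}", l, r),
        }
    }
}
```

Printing the correlated columns differently makes it visible in the plan text which side of a predicate belongs to the enclosing query.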
@@ -149,6 +152,7 @@ fn optimize_where_in(
    let projection = Projection::try_from_plan(&query_info.query.subquery)
        .map_err(|e| context!("a projection is required", e))?;
    let subquery_input = projection.input.clone();
    // TODO add the validate logic to Analyzer
😍 Looking forward to it!
In the future, I also plan to do some validation checks in the Analyzer.
Looks like this PR has some conflicts to fix but then it will be good to go
@alamb @jackwener
Which issue does this PR close?
Closes #5571.
Original design doc:
https://docs.google.com/document/d/1j5vHyva-T_5l3POnHSS0-r5TOp86sEkfSUcRLqJZaK4/edit#heading=h.wgtywgh0byuf
Rationale for this change
This is the first part of the work to support more general and complex subqueries and subquery expressions.
This PR adds `OuterReferenceColumn` to `Expr` to represent correlated expressions, no matter how deeply the correlated expressions are nested.
During the `SqlToRel` (planner) process, correlated expressions are marked as `OuterReferenceColumn`. This makes sure the correlated expressions can pass the inner plan's schema validation. It also simplifies the logic of the `Decorrelation` rules: the goal of a `Decorrelation` rule is just to find the correlated expressions and convert them to different `Join`s.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?