Skip to content

Add OuterReferenceColumn to Expr to represent correlated expression #5593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 20, 2023

Conversation

mingmwang
Copy link
Contributor

@mingmwang mingmwang commented Mar 14, 2023

Which issue does this PR close?

Closes #5571.

Original design doc:
https://docs.google.com/document/d/1j5vHyva-T_5l3POnHSS0-r5TOp86sEkfSUcRLqJZaK4/edit#heading=h.wgtywgh0byuf

Rationale for this change

This is first part to support more general and complex subqueries and subquery expressions.
This PR add the OuterReferenceColumn to Expr to represent the correlated expressions, no matter how deeply
the correlated expressions exist.

During the SqlToRel(planner) process, correlated expressions will be marked as OuterReferenceColumn.
This will make sure the correlated expressions can pass the inner plan's schema validation. It will also simply the logic
of Decorrelation rules. The goal of the Decorrelation rule is just to find out the correlated expressions and convert to different Joins.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner labels Mar 14, 2023
@mingmwang
Copy link
Contributor Author

I will add more UTs to cover more complex cases:

  1. Subquery with Limit
  2. Subquery with Order by
  3. Subquery with Window Functions
  4. Subquery with Union

@mingmwang
Copy link
Contributor Author

@jackwener @ygf11 @alamb

@jackwener jackwener self-requested a review March 16, 2023 10:43
Copy link
Contributor

@ygf11 ygf11 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me. Thanks @mingmwang.

@@ -1919,14 +1947,11 @@ impl Join {
pub struct Subquery {
/// The subquery
pub subquery: Arc<LogicalPlan>,
/// The outer references used in the subquery
pub outer_ref_columns: Vec<Expr>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 This field can help to check if it is a correlated subquery.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

planner_context.outer_query_schema = Some(input_schema.clone());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we support the following subquery in the future?

# three level
# the innermost(`t3`) reference outermost(`t1`) column
SELECT *
FROM t1
WHERE EXISTS (
	SELECT *
	FROM t2
	WHERE t1_id = t2_id
		AND EXISTS (
			SELECT *
			FROM t3
			WHERE t3_id > t1_id
		)
);

I just think the optimizer rule you plan to add(#5492) can also optimize it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid I will not support this in recent. I think the method mentioned in the "Unnesting Arbitrary Queries" paper can cover this, but there is additional complexity need to deal with if there are some ambiguous to the out ref columns, because different level of out plans' schema might have ambiguous columns.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me @mingmwang -- I left some suggestions about cleaning up the code in the planner, but the overall idea looks great to me. Thank you

@@ -81,6 +81,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
}
Expr::Literal(_)
| Expr::Alias(_, _)
| Expr::OuterReferenceColumn(_, _)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This idea makes a lot of sense to me 👍

@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn support_agg_correlated_columns() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if possible I would recommend adding these tests as sqllogictest -- https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests

They are much easier to change / maintain in my opinion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If test just relate with plan tree, I think it's also ok to keep it here.
Because we can debug it easily, and see plan directly in the code.

#[tokio::test]
async fn support_join_correlated_columns2() -> Result<()> {
let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is neat to see a correlation to the (second) layer of query

if filter.contains_outer() {
joins.push(strip_outer_reference((*filter).clone()));
} else {
// TODO remove the logic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this TODO -- is it meant for this PR or for some other one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original decorrelation rules, there was complex logic to find out join columns, this is unnecessary after this PR.
I will clean up the code in the next PR.

Comment on lines 62 to 64
let outer_query_schema_opt =
planner_context.outer_query_schema.clone();
if let Some(outer) = outer_query_schema_opt.as_ref() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for this clone (of the entire schema)? Could it be more like

Suggested change
let outer_query_schema_opt =
planner_context.outer_query_schema.clone();
if let Some(outer) = outer_query_schema_opt.as_ref() {
let outer_query_schema_opt = planner_context.outer_query_schema.as_ref();
if let Some(outer) = outer_query_schema_opt {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same thing applies to the other places in this file

Comment on lines 141 to 166
let outer_query_schema_opt =
planner_context.outer_query_schema.clone();
if let Some(outer) = outer_query_schema_opt.as_ref() {
match outer.field_with_qualified_name(&relation, &name) {
Ok(field) => {
// found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column {
relation: Some(relation),
name,
},
))
}
Err(_) => Ok(Expr::Column(Column {
relation: Some(relation),
name,
})),
}
} else {
// table.column identifier
Ok(Expr::Column(Column {
relation: Some(relation),
name,
}))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern appears to be repeated several times -- maybe it could be refactored into a reusable function rather than being inlined several places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

} else {
vec![]
};
let outer_query_schema = planner_context.outer_query_schema.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again here I think we can avoid a clone

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with it.
We can avoid a clone

@@ -2635,7 +2635,7 @@ fn exists_subquery() {
\n Filter: EXISTS (<subquery>)\
\n Subquery:\
\n Projection: person.first_name\
\n Filter: person.last_name = p.last_name AND person.state = p.state\
\n Filter: person.last_name = outer_ref(p.last_name) AND person.state = outer_ref(p.state)\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this actually much easier to read 👍

Copy link
Member

@jackwener jackwener left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nice job to me.

} else {
vec![]
};
let outer_query_schema = planner_context.outer_query_schema.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with it.
We can avoid a clone

@@ -149,6 +152,7 @@ fn optimize_where_in(
let projection = Projection::try_from_plan(&query_info.query.subquery)
.map_err(|e| context!("a projection is required", e))?;
let subquery_input = projection.input.clone();
// TODO add the validate logic to Analyzer
Copy link
Member

@jackwener jackwener Mar 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍Look forward it!

In the future, I also prepare to do some check job in the Analyzer.

@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn support_agg_correlated_columns() -> Result<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If test just relate with plan tree, I think it's also ok to keep it here.
Because we can debug it easily, and see plan directly in the code.

@alamb
Copy link
Contributor

alamb commented Mar 18, 2023

Looks like this PR has some conflicts to fix but then it will be good to go

@mingmwang
Copy link
Contributor Author

@alamb @jackwener
Please help to take a look again. Conflicts are resolved now.

@alamb
Copy link
Contributor

alamb commented Mar 20, 2023

Please help to take a look again. Conflicts are resolved now.

LGTM but sadly there appear to be some more conflicts now:

Screenshot 2023-03-20 at 9 25 26 AM

I took the liberty to merge from main and resolve the new conflicts -- I plan to merge this PR when CI passes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Resolve outer reference columns
4 participants