Skip to content

[WIP] refactor: framework for subquery decorrelation #16016

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4ba36c0
chore: add test
duongcongtoai Feb 3, 2025
79eaca3
chore: more progress
duongcongtoai Feb 10, 2025
7ed0831
temp
duongcongtoai Mar 18, 2025
cc97879
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Mar 18, 2025
5096937
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Apr 10, 2025
68fd9ca
chore: some work
duongcongtoai Apr 16, 2025
ace332e
chore: some work on indexed algebra
duongcongtoai Apr 27, 2025
da8980c
chore: more progress
duongcongtoai May 4, 2025
483e3ac
chore: impl projection pull up
duongcongtoai May 4, 2025
f14b145
chore: complete unnesting simple subquery
duongcongtoai May 6, 2025
0cd8143
chore: correct join condition
duongcongtoai May 8, 2025
cc3e01c
chore: handle exist query
duongcongtoai May 8, 2025
9b5daa2
test: in sq test
duongcongtoai May 10, 2025
f26baf8
test: exist with no dependent column
duongcongtoai May 10, 2025
37852c1
test: exist with dependent columns
duongcongtoai May 10, 2025
2544478
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 10, 2025
e984a55
chore: remove redundant clone
duongcongtoai May 11, 2025
94aba08
feat: dummy implementation for aggregation
duongcongtoai May 13, 2025
0f039fe
feat: handle count bug
duongcongtoai May 15, 2025
898bdc4
feat: add sq alias step
duongcongtoai May 16, 2025
1a600b6
test: simple count decorrelate
duongcongtoai May 16, 2025
6ce21b3
chore: some work to support multiple subqueries per level
duongcongtoai May 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,25 @@ impl Expr {
using_columns
}

pub fn outer_column_refs(&self) -> HashSet<&Column> {
let mut using_columns = HashSet::new();
self.add_outer_column_refs(&mut using_columns);
using_columns
}

/// Adds references to all outer columns in this expression to the set
///
/// See [`Self::column_refs`] for details
pub fn add_outer_column_refs<'a>(&'a self, set: &mut HashSet<&'a Column>) {
self.apply(|expr| {
if let Expr::OuterReferenceColumn(_, col) = expr {
set.insert(col);
}
Ok(TreeNodeRecursion::Continue)
})
.expect("traversal is infallible");
}

/// Adds references to all columns in this expression to the set
///
/// See [`Self::column_refs`] for details
Expand Down Expand Up @@ -1715,6 +1734,19 @@ impl Expr {
.expect("exists closure is infallible")
}

/// Return true if the expression contains out reference(correlated) expressions.
pub fn contains_outer_from_relation(&self, outer_relation_name: &String) -> bool {
self.exists(|expr| {
if let Expr::OuterReferenceColumn(_, col) = expr {
if let Some(relation) = &col.relation {
return Ok(relation.table() == outer_relation_name);
}
}
Ok(false)
})
.expect("exists closure is infallible")
}

/// Returns true if the expression node is volatile, i.e. whether it can return
/// different results when evaluated multiple times with the same input.
/// Note: unlike [`Self::is_volatile`], this function does not consider inputs:
Expand Down
22 changes: 22 additions & 0 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,28 @@ pub fn normalize_sorts(
.collect()
}

/// Recursively rename the table of all [`Column`] expressions in a given expression tree with
/// a new name, ignoring the `skip_tables`
pub fn replace_col_base_table(
expr: Expr,
skip_tables: &[&str],
new_table: String,
) -> Result<Expr> {
expr.transform(|expr| {
if let Expr::Column(c) = &expr {
if let Some(relation) = &c.relation {
if !skip_tables.contains(&relation.table()) {
return Ok(Transformed::yes(Expr::Column(
c.with_relation(TableReference::bare(new_table.clone())),
)));
}
}
}
Ok(Transformed::no(expr))
})
.data()
}

/// Recursively replace all [`Column`] expressions in a given expression tree with
/// `Column` expressions provided by the hash map argument.
pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {
Expand Down
25 changes: 25 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,31 @@ pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
split_conjunction_impl(expr, vec![])
}

/// Splits a conjunctive [`Expr`] such as `A OR B OR C` => `[A, B, C]`
///
/// See [`split_disjunction`] for more details and an example.
pub fn split_disjunction(expr: &Expr) -> Vec<&Expr> {
split_disjunction_impl(expr, vec![])
}

fn split_disjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
right,
op: Operator::Or,
left,
}) => {
let exprs = split_disjunction_impl(left, exprs);
split_disjunction_impl(right, exprs)
}
Expr::Alias(Alias { expr, .. }) => split_disjunction_impl(expr, exprs),
other => {
exprs.push(other);
exprs
}
}
}

fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-sql = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
Expand Down
Loading