diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index e366169650de..d066d8d9dd2c 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -93,6 +93,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> { | Expr::Between { .. } | Expr::InList { .. } | Expr::Exists { .. } + | Expr::InSubquery { .. } + | Expr::ScalarSubquery(_) | Expr::GetIndexedField { .. } | Expr::Case { .. } => Recursion::Continue(self), diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index c303cdec8639..0b0fdebb68f7 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1366,6 +1366,56 @@ mod tests { Ok(()) } + #[test] + fn filter_in_subquery() -> Result<()> { + let foo = test_table_scan_with_name("foo")?; + let bar = test_table_scan_with_name("bar")?; + + let subquery = LogicalPlanBuilder::from(foo) + .project(vec![col("a")])? + .filter(col("a").eq(col("bar.a")))? + .build()?; + + // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a) + let outer_query = LogicalPlanBuilder::from(bar) + .project(vec![col("a")])? + .filter(in_subquery(col("a"), Arc::new(subquery)))? + .build()?; + + let expected = "Filter: #bar.a IN (Subquery: Filter: #foo.a = #bar.a\ + \n Projection: #foo.a\ + \n TableScan: foo projection=None)\ + \n Projection: #bar.a\ + \n TableScan: bar projection=None"; + assert_eq!(expected, format!("{:?}", outer_query)); + + Ok(()) + } + + #[test] + fn select_scalar_subquery() -> Result<()> { + let foo = test_table_scan_with_name("foo")?; + let bar = test_table_scan_with_name("bar")?; + + let subquery = LogicalPlanBuilder::from(foo) + .project(vec![col("b")])? + .filter(col("a").eq(col("bar.a")))? 
+ .build()?; + + // SELECT (SELECT b FROM foo WHERE a = bar.a) FROM bar + let outer_query = LogicalPlanBuilder::from(bar) + .project(vec![scalar_subquery(Arc::new(subquery))])? + .build()?; + + let expected = "Projection: (Subquery: Filter: #foo.a = #bar.a\ + \n Projection: #foo.b\ + \n TableScan: foo projection=None)\ + \n TableScan: bar projection=None"; + assert_eq!(expected, format!("{:?}", outer_query)); + + Ok(()) + } + #[test] fn projection_non_unique_names() -> Result<()> { let plan = LogicalPlanBuilder::scan_empty( diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs index 8935170cfc43..673345c69b61 100644 --- a/datafusion/core/src/logical_plan/expr.rs +++ b/datafusion/core/src/logical_plan/expr.rs @@ -71,6 +71,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr { Expr::Alias(inner_expr, name) => { Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name) } + Expr::ScalarSubquery(_) => e.clone(), _ => match e.name(input_schema) { Ok(name) => match input_schema.field_with_unqualified_name(&name) { Ok(field) => Expr::Column(field.qualified_column()), diff --git a/datafusion/core/src/logical_plan/expr_rewriter.rs b/datafusion/core/src/logical_plan/expr_rewriter.rs index e99fc7e66cf8..ef6d8dff9c8f 100644 --- a/datafusion/core/src/logical_plan/expr_rewriter.rs +++ b/datafusion/core/src/logical_plan/expr_rewriter.rs @@ -111,7 +111,17 @@ impl ExprRewritable for Expr { let expr = match self { Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name), Expr::Column(_) => self.clone(), - Expr::Exists(_) => self.clone(), + Expr::Exists { .. 
} => self.clone(), + Expr::InSubquery { + expr, + subquery, + negated, + } => Expr::InSubquery { + expr: rewrite_boxed(expr, rewriter)?, + subquery, + negated, + }, + Expr::ScalarSubquery(_) => self.clone(), Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names), Expr::Literal(value) => Expr::Literal(value), Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr { diff --git a/datafusion/core/src/logical_plan/expr_visitor.rs b/datafusion/core/src/logical_plan/expr_visitor.rs index bfab0ca04c75..7c578da19b75 100644 --- a/datafusion/core/src/logical_plan/expr_visitor.rs +++ b/datafusion/core/src/logical_plan/expr_visitor.rs @@ -106,7 +106,9 @@ impl ExprVisitable for Expr { Expr::Column(_) | Expr::ScalarVariable(_, _) | Expr::Literal(_) - | Expr::Exists(_) + | Expr::Exists { .. } + | Expr::InSubquery { .. } + | Expr::ScalarSubquery(_) | Expr::Wildcard | Expr::QualifiedWildcard { .. } => Ok(visitor), Expr::BinaryExpr { left, right, .. } => { diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index d933b0229b1f..bcf0a161447f 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -41,13 +41,14 @@ pub use expr::{ avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col, columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, - exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit, - lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, - now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat, - replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, - sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, - to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, - trunc, unalias, upper, when, Column, 
Expr, ExprSchema, Literal, + exists, exp, exprlist_to_fields, floor, in_list, in_subquery, initcap, left, length, + lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, + not_exists, not_in_subquery, now, now_expr, nullif, octet_length, or, random, + regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, + scalar_subquery, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, + starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, + to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, + when, Column, Expr, ExprSchema, Literal, }; pub use expr_rewriter::{ normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs, diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index 4a9bf8e913a0..a9983cdf1e08 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -460,8 +460,16 @@ impl ExprIdentifierVisitor<'_> { desc.push_str("InList-"); desc.push_str(&negated.to_string()); } - Expr::Exists(_) => { + Expr::Exists { negated, .. } => { desc.push_str("Exists-"); + desc.push_str(&negated.to_string()); + } + Expr::InSubquery { negated, .. } => { + desc.push_str("InSubquery-"); + desc.push_str(&negated.to_string()); + } + Expr::ScalarSubquery(_) => { + desc.push_str("ScalarSubquery-"); } Expr::Wildcard => { desc.push_str("Wildcard-"); diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index 93d9fb506177..457224619007 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -375,7 +375,9 @@ impl<'a> ConstEvaluator<'a> { | Expr::AggregateUDF { .. } | Expr::ScalarVariable(_, _) | Expr::Column(_) - | Expr::Exists(_) + | Expr::Exists { .. } + | Expr::InSubquery { .. 
} + | Expr::ScalarSubquery(_) | Expr::WindowFunction { .. } | Expr::Sort { .. } | Expr::Wildcard diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 939af8041562..df36761fec40 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -85,7 +85,9 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> { | Expr::AggregateFunction { .. } | Expr::AggregateUDF { .. } | Expr::InList { .. } - | Expr::Exists(_) + | Expr::Exists { .. } + | Expr::InSubquery { .. } + | Expr::ScalarSubquery(_) | Expr::Wildcard | Expr::QualifiedWildcard { .. } | Expr::GetIndexedField { .. } => {} @@ -371,7 +373,9 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { } Ok(expr_list) } - Expr::Exists(_) => Ok(vec![]), + Expr::Exists { .. } => Ok(vec![]), + Expr::InSubquery { expr, .. } => Ok(vec![expr.as_ref().to_owned()]), + Expr::ScalarSubquery(_) => Ok(vec![]), Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), @@ -506,7 +510,9 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { Expr::Column(_) | Expr::Literal(_) | Expr::InList { .. } - | Expr::Exists(_) + | Expr::Exists { .. } + | Expr::InSubquery { .. } + | Expr::ScalarSubquery(_) | Expr::ScalarVariable(_, _) => Ok(expr.clone()), Expr::Sort { asc, nulls_first, .. diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 84785777b016..4cca768c7a34 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -186,9 +186,15 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { Ok(format!("{} IN ({:?})", expr, list)) } } - Expr::Exists(_) => Err(DataFusionError::NotImplemented( + Expr::Exists { .. } => Err(DataFusionError::NotImplemented( "EXISTS is not yet supported in the physical plan".to_string(), )), + Expr::InSubquery { .. 
} => Err(DataFusionError::NotImplemented( + "IN subquery is not yet supported in the physical plan".to_string(), + )), + Expr::ScalarSubquery(_) => Err(DataFusionError::NotImplemented( + "Scalar subqueries are not yet supported in the physical plan".to_string(), + )), Expr::Between { expr, negated, diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index e0c418417c5c..11ca3332d5a5 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -33,8 +33,9 @@ pub use crate::execution::options::{ pub use crate::logical_plan::{ approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr, coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest, - exists, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now, - octet_length, random, regexp_match, regexp_replace, repeat, replace, reverse, right, - rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, - sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning, + exists, in_list, in_subquery, initcap, left, length, lit, lower, lpad, ltrim, max, + md5, min, not_exists, not_in_subquery, now, octet_length, random, regexp_match, + regexp_replace, repeat, replace, reverse, right, rpad, rtrim, scalar_subquery, + sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, sum, to_hex, + translate, trim, upper, Column, JoinType, Partitioning, }; diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index f9242c6aab63..9ed7f0b66cc9 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -371,7 +371,17 @@ where Expr::Column { .. } | Expr::Literal(_) | Expr::ScalarVariable(_, _) - | Expr::Exists(_) => Ok(expr.clone()), + | Expr::Exists { .. 
} + | Expr::ScalarSubquery(_) => Ok(expr.clone()), + Expr::InSubquery { + expr: nested_expr, + subquery, + negated, + } => Ok(Expr::InSubquery { + expr: Box::new(clone_with_replacement(&**nested_expr, replacement_fn)?), + subquery: subquery.clone(), + negated: *negated, + }), Expr::Wildcard => Ok(Expr::Wildcard), Expr::QualifiedWildcard { .. } => Ok(expr.clone()), Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField { diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 88c489670588..4d88ed815b14 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -228,7 +228,23 @@ pub enum Expr { negated: bool, }, /// EXISTS subquery - Exists(Subquery), + Exists { + /// subquery that will produce a single column of data + subquery: Subquery, + /// Whether the expression is negated + negated: bool, + }, + /// IN subquery + InSubquery { + /// The expression to compare + expr: Box, + /// subquery that will produce a single column of data to compare against + subquery: Subquery, + /// Whether the expression is negated + negated: bool, + }, + /// Scalar subquery + ScalarSubquery(Subquery), /// Represents a reference to all fields in a schema. Wildcard, /// Represents a reference to all fields in a specific schema. 
@@ -434,7 +450,25 @@ impl fmt::Debug for Expr { Expr::Negative(expr) => write!(f, "(- {:?})", expr), Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr), Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr), - Expr::Exists(subquery) => write!(f, "EXISTS ({:?})", subquery), + Expr::Exists { + subquery, + negated: true, + } => write!(f, "NOT EXISTS ({:?})", subquery), + Expr::Exists { + subquery, + negated: false, + } => write!(f, "EXISTS ({:?})", subquery), + Expr::InSubquery { + expr, + subquery, + negated: true, + } => write!(f, "{:?} NOT IN ({:?})", expr, subquery), + Expr::InSubquery { + expr, + subquery, + negated: false, + } => write!(f, "{:?} IN ({:?})", expr, subquery), + Expr::ScalarSubquery(subquery) => write!(f, "({:?})", subquery), Expr::BinaryExpr { left, op, right } => { write!(f, "{:?} {} {:?}", left, op, right) } @@ -622,7 +656,13 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { let expr = create_name(expr, input_schema)?; Ok(format!("{} IS NOT NULL", expr)) } - Expr::Exists(_) => Ok("EXISTS".to_string()), + Expr::Exists { negated: true, .. } => Ok("NOT EXISTS".to_string()), + Expr::Exists { negated: false, .. } => Ok("EXISTS".to_string()), + Expr::InSubquery { negated: true, .. } => Ok("NOT IN".to_string()), + Expr::InSubquery { negated: false, .. 
} => Ok("IN".to_string()), + Expr::ScalarSubquery(subquery) => { + Ok(subquery.subquery.schema().field(0).name().clone()) + } Expr::GetIndexedField { expr, key } => { let expr = create_name(expr, input_schema)?; Ok(format!("{}[{}]", expr, key)) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 19c311f4fa45..0a3b2ebd05fb 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -184,7 +184,41 @@ pub fn approx_percentile_cont_with_weight( /// Create an EXISTS subquery expression pub fn exists(subquery: Arc) -> Expr { - Expr::Exists(Subquery { subquery }) + Expr::Exists { + subquery: Subquery { subquery }, + negated: false, + } +} + +/// Create a NOT EXISTS subquery expression +pub fn not_exists(subquery: Arc) -> Expr { + Expr::Exists { + subquery: Subquery { subquery }, + negated: true, + } +} + +/// Create an IN subquery expression +pub fn in_subquery(expr: Expr, subquery: Arc) -> Expr { + Expr::InSubquery { + expr: Box::new(expr), + subquery: Subquery { subquery }, + negated: false, + } +} + +/// Create a NOT IN subquery expression +pub fn not_in_subquery(expr: Expr, subquery: Arc) -> Expr { + Expr::InSubquery { + expr: Box::new(expr), + subquery: Subquery { subquery }, + negated: true, + } +} + +/// Create a scalar subquery expression +pub fn scalar_subquery(subquery: Arc) -> Expr { + Expr::ScalarSubquery(Subquery { subquery }) } // TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 4c6457962fd3..b932eefa0b96 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -100,10 +100,14 @@ impl ExprSchemable for Expr { } Expr::Not(_) | Expr::IsNull(_) - | Expr::Exists(_) + | Expr::Exists { .. } + | Expr::InSubquery { .. } | Expr::Between { .. } | Expr::InList { .. 
} | Expr::IsNotNull(_) => Ok(DataType::Boolean), + Expr::ScalarSubquery(subquery) => { + Ok(subquery.subquery.schema().field(0).data_type().clone()) + } Expr::BinaryExpr { ref left, ref right, @@ -173,7 +177,11 @@ impl ExprSchemable for Expr { | Expr::WindowFunction { .. } | Expr::AggregateFunction { .. } | Expr::AggregateUDF { .. } => Ok(true), - Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists(_) => Ok(false), + Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists { .. } => Ok(false), + Expr::InSubquery { expr, .. } => expr.nullable(input_schema), + Expr::ScalarSubquery(subquery) => { + Ok(subquery.subquery.schema().field(0).is_nullable()) + } Expr::BinaryExpr { ref left, ref right, diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 3dd24600a20c..b513bf52d4a1 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -48,12 +48,13 @@ pub use expr_fn::{ abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan, avg, bit_length, btrim, case, ceil, character_length, chr, coalesce, col, concat, concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, date_part, - date_trunc, digest, exists, exp, floor, in_list, initcap, left, length, ln, log10, - log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or, - random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, - rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, - strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, - to_timestamp_seconds, translate, trim, trunc, upper, when, + date_trunc, digest, exists, exp, floor, in_list, in_subquery, initcap, left, length, + ln, log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, + now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat, + replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, + sha512, signum, 
sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, + to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, + trunc, upper, when, }; pub use expr_schema::ExprSchemable; pub use function::{