diff --git a/benchmarks/src/cancellation.rs b/benchmarks/src/cancellation.rs
index f5740bdc96e0..27bd2179890b 100644
--- a/benchmarks/src/cancellation.rs
+++ b/benchmarks/src/cancellation.rs
@@ -33,6 +33,7 @@ use datafusion::execution::TaskContext;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
+use datafusion::sql::planner::PlannerContext;
 use datafusion_common::instant::Instant;
 use futures::TryStreamExt;
 use object_store::ObjectStore;
@@ -185,7 +186,10 @@ async fn datafusion(store: Arc<dyn ObjectStore>) -> Result<()> {
         .await?;
 
     println!("Creating logical plan...");
-    let logical_plan = ctx.state().create_logical_plan(query).await?;
+    let logical_plan = ctx
+        .state()
+        .create_logical_plan(query, &mut PlannerContext::new())
+        .await?;
 
     println!("Creating physical plan...");
     let physical_plan = Arc::new(CoalescePartitionsExec::new(
diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs
index 516929ebacf1..13da8716548c 100644
--- a/datafusion-cli/src/cli_context.rs
+++ b/datafusion-cli/src/cli_context.rs
@@ -22,7 +22,7 @@ use datafusion::{
     error::DataFusionError,
     execution::{context::SessionState, TaskContext},
     logical_expr::LogicalPlan,
-    prelude::SessionContext,
+    prelude::SessionContext, sql::planner::PlannerContext,
 };
 use object_store::ObjectStore;
@@ -51,6 +51,7 @@ pub trait CliSessionContext {
     async fn execute_logical_plan(
         &self,
         plan: LogicalPlan,
+        planner_context: &mut PlannerContext,
     ) -> Result<DataFrame, DataFusionError>;
 }
@@ -92,6 +93,7 @@ impl CliSessionContext for SessionContext {
     async fn execute_logical_plan(
         &self,
         plan: LogicalPlan,
+        _planner_context: &mut PlannerContext,
     ) -> Result<DataFrame, DataFusionError> {
         self.execute_logical_plan(plan).await
     }
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index fa4cd9c5fd3b..cc49fd607ae5 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -26,6 +26,7 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
+use datafusion::sql::planner::PlannerContext;
 use futures::StreamExt;
 use std::collections::HashMap;
 use std::fs::File;
@@ -231,10 +232,11 @@ pub(super) async fn exec_and_print(
         let adjusted =
             AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
 
-        let plan = create_plan(ctx, statement).await?;
+        let mut planner_context = PlannerContext::new();
+        let plan = create_plan(ctx, statement, &mut planner_context).await?;
         let adjusted = adjusted.with_plan(&plan);
 
-        let df = ctx.execute_logical_plan(plan).await?;
+        let df = ctx.execute_logical_plan(plan, &mut planner_context).await?;
         let physical_plan = df.create_physical_plan().await?;
 
         // Track memory usage for the query result if it's bounded
@@ -348,8 +350,12 @@ fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
 async fn create_plan(
     ctx: &dyn CliSessionContext,
     statement: Statement,
+    planner_context: &mut PlannerContext,
 ) -> Result<LogicalPlan, DataFusionError> {
-    let mut plan = ctx.session_state().statement_to_plan(statement).await?;
+    let mut plan = ctx
+        .session_state()
+        .statement_to_plan(statement, planner_context)
+        .await?;
 
     // Note that cmd is a mutable reference so that create_external_table function can remove all
     // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
@@ -453,7 +459,10 @@ mod tests {
     async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
         let ctx = SessionContext::new();
-        let plan = ctx.state().create_logical_plan(sql).await?;
+        let plan = ctx
+            .state()
+            .create_logical_plan(sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
             let format = config_file_type_from_str(&cmd.file_type);
@@ -479,7 +488,10 @@ mod tests {
         let ctx = SessionContext::new();
 
         // AWS CONFIG register.
-        let plan = ctx.state().create_logical_plan(sql).await?;
+        let plan = ctx
+            .state()
+            .create_logical_plan(sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Copy(cmd) = &plan {
             let format = config_file_type_from_str(&cmd.file_type.get_ext());
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index c31310093ac6..642fc7716322 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -493,7 +493,10 @@ mod tests {
         );
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx
+            .state()
+            .create_logical_plan(&sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
             ctx.register_table_options_extension_from_scheme(scheme);
@@ -538,7 +541,10 @@ mod tests {
         );
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx
+            .state()
+            .create_logical_plan(&sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
             ctx.register_table_options_extension_from_scheme(scheme);
@@ -564,7 +570,10 @@ mod tests {
             ) LOCATION '{location}'"
         );
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx
+            .state()
+            .create_logical_plan(&sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
             ctx.register_table_options_extension_from_scheme(scheme);
@@ -592,7 +601,10 @@ mod tests {
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx
+            .state()
+            .create_logical_plan(&sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
             ctx.register_table_options_extension_from_scheme(scheme);
@@ -629,7 +641,10 @@ mod tests {
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx
+            .state()
+            .create_logical_plan(&sql, &mut PlannerContext::new())
+            .await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
             ctx.register_table_options_extension_from_scheme(scheme);
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index ecc3bd2990f4..81e37d9d7f95 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -87,6 +87,7 @@ use datafusion_session::SessionStore;
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
+use datafusion_sql::planner::PlannerContext;
 use object_store::ObjectStore;
 use parking_lot::RwLock;
 use url::Url;
@@ -620,7 +621,10 @@ impl SessionContext {
         sql: &str,
         options: SQLOptions,
     ) -> Result<DataFrame> {
-        let plan = self.state().create_logical_plan(sql).await?;
+        let plan = self
+            .state()
+            .create_logical_plan(sql, &mut PlannerContext::new())
+            .await?;
         options.verify_plan(&plan)?;
 
         self.execute_logical_plan(plan).await
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index 8aa812cc5258..699e56127beb 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -461,6 +461,7 @@ impl SessionState {
     pub async fn statement_to_plan(
         &self,
         statement: Statement,
+        planner_context: &mut PlannerContext,
     ) -> datafusion_common::Result<LogicalPlan> {
         let references = self.resolve_table_references(&statement)?;
@@ -482,7 +483,7 @@ impl SessionState {
         }
 
         let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
-        query.statement_to_plan(statement)
+        query.statement_to_plan(statement, planner_context)
     }
 
     fn get_parser_options(&self) -> ParserOptions {
@@ -514,10 +515,11 @@ impl SessionState {
     pub async fn create_logical_plan(
         &self,
         sql: &str,
+        planner_context: &mut PlannerContext,
     ) -> datafusion_common::Result<LogicalPlan> {
         let dialect = self.config.options().sql_parser.dialect.as_str();
         let statement = self.sql_to_statement(sql, dialect)?;
-        let plan = self.statement_to_plan(statement).await?;
+        let plan = self.statement_to_plan(statement, planner_context).await?;
         Ok(plan)
     }
diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs
index ec086bcc50c7..fa7199d2c6ca 100644
--- a/datafusion/core/tests/sql/sql_api.rs
+++ b/datafusion/core/tests/sql/sql_api.rs
@@ -162,7 +162,9 @@ async fn empty_statement_returns_error() {
     let state = ctx.state();
 
     // Give it an empty string which contains no statements
-    let plan_res = state.create_logical_plan("").await;
+    let plan_res = state
+        .create_logical_plan("", &mut PlannerContext::new())
+        .await;
     assert_eq!(
         plan_res.unwrap_err().strip_backtrace(),
         "Error during planning: No SQL statements were provided in the query string"
@@ -180,6 +182,7 @@ async fn multiple_statements_returns_error() {
     let plan_res = state
         .create_logical_plan(
             "INSERT INTO test (x) VALUES (1); INSERT INTO test (x) VALUES (2)",
+            &mut PlannerContext::new(),
         )
         .await;
     assert_eq!(
@@ -199,7 +202,10 @@ async fn ddl_can_not_be_planned_by_session_state() {
     // can not create a logical plan for catalog DDL
     let sql = "DROP TABLE test";
-    let plan = state.create_logical_plan(sql).await.unwrap();
+    let plan = state
+        .create_logical_plan(sql, &mut PlannerContext::new())
+        .await
+        .unwrap();
     let physical_plan = state.create_physical_plan(&plan).await;
     assert_eq!(
         physical_plan.unwrap_err().strip_backtrace(),
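For reference, the changes above thread a `PlannerContext` from the public API down to `SqlToRel`. A minimal sketch of what a call site looks like after this patch (the helper name and the idea of one fresh context per top-level statement are illustrative, taken from the updated call sites, not an API mandate):

```rust
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::*;
use datafusion::sql::planner::PlannerContext;

// Hypothetical helper: plan a single statement with an explicitly supplied context.
async fn plan_one(ctx: &SessionContext, sql: &str) -> Result<LogicalPlan> {
    // One PlannerContext per top-level statement, mirroring exec.rs above.
    let mut planner_context = PlannerContext::new();
    ctx.state().create_logical_plan(sql, &mut planner_context).await
}
```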
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index cee356a2b42c..8441fd1f9cbc 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -231,12 +231,13 @@ pub fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
 }
 
 /// Create an EXISTS subquery expression
-pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
+pub fn exists(subquery: Arc<LogicalPlan>, depth: usize) -> Expr {
     let outer_ref_columns = subquery.all_out_ref_exprs();
     Expr::Exists(Exists {
         subquery: Subquery {
             subquery,
             outer_ref_columns,
+            depth,
             spans: Spans::new(),
         },
         negated: false,
@@ -244,12 +245,13 @@ pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
 }
 
 /// Create a NOT EXISTS subquery expression
-pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
+pub fn not_exists(subquery: Arc<LogicalPlan>, depth: usize) -> Expr {
     let outer_ref_columns = subquery.all_out_ref_exprs();
     Expr::Exists(Exists {
         subquery: Subquery {
             subquery,
             outer_ref_columns,
+            depth,
             spans: Spans::new(),
         },
         negated: true,
@@ -257,13 +259,14 @@ pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
 }
 
 /// Create an IN subquery expression
-pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
+pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>, depth: usize) -> Expr {
     let outer_ref_columns = subquery.all_out_ref_exprs();
     Expr::InSubquery(InSubquery::new(
         Box::new(expr),
         Subquery {
             subquery,
             outer_ref_columns,
+            depth,
             spans: Spans::new(),
         },
         false,
@@ -271,13 +274,14 @@ pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
 }
 
 /// Create a NOT IN subquery expression
-pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
+pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>, depth: usize) -> Expr {
     let outer_ref_columns = subquery.all_out_ref_exprs();
     Expr::InSubquery(InSubquery::new(
         Box::new(expr),
         Subquery {
             subquery,
             outer_ref_columns,
+            depth,
             spans: Spans::new(),
         },
         true,
@@ -285,11 +289,12 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
 }
 
 /// Create a scalar subquery expression
-pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
+pub fn scalar_subquery(subquery: Arc<LogicalPlan>, depth: usize) -> Expr {
     let outer_ref_columns = subquery.all_out_ref_exprs();
     Expr::ScalarSubquery(Subquery {
         subquery,
         outer_ref_columns,
+        depth,
         spans: Spans::new(),
     })
 }
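The expression builders above now take the subquery's nesting depth explicitly. A small, hypothetical sketch of the updated calls; the depth value `1` mirrors what the rest of this patch uses for a subquery sitting directly inside the outer query:

```rust
use std::sync::Arc;
use datafusion_common::Result;
use datafusion_expr::expr_fn::{col, exists, in_subquery};
use datafusion_expr::LogicalPlanBuilder;

// Hypothetical builder snippet using the new (plan, depth) signatures.
fn build_subquery_exprs() -> Result<()> {
    let subquery = Arc::new(LogicalPlanBuilder::empty(false).build()?);
    let _exists_expr = exists(Arc::clone(&subquery), 1);
    let _in_expr = in_subquery(col("a"), subquery, 1);
    Ok(())
}
```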
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 3786180e2cfa..c543f3e5be70 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -661,6 +661,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result {
                 self.assert_no_expressions(expr)?;
@@ -950,6 +951,7 @@
                 Ok(LogicalPlan::Subquery(Subquery {
                     subquery: Arc::new(subquery),
                     outer_ref_columns: outer_ref_columns.clone(),
+                    depth: *depth,
                     spans: spans.clone(),
                 }))
             }
@@ -3833,6 +3835,7 @@ pub struct Subquery {
     pub subquery: Arc<LogicalPlan>,
     /// The outer references used in the subquery
     pub outer_ref_columns: Vec<Expr>,
+    pub depth: usize,
     /// Span information for subquery projection columns
     pub spans: Spans,
 }
@@ -3869,6 +3872,7 @@ impl Subquery {
         Subquery {
             subquery: plan,
             outer_ref_columns: self.outer_ref_columns.clone(),
+            depth: self.depth,
             spans: Spans::new(),
         }
     }
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 7f6e1e025387..be4194bfb478 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -159,11 +159,13 @@ impl TreeNode for LogicalPlan {
             LogicalPlan::Subquery(Subquery {
                 subquery,
                 outer_ref_columns,
+                depth,
                 spans,
             }) => subquery.map_elements(f)?.update_data(|subquery| {
                 LogicalPlan::Subquery(Subquery {
                     subquery,
                     outer_ref_columns,
+                    depth,
                     spans,
                 })
             }),
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 860e041bb423..a9f187eef396 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -315,6 +315,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
             Expr::ScalarSubquery(Subquery {
                 subquery,
                 outer_ref_columns,
+                depth,
                 spans,
             }) => {
                 let new_plan =
@@ -322,6 +323,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                 Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
                     subquery: Arc::new(new_plan),
                     outer_ref_columns,
+                    depth,
                     spans,
                 })))
             }
@@ -335,6 +337,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                     subquery: Subquery {
                         subquery: Arc::new(new_plan),
                         outer_ref_columns: subquery.outer_ref_columns,
+                        depth: subquery.depth,
                         spans: subquery.spans,
                     },
                     negated,
@@ -359,6 +362,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                 let new_subquery = Subquery {
                     subquery: Arc::new(new_plan),
                     outer_ref_columns: subquery.outer_ref_columns,
+                    depth: subquery.depth,
                     spans: subquery.spans,
                 };
                 Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index a72657bf689d..c000f0d52256 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -134,19 +134,23 @@ fn rewrite_inner_subqueries(
     let alias = config.alias_generator();
     let expr_without_subqueries = expr.transform(|e| match e {
         Expr::Exists(Exists {
-            subquery: Subquery { subquery, .. },
+            subquery: Subquery {
+                subquery, depth, ..
+            },
             negated,
         }) => match mark_join(&cur_input, Arc::clone(&subquery), None, negated, alias)? {
             Some((plan, exists_expr)) => {
                 cur_input = plan;
                 Ok(Transformed::yes(exists_expr))
             }
-            None if negated => Ok(Transformed::no(not_exists(subquery))),
-            None => Ok(Transformed::no(exists(subquery))),
+            None if negated => Ok(Transformed::no(not_exists(subquery, depth))),
+            None => Ok(Transformed::no(exists(subquery, depth))),
         },
         Expr::InSubquery(InSubquery {
             expr,
-            subquery: Subquery { subquery, .. },
+            subquery: Subquery {
+                subquery, depth, ..
+            },
             negated,
         }) => {
             let in_predicate = subquery
@@ -165,8 +169,10 @@ fn rewrite_inner_subqueries(
                 cur_input = plan;
                 Ok(Transformed::yes(exists_expr))
             }
-            None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
-            None => Ok(Transformed::no(in_subquery(*expr, subquery))),
+            None if negated => {
+                Ok(Transformed::no(not_in_subquery(*expr, subquery, depth)))
+            }
+            None => Ok(Transformed::no(in_subquery(*expr, subquery, depth))),
             }
         }
         _ => Ok(Transformed::no(e)),
@@ -409,12 +415,12 @@ impl SubqueryInfo {
     pub fn expr(self) -> Expr {
         match self.where_in_expr {
             Some(expr) => match self.negated {
-                true => not_in_subquery(expr, self.query.subquery),
-                false => in_subquery(expr, self.query.subquery),
+                true => not_in_subquery(expr, self.query.subquery, self.query.depth),
+                false => in_subquery(expr, self.query.subquery, self.query.depth),
             },
             None => match self.negated {
-                true => not_exists(self.query.subquery),
-                false => exists(self.query.subquery),
+                true => not_exists(self.query.subquery, self.query.depth),
+                false => exists(self.query.subquery, self.query.depth),
             },
         }
     }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index eee1bc23c9ad..ed73dea2206e 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -288,7 +288,10 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
                   LOCATION '../core/tests/data/window_2.csv'
                   OPTIONS ('format.has_header' 'true')";
 
-    let plan = ctx.state().create_logical_plan(query).await?;
+    let plan = ctx
+        .state()
+        .create_logical_plan(query, &mut PlannerContext::new())
+        .await?;
 
     let bytes = logical_plan_to_bytes(&plan)?;
     let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs
index 7c276ce53e35..2544d4e3c99e 100644
--- a/datafusion/sql/src/expr/identifier.rs
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -165,48 +165,49 @@ impl SqlToRel<'_, S> {
             not_impl_err!("compound identifier: {ids:?}")
         } else {
             // Check the outer_query_schema and try to find a match
-            if let Some(outer) = planner_context.outer_query_schema() {
-                let search_result = search_dfschema(&ids, outer);
-                match search_result {
-                    // Found matching field with spare identifier(s) for nested field(s) in structure
-                    Some((field, qualifier, nested_names))
-                        if !nested_names.is_empty() =>
-                    {
-                        // TODO: remove when can support nested identifiers for OuterReferenceColumn
-                        not_impl_err!(
-                            "Nested identifiers are not yet supported for OuterReferenceColumn {}",
-                            Column::from((qualifier, field)).quoted_flat_name()
-                        )
+            // TODO: Put the depth somewhere to record it like (OuterReferenceColumn)
+            for (_depth, schema) in
+                planner_context.iter_outer_query_schemas_rev()
+            {
+                if let Some(outer) = schema {
+                    let search_result = search_dfschema(&ids, outer);
+                    match search_result {
+                        // Found matching field with spare identifier(s) for nested field(s) in structure
+                        Some((field, qualifier, nested_names))
+                            if !nested_names.is_empty() =>
+                        {
+                            // TODO: remove when can support nested identifiers for OuterReferenceColumn
+                            return not_impl_err!("Nested identifiers are not yet supported for OuterReferenceColumn {}", Column::from((qualifier, field)).quoted_flat_name());
+                        }
+                        // Found matching field with no spare identifier(s)
+                        Some((field, qualifier, _nested_names)) => {
+                            // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
+                            return Ok(Expr::OuterReferenceColumn(
+                                field.data_type().clone(),
+                                Column::from((qualifier, field)),
+                            ));
+                        }
+                        // Found no matching field, will return a default
+                        None => continue,
                     }
-                    // Found matching field with no spare identifier(s)
-                    Some((field, qualifier, _nested_names)) => {
-                        // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
-                        Ok(Expr::OuterReferenceColumn(
-                            field.data_type().clone(),
-                            Column::from((qualifier, field)),
-                        ))
-                    }
-                    // Found no matching field, will return a default
-                    None => {
-                        let s = &ids[0..ids.len()];
-                        // safe unwrap as s can never be empty or exceed the bounds
-                        let (relation, column_name) =
-                            form_identifier(s).unwrap();
-                        Ok(Expr::Column(Column::new(relation, column_name)))
-                    }
-                }
-            } else {
-                let s = &ids[0..ids.len()];
-                // Safe unwrap as s can never be empty or exceed the bounds
-                let (relation, column_name) = form_identifier(s).unwrap();
-                let mut column = Column::new(relation, column_name);
-                if self.options.collect_spans {
-                    if let Some(span) = ids_span {
-                        column.spans_mut().add_span(span);
+                } else {
+                    // Only depth=0 outer_query_schema can reach here.
+                    let s = &ids[0..ids.len()];
+                    // Safe unwrap as s can never be empty or exceed the bounds
+                    let (relation, column_name) = form_identifier(s).unwrap();
+                    let mut column = Column::new(relation, column_name);
+                    if self.options.collect_spans {
+                        if let Some(span) = ids_span {
+                            column.spans_mut().add_span(span);
+                        }
                     }
+                    return Ok(Expr::Column(column));
                 }
-                Ok(Expr::Column(column))
             }
+            let s = &ids[0..ids.len()];
+            // safe unwrap as s can never be empty or exceed the bounds
+            let (relation, column_name) = form_identifier(s).unwrap();
+            return Ok(Expr::Column(Column::new(relation, column_name)));
         }
     }
 }
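The loop above changes resolution from "check the single outer schema" to "walk every enclosing schema, innermost first". A simplified, self-contained model of that lookup order; plain `Vec`s of field names stand in for `DFSchema`, so this is a sketch of the idea, not the DataFusion API:

```rust
// Plain-Rust model of iter_outer_query_schemas_rev(): index = depth, and the
// depth-0 slot is None because the top-level query has no outer schema.
fn resolve(column: &str, outer_schemas: &[Option<Vec<&str>>]) -> String {
    for (depth, schema) in outer_schemas.iter().enumerate().rev() {
        match schema {
            // Innermost enclosing schema that knows the name wins -> outer reference.
            Some(fields) if fields.contains(&column) => {
                return format!("outer_ref({column}) at depth {depth}");
            }
            // Not found at this level: keep walking outward.
            Some(_) => continue,
            // Reached the depth-0 slot: fall back to an ordinary column.
            None => return format!("column({column})"),
        }
    }
    format!("column({column})")
}

fn main() {
    let schemas = vec![None, Some(vec!["department_id"]), Some(vec!["salary"])];
    assert_eq!(resolve("department_id", &schemas), "outer_ref(department_id) at depth 1");
    assert_eq!(resolve("unknown", &schemas), "column(unknown)");
}
```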
diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs
index 602d39233d58..a07767fc7ad5 100644
--- a/datafusion/sql/src/expr/subquery.rs
+++ b/datafusion/sql/src/expr/subquery.rs
@@ -31,15 +31,21 @@ impl SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        // TODO
+        planner_context.push_outer_query_schema(Some(input_schema.clone().into()));
+        planner_context.increase_depth();
+        let depth = planner_context.cur_depth();
+
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
+
+        planner_context.decrease_depth();
+
         Ok(Expr::Exists(Exists {
             subquery: Subquery {
                 subquery: Arc::new(sub_plan),
                 outer_ref_columns,
+                depth,
                 spans: Spans::new(),
             },
             negated,
@@ -54,8 +60,10 @@ impl SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        // TODO
+        planner_context.push_outer_query_schema(Some(input_schema.clone().into()));
+        planner_context.increase_depth();
+        let depth = planner_context.cur_depth();
 
         let mut spans = Spans::new();
         if let SetExpr::Select(select) = subquery.body.as_ref() {
@@ -70,7 +78,6 @@ impl SqlToRel<'_, S> {
 
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
 
         self.validate_single_column(
             &sub_plan,
@@ -81,11 +88,14 @@ impl SqlToRel<'_, S> {
 
         let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;
 
+        planner_context.decrease_depth();
+
         Ok(Expr::InSubquery(InSubquery::new(
             Box::new(expr_obj),
             Subquery {
                 subquery: Arc::new(sub_plan),
                 outer_ref_columns,
+                depth,
                 spans,
             },
             negated,
@@ -98,8 +108,10 @@ impl SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        planner_context.push_outer_query_schema(Some(input_schema.clone().into()));
+        planner_context.increase_depth();
+        let depth = planner_context.cur_depth();
+
         let mut spans = Spans::new();
         if let SetExpr::Select(select) = subquery.body.as_ref() {
             for item in &select.projection {
@@ -112,7 +124,6 @@ impl SqlToRel<'_, S> {
         }
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
 
         self.validate_single_column(
             &sub_plan,
@@ -121,9 +133,12 @@ impl SqlToRel<'_, S> {
             "Select only one column in the subquery",
         )?;
 
+        planner_context.decrease_depth();
+
         Ok(Expr::ScalarSubquery(Subquery {
             subquery: Arc::new(sub_plan),
             outer_ref_columns,
+            depth,
             spans,
         }))
     }
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 3325c98aa74b..0036ba1ce097 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -198,13 +198,35 @@ pub struct PlannerContext {
     /// Map of CTE name to logical plan of the WITH clause.
     /// Use `Arc` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,
-    /// The query schema of the outer query plan, used to resolve the columns in subquery
-    outer_query_schema: Option<DFSchemaRef>,
     /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
     /// FROM clauses, this should become a suffix of the `outer_query_schema`.
     outer_from_schema: Option<DFSchemaRef>,
     /// The query schema defined by the table
     create_table_schema: Option<DFSchemaRef>,
+
+    // TODO: take outer_from_schema and create_table_schema into consideration.
+    /// The outer query schema of every enclosing query level, used to resolve
+    /// columns in subqueries; `depth` indexes the schema of a particular level.
+    /// For example:
+    /// SELECT name                          <---------------------------------------- depth = 0
+    /// FROM employees e
+    /// WHERE salary > (
+    ///     SELECT AVG(salary)               <----------------------------- depth = 1
+    ///     FROM employees e2
+    ///     WHERE e2.department_id = e.department_id
+    ///       AND e.department_id IN (
+    ///           SELECT department_id       <--------------------- depth = 2
+    ///           FROM employees
+    ///           GROUP BY department_id
+    ///           HAVING AVG(salary) > (
+    ///               SELECT AVG(salary)
+    ///               FROM employees
+    ///           )
+    ///       )
+    /// );
+    outer_query_schemas: Vec<Option<DFSchemaRef>>,
+    /// Current subquery nesting depth, starting at 0 for the top-level query.
+    cur_depth: usize,
 }
 
 impl Default for PlannerContext {
@@ -219,9 +241,10 @@ impl PlannerContext {
         Self {
             prepare_param_data_types: Arc::new(vec![]),
             ctes: HashMap::new(),
-            outer_query_schema: None,
             outer_from_schema: None,
             create_table_schema: None,
+            outer_query_schemas: vec![None], // depth 0 has no outer query schema
+            cur_depth: 0,
         }
     }
@@ -234,19 +257,45 @@ impl PlannerContext {
         self
     }
 
+    // TODO: replace the remaining call sites with outer_query_schemas()
     // Return a reference to the outer query's schema
     pub fn outer_query_schema(&self) -> Option<&DFSchema> {
-        self.outer_query_schema.as_ref().map(|s| s.as_ref())
+        self.outer_query_schemas[self.cur_depth]
+            .as_ref()
+            .map(|s| s.as_ref())
+    }
+
+    pub fn outer_query_schemas(&self) -> &[Option<DFSchemaRef>] {
+        &self.outer_query_schemas
+    }
+
+    /// Returns an iterator over the outer query schemas from back to front,
+    /// along with their indices.
+    pub fn iter_outer_query_schemas_rev(
+        &self,
+    ) -> impl Iterator<Item = (usize, Option<&DFSchema>)> {
+        self.outer_query_schemas
+            .iter()
+            .enumerate()
+            .rev()
+            .map(|(i, schema_ref)| (i, schema_ref.as_ref().map(|s| s.as_ref())))
     }
 
-    /// Sets the outer query schema, returning the existing one, if
-    /// any
-    pub fn set_outer_query_schema(
-        &mut self,
-        mut schema: Option<DFSchemaRef>,
-    ) -> Option<DFSchemaRef> {
-        std::mem::swap(&mut self.outer_query_schema, &mut schema);
-        schema
+    /// Pushes the schema of an enclosing query onto the outer query schema stack
+    pub fn push_outer_query_schema(&mut self, schema: Option<DFSchemaRef>) {
+        self.outer_query_schemas.push(schema);
+    }
+
+    pub fn increase_depth(&mut self) {
+        self.cur_depth += 1;
+    }
+
+    pub fn decrease_depth(&mut self) {
+        self.cur_depth -= 1;
+    }
+
+    pub fn cur_depth(&self) -> usize {
+        self.cur_depth
     }
 
     pub fn set_table_schema(
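Taken together with the subquery planner changes earlier in this patch, the intended protocol around planning a nested query is roughly the sketch below (the empty schema is a stand-in; the real callers push the enclosing query's `input_schema`):

```rust
use std::sync::Arc;
use datafusion_common::DFSchema;
use datafusion_sql::planner::PlannerContext;

// Sketch of the push / increase_depth / plan / decrease_depth sequence used by
// parse_exists_subquery, parse_in_subquery and parse_scalar_subquery.
fn enter_and_leave_subquery(planner_context: &mut PlannerContext) -> usize {
    planner_context.push_outer_query_schema(Some(Arc::new(DFSchema::empty())));
    planner_context.increase_depth();
    let depth = planner_context.cur_depth(); // recorded on the Subquery node
    // ... query_to_plan(inner_query, planner_context) would run here ...
    planner_context.decrease_depth();
    depth
}

fn main() {
    let mut planner_context = PlannerContext::new();
    assert_eq!(enter_and_leave_subquery(&mut planner_context), 1);
}
```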
diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs
index dee855f8c000..f01d30dc26ab 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -184,7 +184,8 @@ impl SqlToRel<'_, S> {
         let old_from_schema = planner_context
             .set_outer_from_schema(None)
             .unwrap_or_else(|| Arc::new(DFSchema::empty()));
-        let new_query_schema = match planner_context.outer_query_schema() {
+        // TODO
+        let _new_query_schema = match planner_context.outer_query_schema() {
             Some(old_query_schema) => {
                 let mut new_query_schema = old_from_schema.as_ref().clone();
                 new_query_schema.merge(old_query_schema);
@@ -192,12 +193,12 @@ impl SqlToRel<'_, S> {
             }
             None => Some(Arc::clone(&old_from_schema)),
         };
-        let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
+        // let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
 
         let plan = self.create_relation(subquery, planner_context)?;
         let outer_ref_columns = plan.all_out_ref_exprs();
 
-        planner_context.set_outer_query_schema(old_query_schema);
+        // planner_context.set_outer_query_schema(old_query_schema);
         planner_context.set_outer_from_schema(Some(old_from_schema));
 
         // We can omit the subquery wrapper if there are no columns
@@ -206,12 +207,14 @@ impl SqlToRel<'_, S> {
             return Ok(plan);
         }
 
+        // TODO: handle depth
         match plan {
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
                 subquery_alias(
                     LogicalPlan::Subquery(Subquery {
                         subquery: input,
                         outer_ref_columns,
+                        depth: 1,
                         spans: Spans::new(),
                     }),
                     alias,
@@ -220,6 +223,7 @@ impl SqlToRel<'_, S> {
             plan => Ok(LogicalPlan::Subquery(Subquery {
                 subquery: Arc::new(plan),
                 outer_ref_columns,
+                depth: 1,
                 spans: Spans::new(),
             })),
         }
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 1f1c235fee6f..73e0275e7472 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -177,26 +177,37 @@ fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
 impl<S: ContextProvider> SqlToRel<'_, S> {
     /// Generate a logical plan from an DataFusion SQL statement
-    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
+    pub fn statement_to_plan(
+        &self,
+        statement: DFStatement,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
         match statement {
             DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
-            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
+            DFStatement::Statement(s) => self.sql_statement_to_plan(*s, planner_context),
             DFStatement::CopyTo(s) => self.copy_to_plan(s),
             DFStatement::Explain(ExplainStatement {
                 verbose,
                 analyze,
                 format,
                 statement,
-            }) => self.explain_to_plan(verbose, analyze, format, *statement),
+            }) => self.explain_to_plan(
+                verbose,
+                analyze,
+                format,
+                *statement,
+                planner_context,
+            ),
         }
     }
 
     /// Generate a logical plan from an SQL statement
-    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
-        self.sql_statement_to_plan_with_context_impl(
-            statement,
-            &mut PlannerContext::new(),
-        )
+    pub fn sql_statement_to_plan(
+        &self,
+        statement: Statement,
+        planner_context: &mut PlannerContext,
+    ) -> Result<LogicalPlan> {
+        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
     }
 
     /// Generate a logical plan from an SQL statement
@@ -229,7 +240,7 @@ impl SqlToRel<'_, S> {
             } => {
                 let format = format.map(|format| format.to_string());
                 let statement = DFStatement::Statement(statement);
-                self.explain_to_plan(verbose, analyze, format, statement)
+                self.explain_to_plan(verbose, analyze, format, statement, planner_context)
             }
             Statement::Query(query) => self.query_to_plan(*query, planner_context),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
@@ -1295,7 +1306,10 @@ impl SqlToRel<'_, S> {
             let query = "SELECT * FROM information_schema.tables;";
             let mut rewrite = DFParser::parse_sql(query)?;
             assert_eq!(rewrite.len(), 1);
-            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
+            self.statement_to_plan(
+                rewrite.pop_front().unwrap(),
+                &mut PlannerContext::new(),
+            ) // length of rewrite is 1
         } else {
             plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
         }
@@ -1629,8 +1643,9 @@ impl SqlToRel<'_, S> {
         analyze: bool,
         format: Option<String>,
         statement: DFStatement,
+        planner_context: &mut PlannerContext,
     ) -> Result<LogicalPlan> {
-        let plan = self.statement_to_plan(statement)?;
+        let plan = self.statement_to_plan(statement, planner_context)?;
         if matches!(plan, LogicalPlan::Explain(_)) {
             return plan_err!("Nested EXPLAINs are not supported");
         }
@@ -1723,7 +1738,7 @@ impl SqlToRel<'_, S> {
 
         let mut rewrite = DFParser::parse_sql(&query)?;
         assert_eq!(rewrite.len(), 1);
-        self.statement_to_plan(rewrite.pop_front().unwrap())
+        self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new())
     }
 
     fn set_variable_to_plan(
@@ -2109,7 +2124,8 @@ impl SqlToRel<'_, S> {
 
         let mut rewrite = DFParser::parse_sql(&query)?;
         assert_eq!(rewrite.len(), 1);
-        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
+        self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new())
+        // length of rewrite is 1
     }
 
     /// Rewrite `SHOW FUNCTIONS` to another SQL query
@@ -2193,7 +2209,8 @@ ON p.function_name = r.routine_name
         );
         let mut rewrite = DFParser::parse_sql(&query)?;
         assert_eq!(rewrite.len(), 1);
-        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
+        self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new())
+        // length of rewrite is 1
     }
 
     fn show_create_table_to_plan(
@@ -2221,7 +2238,8 @@ ON p.function_name = r.routine_name
 
         let mut rewrite = DFParser::parse_sql(&query)?;
         assert_eq!(rewrite.len(), 1);
-        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
+        self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new())
+        // length of rewrite is 1
     }
 
     /// Return true if there is a table provider available for "schema.table"
diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs
index f7e4c2bb0fbd..49977a1d38c3 100644
--- a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs
@@ -49,6 +49,7 @@ pub async fn from_subquery(
                         subquery: Subquery {
                             subquery: Arc::new(haystack_expr),
                             outer_ref_columns: outer_refs,
+                            depth: 1,
                             spans: Spans::new(),
                         },
                         negated: false,
@@ -68,6 +69,7 @@ pub async fn from_subquery(
                 Ok(Expr::ScalarSubquery(Subquery {
                     subquery: Arc::new(plan),
                     outer_ref_columns,
+                    depth: 1,
                     spans: Spans::new(),
                 }))
             }
@@ -84,6 +86,7 @@ pub async fn from_subquery(
                     Subquery {
                         subquery: Arc::new(plan),
                         outer_ref_columns,
+                        depth: 1,
                         spans: Spans::new(),
                     },
                     false,
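End to end, the patched planner API can be exercised like the sketch below. This is a hedged example, not part of the patch: it assumes a `tokio` runtime and the `datafusion` crate from this branch, and the table, data, and column names are illustrative.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::sql::planner::PlannerContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql(
        "CREATE TABLE employees AS \
         SELECT * FROM (VALUES ('a', 10.0, 1), ('b', 20.0, 1)) \
         AS t(name, salary, department_id)",
    )
    .await?;

    // The correlated inner query should come out as a Subquery node carrying depth = 1.
    let sql = "SELECT name FROM employees e \
               WHERE salary > (SELECT AVG(salary) FROM employees e2 \
                               WHERE e2.department_id = e.department_id)";
    let plan = ctx
        .state()
        .create_logical_plan(sql, &mut PlannerContext::new())
        .await?;
    println!("{}", plan.display_indent());
    Ok(())
}
```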