diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ed23c547b8dd..768b1d620021 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -20,7 +20,6 @@ use futures::future::BoxFuture; use itertools::Itertools; use maplit::{btreemap, btreeset}; use mz_cloud_resources::VpcEndpointConfig; -use mz_compute_types::dataflows::DataflowDesc; use mz_controller_types::{ClusterId, ReplicaId}; use mz_expr::{ permutation_for_arrangement, CollectionPlan, MirScalarExpr, OptimizedMirRelationExpr, @@ -88,7 +87,7 @@ use crate::catalog::{ }; use crate::command::{ExecuteResponse, Response}; use crate::coord::appends::{Deferred, DeferredPlan, PendingWriteTxn}; -use crate::coord::dataflows::{prep_relation_expr, prep_scalar_expr, EvalTime, ExprPrepStyle}; +use crate::coord::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle}; use crate::coord::id_bundle::CollectionIdBundle; use crate::coord::peek::{FastPathPlan, PeekDataflowPlan, PeekPlan, PlannedPeek}; use crate::coord::read_policy::SINCE_GRANULARITY; @@ -3149,6 +3148,7 @@ impl Coordinator { plan::ExplaineeStatement::Query { raw_plan, row_set_finishing, + desc, broken, } => { if enable_unified_optimizer_api { @@ -3159,7 +3159,8 @@ impl Coordinator { broken, target_cluster, ctx.session_mut(), - &row_set_finishing, + row_set_finishing, + desc, &config, root_dispatch, ) @@ -3176,7 +3177,7 @@ impl Coordinator { broken, target_cluster, ctx.session_mut(), - &row_set_finishing, + &Some(row_set_finishing), &config, root_dispatch, ) @@ -3333,7 +3334,8 @@ impl Coordinator { broken: bool, target_cluster: TargetCluster, session: &mut Session, - finishing: &Option, + finishing: RowSetFinishing, + desc: RelationDesc, explain_config: &mz_repr::explain::ExplainConfig, root_dispatch: tracing::Dispatch, ) -> Result< @@ -3351,11 +3353,42 @@ impl Coordinator { tracing::warn!("EXPLAIN ... BROKEN is known to leak memory, use with caution"); } - let catalog = self.catalog(); + // Initialize optimizer context + // ---------------------------- + + // Collect optimizer parameters. + let catalog = self.owned_catalog(); let target_cluster_id = catalog.resolve_target_cluster(target_cluster, session)?.id; + let compute_instance = self + .instance_snapshot(target_cluster_id) + .expect("compute instance does not exist"); + let select_id = self.allocate_transient_id()?; + let index_id = self.allocate_transient_id()?; let system_config = catalog.system_config(); let optimizer_config = OptimizerConfig::from((system_config, explain_config)); + // Build an optimizer for this SELECT. + let mut optimizer = optimize::OptimizePeek::new( + Arc::clone(&catalog), + compute_instance, + finishing, + select_id, + index_id, + optimizer_config.clone(), + ); + + // Create a transient catalog item + // ------------------------------- + + let mut transient_items = BTreeMap::new(); + transient_items.insert(select_id, { + TransientItem::new( + Some(GlobalId::Explain.to_string()), + Some(GlobalId::Explain.to_string()), + Some(desc.iter_names().map(|c| c.to_string()).collect()), + ) + }); + // Execute the various stages of the optimization pipeline // ------------------------------------------------------- @@ -3364,148 +3397,112 @@ impl Coordinator { trace_plan(&raw_plan); }); - // Execute the `optimize/hir_to_mir` stage. - let decorrelated_plan = catch_unwind(broken, "hir_to_mir", || { - raw_plan.lower((system_config, explain_config)) - })?; - - let mut timeline_context = - self.validate_timeline_context(decorrelated_plan.depends_on())?; - if matches!(timeline_context, TimelineContext::TimestampIndependent) - && decorrelated_plan.contains_temporal() - { - // If the source IDs are timestamp independent but the query contains temporal functions, - // then the timeline context needs to be upgraded to timestamp dependent. This is - // required because `source_ids` doesn't contain functions. - timeline_context = TimelineContext::TimestampDependent; - } - - let source_ids = decorrelated_plan.depends_on(); - let id_bundle = self - .index_oracle(target_cluster_id) - .sufficient_collections(&source_ids); + // MIR ⇒ MIR optimization (local) + let local_mir_plan = catch_unwind(broken, "optimize", || optimizer.optimize(raw_plan))?; - // Execute the `optimize/local` stage. - let optimized_plan = catch_unwind(broken, "local", || { - let _span = tracing::span!(target: "optimizer", Level::TRACE, "local").entered(); - let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?; - trace_plan(optimized_plan.as_inner()); - Ok::<_, AdapterError>(optimized_plan) - })?; + // Resolve timestamp statistics catalog + // ------------------------------------ - // Acquire a timestamp (necessary for loading statistics). - let timestamp_ctx = self - .sequence_peek_timestamp( - session, - &QueryWhen::Immediately, - target_cluster_id, - timeline_context, - &id_bundle, - &source_ids, - None, // no real-time recency - ) - .await? - .timestamp_context; + let (timestamp_ctx, stats) = { + let source_ids = local_mir_plan.expr().depends_on(); + let mut timeline_context = + self.validate_timeline_context(source_ids.iter().cloned())?; + if matches!(timeline_context, TimelineContext::TimestampIndependent) + && local_mir_plan.expr().contains_temporal() + { + // If the source IDs are timestamp independent but the query contains temporal functions, + // then the timeline context needs to be upgraded to timestamp dependent. This is + // required because `source_ids` doesn't contain functions. + timeline_context = TimelineContext::TimestampDependent; + } - // Load cardinality statistics. - // - // TODO: proper stats needs exact timestamp at the moment. However, we - // don't want to resolve the timestamp twice, so we need to figure out a - // way to get somewhat stale stats. - let stats = self - .statistics_oracle(session, &source_ids, timestamp_ctx.antichain(), true) - .with_subscriber(root_dispatch) - .await?; + let id_bundle = self + .index_oracle(target_cluster_id) + .sufficient_collections(&source_ids); - let state = self.catalog().state(); + // Acquire a timestamp (necessary for loading statistics). + let timestamp_ctx = self + .sequence_peek_timestamp( + session, + &QueryWhen::Immediately, + target_cluster_id, + timeline_context, + &id_bundle, + &source_ids, + None, // no real-time recency + ) + .await? + .timestamp_context; - // Execute the `optimize/global` stage. - let (mut df, df_metainfo) = catch_unwind(broken, "global", || { - let mut df_builder = self.dataflow_builder(target_cluster_id); + // Load cardinality statistics. + // + // TODO: proper stats needs exact timestamp at the moment. However, we + // don't want to resolve the timestamp twice, so we need to figure out a + // way to get somewhat stale stats. + let stats = self + .statistics_oracle(session, &source_ids, timestamp_ctx.antichain(), true) + .with_subscriber(root_dispatch) + .await?; + + (timestamp_ctx, stats) + }; - let mut df = DataflowDesc::new("explanation".to_string()); - df_builder.import_view_into_dataflow(&GlobalId::Explain, &optimized_plan, &mut df)?; - df_builder.reoptimize_imported_views(&mut df, &optimizer_config)?; + let (used_indexes, fast_path_plan, df_meta) = catch_unwind(broken, "optimize", || { + // MIR ⇒ MIR optimization (global) + let local_mir_plan = local_mir_plan.resolve(session, stats); + let global_mir_plan = optimizer.optimize(local_mir_plan)?; - // Resolve all unmaterializable function calls except mz_now(), - // because in line with the `sequence_~` method we pretend that we - // don't have a timestamp yet. - let style = ExprPrepStyle::OneShot { - logical_time: EvalTime::Deferred, - session, + // Collect the list of indexes used by the dataflow at this point + let mut used_indexes = { + let df_desc = global_mir_plan.df_desc(); + let df_meta = global_mir_plan.df_meta(); + UsedIndexes::new( + df_desc + .index_imports + .iter() + .map(|(id, _index_import)| { + (*id, df_meta.index_usage_types.get(id).expect("prune_and_annotate_dataflow_index_imports should have been called already").clone()) + }) + .collect(), + ) }; - df.visit_children( - |r| prep_relation_expr(state, r, style), - |s| prep_scalar_expr(state, s, style), - )?; - - // Optimize the dataflow across views, and any other ways that appeal. - let df_metainfo = mz_transform::optimize_dataflow( - &mut df, - &self.index_oracle(target_cluster_id), - stats.as_ref(), - )?; - - Ok::<_, AdapterError>((df, df_metainfo)) - })?; - - // Collect the list of indexes used by the dataflow at this point - let mut used_indexes = UsedIndexes::new( - df - .index_imports - .iter() - .map(|(id, _index_import)| { - (*id, df_metainfo.index_usage_types.get(id).expect("prune_and_annotate_dataflow_index_imports should have been called already").clone()) - }) - .collect(), - ); - // Determine if fast path plan will be used for this explainee. - let fast_path_plan = { - df.set_as_of(timestamp_ctx.antichain()); + // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global) + let global_mir_plan = global_mir_plan.resolve(timestamp_ctx, session); + let global_lir_plan = optimizer.optimize(global_mir_plan)?; - // Resolve all unmaterializable function including mz_now(). - let style = ExprPrepStyle::OneShot { - logical_time: EvalTime::Time(timestamp_ctx.timestamp_or_default()), - session, - }; - df.visit_children( - |r| prep_relation_expr(state, r, style), - |s| prep_scalar_expr(state, s, style), - )?; - peek::create_fast_path_plan( - &mut df, - GlobalId::Explain, - finishing.as_ref(), - self.catalog.system_config().persist_fast_path_limit(), - )? - }; + let (fast_path_plan, df_meta) = match global_lir_plan { + optimize::peek::GlobalLirPlan::FastPath { plan, typ, df_meta } => { + let finishing = if !optimizer.finishing.is_trivial(typ.arity()) { + Some(optimizer.finishing.clone()) + } else { + None + }; + used_indexes = plan.used_indexes(&finishing); - if let Some(fast_path_plan) = &fast_path_plan { - used_indexes = fast_path_plan.used_indexes(finishing); - } + // TODO(aalexandrov): rework `OptimizerTrace` with support + // for diverging plan types towards the end and add a + // `PlanTrace` for the `FastPathPlan` type. + trace_plan(&"fast_path_plan (missing)".to_string()); - // We have the opportunity to name an `until` frontier that will prevent work we needn't perform. - // By default, `until` will be `Antichain::new()`, which prevents no updates and is safe. - if let Some(as_of) = df.as_of.as_ref() { - if !as_of.is_empty() { - if let Some(next) = as_of.as_option().and_then(|as_of| as_of.checked_add(1)) { - df.until = timely::progress::Antichain::from_elem(next); + (Some(plan), df_meta) } - } - } + optimize::peek::GlobalLirPlan::SlowPath { + df_desc, + df_meta, + typ: _, + } => { + trace_plan(&df_desc); + (None, df_meta) + } + }; - // Execute the `optimize/finalize_dataflow` stage. - let df = catch_unwind(broken, "finalize_dataflow", || { - self.finalize_dataflow(df, target_cluster_id) + Ok::<_, AdapterError>((used_indexes, fast_path_plan, df_meta)) })?; - // Trace the resulting plan for the top-level `optimize` path. - trace_plan(&df); - // Return objects that need to be passed to the `ExplainContext` // when rendering explanations for the various trace entries. - Ok((used_indexes, fast_path_plan, df_metainfo, BTreeMap::new())) + Ok((used_indexes, fast_path_plan, df_meta, transient_items)) } /// Run the MV optimization explanation pipeline. This function must be called with diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 0f114aa55916..a774c6bdce57 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -847,7 +847,8 @@ pub enum ExplaineeStatement { /// The object to be explained is a SELECT statement. Query { raw_plan: HirRelationExpr, - row_set_finishing: Option, + row_set_finishing: RowSetFinishing, + desc: RelationDesc, /// Broken flag (see [`ExplaineeStatement::broken()`]). broken: bool, }, @@ -900,10 +901,16 @@ impl ExplaineeStatement { pub fn row_set_finishing(&self) -> Option { match self { Self::Query { - row_set_finishing, .. + row_set_finishing, + desc, + .. } => { - // Use the optional finishing extracted in the plan_query call. - row_set_finishing.clone() + if !row_set_finishing.is_trivial(desc.arity()) { + // Use the optional finishing extracted in the plan_query call. + Some(row_set_finishing.clone()) + } else { + None + } } Self::CreateMaterializedView { .. } => { // Trivial finishing asserted in plan_create_materialized_view. diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index b9ed8a17d156..83070b329f62 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -334,17 +334,11 @@ pub fn plan_explain_plan( let query::PlannedQuery { expr: mut raw_plan, desc, - finishing, + finishing: row_set_finishing, scope: _, } = query::plan_root_query(scx, *query, QueryLifetime::OneShot)?; raw_plan.bind_parameters(params)?; - let row_set_finishing = if finishing.is_trivial(desc.arity()) { - None - } else { - Some(finishing) - }; - if broken { scx.require_feature_flag(&vars::ENABLE_EXPLAIN_BROKEN)?; } @@ -352,6 +346,7 @@ pub fn plan_explain_plan( crate::plan::Explainee::Statement(ExplaineeStatement::Query { raw_plan, row_set_finishing, + desc, broken, }) }