adapter: use the new Optimize API for sequence_explain_plan
Introduce `OptimizePeek` in the `sequence_explain_plan` optimization
path for `Query` explainees in the `Coordinator`.

See the rollout proposal in MaterializeInc#20569 for details.
aalexandrov committed Nov 7, 2023
1 parent 1f1e2c1 commit 04b1717
Showing 3 changed files with 140 additions and 141 deletions.
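For context on the change below: the unified Optimize API structures peek optimization as typed stages, with explicit `resolve` steps that bind session-, statistics-, and timestamp-dependent context between them. The sketch below is a minimal, self-contained stand-in for that call sequence; every type and method name in it is illustrative (the real API lives in `mz_adapter::optimize` and reuses a single `Optimize::optimize` trait method per stage), not Materialize's actual interface:

// A minimal sketch of the staged pipeline; illustrative stand-in types only.

struct LocalMirPlan;            // result of local MIR ⇒ MIR optimization
struct GlobalMirPlan;           // result of global MIR ⇒ MIR optimization
struct GlobalLirPlan;           // final plan: fast path or dataflow

struct Session;                 // stand-in for the adapter session
struct Stats;                   // stand-in for cardinality statistics
struct Timestamp(u64);          // stand-in for the resolved `as_of`

struct ResolvedLocalMirPlan;    // LocalMirPlan + session + stats
struct ResolvedGlobalMirPlan;   // GlobalMirPlan + as_of + session

impl LocalMirPlan {
    // Bind the context required by global MIR optimization.
    fn resolve(self, _session: &Session, _stats: Stats) -> ResolvedLocalMirPlan {
        ResolvedLocalMirPlan
    }
}

impl GlobalMirPlan {
    // Bind the query timestamp required by MIR ⇒ LIR lowering.
    fn resolve(self, _as_of: Timestamp, _session: &Session) -> ResolvedGlobalMirPlan {
        ResolvedGlobalMirPlan
    }
}

struct OptimizePeek;

impl OptimizePeek {
    fn optimize_raw(&mut self, _raw_plan: &str) -> LocalMirPlan {
        LocalMirPlan // HIR ⇒ MIR lowering + local MIR ⇒ MIR transforms
    }
    fn optimize_global(&mut self, _plan: ResolvedLocalMirPlan) -> GlobalMirPlan {
        GlobalMirPlan // global MIR ⇒ MIR transforms across the dataflow
    }
    fn optimize_lir(&mut self, _plan: ResolvedGlobalMirPlan) -> GlobalLirPlan {
        GlobalLirPlan // MIR ⇒ LIR lowering + fast-path detection
    }
}

fn main() {
    let session = Session;
    let mut optimizer = OptimizePeek;

    let local_mir = optimizer.optimize_raw("SELECT ...");
    let global_mir = optimizer.optimize_global(local_mir.resolve(&session, Stats));
    let _global_lir = optimizer.optimize_lir(global_mir.resolve(Timestamp(0), &session));
}

The staged types make it impossible to run, say, global optimization before statistics and session context have been bound, which is what lets `sequence_explain_plan` below trace each stage independently.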
257 changes: 127 additions & 130 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -20,7 +20,6 @@ use futures::future::BoxFuture;
use itertools::Itertools;
use maplit::{btreemap, btreeset};
use mz_cloud_resources::VpcEndpointConfig;
use mz_compute_types::dataflows::DataflowDesc;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{
permutation_for_arrangement, CollectionPlan, MirScalarExpr, OptimizedMirRelationExpr,
@@ -88,7 +87,7 @@ use crate::catalog::{
};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{Deferred, DeferredPlan, PendingWriteTxn};
use crate::coord::dataflows::{prep_relation_expr, prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::coord::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::{FastPathPlan, PeekDataflowPlan, PeekPlan, PlannedPeek};
use crate::coord::read_policy::SINCE_GRANULARITY;
@@ -3150,6 +3149,7 @@ impl Coordinator {
plan::ExplaineeStatement::Query {
raw_plan,
row_set_finishing,
desc,
broken,
} => {
if enable_unified_optimizer_api {
@@ -3160,7 +3160,8 @@
broken,
target_cluster,
ctx.session_mut(),
&row_set_finishing,
row_set_finishing,
desc,
&config,
root_dispatch,
)
@@ -3177,7 +3178,7 @@
broken,
target_cluster,
ctx.session_mut(),
&row_set_finishing,
&Some(row_set_finishing),
&config,
root_dispatch,
)
@@ -3334,7 +3335,8 @@
broken: bool,
target_cluster: TargetCluster,
session: &mut Session,
finishing: &Option<RowSetFinishing>,
finishing: RowSetFinishing,
desc: RelationDesc,
explain_config: &mz_repr::explain::ExplainConfig,
root_dispatch: tracing::Dispatch,
) -> Result<
@@ -3352,11 +3354,42 @@
tracing::warn!("EXPLAIN ... BROKEN <query> is known to leak memory, use with caution");
}

let catalog = self.catalog();
// Initialize optimizer context
// ----------------------------

// Collect optimizer parameters.
let catalog = self.owned_catalog();
let target_cluster_id = catalog.resolve_target_cluster(target_cluster, session)?.id;
let compute_instance = self
.instance_snapshot(target_cluster_id)
.expect("compute instance does not exist");
let select_id = self.allocate_transient_id()?;
let index_id = self.allocate_transient_id()?;
let system_config = catalog.system_config();
let optimizer_config = OptimizerConfig::from((system_config, explain_config));

// Build an optimizer for this SELECT.
let mut optimizer = optimize::OptimizePeek::new(
Arc::clone(&catalog),
compute_instance,
finishing,
select_id,
index_id,
optimizer_config.clone(),
);

// Create a transient catalog item
// -------------------------------

let mut transient_items = BTreeMap::new();
transient_items.insert(select_id, {
TransientItem::new(
Some(GlobalId::Explain.to_string()),
Some(GlobalId::Explain.to_string()),
Some(desc.iter_names().map(|c| c.to_string()).collect()),
)
});

// Execute the various stages of the optimization pipeline
// -------------------------------------------------------

@@ -3365,148 +3398,112 @@
trace_plan(&raw_plan);
});

// Execute the `optimize/hir_to_mir` stage.
let decorrelated_plan = catch_unwind(broken, "hir_to_mir", || {
raw_plan.lower((system_config, explain_config))
})?;

let mut timeline_context =
self.validate_timeline_context(decorrelated_plan.depends_on())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent)
&& decorrelated_plan.contains_temporal()
{
// If the source IDs are timestamp independent but the query contains temporal functions,
// then the timeline context needs to be upgraded to timestamp dependent. This is
// required because `source_ids` doesn't contain functions.
timeline_context = TimelineContext::TimestampDependent;
}

let source_ids = decorrelated_plan.depends_on();
let id_bundle = self
.index_oracle(target_cluster_id)
.sufficient_collections(&source_ids);
// MIR ⇒ MIR optimization (local)
let local_mir_plan = catch_unwind(broken, "optimize", || optimizer.optimize(raw_plan))?;

// Execute the `optimize/local` stage.
let optimized_plan = catch_unwind(broken, "local", || {
let _span = tracing::span!(target: "optimizer", Level::TRACE, "local").entered();
let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?;
trace_plan(optimized_plan.as_inner());
Ok::<_, AdapterError>(optimized_plan)
})?;
// Resolve timestamp statistics catalog
// ------------------------------------

// Acquire a timestamp (necessary for loading statistics).
let timestamp_ctx = self
.sequence_peek_timestamp(
session,
&QueryWhen::Immediately,
target_cluster_id,
timeline_context,
&id_bundle,
&source_ids,
None, // no real-time recency
)
.await?
.timestamp_context;
let (as_of, stats) = {
let source_ids = local_mir_plan.expr().depends_on();
let mut timeline_context =
self.validate_timeline_context(source_ids.iter().cloned())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent)
&& local_mir_plan.expr().contains_temporal()
{
// If the source IDs are timestamp independent but the query contains temporal functions,
// then the timeline context needs to be upgraded to timestamp dependent. This is
// required because `source_ids` doesn't contain functions.
timeline_context = TimelineContext::TimestampDependent;
}

// Load cardinality statistics.
//
// TODO: proper stats needs exact timestamp at the moment. However, we
// don't want to resolve the timestamp twice, so we need to figure out a
// way to get somewhat stale stats.
let stats = self
.statistics_oracle(session, &source_ids, timestamp_ctx.antichain(), true)
.with_subscriber(root_dispatch)
.await?;
let id_bundle = self
.index_oracle(target_cluster_id)
.sufficient_collections(&source_ids);

let state = self.catalog().state();
// Acquire a timestamp (necessary for loading statistics).
let timestamp_ctx = self
.sequence_peek_timestamp(
session,
&QueryWhen::Immediately,
target_cluster_id,
timeline_context,
&id_bundle,
&source_ids,
None, // no real-time recency
)
.await?
.timestamp_context;

// Execute the `optimize/global` stage.
let (mut df, df_metainfo) = catch_unwind(broken, "global", || {
let mut df_builder = self.dataflow_builder(target_cluster_id);
// Load cardinality statistics.
//
// TODO: proper stats needs exact timestamp at the moment. However, we
// don't want to resolve the timestamp twice, so we need to figure out a
// way to get somewhat stale stats.
let stats = self
.statistics_oracle(session, &source_ids, timestamp_ctx.antichain(), true)
.with_subscriber(root_dispatch)
.await?;

(timestamp_ctx.antichain(), stats)
};

let mut df = DataflowDesc::new("explanation".to_string());
df_builder.import_view_into_dataflow(&GlobalId::Explain, &optimized_plan, &mut df)?;
df_builder.reoptimize_imported_views(&mut df, &optimizer_config)?;
let (used_indexes, fast_path_plan, df_meta) = catch_unwind(broken, "optimize", || {
// MIR ⇒ MIR optimization (global)
let local_mir_plan = local_mir_plan.resolve(session, stats);
let global_mir_plan = optimizer.optimize(local_mir_plan)?;

// Resolve all unmaterializable function calls except mz_now(),
// because in line with the `sequence_~` method we pretend that we
// don't have a timestamp yet.
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Deferred,
session,
// Collect the list of indexes used by the dataflow at this point
let mut used_indexes = {
let df_desc = global_mir_plan.df_desc();
let df_meta = global_mir_plan.df_meta();
UsedIndexes::new(
df_desc
.index_imports
.iter()
.map(|(id, _index_import)| {
(*id, df_meta.index_usage_types.get(id).expect("prune_and_annotate_dataflow_index_imports should have been called already").clone())
})
.collect(),
)
};
df.visit_children(
|r| prep_relation_expr(state, r, style),
|s| prep_scalar_expr(state, s, style),
)?;

// Optimize the dataflow across views, and any other ways that appeal.
let df_metainfo = mz_transform::optimize_dataflow(
&mut df,
&self.index_oracle(target_cluster_id),
stats.as_ref(),
)?;

Ok::<_, AdapterError>((df, df_metainfo))
})?;

// Collect the list of indexes used by the dataflow at this point
let mut used_indexes = UsedIndexes::new(
df
.index_imports
.iter()
.map(|(id, _index_import)| {
(*id, df_metainfo.index_usage_types.get(id).expect("prune_and_annotate_dataflow_index_imports should have been called already").clone())
})
.collect(),
);

// Determine if fast path plan will be used for this explainee.
let fast_path_plan = {
df.set_as_of(timestamp_ctx.antichain());
// MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
let global_mir_plan = global_mir_plan.resolve(as_of, session);
let global_lir_plan = optimizer.optimize(global_mir_plan)?;

// Resolve all unmaterializable function calls including mz_now().
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Time(timestamp_ctx.timestamp_or_default()),
session,
};
df.visit_children(
|r| prep_relation_expr(state, r, style),
|s| prep_scalar_expr(state, s, style),
)?;
peek::create_fast_path_plan(
&mut df,
GlobalId::Explain,
finishing.as_ref(),
self.catalog.system_config().persist_fast_path_limit(),
)?
};
let (fast_path_plan, df_meta) = match global_lir_plan {
optimize::peek::GlobalLirPlan::FastPath { plan, typ, df_meta } => {
let finishing = if !optimizer.finishing.is_trivial(typ.arity()) {
Some(optimizer.finishing.clone())
} else {
None
};
used_indexes = plan.used_indexes(&finishing);

if let Some(fast_path_plan) = &fast_path_plan {
used_indexes = fast_path_plan.used_indexes(finishing);
}
// TODO(aalexandrov): rework `OptimizerTrace` with support
// for diverging plan types towards the end and add a
// `PlanTrace` for the `FastPathPlan` type.
trace_plan(&"fast_path_plan (missing)".to_string());

// We have the opportunity to name an `until` frontier that will prevent work we needn't perform.
// By default, `until` will be `Antichain::new()`, which prevents no updates and is safe.
if let Some(as_of) = df.as_of.as_ref() {
if !as_of.is_empty() {
if let Some(next) = as_of.as_option().and_then(|as_of| as_of.checked_add(1)) {
df.until = timely::progress::Antichain::from_elem(next);
(Some(plan), df_meta)
}
}
}
optimize::peek::GlobalLirPlan::SlowPath {
df_desc,
df_meta,
typ: _,
} => {
trace_plan(&df_desc);
(None, df_meta)
}
};

// Execute the `optimize/finalize_dataflow` stage.
let df = catch_unwind(broken, "finalize_dataflow", || {
self.finalize_dataflow(df, target_cluster_id)
Ok::<_, AdapterError>((used_indexes, fast_path_plan, df_meta))
})?;

// Trace the resulting plan for the top-level `optimize` path.
trace_plan(&df);

// Return objects that need to be passed to the `ExplainContext`
// when rendering explanations for the various trace entries.
Ok((used_indexes, fast_path_plan, df_metainfo, BTreeMap::new()))
Ok((used_indexes, fast_path_plan, df_meta, transient_items))
}

/// Run the MV optimization explanation pipeline. This function must be called with
15 changes: 11 additions & 4 deletions src/sql/src/plan.rs
@@ -847,7 +847,8 @@ pub enum ExplaineeStatement {
/// The object to be explained is a SELECT statement.
Query {
raw_plan: HirRelationExpr,
row_set_finishing: Option<RowSetFinishing>,
row_set_finishing: RowSetFinishing,
desc: RelationDesc,
/// Broken flag (see [`ExplaineeStatement::broken()`]).
broken: bool,
},
@@ -900,10 +901,16 @@ impl ExplaineeStatement {
pub fn row_set_finishing(&self) -> Option<RowSetFinishing> {
match self {
Self::Query {
row_set_finishing, ..
row_set_finishing,
desc,
..
} => {
// Use the optional finishing extracted in the plan_query call.
row_set_finishing.clone()
if !row_set_finishing.is_trivial(desc.arity()) {
// Use the optional finishing extracted in the plan_query call.
Some(row_set_finishing.clone())
} else {
None
}
}
Self::CreateMaterializedView { .. } => {
// Trivial finishing asserted in plan_create_materialized_view.
9 changes: 2 additions & 7 deletions src/sql/src/plan/statement/dml.rs
@@ -334,24 +334,19 @@ pub fn plan_explain_plan(
let query::PlannedQuery {
expr: mut raw_plan,
desc,
finishing,
finishing: row_set_finishing,
scope: _,
} = query::plan_root_query(scx, *query, QueryLifetime::OneShot)?;
raw_plan.bind_parameters(params)?;

let row_set_finishing = if finishing.is_trivial(desc.arity()) {
None
} else {
Some(finishing)
};

if broken {
scx.require_feature_flag(&vars::ENABLE_EXPLAIN_BROKEN)?;
}

crate::plan::Explainee::Statement(ExplaineeStatement::Query {
raw_plan,
row_set_finishing,
desc,
broken,
})
}
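One behavioral detail worth calling out across the last two files: the triviality check on the finishing moves out of `plan_explain_plan` and into `ExplaineeStatement::row_set_finishing`, so planning now always carries a concrete `RowSetFinishing` plus the `RelationDesc` needed to judge it. The stand-in below mirrors the assumed `is_trivial` semantics (a finishing is a no-op when it neither orders, limits, offsets, nor re-projects the result's columns); it is not Materialize's exact implementation:

// Illustrative stand-in for `RowSetFinishing::is_trivial`; assumed semantics only.
struct RowSetFinishing {
    order_by: Vec<usize>,   // column indexes to sort by
    limit: Option<usize>,   // optional row limit
    offset: usize,          // rows to skip
    project: Vec<usize>,    // output column selection
}

impl RowSetFinishing {
    // Trivial ⇔ the finishing changes nothing for a result with `arity` columns.
    fn is_trivial(&self, arity: usize) -> bool {
        self.order_by.is_empty()
            && self.limit.is_none()
            && self.offset == 0
            && self.project.iter().copied().eq(0..arity)
    }
}

fn main() {
    let f = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: vec![0, 1],
    };
    assert!(f.is_trivial(2));  // identity projection over two columns: no-op
    assert!(!f.is_trivial(3)); // would drop a column, so it must be kept
}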
