adapter: fork old implementations of EXPLAIN SELECT paths
aalexandrov committed Nov 7, 2023
1 parent 902ab3a commit 72deda8
Showing 2 changed files with 256 additions and 16 deletions.
50 changes: 37 additions & 13 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -3140,25 +3140,49 @@ impl Coordinator {
// we aren't storing this clone in a `Subscriber`, so we should be fine.
let root_dispatch = tracing::dispatcher::get_default(|d| d.clone());

let enable_unified_optimizer_api = self
.catalog()
.system_config()
.enable_unified_optimizer_api();

let pipeline_result = match stmt {
plan::ExplaineeStatement::Query {
raw_plan,
row_set_finishing,
broken,
} => {
// Please see the doc comment on `explain_query_optimizer_pipeline` for more
// information regarding its subtleties.
self.explain_query_optimizer_pipeline(
raw_plan,
broken,
target_cluster,
ctx.session_mut(),
&row_set_finishing,
&config,
root_dispatch,
)
.with_subscriber(&optimizer_trace)
.await
if enable_unified_optimizer_api {
// Please see the doc comment on `explain_query_optimizer_pipeline` for more
// information regarding its subtleties.
self.explain_query_optimizer_pipeline(
raw_plan,
broken,
target_cluster,
ctx.session_mut(),
&row_set_finishing,
&config,
root_dispatch,
)
.with_subscriber(&optimizer_trace)
.await
} else {
// Allow while the introduction of the new optimizer API in
// #20569 is in progress.
#[allow(deprecated)]
// Please see the doc comment on `explain_query_optimizer_pipeline` for more
// information regarding its subtleties.
self.explain_query_optimizer_pipeline_deprecated(
raw_plan,
broken,
target_cluster,
ctx.session_mut(),
&row_set_finishing,
&config,
root_dispatch,
)
.with_subscriber(&optimizer_trace)
.await
}
}
plan::ExplaineeStatement::CreateMaterializedView {
name,
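
The change above is the feature-flag fork this commit is named for: the `enable_unified_optimizer_api` system configuration variable selects the new unified optimizer path, while the old implementation stays callable (behind `#[allow(deprecated)]`) until the migration tracked in #20569 completes. A minimal sketch of the pattern, with `new_path` and `old_path` as hypothetical stand-ins for the two pipeline methods:

    // Sketch only: `new_path`, `old_path`, and `args` are hypothetical names.
    let flag_enabled = self.catalog().system_config().enable_unified_optimizer_api();
    let result = if flag_enabled {
        self.new_path(args).await
    } else {
        // Tolerated while the migration is in progress; the old path is kept
        // intact so behavior can be compared and the flag rolled back safely.
        #[allow(deprecated)]
        self.old_path(args).await
    };

Keeping both branches callable makes flipping the flag, and rolling it back, a pure configuration change rather than a code change.
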
222 changes: 219 additions & 3 deletions src/adapter/src/coord/sequencer/old_optimizer_api.rs
@@ -17,7 +17,7 @@

#![allow(deprecated)]

use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};

use mz_cluster_client::ReplicaId;
use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
@@ -28,24 +28,27 @@ use mz_expr::{
OptimizedMirRelationExpr, RowSetFinishing,
};
use mz_ore::task;
use mz_repr::explain::{TransientItem, UsedIndexes};
use mz_repr::{GlobalId, RelationDesc, RelationType, Timestamp};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, CopyFormat, QueryWhen, SubscribeFrom};
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::{EmptyStatisticsOracle, Optimizer, StatisticsOracle};
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::instrument::WithSubscriber;
use tracing::{event, warn, Level};

use crate::catalog::CatalogState;
use crate::coord::dataflows::{
prep_relation_expr, prep_scalar_expr, ComputeInstanceSnapshot, DataflowBuilder, EvalTime,
ExprPrepStyle,
};
use crate::coord::peek::PlannedPeek;
use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
use crate::coord::peek::{self, FastPathPlan, PlannedPeek};
use crate::coord::sequencer::inner::{catch_unwind, check_log_reads, return_if_err};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{Coordinator, Message, PlanValidity, RealTimeRecencyContext, TargetCluster};
use crate::optimize::OptimizerConfig;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::subscribe::ActiveSubscribe;
use crate::util::{ComputeSinkId, ResultExt};
Expand Down Expand Up @@ -909,6 +912,219 @@ impl Coordinator {
source_ids,
})
}

/// Run the query optimization explanation pipeline. This function must be called with
/// an `OptimizerTrace` `tracing` subscriber, using `.with_subscriber(...)`.
/// The `root_dispatch` should be the global `tracing::Dispatch`.
///
/// This should mirror the operational semantics of
/// `Coordinator::explain_query_optimizer_pipeline`.
//
// WARNING, ENTERING SPOOKY ZONE 3.0
//
// You must be careful when altering this function. Any async call (so, anything that uses
// `.await`) should be wrapped with `.with_subscriber(root_dispatch)`.
//
// `tracing` has limitations that mean that any `Span` created under the `OptimizerTrace`
// subscriber that _leaves_ this function will almost assuredly cause a panic inside `tracing`.
// This is because `Span`s track the `Subscriber` they were created under, but certain actions
// (like an ordinary `Span` exit) will call a method on the _thread-local_ `Subscriber`, which
// may be backed with a different `Registry`.
//
// At first glance, there is no obvious way this method leaks `Span`s, but ANY tokio
// resource (like `oneshot` channels) creates `Span`s if tokio is built with `tokio_unstable`
// and the `tracing` feature. This method has been audited to make sure ALL such
// cases are dispatched inside the global `root_dispatch`, and **any change to this method
// needs to ensure this invariant is upheld.**
//
// It is a bit wonky to run this method under a specialized `Dispatch` while ensuring
// that all `.await` points inside it use the passed `root_dispatch`, but splitting the
// method into pieces so that the `Dispatch` of each piece could be controlled at a
// higher level would be very hard to read. Additionally, once the issues with `broken` are resolved
// (as discussed in <https://github.com/MaterializeInc/materialize/pull/21809>), this
// can be simplified, as only a _singular_ `Registry` will be in use.
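//
// For example (sketch only; the real call site appears in the body below), the async
// cardinality statistics lookup is explicitly pinned back to the global dispatch:
//
//     self.statistics_oracle(session, &source_ids, timestamp_ctx.antichain(), true)
//         .with_subscriber(root_dispatch)
//         .await?;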
#[deprecated = "This is being replaced by explain_query_optimizer_pipeline (see #20569)."]
#[tracing::instrument(target = "optimizer", level = "trace", name = "optimize", skip_all)]
pub(crate) async fn explain_query_optimizer_pipeline_deprecated(
&mut self,
raw_plan: mz_sql::plan::HirRelationExpr,
broken: bool,
target_cluster: TargetCluster,
session: &mut Session,
finishing: &Option<RowSetFinishing>,
explain_config: &mz_repr::explain::ExplainConfig,
root_dispatch: tracing::Dispatch,
) -> Result<
(
UsedIndexes,
Option<FastPathPlan>,
DataflowMetainfo,
BTreeMap<GlobalId, TransientItem>,
),
AdapterError,
> {
use mz_repr::explain::trace_plan;

if broken {
tracing::warn!("EXPLAIN ... BROKEN <query> is known to leak memory, use with caution");
}

let catalog = self.catalog();
let target_cluster_id = catalog.resolve_target_cluster(target_cluster, session)?.id;
let system_config = catalog.system_config();
let optimizer_config = OptimizerConfig::from((system_config, explain_config));

// Execute the various stages of the optimization pipeline
// -------------------------------------------------------

// Trace the pipeline input under `optimize/raw`.
tracing::span!(target: "optimizer", Level::TRACE, "raw").in_scope(|| {
trace_plan(&raw_plan);
});

// Execute the `optimize/hir_to_mir` stage.
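// (Lowering decorrelates the SQL-level `HirRelationExpr` into a `MirRelationExpr`,
// the representation that all subsequent optimization stages operate on.)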
let decorrelated_plan = catch_unwind(broken, "hir_to_mir", || {
raw_plan.lower((system_config, explain_config))
})?;

let mut timeline_context =
self.validate_timeline_context(decorrelated_plan.depends_on())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent)
&& decorrelated_plan.contains_temporal()
{
// If the source IDs are timestamp independent but the query contains temporal functions,
// then the timeline context needs to be upgraded to timestamp dependent. This is
// required because `source_ids` doesn't contain functions.
timeline_context = TimelineContext::TimestampDependent;
}

let source_ids = decorrelated_plan.depends_on();
let id_bundle = self
.index_oracle(target_cluster_id)
.sufficient_collections(&source_ids);

// Execute the `optimize/local` stage.
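// (The `local` stage applies the single-view transforms via `view_optimizer`;
// cross-view optimization happens in the `global` stage further below.)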
let optimized_plan = catch_unwind(broken, "local", || {
let _span = tracing::span!(target: "optimizer", Level::TRACE, "local").entered();
let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?;
trace_plan(optimized_plan.as_inner());
Ok::<_, AdapterError>(optimized_plan)
})?;

// Acquire a timestamp (necessary for loading statistics).
let timestamp_ctx = self
.sequence_peek_timestamp_deprecated(
session,
&QueryWhen::Immediately,
target_cluster_id,
timeline_context,
&id_bundle,
&source_ids,
None, // no real-time recency
)
.await?
.timestamp_context;

// Load cardinality statistics.
//
// TODO: proper stats need an exact timestamp at the moment. However, we
// don't want to resolve the timestamp twice, so we need to figure out a
// way to get somewhat stale stats.
let stats = self
.statistics_oracle(session, &source_ids, timestamp_ctx.antichain(), true)
.with_subscriber(root_dispatch)
.await?;

let state = self.catalog().state();

// Execute the `optimize/global` stage.
let (mut df, df_metainfo) = catch_unwind(broken, "global", || {
let mut df_builder = self.dataflow_builder(target_cluster_id);

let mut df = DataflowDesc::new("explanation".to_string());
df_builder.import_view_into_dataflow(&GlobalId::Explain, &optimized_plan, &mut df)?;
df_builder.reoptimize_imported_views(&mut df, &optimizer_config)?;

// Resolve all unmaterializable function calls except mz_now(),
// because in line with the `sequence_~` method we pretend that we
// don't have a timestamp yet.
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Deferred,
session,
};
df.visit_children(
|r| prep_relation_expr(state, r, style),
|s| prep_scalar_expr(state, s, style),
)?;

// Optimize the dataflow across views, and any other ways that appeal.
let df_metainfo = mz_transform::optimize_dataflow(
&mut df,
&self.index_oracle(target_cluster_id),
stats.as_ref(),
)?;

Ok::<_, AdapterError>((df, df_metainfo))
})?;

// Collect the list of indexes used by the dataflow at this point
let mut used_indexes = UsedIndexes::new(
df
.index_imports
.iter()
.map(|(id, _index_import)| {
(*id, df_metainfo.index_usage_types.get(id).expect("prune_and_annotate_dataflow_index_imports should have been called already").clone())
})
.collect(),
);

// Determine if fast path plan will be used for this explainee.
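// (A fast-path plan answers the peek without building a new dataflow, e.g. by
// reading directly from an existing arrangement or by returning a constant.)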
let fast_path_plan = {
df.set_as_of(timestamp_ctx.antichain());

// Resolve all unmaterializable function calls, including mz_now().
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Time(timestamp_ctx.timestamp_or_default()),
session,
};
df.visit_children(
|r| prep_relation_expr(state, r, style),
|s| prep_scalar_expr(state, s, style),
)?;
peek::create_fast_path_plan(
&mut df,
GlobalId::Explain,
finishing.as_ref(),
self.catalog.system_config().persist_fast_path_limit(),
)?
};

if let Some(fast_path_plan) = &fast_path_plan {
used_indexes = fast_path_plan.used_indexes(finishing);
}

// We have the opportunity to name an `until` frontier that will prevent work we needn't perform.
// By default, `until` will be `Antichain::new()`, which prevents no updates and is safe.
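// (For example, for a one-shot peek as of time `t`, the `until = t + 1` set below
// lets the dataflow ignore updates at times beyond `t`, which cannot affect the result.)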
if let Some(as_of) = df.as_of.as_ref() {
if !as_of.is_empty() {
if let Some(next) = as_of.as_option().and_then(|as_of| as_of.checked_add(1)) {
df.until = timely::progress::Antichain::from_elem(next);
}
}
}

// Execute the `optimize/finalize_dataflow` stage.
let df = catch_unwind(broken, "finalize_dataflow", || {
self.finalize_dataflow(df, target_cluster_id)
})?;

// Trace the resulting plan for the top-level `optimize` path.
trace_plan(&df);

// Return objects that need to be passed to the `ExplainContext`
// when rendering explanations for the various trace entries.
Ok((used_indexes, fast_path_plan, df_metainfo, BTreeMap::new()))
}
}

#[derive(Debug)]