Skip to content

Commit

Permalink
adapter: fork old implementations for SELECT paths
Browse files Browse the repository at this point in the history
Fork the old implementations of

- `Coordinator::sequence_peek*`, and
- `Coordinator::explain_query_optimizer_pipeline`,

by duplicating code and dispatching between the old and the new code
path based on the value of the `enable_unified_optimizer_api` feature
flag. At the moment the two code paths are still identical, but this
will change with the next commit.

See the rollout proposal in MaterializeInc#20569 for details.
  • Loading branch information
aalexandrov committed Nov 7, 2023
1 parent b12c00c commit 37bc228
Show file tree
Hide file tree
Showing 5 changed files with 895 additions and 19 deletions.
24 changes: 24 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ use crate::coord::dataflows::dataflow_import_id_bundle;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::PendingPeek;
use crate::coord::read_policy::ReadCapability;
use crate::coord::sequencer::old_optimizer_api::PeekStageDeprecated;
use crate::coord::timeline::{TimelineContext, TimelineState, WriteTimestamp};
use crate::coord::timestamp_oracle::catalog_oracle::CatalogTimestampPersistence;
use crate::coord::timestamp_selection::TimestampContext;
Expand Down Expand Up @@ -225,6 +226,10 @@ pub enum Message<T = mz_repr::Timestamp> {
ctx: ExecuteContext,
stage: PeekStage,
},
PeekStageDeprecatedReady {
ctx: ExecuteContext,
stage: PeekStageDeprecated,
},
DrainStatementLog,
}

Expand Down Expand Up @@ -264,6 +269,7 @@ impl Message {
"execute_single_statement_transaction"
}
Message::PeekStageReady { .. } => "peek_stage_ready",
Message::PeekStageDeprecatedReady { .. } => "peek_stage_ready",
Message::DrainStatementLog => "drain_statement_log",
}
}
Expand Down Expand Up @@ -323,13 +329,31 @@ pub enum RealTimeRecencyContext {
typ: RelationType,
dataflow_metainfo: DataflowMetainfo,
},
PeekDeprecated {
ctx: ExecuteContext,
finishing: RowSetFinishing,
copy_to: Option<CopyFormat>,
dataflow: DataflowDescription<OptimizedMirRelationExpr>,
cluster_id: ClusterId,
when: QueryWhen,
target_replica: Option<ReplicaId>,
view_id: GlobalId,
index_id: GlobalId,
timeline_context: TimelineContext,
source_ids: BTreeSet<GlobalId>,
in_immediate_multi_stmt_txn: bool,
key: Vec<MirScalarExpr>,
typ: RelationType,
dataflow_metainfo: DataflowMetainfo,
},
}

impl RealTimeRecencyContext {
pub(crate) fn take_context(self) -> ExecuteContext {
match self {
RealTimeRecencyContext::ExplainTimestamp { ctx, .. }
| RealTimeRecencyContext::Peek { ctx, .. } => ctx,
RealTimeRecencyContext::PeekDeprecated { ctx, .. } => ctx,
}
}
}
Expand Down
50 changes: 50 additions & 0 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::command::Command;
use crate::coord::appends::Deferred;
use crate::coord::sequencer::old_optimizer_api::{PeekStageDeprecated, PeekStageFinishDeprecated};
use crate::coord::{
Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageFinish,
PendingReadTxn, PlanValidity, PurifiedStatementReady, RealTimeRecencyContext,
Expand Down Expand Up @@ -107,6 +108,12 @@ impl Coordinator {
Message::PeekStageReady { ctx, stage } => {
self.sequence_peek_stage(ctx, stage).await;
}
Message::PeekStageDeprecatedReady { ctx, stage } => {
// Allow while the introduction of the new optimizer API in
// #20569 is in progress.
#[allow(deprecated)]
self.sequence_peek_stage_deprecated(ctx, stage).await;
}
Message::DrainStatementLog => {
self.drain_statement_log().await;
}
Expand Down Expand Up @@ -694,6 +701,49 @@ impl Coordinator {
)
.await;
}
RealTimeRecencyContext::PeekDeprecated {
ctx,
finishing,
copy_to,
dataflow,
cluster_id,
when,
target_replica,
view_id,
index_id,
timeline_context,
source_ids,
in_immediate_multi_stmt_txn: _,
key,
typ,
dataflow_metainfo,
} => {
// Allow while the introduction of the new optimizer API in
// #20569 is in progress.
#[allow(deprecated)]
self.sequence_peek_stage_deprecated(
ctx,
PeekStageDeprecated::Finish(PeekStageFinishDeprecated {
validity,
finishing,
copy_to,
dataflow,
cluster_id,
id_bundle: None,
when,
target_replica,
view_id,
index_id,
timeline_context,
source_ids,
real_time_recency_ts: Some(real_time_recency_ts),
key,
typ,
dataflow_metainfo,
}),
)
.await;
}
}
}
}
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod alter_set_cluster;
mod cluster;
mod inner;
mod linked_cluster;
mod old_optimizer_api;
pub(super) mod old_optimizer_api;

impl Coordinator {
#[tracing::instrument(level = "debug", skip_all)]
Expand Down
71 changes: 60 additions & 11 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ use crate::coord::dataflows::{
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::{FastPathPlan, PeekDataflowPlan, PlannedPeek};
use crate::coord::read_policy::SINCE_GRANULARITY;
use crate::coord::sequencer::old_optimizer_api::{
PeekStageDeprecated, PeekStageValidateDeprecated,
};
use crate::coord::timeline::TimelineContext;
use crate::coord::timestamp_selection::{
TimestampContext, TimestampDetermination, TimestampProvider, TimestampSource,
Expand Down Expand Up @@ -2132,17 +2135,40 @@ impl Coordinator {
) {
event!(Level::TRACE, plan = format!("{:?}", plan));

self.sequence_peek_stage(
ctx,
PeekStage::Validate(PeekStageValidate {
plan,
target_cluster,
}),
)
.await;
let enable_unified_optimizer_api = self
.catalog()
.system_config()
.enable_unified_optimizer_api();

if enable_unified_optimizer_api {
self.sequence_peek_stage(
ctx,
PeekStage::Validate(PeekStageValidate {
plan,
target_cluster,
}),
)
.await;
} else {
// Allow while the introduction of the new optimizer API in
// #20569 is in progress.
#[allow(deprecated)]
self.sequence_peek_stage_deprecated(
ctx,
PeekStageDeprecated::Validate(PeekStageValidateDeprecated {
plan,
target_cluster,
}),
)
.await;
}
}

/// Processes as many peek stages as possible.
///
/// WARNING! This should mirror the semantics of `sequence_peek_stage_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn sequence_peek_stage(
&mut self,
Expand Down Expand Up @@ -2185,7 +2211,11 @@ impl Coordinator {
}
}

// Do some simple validation. We must defer most of it until after any off-thread work.
/// Do some simple validation. We must defer most of it until after any off-thread work.
///
/// WARNING! This should mirror the semantics of `peek_stage_validate_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
fn peek_stage_validate(
&mut self,
session: &Session,
Expand Down Expand Up @@ -2273,6 +2303,9 @@ impl Coordinator {
})
}

/// WARNING! This should mirror the semantics of `peek_stage_optimize_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
async fn peek_stage_optimize(&mut self, ctx: ExecuteContext, mut stage: PeekStageOptimize) {
// Generate data structures that can be moved to another task where we will perform possibly
// expensive optimizations.
Expand Down Expand Up @@ -2335,6 +2368,9 @@ impl Coordinator {
);
}

/// WARNING! This should mirror the semantics of `optimize_peek_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
fn optimize_peek(
catalog: &CatalogState,
compute: ComputeInstanceSnapshot,
Expand Down Expand Up @@ -2416,6 +2452,9 @@ impl Coordinator {
})
}

/// WARNING! This should mirror the semantics of `peek_stage_timestamp_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
#[tracing::instrument(level = "debug", skip_all)]
fn peek_stage_timestamp(
&mut self,
Expand Down Expand Up @@ -2501,6 +2540,9 @@ impl Coordinator {
}
}

/// WARNING! This should mirror the semantics of `peek_stage_finish_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
#[tracing::instrument(level = "debug", skip_all)]
async fn peek_stage_finish(
&mut self,
Expand Down Expand Up @@ -2591,6 +2633,10 @@ impl Coordinator {

/// Determines the query timestamp and acquires read holds on dependent sources
/// if necessary.
///
/// WARNING! This should mirror the semantics of `sequence_peek_timestamp_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
async fn sequence_peek_timestamp(
&mut self,
session: &mut Session,
Expand Down Expand Up @@ -2711,6 +2757,9 @@ impl Coordinator {
Ok(determination)
}

/// WARNING! This should mirror the semantics of `plan_peek_deprecated`.
///
/// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype
async fn plan_peek(
&mut self,
mut dataflow: DataflowDescription<OptimizedMirRelationExpr>,
Expand Down Expand Up @@ -2778,7 +2827,7 @@ impl Coordinator {

/// Checks to see if the session needs a real time recency timestamp and if so returns
/// a future that will return the timestamp.
fn recent_timestamp(
pub(super) fn recent_timestamp(
&self,
session: &Session,
source_ids: impl Iterator<Item = GlobalId>,
Expand Down Expand Up @@ -5709,7 +5758,7 @@ impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
}

impl Coordinator {
async fn statistics_oracle(
pub(super) async fn statistics_oracle(
&self,
session: &Session,
source_ids: &BTreeSet<GlobalId>,
Expand Down
Loading

0 comments on commit 37bc228

Please sign in to comment.