diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index d09430f3ba9a1..62d9384c1a411 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -134,6 +134,7 @@ use crate::coord::dataflows::dataflow_import_id_bundle; use crate::coord::id_bundle::CollectionIdBundle; use crate::coord::peek::PendingPeek; use crate::coord::read_policy::ReadCapability; +use crate::coord::sequencer::old_optimizer_api::PeekStageDeprecated; use crate::coord::timeline::{TimelineContext, TimelineState, WriteTimestamp}; use crate::coord::timestamp_oracle::catalog_oracle::CatalogTimestampPersistence; use crate::coord::timestamp_selection::TimestampContext; @@ -225,6 +226,10 @@ pub enum Message { ctx: ExecuteContext, stage: PeekStage, }, + PeekStageDeprecatedReady { + ctx: ExecuteContext, + stage: PeekStageDeprecated, + }, DrainStatementLog, } @@ -264,6 +269,7 @@ impl Message { "execute_single_statement_transaction" } Message::PeekStageReady { .. } => "peek_stage_ready", + Message::PeekStageDeprecatedReady { .. } => "peek_stage_ready", Message::DrainStatementLog => "drain_statement_log", } } @@ -323,6 +329,23 @@ pub enum RealTimeRecencyContext { typ: RelationType, dataflow_metainfo: DataflowMetainfo, }, + PeekDeprecated { + ctx: ExecuteContext, + finishing: RowSetFinishing, + copy_to: Option, + dataflow: DataflowDescription, + cluster_id: ClusterId, + when: QueryWhen, + target_replica: Option, + view_id: GlobalId, + index_id: GlobalId, + timeline_context: TimelineContext, + source_ids: BTreeSet, + in_immediate_multi_stmt_txn: bool, + key: Vec, + typ: RelationType, + dataflow_metainfo: DataflowMetainfo, + }, } impl RealTimeRecencyContext { @@ -330,6 +353,7 @@ impl RealTimeRecencyContext { match self { RealTimeRecencyContext::ExplainTimestamp { ctx, .. } | RealTimeRecencyContext::Peek { ctx, .. } => ctx, + RealTimeRecencyContext::PeekDeprecated { ctx, .. } => ctx, } } } diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index bb19c199fbc11..fecb0b44ed6de 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -30,6 +30,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::command::Command; use crate::coord::appends::Deferred; +use crate::coord::sequencer::old_optimizer_api::{PeekStageDeprecated, PeekStageFinishDeprecated}; use crate::coord::{ Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageFinish, PendingReadTxn, PlanValidity, PurifiedStatementReady, RealTimeRecencyContext, @@ -107,6 +108,12 @@ impl Coordinator { Message::PeekStageReady { ctx, stage } => { self.sequence_peek_stage(ctx, stage).await; } + Message::PeekStageDeprecatedReady { ctx, stage } => { + // Allow while the introduction of the new optimizer API in + // #20569 is in progress. + #[allow(deprecated)] + self.sequence_peek_stage_deprecated(ctx, stage).await; + } Message::DrainStatementLog => { self.drain_statement_log().await; } @@ -694,6 +701,49 @@ impl Coordinator { ) .await; } + RealTimeRecencyContext::PeekDeprecated { + ctx, + finishing, + copy_to, + dataflow, + cluster_id, + when, + target_replica, + view_id, + index_id, + timeline_context, + source_ids, + in_immediate_multi_stmt_txn: _, + key, + typ, + dataflow_metainfo, + } => { + // Allow while the introduction of the new optimizer API in + // #20569 is in progress. + #[allow(deprecated)] + self.sequence_peek_stage_deprecated( + ctx, + PeekStageDeprecated::Finish(PeekStageFinishDeprecated { + validity, + finishing, + copy_to, + dataflow, + cluster_id, + id_bundle: None, + when, + target_replica, + view_id, + index_id, + timeline_context, + source_ids, + real_time_recency_ts: Some(real_time_recency_ts), + key, + typ, + dataflow_metainfo, + }), + ) + .await; + } } } } diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 732b9a93a9783..4aafcd72a707c 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -60,7 +60,7 @@ mod alter_set_cluster; mod cluster; mod inner; mod linked_cluster; -mod old_optimizer_api; +pub(super) mod old_optimizer_api; impl Coordinator { #[tracing::instrument(level = "debug", skip_all)] diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index c76d9bc8ee617..6310fdd379872 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -97,6 +97,9 @@ use crate::coord::dataflows::{ use crate::coord::id_bundle::CollectionIdBundle; use crate::coord::peek::{FastPathPlan, PeekDataflowPlan, PlannedPeek}; use crate::coord::read_policy::SINCE_GRANULARITY; +use crate::coord::sequencer::old_optimizer_api::{ + PeekStageDeprecated, PeekStageValidateDeprecated, +}; use crate::coord::timeline::TimelineContext; use crate::coord::timestamp_selection::{ TimestampContext, TimestampDetermination, TimestampProvider, TimestampSource, @@ -2132,17 +2135,40 @@ impl Coordinator { ) { event!(Level::TRACE, plan = format!("{:?}", plan)); - self.sequence_peek_stage( - ctx, - PeekStage::Validate(PeekStageValidate { - plan, - target_cluster, - }), - ) - .await; + let enable_unified_optimizer_api = self + .catalog() + .system_config() + .enable_unified_optimizer_api(); + + if enable_unified_optimizer_api { + self.sequence_peek_stage( + ctx, + PeekStage::Validate(PeekStageValidate { + plan, + target_cluster, + }), + ) + .await; + } else { + // Allow while the introduction of the new optimizer API in + // #20569 is in progress. + #[allow(deprecated)] + self.sequence_peek_stage_deprecated( + ctx, + PeekStageDeprecated::Validate(PeekStageValidateDeprecated { + plan, + target_cluster, + }), + ) + .await; + } } /// Processes as many peek stages as possible. + /// + /// WARNING! This should mirror the semantics of `sequence_peek_stage_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype #[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn sequence_peek_stage( &mut self, @@ -2185,7 +2211,11 @@ impl Coordinator { } } - // Do some simple validation. We must defer most of it until after any off-thread work. + /// Do some simple validation. We must defer most of it until after any off-thread work. + /// + /// WARNING! This should mirror the semantics of `peek_stage_validate_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype fn peek_stage_validate( &mut self, session: &Session, @@ -2273,6 +2303,9 @@ impl Coordinator { }) } + /// WARNING! This should mirror the semantics of `peek_stage_optimize_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype async fn peek_stage_optimize(&mut self, ctx: ExecuteContext, mut stage: PeekStageOptimize) { // Generate data structures that can be moved to another task where we will perform possibly // expensive optimizations. @@ -2335,6 +2368,9 @@ impl Coordinator { ); } + /// WARNING! This should mirror the semantics of `optimize_peek_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype fn optimize_peek( catalog: &CatalogState, compute: ComputeInstanceSnapshot, @@ -2416,6 +2452,9 @@ impl Coordinator { }) } + /// WARNING! This should mirror the semantics of `peek_stage_timestamp_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype #[tracing::instrument(level = "debug", skip_all)] fn peek_stage_timestamp( &mut self, @@ -2501,6 +2540,9 @@ impl Coordinator { } } + /// WARNING! This should mirror the semantics of `peek_stage_finish_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype #[tracing::instrument(level = "debug", skip_all)] async fn peek_stage_finish( &mut self, @@ -2591,6 +2633,10 @@ impl Coordinator { /// Determines the query timestamp and acquires read holds on dependent sources /// if necessary. + /// + /// WARNING! This should mirror the semantics of `sequence_peek_timestamp_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype async fn sequence_peek_timestamp( &mut self, session: &mut Session, @@ -2711,6 +2757,9 @@ impl Coordinator { Ok(determination) } + /// WARNING! This should mirror the semantics of `plan_peek_deprecated`. + /// + /// See ./doc/developer/design/20230714_optimizer_interface.md#minimal-viable-prototype async fn plan_peek( &mut self, mut dataflow: DataflowDescription, @@ -2778,7 +2827,7 @@ impl Coordinator { /// Checks to see if the session needs a real time recency timestamp and if so returns /// a future that will return the timestamp. - fn recent_timestamp( + pub(super) fn recent_timestamp( &self, session: &Session, source_ids: impl Iterator, @@ -5709,7 +5758,7 @@ impl mz_transform::StatisticsOracle for CachedStatisticsOracle { } impl Coordinator { - async fn statistics_oracle( + pub(super) async fn statistics_oracle( &self, session: &Session, source_ids: &BTreeSet, diff --git a/src/adapter/src/coord/sequencer/old_optimizer_api.rs b/src/adapter/src/coord/sequencer/old_optimizer_api.rs index 84a17c294f4d0..e1a9fa879edac 100644 --- a/src/adapter/src/coord/sequencer/old_optimizer_api.rs +++ b/src/adapter/src/coord/sequencer/old_optimizer_api.rs @@ -15,19 +15,44 @@ //! we will deprecate the old methods and move the implementations here to the //! `inner` module. -use mz_compute_types::dataflows::DataflowDesc; +#![allow(deprecated)] + +use std::collections::BTreeSet; + +use mz_cluster_client::ReplicaId; +use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc}; use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection}; -use mz_repr::RelationDesc; -use mz_sql::plan::{self, QueryWhen, SubscribeFrom}; +use mz_controller_types::ClusterId; +use mz_expr::{ + permutation_for_arrangement, CollectionPlan, MirRelationExpr, MirScalarExpr, + OptimizedMirRelationExpr, RowSetFinishing, +}; +use mz_ore::task; +use mz_repr::{GlobalId, RelationDesc, RelationType, Timestamp}; +use mz_sql::catalog::CatalogCluster; +use mz_sql::plan::{self, CopyFormat, QueryWhen, SubscribeFrom}; +use mz_transform::dataflow::DataflowMetainfo; +use mz_transform::{EmptyStatisticsOracle, Optimizer, StatisticsOracle}; use timely::progress::Antichain; use tokio::sync::mpsc; +use tracing::{event, warn, Level}; -use crate::coord::sequencer::inner::check_log_reads; -use crate::coord::{Coordinator, TargetCluster}; -use crate::session::TransactionOps; +use crate::catalog::CatalogState; +use crate::coord::dataflows::{ + prep_relation_expr, prep_scalar_expr, ComputeInstanceSnapshot, DataflowBuilder, EvalTime, + ExprPrepStyle, +}; +use crate::coord::peek::PlannedPeek; +use crate::coord::sequencer::inner::{check_log_reads, return_if_err}; +use crate::coord::timestamp_selection::TimestampDetermination; +use crate::coord::{Coordinator, Message, PlanValidity, RealTimeRecencyContext, TargetCluster}; +use crate::session::{Session, TransactionOps, TransactionStatus}; use crate::subscribe::ActiveSubscribe; use crate::util::{ComputeSinkId, ResultExt}; -use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, TimelineContext}; +use crate::{ + AdapterError, AdapterNotice, CollectionIdBundle, ExecuteContext, ExecuteResponse, + TimelineContext, TimestampContext, +}; impl Coordinator { // Subscribe @@ -239,4 +264,732 @@ impl Coordinator { }), } } + + // Peek + // ---- + + /// Processes as many peek stages as possible. + #[deprecated = "This is being replaced by sequence_peek_stage (see #20569)."] + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) async fn sequence_peek_stage_deprecated( + &mut self, + mut ctx: ExecuteContext, + mut stage: PeekStageDeprecated, + ) { + // Process the current stage and allow for processing the next. + loop { + event!(Level::TRACE, stage = format!("{:?}", stage)); + + // Always verify peek validity. This is cheap, and prevents programming errors + // if we move any stages off thread. + if let Some(validity) = stage.validity() { + if let Err(err) = validity.check(self.catalog()) { + ctx.retire(Err(err)); + return; + } + } + + (ctx, stage) = match stage { + PeekStageDeprecated::Validate(stage) => { + let next = return_if_err!( + self.peek_stage_validate_deprecated(ctx.session_mut(), stage), + ctx + ); + (ctx, PeekStageDeprecated::Optimize(next)) + } + PeekStageDeprecated::Optimize(stage) => { + self.peek_stage_optimize_deprecated(ctx, stage).await; + return; + } + PeekStageDeprecated::Timestamp(stage) => { + match self.peek_stage_timestamp_deprecated(ctx, stage) { + Some((ctx, next)) => (ctx, PeekStageDeprecated::Finish(next)), + None => return, + } + } + PeekStageDeprecated::Finish(stage) => { + let res = self.peek_stage_finish_deprecated(&mut ctx, stage).await; + ctx.retire(res); + return; + } + } + } + } + + // Do some simple validation. We must defer most of it until after any off-thread work. + #[deprecated = "This is being replaced by peek_stage_validate (see #20569)."] + fn peek_stage_validate_deprecated( + &mut self, + session: &Session, + PeekStageValidateDeprecated { + plan, + target_cluster, + }: PeekStageValidateDeprecated, + ) -> Result { + let plan::SelectPlan { + source, + when, + finishing, + copy_to, + } = plan; + + // Two transient allocations. We could reclaim these if we don't use them, potentially. + // TODO: reclaim transient identifiers in fast path cases. + let view_id = self.allocate_transient_id()?; + let index_id = self.allocate_transient_id()?; + let catalog = self.catalog(); + + let cluster = catalog.resolve_target_cluster(target_cluster, session)?; + + let target_replica_name = session.vars().cluster_replica(); + let mut target_replica = target_replica_name + .map(|name| { + cluster + .replica_id(name) + .ok_or(AdapterError::UnknownClusterReplica { + cluster_name: cluster.name.clone(), + replica_name: name.to_string(), + }) + }) + .transpose()?; + + if cluster.replicas().next().is_none() { + return Err(AdapterError::NoClusterReplicasAvailable( + cluster.name.clone(), + )); + } + + let source_ids = source.depends_on(); + let mut timeline_context = self.validate_timeline_context(source_ids.clone())?; + if matches!(timeline_context, TimelineContext::TimestampIndependent) + && source.contains_temporal() + { + // If the source IDs are timestamp independent but the query contains temporal functions, + // then the timeline context needs to be upgraded to timestamp dependent. This is + // required because `source_ids` doesn't contain functions. + timeline_context = TimelineContext::TimestampDependent; + } + let in_immediate_multi_stmt_txn = session.transaction().is_in_multi_statement_transaction() + && when == QueryWhen::Immediately; + + let notices = check_log_reads( + catalog, + cluster, + &source_ids, + &mut target_replica, + session.vars(), + )?; + session.add_notices(notices); + + let validity = PlanValidity { + transient_revision: catalog.transient_revision(), + dependency_ids: source_ids.clone(), + cluster_id: Some(cluster.id()), + replica_id: target_replica, + role_metadata: session.role_metadata().clone(), + }; + + Ok(PeekStageOptimizeDeprecated { + validity, + source, + finishing, + copy_to, + view_id, + index_id, + source_ids, + cluster_id: cluster.id(), + when, + target_replica, + timeline_context, + in_immediate_multi_stmt_txn, + }) + } + + #[deprecated = "This is being replaced by peek_stage_optimize (see #20569)."] + async fn peek_stage_optimize_deprecated( + &mut self, + ctx: ExecuteContext, + mut stage: PeekStageOptimizeDeprecated, + ) { + // Generate data structures that can be moved to another task where we will perform possibly + // expensive optimizations. + let catalog = self.owned_catalog(); + let compute = ComputeInstanceSnapshot::new(&self.controller, stage.cluster_id) + .expect("compute instance does not exist"); + let internal_cmd_tx = self.internal_cmd_tx.clone(); + + // TODO: Is there a way to avoid making two dataflow_builders (the second is in + // optimize_peek)? + let id_bundle = self + .dataflow_builder(stage.cluster_id) + .sufficient_collections(&stage.source_ids); + // Although we have added `sources.depends_on()` to the validity already, also add the + // sufficient collections for safety. + stage.validity.dependency_ids.extend(id_bundle.iter()); + + let stats = { + match self + .determine_timestamp( + ctx.session(), + &id_bundle, + &stage.when, + stage.cluster_id, + &stage.timeline_context, + None, + ) + .await + { + Err(_) => Box::new(EmptyStatisticsOracle), + Ok(query_as_of) => self + .statistics_oracle( + ctx.session(), + &stage.source_ids, + query_as_of.timestamp_context.antichain(), + true, + ) + .await + .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle)), + } + }; + + mz_ore::task::spawn_blocking( + || "optimize peek", + move || match Self::optimize_peek_deprecated( + catalog.state(), + compute, + ctx.session(), + stats, + id_bundle, + stage, + ) { + Ok(stage) => { + let stage = PeekStageDeprecated::Timestamp(stage); + // Ignore errors if the coordinator has shut down. + let _ = internal_cmd_tx.send(Message::PeekStageDeprecatedReady { ctx, stage }); + } + Err(err) => ctx.retire(Err(err)), + }, + ); + } + + #[deprecated = "This is being replaced by optimize_peek (see #20569)."] + fn optimize_peek_deprecated( + catalog: &CatalogState, + compute: ComputeInstanceSnapshot, + session: &Session, + stats: Box, + id_bundle: CollectionIdBundle, + PeekStageOptimizeDeprecated { + validity, + source, + finishing, + copy_to, + view_id, + index_id, + source_ids, + cluster_id, + when, + target_replica, + timeline_context, + in_immediate_multi_stmt_txn, + }: PeekStageOptimizeDeprecated, + ) -> Result { + let optimizer = Optimizer::logical_optimizer(&mz_transform::typecheck::empty_context()); + let source = optimizer.optimize(source)?; + let mut builder = DataflowBuilder::new(catalog, compute); + + // We create a dataflow and optimize it, to determine if we can avoid building it. + // This can happen if the result optimizes to a constant, or to a `Get` expression + // around a maintained arrangement. + let typ = source.typ(); + let key: Vec = typ + .default_key() + .iter() + .map(|k| MirScalarExpr::Column(*k)) + .collect(); + // The assembled dataflow contains a view and an index of that view. + let mut dataflow = DataflowDesc::new(format!("oneshot-select-{}", view_id)); + builder.import_view_into_dataflow(&view_id, &source, &mut dataflow)?; + + // Resolve all unmaterializable function calls except mz_now(), because we don't yet have a + // timestamp. + let style = ExprPrepStyle::OneShot { + logical_time: EvalTime::Deferred, + session, + }; + dataflow.visit_children( + |r| prep_relation_expr(catalog, r, style), + |s| prep_scalar_expr(catalog, s, style), + )?; + + dataflow.export_index( + index_id, + IndexDesc { + on_id: view_id, + key: key.clone(), + }, + typ.clone(), + ); + + // Optimize the dataflow across views, and any other ways that appeal. + let dataflow_metainfo = mz_transform::optimize_dataflow(&mut dataflow, &builder, &*stats)?; + + Ok(PeekStageTimestampDeprecated { + validity, + dataflow, + finishing, + copy_to, + view_id, + index_id, + source_ids, + cluster_id, + id_bundle, + when, + target_replica, + timeline_context, + in_immediate_multi_stmt_txn, + key, + typ, + dataflow_metainfo, + }) + } + + #[deprecated = "This is being replaced by peek_stage_timestamp (see #20569)."] + #[tracing::instrument(level = "debug", skip_all)] + fn peek_stage_timestamp_deprecated( + &mut self, + ctx: ExecuteContext, + PeekStageTimestampDeprecated { + validity, + dataflow, + finishing, + copy_to, + view_id, + index_id, + source_ids, + cluster_id, + id_bundle, + when, + target_replica, + timeline_context, + in_immediate_multi_stmt_txn, + key, + typ, + dataflow_metainfo, + }: PeekStageTimestampDeprecated, + ) -> Option<(ExecuteContext, PeekStageFinishDeprecated)> { + match self.recent_timestamp(ctx.session(), source_ids.iter().cloned()) { + Some(fut) => { + let internal_cmd_tx = self.internal_cmd_tx.clone(); + let conn_id = ctx.session().conn_id().clone(); + self.pending_real_time_recency_timestamp.insert( + conn_id.clone(), + RealTimeRecencyContext::PeekDeprecated { + ctx, + finishing, + copy_to, + dataflow, + cluster_id, + when, + target_replica, + view_id, + index_id, + timeline_context, + source_ids, + in_immediate_multi_stmt_txn, + key, + typ, + dataflow_metainfo, + }, + ); + task::spawn(|| "real_time_recency_peek", async move { + let real_time_recency_ts = fut.await; + // It is not an error for these results to be ready after `internal_cmd_rx` has been dropped. + let result = internal_cmd_tx.send(Message::RealTimeRecencyTimestamp { + conn_id: conn_id.clone(), + real_time_recency_ts, + validity, + }); + if let Err(e) = result { + warn!("internal_cmd_rx dropped before we could send: {:?}", e); + } + }); + None + } + None => Some(( + ctx, + PeekStageFinishDeprecated { + validity, + finishing, + copy_to, + dataflow, + cluster_id, + id_bundle: Some(id_bundle), + when, + target_replica, + view_id, + index_id, + timeline_context, + source_ids, + real_time_recency_ts: None, + key, + typ, + dataflow_metainfo, + }, + )), + } + } + + #[deprecated = "This is being replaced by peek_stage_finish (see #20569)."] + #[tracing::instrument(level = "debug", skip_all)] + async fn peek_stage_finish_deprecated( + &mut self, + ctx: &mut ExecuteContext, + PeekStageFinishDeprecated { + validity: _, + finishing, + copy_to, + dataflow, + cluster_id, + id_bundle, + when, + target_replica, + view_id, + index_id, + timeline_context, + source_ids, + real_time_recency_ts, + key, + typ, + dataflow_metainfo, + }: PeekStageFinishDeprecated, + ) -> Result { + let id_bundle = id_bundle.unwrap_or_else(|| { + self.index_oracle(cluster_id) + .sufficient_collections(&source_ids) + }); + let peek_plan = self + .plan_peek_deprecated( + dataflow, + ctx.session_mut(), + &when, + cluster_id, + view_id, + index_id, + timeline_context, + source_ids, + &id_bundle, + real_time_recency_ts, + key, + typ, + &finishing, + ) + .await?; + + let determination = peek_plan.determination.clone(); + + let max_query_result_size = std::cmp::min( + ctx.session().vars().max_query_result_size(), + self.catalog().system_config().max_result_size(), + ); + // Implement the peek, and capture the response. + let resp = self + .implement_peek_plan( + ctx.extra_mut(), + peek_plan, + finishing, + cluster_id, + target_replica, + max_query_result_size, + ) + .await?; + + if ctx.session().vars().emit_timestamp_notice() { + let explanation = + self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination); + ctx.session() + .add_notice(AdapterNotice::QueryTimestamp { explanation }); + } + self.emit_optimizer_notices(ctx.session(), &dataflow_metainfo.optimizer_notices); + + match copy_to { + None => Ok(resp), + Some(format) => Ok(ExecuteResponse::CopyTo { + format, + resp: Box::new(resp), + }), + } + } + + /// Determines the query timestamp and acquires read holds on dependent sources + /// if necessary. + #[deprecated = "This is being replaced by sequence_peek_timestamp (see #20569)."] + async fn sequence_peek_timestamp_deprecated( + &mut self, + session: &mut Session, + when: &QueryWhen, + cluster_id: ClusterId, + timeline_context: TimelineContext, + source_bundle: &CollectionIdBundle, + source_ids: &BTreeSet, + real_time_recency_ts: Option, + ) -> Result, AdapterError> { + let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when); + let timedomain_bundle; + + // Fetch or generate a timestamp for this query and what the read holds would be if we need to set + // them. + let (determination, potential_read_holds) = + match session.get_transaction_timestamp_determination() { + // Use the transaction's timestamp if it exists and this isn't an AS OF query. + Some( + determination @ TimestampDetermination { + timestamp_context: TimestampContext::TimelineTimestamp(_, _), + .. + }, + ) if in_immediate_multi_stmt_txn => (determination, None), + _ => { + let determine_bundle = if in_immediate_multi_stmt_txn { + // In a transaction, determine a timestamp that will be valid for anything in + // any schema referenced by the first query. + timedomain_bundle = self.timedomain_for( + source_ids, + &timeline_context, + session.conn_id(), + cluster_id, + )?; + &timedomain_bundle + } else { + // If not in a transaction, use the source. + source_bundle + }; + let determination = self + .determine_timestamp( + session, + determine_bundle, + when, + cluster_id, + &timeline_context, + real_time_recency_ts, + ) + .await?; + // We only need read holds if the read depends on a timestamp. We don't set the + // read holds here because it makes the code a bit more clear to handle the two + // cases for "is this the first statement in a transaction?" in an if/else block + // below. + let read_holds = determination + .timestamp_context + .timestamp() + .map(|timestamp| (timestamp.clone(), determine_bundle)); + (determination, read_holds) + } + }; + + // If we're in a multi-statement transaction and the query does not use `AS OF`, + // acquire read holds on any sources in the current time-domain if they have not + // already been acquired. If the query does use `AS OF`, it is not necessary to + // acquire read holds. + if in_immediate_multi_stmt_txn { + // Either set the valid read ids for this transaction (if it's the first statement in a + // transaction) otherwise verify the ids referenced in this query are in the timedomain. + if let Some(txn_reads) = self.txn_reads.get(session.conn_id()) { + // Find referenced ids not in the read hold. A reference could be caused by a + // user specifying an object in a different schema than the first query. An + // index could be caused by a CREATE INDEX after the transaction started. + let allowed_id_bundle = txn_reads.id_bundle(); + let outside = source_bundle.difference(&allowed_id_bundle); + // Queries without a timestamp and timeline can belong to any existing timedomain. + if determination.timestamp_context.contains_timestamp() && !outside.is_empty() { + let valid_names = + self.resolve_collection_id_bundle_names(session, &allowed_id_bundle); + let invalid_names = self.resolve_collection_id_bundle_names(session, &outside); + return Err(AdapterError::RelationOutsideTimeDomain { + relations: invalid_names, + names: valid_names, + }); + } + } else { + if let Some((timestamp, bundle)) = potential_read_holds { + let read_holds = self.acquire_read_holds(timestamp, bundle); + self.txn_reads.insert(session.conn_id().clone(), read_holds); + } + } + } + + // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems + // arbitrary and we don't recall why we did it (possibly an error!). Change this to always + // set the transaction ops. Decide and document what our policy should be on AS OF queries. + // Maybe they shouldn't be allowed in transactions at all because it's hard to explain + // what's going on there. This should probably get a small design document. + + // We only track the peeks in the session if the query doesn't use AS + // OF or we're inside an explicit transaction. The latter case is + // necessary to support PG's `BEGIN` semantics, whose behavior can + // depend on whether or not reads have occurred in the txn. + let mut transaction_determination = determination.clone(); + if when.is_transactional() { + session.add_transaction_ops(TransactionOps::Peeks { + determination: transaction_determination, + cluster_id, + })?; + } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) { + // If the query uses AS OF, then ignore the timestamp. + transaction_determination.timestamp_context = TimestampContext::NoTimestamp; + session.add_transaction_ops(TransactionOps::Peeks { + determination: transaction_determination, + cluster_id, + })?; + }; + + Ok(determination) + } + + #[deprecated = "This is being replaced by plan_peek (see #20569)."] + async fn plan_peek_deprecated( + &mut self, + mut dataflow: DataflowDescription, + session: &mut Session, + when: &QueryWhen, + cluster_id: ClusterId, + view_id: GlobalId, + index_id: GlobalId, + timeline_context: TimelineContext, + source_ids: BTreeSet, + id_bundle: &CollectionIdBundle, + real_time_recency_ts: Option, + key: Vec, + typ: RelationType, + finishing: &RowSetFinishing, + ) -> Result { + let conn_id = session.conn_id().clone(); + let determination = self + .sequence_peek_timestamp_deprecated( + session, + when, + cluster_id, + timeline_context, + id_bundle, + &source_ids, + real_time_recency_ts, + ) + .await?; + + // Now that we have a timestamp, set the as of and resolve calls to mz_now(). + dataflow.set_as_of(determination.timestamp_context.antichain()); + let style = ExprPrepStyle::OneShot { + logical_time: EvalTime::Time(determination.timestamp_context.timestamp_or_default()), + session, + }; + let state = self.catalog().state(); + dataflow.visit_children( + |r| prep_relation_expr(state, r, style), + |s| prep_scalar_expr(state, s, style), + )?; + + let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity()); + + // At this point, `dataflow_plan` contains our best optimized dataflow. + // We will check the plan to see if there is a fast path to escape full dataflow construction. + let peek_plan = self.create_peek_plan( + dataflow, + view_id, + cluster_id, + index_id, + key, + permutation, + thinning.len(), + finishing, + )?; + + Ok(PlannedPeek { + plan: peek_plan, + determination, + conn_id, + source_arity: typ.arity(), + source_ids, + }) + } +} + +#[derive(Debug)] +pub enum PeekStageDeprecated { + Validate(PeekStageValidateDeprecated), + Optimize(PeekStageOptimizeDeprecated), + Timestamp(PeekStageTimestampDeprecated), + Finish(PeekStageFinishDeprecated), +} + +impl PeekStageDeprecated { + fn validity(&mut self) -> Option<&mut PlanValidity> { + match self { + PeekStageDeprecated::Validate(_) => None, + PeekStageDeprecated::Optimize(PeekStageOptimizeDeprecated { validity, .. }) + | PeekStageDeprecated::Timestamp(PeekStageTimestampDeprecated { validity, .. }) + | PeekStageDeprecated::Finish(PeekStageFinishDeprecated { validity, .. }) => { + Some(validity) + } + } + } +} + +#[derive(Debug)] +pub struct PeekStageValidateDeprecated { + pub plan: mz_sql::plan::SelectPlan, + pub target_cluster: TargetCluster, +} + +#[derive(Debug)] +pub struct PeekStageOptimizeDeprecated { + pub validity: PlanValidity, + pub source: MirRelationExpr, + pub finishing: RowSetFinishing, + pub copy_to: Option, + pub view_id: GlobalId, + pub index_id: GlobalId, + pub source_ids: BTreeSet, + pub cluster_id: ClusterId, + pub when: QueryWhen, + pub target_replica: Option, + pub timeline_context: TimelineContext, + pub in_immediate_multi_stmt_txn: bool, +} + +#[derive(Debug)] +pub struct PeekStageTimestampDeprecated { + pub validity: PlanValidity, + pub dataflow: DataflowDescription, + pub finishing: RowSetFinishing, + pub copy_to: Option, + pub view_id: GlobalId, + pub index_id: GlobalId, + pub source_ids: BTreeSet, + pub cluster_id: ClusterId, + pub id_bundle: CollectionIdBundle, + pub when: QueryWhen, + pub target_replica: Option, + pub timeline_context: TimelineContext, + pub in_immediate_multi_stmt_txn: bool, + pub key: Vec, + pub typ: RelationType, + pub dataflow_metainfo: DataflowMetainfo, +} + +#[derive(Debug)] +pub struct PeekStageFinishDeprecated { + pub validity: PlanValidity, + pub finishing: RowSetFinishing, + pub copy_to: Option, + pub dataflow: DataflowDescription, + pub cluster_id: ClusterId, + pub id_bundle: Option, + pub when: QueryWhen, + pub target_replica: Option, + pub view_id: GlobalId, + pub index_id: GlobalId, + pub timeline_context: TimelineContext, + pub source_ids: BTreeSet, + pub real_time_recency_ts: Option, + pub key: Vec, + pub typ: RelationType, + pub dataflow_metainfo: DataflowMetainfo, }