Skip to content

Commit

Permalink
adapter: use the new Optimize API for sequence_peek
Browse files Browse the repository at this point in the history
Introduce `OptimizePeek` in the `sequence_peek` optimization paths in
the `Coordinator`.

See the rollout proposal in MaterializeInc#20569 for details.
  • Loading branch information
aalexandrov committed Nov 7, 2023
1 parent 813fbe2 commit 43d21b6
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 204 deletions.
35 changes: 7 additions & 28 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,20 +314,14 @@ pub enum RealTimeRecencyContext {
},
Peek {
ctx: ExecuteContext,
finishing: RowSetFinishing,
copy_to: Option<CopyFormat>,
dataflow: DataflowDescription<OptimizedMirRelationExpr>,
cluster_id: ClusterId,
when: QueryWhen,
target_replica: Option<ReplicaId>,
view_id: GlobalId,
index_id: GlobalId,
timeline_context: TimelineContext,
source_ids: BTreeSet<GlobalId>,
in_immediate_multi_stmt_txn: bool,
key: Vec<MirScalarExpr>,
typ: RelationType,
dataflow_metainfo: DataflowMetainfo,
optimizer: optimize::OptimizePeek,
global_mir_plan: optimize::peek::GlobalMirPlan,
},
PeekDeprecated {
ctx: ExecuteContext,
Expand Down Expand Up @@ -387,56 +381,41 @@ pub struct PeekStageValidate {
pub struct PeekStageOptimize {
validity: PlanValidity,
source: MirRelationExpr,
finishing: RowSetFinishing,
copy_to: Option<CopyFormat>,
view_id: GlobalId,
index_id: GlobalId,
source_ids: BTreeSet<GlobalId>,
cluster_id: ClusterId,
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::OptimizePeek,
}

#[derive(Debug)]
pub struct PeekStageTimestamp {
validity: PlanValidity,
dataflow: DataflowDescription<OptimizedMirRelationExpr>,
finishing: RowSetFinishing,
copy_to: Option<CopyFormat>,
view_id: GlobalId,
index_id: GlobalId,
source_ids: BTreeSet<GlobalId>,
cluster_id: ClusterId,
id_bundle: CollectionIdBundle,
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
in_immediate_multi_stmt_txn: bool,
key: Vec<MirScalarExpr>,
typ: RelationType,
dataflow_metainfo: DataflowMetainfo,
optimizer: optimize::OptimizePeek,
global_mir_plan: optimize::peek::GlobalMirPlan,
}

#[derive(Debug)]
pub struct PeekStageFinish {
validity: PlanValidity,
finishing: RowSetFinishing,
copy_to: Option<CopyFormat>,
dataflow: DataflowDescription<OptimizedMirRelationExpr>,
cluster_id: ClusterId,
id_bundle: Option<CollectionIdBundle>,
when: QueryWhen,
target_replica: Option<ReplicaId>,
view_id: GlobalId,
index_id: GlobalId,
timeline_context: TimelineContext,
source_ids: BTreeSet<GlobalId>,
real_time_recency_ts: Option<mz_repr::Timestamp>,
key: Vec<MirScalarExpr>,
typ: RelationType,
dataflow_metainfo: DataflowMetainfo,
optimizer: optimize::OptimizePeek,
global_mir_plan: optimize::peek::GlobalMirPlan,
}

/// An enum describing which cluster to run a statement on.
Expand Down
20 changes: 4 additions & 16 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,40 +663,28 @@ impl Coordinator {
}
RealTimeRecencyContext::Peek {
ctx,
finishing,
copy_to,
dataflow,
cluster_id,
when,
target_replica,
view_id,
index_id,
timeline_context,
source_ids,
in_immediate_multi_stmt_txn: _,
key,
typ,
dataflow_metainfo,
optimizer,
global_mir_plan,
} => {
self.sequence_peek_stage(
ctx,
PeekStage::Finish(PeekStageFinish {
validity,
finishing,
copy_to,
dataflow,
cluster_id,
id_bundle: None,
when,
target_replica,
view_id,
index_id,
timeline_context,
source_ids,
real_time_recency_ts: Some(real_time_recency_ts),
key,
typ,
dataflow_metainfo,
optimizer,
global_mir_plan,
}),
)
.await;
Expand Down
18 changes: 18 additions & 0 deletions src/adapter/src/coord/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
thinned_arity: usize,
}

impl<T> PeekDataflowPlan<T> {
pub fn new(
desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
id: GlobalId,
key: Vec<MirScalarExpr>,
permutation: BTreeMap<usize, usize>,
thinned_arity: usize,
) -> Self {
Self {
desc,
id,
key,
permutation,
thinned_arity,
}
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
/// The view evaluates to a constant result that can be returned.
Expand Down
Loading

0 comments on commit 43d21b6

Please sign in to comment.