From 2be25d0f7fcdd14a6f258a2ab26b691dc4c0dc86 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 30 Dec 2024 15:28:57 +0800 Subject: [PATCH] feat: fix time index for flow plan --- src/flow/src/adapter.rs | 12 +---- src/flow/src/expr/utils.rs | 1 + src/flow/src/plan.rs | 13 +++++ src/flow/src/plan/utils.rs | 99 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 10 deletions(-) create mode 100644 src/flow/src/plan/utils.rs diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index d38de1347af2..d67e66601842 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -29,6 +29,7 @@ use common_runtime::JoinHandle; use common_telemetry::logging::{LoggingOptions, TracingOptions}; use common_telemetry::{debug, info, trace}; use datatypes::schema::ColumnSchema; +use datatypes::time; use datatypes::value::Value; use greptime_proto::v1; use itertools::{EitherOrBoth, Itertools}; @@ -56,7 +57,7 @@ use crate::error::{ EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu, }; -use crate::expr::Batch; +use crate::expr::{Batch, ScalarExpr}; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::plan::TypedPlan; use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; @@ -753,15 +754,6 @@ impl FlowWorkerManager { Ok(()) } - /// adjust flow plan's time index to match real table schema - async fn fix_time_index_for_flow_plan( - &self, - flow_plan: &TypedPlan, - real_schema: &[ColumnSchema], - ) -> Result { - todo!() - } - ///// check schema against actual table schema if exists /// if not exist create sink table immediately async fn valid_or_create_sink_table( diff --git a/src/flow/src/expr/utils.rs b/src/flow/src/expr/utils.rs index 84019c958979..9d359c29545d 100644 --- a/src/flow/src/expr/utils.rs +++ b/src/flow/src/expr/utils.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; +use datatypes::schema::ColumnSchema; use datatypes::value::Value; use snafu::{ensure, OptionExt}; diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index f6b33c46cb5c..efb1b2df8c99 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -17,6 +17,7 @@ mod join; mod reduce; +mod utils; use std::collections::BTreeSet; @@ -208,6 +209,18 @@ impl Plan { } } + /// Get mutable ref to the first input plan if exists + pub fn get_mut_first_input_plan(&mut self) -> Option<&mut TypedPlan> { + match self { + Plan::Let { value, .. } => Some(value), + Plan::Mfp { input, .. } => Some(input), + Plan::Reduce { input, .. } => Some(input), + Plan::Join { inputs, .. } => inputs.first_mut(), + Plan::Union { inputs, .. } => inputs.first_mut(), + _ => None, + } + } + /// Find all the used collection in the plan pub fn find_used_collection(&self) -> BTreeSet { fn recur_find_use(plan: &Plan, used: &mut BTreeSet) { diff --git a/src/flow/src/plan/utils.rs b/src/flow/src/plan/utils.rs new file mode 100644 index 000000000000..6a8d7d5ace8b --- /dev/null +++ b/src/flow/src/plan/utils.rs @@ -0,0 +1,99 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::schema::ColumnSchema; +use snafu::OptionExt; + +use crate::error::{Error, UnexpectedSnafu}; +use crate::plan::{ScalarExpr, TypedPlan}; + +/// Fix the time index of a flow plan to match the real schema. +/// +/// This function will traverse the plan to find the deepest time index expr, and set that expr as time index, +/// so that internal `EXPIRE AFTER` mechanism can work properly. +pub fn fix_time_index_for_flow_plan( + flow_plan: &TypedPlan, + real_schema: &[ColumnSchema], +) -> Result { + let outer_time_index = { + let mut outer_time_index = None; + for (idx, real_col) in real_schema.iter().enumerate() { + if real_col.is_time_index() { + outer_time_index = Some(idx); + break; + } + } + + match outer_time_index { + Some(outer) => outer, + None => { + return UnexpectedSnafu { + reason: "No time index found in real schema".to_string(), + } + .fail()? + } + } + }; + let mut rewrite_plan = flow_plan.clone(); + + let mut cur_plan = &mut rewrite_plan; + let mut time_index = outer_time_index; + let mut expr_time_index; + loop { + // follow upward and find deepest time index expr that is not a column ref, and set time index to it. + if let Some(ty) = cur_plan.schema.typ.column_types.get(time_index) + && ty.scalar_type.is_timestamp() + { + cur_plan.schema.typ = cur_plan + .schema + .typ + .clone() + .with_time_index(Some(time_index)); + } else { + UnexpectedSnafu { + reason: format!( + "Time index column type mismatch, expect timestamp got {:?}", + cur_plan.schema.typ.column_types.get(time_index) + ), + } + .fail()? + } + + expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context( + UnexpectedSnafu { + reason: "Failed to find time index expr", + }, + )?); + let inner_time_index = if let Some(ScalarExpr::Column(i)) = expr_time_index { + i + } else { + break; + }; + let inner_plan = if let Some(input) = cur_plan.plan.get_mut_first_input_plan() { + input + } else { + break; + }; + + time_index = inner_time_index; + cur_plan = inner_plan; + } + + Ok(rewrite_plan) +} + +#[cfg(test)] +mod test { + use super::*; +}