Skip to content

Commit

Permalink
feat: fix time index for flow plan
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 30, 2024
1 parent cd3df5d commit 2be25d0
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 10 deletions.
12 changes: 2 additions & 10 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::time;

Check failure on line 32 in src/flow/src/adapter.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `datatypes::time`

Check failure on line 32 in src/flow/src/adapter.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-20.04)

unused import: `datatypes::time`
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::{EitherOrBoth, Itertools};
Expand Down Expand Up @@ -56,7 +57,7 @@ use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
UnexpectedSnafu,
};
use crate::expr::Batch;
use crate::expr::{Batch, ScalarExpr};

Check failure on line 60 in src/flow/src/adapter.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `ScalarExpr`

Check failure on line 60 in src/flow/src/adapter.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-20.04)

unused import: `ScalarExpr`
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
Expand Down Expand Up @@ -753,15 +754,6 @@ impl FlowWorkerManager {
Ok(())
}

/// adjust flow plan's time index to match real table schema
async fn fix_time_index_for_flow_plan(
&self,
flow_plan: &TypedPlan,
real_schema: &[ColumnSchema],
) -> Result<TypedPlan, Error> {
todo!()
}

///// check schema against actual table schema if exists
/// if not exist create sink table immediately
async fn valid_or_create_sink_table(
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/expr/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;

use datatypes::schema::ColumnSchema;

Check failure on line 18 in src/flow/src/expr/utils.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `datatypes::schema::ColumnSchema`

Check failure on line 18 in src/flow/src/expr/utils.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-20.04)

unused import: `datatypes::schema::ColumnSchema`
use datatypes::value::Value;
use snafu::{ensure, OptionExt};

Expand Down
13 changes: 13 additions & 0 deletions src/flow/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
mod join;
mod reduce;
mod utils;

use std::collections::BTreeSet;

Expand Down Expand Up @@ -208,6 +209,18 @@ impl Plan {
}
}

/// Get mutable ref to the first input plan if exists
pub fn get_mut_first_input_plan(&mut self) -> Option<&mut TypedPlan> {
match self {
Plan::Let { value, .. } => Some(value),
Plan::Mfp { input, .. } => Some(input),
Plan::Reduce { input, .. } => Some(input),
Plan::Join { inputs, .. } => inputs.first_mut(),
Plan::Union { inputs, .. } => inputs.first_mut(),
_ => None,
}
}

/// Find all the used collection in the plan
pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
Expand Down
99 changes: 99 additions & 0 deletions src/flow/src/plan/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use datatypes::schema::ColumnSchema;
use snafu::OptionExt;

use crate::error::{Error, UnexpectedSnafu};
use crate::plan::{ScalarExpr, TypedPlan};

/// Fix the time index of a flow plan to match the real schema.
///
/// This function will traverse the plan to find the deepest time index expr, and set that expr as time index,
/// so that internal `EXPIRE AFTER` mechanism can work properly.
pub fn fix_time_index_for_flow_plan(
flow_plan: &TypedPlan,
real_schema: &[ColumnSchema],
) -> Result<TypedPlan, Error> {
let outer_time_index = {
let mut outer_time_index = None;
for (idx, real_col) in real_schema.iter().enumerate() {
if real_col.is_time_index() {
outer_time_index = Some(idx);
break;
}
}

match outer_time_index {
Some(outer) => outer,
None => {
return UnexpectedSnafu {
reason: "No time index found in real schema".to_string(),
}
.fail()?
}
}
};
let mut rewrite_plan = flow_plan.clone();

let mut cur_plan = &mut rewrite_plan;
let mut time_index = outer_time_index;
let mut expr_time_index;
loop {
// follow upward and find deepest time index expr that is not a column ref, and set time index to it.
if let Some(ty) = cur_plan.schema.typ.column_types.get(time_index)
&& ty.scalar_type.is_timestamp()
{
cur_plan.schema.typ = cur_plan
.schema
.typ
.clone()
.with_time_index(Some(time_index));
} else {
UnexpectedSnafu {
reason: format!(
"Time index column type mismatch, expect timestamp got {:?}",
cur_plan.schema.typ.column_types.get(time_index)
),
}
.fail()?
}

expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context(
UnexpectedSnafu {
reason: "Failed to find time index expr",
},
)?);
let inner_time_index = if let Some(ScalarExpr::Column(i)) = expr_time_index {
i
} else {
break;
};
let inner_plan = if let Some(input) = cur_plan.plan.get_mut_first_input_plan() {
input
} else {
break;
};

time_index = inner_time_index;
cur_plan = inner_plan;
}

Ok(rewrite_plan)
}

#[cfg(test)]
mod test {
use super::*;
}

0 comments on commit 2be25d0

Please sign in to comment.