Skip to content

Commit

Permalink
feat: Support unary, paren, bool keyword and nonexistent metric/label…
Browse files Browse the repository at this point in the history
… in PromQL (GreptimeTeam#1049)

* feat: don't report metric/label not found as error

Signed-off-by: Ruihang Xia <[email protected]>

* feat: impl unary expr

Signed-off-by: Ruihang Xia <[email protected]>

* feat: impl paren expr

Signed-off-by: Ruihang Xia <[email protected]>

* feat: support bool keyword

Signed-off-by: Ruihang Xia <[email protected]>

* add some tests

Signed-off-by: Ruihang Xia <[email protected]>

* ignore nonexistence labels during planning

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored and paomian committed Oct 19, 2023
1 parent c0344ea commit 7af9ddf
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 78 deletions.
16 changes: 4 additions & 12 deletions src/promql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::any::Any;

use common_error::prelude::*;
use datafusion::error::DataFusionError;
use promql_parser::label::Label;
use promql_parser::parser::{Expr as PromExpr, TokenType};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -49,13 +48,6 @@ pub enum Error {
#[snafu(display("Cannot find value columns in table {}", table))]
ValueNotFound { table: String, backtrace: Backtrace },

#[snafu(display("Cannot find label {} in table {}", label, table,))]
LabelNotFound {
table: String,
label: Label,
backtrace: Backtrace,
},

#[snafu(display("Cannot find the table {}", table))]
TableNotFound {
table: String,
Expand Down Expand Up @@ -107,15 +99,15 @@ impl ErrorExt for Error {
| UnsupportedExpr { .. }
| UnexpectedToken { .. }
| MultipleVector { .. }
| LabelNotFound { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,

UnknownTable { .. }
| TableNotFound { .. }
| DataFusionPlanning { .. }
| UnexpectedPlanExpr { .. }
| IllegalRange { .. }
| EmptyRange { .. }
| TableNameNotFound { .. } => StatusCode::Internal,
| EmptyRange { .. } => StatusCode::Internal,

TableNotFound { .. } | TableNameNotFound { .. } => StatusCode::TableNotFound,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
Expand Down
194 changes: 129 additions & 65 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::expr::AggregateFunction;
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension,
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension,
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use promql_parser::label::{MatchOp, Matchers, METRIC_NAME};
use promql_parser::parser::{
token, AggModifier, AggregateExpr, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral,
SubqueryExpr, TokenType, UnaryExpr, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
DataFusionPlanningSnafu, ExpectExprSnafu, LabelNotFoundSnafu, MultipleVectorSnafu, Result,
TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu,
UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu,
DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, ValueNotFoundSnafu,
};
use crate::extension_plan::{
InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
Expand Down Expand Up @@ -129,11 +130,25 @@ impl<S: ContextProvider> PromPlanner<S> {
.build()
.context(DataFusionPlanningSnafu)?
}
PromExpr::Unary(UnaryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Unary Expr",
PromExpr::Unary(UnaryExpr { expr }) => {
// Unary Expr in PromQL implies the `-` operator
let input = self.prom_expr_to_plan(*expr.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
})?
}
.fail()?,
PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
PromExpr::Binary(PromBinaryExpr {
lhs,
rhs,
op,
modifier,
}) => {
let should_cast_to_bool = if let Some(modifier) = modifier {
modifier.return_bool && Self::is_token_a_comparison_op(*op)
} else {
false
};

match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
Expand All @@ -147,22 +162,36 @@ impl<S: ContextProvider> PromPlanner<S> {
(Some(expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr.clone()),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(col.into())),
}))
});
if should_cast_to_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
}
// lhs is a column, rhs is a literal
(None, Some(expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(col.into())),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(expr.clone()),
}))
});
if should_cast_to_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
}
// both are columns. join them on time index
Expand Down Expand Up @@ -190,19 +219,23 @@ impl<S: ContextProvider> PromPlanner<S> {
.context(DataFusionPlanningSnafu)?
.qualified_column();

Ok(DfExpr::BinaryExpr(BinaryExpr {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(left_col)),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(right_col)),
}))
});
if should_cast_to_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
}
}
}
PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Paren Expr",
}
.fail()?,
PromExpr::Paren(ParenExpr { expr }) => self.prom_expr_to_plan(*expr.clone())?,
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
}
Expand Down Expand Up @@ -376,20 +409,10 @@ impl<S: ContextProvider> PromPlanner<S> {
AggModifier::By(labels) => {
let mut exprs = Vec::with_capacity(labels.len());
for label in labels {
let field = input_schema
.field_with_unqualified_name(label)
.map_err(|_| {
LabelNotFoundSnafu {
table: self
.ctx
.table_name
.clone()
.unwrap_or("no_table_name".to_string()),
label: label.clone(),
}
.build()
})?;
exprs.push(DfExpr::Column(Column::from(field.name())));
// nonexistent labels will be ignored
if let Ok(field) = input_schema.field_with_unqualified_name(label) {
exprs.push(DfExpr::Column(Column::from(field.name())));
}
}

// change the tag columns in context
Expand All @@ -406,21 +429,13 @@ impl<S: ContextProvider> PromPlanner<S> {
.iter()
.map(|f| f.name())
.collect::<BTreeSet<_>>();

// remove "without"-ed fields
// nonexistent labels will be ignored
for label in labels {
ensure!(
// ensure this field was existed
all_fields.remove(label),
LabelNotFoundSnafu {
table: self
.ctx
.table_name
.clone()
.unwrap_or("no_table_name".to_string()),
label: label.clone(),
}
);
all_fields.remove(label);
}

// remove time index and value fields
if let Some(time_index) = &self.ctx.time_index_column {
all_fields.remove(time_index);
Expand Down Expand Up @@ -495,7 +510,7 @@ impl<S: ContextProvider> PromPlanner<S> {
let table = self
.schema_provider
.get_table_provider(TableReference::Bare { table: &table_name })
.context(DataFusionPlanningSnafu)?
.context(TableNotFoundSnafu { table: &table_name })?
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
Expand Down Expand Up @@ -750,6 +765,19 @@ impl<S: ContextProvider> PromPlanner<S> {
}
}

/// Tell whether `token` is a PromQL
/// [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
///
/// Returns `true` only when the token's id equals one of `T_EQLC`, `T_NEQ`,
/// `T_GTR`, `T_LSS`, `T_GTE` or `T_LTE`; every other token yields `false`.
fn is_token_a_comparison_op(token: TokenType) -> bool {
    let op_id = token.id();
    // `token::` below names the parser's token module, not the `token` parameter.
    [
        token::T_EQLC,
        token::T_NEQ,
        token::T_GTR,
        token::T_LSS,
        token::T_GTE,
        token::T_LTE,
    ]
    .contains(&op_id)
}

/// Build an inner join on time index column and tag columns to concat two logical plans.
/// The left plan will be aliased as [`LEFT_PLAN_JOIN_ALIAS`].
fn join_on_non_value_columns(
Expand Down Expand Up @@ -1317,9 +1345,8 @@ mod test {
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn binary_op_literal_column() {
let prom_expr = parser::parse(r#"1 + some_metric{tag_0="bar"}"#).unwrap();
async fn indie_query_plan_compare(query: &str, expected: String) {
let prom_expr = parser::parse(query).unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
Expand All @@ -1333,7 +1360,13 @@ mod test {
let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();

let expected = String::from(
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn binary_op_literal_column() {
let query = r#"1 + some_metric{tag_0="bar"}"#;
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
Expand All @@ -1343,25 +1376,56 @@ mod test {
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

assert_eq!(plan.display_indent_schema().to_string(), expected);
indie_query_plan_compare(query, expected).await;
}

// TODO(ruihang): pure literal arithmetic is not supported yet.
#[tokio::test]
#[ignore = "pure literal arithmetic is not supported yet"]
async fn binary_op_literal_literal() {
let prom_expr = parser::parse(r#"1 + 1"#).unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let query = r#"1 + 1"#;
let expected = String::from("");

let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
let plan_result = PromPlanner::stmt_to_plan(eval_stmt, context_provider);
assert!(plan_result.is_err());
indie_query_plan_compare(query, expected).await;
}

/// Plans a comparison that carries the `bool` modifier against a literal and
/// checks the rendered logical plan.
///
/// The expected top-level projection wraps the `!=` comparison in
/// `CAST(... AS Float64)`, so the value column is typed `Float64` rather than
/// a boolean.
#[tokio::test]
async fn simple_bool_grammar() {
let query = "some_metric != bool 1.2345";
// Full plan rendering, compared verbatim by `indie_query_plan_compare`.
// NOTE(review): the indentation after each `\n` looks collapsed to a single
// space in this view — confirm against the file in the repository.
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

indie_query_plan_compare(query, expected).await;
}

/// Placeholder for combining a metric with a `bool`-modified comparison.
///
/// Currently ignored (see the `#[ignore]` reason). The expected plan is an
/// empty string until a real expectation can be filled in.
#[tokio::test]
#[ignore = "pure literal arithmetic is not supported yet"]
async fn bool_with_additional_arithmetic() {
    indie_query_plan_compare("some_metric + (1 == bool 2)", String::new()).await;
}

/// Plans a unary-minus query and checks the rendered logical plan.
///
/// The expected top-level projection negates the value column
/// (`(- some_metric.field_0)`) while passing the tag and timestamp columns
/// through unchanged.
#[tokio::test]
async fn simple_unary() {
let query = "-some_metric";
// Full plan rendering, compared verbatim by `indie_query_plan_compare`.
// NOTE(review): the indentation after each `\n` looks collapsed to a single
// space in this view — confirm against the file in the repository.
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

indie_query_plan_compare(query, expected).await;
}
}
18 changes: 17 additions & 1 deletion src/servers/src/promql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use axum::body::BoxBody;
use axum::extract::{Query, State};
use axum::{routing, Form, Json, Router};
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
Expand Down Expand Up @@ -216,7 +217,22 @@ impl PromqlJsonResponse {
json
};

response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string()))
match response {
Ok(resp) => resp,
Err(err) => {
// Prometheus won't report an error when querying a nonexistent label or metric
if err.status_code() == StatusCode::TableNotFound
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PromqlData {
result_type: "matrix".to_string(),
..Default::default()
})
} else {
Self::error(err.status_code().to_string(), err.to_string())
}
}
}
}

fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromqlData> {
Expand Down

0 comments on commit 7af9ddf

Please sign in to comment.