From d2fb05ed5ba71fd0f1d440baca12897413c2a8af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Fri, 17 May 2024 14:08:01 +0300 Subject: [PATCH] PhysicalExpr Orderings with Range Information (#10504) * Self review * Fix null interval accumulation * Refactor monotonicity * Ignore failing tests * Initial impl * Ready for review * Update properties.rs * Update configs.md Update configs.md * cargo doc * Add abs test * Update properties.rs * Update udf.rs * Review Part 1 * Review Part 2 * Minor --------- Co-authored-by: Mehmet Ozan Kabak --- datafusion-examples/examples/advanced_udf.rs | 28 +- .../examples/function_factory.rs | 13 +- .../enforce_distribution.rs | 8 +- .../src/physical_optimizer/join_selection.rs | 4 +- .../physical_optimizer/projection_pushdown.rs | 4 - .../sort_preserving_repartition_fuzz.rs | 6 +- .../user_defined_scalar_functions.rs | 15 +- datafusion/expr/src/interval_arithmetic.rs | 104 ++++++-- datafusion/expr/src/lib.rs | 4 +- datafusion/expr/src/signature.rs | 8 - .../src/sort_properties.rs | 59 +++-- datafusion/expr/src/udf.rs | 132 ++++++++-- datafusion/functions/src/datetime/date_bin.rs | 28 +- .../functions/src/datetime/date_trunc.rs | 33 ++- datafusion/functions/src/macros.rs | 36 +-- datafusion/functions/src/math/abs.rs | 43 ++-- datafusion/functions/src/math/log.rs | 46 ++-- datafusion/functions/src/math/mod.rs | 72 ++++-- datafusion/functions/src/math/monotonicity.rs | 241 ++++++++++++++++++ datafusion/functions/src/math/pi.rs | 10 +- datafusion/functions/src/math/round.rs | 27 +- datafusion/functions/src/math/trunc.rs | 27 +- datafusion/physical-expr-common/src/lib.rs | 1 - .../physical-expr-common/src/physical_expr.rs | 24 +- datafusion/physical-expr-common/src/utils.rs | 31 ++- .../physical-expr/src/equivalence/mod.rs | 15 +- .../physical-expr/src/equivalence/ordering.rs | 26 +- .../src/equivalence/projection.rs | 25 +- .../src/equivalence/properties.rs | 223 ++++++++++------ .../physical-expr/src/expressions/binary.rs | 57 ++++- .../physical-expr/src/expressions/cast.rs | 28 +- .../physical-expr/src/expressions/literal.rs | 11 +- .../physical-expr/src/expressions/negative.rs | 9 +- datafusion/physical-expr/src/functions.rs | 8 +- datafusion/physical-expr/src/lib.rs | 7 - .../physical-expr/src/scalar_function.rs | 114 +++------ datafusion/physical-expr/src/utils/mod.rs | 11 +- .../physical-plan/src/aggregates/mod.rs | 2 +- datafusion/physical-plan/src/filter.rs | 5 +- .../proto/src/physical_plan/from_proto.rs | 1 - .../tests/cases/roundtrip_physical_plan.rs | 2 - datafusion/sqllogictest/test_files/order.slt | 151 +++++++++++ 42 files changed, 1196 insertions(+), 503 deletions(-) rename datafusion/{physical-expr-common => expr}/src/sort_properties.rs (77%) create mode 100644 datafusion/functions/src/math/monotonicity.rs diff --git a/datafusion-examples/examples/advanced_udf.rs b/datafusion-examples/examples/advanced_udf.rs index c8063c0eb1e3..d1ef1c6c9dd0 100644 --- a/datafusion-examples/examples/advanced_udf.rs +++ b/datafusion-examples/examples/advanced_udf.rs @@ -15,26 +15,21 @@ // specific language governing permissions and limitations // under the License. -use datafusion::{ - arrow::{ - array::{ArrayRef, Float32Array, Float64Array}, - datatypes::DataType, - record_batch::RecordBatch, - }, - logical_expr::Volatility, -}; use std::any::Any; +use std::sync::Arc; -use arrow::array::{new_null_array, Array, AsArray}; +use arrow::array::{ + new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array, +}; use arrow::compute; -use arrow::datatypes::Float64Type; +use arrow::datatypes::{DataType, Float64Type}; +use arrow::record_batch::RecordBatch; use datafusion::error::Result; +use datafusion::logical_expr::Volatility; use datafusion::prelude::*; use datafusion_common::{internal_err, ScalarValue}; -use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature, -}; -use std::sync::Arc; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; +use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature}; /// This example shows how to use the full ScalarUDFImpl API to implement a user /// defined function. As in the `simple_udf.rs` example, this struct implements @@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf { &self.aliases } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + // The POW function preserves the order of its argument. + Ok(input[0].sort_properties) } } diff --git a/datafusion-examples/examples/function_factory.rs b/datafusion-examples/examples/function_factory.rs index 3973e50474ba..9e624b66294d 100644 --- a/datafusion-examples/examples/function_factory.rs +++ b/datafusion-examples/examples/function_factory.rs @@ -15,17 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::result::Result as RResult; +use std::sync::Arc; + use datafusion::error::Result; use datafusion::execution::context::{ FunctionFactory, RegisterFunction, SessionContext, SessionState, }; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{exec_err, internal_err, DataFusionError}; -use datafusion_expr::simplify::ExprSimplifyResult; -use datafusion_expr::simplify::SimplifyInfo; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature}; -use std::result::Result as RResult; -use std::sync::Arc; /// This example shows how to utilize [FunctionFactory] to implement simple /// SQL-macro like functions using a `CREATE FUNCTION` statement. The same @@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper { &[] } - fn monotonicity(&self) -> Result> { - Ok(None) + fn monotonicity(&self, _input: &[ExprProperties]) -> Result { + Ok(SortProperties::Unordered) } } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index c07f2c5dcf24..cd84e911d381 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -3572,7 +3572,11 @@ pub(crate) mod tests { expr: col("c", &schema).unwrap(), options: SortOptions::default(), }]; - let alias = vec![("a".to_string(), "a".to_string())]; + let alias = vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ("c".to_string(), "c".to_string()), + ]; let plan = sort_preserving_merge_exec( sort_key.clone(), sort_exec( @@ -3585,7 +3589,7 @@ pub(crate) mod tests { let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", // Since this projection is trivial, increasing parallelism is not beneficial - "ProjectionExec: expr=[a@0 as a]", + "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, plan.clone(), true); diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 042a0198bfb5..135a59aa0353 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -39,8 +39,8 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use arrow_schema::Schema; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, JoinSide, JoinType}; +use datafusion_expr::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::sort_properties::SortProperties; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; /// The [`JoinSelection`] rule tries to modify a given plan so that it can @@ -561,7 +561,7 @@ fn hash_join_convert_symmetric_subrule( let name = schema.field(*index).name(); let col = Arc::new(Column::new(name, *index)) as _; // Check if the column is ordered. - equivalence.get_expr_ordering(col).data + equivalence.get_expr_properties(col).sort_properties != SortProperties::Unordered }, ) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 0190f35cc97b..fe1290e40774 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1376,7 +1376,6 @@ mod tests { )), ], DataType::Int32, - None, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1442,7 +1441,6 @@ mod tests { )), ], DataType::Int32, - None, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 3))), @@ -1511,7 +1509,6 @@ mod tests { )), ], DataType::Int32, - None, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1577,7 +1574,6 @@ mod tests { )), ], DataType::Int32, - None, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d_new", 3))), diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index 6c9c3359ebf4..21ef8a7c2110 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -39,12 +39,12 @@ mod sp_repartition_fuzz_tests { config::SessionConfig, memory_pool::MemoryConsumer, SendableRecordBatchStream, }; use datafusion_physical_expr::{ + equivalence::{EquivalenceClass, EquivalenceProperties}, expressions::{col, Column}, - EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + PhysicalExpr, PhysicalSortExpr, }; use test_utils::add_empty_batches; - use datafusion_physical_expr::equivalence::EquivalenceClass; use itertools::izip; use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; @@ -78,7 +78,7 @@ mod sp_repartition_fuzz_tests { let mut eq_properties = EquivalenceProperties::new(test_schema.clone()); // Define a and f are aliases - eq_properties.add_equal_conditions(col_a, col_f); + eq_properties.add_equal_conditions(col_a, col_f)?; // Column e has constant value. eq_properties = eq_properties.add_constants([col_e.clone()]); diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index def9fcb4c61b..df41cab7bf02 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -15,26 +15,27 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::sync::Arc; + use arrow::compute::kernels::numeric::add; use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; +use datafusion_common::cast::{as_float64_array, as_int32_array}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - assert_batches_eq, assert_batches_sorted_eq, cast::as_float64_array, - cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, + assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, internal_err, + not_impl_err, plan_err, DataFusionError, ExprSchema, Result, ScalarValue, }; -use datafusion_common::{assert_contains, exec_err, internal_err, DataFusionError}; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; -use std::any::Any; -use std::sync::Arc; /// test that casting happens on udfs. /// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and @@ -776,10 +777,6 @@ impl ScalarUDFImpl for ScalarFunctionWrapper { fn aliases(&self) -> &[String] { &[] } - - fn monotonicity(&self) -> Result> { - Ok(None) - } } impl ScalarFunctionWrapper { diff --git a/datafusion/expr/src/interval_arithmetic.rs b/datafusion/expr/src/interval_arithmetic.rs index ca91a8c9da00..c4890b97e748 100644 --- a/datafusion/expr/src/interval_arithmetic.rs +++ b/datafusion/expr/src/interval_arithmetic.rs @@ -273,19 +273,34 @@ impl Interval { unreachable!(); }; // Standardize boolean interval endpoints: - Self { + return Self { lower: ScalarValue::Boolean(Some(lower_bool.unwrap_or(false))), upper: ScalarValue::Boolean(Some(upper_bool.unwrap_or(true))), - } + }; } - // Standardize floating-point endpoints: - else if lower.data_type() == DataType::Float32 { - handle_float_intervals!(Float32, f32, lower, upper) - } else if lower.data_type() == DataType::Float64 { - handle_float_intervals!(Float64, f64, lower, upper) - } else { + match lower.data_type() { + // Standardize floating-point endpoints: + DataType::Float32 => handle_float_intervals!(Float32, f32, lower, upper), + DataType::Float64 => handle_float_intervals!(Float64, f64, lower, upper), + // Unsigned null values for lower bounds are set to zero: + DataType::UInt8 if lower.is_null() => Self { + lower: ScalarValue::UInt8(Some(0)), + upper, + }, + DataType::UInt16 if lower.is_null() => Self { + lower: ScalarValue::UInt16(Some(0)), + upper, + }, + DataType::UInt32 if lower.is_null() => Self { + lower: ScalarValue::UInt32(Some(0)), + upper, + }, + DataType::UInt64 if lower.is_null() => Self { + lower: ScalarValue::UInt64(Some(0)), + upper, + }, // Other data types do not require standardization: - Self { lower, upper } + _ => Self { lower, upper }, } } @@ -299,6 +314,12 @@ impl Interval { Self::try_new(ScalarValue::from(lower), ScalarValue::from(upper)) } + /// Creates a singleton zero interval if the datatype supported. + pub fn make_zero(data_type: &DataType) -> Result { + let zero_endpoint = ScalarValue::new_zero(data_type)?; + Ok(Self::new(zero_endpoint.clone(), zero_endpoint)) + } + /// Creates an unbounded interval from both sides if the datatype supported. pub fn make_unbounded(data_type: &DataType) -> Result { let unbounded_endpoint = ScalarValue::try_from(data_type)?; @@ -369,7 +390,7 @@ impl Interval { /// NOTE: This function only works with intervals of the same data type. /// Attempting to compare intervals of different data types will lead /// to an error. - pub(crate) fn gt>(&self, other: T) -> Result { + pub fn gt>(&self, other: T) -> Result { let rhs = other.borrow(); if self.data_type().ne(&rhs.data_type()) { internal_err!( @@ -402,7 +423,7 @@ impl Interval { /// NOTE: This function only works with intervals of the same data type. /// Attempting to compare intervals of different data types will lead /// to an error. - pub(crate) fn gt_eq>(&self, other: T) -> Result { + pub fn gt_eq>(&self, other: T) -> Result { let rhs = other.borrow(); if self.data_type().ne(&rhs.data_type()) { internal_err!( @@ -435,7 +456,7 @@ impl Interval { /// NOTE: This function only works with intervals of the same data type. /// Attempting to compare intervals of different data types will lead /// to an error. - pub(crate) fn lt>(&self, other: T) -> Result { + pub fn lt>(&self, other: T) -> Result { other.borrow().gt(self) } @@ -446,7 +467,7 @@ impl Interval { /// NOTE: This function only works with intervals of the same data type. /// Attempting to compare intervals of different data types will lead /// to an error. - pub(crate) fn lt_eq>(&self, other: T) -> Result { + pub fn lt_eq>(&self, other: T) -> Result { other.borrow().gt_eq(self) } @@ -457,7 +478,7 @@ impl Interval { /// NOTE: This function only works with intervals of the same data type. /// Attempting to compare intervals of different data types will lead /// to an error. - pub(crate) fn equal>(&self, other: T) -> Result { + pub fn equal>(&self, other: T) -> Result { let rhs = other.borrow(); if get_result_type(&self.data_type(), &Operator::Eq, &rhs.data_type()).is_err() { internal_err!( @@ -480,7 +501,7 @@ impl Interval { /// Compute the logical conjunction of this (boolean) interval with the /// given boolean interval. - pub(crate) fn and>(&self, other: T) -> Result { + pub fn and>(&self, other: T) -> Result { let rhs = other.borrow(); match (&self.lower, &self.upper, &rhs.lower, &rhs.upper) { ( @@ -501,8 +522,31 @@ impl Interval { } } + /// Compute the logical disjunction of this boolean interval with the + /// given boolean interval. + pub fn or>(&self, other: T) -> Result { + let rhs = other.borrow(); + match (&self.lower, &self.upper, &rhs.lower, &rhs.upper) { + ( + &ScalarValue::Boolean(Some(self_lower)), + &ScalarValue::Boolean(Some(self_upper)), + &ScalarValue::Boolean(Some(other_lower)), + &ScalarValue::Boolean(Some(other_upper)), + ) => { + let lower = self_lower || other_lower; + let upper = self_upper || other_upper; + + Ok(Self { + lower: ScalarValue::Boolean(Some(lower)), + upper: ScalarValue::Boolean(Some(upper)), + }) + } + _ => internal_err!("Incompatible data types for logical conjunction"), + } + } + /// Compute the logical negation of this (boolean) interval. - pub(crate) fn not(&self) -> Result { + pub fn not(&self) -> Result { if self.data_type().ne(&DataType::Boolean) { internal_err!("Cannot apply logical negation to a non-boolean interval") } else if self == &Self::CERTAINLY_TRUE { @@ -761,6 +805,18 @@ impl Interval { } .map(|result| result + 1) } + + /// Reflects an [`Interval`] around the point zero. + /// + /// This method computes the arithmetic negation of the interval, reflecting + /// it about the origin of the number line. This operation swaps and negates + /// the lower and upper bounds of the interval. + pub fn arithmetic_negate(self) -> Result { + Ok(Self { + lower: self.upper().clone().arithmetic_negate()?, + upper: self.lower().clone().arithmetic_negate()?, + }) + } } impl Display for Interval { @@ -1885,10 +1941,10 @@ mod tests { let unbounded_cases = vec![ (DataType::Boolean, Boolean(Some(false)), Boolean(Some(true))), - (DataType::UInt8, UInt8(None), UInt8(None)), - (DataType::UInt16, UInt16(None), UInt16(None)), - (DataType::UInt32, UInt32(None), UInt32(None)), - (DataType::UInt64, UInt64(None), UInt64(None)), + (DataType::UInt8, UInt8(Some(0)), UInt8(None)), + (DataType::UInt16, UInt16(Some(0)), UInt16(None)), + (DataType::UInt32, UInt32(Some(0)), UInt32(None)), + (DataType::UInt64, UInt64(Some(0)), UInt64(None)), (DataType::Int8, Int8(None), Int8(None)), (DataType::Int16, Int16(None), Int16(None)), (DataType::Int32, Int32(None), Int32(None)), @@ -1994,6 +2050,10 @@ mod tests { Interval::make(None, Some(1000_i64))?, Interval::make(Some(1000_i64), Some(1500_i64))?, ), + ( + Interval::make(Some(0_u8), Some(0_u8))?, + Interval::make::(None, None)?, + ), ( Interval::try_new( prev_value(ScalarValue::Float32(Some(0.0_f32))), @@ -2036,6 +2096,10 @@ mod tests { Interval::make(Some(-1000_i64), Some(1000_i64))?, Interval::make(None, Some(-1500_i64))?, ), + ( + Interval::make::(None, None)?, + Interval::make(Some(0_u64), Some(0_u64))?, + ), ( Interval::make(Some(0.0_f32), Some(0.0_f32))?, Interval::make(Some(0.0_f32), Some(0.0_f32))?, diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index e2b68388abb9..bac2f9c14541 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -50,6 +50,7 @@ pub mod groups_accumulator; pub mod interval_arithmetic; pub mod logical_plan; pub mod simplify; +pub mod sort_properties; pub mod tree_node; pub mod type_coercion; pub mod utils; @@ -77,8 +78,7 @@ pub use logical_plan::*; pub use operator::Operator; pub use partition_evaluator::PartitionEvaluator; pub use signature::{ - ArrayFunctionSignature, FuncMonotonicity, Signature, TypeSignature, Volatility, - TIMEZONE_WILDCARD, + ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, }; pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; pub use udaf::{AggregateUDF, AggregateUDFImpl}; diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs index 5d925c8605ee..63b030f0b748 100644 --- a/datafusion/expr/src/signature.rs +++ b/datafusion/expr/src/signature.rs @@ -343,14 +343,6 @@ impl Signature { } } -/// Monotonicity of the `ScalarFunctionExpr` with respect to its arguments. -/// Each element of this vector corresponds to an argument and indicates whether -/// the function's behavior is monotonic, or non-monotonic/unknown for that argument, namely: -/// - `None` signifies unknown monotonicity or non-monotonicity. -/// - `Some(true)` indicates that the function is monotonically increasing w.r.t. the argument in question. -/// - Some(false) indicates that the function is monotonically decreasing w.r.t. the argument in question. -pub type FuncMonotonicity = Vec>; - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr-common/src/sort_properties.rs b/datafusion/expr/src/sort_properties.rs similarity index 77% rename from datafusion/physical-expr-common/src/sort_properties.rs rename to datafusion/expr/src/sort_properties.rs index 47a5d5ba5e3b..7778be2ecf0d 100644 --- a/datafusion/physical-expr-common/src/sort_properties.rs +++ b/datafusion/expr/src/sort_properties.rs @@ -17,9 +17,10 @@ use std::ops::Neg; -use arrow::compute::SortOptions; +use crate::interval_arithmetic::Interval; -use crate::tree_node::ExprContext; +use arrow::compute::SortOptions; +use arrow::datatypes::DataType; /// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient /// to simply use `Option`: There must be a differentiation between @@ -120,29 +121,39 @@ impl SortProperties { impl Neg for SortProperties { type Output = Self; - fn neg(self) -> Self::Output { - match self { - SortProperties::Ordered(SortOptions { - descending, - nulls_first, - }) => SortProperties::Ordered(SortOptions { - descending: !descending, - nulls_first, - }), - SortProperties::Singleton => SortProperties::Singleton, - SortProperties::Unordered => SortProperties::Unordered, + fn neg(mut self) -> Self::Output { + if let SortProperties::Ordered(SortOptions { descending, .. }) = &mut self { + *descending = !*descending; } + self } } -/// The `ExprOrdering` struct is designed to aid in the determination of ordering (represented -/// by [`SortProperties`]) for a given `PhysicalExpr`. When analyzing the orderings -/// of a `PhysicalExpr`, the process begins by assigning the ordering of its leaf nodes. -/// By propagating these leaf node orderings upwards in the expression tree, the overall -/// ordering of the entire `PhysicalExpr` can be derived. -/// -/// This struct holds the necessary state information for each expression in the `PhysicalExpr`. -/// It encapsulates the orderings (`data`) associated with the expression (`expr`), and -/// orderings of the children expressions (`children`). The [`ExprOrdering`] of a parent -/// expression is determined based on the [`ExprOrdering`] states of its children expressions. -pub type ExprOrdering = ExprContext; +/// Represents the properties of a `PhysicalExpr`, including its sorting and range attributes. +#[derive(Debug, Clone)] +pub struct ExprProperties { + pub sort_properties: SortProperties, + pub range: Interval, +} + +impl ExprProperties { + /// Creates a new `ExprProperties` instance with unknown sort properties and unknown range. + pub fn new_unknown() -> Self { + Self { + sort_properties: SortProperties::default(), + range: Interval::make_unbounded(&DataType::Null).unwrap(), + } + } + + /// Sets the sorting properties of the expression and returns the modified instance. + pub fn with_order(mut self, order: SortProperties) -> Self { + self.sort_properties = order; + self + } + + /// Sets the range of the expression and returns the modified instance. + pub fn with_range(mut self, range: Interval) -> Self { + self.range = range; + self + } +} diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index fadea26e7f4e..921d13ab3583 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -17,19 +17,20 @@ //! [`ScalarUDF`]: Scalar User Defined Functions +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::Arc; + use crate::expr::create_name; +use crate::interval_arithmetic::Interval; use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; +use crate::sort_properties::{ExprProperties, SortProperties}; use crate::{ - ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction, - ScalarFunctionImplementation, Signature, + ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature, }; + use arrow::datatypes::DataType; use datafusion_common::{not_impl_err, ExprSchema, Result}; -use std::any::Any; -use std::fmt; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::sync::Arc; /// Logical representation of a Scalar User Defined Function. /// @@ -202,18 +203,63 @@ impl ScalarUDF { Arc::new(move |args| captured.invoke(args)) } - /// This function specifies monotonicity behaviors for User defined scalar functions. - /// - /// See [`ScalarUDFImpl::monotonicity`] for more details. - pub fn monotonicity(&self) -> Result> { - self.inner.monotonicity() - } - /// Get the circuits of inner implementation pub fn short_circuits(&self) -> bool { self.inner.short_circuits() } + /// Computes the output interval for a [`ScalarUDF`], given the input + /// intervals. + /// + /// # Parameters + /// + /// * `inputs` are the intervals for the inputs (children) of this function. + /// + /// # Example + /// + /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`, + /// then the output interval would be `[0, 3]`. + pub fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result { + self.inner.evaluate_bounds(inputs) + } + + /// Updates bounds for child expressions, given a known interval for this + /// function. This is used to propagate constraints down through an expression + /// tree. + /// + /// # Parameters + /// + /// * `interval` is the currently known interval for this function. + /// * `inputs` are the current intervals for the inputs (children) of this function. + /// + /// # Returns + /// + /// A `Vec` of new intervals for the children, in order. + /// + /// If constraint propagation reveals an infeasibility for any child, returns + /// [`None`]. If none of the children intervals change as a result of + /// propagation, may return an empty vector instead of cloning `children`. + /// This is the default (and conservative) return value. + /// + /// # Example + /// + /// If the function is `ABS(a)`, the current `interval` is `[4, 5]` and the + /// input `a` is given as `[-7, -6]`, then propagation would would return + /// `[-5, 5]`. + pub fn propagate_constraints( + &self, + interval: &Interval, + inputs: &[&Interval], + ) -> Result>> { + self.inner.propagate_constraints(interval, inputs) + } + + /// Calculates the [`SortProperties`] of this function based on its + /// children's properties. + pub fn monotonicity(&self, inputs: &[ExprProperties]) -> Result { + self.inner.monotonicity(inputs) + } + /// See [`ScalarUDFImpl::coerce_types`] for more details. pub fn coerce_types(&self, arg_types: &[DataType]) -> Result> { self.inner.coerce_types(arg_types) @@ -387,11 +433,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { &[] } - /// This function specifies monotonicity behaviors for User defined scalar functions. - fn monotonicity(&self) -> Result> { - Ok(None) - } - /// Optionally apply per-UDF simplification / rewrite rules. /// /// This can be used to apply function specific simplification rules during @@ -426,6 +467,59 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { false } + /// Computes the output interval for a [`ScalarUDFImpl`], given the input + /// intervals. + /// + /// # Parameters + /// + /// * `children` are the intervals for the children (inputs) of this function. + /// + /// # Example + /// + /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`, + /// then the output interval would be `[0, 3]`. + fn evaluate_bounds(&self, _input: &[&Interval]) -> Result { + // We cannot assume the input datatype is the same of output type. + Interval::make_unbounded(&DataType::Null) + } + + /// Updates bounds for child expressions, given a known interval for this + /// function. This is used to propagate constraints down through an expression + /// tree. + /// + /// # Parameters + /// + /// * `interval` is the currently known interval for this function. + /// * `inputs` are the current intervals for the inputs (children) of this function. + /// + /// # Returns + /// + /// A `Vec` of new intervals for the children, in order. + /// + /// If constraint propagation reveals an infeasibility for any child, returns + /// [`None`]. If none of the children intervals change as a result of + /// propagation, may return an empty vector instead of cloning `children`. + /// This is the default (and conservative) return value. + /// + /// # Example + /// + /// If the function is `ABS(a)`, the current `interval` is `[4, 5]` and the + /// input `a` is given as `[-7, -6]`, then propagation would would return + /// `[-5, 5]`. + fn propagate_constraints( + &self, + _interval: &Interval, + _inputs: &[&Interval], + ) -> Result>> { + Ok(Some(vec![])) + } + + /// Calculates the [`SortProperties`] of this function based on its + /// children's properties. + fn monotonicity(&self, _inputs: &[ExprProperties]) -> Result { + Ok(SortProperties::Unordered) + } + /// Coerce arguments of a function call to types that the function can evaluate. /// /// This function is only called if [`ScalarUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index da1797cdae81..51f5c09a0665 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -29,16 +29,17 @@ use arrow::datatypes::DataType::{Null, Timestamp, Utf8}; use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano}; use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; use arrow::datatypes::{DataType, TimeUnit}; -use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc}; use datafusion_common::cast::as_primitive_array; use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue}; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, - TIMEZONE_WILDCARD, + ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD, }; +use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc}; + #[derive(Debug)] pub struct DateBinFunc { signature: Signature, @@ -146,8 +147,21 @@ impl ScalarUDFImpl for DateBinFunc { } } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![None, Some(true)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + // The DATE_BIN function preserves the order of its second argument. + let step = &input[0]; + let date_value = &input[1]; + let reference = input.get(2); + + if step.sort_properties.eq(&SortProperties::Singleton) + && reference + .map(|r| r.sort_properties.eq(&SortProperties::Singleton)) + .unwrap_or(true) + { + Ok(date_value.sort_properties) + } else { + Ok(SortProperties::Unordered) + } } } @@ -425,16 +439,16 @@ fn date_bin_impl( mod tests { use std::sync::Arc; + use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc}; use arrow::array::types::TimestampNanosecondType; use arrow::array::{IntervalDayTimeArray, TimestampNanosecondArray}; use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::{DataType, TimeUnit}; - use chrono::TimeDelta; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; - use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc}; + use chrono::TimeDelta; #[test] fn test_date_bin() { diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 0414bf9c2a26..ba5db567a025 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -29,19 +29,18 @@ use arrow::array::types::{ TimestampNanosecondType, TimestampSecondType, }; use arrow::array::{Array, PrimitiveArray}; -use arrow::datatypes::DataType::{Null, Timestamp, Utf8}; -use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; -use arrow::datatypes::{DataType, TimeUnit}; -use chrono::{ - DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, TimeDelta, Timelike, -}; - +use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8}; +use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second}; use datafusion_common::cast::as_primitive_array; use datafusion_common::{exec_err, plan_err, DataFusionError, Result, ScalarValue}; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, - TIMEZONE_WILDCARD, + ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD, +}; + +use chrono::{ + DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, TimeDelta, Timelike, }; #[derive(Debug)] @@ -205,8 +204,16 @@ impl ScalarUDFImpl for DateTruncFunc { &self.aliases } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![None, Some(true)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + // The DATE_TRUNC function preserves the order of its second argument. + let precision = &input[0]; + let date_value = &input[1]; + + if precision.sort_properties.eq(&SortProperties::Singleton) { + Ok(date_value.sort_properties) + } else { + Ok(SortProperties::Unordered) + } } } @@ -410,7 +417,10 @@ fn parse_tz(tz: &Option>) -> Result> { #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::datetime::date_trunc::{date_trunc_coarse, DateTruncFunc}; + use arrow::array::cast::as_primitive_array; use arrow::array::types::TimestampNanosecondType; use arrow::array::TimestampNanosecondArray; @@ -418,7 +428,6 @@ mod tests { use arrow::datatypes::{DataType, TimeUnit}; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; - use std::sync::Arc; #[test] fn date_trunc_test() { diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 5ee47bd3e8eb..2f14e881d1d8 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -89,7 +89,6 @@ macro_rules! make_udf_function { /// The rationale for providing stub functions is to help users to configure datafusion /// properly (so they get an error telling them why a function is not available) /// instead of getting a cryptic "no function found" message at runtime. - macro_rules! make_stub_package { ($name:ident, $feature:literal) => { #[cfg(not(feature = $feature))] @@ -115,7 +114,6 @@ macro_rules! make_stub_package { /// $ARGS_TYPE: the type of array to cast the argument to /// $RETURN_TYPE: the type of array to return /// $FUNC: the function to apply to each element of $ARG -/// macro_rules! make_function_scalar_inputs_return_type { ($ARG: expr, $NAME:expr, $ARG_TYPE:ident, $RETURN_TYPE:ident, $FUNC: block) => {{ let arg = downcast_arg!($ARG, $NAME, $ARG_TYPE); @@ -162,14 +160,14 @@ macro_rules! make_math_unary_udf { make_udf_function!($NAME::$UDF, $GNAME, $NAME); mod $NAME { + use std::any::Any; + use std::sync::Arc; + use arrow::array::{ArrayRef, Float32Array, Float64Array}; use arrow::datatypes::DataType; use datafusion_common::{exec_err, DataFusionError, Result}; - use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, - }; - use std::any::Any; - use std::sync::Arc; + use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; #[derive(Debug)] pub struct $UDF { @@ -211,8 +209,11 @@ macro_rules! make_math_unary_udf { } } - fn monotonicity(&self) -> Result> { - Ok($MONOTONICITY) + fn monotonicity( + &self, + input: &[ExprProperties], + ) -> Result { + $MONOTONICITY(input) } fn invoke(&self, args: &[ColumnarValue]) -> Result { @@ -266,15 +267,15 @@ macro_rules! make_math_binary_udf { make_udf_function!($NAME::$UDF, $GNAME, $NAME); mod $NAME { + use std::any::Any; + use std::sync::Arc; + use arrow::array::{ArrayRef, Float32Array, Float64Array}; use arrow::datatypes::DataType; use datafusion_common::{exec_err, DataFusionError, Result}; + use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::TypeSignature::*; - use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, - }; - use std::any::Any; - use std::sync::Arc; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; #[derive(Debug)] pub struct $UDF { @@ -318,8 +319,11 @@ macro_rules! make_math_binary_udf { } } - fn monotonicity(&self) -> Result> { - Ok($MONOTONICITY) + fn monotonicity( + &self, + input: &[ExprProperties], + ) -> Result { + $MONOTONICITY(input) } fn invoke(&self, args: &[ColumnarValue]) -> Result { diff --git a/datafusion/functions/src/math/abs.rs b/datafusion/functions/src/math/abs.rs index e05dc8665285..a752102913ba 100644 --- a/datafusion/functions/src/math/abs.rs +++ b/datafusion/functions/src/math/abs.rs @@ -17,23 +17,20 @@ //! math expressions -use arrow::array::Decimal128Array; -use arrow::array::Decimal256Array; -use arrow::array::Int16Array; -use arrow::array::Int32Array; -use arrow::array::Int64Array; -use arrow::array::Int8Array; -use arrow::datatypes::DataType; -use datafusion_common::{exec_err, not_impl_err}; -use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::ColumnarValue; - -use arrow::array::{ArrayRef, Float32Array, Float64Array}; -use arrow::error::ArrowError; -use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use std::any::Any; use std::sync::Arc; +use arrow::array::{ + ArrayRef, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array, + Int32Array, Int64Array, Int8Array, +}; +use arrow::datatypes::DataType; +use arrow::error::ArrowError; +use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result}; +use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + type MathArrayFunction = fn(&Vec) -> Result; macro_rules! make_abs_function { @@ -170,7 +167,21 @@ impl ScalarUDFImpl for AbsFunc { let input_data_type = args[0].data_type(); let abs_fun = create_abs_function(input_data_type)?; - let arr = abs_fun(&args)?; - Ok(ColumnarValue::Array(arr)) + abs_fun(&args).map(ColumnarValue::Array) + } + + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + // Non-decreasing for x ≥ 0 and symmetrically non-increasing for x ≤ 0. + let arg = &input[0]; + let range = &arg.range; + let zero_point = Interval::make_zero(&range.lower().data_type())?; + + if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else if range.lt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(-arg.sort_properties) + } else { + Ok(SortProperties::Unordered) + } } } diff --git a/datafusion/functions/src/math/log.rs b/datafusion/functions/src/math/log.rs index e6c698ad1a80..8c1e8ac8fea3 100644 --- a/datafusion/functions/src/math/log.rs +++ b/datafusion/functions/src/math/log.rs @@ -17,6 +17,12 @@ //! Math function: `log()`. +use std::any::Any; +use std::sync::Arc; + +use super::power::PowerFunc; + +use arrow::array::{ArrayRef, Float32Array, Float64Array}; use arrow::datatypes::DataType; use datafusion_common::{ exec_err, internal_err, plan_datafusion_err, plan_err, DataFusionError, Result, @@ -24,15 +30,9 @@ use datafusion_common::{ }; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; -use datafusion_expr::{lit, ColumnarValue, Expr, FuncMonotonicity, ScalarUDF}; - -use arrow::array::{ArrayRef, Float32Array, Float64Array}; -use datafusion_expr::TypeSignature::*; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; +use datafusion_expr::{lit, ColumnarValue, Expr, ScalarUDF, TypeSignature::*}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; -use std::sync::Arc; - -use super::power::PowerFunc; #[derive(Debug)] pub struct LogFunc { @@ -81,8 +81,23 @@ impl ScalarUDFImpl for LogFunc { } } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true), Some(false)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + match (input[0].sort_properties, input[1].sort_properties) { + (first @ SortProperties::Ordered(value), SortProperties::Ordered(base)) + if !value.descending && base.descending + || value.descending && !base.descending => + { + Ok(first) + } + ( + first @ (SortProperties::Ordered(_) | SortProperties::Singleton), + SortProperties::Singleton, + ) => Ok(first), + (SortProperties::Singleton, second @ SortProperties::Ordered(_)) => { + Ok(-second) + } + _ => Ok(SortProperties::Unordered), + } } // Support overloaded log(base, x) and log(x) which defaults to log(10, x) @@ -213,14 +228,13 @@ fn is_pow(func: &ScalarUDF) -> bool { mod tests { use std::collections::HashMap; - use datafusion_common::{ - cast::{as_float32_array, as_float64_array}, - DFSchema, - }; - use datafusion_expr::{execution_props::ExecutionProps, simplify::SimplifyContext}; - use super::*; + use datafusion_common::cast::{as_float32_array, as_float64_array}; + use datafusion_common::DFSchema; + use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::simplify::SimplifyContext; + #[test] fn test_log_f64() { let args = [ diff --git a/datafusion/functions/src/math/mod.rs b/datafusion/functions/src/math/mod.rs index b6e8d26b6460..6c26ce79d0a5 100644 --- a/datafusion/functions/src/math/mod.rs +++ b/datafusion/functions/src/math/mod.rs @@ -17,9 +17,12 @@ //! "math" DataFusion functions -use datafusion_expr::ScalarUDF; use std::sync::Arc; +use crate::math::monotonicity::*; + +use datafusion_expr::ScalarUDF; + pub mod abs; pub mod cot; pub mod factorial; @@ -27,6 +30,7 @@ pub mod gcd; pub mod iszero; pub mod lcm; pub mod log; +pub mod monotonicity; pub mod nans; pub mod nanvl; pub mod pi; @@ -37,42 +41,60 @@ pub mod trunc; // Create UDFs make_udf_function!(abs::AbsFunc, ABS, abs); -make_math_unary_udf!(AcosFunc, ACOS, acos, acos, None); -make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh, Some(vec![Some(true)])); -make_math_unary_udf!(AsinFunc, ASIN, asin, asin, None); -make_math_unary_udf!(AsinhFunc, ASINH, asinh, asinh, Some(vec![Some(true)])); -make_math_unary_udf!(AtanFunc, ATAN, atan, atan, Some(vec![Some(true)])); -make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh, Some(vec![Some(true)])); -make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, Some(vec![Some(true)])); -make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, None); -make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, Some(vec![Some(true)])); -make_math_unary_udf!(CosFunc, COS, cos, cos, None); -make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, None); +make_math_unary_udf!(AcosFunc, ACOS, acos, acos, super::acos_monotonicity); +make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh, super::acosh_monotonicity); +make_math_unary_udf!(AsinFunc, ASIN, asin, asin, super::asin_monotonicity); +make_math_unary_udf!(AsinhFunc, ASINH, asinh, asinh, super::asinh_monotonicity); +make_math_unary_udf!(AtanFunc, ATAN, atan, atan, super::atan_monotonicity); +make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh, super::atanh_monotonicity); +make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, super::atan2_monotonicity); +make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, super::cbrt_monotonicity); +make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, super::ceil_monotonicity); +make_math_unary_udf!(CosFunc, COS, cos, cos, super::cos_monotonicity); +make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, super::cosh_monotonicity); make_udf_function!(cot::CotFunc, COT, cot); -make_math_unary_udf!(DegreesFunc, DEGREES, degrees, to_degrees, None); -make_math_unary_udf!(ExpFunc, EXP, exp, exp, Some(vec![Some(true)])); +make_math_unary_udf!( + DegreesFunc, + DEGREES, + degrees, + to_degrees, + super::degrees_monotonicity +); +make_math_unary_udf!(ExpFunc, EXP, exp, exp, super::exp_monotonicity); make_udf_function!(factorial::FactorialFunc, FACTORIAL, factorial); -make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, Some(vec![Some(true)])); +make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, super::floor_monotonicity); make_udf_function!(log::LogFunc, LOG, log); make_udf_function!(gcd::GcdFunc, GCD, gcd); make_udf_function!(nans::IsNanFunc, ISNAN, isnan); make_udf_function!(iszero::IsZeroFunc, ISZERO, iszero); make_udf_function!(lcm::LcmFunc, LCM, lcm); -make_math_unary_udf!(LnFunc, LN, ln, ln, Some(vec![Some(true)])); -make_math_unary_udf!(Log2Func, LOG2, log2, log2, Some(vec![Some(true)])); -make_math_unary_udf!(Log10Func, LOG10, log10, log10, Some(vec![Some(true)])); +make_math_unary_udf!(LnFunc, LN, ln, ln, super::ln_monotonicity); +make_math_unary_udf!(Log2Func, LOG2, log2, log2, super::log2_monotonicity); +make_math_unary_udf!(Log10Func, LOG10, log10, log10, super::log10_monotonicity); make_udf_function!(nanvl::NanvlFunc, NANVL, nanvl); make_udf_function!(pi::PiFunc, PI, pi); make_udf_function!(power::PowerFunc, POWER, power); -make_math_unary_udf!(RadiansFunc, RADIANS, radians, to_radians, None); +make_math_unary_udf!( + RadiansFunc, + RADIANS, + radians, + to_radians, + super::radians_monotonicity +); make_udf_function!(random::RandomFunc, RANDOM, random); make_udf_function!(round::RoundFunc, ROUND, round); -make_math_unary_udf!(SignumFunc, SIGNUM, signum, signum, None); -make_math_unary_udf!(SinFunc, SIN, sin, sin, None); -make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, None); -make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, None); -make_math_unary_udf!(TanFunc, TAN, tan, tan, None); -make_math_unary_udf!(TanhFunc, TANH, tanh, tanh, None); +make_math_unary_udf!( + SignumFunc, + SIGNUM, + signum, + signum, + super::signum_monotonicity +); +make_math_unary_udf!(SinFunc, SIN, sin, sin, super::sin_monotonicity); +make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, super::sinh_monotonicity); +make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, super::sqrt_monotonicity); +make_math_unary_udf!(TanFunc, TAN, tan, tan, super::tan_monotonicity); +make_math_unary_udf!(TanhFunc, TANH, tanh, tanh, super::tanh_monotonicity); make_udf_function!(trunc::TruncFunc, TRUNC, trunc); pub mod expr_fn { diff --git a/datafusion/functions/src/math/monotonicity.rs b/datafusion/functions/src/math/monotonicity.rs new file mode 100644 index 000000000000..5ce5654ae79e --- /dev/null +++ b/datafusion/functions/src/math/monotonicity.rs @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::DataType; +use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; + +fn symmetric_unit_interval(data_type: &DataType) -> Result { + Interval::try_new( + ScalarValue::new_negative_one(data_type)?, + ScalarValue::new_one(data_type)?, + ) +} + +/// Non-increasing on the interval \[−1, 1\], undefined otherwise. +pub fn acos_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let valid_domain = symmetric_unit_interval(&range.lower().data_type())?; + + if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE { + Ok(-arg.sort_properties) + } else { + exec_err!("Input range of ACOS contains out-of-domain values") + } +} + +/// Non-decreasing for x ≥ 1, undefined otherwise. +pub fn acosh_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let valid_domain = Interval::try_new( + ScalarValue::new_one(&range.lower().data_type())?, + ScalarValue::try_from(&range.upper().data_type())?, + )?; + + if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of ACOSH contains out-of-domain values") + } +} + +/// Non-decreasing on the interval \[−1, 1\], undefined otherwise. +pub fn asin_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let valid_domain = symmetric_unit_interval(&range.lower().data_type())?; + + if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of ASIN contains out-of-domain values") + } +} + +/// Non-decreasing for all real numbers. +pub fn asinh_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for all real numbers. +pub fn atan_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing on the interval \[−1, 1\], undefined otherwise. +pub fn atanh_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let valid_domain = symmetric_unit_interval(&range.lower().data_type())?; + + if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of ATANH contains out-of-domain values") + } +} + +/// Monotonicity depends on the quadrant. +// TODO: Implement monotonicity of the ATAN2 function. +pub fn atan2_monotonicity(_input: &[ExprProperties]) -> Result { + Ok(SortProperties::Unordered) +} + +/// Non-decreasing for all real numbers. +pub fn cbrt_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for all real numbers. +pub fn ceil_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-increasing on \[0, π\] and then non-decreasing on \[π, 2π\]. +/// This pattern repeats periodically with a period of 2π. +// TODO: Implement monotonicity of the ATAN2 function. +pub fn cos_monotonicity(_input: &[ExprProperties]) -> Result { + Ok(SortProperties::Unordered) +} + +/// Non-decreasing for x ≥ 0 and symmetrically non-increasing for x ≤ 0. +pub fn cosh_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let zero_point = Interval::make_zero(&range.lower().data_type())?; + + if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else if range.lt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(-arg.sort_properties) + } else { + Ok(SortProperties::Unordered) + } +} + +/// Non-decreasing function that converts radians to degrees. +pub fn degrees_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for all real numbers. +pub fn exp_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for all real numbers. +pub fn floor_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for x ≥ 0, undefined otherwise. +pub fn ln_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let zero_point = Interval::make_zero(&range.lower().data_type())?; + + if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of LN contains out-of-domain values") + } +} + +/// Non-decreasing for x ≥ 0, undefined otherwise. +pub fn log2_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let zero_point = Interval::make_zero(&range.lower().data_type())?; + + if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of LOG2 contains out-of-domain values") + } +} + +/// Non-decreasing for x ≥ 0, undefined otherwise. +pub fn log10_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let zero_point = Interval::make_zero(&range.lower().data_type())?; + + if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of LOG10 contains out-of-domain values") + } +} + +/// Non-decreasing for all real numbers x. +pub fn radians_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for all real numbers x. +pub fn signum_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing on \[0, π\] and then non-increasing on \[π, 2π\]. +/// This pattern repeats periodically with a period of 2π. +// TODO: Implement monotonicity of the SIN function. +pub fn sin_monotonicity(_input: &[ExprProperties]) -> Result { + Ok(SortProperties::Unordered) +} + +/// Non-decreasing for all real numbers. +pub fn sinh_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} + +/// Non-decreasing for x ≥ 0, undefined otherwise. +pub fn sqrt_monotonicity(input: &[ExprProperties]) -> Result { + let arg = &input[0]; + let range = &arg.range; + + let zero_point = Interval::make_zero(&range.lower().data_type())?; + + if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE { + Ok(arg.sort_properties) + } else { + exec_err!("Input range of SQRT contains out-of-domain values") + } +} + +/// Non-decreasing between vertical asymptotes at x = k * π ± π / 2 for any +/// integer k. +// TODO: Implement monotonicity of the TAN function. +pub fn tan_monotonicity(_input: &[ExprProperties]) -> Result { + Ok(SortProperties::Unordered) +} + +/// Non-decreasing for all real numbers. +pub fn tanh_monotonicity(input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) +} diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs index f9403e411fe2..60c94b6ca622 100644 --- a/datafusion/functions/src/math/pi.rs +++ b/datafusion/functions/src/math/pi.rs @@ -19,10 +19,9 @@ use std::any::Any; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; - use datafusion_common::{not_impl_err, Result, ScalarValue}; -use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility}; -use datafusion_expr::{ScalarUDFImpl, Signature}; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; #[derive(Debug)] pub struct PiFunc { @@ -70,7 +69,8 @@ impl ScalarUDFImpl for PiFunc { )))) } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true)])) + fn monotonicity(&self, _input: &[ExprProperties]) -> Result { + // This function returns a constant value. + Ok(SortProperties::Singleton) } } diff --git a/datafusion/functions/src/math/round.rs b/datafusion/functions/src/math/round.rs index f4a163137a35..600f4fd5472a 100644 --- a/datafusion/functions/src/math/round.rs +++ b/datafusion/functions/src/math/round.rs @@ -18,15 +18,15 @@ use std::any::Any; use std::sync::Arc; +use crate::utils::make_scalar_function; + use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{Float32, Float64}; - -use crate::utils::make_scalar_function; use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue}; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::TypeSignature::Exact; -use datafusion_expr::{ColumnarValue, FuncMonotonicity}; -use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; #[derive(Debug)] pub struct RoundFunc { @@ -80,8 +80,19 @@ impl ScalarUDFImpl for RoundFunc { make_scalar_function(round, vec![])(args) } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + // round preserves the order of the first argument + let value = &input[0]; + let precision = input.get(1); + + if precision + .map(|r| r.sort_properties.eq(&SortProperties::Singleton)) + .unwrap_or(true) + { + Ok(value.sort_properties) + } else { + Ok(SortProperties::Unordered) + } } } @@ -179,10 +190,12 @@ pub fn round(args: &[ArrayRef]) -> Result { #[cfg(test)] mod test { + use std::sync::Arc; + use crate::math::round::round; + use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array}; use datafusion_common::cast::{as_float32_array, as_float64_array}; - use std::sync::Arc; #[test] fn test_round_f32() { diff --git a/datafusion/functions/src/math/trunc.rs b/datafusion/functions/src/math/trunc.rs index 6f88099889cc..0c4d38564b9f 100644 --- a/datafusion/functions/src/math/trunc.rs +++ b/datafusion/functions/src/math/trunc.rs @@ -18,16 +18,16 @@ use std::any::Any; use std::sync::Arc; +use crate::utils::make_scalar_function; + use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{Float32, Float64}; - -use crate::utils::make_scalar_function; use datafusion_common::ScalarValue::Int64; use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::TypeSignature::Exact; -use datafusion_expr::{ColumnarValue, FuncMonotonicity}; -use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; #[derive(Debug)] pub struct TruncFunc { @@ -86,8 +86,19 @@ impl ScalarUDFImpl for TruncFunc { make_scalar_function(trunc, vec![])(args) } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + // trunc preserves the order of the first argument + let value = &input[0]; + let precision = input.get(1); + + if precision + .map(|r| r.sort_properties.eq(&SortProperties::Singleton)) + .unwrap_or(true) + { + Ok(value.sort_properties) + } else { + Ok(SortProperties::Unordered) + } } } @@ -156,10 +167,12 @@ fn compute_truncate64(x: f64, y: i64) -> f64 { #[cfg(test)] mod test { + use std::sync::Arc; + use crate::math::trunc::trunc; + use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array}; use datafusion_common::cast::{as_float32_array, as_float64_array}; - use std::sync::Arc; #[test] fn test_truncate_32() { diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index 53e3134a1b05..f335958698ab 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -19,6 +19,5 @@ pub mod aggregate; pub mod expressions; pub mod physical_expr; pub mod sort_expr; -pub mod sort_properties; pub mod tree_node; pub mod utils; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a0f8bdf10377..00b3dd725dc2 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -20,17 +20,17 @@ use std::fmt::{Debug, Display}; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use crate::utils::scatter; + use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::ColumnarValue; -use crate::sort_properties::SortProperties; -use crate::utils::scatter; - /// See [create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html) /// for examples of creating `PhysicalExpr` from `Expr` pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { @@ -154,17 +154,13 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { /// directly because it must remain object safe. fn dyn_hash(&self, _state: &mut dyn Hasher); - /// The order information of a PhysicalExpr can be estimated from its children. - /// This is especially helpful for projection expressions. If we can ensure that the - /// order of a PhysicalExpr to project matches with the order of SortExec, we can - /// eliminate that SortExecs. - /// - /// By recursively calling this function, we can obtain the overall order - /// information of the PhysicalExpr. Since `SortOptions` cannot fully handle - /// the propagation of unordered columns and literals, the `SortProperties` - /// struct is used. - fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties { - SortProperties::Unordered + /// Calculates the properties of this [`PhysicalExpr`] based on its + /// children's properties (i.e. order and range), recursively aggregating + /// the information from its children. In cases where the [`PhysicalExpr`] + /// has no children (e.g., `Literal` or `Column`), these properties should + /// be specified externally, as the function defaults to unknown properties. + fn get_properties(&self, _children: &[ExprProperties]) -> Result { + Ok(ExprProperties::new_unknown()) } } diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 459b5a4849cb..601d344e4aac 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -15,13 +15,34 @@ // specific language governing permissions and limitations // under the License. -use arrow::{ - array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}, - compute::{and_kleene, is_not_null, SlicesIterator}, +use std::sync::Arc; + +use crate::{ + physical_expr::PhysicalExpr, sort_expr::PhysicalSortExpr, tree_node::ExprContext, }; -use datafusion_common::Result; -use crate::sort_expr::PhysicalSortExpr; +use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; +use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; +use datafusion_common::Result; +use datafusion_expr::sort_properties::ExprProperties; + +/// Represents a [`PhysicalExpr`] node with associated properties (order and +/// range) in a context where properties are tracked. +pub type ExprPropertiesNode = ExprContext; + +impl ExprPropertiesNode { + /// Constructs a new `ExprPropertiesNode` with unknown properties for a + /// given physical expression. This node initializes with default properties + /// and recursively applies this to all child expressions. + pub fn new_unknown(expr: Arc) -> Self { + let children = expr.children().into_iter().map(Self::new_unknown).collect(); + Self { + expr, + data: ExprProperties::new_unknown(), + children, + } + } +} /// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy` /// are taken, when the mask evaluates `false` values null values are filled. diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 3ce641c5aa46..7faf2caae01c 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -70,7 +70,6 @@ pub fn add_offset_to_expr( #[cfg(test)] mod tests { - use super::*; use crate::expressions::col; use crate::PhysicalSortExpr; @@ -147,7 +146,7 @@ mod tests { let col_f = &col("f", &test_schema)?; let col_g = &col("g", &test_schema)?; let mut eq_properties = EquivalenceProperties::new(test_schema.clone()); - eq_properties.add_equal_conditions(col_a, col_c); + eq_properties.add_equal_conditions(col_a, col_c)?; let option_asc = SortOptions { descending: false, @@ -204,7 +203,7 @@ mod tests { let mut eq_properties = EquivalenceProperties::new(test_schema.clone()); // Define a and f are aliases - eq_properties.add_equal_conditions(col_a, col_f); + eq_properties.add_equal_conditions(col_a, col_f)?; // Column e has constant value. eq_properties = eq_properties.add_constants([col_e.clone()]); @@ -338,11 +337,11 @@ mod tests { let col_y_expr = Arc::new(Column::new("y", 4)) as Arc; // a and b are aliases - eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr); + eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr)?; assert_eq!(eq_properties.eq_group().len(), 1); // This new entry is redundant, size shouldn't increase - eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr); + eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr)?; assert_eq!(eq_properties.eq_group().len(), 1); let eq_groups = &eq_properties.eq_group().classes[0]; assert_eq!(eq_groups.len(), 2); @@ -351,7 +350,7 @@ mod tests { // b and c are aliases. Exising equivalence class should expand, // however there shouldn't be any new equivalence class - eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr); + eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?; assert_eq!(eq_properties.eq_group().len(), 1); let eq_groups = &eq_properties.eq_group().classes[0]; assert_eq!(eq_groups.len(), 3); @@ -360,12 +359,12 @@ mod tests { assert!(eq_groups.contains(&col_c_expr)); // This is a new set of equality. Hence equivalent class count should be 2. - eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr); + eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr)?; assert_eq!(eq_properties.eq_group().len(), 2); // This equality bridges distinct equality sets. // Hence equivalent class count should decrease from 2 to 1. - eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr); + eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr)?; assert_eq!(eq_properties.eq_group().len(), 1); let eq_groups = &eq_properties.eq_group().classes[0]; assert_eq!(eq_groups.len(), 5); diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index ed4600f2d95e..7857d9df726e 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -223,26 +223,26 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> mod tests { use std::sync::Arc; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow_schema::SortOptions; - use itertools::Itertools; - - use datafusion_common::{DFSchema, Result}; - use datafusion_expr::{Operator, ScalarUDF}; - use crate::equivalence::tests::{ convert_to_orderings, convert_to_sort_exprs, create_random_schema, - create_test_params, generate_table_for_eq_properties, is_table_same_after_sort, + create_test_params, create_test_schema, generate_table_for_eq_properties, + is_table_same_after_sort, }; - use crate::equivalence::{tests::create_test_schema, EquivalenceProperties}; use crate::equivalence::{ - EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, + EquivalenceClass, EquivalenceGroup, EquivalenceProperties, + OrderingEquivalenceClass, }; - use crate::expressions::Column; - use crate::expressions::{col, BinaryExpr}; + use crate::expressions::{col, BinaryExpr, Column}; use crate::utils::tests::TestScalarUDF; use crate::{PhysicalExpr, PhysicalSortExpr}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow_schema::SortOptions; + use datafusion_common::{DFSchema, Result}; + use datafusion_expr::{Operator, ScalarUDF}; + + use itertools::Itertools; + #[test] fn test_ordering_satisfy() -> Result<()> { let input_schema = Arc::new(Schema::new(vec![ @@ -883,7 +883,7 @@ mod tests { }; // a=c (e.g they are aliases). let mut eq_properties = EquivalenceProperties::new(test_schema); - eq_properties.add_equal_conditions(col_a, col_c); + eq_properties.add_equal_conditions(col_a, col_c)?; let orderings = vec![ vec![(col_a, options)], diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 260610f23dc6..b5ac149d8b71 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -17,14 +17,13 @@ use std::sync::Arc; -use arrow::datatypes::SchemaRef; +use crate::expressions::Column; +use crate::PhysicalExpr; +use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, Result}; -use crate::expressions::Column; -use crate::PhysicalExpr; - /// Stores the mapping between source expressions and target expressions for a /// projection. #[derive(Debug, Clone)] @@ -114,14 +113,7 @@ impl ProjectionMapping { #[cfg(test)] mod tests { - - use arrow::datatypes::{DataType, Field, Schema}; - use arrow_schema::{SortOptions, TimeUnit}; - use itertools::Itertools; - - use datafusion_common::DFSchema; - use datafusion_expr::{Operator, ScalarUDF}; - + use super::*; use crate::equivalence::tests::{ apply_projection, convert_to_orderings, convert_to_orderings_owned, create_random_schema, generate_table_for_eq_properties, is_table_same_after_sort, @@ -133,7 +125,12 @@ mod tests { use crate::utils::tests::TestScalarUDF; use crate::PhysicalSortExpr; - use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow_schema::{SortOptions, TimeUnit}; + use datafusion_common::DFSchema; + use datafusion_expr::{Operator, ScalarUDF}; + + use itertools::Itertools; #[test] fn project_orderings() -> Result<()> { @@ -941,7 +938,7 @@ mod tests { for (orderings, equal_columns, expected) in test_cases { let mut eq_properties = EquivalenceProperties::new(schema.clone()); for (lhs, rhs) in equal_columns { - eq_properties.add_equal_conditions(lhs, rhs); + eq_properties.add_equal_conditions(lhs, rhs)?; } let orderings = convert_to_orderings(&orderings); diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index c654208208df..016c4c4ae107 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -18,25 +18,27 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; -use arrow_schema::{SchemaRef, SortOptions}; -use indexmap::{IndexMap, IndexSet}; -use itertools::Itertools; - -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{JoinSide, JoinType, Result}; - +use super::ordering::collapse_lex_ordering; use crate::equivalence::{ collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::{CastExpr, Literal}; -use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::{ physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, }; -use super::ordering::collapse_lex_ordering; +use arrow_schema::{SchemaRef, SortOptions}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{JoinSide, JoinType, Result}; +use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; +use datafusion_physical_expr_common::expressions::column::Column; +use datafusion_physical_expr_common::utils::ExprPropertiesNode; + +use indexmap::{IndexMap, IndexSet}; +use itertools::Itertools; /// A `EquivalenceProperties` object stores useful information related to a schema. /// Currently, it keeps track of: @@ -197,7 +199,7 @@ impl EquivalenceProperties { &mut self, left: &Arc, right: &Arc, - ) { + ) -> Result<()> { // Discover new constants in light of new the equality: if self.is_expr_constant(left) { // Left expression is constant, add right as constant @@ -216,27 +218,34 @@ impl EquivalenceProperties { let mut new_orderings = vec![]; for ordering in self.normalized_oeq_class().iter() { let expressions = if left.eq(&ordering[0].expr) { - // left expression is leading ordering + // Left expression is leading ordering Some((ordering[0].options, right)) } else if right.eq(&ordering[0].expr) { - // right expression is leading ordering + // Right expression is leading ordering Some((ordering[0].options, left)) } else { None }; if let Some((leading_ordering, other_expr)) = expressions { - // Only handle expressions with exactly one child - // TODO: it should be possible to handle expressions orderings f(a, b, c), a, b, c - // if f is monotonic in all arguments - // First Expression after leading ordering + // Currently, we only handle expressions with a single child. + // TODO: It should be possible to handle expressions orderings like + // f(a, b, c), a, b, c if f is monotonic in all arguments. + // First expression after leading ordering if let Some(next_expr) = ordering.get(1) { let children = other_expr.children(); if children.len() == 1 && children[0].eq(&next_expr.expr) && SortProperties::Ordered(leading_ordering) - == other_expr.get_ordering(&[SortProperties::Ordered( - next_expr.options, - )]) + == other_expr + .get_properties(&[ExprProperties { + sort_properties: SortProperties::Ordered( + leading_ordering, + ), + range: Interval::make_unbounded( + &other_expr.data_type(&self.schema)?, + )?, + }])? + .sort_properties { // Assume existing ordering is [a ASC, b ASC] // When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid, @@ -254,6 +263,7 @@ impl EquivalenceProperties { // Add equal expressions to the state self.eq_group.add_equal_conditions(left, right); + Ok(()) } /// Track/register physical expressions with constant values. @@ -378,11 +388,15 @@ impl EquivalenceProperties { /// /// Returns `true` if the specified ordering is satisfied, `false` otherwise. fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool { - let expr_ordering = self.get_expr_ordering(req.expr.clone()); - let ExprOrdering { expr, data, .. } = expr_ordering; - match data { + let ExprProperties { + sort_properties, .. + } = self.get_expr_properties(req.expr.clone()); + match sort_properties { SortProperties::Ordered(options) => { - let sort_expr = PhysicalSortExpr { expr, options }; + let sort_expr = PhysicalSortExpr { + expr: req.expr.clone(), + options, + }; sort_expr.satisfy(req, self.schema()) } // Singleton expressions satisfies any ordering. @@ -698,8 +712,9 @@ impl EquivalenceProperties { referred_dependencies(&dependency_map, source) .into_iter() .filter_map(|relevant_deps| { - if let SortProperties::Ordered(options) = - get_expr_ordering(source, &relevant_deps) + if let Ok(SortProperties::Ordered(options)) = + get_expr_properties(source, &relevant_deps, &self.schema) + .map(|prop| prop.sort_properties) { Some((options, relevant_deps)) } else { @@ -837,16 +852,27 @@ impl EquivalenceProperties { let ordered_exprs = search_indices .iter() .flat_map(|&idx| { - let ExprOrdering { expr, data, .. } = - eq_properties.get_expr_ordering(exprs[idx].clone()); - match data { - SortProperties::Ordered(options) => { - Some((PhysicalSortExpr { expr, options }, idx)) - } + let ExprProperties { + sort_properties, .. + } = eq_properties.get_expr_properties(exprs[idx].clone()); + match sort_properties { + SortProperties::Ordered(options) => Some(( + PhysicalSortExpr { + expr: exprs[idx].clone(), + options, + }, + idx, + )), SortProperties::Singleton => { // Assign default ordering to constant expressions let options = SortOptions::default(); - Some((PhysicalSortExpr { expr, options }, idx)) + Some(( + PhysicalSortExpr { + expr: exprs[idx].clone(), + options, + }, + idx, + )) } SortProperties::Unordered => None, } @@ -895,32 +921,33 @@ impl EquivalenceProperties { is_constant_recurse(&normalized_constants, &normalized_expr) } - /// Retrieves the ordering information for a given physical expression. + /// Retrieves the properties for a given physical expression. /// - /// This function constructs an `ExprOrdering` object for the provided + /// This function constructs an [`ExprProperties`] object for the given /// expression, which encapsulates information about the expression's - /// ordering, including its [`SortProperties`]. + /// properties, including its [`SortProperties`] and [`Interval`]. /// - /// # Arguments + /// # Parameters /// /// - `expr`: An `Arc` representing the physical expression /// for which ordering information is sought. /// /// # Returns /// - /// Returns an `ExprOrdering` object containing the ordering information for - /// the given expression. - pub fn get_expr_ordering(&self, expr: Arc) -> ExprOrdering { - ExprOrdering::new_default(expr.clone()) - .transform_up(|expr| Ok(update_ordering(expr, self))) + /// Returns an [`ExprProperties`] object containing the ordering and range + /// information for the given expression. + pub fn get_expr_properties(&self, expr: Arc) -> ExprProperties { + ExprPropertiesNode::new_unknown(expr) + .transform_up(|expr| update_properties(expr, self)) .data() - // Guaranteed to always return `Ok`. - .unwrap() + .map(|node| node.data) + .unwrap_or(ExprProperties::new_unknown()) } } -/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node. -/// The node can either be a leaf node, or an intermediate node: +/// Calculates the properties of a given [`ExprPropertiesNode`]. +/// +/// Order information can be retrieved as: /// - If it is a leaf node, we directly find the order of the node by looking /// at the given sort expression and equivalence properties if it is a `Column` /// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark @@ -931,30 +958,41 @@ impl EquivalenceProperties { /// node directly matches with the sort expression. If there is a match, the /// sort expression emerges at that node immediately, discarding the recursive /// result coming from its children. -fn update_ordering( - mut node: ExprOrdering, +/// +/// Range information is calculated as: +/// - If it is a `Literal` node, we set the range as a point value. If it is a +/// `Column` node, we set the datatype of the range, but cannot give an interval +/// for the range, yet. +/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr` +/// and operator has its own rules on how to propagate the children range. +fn update_properties( + mut node: ExprPropertiesNode, eq_properties: &EquivalenceProperties, -) -> Transformed { - // We have a Column, which is one of the two possible leaf node types: +) -> Result> { + // First, try to gather the information from the children: + if !node.expr.children().is_empty() { + // We have an intermediate (non-leaf) node, account for its children: + let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec(); + node.data = node.expr.get_properties(&children_props)?; + } else if node.expr.as_any().is::() { + // We have a Literal, which is one of the two possible leaf node types: + node.data = node.expr.get_properties(&[])?; + } else if node.expr.as_any().is::() { + // We have a Column, which is the other possible leaf node type: + node.data.range = + Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)? + } + // Now, check what we know about orderings: let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone()); if eq_properties.is_expr_constant(&normalized_expr) { - node.data = SortProperties::Singleton; + node.data.sort_properties = SortProperties::Singleton; } else if let Some(options) = eq_properties .normalized_oeq_class() .get_options(&normalized_expr) { - node.data = SortProperties::Ordered(options); - } else if !node.expr.children().is_empty() { - // We have an intermediate (non-leaf) node, account for its children: - let children_orderings = node.children.iter().map(|c| c.data).collect_vec(); - node.data = node.expr.get_ordering(&children_orderings); - } else if node.expr.as_any().is::() { - // We have a Literal, which is the other possible leaf node type: - node.data = node.expr.get_ordering(&[]); - } else { - return Transformed::no(node); + node.data.sort_properties = SortProperties::Ordered(options); } - Transformed::yes(node) + Ok(Transformed::yes(node)) } /// This function determines whether the provided expression is constant @@ -1124,8 +1162,9 @@ fn generate_dependency_orderings( .collect() } -/// This function examines the given expression and the sort expressions it -/// refers to determine the ordering properties of the expression. +/// This function examines the given expression and its properties to determine +/// the ordering properties of the expression. The range knowledge is not utilized +/// yet in the scope of this function. /// /// # Parameters /// @@ -1133,26 +1172,41 @@ fn generate_dependency_orderings( /// which ordering properties need to be determined. /// - `dependencies`: A reference to `Dependencies`, containing sort expressions /// referred to by `expr`. +/// - `schema``: A reference to the schema which the `expr` columns refer. /// /// # Returns /// /// A `SortProperties` indicating the ordering information of the given expression. -fn get_expr_ordering( +fn get_expr_properties( expr: &Arc, dependencies: &Dependencies, -) -> SortProperties { + schema: &SchemaRef, +) -> Result { if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) { // If exact match is found, return its ordering. - SortProperties::Ordered(column_order.options) + Ok(ExprProperties { + sort_properties: SortProperties::Ordered(column_order.options), + range: Interval::make_unbounded(&expr.data_type(schema)?)?, + }) + } else if expr.as_any().downcast_ref::().is_some() { + Ok(ExprProperties { + sort_properties: SortProperties::Unordered, + range: Interval::make_unbounded(&expr.data_type(schema)?)?, + }) + } else if let Some(literal) = expr.as_any().downcast_ref::() { + Ok(ExprProperties { + sort_properties: SortProperties::Singleton, + range: Interval::try_new(literal.value().clone(), literal.value().clone())?, + }) } else { // Find orderings of its children let child_states = expr .children() .iter() - .map(|child| get_expr_ordering(child, dependencies)) - .collect::>(); + .map(|child| get_expr_properties(child, dependencies, schema)) + .collect::>>()?; // Calculate expression ordering using ordering of its children. - expr.get_ordering(&child_states) + expr.get_properties(&child_states) } } @@ -1351,12 +1405,7 @@ impl Hash for ExprWrapper { mod tests { use std::ops::Not; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow_schema::{Fields, TimeUnit}; - - use datafusion_common::DFSchema; - use datafusion_expr::{Operator, ScalarUDF}; - + use super::*; use crate::equivalence::add_offset_to_expr; use crate::equivalence::tests::{ convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs, @@ -1366,7 +1415,10 @@ mod tests { use crate::expressions::{col, BinaryExpr, Column}; use crate::utils::tests::TestScalarUDF; - use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow_schema::{Fields, TimeUnit}; + use datafusion_common::DFSchema; + use datafusion_expr::{Operator, ScalarUDF}; #[test] fn project_equivalence_properties_test() -> Result<()> { @@ -1577,8 +1629,8 @@ mod tests { let mut join_eq_properties = EquivalenceProperties::new(Arc::new(schema)); // a=x and d=w - join_eq_properties.add_equal_conditions(col_a, col_x); - join_eq_properties.add_equal_conditions(col_d, col_w); + join_eq_properties.add_equal_conditions(col_a, col_x)?; + join_eq_properties.add_equal_conditions(col_d, col_w)?; updated_right_ordering_equivalence_class( &mut right_oeq_class, @@ -1615,7 +1667,7 @@ mod tests { let col_c_expr = col("c", &schema)?; let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); - eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr); + eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr)?; let others = vec![ vec![PhysicalSortExpr { expr: col_b_expr.clone(), @@ -1760,7 +1812,7 @@ mod tests { } #[test] - fn test_update_ordering() -> Result<()> { + fn test_update_properties() -> Result<()> { let schema = Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Int32, true), @@ -1778,7 +1830,7 @@ mod tests { nulls_first: false, }; // b=a (e.g they are aliases) - eq_properties.add_equal_conditions(col_b, col_a); + eq_properties.add_equal_conditions(col_b, col_a)?; // [b ASC], [d ASC] eq_properties.add_new_orderings(vec![ vec![PhysicalSortExpr { @@ -1821,12 +1873,12 @@ mod tests { .iter() .flat_map(|ordering| ordering.first().cloned()) .collect::>(); - let expr_ordering = eq_properties.get_expr_ordering(expr.clone()); + let expr_props = eq_properties.get_expr_properties(expr.clone()); let err_msg = format!( "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}", - expr, expected, expr_ordering.data + expr, expected, expr_props.sort_properties ); - assert_eq!(expr_ordering.data, expected, "{}", err_msg); + assert_eq!(expr_props.sort_properties, expected, "{}", err_msg); } Ok(()) @@ -2266,6 +2318,7 @@ mod tests { Ok(()) } + #[test] fn test_eliminate_redundant_monotonic_sorts() -> Result<()> { let schema = Arc::new(Schema::new(vec![ @@ -2334,7 +2387,7 @@ mod tests { for case in cases { let mut properties = base_properties.clone().add_constants(case.constants); for [left, right] in &case.equal_conditions { - properties.add_equal_conditions(left, right) + properties.add_equal_conditions(left, right)? } let sort = case diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 76154dca0338..08f7523f92f0 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -23,21 +23,21 @@ use std::{any::Any, sync::Arc}; use crate::expressions::datum::{apply, apply_cmp}; use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison}; use crate::physical_expr::down_cast_any_ref; -use crate::sort_properties::SortProperties; use crate::PhysicalExpr; use arrow::array::*; use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; use arrow::compute::kernels::cmp::*; -use arrow::compute::kernels::comparison::regexp_is_match_utf8; -use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar; +use arrow::compute::kernels::comparison::{ + regexp_is_match_utf8, regexp_is_match_utf8_scalar, +}; use arrow::compute::kernels::concat_elements::concat_elements_utf8; use arrow::compute::{cast, ilike, like, nilike, nlike}; use arrow::datatypes::*; - use datafusion_common::cast::as_boolean_array; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::{apply_operator, Interval}; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::binary::get_result_type; use datafusion_expr::{ColumnarValue, Operator}; @@ -442,17 +442,45 @@ impl PhysicalExpr for BinaryExpr { self.hash(&mut s); } - /// For each operator, [`BinaryExpr`] has distinct ordering rules. - /// TODO: There may be rules specific to some data types (such as division and multiplication on unsigned integers) - fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { - let (left_child, right_child) = (&children[0], &children[1]); + /// For each operator, [`BinaryExpr`] has distinct rules. + /// TODO: There may be rules specific to some data types and expression ranges. + fn get_properties(&self, children: &[ExprProperties]) -> Result { + let (l_order, l_range) = (children[0].sort_properties, &children[0].range); + let (r_order, r_range) = (children[1].sort_properties, &children[1].range); match self.op() { - Operator::Plus => left_child.add(right_child), - Operator::Minus => left_child.sub(right_child), - Operator::Gt | Operator::GtEq => left_child.gt_or_gteq(right_child), - Operator::Lt | Operator::LtEq => right_child.gt_or_gteq(left_child), - Operator::And | Operator::Or => left_child.and_or(right_child), - _ => SortProperties::Unordered, + Operator::Plus => Ok(ExprProperties { + sort_properties: l_order.add(&r_order), + range: l_range.add(r_range)?, + }), + Operator::Minus => Ok(ExprProperties { + sort_properties: l_order.sub(&r_order), + range: l_range.sub(r_range)?, + }), + Operator::Gt => Ok(ExprProperties { + sort_properties: l_order.gt_or_gteq(&r_order), + range: l_range.gt(r_range)?, + }), + Operator::GtEq => Ok(ExprProperties { + sort_properties: l_order.gt_or_gteq(&r_order), + range: l_range.gt_eq(r_range)?, + }), + Operator::Lt => Ok(ExprProperties { + sort_properties: r_order.gt_or_gteq(&l_order), + range: l_range.lt(r_range)?, + }), + Operator::LtEq => Ok(ExprProperties { + sort_properties: r_order.gt_or_gteq(&l_order), + range: l_range.lt_eq(r_range)?, + }), + Operator::And => Ok(ExprProperties { + sort_properties: r_order.and_or(&l_order), + range: l_range.and(r_range)?, + }), + Operator::Or => Ok(ExprProperties { + sort_properties: r_order.and_or(&l_order), + range: l_range.or(r_range)?, + }), + _ => Ok(ExprProperties::new_unknown()), } } } @@ -623,6 +651,7 @@ pub fn binary( mod tests { use super::*; use crate::expressions::{col, lit, try_cast, Literal}; + use datafusion_common::plan_datafusion_err; use datafusion_expr::type_coercion::binary::get_input_types; diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index a3b32461e581..79a44ac30cfc 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -15,21 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::physical_expr::down_cast_any_ref; -use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use std::any::Any; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use DataType::*; + +use crate::physical_expr::down_cast_any_ref; +use crate::PhysicalExpr; use arrow::compute::{can_cast_types, CastOptions}; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, DataType::*, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; use datafusion_common::{not_impl_err, Result}; use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::ColumnarValue; const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions { @@ -163,9 +163,21 @@ impl PhysicalExpr for CastExpr { self.cast_options.hash(&mut s); } - /// A [`CastExpr`] preserves the ordering of its child. - fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { - children[0] + /// A [`CastExpr`] preserves the ordering of its child if the cast is done + /// under the same datatype family. + fn get_properties(&self, children: &[ExprProperties]) -> Result { + let source_datatype = children[0].range.data_type(); + let target_type = &self.cast_type; + + let unbounded = Interval::make_unbounded(target_type)?; + if source_datatype.is_numeric() && target_type.is_numeric() + || source_datatype.is_temporal() && target_type.is_temporal() + || source_datatype.eq(target_type) + { + Ok(children[0].clone().with_range(unbounded)) + } else { + Ok(ExprProperties::new_unknown().with_range(unbounded)) + } } } diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 35ea80ea574d..371028959ab8 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -22,7 +22,6 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::sort_properties::SortProperties; use crate::PhysicalExpr; use arrow::{ @@ -30,6 +29,8 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ColumnarValue, Expr}; /// Represents a literal value @@ -90,8 +91,11 @@ impl PhysicalExpr for Literal { self.hash(&mut s); } - fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties { - SortProperties::Singleton + fn get_properties(&self, _children: &[ExprProperties]) -> Result { + Ok(ExprProperties { + sort_properties: SortProperties::Singleton, + range: Interval::try_new(self.value().clone(), self.value().clone())?, + }) } } @@ -115,6 +119,7 @@ pub fn lit(value: T) -> Arc { #[cfg(test)] mod tests { use super::*; + use arrow::array::Int32Array; use arrow::datatypes::*; use datafusion_common::cast::as_int32_array; diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index f6d4620c427f..62f865bd9b32 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -22,7 +22,6 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::sort_properties::SortProperties; use crate::PhysicalExpr; use arrow::{ @@ -32,6 +31,7 @@ use arrow::{ }; use datafusion_common::{plan_err, Result}; use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::{ type_coercion::{is_interval, is_null, is_signed_numeric, is_timestamp}, ColumnarValue, @@ -134,8 +134,11 @@ impl PhysicalExpr for NegativeExpr { } /// The ordering of a [`NegativeExpr`] is simply the reverse of its child. - fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { - -children[0] + fn get_properties(&self, children: &[ExprProperties]) -> Result { + Ok(ExprProperties { + sort_properties: -children[0].sort_properties, + range: children[0].range.clone().arithmetic_negate()?, + }) } } diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 21cf6d348cd5..9c7d6d09349d 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -33,14 +33,12 @@ use std::sync::Arc; -use arrow::array::ArrayRef; -use arrow_array::Array; - -pub use crate::scalar_function::create_physical_expr; +use arrow::array::{Array, ArrayRef}; use datafusion_common::{Result, ScalarValue}; -pub use datafusion_expr::FuncMonotonicity; use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation}; +pub use crate::scalar_function::create_physical_expr; + #[derive(Debug, Clone, Copy)] pub enum Hint { /// Indicates the argument needs to be padded if it is scalar diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index aef5aa7c00e7..1bdf082b2eaf 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -61,13 +61,6 @@ pub use scalar_function::ScalarFunctionExpr; pub use datafusion_physical_expr_common::utils::reverse_order_bys; pub use utils::split_conjunction; -// For backwards compatibility -pub mod sort_properties { - pub use datafusion_physical_expr_common::sort_properties::{ - ExprOrdering, SortProperties, - }; -} - // For backwards compatibility pub mod tree_node { pub use datafusion_physical_expr_common::tree_node::ExprContext; diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 1244a9b4db38..daa110071096 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -32,19 +32,18 @@ use std::any::Any; use std::fmt::{self, Debug, Formatter}; use std::hash::{Hash, Hasher}; -use std::ops::Neg; use std::sync::Arc; +use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; +use crate::PhysicalExpr; + use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; - use datafusion_common::{internal_err, DFSchema, Result}; +use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; -use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, FuncMonotonicity, ScalarUDF}; - -use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; -use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; +use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, ScalarUDF}; /// Physical expression of a scalar function pub struct ScalarFunctionExpr { @@ -52,11 +51,6 @@ pub struct ScalarFunctionExpr { name: String, args: Vec>, return_type: DataType, - // Keeps monotonicity information of the function. - // FuncMonotonicity vector is one to one mapped to `args`, - // and it specifies the effect of an increase or decrease in - // the corresponding `arg` to the function value. - monotonicity: Option, } impl Debug for ScalarFunctionExpr { @@ -66,7 +60,6 @@ impl Debug for ScalarFunctionExpr { .field("name", &self.name) .field("args", &self.args) .field("return_type", &self.return_type) - .field("monotonicity", &self.monotonicity) .finish() } } @@ -78,14 +71,12 @@ impl ScalarFunctionExpr { fun: Arc, args: Vec>, return_type: DataType, - monotonicity: Option, ) -> Self { Self { fun, name: name.to_owned(), args, return_type, - monotonicity, } } @@ -108,11 +99,6 @@ impl ScalarFunctionExpr { pub fn return_type(&self) -> &DataType { &self.return_type } - - /// Monotonicity information of the function - pub fn monotonicity(&self) -> &Option { - &self.monotonicity - } } impl fmt::Display for ScalarFunctionExpr { @@ -170,10 +156,21 @@ impl PhysicalExpr for ScalarFunctionExpr { self.fun.clone(), children, self.return_type().clone(), - self.monotonicity.clone(), ))) } + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { + self.fun.evaluate_bounds(children) + } + + fn propagate_constraints( + &self, + interval: &Interval, + children: &[&Interval], + ) -> Result>> { + self.fun.propagate_constraints(interval, children) + } + fn dyn_hash(&self, state: &mut dyn Hasher) { let mut s = state; self.name.hash(&mut s); @@ -182,11 +179,18 @@ impl PhysicalExpr for ScalarFunctionExpr { // Add `self.fun` when hash is available } - fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { - self.monotonicity - .as_ref() - .map(|monotonicity| out_ordering(monotonicity, children)) - .unwrap_or(SortProperties::Unordered) + fn get_properties(&self, children: &[ExprProperties]) -> Result { + let sort_properties = self.fun.monotonicity(children)?; + let children_range = children + .iter() + .map(|props| &props.range) + .collect::>(); + let range = self.fun().evaluate_bounds(&children_range)?; + + Ok(ExprProperties { + sort_properties, + range, + }) } } @@ -231,63 +235,5 @@ pub fn create_physical_expr( Arc::new(fun.clone()), input_phy_exprs.to_vec(), return_type, - fun.monotonicity()?, ))) } - -/// Determines a [ScalarFunctionExpr]'s monotonicity for the given arguments -/// and the function's behavior depending on its arguments. -/// -/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr -pub fn out_ordering( - func: &FuncMonotonicity, - arg_orderings: &[SortProperties], -) -> SortProperties { - func.iter().zip(arg_orderings).fold( - SortProperties::Singleton, - |prev_sort, (item, arg)| { - let current_sort = func_order_in_one_dimension(item, arg); - - match (prev_sort, current_sort) { - (_, SortProperties::Unordered) => SortProperties::Unordered, - (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, - (SortProperties::Ordered(prev), SortProperties::Ordered(current)) - if prev.descending != current.descending => - { - SortProperties::Unordered - } - _ => prev_sort, - } - }, - ) -} - -/// This function decides the monotonicity property of a [ScalarFunctionExpr] for a single argument (i.e. across a single dimension), given that argument's sort properties. -/// -/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr -fn func_order_in_one_dimension( - func_monotonicity: &Option, - arg: &SortProperties, -) -> SortProperties { - if *arg == SortProperties::Singleton { - SortProperties::Singleton - } else { - match func_monotonicity { - None => SortProperties::Unordered, - Some(false) => { - if let SortProperties::Ordered(_) = arg { - arg.neg() - } else { - SortProperties::Unordered - } - } - Some(true) => { - if let SortProperties::Ordered(_) = arg { - *arg - } else { - SortProperties::Unordered - } - } - } - } -} diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 76cee3a1a786..6b964546cb74 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -255,19 +255,18 @@ pub fn merge_vectors( #[cfg(test)] pub(crate) mod tests { - use arrow_array::{ArrayRef, Float32Array, Float64Array}; use std::any::Any; use std::fmt::{Display, Formatter}; use super::*; use crate::expressions::{binary, cast, col, in_list, lit, Literal}; + use arrow_array::{ArrayRef, Float32Array, Float64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{exec_err, DataFusionError, ScalarValue}; + use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; - use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, - }; use petgraph::visit::Bfs; #[derive(Debug, Clone)] @@ -309,8 +308,8 @@ pub(crate) mod tests { } } - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true)])) + fn monotonicity(&self, input: &[ExprProperties]) -> Result { + Ok(input[0].sort_properties) } fn invoke(&self, args: &[ColumnarValue]) -> Result { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 95376e7e69cd..21608db40d56 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2080,7 +2080,7 @@ mod tests { let col_c = &col("c", &test_schema)?; let mut eq_properties = EquivalenceProperties::new(test_schema); // Columns a and b are equal. - eq_properties.add_equal_conditions(col_a, col_b); + eq_properties.add_equal_conditions(col_a, col_b)?; // Aggregate requirements are // [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively let order_by_exprs = vec![ diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index bf1ab8b73126..6729e3b9e603 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -192,7 +192,7 @@ impl FilterExec { let mut eq_properties = input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate(predicate); for (lhs, rhs) in equal_pairs { - eq_properties.add_equal_conditions(lhs, rhs) + eq_properties.add_equal_conditions(lhs, rhs)? } // Add the columns that have only one viable value (singleton) after // filtering to constants. @@ -433,13 +433,12 @@ pub type EqualAndNonEqual<'a> = #[cfg(test)] mod tests { - use super::*; + use crate::empty::EmptyExec; use crate::expressions::*; use crate::test; use crate::test::exec::StatisticsExec; - use crate::empty::EmptyExec; use arrow::datatypes::{Field, Schema}; use arrow_schema::{UnionFields, UnionMode}; use datafusion_common::ScalarValue; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index a290f30586ce..b7bc60a0486c 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -354,7 +354,6 @@ pub fn parse_physical_expr( scalar_fun_def, args, convert_required!(e.return_type)?, - None, )) } ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index dd8e450d3165..79abecf556da 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -624,7 +624,6 @@ fn roundtrip_scalar_udf() -> Result<()> { fun_def, vec![col("a", &schema)?], DataType::Int64, - None, ); let project = @@ -752,7 +751,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { Arc::new(udf.clone()), vec![col("text", &schema)?], DataType::Int64, - None, )); let filter = Arc::new(FilterExec::try_new( diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 0f869fc0b419..fb07d5ebe895 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -955,3 +955,154 @@ drop table foo; statement ok drop table ambiguity_test; + +# Casting from numeric to string types breaks the ordering +statement ok +CREATE EXTERNAL TABLE ordered_table ( + a0 INT, + a INT, + b INT, + c INT, + d INT +) +STORED AS CSV +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv' +OPTIONS ('format.has_header' 'true'); + +query T +SELECT CAST(c as VARCHAR) as c_str +FROM ordered_table +ORDER BY c_str +limit 5; +---- +0 +1 +10 +11 +12 + +query TT +EXPLAIN SELECT CAST(c as VARCHAR) as c_str +FROM ordered_table +ORDER BY c_str +limit 5; +---- +logical_plan +01)Limit: skip=0, fetch=5 +02)--Sort: c_str ASC NULLS LAST, fetch=5 +03)----Projection: CAST(ordered_table.c AS Utf8) AS c_str +04)------TableScan: ordered_table projection=[c] +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5 +03)----SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], preserve_partitioning=[true] +04)------ProjectionExec: expr=[CAST(c@0 AS Utf8) as c_str] +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true + + +# Casting from numeric to numeric types preserves the ordering +query I +SELECT CAST(c as BIGINT) as c_bigint +FROM ordered_table +ORDER BY c_bigint +limit 5; +---- +0 +1 +2 +3 +4 + +query TT +EXPLAIN SELECT CAST(c as BIGINT) as c_bigint +FROM ordered_table +ORDER BY c_bigint +limit 5; +---- +logical_plan +01)Limit: skip=0, fetch=5 +02)--Sort: c_bigint ASC NULLS LAST, fetch=5 +03)----Projection: CAST(ordered_table.c AS Int64) AS c_bigint +04)------TableScan: ordered_table projection=[c] +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5 +03)----ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint] +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true + +statement ok +drop table ordered_table; + + +# ABS(x) breaks the ordering if x's range contains both negative and positive values. +# Since x is defined as INT, its range is assumed to be from NEG_INF to INF. +statement ok +CREATE EXTERNAL TABLE ordered_table ( + a0 INT, + a INT, + b INT, + c INT, + d INT +) +STORED AS CSV +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv' +OPTIONS ('format.has_header' 'true'); + +query TT +EXPLAIN SELECT ABS(c) as abs_c +FROM ordered_table +ORDER BY abs_c +limit 5; +---- +logical_plan +01)Limit: skip=0, fetch=5 +02)--Sort: abs_c ASC NULLS LAST, fetch=5 +03)----Projection: abs(ordered_table.c) AS abs_c +04)------TableScan: ordered_table projection=[c] +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 +03)----SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST], preserve_partitioning=[true] +04)------ProjectionExec: expr=[abs(c@0) as abs_c] +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true + +statement ok +drop table ordered_table; + +# ABS(x) preserves the ordering if x's range falls into positive values. +# Since x is defined as INT UNSIGNED, its range is assumed to be from 0 to INF. +statement ok +CREATE EXTERNAL TABLE ordered_table ( + a0 INT, + a INT, + b INT, + c INT UNSIGNED, + d INT +) +STORED AS CSV +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv' +OPTIONS ('format.has_header' 'true'); + +query TT +EXPLAIN SELECT ABS(c) as abs_c +FROM ordered_table +ORDER BY abs_c +limit 5; +---- +logical_plan +01)Limit: skip=0, fetch=5 +02)--Sort: abs_c ASC NULLS LAST, fetch=5 +03)----Projection: abs(ordered_table.c) AS abs_c +04)------TableScan: ordered_table projection=[c] +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 +03)----ProjectionExec: expr=[abs(c@0) as abs_c] +04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true \ No newline at end of file