Skip to content

Commit

Permalink
PhysicalExpr Orderings with Range Information (apache#10504)
Browse files Browse the repository at this point in the history
* Self review

* Fix null interval accumulation

* Refactor monotonicity

* Ignore failing tests

* Initial impl

* Ready for review

* Update properties.rs

* Update configs.md

Update configs.md

* cargo doc

* Add abs test

* Update properties.rs

* Update udf.rs

* Review Part 1

* Review Part 2

* Minor

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
berkaysynnada and ozankabak authored May 17, 2024
1 parent dbd77b4 commit d2fb05e
Show file tree
Hide file tree
Showing 42 changed files with 1,196 additions and 503 deletions.
28 changes: 12 additions & 16 deletions datafusion-examples/examples/advanced_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::{
arrow::{
array::{ArrayRef, Float32Array, Float64Array},
datatypes::DataType,
record_batch::RecordBatch,
},
logical_expr::Volatility,
};
use std::any::Any;
use std::sync::Arc;

use arrow::array::{new_null_array, Array, AsArray};
use arrow::array::{
new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array,
};
use arrow::compute;
use arrow::datatypes::Float64Type;
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_expr::{
ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
};
use std::sync::Arc;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
Expand Down Expand Up @@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf {
&self.aliases
}

fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
Ok(Some(vec![Some(true)]))
fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> {
// The POW function preserves the order of its argument.
Ok(input[0].sort_properties)
}
}

Expand Down
13 changes: 7 additions & 6 deletions datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::result::Result as RResult;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::ExprSimplifyResult;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature};
use std::result::Result as RResult;
use std::sync::Arc;

/// This example shows how to utilize [FunctionFactory] to implement simple
/// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
Expand Down Expand Up @@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
&[]
}

fn monotonicity(&self) -> Result<Option<datafusion_expr::FuncMonotonicity>> {
Ok(None)
fn monotonicity(&self, _input: &[ExprProperties]) -> Result<SortProperties> {
Ok(SortProperties::Unordered)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3572,7 +3572,11 @@ pub(crate) mod tests {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let alias = vec![("a".to_string(), "a".to_string())];
let alias = vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "b".to_string()),
("c".to_string(), "c".to_string()),
];
let plan = sort_preserving_merge_exec(
sort_key.clone(),
sort_exec(
Expand All @@ -3585,7 +3589,7 @@ pub(crate) mod tests {
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
// Since this projection is trivial, increasing parallelism is not beneficial
"ProjectionExec: expr=[a@0 as a]",
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, plan.clone(), true);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use arrow_schema::Schema;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_expr::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::sort_properties::SortProperties;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
Expand Down Expand Up @@ -561,7 +561,7 @@ fn hash_join_convert_symmetric_subrule(
let name = schema.field(*index).name();
let col = Arc::new(Column::new(name, *index)) as _;
// Check if the column is ordered.
equivalence.get_expr_ordering(col).data
equivalence.get_expr_properties(col).sort_properties
!= SortProperties::Unordered
},
)
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,6 @@ mod tests {
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -1442,7 +1441,6 @@ mod tests {
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
Expand Down Expand Up @@ -1511,7 +1509,6 @@ mod tests {
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -1577,7 +1574,6 @@ mod tests {
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ mod sp_repartition_fuzz_tests {
config::SessionConfig, memory_pool::MemoryConsumer, SendableRecordBatchStream,
};
use datafusion_physical_expr::{
equivalence::{EquivalenceClass, EquivalenceProperties},
expressions::{col, Column},
EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
PhysicalExpr, PhysicalSortExpr,
};
use test_utils::add_empty_batches;

use datafusion_physical_expr::equivalence::EquivalenceClass;
use itertools::izip;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};

Expand Down Expand Up @@ -78,7 +78,7 @@ mod sp_repartition_fuzz_tests {

let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f);
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([col_e.clone()]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,27 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::compute::kernels::numeric::add;
use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState};
use datafusion::prelude::*;
use datafusion::{execution::registry::FunctionRegistry, test_util};
use datafusion_common::cast::{as_float64_array, as_int32_array};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, cast::as_float64_array,
cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue,
assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, internal_err,
not_impl_err, plan_err, DataFusionError, ExprSchema, Result, ScalarValue,
};
use datafusion_common::{assert_contains, exec_err, internal_err, DataFusionError};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// test that casting happens on udfs.
/// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and
Expand Down Expand Up @@ -776,10 +777,6 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
fn aliases(&self) -> &[String] {
&[]
}

fn monotonicity(&self) -> Result<Option<datafusion_expr::FuncMonotonicity>> {
Ok(None)
}
}

impl ScalarFunctionWrapper {
Expand Down
Loading

0 comments on commit d2fb05e

Please sign in to comment.