From 661a6b39273c5a3b174b6dba45fe58b15c3e1142 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Sat, 19 Oct 2024 12:49:43 +0300 Subject: [PATCH] Handle one-element array return value in ScalarFunctionExpr (#12965) (#276) This was done in #12922 only for math functions. We now generalize this fallback to all scalar UDFs. --- datafusion/expr/src/columnar_value.rs | 11 ----------- datafusion/functions/src/macros.rs | 12 ++++++------ .../physical-expr/src/scalar_function.rs | 18 +++++++++++++++--- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/datafusion/expr/src/columnar_value.rs b/datafusion/expr/src/columnar_value.rs index 7b614ba9c491..4d90b668942e 100644 --- a/datafusion/expr/src/columnar_value.rs +++ b/datafusion/expr/src/columnar_value.rs @@ -217,17 +217,6 @@ impl ColumnarValue { } } } - - /// Converts an [`ArrayRef`] to a [`ColumnarValue`] based on the supplied arguments. - /// This is useful for scalar UDF implementations to fulfil their contract: - /// if all arguments are scalar values, the result should also be a scalar value. - pub fn from_args_and_result(args: &[Self], result: ArrayRef) -> Result { - if result.len() == 1 && args.iter().all(|arg| matches!(arg, Self::Scalar(_))) { - Ok(Self::Scalar(ScalarValue::try_from_array(&result, 0)?)) - } else { - Ok(Self::Array(result)) - } - } } #[cfg(test)] diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 1aadaae6c1d6..a75a4fbfe8ef 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -222,8 +222,8 @@ macro_rules! make_math_unary_udf { $OUTPUT_ORDERING(input) } - fn invoke(&self, col_args: &[ColumnarValue]) -> Result { - let args = ColumnarValue::values_to_arrays(col_args)?; + fn invoke(&self, args: &[ColumnarValue]) -> Result { + let args = ColumnarValue::values_to_arrays(args)?; let arr: ArrayRef = match args[0].data_type() { DataType::Float64 => { Arc::new(make_function_scalar_inputs_return_type!( @@ -251,7 +251,7 @@ macro_rules! make_math_unary_udf { } }; - ColumnarValue::from_args_and_result(col_args, arr) + Ok(ColumnarValue::Array(arr)) } } } @@ -332,8 +332,8 @@ macro_rules! make_math_binary_udf { $OUTPUT_ORDERING(input) } - fn invoke(&self, col_args: &[ColumnarValue]) -> Result { - let args = ColumnarValue::values_to_arrays(col_args)?; + fn invoke(&self, args: &[ColumnarValue]) -> Result { + let args = ColumnarValue::values_to_arrays(args)?; let arr: ArrayRef = match args[0].data_type() { DataType::Float64 => Arc::new(make_function_inputs2!( &args[0], @@ -360,7 +360,7 @@ macro_rules! make_math_binary_udf { } }; - ColumnarValue::from_args_and_result(col_args, arr) + Ok(ColumnarValue::Array(arr)) } } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 10e29b41031d..de3e956e8509 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -39,7 +39,8 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, DFSchema, Result}; +use arrow_array::Array; +use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; @@ -136,8 +137,19 @@ impl PhysicalExpr for ScalarFunctionExpr { if let ColumnarValue::Array(array) = &output { if array.len() != batch.num_rows() { - return internal_err!("UDF returned a different number of rows than expected. Expected: {}, Got: {}", - batch.num_rows(), array.len()); + // If the arguments are a non-empty slice of scalar values, we can assume that + // returning a one-element array is equivalent to returning a scalar. + let preserve_scalar = array.len() == 1 + && !inputs.is_empty() + && inputs + .iter() + .all(|arg| matches!(arg, ColumnarValue::Scalar(_))); + return if preserve_scalar { + ScalarValue::try_from_array(array, 0).map(ColumnarValue::Scalar) + } else { + internal_err!("UDF returned a different number of rows than expected. Expected: {}, Got: {}", + batch.num_rows(), array.len()) + }; } } Ok(output)