Skip to content

Commit

Permalink
Handle one-element array return value in ScalarFunctionExpr (apache#1…
Browse files Browse the repository at this point in the history
…2965) (#276)

This was done in apache#12922 only for math functions.
We now generalize this fallback to all scalar UDFs.
  • Loading branch information
joroKr21 authored Oct 19, 2024
1 parent 92bdbeb commit 661a6b3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
11 changes: 0 additions & 11 deletions datafusion/expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,6 @@ impl ColumnarValue {
}
}
}

/// Converts an [`ArrayRef`] to a [`ColumnarValue`] based on the supplied arguments.
/// This is useful for scalar UDF implementations to fulfil their contract:
/// if all arguments are scalar values, the result should also be a scalar value.
pub fn from_args_and_result(args: &[Self], result: ArrayRef) -> Result<Self> {
if result.len() == 1 && args.iter().all(|arg| matches!(arg, Self::Scalar(_))) {
Ok(Self::Scalar(ScalarValue::try_from_array(&result, 0)?))
} else {
Ok(Self::Array(result))
}
}
}

#[cfg(test)]
Expand Down
12 changes: 6 additions & 6 deletions datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ macro_rules! make_math_unary_udf {
$OUTPUT_ORDERING(input)
}

fn invoke(&self, col_args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(col_args)?;
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => {
Arc::new(make_function_scalar_inputs_return_type!(
Expand Down Expand Up @@ -251,7 +251,7 @@ macro_rules! make_math_unary_udf {
}
};

ColumnarValue::from_args_and_result(col_args, arr)
Ok(ColumnarValue::Array(arr))
}
}
}
Expand Down Expand Up @@ -332,8 +332,8 @@ macro_rules! make_math_binary_udf {
$OUTPUT_ORDERING(input)
}

fn invoke(&self, col_args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(col_args)?;
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => Arc::new(make_function_inputs2!(
&args[0],
Expand All @@ -360,7 +360,7 @@ macro_rules! make_math_binary_udf {
}
};

ColumnarValue::from_args_and_result(col_args, arr)
Ok(ColumnarValue::Array(arr))
}
}
}
Expand Down
18 changes: 15 additions & 3 deletions datafusion/physical-expr/src/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use crate::PhysicalExpr;

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DFSchema, Result};
use arrow_array::Array;
use datafusion_common::{internal_err, DFSchema, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf;
Expand Down Expand Up @@ -136,8 +137,19 @@ impl PhysicalExpr for ScalarFunctionExpr {

if let ColumnarValue::Array(array) = &output {
if array.len() != batch.num_rows() {
return internal_err!("UDF returned a different number of rows than expected. Expected: {}, Got: {}",
batch.num_rows(), array.len());
// If the arguments are a non-empty slice of scalar values, we can assume that
// returning a one-element array is equivalent to returning a scalar.
let preserve_scalar = array.len() == 1
&& !inputs.is_empty()
&& inputs
.iter()
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)));
return if preserve_scalar {
ScalarValue::try_from_array(array, 0).map(ColumnarValue::Scalar)
} else {
internal_err!("UDF returned a different number of rows than expected. Expected: {}, Got: {}",
batch.num_rows(), array.len())
};
}
}
Ok(output)
Expand Down

0 comments on commit 661a6b3

Please sign in to comment.