Conversation

@andygrove (Member) commented Jan 3, 2026

Which issue does this PR close?

Rationale for this change

Moving code from Comet to datafusion-spark so that other projects can benefit.

This PR adds Spark-compatible decimal division functions to datafusion-spark. These functions implement Spark's specific decimal division semantics, which differ from standard SQL (a short illustrative sketch follows the list):

  • Legacy mode behavior: Division by zero returns 0 (not an error)
  • Spark rounding: Round half away from zero
  • BigInt fallback: When the required precision exceeds Decimal128 limits (scale > 38), the implementation falls back to BigInt arithmetic
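
To make these semantics concrete, here is a minimal scalar sketch in Rust. It is an illustration only, not the kernel in decimal_div.rs: it assumes l and r are unscaled i128 values already widened to a common scale, and it ignores overflow, precision/scale handling, and the BigInt fallback.

fn spark_div_semantics(l: i128, r: i128) -> i128 {
    if r == 0 {
        // Legacy mode: division by zero yields 0 instead of an error.
        return 0;
    }
    // Keep one extra digit, then round half away from zero.
    let div = l * 10 / r;
    if div >= 0 {
        (div + 5) / 10
    } else {
        (div - 5) / 10
    }
}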

What changes are included in this PR?

  • New file datafusion/spark/src/function/math/decimal_div.rs containing:
    • spark_decimal_div() - Regular decimal division with Spark semantics
    • spark_decimal_integral_div() - Integer division (truncates toward zero)
    • SparkDecimalDiv and SparkDecimalIntegralDiv UDF structs for use by query planners
  • Added num crate dependency to datafusion-spark
  • 6 unit tests covering basic division, rounding, division by zero, integral division, negative values, and NULL handling

Note: These are internal functions intended for use by query planners when Spark-compatible decimal division semantics are needed. The precision and scale of the result type are determined at query planning time based on input types, similar to how Comet uses them.

Are these changes tested?

Yes, unit tests added.

Are there any user-facing changes?

New public APIs in datafusion-spark (a hedged usage sketch follows the list):

  • SparkDecimalDiv::new(result_precision, result_scale)
  • SparkDecimalIntegralDiv::new(result_precision, result_scale)
  • spark_decimal_div() and spark_decimal_integral_div() functions
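
A hedged usage sketch: the import path is inferred from the new file's location (datafusion/spark/src/function/math/decimal_div.rs), and it assumes the structs implement ScalarUDFImpl, as the Signature field in the diff suggests, so a physical planner can wrap them in a ScalarUDF.

use datafusion_expr::ScalarUDF;
use datafusion_spark::function::math::decimal_div::SparkDecimalDiv;

// Decimal128(25, 13) is only an example; the planner derives the result
// precision and scale from the input decimal types at planning time.
fn build_spark_decimal_div() -> ScalarUDF {
    ScalarUDF::new_from_impl(SparkDecimalDiv::new(25, 13))
}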

@github-actions bot added the spark label Jan 3, 2026
@andygrove marked this pull request as ready for review January 4, 2026 00:27
result_scale: i8,
}

impl PartialEq for SparkDecimalDiv {
Contributor:
Could we remove these manual impls and use derive?

Comment on lines +55 to +60
fn spark_decimal_div_internal(
args: &[ColumnarValue],
result_precision: u8,
result_scale: i8,
is_integral_div: bool,
) -> Result<ColumnarValue> {
Contributor:
Suggested change
fn spark_decimal_div_internal(
args: &[ColumnarValue],
result_precision: u8,
result_scale: i8,
is_integral_div: bool,
) -> Result<ColumnarValue> {
fn spark_decimal_div_internal<const IS_INTEGRAL_DIV: bool>(
left: ArrayRef,
right: ArrayRef,
result_precision: u8,
result_scale: i8,
) -> Result<ColumnarValue> {

So callers can call it like so:

let arrays = ColumnarValue::values_to_arrays(&args.args)?;
let [left, right] = take_function_args("fn_name", arrays)?;
spark_decimal_div_internal::<true>(left, right, precision, scale)

That way the code inside related to checking the argument count, converting to arrays, etc. can be removed.

Comment on lines +18 to +27
//! Spark-compatible decimal division functions.
//!
//! This module implements Spark's decimal division semantics, which require
//! special handling for precision and scale that differs from standard SQL.
//!
//! # Scale Expansion
//!
//! For Decimal(p1, s1) / Decimal(p2, s2) = Decimal(p3, s3):
//! The dividend needs to be scaled to s2 + s3 + 1 to get correct precision.
//! This can exceed Decimal128's maximum scale (38), requiring BigInt fallback.
Contributor:
Should we put a note here that these UDFs are only meant to be used at the physical level instead of the logical level, since they don't handle things like type coercion to supported types (Decimal128)?
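
To make the quoted scale-expansion rule concrete (the numbers below are made up for illustration): for Decimal(10, 2) / Decimal(10, 4) with a planner-chosen result scale s3 = 6,

  required dividend scale = s2 + s3 + 1 = 4 + 6 + 1 = 11
  scale-up factor         = 10^(11 - s1) = 10^(11 - 2) = 10^9

Since 11 is well under Decimal128's maximum scale of 38, this example stays on the Decimal128 path; only when s2 + s3 + 1 exceeds 38 is the BigInt fallback needed.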

Self {
signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
result_precision,
result_scale,
Member:
Do we need validation here?
E.g. for result_precision > DECIMAL128_MAX_PRECISION and result_scale.abs() as u8 > result_precision?

Self {
signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
result_precision,
result_scale,
Member:
Do we need validation here?
E.g. for result_precision > DECIMAL128_MAX_PRECISION and result_scale.abs() as u8 > result_precision?
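
A minimal sketch of the kind of constructor-time validation suggested in these two comments; DECIMAL128_MAX_PRECISION comes from arrow, exec_err! from datafusion_common, and the error message wording is made up.

use arrow::datatypes::DECIMAL128_MAX_PRECISION;
use datafusion_common::{exec_err, Result};

fn validate_result_type(result_precision: u8, result_scale: i8) -> Result<()> {
    if result_precision > DECIMAL128_MAX_PRECISION
        || result_scale.unsigned_abs() > result_precision
    {
        return exec_err!(
            "invalid Decimal128 result type: precision {result_precision}, scale {result_scale}"
        );
    }
    Ok(())
}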


// Calculate the scale expansion needed
// To get Decimal(p3, s3) from p1/p2, we need to widen s1 to s2 + s3 + 1
let l_exp = ((s2 + result_scale + 1) as u32).saturating_sub(s1 as u32);
Member:
With negative scales this sum may still be a negative i8. Casting it to u32 will lead to a big positive number.
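
One way to sidestep that pitfall (a sketch only, not necessarily how this PR should fix it) is to do the arithmetic in a wider signed type and clamp at zero before converting to u32:

let l_exp = (s2 as i32 + result_scale as i32 + 1 - s1 as i32).max(0) as u32;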

/// Create a new SparkDecimalDiv with the specified result precision and scale.
pub fn new(result_precision: u8, result_scale: i8) -> Self {
Self {
signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
Member:
The signature accepts two parameters of any type.
As far as I can see the arguments' types are "checked" at https://github.com/apache/datafusion/pull/19628/changes#diff-22862a268cd15854f04bd5248f78af375bf9e2d155a7807e6626fb7eeabe8054R87 where it tries to cast them to Decimal128.
Wouldn't it be better to use:

Suggested change
signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
signature: Signature::new(TypeSignature::Exact(vec![DataType::Decimal128, DataType::Decimal128]), Volatility::Immutable),

and fail earlier?

let r_mul = 10_i128.pow(r_exp);

arrow::compute::kernels::arity::try_binary(left, right, |l, r| {
let l = l * l_mul;
Member:
This could lead to multiplication overflow.
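
A sketch of an overflow-aware alternative (names are hypothetical): use checked arithmetic so the overflow is detected and the caller can take the BigInt fallback or raise an error instead of silently wrapping.

// Scale an unscaled i128 decimal value by 10^exp, returning None on overflow.
fn checked_scale(value: i128, exp: u32) -> Option<i128> {
    10_i128.checked_pow(exp).and_then(|m| value.checked_mul(m))
}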

} else {
div + &five
} / &ten;
Ok(res.to_i128().unwrap_or(i128::MAX))
Member:
If res is a big negative number smaller than i128::MIN, then unwrap_or() should return i128::MIN, not i128::MAX.
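
A sketch of sign-aware saturation for this fallback path (the function name is hypothetical; Sign and ToPrimitive come from the num crate that this PR adds as a dependency):

use num::bigint::Sign;
use num::{BigInt, ToPrimitive};

// Clamp to i128::MIN on underflow and i128::MAX on overflow.
fn saturate_to_i128(res: &BigInt) -> i128 {
    res.to_i128().unwrap_or(if res.sign() == Sign::Minus {
        i128::MIN
    } else {
        i128::MAX
    })
}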
