feat: Add Spark-compatible decimal division #19628
Conversation
```rust
    result_scale: i8,
}

impl PartialEq for SparkDecimalDiv {
```
Could we remove these manual impls and use derive?
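For reference, a minimal sketch of the derived form, assuming the manual impls only compare these fields and that `Signature` itself implements `PartialEq` (it does in current DataFusion); the field list is taken from the diff:

```rust
use datafusion_expr::Signature;

// Sketch only: equivalent to a field-wise manual PartialEq impl.
#[derive(Debug, PartialEq)]
pub struct SparkDecimalDiv {
    signature: Signature,
    result_precision: u8,
    result_scale: i8,
}
```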
```rust
fn spark_decimal_div_internal(
    args: &[ColumnarValue],
    result_precision: u8,
    result_scale: i8,
    is_integral_div: bool,
) -> Result<ColumnarValue> {
```
Suggested change:

```rust
fn spark_decimal_div_internal<const IS_INTEGRAL_DIV: bool>(
    left: ArrayRef,
    right: ArrayRef,
    result_precision: u8,
    result_scale: i8,
) -> Result<ColumnarValue> {
```
So callers can invoke it like this:

```rust
let arrays = ColumnarValue::values_to_arrays(&args.args)?;
let [left, right] = take_function_args("fn_name", arrays)?;
spark_decimal_div_internal::<true>(left, right, precision, scale)
```

That way the code inside related to checking the args length, converting to arrays, etc. can be removed.
```rust
//! Spark-compatible decimal division functions.
//!
//! This module implements Spark's decimal division semantics, which require
//! special handling for precision and scale that differs from standard SQL.
//!
//! # Scale Expansion
//!
//! For Decimal(p1, s1) / Decimal(p2, s2) = Decimal(p3, s3):
//! The dividend needs to be scaled to s2 + s3 + 1 to get correct precision.
//! This can exceed Decimal128's maximum scale (38), requiring BigInt fallback.
```
Should we put a note here that these UDFs are only meant to be used at the physical level instead of the logical one, since they don't handle things like type coercion to supported types (Decimal128)?
```rust
Self {
    signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
    result_precision,
    result_scale,
```
Do we need validation here? E.g. for `result_precision > DECIMAL128_MAX_PRECISION` and `result_scale.abs() as u8 > result_precision`?
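A rough sketch of what such validation could look like, assuming the `SparkDecimalDiv` struct from this PR; the fallible `try_new` constructor and the error wording are assumptions, not the PR's code:

```rust
use arrow::datatypes::DECIMAL128_MAX_PRECISION;
use datafusion_common::{exec_err, Result};
use datafusion_expr::{Signature, TypeSignature, Volatility};

impl SparkDecimalDiv {
    /// Hypothetical fallible constructor that rejects out-of-range precision/scale.
    pub fn try_new(result_precision: u8, result_scale: i8) -> Result<Self> {
        if result_precision > DECIMAL128_MAX_PRECISION {
            return exec_err!(
                "result_precision {result_precision} exceeds Decimal128 maximum precision {DECIMAL128_MAX_PRECISION}"
            );
        }
        if result_scale.unsigned_abs() > result_precision {
            return exec_err!(
                "result_scale {result_scale} is out of range for precision {result_precision}"
            );
        }
        Ok(Self {
            signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
            result_precision,
            result_scale,
        })
    }
}
```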
```rust
// Calculate the scale expansion needed
// To get Decimal(p3, s3) from p1/p2, we need to widen s1 to s2 + s3 + 1
let l_exp = ((s2 + result_scale + 1) as u32).saturating_sub(s1 as u32);
```
With negative scales this sum may still be a negative i8. Casting it to u32 will lead to a big positive number.
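A small illustration of the pitfall and one possible fix, sketched with made-up scale values (this is not the PR's code):

```rust
// With a negative divisor scale the i8 sum can be negative; `as u32` then
// reinterprets it as a huge positive exponent.
let (s1, s2, result_scale): (i8, i8, i8) = (2, -3, 1);
let wrapped = ((s2 + result_scale + 1) as u32).saturating_sub(s1 as u32);
assert_eq!(wrapped, u32::MAX - 2); // -1i8 as u32 == 4294967295

// One possible fix: do the arithmetic in a wider signed type and clamp at
// zero before using it as a power-of-ten exponent.
let l_exp = (s2 as i32 + result_scale as i32 + 1 - s1 as i32).max(0) as u32;
assert_eq!(l_exp, 0);
```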
```rust
/// Create a new SparkDecimalDiv with the specified result precision and scale.
pub fn new(result_precision: u8, result_scale: i8) -> Self {
    Self {
        signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
```
The signature accepts two parameters of any type.
As far as I can see the arguments' types are "checked" at https://github.com/apache/datafusion/pull/19628/changes#diff-22862a268cd15854f04bd5248f78af375bf9e2d155a7807e6626fb7eeabe8054R87 where it tries to cast them to Decimal128.
Wouldn't it be better to use:
```rust
signature: Signature::new(
    TypeSignature::Exact(vec![DataType::Decimal128, DataType::Decimal128]),
    Volatility::Immutable,
),
```

and fail earlier?
```rust
let r_mul = 10_i128.pow(r_exp);

arrow::compute::kernels::arity::try_binary(left, right, |l, r| {
    let l = l * l_mul;
```
This could lead to multiplication overflow
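A hedged sketch of one way to surface that overflow explicitly, assuming an i128 operand and multiplier as in the diff; `scale_up` and the error text are hypothetical:

```rust
use arrow::error::ArrowError;

// Sketch: scale the operand with checked multiplication so an i128 overflow
// becomes an ArrowError that try_binary can propagate, instead of wrapping
// or panicking.
fn scale_up(value: i128, multiplier: i128) -> Result<i128, ArrowError> {
    value.checked_mul(multiplier).ok_or_else(|| {
        ArrowError::ComputeError("decimal scale-up overflowed i128".to_string())
    })
}
```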
```rust
} else {
    div + &five
} / &ten;
Ok(res.to_i128().unwrap_or(i128::MAX))
```
If `res` is a large negative number smaller than `i128::MIN`, then `unwrap_or()` should return `i128::MIN`, not `i128::MAX`.
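A possible sign-aware saturation, sketched under the assumption that `res` is a `num_bigint::BigInt` as in the surrounding code; `saturate_to_i128` is a hypothetical helper:

```rust
use num_bigint::{BigInt, Sign};
use num_traits::ToPrimitive;

// Saturate out-of-range BigInt values toward the matching i128 bound:
// negative overflow -> i128::MIN, positive overflow -> i128::MAX.
fn saturate_to_i128(res: &BigInt) -> i128 {
    res.to_i128().unwrap_or_else(|| {
        if res.sign() == Sign::Minus {
            i128::MIN
        } else {
            i128::MAX
        }
    })
}
```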
Which issue does this PR close?
datafusion-spark Spark Compatible Functions #15914

Rationale for this change
Moving code from Comet to datafusion-spark so that other projects can benefit.
This PR adds Spark-compatible decimal division functions to datafusion-spark. These functions implement Spark's specific decimal division semantics, which differ from standard SQL.
What changes are included in this PR?
Note: These are internal functions intended for use by query planners when Spark-compatible decimal division semantics are needed. The precision and scale of the result type are determined at query planning time based on input types, similar to how Comet uses them.
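As a hypothetical illustration of that planner-driven usage (the precision/scale values and variable name below are made up; `ScalarUDF::new_from_impl` is DataFusion's standard wrapper for a `ScalarUDFImpl`, and `SparkDecimalDiv` is the struct added by this PR):

```rust
use datafusion_expr::ScalarUDF;

// A planner would pick result_precision / result_scale from the input
// decimal types; 38 and 6 here are placeholder values.
let spark_div = ScalarUDF::new_from_impl(SparkDecimalDiv::new(38, 6));
```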
Are these changes tested?
Yes, unit tests added.
Are there any user-facing changes?
New public APIs in datafusion-spark: