From 8f7581536e8a361df2c2958a7df53c44cea2ea94 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 11 Mar 2026 09:59:04 +0530 Subject: [PATCH 1/3] fix(stats): widen sum_value integer arithmetic to SUM-compatible types --- datafusion/common/src/stats.rs | 98 +++++++++++++++++-- datafusion/datasource/src/statistics.rs | 81 ++++++++++++++- datafusion/physical-expr/src/projection.rs | 42 +++++++- .../physical-plan/src/joins/cross_join.rs | 85 ++++++++++++---- datafusion/physical-plan/src/union.rs | 2 +- 5 files changed, 273 insertions(+), 35 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 36dab2a1ea283..97a396518b474 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -22,7 +22,6 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; use crate::error::_plan_err; -use crate::utils::aggregate::precision_add; use arrow::datatypes::{DataType, Schema}; /// Represents a value with a degree of certainty. `Precision` is used to @@ -203,6 +202,14 @@ impl Precision { } impl Precision { + fn sum_data_type(data_type: &DataType) -> DataType { + match data_type { + DataType::Int8 | DataType::Int16 | DataType::Int32 => DataType::Int64, + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => DataType::UInt64, + _ => data_type.clone(), + } + } + /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values, /// conservatively propagating exactness information. If one of the input /// values is [`Precision::Absent`], the result is `Absent` too. @@ -228,6 +235,46 @@ impl Precision { } } + /// Casts integer values to the wider SQL `SUM` return type. + /// + /// This narrows overflow risk when `sum_value` statistics are merged: + /// `Int8/Int16/Int32 -> Int64` and `UInt8/UInt16/UInt32 -> UInt64`. + pub fn cast_to_sum_type(&self) -> Precision { + match self { + Precision::Exact(value) => { + let source_type = value.data_type(); + let target_type = Self::sum_data_type(&source_type); + if source_type == target_type { + Precision::Exact(value.clone()) + } else { + value + .cast_to(&target_type) + .map(Precision::Exact) + .unwrap_or(Precision::Absent) + } + } + Precision::Inexact(value) => { + let source_type = value.data_type(); + let target_type = Self::sum_data_type(&source_type); + if source_type == target_type { + Precision::Inexact(value.clone()) + } else { + value + .cast_to(&target_type) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent) + } + } + Precision::Absent => Precision::Absent, + } + } + + /// SUM-style addition with integer widening to match SQL `SUM` return + /// types for smaller integral inputs. + pub fn add_for_sum(&self, other: &Precision) -> Precision { + self.cast_to_sum_type().add(&other.cast_to_sum_type()) + } + /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values, /// conservatively propagating exactness information. If one of the input /// values is [`Precision::Absent`], the result is `Absent` too. @@ -671,8 +718,8 @@ impl Statistics { .collect(); // Accumulate all statistics in a single pass. - // Uses precision_add for sum (avoids the expensive - // ScalarValue::add round-trip through Arrow arrays), and + // Uses add_for_sum to keep integer sum_value statistics in + // SUM-compatible widened types, and // Precision::min/max which use cheap PartialOrd comparison. for stat in items.iter().skip(1) { for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { @@ -681,7 +728,7 @@ impl Statistics { col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size); col_stats.sum_value = - precision_add(&col_stats.sum_value, &item_cs.sum_value); + col_stats.sum_value.add_for_sum(&item_cs.sum_value); col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); } @@ -994,6 +1041,45 @@ mod tests { assert_eq!(precision.add(&Precision::Absent), Precision::Absent); } + #[test] + fn test_add_for_sum_scalar_integer_widening() { + let precision = Precision::Exact(ScalarValue::Int32(Some(42))); + + assert_eq!( + precision.add_for_sum(&Precision::Exact(ScalarValue::Int32(Some(23)))), + Precision::Exact(ScalarValue::Int64(Some(65))), + ); + assert_eq!( + precision.add_for_sum(&Precision::Inexact(ScalarValue::Int32(Some(23)))), + Precision::Inexact(ScalarValue::Int64(Some(65))), + ); + } + + #[test] + fn test_add_for_sum_prevents_int32_overflow() { + let lhs = Precision::Exact(ScalarValue::Int32(Some(i32::MAX))); + let rhs = Precision::Exact(ScalarValue::Int32(Some(1))); + + assert_eq!( + lhs.add_for_sum(&rhs), + Precision::Exact(ScalarValue::Int64(Some(i64::from(i32::MAX) + 1))), + ); + } + + #[test] + fn test_add_for_sum_scalar_unsigned_integer_widening() { + let precision = Precision::Exact(ScalarValue::UInt32(Some(42))); + + assert_eq!( + precision.add_for_sum(&Precision::Exact(ScalarValue::UInt32(Some(23)))), + Precision::Exact(ScalarValue::UInt64(Some(65))), + ); + assert_eq!( + precision.add_for_sum(&Precision::Inexact(ScalarValue::UInt32(Some(23)))), + Precision::Inexact(ScalarValue::UInt64(Some(65))), + ); + } + #[test] fn test_sub() { let precision1 = Precision::Exact(42); @@ -1239,7 +1325,7 @@ mod tests { ); assert_eq!( col1_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(1100))) + Precision::Exact(ScalarValue::Int64(Some(1100))) ); // 500 + 600 let col2_stats = &summary_stats.column_statistics[1]; @@ -1254,7 +1340,7 @@ mod tests { ); assert_eq!( col2_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(2200))) + Precision::Exact(ScalarValue::Int64(Some(2200))) ); // 1000 + 1200 } diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index b1a56e096c222..e5a1e4613b3d4 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -293,7 +293,7 @@ fn sort_columns_from_physical_sort_exprs( since = "47.0.0", note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead" )] -#[expect(unused)] +#[cfg_attr(not(test), expect(unused))] pub async fn get_statistics_with_limit( all_files: impl Stream)>>, file_schema: SchemaRef, @@ -329,7 +329,7 @@ pub async fn get_statistics_with_limit( col_stats_set[index].null_count = file_column.null_count; col_stats_set[index].max_value = file_column.max_value; col_stats_set[index].min_value = file_column.min_value; - col_stats_set[index].sum_value = file_column.sum_value; + col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); } // If the number of rows exceeds the limit, we can stop processing @@ -374,7 +374,7 @@ pub async fn get_statistics_with_limit( col_stats.null_count = col_stats.null_count.add(file_nc); col_stats.max_value = col_stats.max_value.max(file_max); col_stats.min_value = col_stats.min_value.min(file_min); - col_stats.sum_value = col_stats.sum_value.add(file_sum); + col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum); col_stats.byte_size = col_stats.byte_size.add(file_sbs); } @@ -497,3 +497,78 @@ pub fn add_row_stats( ) -> Precision { file_num_rows.add(&num_rows) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::PartitionedFile; + use arrow::datatypes::{DataType, Field, Schema}; + use futures::stream; + + fn file_stats(sum: u32) -> Statistics { + Statistics { + num_rows: Precision::Exact(1), + total_byte_size: Precision::Exact(4), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::UInt32(Some(sum))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(sum))), + sum_value: Precision::Exact(ScalarValue::UInt32(Some(sum))), + distinct_count: Precision::Exact(1), + byte_size: Precision::Exact(4), + }], + } + } + + #[tokio::test] + #[expect(deprecated)] + async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type() + -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)])); + + let files = stream::iter(vec![Ok(( + PartitionedFile::new("f1.parquet", 1), + Arc::new(file_stats(100)), + ))]); + + let (_group, stats) = + get_statistics_with_limit(files, schema, None, false).await?; + + assert_eq!( + stats.column_statistics[0].sum_value, + Precision::Exact(ScalarValue::UInt64(Some(100))) + ); + + Ok(()) + } + + #[tokio::test] + #[expect(deprecated)] + async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening() + -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)])); + + let files = stream::iter(vec![ + Ok(( + PartitionedFile::new("f1.parquet", 1), + Arc::new(file_stats(100)), + )), + Ok(( + PartitionedFile::new("f2.parquet", 1), + Arc::new(file_stats(200)), + )), + ]); + + let (_group, stats) = + get_statistics_with_limit(files, schema, None, true).await?; + + assert_eq!( + stats.column_statistics[0].sum_value, + Precision::Exact(ScalarValue::UInt64(Some(300))) + ); + + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index dbbd289415277..de6377bddc012 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -693,12 +693,15 @@ impl ProjectionExprs { Precision::Absent }; - let sum_value = Precision::::from(stats.num_rows) - .cast_to(&value.data_type()) - .ok() - .map(|row_count| { - Precision::Exact(value.clone()).multiply(&row_count) + let widened_sum = Precision::Exact(value.clone()).cast_to_sum_type(); + let sum_value = widened_sum + .get_value() + .and_then(|sum| { + Precision::::from(stats.num_rows) + .cast_to(&sum.data_type()) + .ok() }) + .map(|row_count| widened_sum.multiply(&row_count)) .unwrap_or(Precision::Absent); ColumnStatistics { @@ -2866,6 +2869,35 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_project_statistics_with_i32_literal_sum_widens_to_i64() -> Result<()> { + let input_stats = get_stats(); + let input_schema = get_schema(); + + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + alias: "constant".to_string(), + }, + ProjectionExpr { + expr: Arc::new(Column::new("col0", 0)), + alias: "num".to_string(), + }, + ]); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + assert_eq!( + output_stats.column_statistics[0].sum_value, + Precision::Exact(ScalarValue::Int64(Some(50))) + ); + + Ok(()) + } + // Test statistics calculation for NULL literal (constant NULL column) #[test] fn test_project_statistics_with_null_literal() -> Result<()> { diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index a895f69dc5138..b64de91d95997 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -458,32 +458,34 @@ fn stats_cartesian_product( // Min, max and distinct_count on the other hand are invariants. let cross_join_stats = left_col_stats .into_iter() - .map(|s| ColumnStatistics { - null_count: s.null_count.multiply(&right_row_count), - distinct_count: s.distinct_count, - min_value: s.min_value, - max_value: s.max_value, - sum_value: s - .sum_value - .get_value() - // Cast the row count into the same type as any existing sum value - .and_then(|v| { - Precision::::from(right_row_count) - .cast_to(&v.data_type()) - .ok() - }) - .map(|row_count| s.sum_value.multiply(&row_count)) - .unwrap_or(Precision::Absent), - byte_size: Precision::Absent, + .map(|s| { + let widened_sum = s.sum_value.cast_to_sum_type(); + ColumnStatistics { + null_count: s.null_count.multiply(&right_row_count), + distinct_count: s.distinct_count, + min_value: s.min_value, + max_value: s.max_value, + sum_value: widened_sum + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::::from(right_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| widened_sum.multiply(&row_count)) + .unwrap_or(Precision::Absent), + byte_size: Precision::Absent, + } }) .chain(right_col_stats.into_iter().map(|s| { + let widened_sum = s.sum_value.cast_to_sum_type(); ColumnStatistics { null_count: s.null_count.multiply(&left_row_count), distinct_count: s.distinct_count, min_value: s.min_value, max_value: s.max_value, - sum_value: s - .sum_value + sum_value: widened_sum .get_value() // Cast the row count into the same type as any existing sum value .and_then(|v| { @@ -491,7 +493,7 @@ fn stats_cartesian_product( .cast_to(&v.data_type()) .ok() }) - .map(|row_count| s.sum_value.multiply(&row_count)) + .map(|row_count| widened_sum.multiply(&row_count)) .unwrap_or(Precision::Absent), byte_size: Precision::Absent, } @@ -875,6 +877,49 @@ mod tests { assert_eq!(result, expected); } + #[tokio::test] + async fn test_stats_cartesian_product_unsigned_sum_widens_to_u64() { + let left_row_count = 2; + let right_row_count = 3; + + let left = Statistics { + num_rows: Precision::Exact(left_row_count), + total_byte_size: Precision::Exact(10), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::UInt32(Some(10))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Exact(ScalarValue::UInt32(Some(7))), + null_count: Precision::Exact(0), + byte_size: Precision::Absent, + }], + }; + + let right = Statistics { + num_rows: Precision::Exact(right_row_count), + total_byte_size: Precision::Exact(10), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::UInt32(Some(12))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(0))), + sum_value: Precision::Exact(ScalarValue::UInt32(Some(11))), + null_count: Precision::Exact(0), + byte_size: Precision::Absent, + }], + }; + + let result = stats_cartesian_product(left, right); + + assert_eq!( + result.column_statistics[0].sum_value, + Precision::Exact(ScalarValue::UInt64(Some(21))) + ); + assert_eq!( + result.column_statistics[1].sum_value, + Precision::Exact(ScalarValue::UInt64(Some(22))) + ); + } + #[tokio::test] async fn test_join() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index dafcd6ee4014d..77f73318fdad3 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -857,7 +857,7 @@ fn col_stats_union( left.distinct_count = union_distinct_count(&left, right); left.min_value = left.min_value.min(&right.min_value); left.max_value = left.max_value.max(&right.max_value); - left.sum_value = left.sum_value.add(&right.sum_value); + left.sum_value = left.sum_value.add_for_sum(&right.sum_value); left.null_count = left.null_count.add(&right.null_count); left From e88ae0259fa5e3957493e40be56a8782446dbb2e Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 17 Mar 2026 10:39:13 +0530 Subject: [PATCH 2/3] refactor(stats): document and normalize sum_value widening --- datafusion/common/src/stats.rs | 90 +++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 97a396518b474..2650e94f5ea9f 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -22,6 +22,7 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; use crate::error::_plan_err; +use crate::utils::aggregate::precision_add; use arrow::datatypes::{DataType, Schema}; /// Represents a value with a degree of certainty. `Precision` is used to @@ -210,6 +211,16 @@ impl Precision { } } + fn cast_scalar_to_sum_type(value: &ScalarValue) -> Result { + let source_type = value.data_type(); + let target_type = Self::sum_data_type(&source_type); + if source_type == target_type { + Ok(value.clone()) + } else { + value.cast_to(&target_type) + } + } + /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values, /// conservatively propagating exactness information. If one of the input /// values is [`Precision::Absent`], the result is `Absent` too. @@ -240,39 +251,21 @@ impl Precision { /// This narrows overflow risk when `sum_value` statistics are merged: /// `Int8/Int16/Int32 -> Int64` and `UInt8/UInt16/UInt32 -> UInt64`. pub fn cast_to_sum_type(&self) -> Precision { - match self { - Precision::Exact(value) => { - let source_type = value.data_type(); - let target_type = Self::sum_data_type(&source_type); - if source_type == target_type { - Precision::Exact(value.clone()) - } else { - value - .cast_to(&target_type) - .map(Precision::Exact) - .unwrap_or(Precision::Absent) - } - } - Precision::Inexact(value) => { - let source_type = value.data_type(); - let target_type = Self::sum_data_type(&source_type); - if source_type == target_type { - Precision::Inexact(value.clone()) - } else { - value - .cast_to(&target_type) - .map(Precision::Inexact) - .unwrap_or(Precision::Absent) - } - } - Precision::Absent => Precision::Absent, + match (self.is_exact(), self.get_value()) { + (Some(true), Some(value)) => Self::cast_scalar_to_sum_type(value) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + (Some(false), Some(value)) => Self::cast_scalar_to_sum_type(value) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + (_, _) => Precision::Absent, } } /// SUM-style addition with integer widening to match SQL `SUM` return /// types for smaller integral inputs. pub fn add_for_sum(&self, other: &Precision) -> Precision { - self.cast_to_sum_type().add(&other.cast_to_sum_type()) + precision_add(&self.cast_to_sum_type(), &other.cast_to_sum_type()) } /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values, @@ -727,8 +720,7 @@ impl Statistics { col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size); - col_stats.sum_value = - col_stats.sum_value.add_for_sum(&item_cs.sum_value); + col_stats.sum_value = col_stats.sum_value.add_for_sum(&item_cs.sum_value); col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); } @@ -823,7 +815,15 @@ pub struct ColumnStatistics { pub max_value: Precision, /// Minimum value of column pub min_value: Precision, - /// Sum value of a column + /// Sum value of a column. + /// + /// For integral columns, values should be kept in SUM-compatible widened + /// types (`Int8/Int16/Int32 -> Int64`, `UInt8/UInt16/UInt32 -> UInt64`) to + /// reduce overflow risk during statistics propagation. + /// + /// Callers should prefer [`ColumnStatistics::with_sum_value`] for setting + /// this field and [`Precision::add_for_sum`] / + /// [`Precision::cast_to_sum_type`] for sum arithmetic. pub sum_value: Precision, /// Number of distinct values pub distinct_count: Precision, @@ -888,7 +888,19 @@ impl ColumnStatistics { /// Set the sum value pub fn with_sum_value(mut self, sum_value: Precision) -> Self { - self.sum_value = sum_value; + self.sum_value = match sum_value { + Precision::Exact(value) => { + Precision::::cast_scalar_to_sum_type(&value) + .map(Precision::Exact) + .unwrap_or(Precision::Absent) + } + Precision::Inexact(value) => { + Precision::::cast_scalar_to_sum_type(&value) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent) + } + Precision::Absent => Precision::Absent, + }; self } @@ -1735,6 +1747,16 @@ mod tests { assert_eq!(col_stats.byte_size, Precision::Exact(8192)); } + #[test] + fn test_with_sum_value_builder_widens_small_integers() { + let col_stats = ColumnStatistics::new_unknown() + .with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(123)))); + assert_eq!( + col_stats.sum_value, + Precision::Exact(ScalarValue::UInt64(Some(123))) + ); + } + #[test] fn test_with_fetch_scales_byte_size() { // Test that byte_size is scaled by the row ratio in with_fetch @@ -1882,7 +1904,7 @@ mod tests { ); assert_eq!( col1_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(1100))) + Precision::Exact(ScalarValue::Int64(Some(1100))) ); let col2_stats = &summary_stats.column_statistics[1]; @@ -1897,7 +1919,7 @@ mod tests { ); assert_eq!( col2_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(2200))) + Precision::Exact(ScalarValue::Int64(Some(2200))) ); } @@ -2245,7 +2267,7 @@ mod tests { ); assert_eq!( col_stats.sum_value, - Precision::Inexact(ScalarValue::Int32(Some(1500))) + Precision::Inexact(ScalarValue::Int64(Some(1500))) ); } } From 698bc1464e3470f8282ee2e6234a75701211bedb Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 17 Mar 2026 14:30:05 +0530 Subject: [PATCH 3/3] fix doc --- datafusion/common/src/stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 2650e94f5ea9f..c509a7265507a 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -660,7 +660,7 @@ impl Statistics { /// assert_eq!(merged.column_statistics[0].max_value, /// Precision::Exact(ScalarValue::from(200))); /// assert_eq!(merged.column_statistics[0].sum_value, - /// Precision::Exact(ScalarValue::from(1500))); + /// Precision::Exact(ScalarValue::Int64(Some(1500)))); /// ``` pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result where