Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 120 additions & 12 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,24 @@ impl Precision<usize> {
}

impl Precision<ScalarValue> {
fn sum_data_type(data_type: &DataType) -> DataType {
match data_type {
DataType::Int8 | DataType::Int16 | DataType::Int32 => DataType::Int64,
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => DataType::UInt64,
_ => data_type.clone(),
}
}

/// Casts `value` to its SUM-compatible widened type.
///
/// When the value is already in the target type it is cloned as-is;
/// otherwise the result of `ScalarValue::cast_to` is returned.
fn cast_scalar_to_sum_type(value: &ScalarValue) -> Result<ScalarValue> {
    let target_type = Self::sum_data_type(&value.data_type());
    if value.data_type() == target_type {
        Ok(value.clone())
    } else {
        value.cast_to(&target_type)
    }
}

/// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
/// conservatively propagating exactness information. If one of the input
/// values is [`Precision::Absent`], the result is `Absent` too.
Expand All @@ -228,6 +246,28 @@ impl Precision<ScalarValue> {
}
}

/// Casts integer values to the wider SQL `SUM` return type.
///
/// This narrows overflow risk when `sum_value` statistics are merged:
/// `Int8/Int16/Int32 -> Int64` and `UInt8/UInt16/UInt32 -> UInt64`.
/// Exactness is preserved; a value that fails to cast degrades to
/// [`Precision::Absent`].
pub fn cast_to_sum_type(&self) -> Precision<ScalarValue> {
    match self {
        Precision::Exact(value) => Self::cast_scalar_to_sum_type(value)
            .map(Precision::Exact)
            .unwrap_or(Precision::Absent),
        Precision::Inexact(value) => Self::cast_scalar_to_sum_type(value)
            .map(Precision::Inexact)
            .unwrap_or(Precision::Absent),
        Precision::Absent => Precision::Absent,
    }
}

/// SUM-style addition with integer widening to match SQL `SUM` return
/// types for smaller integral inputs.
pub fn add_for_sum(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
    // Widen both operands first so the addition happens in the SUM type.
    let lhs = self.cast_to_sum_type();
    let rhs = other.cast_to_sum_type();
    precision_add(&lhs, &rhs)
}

/// Calculates the difference of two (possibly inexact) [`ScalarValue`] values,
/// conservatively propagating exactness information. If one of the input
/// values is [`Precision::Absent`], the result is `Absent` too.
Expand Down Expand Up @@ -620,7 +660,7 @@ impl Statistics {
/// assert_eq!(merged.column_statistics[0].max_value,
/// Precision::Exact(ScalarValue::from(200)));
/// assert_eq!(merged.column_statistics[0].sum_value,
/// Precision::Exact(ScalarValue::from(1500)));
/// Precision::Exact(ScalarValue::Int64(Some(1500))));
/// ```
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
where
Expand Down Expand Up @@ -671,17 +711,16 @@ impl Statistics {
.collect();

// Accumulate all statistics in a single pass.
// Uses precision_add for sum (avoids the expensive
// ScalarValue::add round-trip through Arrow arrays), and
// Uses add_for_sum to keep integer sum_value statistics in
// SUM-compatible widened types, and
// Precision::min/max which use cheap PartialOrd comparison.
for stat in items.iter().skip(1) {
for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() {
let item_cs = &stat.column_statistics[col_idx];

col_stats.null_count = col_stats.null_count.add(&item_cs.null_count);
col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
col_stats.sum_value =
precision_add(&col_stats.sum_value, &item_cs.sum_value);
col_stats.sum_value = col_stats.sum_value.add_for_sum(&item_cs.sum_value);
col_stats.min_value = col_stats.min_value.min(&item_cs.min_value);
col_stats.max_value = col_stats.max_value.max(&item_cs.max_value);
}
Expand Down Expand Up @@ -776,7 +815,15 @@ pub struct ColumnStatistics {
pub max_value: Precision<ScalarValue>,
/// Minimum value of column
pub min_value: Precision<ScalarValue>,
/// Sum value of a column
/// Sum value of a column.
///
/// For integral columns, values should be kept in SUM-compatible widened
/// types (`Int8/Int16/Int32 -> Int64`, `UInt8/UInt16/UInt32 -> UInt64`) to
/// reduce overflow risk during statistics propagation.
///
/// Callers should prefer [`ColumnStatistics::with_sum_value`] for setting
/// this field and [`Precision<ScalarValue>::add_for_sum`] /
/// [`Precision<ScalarValue>::cast_to_sum_type`] for sum arithmetic.
pub sum_value: Precision<ScalarValue>,
/// Number of distinct values
pub distinct_count: Precision<usize>,
Expand Down Expand Up @@ -841,7 +888,19 @@ impl ColumnStatistics {

/// Set the sum value.
///
/// Integer inputs are widened to their SQL `SUM`-compatible type
/// (`Int8/Int16/Int32 -> Int64`, `UInt8/UInt16/UInt32 -> UInt64`) via
/// [`Precision::cast_to_sum_type`]; a value that fails to cast is stored
/// as [`Precision::Absent`].
pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self {
    // Delegate to the canonical widening logic instead of duplicating the
    // Exact/Inexact/Absent match here — keeps both paths in agreement.
    self.sum_value = sum_value.cast_to_sum_type();
    self
}

Expand Down Expand Up @@ -994,6 +1053,45 @@ mod tests {
assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
}

#[test]
fn test_add_for_sum_scalar_integer_widening() {
    let lhs = Precision::Exact(ScalarValue::Int32(Some(42)));
    let exact_rhs = Precision::Exact(ScalarValue::Int32(Some(23)));
    let inexact_rhs = Precision::Inexact(ScalarValue::Int32(Some(23)));

    // Both results widen Int32 -> Int64; exactness follows the weaker input.
    assert_eq!(
        lhs.add_for_sum(&exact_rhs),
        Precision::Exact(ScalarValue::Int64(Some(65))),
    );
    assert_eq!(
        lhs.add_for_sum(&inexact_rhs),
        Precision::Inexact(ScalarValue::Int64(Some(65))),
    );
}

#[test]
fn test_add_for_sum_prevents_int32_overflow() {
    // i32::MAX + 1 does not fit in Int32; widening to Int64 keeps the sum.
    let max = Precision::Exact(ScalarValue::Int32(Some(i32::MAX)));
    let one = Precision::Exact(ScalarValue::Int32(Some(1)));

    let expected = i64::from(i32::MAX) + 1;
    assert_eq!(
        max.add_for_sum(&one),
        Precision::Exact(ScalarValue::Int64(Some(expected))),
    );
}

#[test]
fn test_add_for_sum_scalar_unsigned_integer_widening() {
    let lhs = Precision::Exact(ScalarValue::UInt32(Some(42)));
    let exact_rhs = Precision::Exact(ScalarValue::UInt32(Some(23)));
    let inexact_rhs = Precision::Inexact(ScalarValue::UInt32(Some(23)));

    // UInt32 operands widen to UInt64; exactness follows the weaker input.
    assert_eq!(
        lhs.add_for_sum(&exact_rhs),
        Precision::Exact(ScalarValue::UInt64(Some(65))),
    );
    assert_eq!(
        lhs.add_for_sum(&inexact_rhs),
        Precision::Inexact(ScalarValue::UInt64(Some(65))),
    );
}

#[test]
fn test_sub() {
let precision1 = Precision::Exact(42);
Expand Down Expand Up @@ -1239,7 +1337,7 @@ mod tests {
);
assert_eq!(
col1_stats.sum_value,
Precision::Exact(ScalarValue::Int32(Some(1100)))
Precision::Exact(ScalarValue::Int64(Some(1100)))
); // 500 + 600

let col2_stats = &summary_stats.column_statistics[1];
Expand All @@ -1254,7 +1352,7 @@ mod tests {
);
assert_eq!(
col2_stats.sum_value,
Precision::Exact(ScalarValue::Int32(Some(2200)))
Precision::Exact(ScalarValue::Int64(Some(2200)))
); // 1000 + 1200
}

Expand Down Expand Up @@ -1649,6 +1747,16 @@ mod tests {
assert_eq!(col_stats.byte_size, Precision::Exact(8192));
}

#[test]
fn test_with_sum_value_builder_widens_small_integers() {
    // The builder should store a UInt32 sum as UInt64.
    let stats = ColumnStatistics::new_unknown()
        .with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(123))));
    assert_eq!(
        stats.sum_value,
        Precision::Exact(ScalarValue::UInt64(Some(123)))
    );
}

#[test]
fn test_with_fetch_scales_byte_size() {
// Test that byte_size is scaled by the row ratio in with_fetch
Expand Down Expand Up @@ -1796,7 +1904,7 @@ mod tests {
);
assert_eq!(
col1_stats.sum_value,
Precision::Exact(ScalarValue::Int32(Some(1100)))
Precision::Exact(ScalarValue::Int64(Some(1100)))
);

let col2_stats = &summary_stats.column_statistics[1];
Expand All @@ -1811,7 +1919,7 @@ mod tests {
);
assert_eq!(
col2_stats.sum_value,
Precision::Exact(ScalarValue::Int32(Some(2200)))
Precision::Exact(ScalarValue::Int64(Some(2200)))
);
}

Expand Down Expand Up @@ -2159,7 +2267,7 @@ mod tests {
);
assert_eq!(
col_stats.sum_value,
Precision::Inexact(ScalarValue::Int32(Some(1500)))
Precision::Inexact(ScalarValue::Int64(Some(1500)))
);
}
}
81 changes: 78 additions & 3 deletions datafusion/datasource/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ fn sort_columns_from_physical_sort_exprs(
since = "47.0.0",
note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
)]
#[expect(unused)]
#[cfg_attr(not(test), expect(unused))]
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
Expand Down Expand Up @@ -329,7 +329,7 @@ pub async fn get_statistics_with_limit(
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
col_stats_set[index].sum_value = file_column.sum_value;
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
}

// If the number of rows exceeds the limit, we can stop processing
Expand Down Expand Up @@ -374,7 +374,7 @@ pub async fn get_statistics_with_limit(
col_stats.null_count = col_stats.null_count.add(file_nc);
col_stats.max_value = col_stats.max_value.max(file_max);
col_stats.min_value = col_stats.min_value.min(file_min);
col_stats.sum_value = col_stats.sum_value.add(file_sum);
col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum);
col_stats.byte_size = col_stats.byte_size.add(file_sbs);
}

Expand Down Expand Up @@ -497,3 +497,78 @@ pub fn add_row_stats(
) -> Precision<usize> {
file_num_rows.add(&num_rows)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::PartitionedFile;
use arrow::datatypes::{DataType, Field, Schema};
use futures::stream;

fn file_stats(sum: u32) -> Statistics {
Statistics {
num_rows: Precision::Exact(1),
total_byte_size: Precision::Exact(4),
column_statistics: vec![ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
min_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
sum_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
distinct_count: Precision::Exact(1),
byte_size: Precision::Exact(4),
}],
}
}

#[tokio::test]
#[expect(deprecated)]
async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
-> Result<()> {
    let schema =
        Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)]));
    let single_file = vec![Ok((
        PartitionedFile::new("f1.parquet", 1),
        Arc::new(file_stats(100)),
    ))];

    let (_group, stats) =
        get_statistics_with_limit(stream::iter(single_file), schema, None, false)
            .await?;

    // Even a lone file's UInt32 sum must be widened to UInt64 on the way in.
    assert_eq!(
        stats.column_statistics[0].sum_value,
        Precision::Exact(ScalarValue::UInt64(Some(100)))
    );

    Ok(())
}

#[tokio::test]
#[expect(deprecated)]
async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening()
-> Result<()> {
    let schema =
        Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)]));
    let file_entries = vec![
        Ok((
            PartitionedFile::new("f1.parquet", 1),
            Arc::new(file_stats(100)),
        )),
        Ok((
            PartitionedFile::new("f2.parquet", 1),
            Arc::new(file_stats(200)),
        )),
    ];

    let (_group, stats) =
        get_statistics_with_limit(stream::iter(file_entries), schema, None, true)
            .await?;

    // 100 + 200 must accumulate in UInt64, not UInt32.
    assert_eq!(
        stats.column_statistics[0].sum_value,
        Precision::Exact(ScalarValue::UInt64(Some(300)))
    );

    Ok(())
}
}
42 changes: 37 additions & 5 deletions datafusion/physical-expr/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,15 @@ impl ProjectionExprs {
Precision::Absent
};

let sum_value = Precision::<ScalarValue>::from(stats.num_rows)
.cast_to(&value.data_type())
.ok()
.map(|row_count| {
Precision::Exact(value.clone()).multiply(&row_count)
let widened_sum = Precision::Exact(value.clone()).cast_to_sum_type();
let sum_value = widened_sum
.get_value()
.and_then(|sum| {
Precision::<ScalarValue>::from(stats.num_rows)
.cast_to(&sum.data_type())
.ok()
})
.map(|row_count| widened_sum.multiply(&row_count))
.unwrap_or(Precision::Absent);

ColumnStatistics {
Expand Down Expand Up @@ -2866,6 +2869,35 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn test_project_statistics_with_i32_literal_sum_widens_to_i64() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    let exprs = vec![
        ProjectionExpr {
            expr: Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            alias: "constant".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("col0", 0)),
            alias: "num".to_string(),
        },
    ];
    let projection = ProjectionExprs::new(exprs);
    let projected_schema = projection.project_schema(&input_schema)?;

    let output_stats =
        projection.project_statistics(input_stats, &projected_schema)?;

    // Literal 10 times the fixture's row count yields 50, and the Int32
    // literal's sum is widened to Int64.
    assert_eq!(
        output_stats.column_statistics[0].sum_value,
        Precision::Exact(ScalarValue::Int64(Some(50)))
    );

    Ok(())
}

// Test statistics calculation for NULL literal (constant NULL column)
#[test]
fn test_project_statistics_with_null_literal() -> Result<()> {
Expand Down
Loading
Loading