From 1386ec2b229f7efce92c1b29deec8a3e4cf4bc7f Mon Sep 17 00:00:00 2001 From: buraksenn Date: Tue, 10 Mar 2026 13:47:27 +0300 Subject: [PATCH 1/4] initial implementation --- .../aggregate_statistics.rs | 160 +++++++++++++++++- datafusion/functions-aggregate/src/count.rs | 43 ++--- 2 files changed, 182 insertions(+), 21 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index 4218f76fa135a..b9dce0866f215 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -23,16 +23,23 @@ use arrow::array::Int32Array; use arrow::array::{Int64Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; +use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::memory::MemTable; use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::source::DataSourceExec; use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_common::Result; use datafusion_common::assert_batches_eq; use datafusion_common::cast::as_int64_array; use datafusion_common::config::ConfigOptions; +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, Result, Statistics}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::TaskContext; +use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::Operator; +use datafusion_functions_aggregate::count::count_udaf; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::{self, cast}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics; @@ -402,3 +409,154 @@ async fn utf8_grouping_min_max_limit_fallbacks() -> Result<()> { Ok(()) } + +fn mock_data_with_distinct_count( + distinct_count: Precision, +) -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let statistics = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + distinct_count, + null_count: Precision::Exact(10), + ..Default::default() + }, + ColumnStatistics::default(), + ], + }; + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(ParquetSource::new(Arc::clone(&schema))), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_statistics(statistics) + .build(); + + DataSourceExec::from_data_source(config) +} + +#[tokio::test] +async fn test_count_distinct_with_exact_statistics() -> Result<()> { + let source = mock_data_with_distinct_count(Precision::Exact(42)); + let schema = source.schema(); + + let count_distinct_expr = + AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("COUNT(DISTINCT a)") + .distinct() + .build()?; + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr.clone())], + vec![None], + source, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr)], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ConfigOptions::new(); + let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + assert!(optimized.as_any().is::()); + + let task_ctx = Arc::new(TaskContext::default()); + let result = common::collect(optimized.execute(0, task_ctx)?).await?; + assert_eq!(result.len(), 1); + assert_eq!(as_int64_array(result[0].column(0)).unwrap().values(), &[42]); + + Ok(()) +} + +#[tokio::test] +async fn test_count_distinct_with_absent_statistics() -> Result<()> { + let source = mock_data_with_distinct_count(Precision::Absent); + let schema = source.schema(); + + let count_distinct_expr = + AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("COUNT(DISTINCT a)") + .distinct() + .build()?; + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr.clone())], + vec![None], + source, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr)], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ConfigOptions::new(); + let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + assert!(optimized.as_any().is::()); + + Ok(()) +} + +#[tokio::test] +async fn test_count_distinct_with_inexact_statistics() -> Result<()> { + let source = mock_data_with_distinct_count(Precision::Inexact(42)); + let schema = source.schema(); + + let count_distinct_expr = + AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("COUNT(DISTINCT a)") + .distinct() + .build()?; + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr.clone())], + vec![None], + source, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr)], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ConfigOptions::new(); + let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + assert!(optimized.as_any().is::()); + + Ok(()) +} diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 376cf39745903..a358bc0c2484e 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -365,31 +365,34 @@ impl AggregateUDFImpl for Count { } fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option { + let [expr] = statistics_args.exprs else { + return None; + }; + let col_stats = &statistics_args.statistics.column_statistics; + if statistics_args.is_distinct { + let col_expr = expr.as_any().downcast_ref::()?; + if let Precision::Exact(dc) = col_stats[col_expr.index()].distinct_count { + return Some(ScalarValue::Int64(Some(dc as i64))); + } return None; } - if let Precision::Exact(num_rows) = statistics_args.statistics.num_rows - && statistics_args.exprs.len() == 1 - { - // TODO optimize with exprs other than Column - if let Some(col_expr) = statistics_args.exprs[0] - .as_any() - .downcast_ref::() - { - let current_val = &statistics_args.statistics.column_statistics - [col_expr.index()] - .null_count; - if let &Precision::Exact(val) = current_val { - return Some(ScalarValue::Int64(Some((num_rows - val) as i64))); - } - } else if let Some(lit_expr) = statistics_args.exprs[0] - .as_any() - .downcast_ref::() - && lit_expr.value() == &COUNT_STAR_EXPANSION - { - return Some(ScalarValue::Int64(Some(num_rows as i64))); + + let Precision::Exact(num_rows) = statistics_args.statistics.num_rows else { + return None; + }; + + if let Some(col_expr) = expr.as_any().downcast_ref::() { + if let Precision::Exact(val) = col_stats[col_expr.index()].null_count { + return Some(ScalarValue::Int64(Some((num_rows - val) as i64))); } + } else if let Some(lit_expr) = + expr.as_any().downcast_ref::() + && lit_expr.value() == &COUNT_STAR_EXPANSION + { + return Some(ScalarValue::Int64(Some(num_rows as i64))); } + None } From 0a7a035b8167c87e16ce470e2255bb7cda16a913 Mon Sep 17 00:00:00 2001 From: buraksenn Date: Tue, 10 Mar 2026 14:02:26 +0300 Subject: [PATCH 2/4] additional test case and comment --- .../aggregate_statistics.rs | 76 ++++++++----------- datafusion/functions-aggregate/src/count.rs | 2 + 2 files changed, 34 insertions(+), 44 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index b9dce0866f215..9e0bd5fa32bf7 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -412,7 +412,7 @@ async fn utf8_grouping_min_max_limit_fallbacks() -> Result<()> { fn mock_data_with_distinct_count( distinct_count: Precision, -) -> Arc { +) -> Arc { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Int32, true), @@ -442,9 +442,10 @@ fn mock_data_with_distinct_count( DataSourceExec::from_data_source(config) } -#[tokio::test] -async fn test_count_distinct_with_exact_statistics() -> Result<()> { - let source = mock_data_with_distinct_count(Precision::Exact(42)); +fn optimize_count_distinct( + distinct_count: Precision, +) -> Result> { + let source = mock_data_with_distinct_count(distinct_count); let schema = source.schema(); let count_distinct_expr = @@ -473,7 +474,12 @@ async fn test_count_distinct_with_exact_statistics() -> Result<()> { )?; let conf = ConfigOptions::new(); - let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + AggregateStatistics::new().optimize(Arc::new(final_agg), &conf) +} + +#[tokio::test] +async fn test_count_distinct_with_exact_statistics() -> Result<()> { + let optimized = optimize_count_distinct(Precision::Exact(42))?; assert!(optimized.as_any().is::()); @@ -487,53 +493,35 @@ async fn test_count_distinct_with_exact_statistics() -> Result<()> { #[tokio::test] async fn test_count_distinct_with_absent_statistics() -> Result<()> { - let source = mock_data_with_distinct_count(Precision::Absent); - let schema = source.schema(); - - let count_distinct_expr = - AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("COUNT(DISTINCT a)") - .distinct() - .build()?; - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(count_distinct_expr.clone())], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(count_distinct_expr)], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; - + let optimized = optimize_count_distinct(Precision::Absent)?; assert!(optimized.as_any().is::()); - Ok(()) } #[tokio::test] async fn test_count_distinct_with_inexact_statistics() -> Result<()> { - let source = mock_data_with_distinct_count(Precision::Inexact(42)); + let optimized = optimize_count_distinct(Precision::Inexact(42))?; + assert!(optimized.as_any().is::()); + Ok(()) +} + +#[tokio::test] +async fn test_count_distinct_with_non_column_expr() -> Result<()> { + let source = mock_data_with_distinct_count(Precision::Exact(42)); let schema = source.schema(); - let count_distinct_expr = - AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("COUNT(DISTINCT a)") - .distinct() - .build()?; + let expr = expressions::binary( + expressions::col("a", &schema)?, + Operator::Plus, + expressions::col("b", &schema)?, + &schema, + )?; + + let count_distinct_expr = AggregateExprBuilder::new(count_udaf(), vec![expr]) + .schema(Arc::clone(&schema)) + .alias("COUNT(DISTINCT a + b)") + .distinct() + .build()?; let partial_agg = AggregateExec::try_new( AggregateMode::Partial, diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index a358bc0c2484e..6ea9fbf96986a 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -371,6 +371,8 @@ impl AggregateUDFImpl for Count { let col_stats = &statistics_args.statistics.column_statistics; if statistics_args.is_distinct { + // Only column references can be resolved from statistics; + // expressions like casts or literals are not supported. let col_expr = expr.as_any().downcast_ref::()?; if let Precision::Exact(dc) = col_stats[col_expr.index()].distinct_count { return Some(ScalarValue::Int64(Some(dc as i64))); From 8a99533a600d2b0e63cc2e024bfdef36d6361c92 Mon Sep 17 00:00:00 2001 From: buraksenn Date: Tue, 10 Mar 2026 14:07:46 +0300 Subject: [PATCH 3/4] reintroduce deleted comment --- datafusion/functions-aggregate/src/count.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 6ea9fbf96986a..0a481658f2a45 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -384,6 +384,7 @@ impl AggregateUDFImpl for Count { return None; }; + // TODO optimize with exprs other than Column if let Some(col_expr) = expr.as_any().downcast_ref::() { if let Precision::Exact(val) = col_stats[col_expr.index()].null_count { return Some(ScalarValue::Int64(Some((num_rows - val) as i64))); From 9a2d11e9bd086f027b34709b5f5edda62ec96578 Mon Sep 17 00:00:00 2001 From: buraksenn Date: Tue, 10 Mar 2026 21:33:58 +0300 Subject: [PATCH 4/4] address reviews --- .../aggregate_statistics.rs | 267 +++++++++--------- datafusion/functions-aggregate/src/count.rs | 9 +- 2 files changed, 142 insertions(+), 134 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index 9e0bd5fa32bf7..850f9d187780b 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -410,141 +410,146 @@ async fn utf8_grouping_min_max_limit_fallbacks() -> Result<()> { Ok(()) } -fn mock_data_with_distinct_count( - distinct_count: Precision, -) -> Arc { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])); - - let statistics = Statistics { - num_rows: Precision::Exact(100), - total_byte_size: Precision::Absent, - column_statistics: vec![ - ColumnStatistics { - distinct_count, - null_count: Precision::Exact(10), - ..Default::default() - }, - ColumnStatistics::default(), - ], - }; - - let config = FileScanConfigBuilder::new( - ObjectStoreUrl::parse("test:///").unwrap(), - Arc::new(ParquetSource::new(Arc::clone(&schema))), - ) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_statistics(statistics) - .build(); - - DataSourceExec::from_data_source(config) -} - -fn optimize_count_distinct( - distinct_count: Precision, -) -> Result> { - let source = mock_data_with_distinct_count(distinct_count); - let schema = source.schema(); - - let count_distinct_expr = - AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?]) +#[tokio::test] +async fn test_count_distinct_optimization() -> Result<()> { + struct TestCase { + name: &'static str, + distinct_count: Precision, + use_column_expr: bool, + expect_optimized: bool, + expected_value: Option, + } + + let cases = vec![ + TestCase { + name: "exact statistics", + distinct_count: Precision::Exact(42), + use_column_expr: true, + expect_optimized: true, + expected_value: Some(42), + }, + TestCase { + name: "absent statistics", + distinct_count: Precision::Absent, + use_column_expr: true, + expect_optimized: false, + expected_value: None, + }, + TestCase { + name: "inexact statistics", + distinct_count: Precision::Inexact(42), + use_column_expr: true, + expect_optimized: false, + expected_value: None, + }, + TestCase { + name: "non-column expression with exact statistics", + distinct_count: Precision::Exact(42), + use_column_expr: false, + expect_optimized: false, + expected_value: None, + }, + ]; + + for case in cases { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let statistics = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + distinct_count: case.distinct_count, + null_count: Precision::Exact(10), + ..Default::default() + }, + ColumnStatistics::default(), + ], + }; + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(ParquetSource::new(Arc::clone(&schema))), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_statistics(statistics) + .build(); + + let source: Arc = DataSourceExec::from_data_source(config); + let schema = source.schema(); + + let (agg_args, alias): (Vec>, _) = + if case.use_column_expr { + (vec![expressions::col("a", &schema)?], "COUNT(DISTINCT a)") + } else { + ( + vec![expressions::binary( + expressions::col("a", &schema)?, + Operator::Plus, + expressions::col("b", &schema)?, + &schema, + )?], + "COUNT(DISTINCT a + b)", + ) + }; + + let count_distinct_expr = AggregateExprBuilder::new(count_udaf(), agg_args) .schema(Arc::clone(&schema)) - .alias("COUNT(DISTINCT a)") + .alias(alias) .distinct() .build()?; - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(count_distinct_expr.clone())], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(count_distinct_expr)], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - AggregateStatistics::new().optimize(Arc::new(final_agg), &conf) -} - -#[tokio::test] -async fn test_count_distinct_with_exact_statistics() -> Result<()> { - let optimized = optimize_count_distinct(Precision::Exact(42))?; - - assert!(optimized.as_any().is::()); - - let task_ctx = Arc::new(TaskContext::default()); - let result = common::collect(optimized.execute(0, task_ctx)?).await?; - assert_eq!(result.len(), 1); - assert_eq!(as_int64_array(result[0].column(0)).unwrap().values(), &[42]); - - Ok(()) -} - -#[tokio::test] -async fn test_count_distinct_with_absent_statistics() -> Result<()> { - let optimized = optimize_count_distinct(Precision::Absent)?; - assert!(optimized.as_any().is::()); - Ok(()) -} - -#[tokio::test] -async fn test_count_distinct_with_inexact_statistics() -> Result<()> { - let optimized = optimize_count_distinct(Precision::Inexact(42))?; - assert!(optimized.as_any().is::()); - Ok(()) -} - -#[tokio::test] -async fn test_count_distinct_with_non_column_expr() -> Result<()> { - let source = mock_data_with_distinct_count(Precision::Exact(42)); - let schema = source.schema(); - - let expr = expressions::binary( - expressions::col("a", &schema)?, - Operator::Plus, - expressions::col("b", &schema)?, - &schema, - )?; - - let count_distinct_expr = AggregateExprBuilder::new(count_udaf(), vec![expr]) - .schema(Arc::clone(&schema)) - .alias("COUNT(DISTINCT a + b)") - .distinct() - .build()?; - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(count_distinct_expr.clone())], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(count_distinct_expr)], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; - - assert!(optimized.as_any().is::()); + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr.clone())], + vec![None], + source, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(count_distinct_expr)], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ConfigOptions::new(); + let optimized = + AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + if case.expect_optimized { + assert!( + optimized.as_any().is::(), + "'{}': expected ProjectionExec", + case.name + ); + + if let Some(expected_val) = case.expected_value { + let task_ctx = Arc::new(TaskContext::default()); + let result = common::collect(optimized.execute(0, task_ctx)?).await?; + assert_eq!(result.len(), 1, "'{}': expected 1 batch", case.name); + assert_eq!( + as_int64_array(result[0].column(0)).unwrap().values(), + &[expected_val], + "'{}': unexpected value", + case.name + ); + } + } else { + assert!( + optimized.as_any().is::(), + "'{}': expected AggregateExec (not optimized)", + case.name + ); + } + } Ok(()) } diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 0a481658f2a45..ebe3c60a4ddde 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -375,7 +375,8 @@ impl AggregateUDFImpl for Count { // expressions like casts or literals are not supported. let col_expr = expr.as_any().downcast_ref::()?; if let Precision::Exact(dc) = col_stats[col_expr.index()].distinct_count { - return Some(ScalarValue::Int64(Some(dc as i64))); + let dc = i64::try_from(dc).ok()?; + return Some(ScalarValue::Int64(Some(dc))); } return None; } @@ -387,13 +388,15 @@ impl AggregateUDFImpl for Count { // TODO optimize with exprs other than Column if let Some(col_expr) = expr.as_any().downcast_ref::() { if let Precision::Exact(val) = col_stats[col_expr.index()].null_count { - return Some(ScalarValue::Int64(Some((num_rows - val) as i64))); + let count = i64::try_from(num_rows - val).ok()?; + return Some(ScalarValue::Int64(Some(count))); } } else if let Some(lit_expr) = expr.as_any().downcast_ref::() && lit_expr.value() == &COUNT_STAR_EXPANSION { - return Some(ScalarValue::Int64(Some(num_rows as i64))); + let num_rows = i64::try_from(num_rows).ok()?; + return Some(ScalarValue::Int64(Some(num_rows))); } None