Push partition_statistics into DataSource #18233
base: main
    if let Some(partition) = partition {
        let mut statistics = Statistics::new_unknown(&self.schema());
        if let Some(file_config) =
            self.data_source.as_any().downcast_ref::<FileScanConfig>()
        {
            if let Some(file_group) = file_config.file_groups.get(partition) {
                if let Some(stat) = file_group.file_statistics(None) {
                    statistics = stat.clone();
                }
            }
        }
        Ok(statistics)
    } else {
        Ok(self.data_source.statistics()?)
    }
This was clearly buggy. With partition = None it returns projected statistics (see the current implementation in FileScanConfig::statistics), but with Some(partition) it computes the statistics and never projects them.
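To make the mismatch concrete, here is a minimal sketch of the intended behavior, where the projection is applied on both paths. The types `Stats`, `ColumnStats`, and `Scan` and the helper `example_scan` are simplified stand-ins invented for illustration; they are not the DataFusion types and the real fix may be structured differently.

```rust
// Simplified stand-ins for illustration only; NOT the DataFusion types.
#[derive(Debug, Clone, PartialEq)]
struct ColumnStats {
    null_count: Option<usize>,
}

#[derive(Debug, Clone, PartialEq)]
struct Stats {
    num_rows: Option<usize>,
    columns: Vec<ColumnStats>,
}

impl Stats {
    /// Statistics with nothing known, for a schema of `width` columns.
    fn unknown(width: usize) -> Stats {
        Stats {
            num_rows: None,
            columns: vec![ColumnStats { null_count: None }; width],
        }
    }

    /// Keep only the projected columns, in projection order.
    fn project(&self, projection: Option<&[usize]>) -> Stats {
        match projection {
            None => self.clone(),
            Some(indices) => Stats {
                num_rows: self.num_rows,
                columns: indices.iter().map(|&i| self.columns[i].clone()).collect(),
            },
        }
    }
}

/// Hypothetical scan: statistics are stored against the full file schema.
struct Scan {
    group_stats: Vec<Stats>,
    all_stats: Stats,
    projection: Option<Vec<usize>>,
}

impl Scan {
    /// Both the `None` and the `Some(partition)` path go through `project`.
    fn partition_statistics(&self, partition: Option<usize>) -> Stats {
        let width = self.all_stats.columns.len();
        let unprojected = match partition {
            Some(i) => self
                .group_stats
                .get(i)
                .cloned()
                .unwrap_or_else(|| Stats::unknown(width)),
            None => self.all_stats.clone(),
        };
        unprojected.project(self.projection.as_deref())
    }
}

/// Build per-column stats from a list of null counts.
fn stats(num_rows: usize, null_counts: &[usize]) -> Stats {
    Stats {
        num_rows: Some(num_rows),
        columns: null_counts
            .iter()
            .map(|&n| ColumnStats { null_count: Some(n) })
            .collect(),
    }
}

/// Three file columns, two file groups, projection selecting columns 2 and 0.
fn example_scan() -> Scan {
    Scan {
        group_stats: vec![stats(10, &[1, 2, 3]), stats(5, &[0, 0, 4])],
        all_stats: stats(15, &[1, 2, 7]),
        projection: Some(vec![2, 0]),
    }
}
```

With this shape, the per-partition result has the same column count and order as the table-level result, which is the property the old `Some(partition)` path did not guarantee.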
    #[test]
    fn test_partition_statistics_projection() {
This test demonstrates the bug and fails on main.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(partition) = partition {
            // Compute statistics for a specific partition
            if let Some(batches) = self.partitions.get(partition) {
                Ok(common::compute_record_batch_statistics(
                    from_ref(&batches),
                    &self.schema,
                    self.projection.clone(),
                ))
            } else {
                // Invalid partition index
                Ok(Statistics::new_unknown(&self.projected_schema))
            }
        } else {
            // Compute statistics across all partitions
            Ok(common::compute_record_batch_statistics(
                &self.partitions,
                &self.schema,
                self.projection.clone(),
            ))
        }
    }
Previously unimplemented because only FileScanConfig was handled via downcast matching.
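A small sketch of why downcast matching left other sources without per-partition statistics, while a trait method lets every implementation answer for itself. The names `Source`, `FileSource`, `MemorySource`, `partition_rows`, `rows_via_downcast`, and `rows_via_trait` are hypothetical and only model the shape of the change; they are not DataFusion APIs.

```rust
use std::any::Any;

/// Hypothetical, pared-down stand-in for a data source trait.
trait Source {
    fn as_any(&self) -> &dyn Any;
    /// Row count for one partition, or for all partitions when `None`.
    fn partition_rows(&self, partition: Option<usize>) -> Option<usize>;
}

struct FileSource {
    rows_per_group: Vec<usize>,
}

struct MemorySource {
    partitions: Vec<Vec<i32>>,
}

impl Source for FileSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn partition_rows(&self, partition: Option<usize>) -> Option<usize> {
        match partition {
            Some(i) => self.rows_per_group.get(i).copied(),
            None => Some(self.rows_per_group.iter().sum()),
        }
    }
}

impl Source for MemorySource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn partition_rows(&self, partition: Option<usize>) -> Option<usize> {
        match partition {
            Some(i) => self.partitions.get(i).map(|p| p.len()),
            None => Some(self.partitions.iter().map(|p| p.len()).sum()),
        }
    }
}

/// Old style: the caller recognizes exactly one concrete type.
/// Every other source falls through to "unknown" (`None`).
fn rows_via_downcast(source: &dyn Source, partition: usize) -> Option<usize> {
    source
        .as_any()
        .downcast_ref::<FileSource>()
        .and_then(|f| f.rows_per_group.get(partition).copied())
}

/// New style: the caller asks the trait; each source supplies its own answer.
fn rows_via_trait(source: &dyn Source, partition: usize) -> Option<usize> {
    source.partition_rows(Some(partition))
}
```

In this sketch the in-memory source reports nothing through the downcast path and a real count through the trait path, which mirrors the gap described in the comment above.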
Thank you @adriangb
        SchedulingType::NonCooperative
    }
    fn statistics(&self) -> Result<Statistics>;
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
This is an API change. I think we should mark statistics as deprecated and add partition_statistics as a new API.
Good call. I added fn statistics() back to the DataSource trait as a deprecated method that defaults to partition_statistics(None). This should make it fully backwards compatible for all implementations.
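For readers following along, the pattern described here can be sketched roughly as follows. `Source`, `Stats`, `MemorySource`, and `legacy_caller` are hypothetical simplifications, and the error type is a plain `String` rather than DataFusion's `Result`; the deprecation note text is also an assumption.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    num_rows: Option<usize>,
}

/// Hypothetical stand-in for the trait under discussion.
trait Source {
    /// Required: statistics for one partition, or for all when `None`.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Stats, String>;

    /// Kept for existing callers; forwards to the new method by default.
    #[deprecated(note = "use partition_statistics(None) instead")]
    fn statistics(&self) -> Result<Stats, String> {
        self.partition_statistics(None)
    }
}

struct MemorySource {
    partitions: Vec<Vec<i32>>,
}

impl Source for MemorySource {
    // Only the new method is implemented; `statistics` comes from the default.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Stats, String> {
        let num_rows = match partition {
            Some(i) => self.partitions.get(i).map(|p| p.len()),
            None => Some(self.partitions.iter().map(|p| p.len()).sum()),
        };
        Ok(Stats { num_rows })
    }
}

/// A caller that has not migrated yet still compiles, with a deprecation
/// warning unless it is explicitly allowed as here.
#[allow(deprecated)]
fn legacy_caller(source: &dyn Source) -> Result<Stats, String> {
    source.statistics()
}
```

In this sketch, callers of the old method keep working unchanged, while implementors have to provide `partition_statistics`, since it is the one method without a default.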
c8fefdb to f2b0b8c
Removes a downcast match in favor of use of the trait. This mirrors the changes to DataSourceExec to use partition_statistics instead of statistics from #15852