Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod writer;
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use file_format::*;
pub use metrics::ParquetFileMetrics;
pub use opener::ParquetMorselizer;
pub use page_filter::PagePruningAccessPlanFilter;
pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
Expand Down
1,668 changes: 1,194 additions & 474 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

143 changes: 87 additions & 56 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ use std::sync::Arc;

use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetOpener;
use crate::opener::build_pruning_predicates;
use crate::ParquetMorselizer;
use crate::opener::{
EncryptionContext, ParquetMorselizerState, build_pruning_predicates,
};
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::morsel::Morselizer;

use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{DataFusionError, internal_err};
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
Expand Down Expand Up @@ -474,48 +477,12 @@ impl ParquetSource {
}
}

/// Builder-style setter for the `reverse_row_groups` flag, which controls
/// whether the scan visits a file's row groups in reverse order.
pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self {
    self.reverse_row_groups = reverse_row_groups;
    self
}
/// Test-only accessor for the `reverse_row_groups` flag.
#[cfg(test)]
pub(crate) fn reverse_row_groups(&self) -> bool {
    self.reverse_row_groups
}
}

/// Converts the `datafusion.common.config.ParquetOptions.coerce_int96`
/// string setting into the corresponding Arrow [`TimeUnit`].
///
/// Matching is case-insensitive; the accepted values are `ns`, `us`, `ms`,
/// and `s`. Any other input produces a configuration error.
pub(crate) fn parse_coerce_int96_string(
    str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
    // Lowercase once up front so the match below is case-insensitive.
    let normalized = str_setting.to_lowercase();
    match normalized.as_str() {
        "s" => Ok(TimeUnit::Second),
        "ms" => Ok(TimeUnit::Millisecond),
        "us" => Ok(TimeUnit::Microsecond),
        "ns" => Ok(TimeUnit::Nanosecond),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet coerce_int96: \
            {str_setting}. Valid values are: ns, us, ms, and s."
        ))),
    }
}

/// Allows easy conversion from `ParquetSource` to `Arc<dyn FileSource>`
impl From<ParquetSource> for Arc<dyn FileSource> {
    fn from(source: ParquetSource) -> Self {
        // Delegate to the shared helper so all file sources are wrapped uniformly.
        as_file_source(source)
    }
}

impl FileSource for ParquetSource {
fn create_file_opener(
fn create_parquet_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
) -> datafusion_common::Result<ParquetMorselizer> {
let expr_adapter_factory = base_config
.expr_adapter_factory
.clone()
Expand All @@ -526,14 +493,24 @@ impl FileSource for ParquetSource {
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
});

#[cfg(not(feature = "parquet_encryption"))]
let encryption_context = EncryptionContext::default();

#[cfg(feature = "parquet_encryption")]
let file_decryption_properties = self
.table_parquet_options()
.crypto
.file_decryption
.clone()
.map(FileDecryptionProperties::from)
.map(Arc::new);
let encryption_context = {
let file_decryption_properties = self
.table_parquet_options()
.crypto
.file_decryption
.clone()
.map(FileDecryptionProperties::from)
.map(Arc::new);

EncryptionContext::new(
file_decryption_properties,
self.get_encryption_factory_with_config(),
)
};

let coerce_int96 = self
.table_parquet_options
Expand All @@ -542,12 +519,12 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let opener = Arc::new(ParquetOpener {
Ok(ParquetMorselizer::new(ParquetMorselizerState {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetOpener"),
.expect("Batch size must set before creating ParquetMorselizer"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
Expand All @@ -562,15 +539,69 @@ impl FileSource for ParquetSource {
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
encryption_context,
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
});
Ok(opener)
}))
}

/// Builder-style setter for the `reverse_row_groups` flag, which controls
/// whether the scan visits a file's row groups in reverse order.
pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self {
    self.reverse_row_groups = reverse_row_groups;
    self
}
/// Test-only accessor for the `reverse_row_groups` flag.
#[cfg(test)]
pub(crate) fn reverse_row_groups(&self) -> bool {
    self.reverse_row_groups
}
}

/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
///
/// Accepted values (case-insensitive): `ns`, `us`, `ms`, `s`.
///
/// # Errors
/// Returns `DataFusionError::Configuration` for any other value.
pub(crate) fn parse_coerce_int96_string(
    str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
    // Lowercase once so the match below is case-insensitive.
    let str_setting_lower: &str = &str_setting.to_lowercase();

    match str_setting_lower {
        "ns" => Ok(TimeUnit::Nanosecond),
        "us" => Ok(TimeUnit::Microsecond),
        "ms" => Ok(TimeUnit::Millisecond),
        "s" => Ok(TimeUnit::Second),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet coerce_int96: \
            {str_setting}. Valid values are: ns, us, ms, and s."
        ))),
    }
}

/// Allows easy conversion from `ParquetSource` to `Arc<dyn FileSource>`
impl From<ParquetSource> for Arc<dyn FileSource> {
    fn from(source: ParquetSource) -> Self {
        // Delegate to the shared helper so all file sources are wrapped uniformly.
        as_file_source(source)
    }
}

impl FileSource for ParquetSource {
/// Legacy `FileOpener` entry point.
///
/// `ParquetSource` implements the Morsel API, so this method is
/// intentionally unsupported and always returns an internal error;
/// callers must use `create_morselizer` instead.
fn create_file_opener(
    &self,
    _object_store: Arc<dyn ObjectStore>,
    _base_config: &FileScanConfig,
    _partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
    internal_err!(
        "ParquetSource::create_file_opener called but it supports Morsel API"
    )
}

/// Builds the parquet-native morselizer for the given scan partition and
/// returns it boxed as a `dyn Morselizer`.
fn create_morselizer(
    &self,
    object_store: Arc<dyn ObjectStore>,
    base_config: &FileScanConfig,
    partition: usize,
) -> datafusion_common::Result<Box<dyn Morselizer>> {
    // Delegate to the parquet-specific constructor, then erase the
    // concrete type behind the trait object expected by callers.
    self.create_parquet_morselizer(object_store, base_config, partition)
        .map(|morselizer| Box::new(morselizer) as Box<dyn Morselizer>)
}

fn as_any(&self) -> &dyn Any {
Expand Down
24 changes: 23 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::morsel::{FileOpenerMorselizer, Morselizer};
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -63,13 +64,34 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
///
/// [`DataSource`]: crate::source::DataSource
pub trait FileSource: Send + Sync {
/// Creates a `dyn FileOpener` based on given parameters.
///
/// * `object_store` — store used to read the underlying file bytes
/// * `base_config` — scan configuration shared by all partitions
/// * `partition` — index of the output partition being opened
///
/// `FileSource`s that implement the Morsel API should return a "Not
/// Implemented" or "Internal" error for this API.
///
/// TODO: deprecate in favor of `create_morselizer`
fn create_file_opener(
    &self,
    object_store: Arc<dyn ObjectStore>,
    base_config: &FileScanConfig,
    partition: usize,
) -> Result<Arc<dyn FileOpener>>;

/// Creates a `dyn Morselizer` based on given parameters.
///
/// The default implementation preserves existing behavior by adapting the
/// legacy [`FileOpener`] API into a [`Morselizer`]. File formats with a
/// native morsel-driven implementation should override this method to
/// return a [`Morselizer`] and not implement the [`FileOpener`] API.
fn create_morselizer(
    &self,
    object_store: Arc<dyn ObjectStore>,
    base_config: &FileScanConfig,
    partition: usize,
) -> Result<Box<dyn Morselizer>> {
    // Wrap the legacy opener so existing FileOpener-based sources keep working.
    let opener = self.create_file_opener(object_store, base_config, partition)?;
    Ok(Box::new(FileOpenerMorselizer::new(opener)))
}
/// Returns `self` as [`Any`] so callers can downcast to the concrete source type.
fn as_any(&self) -> &dyn Any;

Expand Down
9 changes: 7 additions & 2 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,14 @@ impl DataSource for FileScanConfig {

let source = self.file_source.with_batch_size(batch_size);

let opener = source.create_file_opener(object_store, self, partition)?;
let morselizer = source.create_morselizer(object_store, self, partition)?;

let stream = FileStream::new(self, partition, opener, source.metrics())?;
let stream = FileStream::new_with_morselizer(
self,
partition,
morselizer,
source.metrics(),
)?;
Ok(Box::pin(cooperative(stream)))
}

Expand Down
Loading
Loading