Skip to content

Commit 1d8ee0a

Browse files
committed
extract filter preparation
1 parent 8ad7685 commit 1d8ee0a

1 file changed

Lines changed: 162 additions & 94 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 162 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,16 @@ struct FiltersPreparedParquetOpen {
224224
page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
225225
}
226226

227+
/// Result of CPU-only row-group pruning using parquet metadata.
228+
///
229+
/// This captures the row groups that remain after range and statistics
230+
/// pruning so the next async step can optionally load and apply bloom
231+
/// filters. Limit-based pruning runs later, when the final stream is built.
232+
struct RowGroupsPreparedParquetOpen {
233+
prepared: FiltersPreparedParquetOpen,
234+
row_groups: RowGroupAccessPlanFilter,
235+
}
236+
227237
impl ParquetMorselizerState {
228238
/// Perform the CPU-only setup for opening a parquet file.
229239
fn prepare_open_file(
@@ -533,16 +543,123 @@ impl ParquetMorselizerState {
533543
Ok(prepared)
534544
}
535545

536-
/// Continue opening once metadata-derived filters and page indexes are ready.
537-
async fn execute_metadata_loaded_open(
546+
/// Prune row groups using file ranges and parquet metadata.
547+
fn prepare_row_groups(
538548
prepared: FiltersPreparedParquetOpen,
549+
) -> Result<RowGroupsPreparedParquetOpen> {
550+
let loaded = &prepared.loaded;
551+
let inner = &loaded.prepared;
552+
let state = &inner.state;
553+
554+
let file_metadata = Arc::clone(loaded.reader_metadata.metadata());
555+
let rg_metadata = file_metadata.row_groups();
556+
let file_name = inner.partitioned_file.object_meta.location.to_string();
557+
let pruning_pred = prepared.pruning_predicate.as_ref().map(|p| p.as_ref());
558+
559+
let access_plan =
560+
create_initial_plan(&file_name, inner.extensions.clone(), rg_metadata.len())?;
561+
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
562+
563+
if let Some(range) = inner.file_range.as_ref() {
564+
row_groups.prune_by_range(rg_metadata, range);
565+
}
566+
567+
if let Some(pruning_pred) = pruning_pred {
568+
if state.enable_row_group_stats_pruning {
569+
row_groups.prune_by_statistics(
570+
&prepared.physical_file_schema,
571+
loaded.reader_metadata.parquet_schema(),
572+
rg_metadata,
573+
pruning_pred,
574+
&inner.file_metrics,
575+
);
576+
} else {
577+
inner
578+
.file_metrics
579+
.row_groups_pruned_statistics
580+
.add_matched(row_groups.remaining_row_group_count());
581+
}
582+
583+
if !(state.enable_bloom_filter && !row_groups.is_empty()) {
584+
inner
585+
.file_metrics
586+
.row_groups_pruned_bloom_filter
587+
.add_matched(row_groups.remaining_row_group_count());
588+
}
589+
} else {
590+
let n_remaining_row_groups = row_groups.remaining_row_group_count();
591+
inner
592+
.file_metrics
593+
.row_groups_pruned_statistics
594+
.add_matched(n_remaining_row_groups);
595+
inner
596+
.file_metrics
597+
.row_groups_pruned_bloom_filter
598+
.add_matched(n_remaining_row_groups);
599+
}
600+
601+
Ok(RowGroupsPreparedParquetOpen {
602+
prepared,
603+
row_groups,
604+
})
605+
}
606+
607+
/// Apply bloom filter pruning when it is enabled, a pruning predicate exists, and row groups remain.
608+
async fn prune_prepared_bloom_filters(
609+
mut prepared: RowGroupsPreparedParquetOpen,
610+
) -> Result<RowGroupsPreparedParquetOpen> {
611+
let loaded = &mut prepared.prepared.loaded;
612+
let inner = &mut loaded.prepared;
613+
let state = &inner.state;
614+
let pruning_pred = prepared
615+
.prepared
616+
.pruning_predicate
617+
.as_ref()
618+
.map(|p| p.as_ref());
619+
620+
if let Some(pruning_pred) = pruning_pred
621+
&& state.enable_bloom_filter
622+
&& !prepared.row_groups.is_empty()
623+
{
624+
let bf_reader = mem::replace(
625+
&mut inner.async_file_reader,
626+
state.parquet_file_reader_factory.create_reader(
627+
state.partition_index,
628+
inner.partitioned_file.clone(),
629+
inner.metadata_size_hint,
630+
&state.metrics,
631+
)?,
632+
);
633+
let mut bf_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
634+
bf_reader,
635+
loaded.reader_metadata.clone(),
636+
);
637+
prepared.row_groups.prune_by_bloom_filters(
638+
&prepared.prepared.physical_file_schema,
639+
&mut bf_builder,
640+
pruning_pred,
641+
&inner.file_metrics,
642+
)
643+
.await;
644+
}
645+
646+
Ok(prepared)
647+
}
648+
649+
/// Build the final parquet stream once all pruning work is complete.
650+
fn build_stream_reader(
651+
prepared: RowGroupsPreparedParquetOpen,
539652
) -> Result<BoxStream<'static, Result<RecordBatch>>> {
653+
let RowGroupsPreparedParquetOpen {
654+
prepared,
655+
mut row_groups,
656+
} = prepared;
540657
let FiltersPreparedParquetOpen {
541658
loaded,
542659
physical_file_schema,
543660
projection,
544661
predicate,
545-
pruning_predicate,
662+
pruning_predicate: _,
546663
page_pruning_predicate,
547664
} = prepared;
548665
let MetadataLoadedParquetOpen {
@@ -552,13 +669,13 @@ impl ParquetMorselizerState {
552669
} = loaded;
553670
let PreparedParquetOpen {
554671
state,
555-
partitioned_file,
556-
file_range,
557-
extensions,
672+
partitioned_file: _,
673+
file_range: _,
674+
extensions: _,
558675
file_metrics,
559676
file_pruner,
560-
metadata_size_hint,
561-
mut async_file_reader,
677+
metadata_size_hint: _,
678+
async_file_reader,
562679
logical_file_schema: _,
563680
output_schema,
564681
projection: _,
@@ -567,99 +684,17 @@ impl ParquetMorselizerState {
567684
file_decryption_properties: _,
568685
} = prepared;
569686

570-
let file_name = partitioned_file.object_meta.location.to_string();
571687
let batch_size = state.batch_size;
572688
let reorder_predicates = state.reorder_filters;
573689
let pushdown_filters = state.pushdown_filters;
574690
let force_filter_selections = state.force_filter_selections;
575-
let enable_bloom_filter = state.enable_bloom_filter;
576-
let enable_row_group_stats_pruning = state.enable_row_group_stats_pruning;
577691
let limit = state.limit;
578-
let parquet_file_reader_factory = Arc::clone(&state.parquet_file_reader_factory);
579-
let partition_index = state.partition_index;
580-
let metrics = state.metrics.clone();
581692
let max_predicate_cache_size = state.max_predicate_cache_size;
582693
let reverse_row_groups = state.reverse_row_groups;
583694
let preserve_order = state.preserve_order;
584-
585-
// ------------------------------------------------------------
586-
// Step: prune row groups by range, predicate and bloom filter
587-
// ------------------------------------------------------------
588-
589-
// Determine which row groups to actually read. The idea is to skip
590-
// as many row groups as possible based on the metadata and query
591695
let file_metadata = Arc::clone(reader_metadata.metadata());
592-
let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref());
593696
let rg_metadata = file_metadata.row_groups();
594-
// track which row groups to actually read
595-
let access_plan = create_initial_plan(&file_name, extensions, rg_metadata.len())?;
596-
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
597-
// if there is a range restricting what parts of the file to read
598-
if let Some(range) = file_range.as_ref() {
599-
row_groups.prune_by_range(rg_metadata, range);
600-
}
601-
602-
// If there is a predicate that can be evaluated against the metadata
603-
if let Some(pruning_pred) = pruning_pred.as_ref() {
604-
if enable_row_group_stats_pruning {
605-
row_groups.prune_by_statistics(
606-
&physical_file_schema,
607-
reader_metadata.parquet_schema(),
608-
rg_metadata,
609-
pruning_pred,
610-
&file_metrics,
611-
);
612-
} else {
613-
// Update metrics: statistics unavailable, so all row groups are
614-
// matched (not pruned)
615-
file_metrics
616-
.row_groups_pruned_statistics
617-
.add_matched(row_groups.remaining_row_group_count());
618-
}
619697

620-
if enable_bloom_filter && !row_groups.is_empty() {
621-
// Use the existing reader for bloom filter I/O;
622-
// replace with a fresh reader for decoding below.
623-
let bf_reader = mem::replace(
624-
&mut async_file_reader,
625-
parquet_file_reader_factory.create_reader(
626-
partition_index,
627-
partitioned_file.clone(),
628-
metadata_size_hint,
629-
&metrics,
630-
)?,
631-
);
632-
let mut bf_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
633-
bf_reader,
634-
reader_metadata.clone(),
635-
);
636-
row_groups
637-
.prune_by_bloom_filters(
638-
&physical_file_schema,
639-
&mut bf_builder,
640-
pruning_pred,
641-
&file_metrics,
642-
)
643-
.await;
644-
} else {
645-
// Update metrics: bloom filter unavailable, so all row groups are
646-
// matched (not pruned)
647-
file_metrics
648-
.row_groups_pruned_bloom_filter
649-
.add_matched(row_groups.remaining_row_group_count());
650-
}
651-
} else {
652-
// Update metrics: no predicate, so all row groups are matched (not pruned)
653-
let n_remaining_row_groups = row_groups.remaining_row_group_count();
654-
file_metrics
655-
.row_groups_pruned_statistics
656-
.add_matched(n_remaining_row_groups);
657-
file_metrics
658-
.row_groups_pruned_bloom_filter
659-
.add_matched(n_remaining_row_groups);
660-
}
661-
662-
// Prune by limit if limit is set and limit order is not sensitive
663698
if let (Some(limit), false) = (limit, preserve_order) {
664699
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
665700
}
@@ -838,6 +873,8 @@ enum ReadyState {
838873
PrepareFilters(Box<MetadataLoadedParquetOpen>),
839874
/// Planner has prepared filters and can request any missing page index data.
840875
Prepared(Box<FiltersPreparedParquetOpen>),
876+
/// Planner has prepared row-group pruning and can optionally load bloom filters.
877+
BuildStream(Box<RowGroupsPreparedParquetOpen>),
841878
/// Planner has a fully prepared stream ready to wrap as a morsel.
842879
EmitMorsel(BoxStream<'static, Result<RecordBatch>>),
843880
}
@@ -864,6 +901,10 @@ impl ReadyState {
864901
Self::Prepared(prepared.into())
865902
}
866903

904+
fn build_stream(prepared: RowGroupsPreparedParquetOpen) -> Self {
905+
Self::BuildStream(prepared.into())
906+
}
907+
867908
fn emit_morsel(stream: BoxStream<'static, Result<RecordBatch>>) -> Self {
868909
Self::EmitMorsel(stream)
869910
}
@@ -912,6 +953,7 @@ impl ParquetMorselPlannerState {
912953
ReadyState::PruneFiles(_) => "Ready(PruneFiles)",
913954
ReadyState::PrepareFilters(_) => "Ready(PrepareFilters)",
914955
ReadyState::Prepared(_) => "Ready(Prepared)",
956+
ReadyState::BuildStream(_) => "Ready(BuildStream)",
915957
ReadyState::EmitMorsel(_) => "Ready(EmitMorsel)",
916958
},
917959
Self::WaitingIo(_) => "WaitingIo",
@@ -1039,11 +1081,37 @@ impl MorselPlanner for ParquetMorselPlanner {
10391081
let prepared =
10401082
ParquetMorselizerState::load_prepared_page_index(*prepared)
10411083
.await?;
1042-
let stream =
1043-
ParquetMorselizerState::execute_metadata_loaded_open(prepared)
1044-
.await?;
1045-
Ok(ReadyState::emit_morsel(stream))
1084+
let prepared =
1085+
ParquetMorselizerState::prepare_row_groups(prepared)?;
1086+
Ok(ReadyState::build_stream(prepared))
10461087
})),
1088+
ReadyState::BuildStream(prepared) => {
1089+
let should_prune_bloom = prepared
1090+
.prepared
1091+
.pruning_predicate
1092+
.is_some()
1093+
&& prepared.prepared.loaded.prepared.state.enable_bloom_filter
1094+
&& !prepared.row_groups.is_empty();
1095+
if should_prune_bloom {
1096+
Ok(self.schedule_io(async move {
1097+
let prepared =
1098+
ParquetMorselizerState::prune_prepared_bloom_filters(
1099+
*prepared,
1100+
)
1101+
.await?;
1102+
let stream =
1103+
ParquetMorselizerState::build_stream_reader(prepared)?;
1104+
Ok(ReadyState::emit_morsel(stream))
1105+
}))
1106+
} else {
1107+
let stream =
1108+
ParquetMorselizerState::build_stream_reader(*prepared)?;
1109+
self.state = ParquetMorselPlannerState::ready(
1110+
ReadyState::emit_morsel(stream),
1111+
);
1112+
Ok(Some(MorselPlan::new()))
1113+
}
1114+
}
10471115
ReadyState::EmitMorsel(stream) => {
10481116
let morsel = ParquetStreamMorsel::new(stream);
10491117
Ok(Some(MorselPlan::new().with_morsels(vec![Box::new(morsel)])))

0 commit comments

Comments
 (0)