Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,73 @@ mod tests {
assert_eq!(page_index_pages_matched, 1);
}

#[tokio::test]
async fn parquet_page_index_exec_metrics_multiple_predicates() {
    // Two Int32 columns over four rows; written so the page index has two
    // pages per column, letting conjunctive predicates prune independently.
    let col_a: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]));
    let col_b: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(11),
        Some(12),
        Some(13),
        Some(14),
    ]));
    let batch = create_batch(vec![("a", col_a), ("b", col_b)]);

    // Each case: (filter, (rows_pruned, rows_matched, pages_pruned, pages_matched)).
    // Verifies that pages matched/pruned are counted once per row group even
    // when multiple page-index predicates are evaluated.
    let cases = vec![
        // matches the last page
        (
            col("a").gt_eq(lit(3)).and(col("b").lt_eq(lit(13))),
            (2, 2, 1, 1),
        ),
        // matches no pages
        (
            col("a").gt_eq(lit(3)).and(col("b").lt_eq(lit(12))),
            (4, 0, 2, 0),
        ),
        // matches both pages
        (
            col("a").gt_eq(lit(2)).and(col("b").lt_eq(lit(13))),
            (0, 4, 0, 2),
        ),
    ];

    for (filter, (exp_rows_pruned, exp_rows_matched, exp_pages_pruned, exp_pages_matched)) in
        cases
    {
        let rt = RoundTrip::new()
            .with_predicate(filter)
            .with_page_index_predicate()
            .round_trip(vec![batch.clone()])
            .await;
        let metrics = rt.parquet_exec.metrics().unwrap();

        let (rows_pruned, rows_matched) =
            get_pruning_metric(&metrics, "page_index_rows_pruned");
        assert_eq!(rows_pruned, exp_rows_pruned);
        assert_eq!(rows_matched, exp_rows_matched);
        let (pages_pruned, pages_matched) =
            get_pruning_metric(&metrics, "page_index_pages_pruned");
        assert_eq!(pages_pruned, exp_pages_pruned);
        assert_eq!(pages_matched, exp_pages_matched);
    }
}

/// Returns a string array with contents:
/// "[Foo, null, bar, bar, bar, bar, zzz]"
fn string_batch() -> RecordBatch {
Expand Down
37 changes: 27 additions & 10 deletions datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl PagePruningAccessPlanFilter {
for row_group_index in row_group_indexes {
// The selection for this particular row group
let mut overall_selection = None;
let mut total_pages_in_group = 0;
// stores the indexes of the matched pages
let mut matched_pages_in_group: Option<HashSet<usize>> = None;

for predicate in page_index_predicates {
let column = predicate
.required_columns()
Expand Down Expand Up @@ -230,19 +234,31 @@ impl PagePruningAccessPlanFilter {
file_metrics,
);

let Some((selection, total_pages, matched_pages)) = selection else {
let Some((selection, pages)) = selection else {
trace!("No pages pruned in prune_pages_in_one_row_group");
continue;
};
total_pages_select += matched_pages;
total_pages_skip += total_pages - matched_pages;

debug!(
"Use filter and page index to create RowSelection {:?} from predicate: {:?}",
&selection,
predicate.predicate_expr(),
);

total_pages_in_group = pages.len();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex suggested:

If every predicate skips due to a missing column index on line 239, the total_pages_in_group remains at 0 and matched_pages_in_group remains None. This causes the row group to silently contribute 0 total, 0 matched, and 0 pruned to the metrics, even though N pages will actually be scanned. While this behavior is not a new regression, it is an inaccuracy worth addressing while modifying this part of the codebase.

To fix this, total_pages_in_group should be derived from the offset index upfront as a property of the row group rather than within the predicate loop. By initializing matched_pages_in_group to (0..total_pages_in_group).collect(), an abstaining pruner will correctly report $N \to N$ matched and 0 pruned. This refactor also allows the Option<HashSet<_>> to be simplified into a plain HashSet<_> and removes the redundant assignment of total_pages_in_group inside the loop.

let matched_pages_indexes: HashSet<_> = pages
.into_iter()
.enumerate()
.filter(|x| x.1)
.map(|x| x.0)
.collect();
if let Some(ref mut m) = matched_pages_in_group {
// only keep pages that also matched in the previous predicate(s)
m.retain(|x| matched_pages_indexes.contains(x));
} else {
matched_pages_in_group = Some(matched_pages_indexes);
}

overall_selection = update_selection(overall_selection, selection);

// if the overall selection has ruled out all rows, no need to
Expand Down Expand Up @@ -278,6 +294,10 @@ impl PagePruningAccessPlanFilter {
);
}
}

let pages_matched = matched_pages_in_group.map_or(0, |m| m.len());
total_pages_select += pages_matched;
total_pages_skip += total_pages_in_group - pages_matched;
}

file_metrics.page_index_rows_pruned.add_pruned(total_skip);
Expand Down Expand Up @@ -309,8 +329,8 @@ fn update_selection(
}
}

/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to the number of
/// total and matched pages.
/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to a vec of
/// booleans that state if each page was matched (true) or not (false).
///
/// This Row Selection is formed from the page index and the predicate skips row
/// ranges that can be ruled out based on the predicate.
Expand All @@ -323,7 +343,7 @@ fn prune_pages_in_one_row_group(
converter: StatisticsConverter<'_>,
parquet_metadata: &ParquetMetaData,
metrics: &ParquetFileMetrics,
) -> Option<(RowSelection, usize, usize)> {
) -> Option<(RowSelection, Vec<bool>)> {
let pruning_stats =
PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?;

Expand Down Expand Up @@ -376,10 +396,7 @@ fn prune_pages_in_one_row_group(
};
vec.push(selector);

let total_pages = values.len();
let matched_pages = values.iter().filter(|v| **v).count();

Some((RowSelection::from(vec), total_pages, matched_pages))
Some((RowSelection::from(vec), values))
}

/// Implement [`PruningStatistics`] for one column's PageIndex (column_index + offset_index)
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/explain_analyze.slt
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
----
Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)]
Copy link
Copy Markdown
Contributor Author

@nuno-faria nuno-faria May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file used contains 4 row groups (1 data page each) and the query matched only 3. The previous value of 6 therefore came from double-counting: with two page-index predicates, each matched page was counted once per predicate.

Also the same reasoning applies to the remaining changes.


statement ok
reset datafusion.explain.analyze_categories;
Expand All @@ -262,7 +262,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
----
Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=<slt:ignore>]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]

statement ok
reset datafusion.explain.analyze_categories;
Expand All @@ -277,7 +277,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
----
Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=<slt:ignore>]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]

statement ok
reset datafusion.explain.analyze_categories;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/limit_pruning.slt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary;
query TT
explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3;
----
Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]
Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (171/2.35 K)]

# limit_pruned_row_groups=0 total → 0 matched
# because of order by, scan needs to preserve sort, so limit pruning is disabled
Expand All @@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde
----
Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=3 total → 3 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]

statement ok
drop table tracking_data;
Expand Down
Loading