From 8c2462ea1d3e355fc9df226b720f7d0e328bccca Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 7 Jan 2026 16:15:41 +0530 Subject: [PATCH 1/3] refactor: Optimize required_columns from BTreeSet to Vec in struct PushdownChecker --- .../datasource-parquet/src/row_filter.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 04c11b8875541..a527ef3c3bc00 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -276,7 +276,7 @@ struct PushdownChecker<'schema> { /// Does the expression reference any columns not present in the file schema? projected_columns: bool, /// Indices into the file schema of columns required to evaluate the expression. - required_columns: BTreeSet, + required_columns: Vec, /// Tracks the nested column behavior found during traversal. nested_behavior: NestedColumnSupport, /// Whether nested list columns are supported by the predicate semantics. @@ -290,7 +290,7 @@ impl<'schema> PushdownChecker<'schema> { Self { non_primitive_columns: false, projected_columns: false, - required_columns: BTreeSet::default(), + required_columns: Vec::new(), nested_behavior: NestedColumnSupport::PrimitiveOnly, allow_list_columns, file_schema, @@ -307,7 +307,9 @@ impl<'schema> PushdownChecker<'schema> { } }; - self.required_columns.insert(idx); + if !self.required_columns.contains(&idx) { + self.required_columns.push(idx); + } let data_type = self.file_schema.field(idx).data_type(); if DataType::is_nested(data_type) { @@ -390,9 +392,8 @@ enum NestedColumnSupport { Unsupported, } -#[derive(Debug)] struct PushdownColumns { - required_columns: BTreeSet, + required_columns: Vec, nested: NestedColumnSupport, } @@ -411,9 +412,13 @@ fn pushdown_columns( let allow_list_columns = supports_list_predicates(expr); let mut checker = PushdownChecker::new(file_schema, allow_list_columns); expr.visit(&mut checker)?; - Ok((!checker.prevents_pushdown()).then_some(PushdownColumns { - required_columns: checker.required_columns, - nested: checker.nested_behavior, + let prevents_pushdown = checker.prevents_pushdown(); + let nested = checker.nested_behavior; + let mut required_columns = checker.required_columns; + required_columns.sort_unstable(); + Ok((!prevents_pushdown).then_some(PushdownColumns { + required_columns, + nested, })) } From 2a51524885625ad0dffba7f69720282a9ed67c93 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Mon, 12 Jan 2026 16:25:21 +0530 Subject: [PATCH 2/3] use dedup and update comments --- .../datasource-parquet/src/row_filter.rs | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index a527ef3c3bc00..1b7b36b995e39 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -307,9 +307,8 @@ impl<'schema> PushdownChecker<'schema> { } }; - if !self.required_columns.contains(&idx) { - self.required_columns.push(idx); - } + // Duplicates are handled by dedup() in into_sorted_columns() + self.required_columns.push(idx); let data_type = self.file_schema.field(idx).data_type(); if DataType::is_nested(data_type) { @@ -357,6 +356,21 @@ impl<'schema> PushdownChecker<'schema> { fn prevents_pushdown(&self) -> bool { self.non_primitive_columns || self.projected_columns } + + /// Consumes the checker and returns sorted, deduplicated column indices + /// wrapped in a `PushdownColumns` struct. + /// + /// This method sorts the column indices and removes duplicates. The sort + /// is required because downstream code relies on column indices being in + /// ascending order for correct schema projection. + fn into_sorted_columns(mut self) -> PushdownColumns { + self.required_columns.sort_unstable(); + self.required_columns.dedup(); + PushdownColumns { + required_columns: self.required_columns, + nested: self.nested_behavior, + } + } } impl TreeNodeVisitor<'_> for PushdownChecker<'_> { @@ -392,6 +406,7 @@ enum NestedColumnSupport { Unsupported, } +#[derive(Debug)] struct PushdownColumns { required_columns: Vec, nested: NestedColumnSupport, @@ -412,14 +427,7 @@ fn pushdown_columns( let allow_list_columns = supports_list_predicates(expr); let mut checker = PushdownChecker::new(file_schema, allow_list_columns); expr.visit(&mut checker)?; - let prevents_pushdown = checker.prevents_pushdown(); - let nested = checker.nested_behavior; - let mut required_columns = checker.required_columns; - required_columns.sort_unstable(); - Ok((!prevents_pushdown).then_some(PushdownColumns { - required_columns, - nested, - })) + Ok((!checker.prevents_pushdown()).then(|| checker.into_sorted_columns())) } fn leaf_indices_for_roots( From 63b2ab5e7ea6e5076d181b293df80bdd88568108 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 13 Jan 2026 17:49:07 +0530 Subject: [PATCH 3/3] add comments for required_column --- datafusion/datasource-parquet/src/row_filter.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 1b7b36b995e39..bc195f1767878 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -406,8 +406,12 @@ enum NestedColumnSupport { Unsupported, } +/// Result of checking which columns are required for filter pushdown. #[derive(Debug)] struct PushdownColumns { + /// Sorted, unique column indices into the file schema required to evaluate + /// the filter expression. Must be in ascending order for correct schema + /// projection matching. required_columns: Vec, nested: NestedColumnSupport, }