diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 0c0d516aea9ba..20f0adfd5eba6 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -75,7 +75,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use arrow::array::BooleanArray; -use arrow::datatypes::{DataType, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use datafusion_functions::core::getfield::GetFieldFunc; @@ -88,7 +88,7 @@ use datafusion_common::Result; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_physical_expr::ScalarFunctionExpr; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; @@ -137,7 +137,7 @@ impl DatafusionArrowPredicate { time: metrics::Time, ) -> Result { let physical_expr = - reassign_expr_columns(candidate.expr, &candidate.filter_schema)?; + reassign_expr_columns(candidate.expr, &candidate.read_plan.projected_schema)?; Ok(Self { physical_expr, @@ -146,7 +146,7 @@ impl DatafusionArrowPredicate { // can properly project and filter nested structures. projection_mask: ProjectionMask::leaves( metadata.file_metadata().schema_descr(), - candidate.projection.leaf_indices.iter().copied(), + candidate.read_plan.leaf_indices.iter().copied(), ), rows_pruned, rows_matched, @@ -199,22 +199,22 @@ pub(crate) struct FilterCandidate { required_bytes: usize, /// Can this filter use an index (e.g. a page index) to prune rows? can_use_index: bool, - /// Column indices into the parquet file schema required to evaluate this filter. 
- projection: LeafProjection, - /// The Arrow schema containing only the columns required by this filter, - /// projected from the file's Arrow schema. - filter_schema: SchemaRef, + /// The resolved Parquet read plan (leaf indices + projected schema). + read_plan: ParquetReadPlan, } -/// Projection specification for nested columns using Parquet leaf column indices. +/// The result of resolving which Parquet leaf columns and Arrow schema fields +/// are needed to evaluate an expression against a Parquet file /// -/// For nested types like List and Struct, Parquet stores data in leaf columns -/// (the primitive fields). This struct tracks which leaf columns are needed -/// to evaluate a filter expression. +/// This is the shared output of the column resolution pipeline used by both +/// the row filter to build `ArrowPredicate`s and the opener to build `ProjectionMask`s #[derive(Debug, Clone)] -struct LeafProjection { - /// Leaf column indices in the Parquet schema descriptor. - leaf_indices: Vec, +pub(crate) struct ParquetReadPlan { + /// Leaf column indices in the Parquet schema descriptor to decode + pub leaf_indices: Vec, + /// The projected Arrow schema containing only the columns/fields required + /// Struct types are pruned to include only the accessed sub-fields + pub projected_schema: SchemaRef, } /// Helper to build a `FilterCandidate`. @@ -246,30 +246,18 @@ impl FilterCandidateBuilder { /// * `Ok(None)` if the expression cannot be used as an ArrowFilter /// * `Err(e)` if an error occurs while building the candidate pub fn build(self, metadata: &ParquetMetaData) -> Result> { - let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)? 
- else { - return Ok(None); - }; - - let root_indices: Vec<_> = - required_columns.required_columns.into_iter().collect(); - - let leaf_indices = leaf_indices_for_roots( - &root_indices, - metadata.file_metadata().schema_descr(), - ); - - let projected_schema = Arc::new(self.file_schema.project(&root_indices)?); - - let required_bytes = size_of_columns(&leaf_indices, metadata)?; - let can_use_index = columns_sorted(&leaf_indices, metadata)?; + let schema_descr = metadata.file_metadata().schema_descr(); + let read_plan = + match build_parquet_read_plan(&self.expr, &self.file_schema, schema_descr)? { + Some(plan) => plan, + None => return Ok(None), + }; Ok(Some(FilterCandidate { expr: self.expr, - required_bytes, - can_use_index, - projection: LeafProjection { leaf_indices }, - filter_schema: projected_schema, + required_bytes: size_of_columns(&read_plan.leaf_indices, metadata)?, + can_use_index: columns_sorted(&read_plan.leaf_indices, metadata)?, + read_plan, })) } } @@ -290,7 +278,10 @@ struct PushdownChecker<'schema> { /// Does the expression reference any columns not present in the file schema? projected_columns: bool, /// Indices into the file schema of columns required to evaluate the expression. + /// Does not include struct columns accessed via `get_field`. required_columns: Vec, + /// Struct field accesses via `get_field`. + struct_field_accesses: Vec, /// Whether nested list columns are supported by the predicate semantics. allow_list_columns: bool, /// The Arrow schema of the parquet file. 
@@ -303,6 +294,7 @@ impl<'schema> PushdownChecker<'schema> { non_primitive_columns: false, projected_columns: false, required_columns: Vec::new(), + struct_field_accesses: Vec::new(), allow_list_columns, file_schema, } @@ -330,16 +322,17 @@ impl<'schema> PushdownChecker<'schema> { fn check_struct_field_column( &mut self, column_name: &str, + field_path: Vec, ) -> Option { - let idx = match self.file_schema.index_of(column_name) { - Ok(idx) => idx, - Err(_) => { - self.projected_columns = true; - return Some(TreeNodeRecursion::Jump); - } + let Ok(idx) = self.file_schema.index_of(column_name) else { + self.projected_columns = true; + return Some(TreeNodeRecursion::Jump); }; - self.required_columns.push(idx); + self.struct_field_accesses.push(StructFieldAccess { + root_index: idx, + field_path, + }); None } @@ -410,6 +403,7 @@ impl<'schema> PushdownChecker<'schema> { self.required_columns.dedup(); PushdownColumns { required_columns: self.required_columns, + struct_field_accesses: self.struct_field_accesses, } } } @@ -448,10 +442,39 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> { .and_then(|a| a.as_any().downcast_ref::()) { let return_type = func.return_type(); - if !DataType::is_nested(return_type) { - if let Some(recursion) = self.check_struct_field_column(column.name()) - { - return Ok(recursion); + + if !DataType::is_nested(return_type) + || self.is_nested_type_supported(return_type) + { + // try to resolve all field name arguments to string literals + // if any argument is not a string literal, we cannot determine the exact + // leaf path so we fall back to reading the entire struct root column + let field_path = args[1..]
+ .iter() + .map(|arg| { + arg.as_any().downcast_ref::().and_then(|lit| { + lit.value().try_as_str().flatten().map(|s| s.to_string()) + }) + }) + .collect(); + + match field_path { + Some(path) => { + if let Some(recursion) = + self.check_struct_field_column(column.name(), path) + { + return Ok(recursion); + } + } + None => { + // Could not resolve field path — fall back to + // reading the entire struct root column. + if let Some(recursion) = + self.check_single_column(column.name()) + { + return Ok(recursion); + } + } } return Ok(TreeNodeRecursion::Jump); @@ -478,8 +501,24 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> { struct PushdownColumns { /// Sorted, unique column indices into the file schema required to evaluate /// the filter expression. Must be in ascending order for correct schema - /// projection matching. + /// projection matching. Does not include struct columns accessed via `get_field`. required_columns: Vec, + /// Struct field accesses via `get_field`. Each entry records the root struct + /// column index and the field path being accessed. + struct_field_accesses: Vec, +} + +/// Records a struct field access via `get_field(struct_col, 'field1', 'field2', ...)`. +/// +/// This allows the row filter to project only the specific Parquet leaf columns +/// needed by the filter, rather than all leaves of the struct. +#[derive(Debug, Clone)] +struct StructFieldAccess { + /// Arrow root column index of the struct in the file schema. + root_index: usize, + /// Field names forming the path into the struct. + /// e.g., `["value"]` for `s['value']`, `["outer", "inner"]` for `s['outer']['inner']`. + field_path: Vec, } /// Checks if a given expression can be pushed down to the parquet decoder. 
@@ -500,17 +539,64 @@ fn pushdown_columns( Ok((!checker.prevents_pushdown()).then(|| checker.into_sorted_columns())) } -fn leaf_indices_for_roots( - root_indices: &[usize], +/// Resolves which Parquet leaf columns and Arrow schema fields are needed +/// to evaluate `expr` against a Parquet file +/// +/// Returns `Ok(Some(plan))` when the expression can be evaluated using only +/// pushdown-compatible columns. `Ok(None)` when it can not (it references +/// whole struct columns or columns missing from disk) +/// +/// Note: this is a shared entry point used by both row filter construction and +/// the opener's projection logic +pub(crate) fn build_parquet_read_plan( + expr: &Arc, + file_schema: &Schema, schema_descr: &SchemaDescriptor, -) -> Vec { +) -> Result> { + let Some(required_columns) = pushdown_columns(expr, file_schema)? else { + return Ok(None); + }; + + let root_indices = &required_columns.required_columns; + + let mut leaf_indices = + leaf_indices_for_roots(root_indices.iter().copied(), schema_descr); + + let struct_leaf_indices = resolve_struct_field_leaves( + &required_columns.struct_field_accesses, + file_schema, + schema_descr, + ); + leaf_indices.extend_from_slice(&struct_leaf_indices); + leaf_indices.sort_unstable(); + leaf_indices.dedup(); + + let projected_schema = build_filter_schema( + file_schema, + root_indices, + &required_columns.struct_field_accesses, + ); + + Ok(Some(ParquetReadPlan { + leaf_indices, + projected_schema, + })) +} + +fn leaf_indices_for_roots( + root_indices: I, + schema_descr: &SchemaDescriptor, +) -> Vec +where + I: IntoIterator, +{ // Always map root (Arrow) indices to Parquet leaf indices via the schema // descriptor. Arrow root indices only equal Parquet leaf indices when the // schema has no group columns (Struct, Map, etc.); when group columns // exist, their children become separate leaves and shift all subsequent // leaf indices. // Struct columns are unsupported. 
- let root_set: BTreeSet<_> = root_indices.iter().copied().collect(); + let root_set: BTreeSet<_> = root_indices.into_iter().collect(); (0..schema_descr.num_columns()) .filter(|leaf_idx| { @@ -519,6 +605,149 @@ fn leaf_indices_for_roots( .collect() } +/// Resolves struct field access to specific Parquet leaf column indices +/// +/// For every `StructFieldAccess`, finds the leaf columns in the Parquet schema +/// whose path matches the struct root name + field path. This avoids reading all +/// leaves of a struct when only specific fields are needed +fn resolve_struct_field_leaves( + accesses: &[StructFieldAccess], + file_schema: &Schema, + schema_descr: &SchemaDescriptor, +) -> Vec { + let mut leaf_indices = Vec::new(); + + for access in accesses { + let root_name = file_schema.field(access.root_index).name(); + let prefix = std::iter::once(root_name.as_str()) + .chain(access.field_path.iter().map(|p| p.as_str())) + .collect::>(); + + for leaf_idx in 0..schema_descr.num_columns() { + let col = schema_descr.column(leaf_idx); + let col_path = col.path().parts(); + + // A leaf matches if its path starts with our prefix. + // e.g., prefix=["s", "value"] matches leaf path ["s", "value"] + // prefix=["s", "outer"] matches ["s", "outer", "inner"] + + // An equal-length match means the prefix names the leaf itself; a longer + // leaf path means the prefix names an ancestor group, so every descendant + // leaf of that group is selected. + let leaf_matches_path = col_path.len() >= prefix.len() + && col_path.iter().zip(prefix.iter()).all(|(a, b)| a == b); + + if leaf_matches_path { + leaf_indices.push(leaf_idx); + } + } + } + + leaf_indices +} + +/// Builds a filter schema that includes only the fields actually accessed by the +/// filter expression. +/// +/// For regular (non-struct) columns, the full field type is used. +/// For struct columns accessed via `get_field`, a pruned struct type is created +/// containing only the fields along the access path.
Note: it must match the schema +/// that the Parquet reader produces when projecting specific struct leaves +fn build_filter_schema( + file_schema: &Schema, + regular_indices: &[usize], + struct_field_accesses: &[StructFieldAccess], +) -> SchemaRef { + let all_indices = regular_indices + .iter() + .copied() + .chain( + struct_field_accesses + .iter() + .map(|&StructFieldAccess { root_index, .. }| root_index), + ) + .collect::>(); + + let fields = all_indices + .iter() + .map(|&idx| { + let field = file_schema.field(idx); + + // collect all field paths that access this root struct column + let field_paths = struct_field_accesses + .iter() + .filter_map( + |&StructFieldAccess { + root_index, + ref field_path, + }| { + (root_index == idx).then_some(field_path.as_slice()) + }, + ) + .collect::>(); + + if field_paths.is_empty() { + // its a regular column - use the full type + return Arc::new(field.clone()); + } + + let pruned_data_type = prune_struct_type(field.data_type(), &field_paths); + Arc::new(Field::new( + field.name(), + pruned_data_type, + field.is_nullable(), + )) + }) + .collect::>(); + + Arc::new(Schema::new(fields)) +} + +fn prune_struct_type(dt: &DataType, paths: &[&[String]]) -> DataType { + let DataType::Struct(fields) = dt else { + return dt.clone(); + }; + + let needed = paths + .iter() + .filter_map(|p| p.first().map(|s| s.as_str())) + .collect::>(); + + let pruned_fields = fields + .iter() + .filter_map(|f| { + if !needed.contains(f.name().as_str()) { + return None; + } + + let sub_paths = paths + .iter() + .filter_map(|path| { + if path.first().map(|s| s.as_str()) == Some(f.name()) { + Some(&path[1..]) + } else { + None + } + }) + .filter(|sub| !sub.is_empty()) + .collect::>(); + + let out = if sub_paths.is_empty() { + // Leaf of access path — keep the field as-is. + Arc::clone(f) + } else { + // Recurse into nested struct. 
+ let pruned = prune_struct_type(f.data_type(), &sub_paths); + Arc::new(Field::new(f.name(), pruned, f.is_nullable())) + }; + + Some(out) + }) + .collect::>(); + + DataType::Struct(pruned_fields.into()) +} + /// Checks if a predicate expression can be pushed down to the parquet decoder. /// /// Returns `true` if all columns referenced by the expression: @@ -722,11 +951,15 @@ pub fn build_row_filter( #[cfg(test)] mod test { use super::*; + use arrow::datatypes::Fields; use datafusion_common::ScalarValue; - use arrow::array::{ListBuilder, StringBuilder}; + use arrow::array::{ + Int32Array, ListBuilder, StringArray, StringBuilder, StructArray, + }; use arrow::datatypes::{Field, TimeUnit::Nanosecond}; use datafusion_expr::{Expr, col}; + use datafusion_functions::core::get_field; use datafusion_functions_nested::array_has::{ array_has_all_udf, array_has_any_udf, array_has_udf, }; @@ -774,7 +1007,7 @@ mod test { .expect("building candidate") .expect("list pushdown should be supported"); - assert_eq!(candidate.projection.leaf_indices, vec![list_index]); + assert_eq!(candidate.read_plan.leaf_indices, vec![list_index]); } #[test] @@ -1232,7 +1465,7 @@ mod test { // col_b is Parquet leaf 3 (shifted by struct_col's two children). 
assert_eq!( - candidate.projection.leaf_indices, + candidate.read_plan.leaf_indices, vec![3], "leaf_indices should be [3] for col_b" ); @@ -1250,7 +1483,7 @@ mod test { )])); // get_field(struct_col, 'a') > 5 - let get_field_expr = datafusion_functions::core::get_field().call(vec![ + let get_field_expr = get_field().call(vec![ col("struct_col"), Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None), ]); @@ -1275,7 +1508,7 @@ mod test { )])); // get_field(struct_col, 'nested') IS NOT NULL — the leaf is still a struct - let get_field_expr = datafusion_functions::core::get_field().call(vec![ + let get_field_expr = get_field().call(vec![ col("struct_col"), Expr::Literal(ScalarValue::Utf8(Some("nested".to_string())), None), ]); @@ -1285,6 +1518,41 @@ mod test { assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } + /// get_field returning a list inside a struct should allow pushdown when + /// wrapped in a supported list predicate like `array_has_any`. + /// e.g. `array_has_any(get_field(s, 'items'), make_array('x'))` + #[test] + fn get_field_list_leaf_with_array_predicate_allows_pushdown() { + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let table_schema = Arc::new(Schema::new(vec![Field::new( + "s", + DataType::Struct( + vec![ + Arc::new(Field::new("id", DataType::Int32, true)), + Arc::new(Field::new("items", DataType::List(item_field), true)), + ] + .into(), + ), + true, + )])); + + // array_has_any(get_field(s, 'items'), make_array('x')) + let get_field_expr = get_field().call(vec![ + col("s"), + Expr::Literal(ScalarValue::Utf8(Some("items".to_string())), None), + ]); + let expr = array_has_any( + get_field_expr, + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("x".to_string())), + None, + )]), + ); + let expr = logical2physical(&expr, &table_schema); + + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + /// get_field on a struct produces correct Parquet leaf indices. 
#[test] fn get_field_filter_candidate_has_correct_leaf_indices() { @@ -1292,7 +1560,7 @@ mod test { // Schema: id (Int32), s (Struct{value: Int32, label: Utf8}) // Parquet leaves: id=0, s.value=1, s.label=2 - let struct_fields: arrow::datatypes::Fields = vec![ + let struct_fields: Fields = vec![ Arc::new(Field::new("value", DataType::Int32, false)), Arc::new(Field::new("label", DataType::Utf8, false)), ] @@ -1332,7 +1600,7 @@ mod test { let file_schema = builder.schema().clone(); // get_field(s, 'value') > 5 - let get_field_expr = datafusion_functions::core::get_field().call(vec![ + let get_field_expr = get_field().call(vec![ col("s"), Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None), ]); @@ -1344,12 +1612,12 @@ mod test { .expect("building candidate") .expect("get_field filter on struct should be pushable"); - // The root column is s (Arrow index 1), which expands to Parquet - // leaves 1 (s.value) and 2 (s.label). + // The filter accesses only s.value, so only Parquet leaf 1 is needed. + // Leaf 2 (s.label) is not read, reducing unnecessary I/O. assert_eq!( - candidate.projection.leaf_indices, - vec![1, 2], - "leaf_indices should contain both leaves of struct s" + candidate.read_plan.leaf_indices, + vec![1], + "leaf_indices should contain only the accessed struct field leaf" ); } @@ -1374,7 +1642,7 @@ mod test { )])); // s['outer']['inner'] > 5 - let get_field_expr = datafusion_functions::core::get_field().call(vec![ + let get_field_expr = get_field().call(vec![ col("s"), Expr::Literal(ScalarValue::Utf8(Some("outer".to_string())), None), Expr::Literal(ScalarValue::Utf8(Some("inner".to_string())), None), @@ -1389,17 +1657,23 @@ mod test { /// and the filter actually works against a Parquet file. 
#[test] fn get_field_deeply_nested_filter_candidate() { - use arrow::array::{Int32Array, StructArray}; - - // Schema: id (Int32), s (Struct{outer: Struct{inner: Int32}}) - // Parquet leaves: id=0, s.outer.inner=1 - let inner_fields: arrow::datatypes::Fields = - vec![Arc::new(Field::new("inner", DataType::Int32, false))].into(); - let outer_fields: arrow::datatypes::Fields = vec![Arc::new(Field::new( - "outer", - DataType::Struct(inner_fields.clone()), - false, - ))] + use arrow::array::{Int32Array, StringArray, StructArray}; + + // Schema: id (Int32), s (Struct{outer: Struct{extra: Int32, inner: Int32}, tag: Utf8}) + // Parquet leaves: id=0, s.outer.extra=1, s.outer.inner=2, s.tag=3 + let inner_fields: Fields = vec![ + Arc::new(Field::new("extra", DataType::Int32, false)), + Arc::new(Field::new("inner", DataType::Int32, false)), + ] + .into(); + let outer_fields: Fields = vec![ + Arc::new(Field::new( + "outer", + DataType::Struct(inner_fields.clone()), + false, + )), + Arc::new(Field::new("tag", DataType::Utf8, false)), + ] .into(); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -1408,11 +1682,20 @@ mod test { let inner_struct = StructArray::new( inner_fields, - vec![Arc::new(Int32Array::from(vec![10, 20, 30])) as _], + vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])) as _, + Arc::new(Int32Array::from(vec![10, 20, 30])) as _, + ], + None, + ); + let outer_struct = StructArray::new( + outer_fields, + vec![ + Arc::new(inner_struct) as _, + Arc::new(StringArray::from(vec!["x", "y", "z"])) as _, + ], None, ); - let outer_struct = - StructArray::new(outer_fields, vec![Arc::new(inner_struct) as _], None); let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ @@ -1435,11 +1718,12 @@ mod test { let metadata = builder.metadata().clone(); let file_schema = builder.schema().clone(); - // Parquet should have 2 leaves: id=0, s.outer.inner=1 - assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 2); + // Parquet 
should have 4 leaves: id=0, s.outer.extra=1, s.outer.inner=2, s.tag=3 + assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4); // get_field(s, 'outer', 'inner') > 15 - let get_field_expr = datafusion_functions::core::get_field().call(vec![ + // Should only need leaf 2 (s.outer.inner), not leaf 1 (s.outer.extra) or leaf 3 (s.tag). + let get_field_expr = get_field().call(vec![ col("s"), Expr::Literal(ScalarValue::Utf8(Some("outer".to_string())), None), Expr::Literal(ScalarValue::Utf8(Some("inner".to_string())), None), @@ -1452,14 +1736,101 @@ mod test { .expect("building candidate") .expect("deeply nested get_field filter should be pushable"); - // Root column is s (Arrow index 1), which has one leaf: s.outer.inner=1 + // Only s.outer.inner (leaf 2) should be projected, assert_eq!( - candidate.projection.leaf_indices, - vec![1], - "leaf_indices should be [1] for s.outer.inner" + candidate.read_plan.leaf_indices, + vec![2], + "leaf_indices should be [2] for s.outer.inner, skipping sibling and cousin leaves" ); } + /// End-to-end: get_field filter on a struct column with multiple fields + /// reads only the needed leaf and correctly filters rows during Parquet decoding. 
+ #[test] + fn get_field_end_to_end_filters_rows() { + // Schema: id (Int32), s (Struct{value: Int32, label: Utf8}) + // Parquet leaves: id=0, s.value=1, s.label=2 + let struct_fields: Fields = vec![ + Arc::new(Field::new("value", DataType::Int32, false)), + Arc::new(Field::new("label", DataType::Utf8, false)), + ] + .into(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("s", DataType::Struct(struct_fields.clone()), false), + ])); + + // +----+--------------------------+ + // | id | s | + // +----+--------------------------+ + // | 1 | {value: 10, label: "a"} | + // | 2 | {value: 20, label: "b"} | + // | 3 | {value: 30, label: "c"} | + // +----+--------------------------+ + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StructArray::new( + struct_fields, + vec![ + Arc::new(Int32Array::from(vec![10, 20, 30])) as _, + Arc::new(StringArray::from(vec!["a", "b", "c"])) as _, + ], + None, + )), + ], + ) + .unwrap(); + + let file = NamedTempFile::new().expect("temp file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None) + .expect("writer"); + writer.write(&batch).expect("write batch"); + writer.close().expect("close writer"); + + let reader_file = file.reopen().expect("reopen file"); + let parquet_reader_builder = + ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = parquet_reader_builder.metadata().clone(); + let file_schema = parquet_reader_builder.schema().clone(); + + // get_field(s, 'value') > 15 — should match rows with value=20 and value=30 + let get_field_expr = get_field().call(vec![ + col("s"), + Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None), + ]); + let predicate_expr = + get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(15)), None)); + let expr = logical2physical(&predicate_expr, &file_schema); + + let metrics = 
ExecutionPlanMetricsSet::new(); + let file_metrics = ParquetFileMetrics::new(0, "struct_e2e.parquet", &metrics); + + let row_filter = + build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) + .expect("building row filter") + .expect("row filter should exist"); + + let reader = parquet_reader_builder + .with_row_filter(row_filter) + .build() + .expect("build reader"); + + let mut total_rows = 0; + for batch in reader { + let batch = batch.expect("record batch"); + total_rows += batch.num_rows(); + } + + assert_eq!(total_rows, 2, "expected 2 rows matching value > 15"); + assert_eq!(file_metrics.pushdown_rows_pruned.value(), 1); + assert_eq!(file_metrics.pushdown_rows_matched.value(), 2); + } + /// Sanity check that the given expression could be evaluated against the given schema without any errors. /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc. fn check_expression_can_evaluate_against_schema(