Teach Datafusion to project only accessed struct leaves in row filter pushdown #20854

Open

friendlymatthew wants to merge 3 commits into apache:main from pydantic:friendlymatthew/struct-leaf-projection

Conversation

@friendlymatthew (Contributor):

Which issue does this PR close?

Rationale for this change

This PR refines how the FilterCandidateBuilder projects struct columns during Parquet row filter pushdown.

Previously, a filter like s['value'] > 10 would cause the reader to decode all leaf columns of a struct s, because PushdownChecker only tracked the root column index and expanded it to every leaf. This wastes I/O and decode time on fields the filter never touches.

Now, the builder resolves only the matching Parquet leaf columns. It does this by building a pruned filter schema that reflects exactly what the Parquet reader produces when projecting a subset of struct leaves, ensuring the expression evaluates against the correct types.
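The old vs. new behavior can be sketched with a toy leaf table (names and shapes are illustrative, not DataFusion's actual types):

```rust
// Toy model of leaf selection. The schema has a regular column `id`
// (leaf 0) and a struct `s` with leaves `s.value` (1) and `s.extra` (2).
const LEAVES: &[(&str, &str)] = &[
    ("id", "id"),
    ("s", "s.value"),
    ("s", "s.extra"),
];

// Old behavior: expand a root column to every leaf beneath it.
fn leaves_for_root(root: &str) -> Vec<usize> {
    LEAVES
        .iter()
        .enumerate()
        .filter(|(_, (r, _))| *r == root)
        .map(|(i, _)| i)
        .collect()
}

// New behavior: resolve only the leaf matching the accessed field path.
fn leaves_for_path(path: &str) -> Vec<usize> {
    LEAVES
        .iter()
        .enumerate()
        .filter(|(_, (_, p))| *p == path)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // A filter on s['value'] used to decode both of s's leaves...
    assert_eq!(leaves_for_root("s"), vec![1, 2]);
    // ...but it only needs one of them.
    assert_eq!(leaves_for_path("s.value"), vec![1]);
}
```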

@github-actions github-actions bot added the datasource Changes to the datasource crate label Mar 10, 2026
@friendlymatthew force-pushed the friendlymatthew/struct-leaf-projection branch from d24acc3 to 15b2586 on March 10, 2026 18:41
@friendlymatthew left a comment:

self review

};

self.required_columns.push(idx);
self.struct_field_accesses.push(StructFieldAccess {
Contributor Author:

Struct field accesses are tracked in a separate vec rather than being pushed into required_columns.

This is intentional, since required_columns feeds into leaf_indices_for_roots, which expands a root index to all its leaves. By keeping the struct accesses separate, we can resolve them to only the specific leaves needed via resolve_struct_field_leaves.

schema_descr,
);
leaf_indices.extend_from_slice(&struct_leaf_indices);
leaf_indices.sort_unstable();
Contributor:

Will this cause errors if the sorting is different from the root_indices?

Contributor Author:

To my knowledge, no.

leaf_indices is only used to build a ProjectionMask::leaves:

// Use leaf indices: when nested columns are involved, we must specify
// leaf (primitive) column indices in the Parquet schema so the decoder
// can properly project and filter nested structures.
projection_mask: ProjectionMask::leaves(
metadata.file_metadata().schema_descr(),
candidate.projection.leaf_indices.iter().copied(),

ProjectionMask does not care about order. It builds a boolean mask of size vec![false; num_columns] and sets entries via mask[leaf_idx] = true.

Other call sites that use leaf_indices don't depend on order either.
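A minimal sketch of that boolean-mask behavior (modeled on the description above, not ProjectionMask's real implementation):

```rust
// Build a boolean mask over all leaf columns; setting an entry is
// idempotent, so the order of `leaf_indices` never matters.
fn build_mask(num_columns: usize, leaf_indices: &[usize]) -> Vec<bool> {
    let mut mask = vec![false; num_columns];
    for &idx in leaf_indices {
        mask[idx] = true;
    }
    mask
}

fn main() {
    // Different orderings of the same leaves produce the same mask.
    assert_eq!(build_mask(4, &[2, 0]), build_mask(4, &[0, 2]));
    assert_eq!(build_mask(4, &[0, 2]), vec![true, false, true, false]);
}
```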

Contributor:

OK, then is that change needed? I.e., the sort and dedup?

Contributor Author:

leaf_indices and root_indices serve different purposes. Leaf indices become the ProjectionMask, telling the Parquet decoder which physical leaf columns to read from disk. Root indices (plus struct field accesses) become the filter schema, telling Arrow what schema to use when reconstructing the record batch.

Arrow just takes whatever decoded leaves are available and assembles them into the schema it was given. So suppose you had leaf_indices = [2] with root_indices = [1]. The mask says "decode leaf 2" and the schema says "give me struct column 1, pruned to just this specific field".

Contributor Author:

sort and dedup are necessary since we concatenate leaf_indices_for_roots and resolve_struct_field_leaves. The first iterates Parquet leaves 0..N, collecting those belonging to regular (non-struct) columns; the second does the same for struct field accesses. Both produce individually sorted output, but when a struct column appears before a regular column in the schema, the struct's leaf indices are numerically lower.

Dedup is needed because the same struct field can appear multiple times in a filter expression like get_field(s, 'val') > 5 AND get_field(s, 'val') < 100, producing duplicate entries in struct_field_accesses. Without dedup, we'd double-count the compressed size of that column.
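A sketch of merging the two leaf lists (index values are illustrative; the real ones come from the Parquet schema descriptor):

```rust
// Concatenate the regular-column leaves with the struct-field leaves,
// then sort and dedup the combined list.
fn merge_leaf_indices(regular: &[usize], struct_leaves: &[usize]) -> Vec<usize> {
    let mut all: Vec<usize> = regular.iter().chain(struct_leaves).copied().collect();
    // A struct earlier in the schema has numerically lower leaves than a
    // regular column after it, so the concatenation may be unsorted.
    all.sort_unstable();
    // The same field referenced twice in one filter yields duplicates.
    all.dedup();
    all
}

fn main() {
    // Struct `s` holds leaves 0..=1, regular column `x` is leaf 2, and
    // `s.val` (leaf 0) is referenced twice in the filter expression.
    assert_eq!(merge_leaf_indices(&[2], &[0, 0]), vec![0, 2]);
}
```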

Comment on lines +351 to +352
self.projected_columns = true;
return Some(TreeNodeRecursion::Jump);
Contributor:

I know this is pre-existing, but what is this code path supposed to catch? Under what circumstances would there be a column that doesn't exist in the file schema at this point, and why is it a "projected" column?

Contributor Author:

This can happen during schema evolution (a column that was added after the file was written) or with partition columns that are projected onto the scan but don't exist physically on disk.

In either case, we can't push the filter down because the decoder has no data for this column.

I'll add a comment explaining this or do it in a follow-up PR.

Contributor:

I don't think that's true. We resolve partition columns to literals before we reach this point. And any missing columns would have also been replaced with null literals.

Comment on lines +523 to +524
/// Field names forming the path into the struct.
/// e.g., `["value"]` for `s['value']`, `["outer", "inner"]` for `s['outer']['inner']`.
Contributor:

I assume we don't support stuff like array_has_any(get_field(s, 'items'), 5)?

Contributor Author:

We do support it! Here's a test that repros: 44a02f3

Comment on lines +570 to +574
fn resolve_struct_field_leaves(
accesses: &[StructFieldAccess],
file_schema: &Schema,
schema_descr: &SchemaDescriptor,
) -> Vec<usize> {
Contributor:

Let's make sure we share this logic for projections.

More generally: there should be a single place where there is a function along the lines of:

fn build_parquet_read_plan(expr: &Arc<dyn PhysicalExpr>) -> ParquetReadPlan {
  ...
}

struct ParquetReadPlan {
   // leaf projections
   projection_mask: ProjectionMask,
   // the schema to read back with
   schema: SchemaRef,
   // the transformed expression (do we need this?)
   expr: Arc<dyn PhysicalExpr>
}

Contributor Author:

I think this makes a lot of sense. I've charted a big-picture idea of what this refactor will look like: #20913

/// filter expression.
///
/// For regular (non-struct) columns, the full field type is used.
/// For struct columns accessed via `get_field`, a pruned struct type is created
Contributor:

Instead of a pruned struct type, why not transform the expression from `get_field(s, 'f')` -> `Alias(Column("s.f", 123), "get_field(s, 'f')")` or something like that? Then we don't need to manipulate the data (assemble a struct with pruned fields)

Contributor Author:

The constraint is what Arrow's Parquet reader produces. ProjectionMask::leaves still returns a nested StructArray with only the projected fields, not flat top-level columns. So Column("s.f") wouldn't match the reader output without an extra flattening step in ArrowPredicate::evaluate.

The pruned struct approach keeps the schema in sync with what the reader naturally returns. We'd need to invent a way to get flat columns from the Arrow Parquet reader to make that work.

Contributor:

Okay interesting. I would have thought that ProjectionMask::leaves returned a flat column, not a nested struct. Thanks for explaining 😄

Development

Successfully merging this pull request may close these issues.

Read single struct fields in ParquetOpener

3 participants