Teach Datafusion to project only accessed struct leaves in row filter pushdown #20854
friendlymatthew wants to merge 3 commits into apache:main
Conversation
d24acc3 to 15b2586 (force-push)

friendlymatthew left a comment
self review
```rust
self.required_columns.push(idx);
self.struct_field_accesses.push(StructFieldAccess {
```
Struct field accesses are tracked in a separate vec rather than being pushed into `required_columns`.
This is intentional: `required_columns` feeds into `leaf_indices_for_roots`, which expands a root index to all of its leaves. By keeping the struct accesses separate, we can resolve them to only the specific leaves needed via `resolve_struct_field_leaves`.
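To make the split concrete, here is a minimal, dependency-free sketch of the two-vec bookkeeping described above. The type and method names loosely follow the PR (`PushdownChecker`, `StructFieldAccess`) but the shapes are simplified and hypothetical:

```rust
// Hypothetical simplified sketch: regular columns are tracked by root
// index (later expanded to all of their leaves), while struct accesses
// carry the field path so they can be resolved to only the accessed leaves.
struct StructFieldAccess {
    root_idx: usize,
    path: Vec<String>, // e.g. ["outer", "inner"]
}

#[derive(Default)]
struct PushdownChecker {
    required_columns: Vec<usize>,
    struct_field_accesses: Vec<StructFieldAccess>,
}

impl PushdownChecker {
    fn record_regular(&mut self, idx: usize) {
        self.required_columns.push(idx);
    }
    fn record_struct_access(&mut self, root_idx: usize, path: Vec<String>) {
        self.struct_field_accesses.push(StructFieldAccess { root_idx, path });
    }
}

fn main() {
    let mut checker = PushdownChecker::default();
    checker.record_regular(0); // a plain column: expand to all its leaves later
    checker.record_struct_access(1, vec!["value".to_string()]); // only `s.value`
    assert_eq!(checker.required_columns, vec![0]);
    assert_eq!(checker.struct_field_accesses.len(), 1);
}
```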
```rust
    schema_descr,
);
leaf_indices.extend_from_slice(&struct_leaf_indices);
leaf_indices.sort_unstable();
```
Will this cause errors if the sorting is different from `root_indices`?
To my knowledge, no. `leaf_indices` is only used to build a `ProjectionMask::leaves`:

datafusion/datafusion/datasource-parquet/src/row_filter.rs, lines 144 to 149 at 5af7361

`ProjectionMask` does not care about order. It builds a boolean mask of size `vec![false; num_columns]` and sets entries via `mask[leaf_idx] = true`. Other call sites that use `leaf_indices` don't depend on order either.
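The order-insensitivity argument above can be demonstrated with a toy model of the mask construction (the real `ProjectionMask` lives in the parquet crate; `build_leaf_mask` here is a stand-in):

```rust
// Toy model of leaf-mask construction: order of `leaf_indices` is
// irrelevant because each index independently flips one slot to true.
fn build_leaf_mask(num_columns: usize, leaf_indices: &[usize]) -> Vec<bool> {
    let mut mask = vec![false; num_columns];
    for &leaf_idx in leaf_indices {
        mask[leaf_idx] = true;
    }
    mask
}

fn main() {
    // Sorted and unsorted inputs yield the identical mask.
    assert_eq!(build_leaf_mask(4, &[2, 0]), build_leaf_mask(4, &[0, 2]));
    assert_eq!(build_leaf_mask(4, &[0, 2]), vec![true, false, true, false]);
}
```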
Ok, then is that change needed? I.e., the sort and dedup?
`leaf_indices` and `root_indices` serve different purposes. Leaf indices become the `ProjectionMask`, telling the Parquet decoder which physical leaf columns to read from disk. Root indices (plus struct field accesses) become the filter schema, telling Arrow what schema to use when reconstructing the record batch.

Arrow just takes whatever decoded leaves are available and assembles them into the schema it was given. So suppose you had `leaf_indices = [2]` with `root_indices = [1]`. The mask says "decode leaf 2" and the schema says "give me struct column 1, pruned to just this specific field".
Sort and dedup are necessary because we concatenate the outputs of `leaf_indices_for_roots` and `resolve_struct_field_leaves`. The first iterates Parquet leaves 0..N collecting those belonging to regular (non-struct) columns, and the second does the same for struct field accesses. Both produce individually sorted output, but when a struct column appears before a regular column in the schema, the struct's leaf indices are numerically lower, so the concatenation is out of order.

Dedup is needed because the same struct field can appear multiple times in a filter expression, like `get_field(s, 'val') > 5` and `get_field(s, 'val') < 100`, producing duplicate entries in `struct_field_accesses`. Without dedup, we'd double-count the compressed size of that column.
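The two failure modes described above (out-of-order concatenation and duplicate struct accesses) can be seen in a small standalone example, using a hypothetical layout where struct `s` owns leaves 0 and 1 and a regular column owns leaf 2:

```rust
// Toy illustration: each source list is sorted on its own, but their
// concatenation may not be, and a struct field referenced twice in the
// filter produces a duplicate leaf index.
fn main() {
    let regular_leaves = vec![2];    // from leaf_indices_for_roots (sketch)
    let struct_leaves = vec![1, 1];  // `s.val` referenced twice in the filter

    let mut leaf_indices = regular_leaves;
    leaf_indices.extend_from_slice(&struct_leaves);
    assert_eq!(leaf_indices, vec![2, 1, 1]); // out of order, with a duplicate

    leaf_indices.sort_unstable();
    leaf_indices.dedup();
    assert_eq!(leaf_indices, vec![1, 2]); // sorted, unique
}
```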
```rust
self.projected_columns = true;
return Some(TreeNodeRecursion::Jump);
```
I know this is pre-existing, but what is this code path supposed to catch? Under what circumstances would there be a column that doesn't exist in the file schema at this point, and why is it a "projected" column?
This can happen during schema evolution (a column that was added after the file was written), or with partition columns that are projected onto the scan but don't exist physically on disk. In either case, we can't push the filter down because the decoder has no data for this column.

I'll add a comment explaining this, or do it in a follow-up PR.
I don't think that's true. We resolve partition columns to literals before we reach this point. And any missing columns would have also been replaced with null literals.
```rust
/// Field names forming the path into the struct.
/// e.g., `["value"]` for `s['value']`, `["outer", "inner"]` for `s['outer']['inner']`.
```
I assume we don't support stuff like `array_has_any(get_field(s, 'items'), 5)`?
We do support it! Here's a test that repros: 44a02f3
```rust
fn resolve_struct_field_leaves(
    accesses: &[StructFieldAccess],
    file_schema: &Schema,
    schema_descr: &SchemaDescriptor,
) -> Vec<usize> {
```
Let's make sure we share this logic for projections.

More generally: there should be a single place where there is a function along the lines of:

```rust
fn build_parquet_read_plan(expr: &Arc<dyn PhysicalExpr>) -> ParquetReadPlan {
    ...
}

struct ParquetReadPlan {
    // leaf projections
    projection_mask: ProjectionMask,
    // the schema to read back with
    schema: SchemaRef,
    // the transformed expression (do we need this?)
    expr: Arc<dyn PhysicalExpr>,
}
```
Think this makes a lot of sense. I've charted a big picture idea of what this refactor will look like: #20913
```rust
/// filter expression.
///
/// For regular (non-struct) columns, the full field type is used.
/// For struct columns accessed via `get_field`, a pruned struct type is created
```
Instead of a pruned struct type, why not transform the expression from `get_field(s, 'f')` to `Alias(Column("s.f", 123), "get_field(s, 'f')")` or something like that? Then we don't need to manipulate the data (assemble a struct with pruned fields).
The constraint is what arrow's Parquet reader produces. `ProjectionMask::leaves` still returns a nested `StructArray` with only the projected fields, not flat top-level columns. So `Column("s.f")` wouldn't match the reader output without an extra flattening step in `ArrowPredicate::evaluate`.

The pruned struct approach keeps the schema in sync with what the reader naturally returns. We'd need to invent a way to get flat columns from the arrow Parquet reader to make that work.
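A toy sketch of the pruning step discussed here, keeping only the accessed struct fields in their original declaration order (field names and the `prune_struct_fields` helper are hypothetical; the real code builds Arrow `Field`s, not strings):

```rust
// Sketch: the filter schema keeps a nested struct, but with only the
// fields the filter touches, matching the nested StructArray shape the
// reader returns under a leaf projection.
fn prune_struct_fields(all_fields: &[&str], accessed: &[&str]) -> Vec<String> {
    all_fields
        .iter()
        .filter(|f| accessed.contains(f)) // keep only accessed fields
        .map(|f| f.to_string())           // preserve declaration order
        .collect()
}

fn main() {
    // Struct `s` declares three fields; the filter only touches `value`.
    let pruned = prune_struct_fields(&["id", "value", "tags"], &["value"]);
    assert_eq!(pruned, vec!["value".to_string()]);
}
```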
Okay interesting. I would have thought that ProjectionMask::leaves returned a flat column, not a nested struct. Thanks for explaining 😄
44a02f3 to 8dba654 (force-push)
Which issue does this PR close?
Rationale for this change
This PR refines how the `FilterCandidateBuilder` projects struct columns during Parquet row filter pushdown.

Previously, a filter like `s['value'] > 10` would cause the reader to decode all leaf columns of a struct `s`, because `PushdownChecker` only tracked the root column index and expanded it to every leaf. This wastes I/O and decode time on fields the filter never touches.

Now, the builder resolves only the matching Parquet leaf columns. It does this by building a pruned filter schema that reflects exactly what the Parquet reader produces when projecting a subset of struct leaves, ensuring the expression evaluates against the correct types.
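The before/after behavior can be contrasted in a small standalone sketch, assuming a hypothetical file where struct column `s` (root 0) is backed by leaves 0 through 2 and `s.value` specifically by leaf 1 (the helper names here are illustrative, not the PR's actual functions):

```rust
// Old behavior (sketch): a root index expands to every leaf it owns.
fn expand_root_to_leaves(leaf_ranges: &[(usize, usize)], root: usize) -> Vec<usize> {
    let (start, end) = leaf_ranges[root];
    (start..=end).collect()
}

// New behavior (sketch): a recorded struct field access resolves
// directly to the single leaf backing the accessed field.
fn resolve_struct_access(field_leaf: usize) -> Vec<usize> {
    vec![field_leaf]
}

fn main() {
    let leaf_ranges = [(0, 2)]; // struct `s` owns leaves 0..=2
    // Before: `s['value'] > 10` decoded every leaf of `s`.
    assert_eq!(expand_root_to_leaves(&leaf_ranges, 0), vec![0, 1, 2]);
    // After: only the leaf backing `s.value` is decoded.
    assert_eq!(resolve_struct_access(1), vec![1]);
}
```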
s['value'] > 10would cause the reader to decode all leaf columns of a structs, becausePushdownCheckeronly tracked the root column index and expanded it to every leaf. This wastes I/O and decode time on fields the filter never touchesNow, the builder resolves only the matching Parquet leaf columns. It does this by building a pruned filter schema that reflects exactly what the Parquet reader produces when projecting a subset of struct leaves, ensuring the expression evaluates against the correct types