Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,24 +435,14 @@ impl ProjectionMask {
root_leaf_counts[root_idx] += 1;
}

// Keep only leaves whose root has exactly one leaf (non-nested) and is not a
// LIST. LIST is encoded as a wrapped logical type with a single leaf, e.g.
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
//
// ```text
// // List<String> (list non-null, elements nullable)
// required group my_list (LIST) {
// repeated group list {
// optional binary element (STRING);
// }
// }
// ```
// Cache only top-level primitive columns.
// Even a one-leaf group is nested; caching it drops parent def levels.
let mut included_leaves = Vec::new();
for leaf_idx in 0..num_leaves {
if self.leaf_included(leaf_idx) {
let root = schema.get_column_root(leaf_idx);
let root_idx = schema.get_column_root_idx(leaf_idx);
if root_leaf_counts[root_idx] == 1 && !root.is_list() {
if root_leaf_counts[root_idx] == 1 && root.is_primitive() {
included_leaves.push(leaf_idx);
}
}
Expand Down Expand Up @@ -1042,6 +1032,39 @@ mod test {
);
}

#[test]
fn test_projection_mask_without_nested_single_leaf_struct() {
// Regression: a single-leaf struct is still nested.
let schema = parse_schema(
"
message test_schema {
OPTIONAL group address {
REQUIRED BYTE_ARRAY street (UTF8);
}
REQUIRED INT32 id;
}
",
);

// street -> empty; root is a struct
let mask = ProjectionMask::leaves(&schema, [0]);
assert_eq!(None, mask.without_nested_types(&schema));

// street, id --> id only
let mask = ProjectionMask::leaves(&schema, [0, 1]);
assert_eq!(
Some(ProjectionMask::leaves(&schema, [1])),
mask.without_nested_types(&schema)
);

// all --> id only
let mask = ProjectionMask::all();
assert_eq!(
Some(ProjectionMask::leaves(&schema, [1])),
mask.without_nested_types(&schema)
);
}

/// Converts a schema string into a `SchemaDescriptor`
fn parse_schema(schema: &str) -> SchemaDescriptor {
let parquet_group_type = parse_message_type(schema).unwrap();
Expand Down
78 changes: 78 additions & 0 deletions parquet/tests/arrow_reader/predicate_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Test for predicate cache in Parquet Arrow reader

use super::io::TestReader;
use arrow::array::Array;
use arrow::array::ArrayRef;
use arrow::array::Int64Array;
use arrow::compute::and;
Expand Down Expand Up @@ -92,6 +93,83 @@ async fn test_cache_projection_excludes_nested_columns() {
test.run_async(async_builder).await;
}

/// Regression: cache must match no-cache for a nullable single-leaf struct.
#[tokio::test]
async fn test_async_predicate_on_single_leaf_nullable_struct() {
// Rows: b = NULL, then b.aa = "hello".
let aa: StringArray = StringArray::from(vec!["padding", "hello"]);
let nulls = arrow_buffer::NullBuffer::from(vec![false, true]);
let b = StructArray::new(
vec![Arc::new(Field::new("aa", DataType::Utf8, false))].into(),
vec![Arc::new(aa) as ArrayRef],
Some(nulls),
);
let input_batch = RecordBatch::try_from_iter([("b", Arc::new(b) as ArrayRef)]).unwrap();

let mut output = Vec::new();
let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
writer.write(&input_batch).unwrap();
writer.close().unwrap();
let bytes = Bytes::from(output);

// Since `aa` is required, `b.aa IS NULL` means `b` is NULL.
let build_is_null_filter = |schema_descr: &parquet::schema::types::SchemaDescPtr| -> RowFilter {
let mask = ProjectionMask::leaves(schema_descr, vec![0]);
let predicate = ArrowPredicateFn::new(mask.clone(), |batch: RecordBatch| {
let struct_arr = batch.column(0).as_struct();
let leaf = struct_arr.column(0);
Ok((0..batch.num_rows())
.map(|i| struct_arr.is_null(i) || leaf.is_null(i))
.collect::<arrow_array::BooleanArray>())
});
RowFilter::new(vec![Box::new(predicate)])
};

// Default cache.
let reader = TestReader::new(bytes.clone());
let async_builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
.await
.unwrap();
let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
let async_builder = async_builder
.with_projection(ProjectionMask::leaves(&schema_descr, vec![0]))
.with_row_filter(build_is_null_filter(&schema_descr));
let mut stream = async_builder.build().unwrap();
let mut row_count_cached = 0;
while let Some(batch) = stream.next().await {
row_count_cached += batch.unwrap().num_rows();
}

// Cache disabled.
let reader = TestReader::new(bytes.clone());
let async_builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
.await
.unwrap();
let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
let async_builder = async_builder
.with_projection(ProjectionMask::leaves(&schema_descr, vec![0]))
.with_row_filter(build_is_null_filter(&schema_descr))
.with_max_predicate_cache_size(0);
let mut stream = async_builder.build().unwrap();
let mut row_count_uncached = 0;
while let Some(batch) = stream.next().await {
row_count_uncached += batch.unwrap().num_rows();
}

assert_eq!(
row_count_uncached, 1,
"control: with cache disabled the predicate must match exactly one row (parent NULL)"
);
assert_eq!(
row_count_cached, row_count_uncached,
"cached reader must match uncached reader; \
got {row_count_cached} cached vs {row_count_uncached} uncached. \
single-leaf struct roots must stay out of the cache."
);
}

// -- Begin test infrastructure --

/// A test parquet file
Expand Down