diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs index 9b22bb04b3e7..3636cb040253 100644 --- a/parquet/benches/arrow_writer.rs +++ b/parquet/benches/arrow_writer.rs @@ -32,7 +32,7 @@ use std::sync::Arc; use arrow::datatypes::*; use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array}; use arrow::{record_batch::RecordBatch, util::data_gen::*}; -use arrow_array::RecordBatchOptions; +use arrow_array::{RecordBatchOptions, StringArray}; use parquet::errors::Result; use parquet::file::properties::{CdcOptions, WriterProperties, WriterVersion}; @@ -100,6 +100,29 @@ fn create_string_bench_batch( )?) } +/// 1 M short, fixed-width 8-byte strings. Exercises the BYTE_ARRAY hot path +/// for the case where individual values are small enough that the byte-budget +/// based sub-batch sizing in `write_batch_internal` should always resolve to +/// the full chunk (no granular splitting, no regression vs. current behavior). +fn create_short_string_bench_batch(size: usize) -> Result { + let array = Arc::new(StringArray::from_iter_values( + (0..size).map(|i| format!("{i:08}")), + )) as _; + Ok(RecordBatch::try_from_iter([("col", array)])?) +} + +/// `size` rows of `value_size`-byte strings. Exercises the BYTE_ARRAY path +/// where individual values are large enough that batching the default +/// `write_batch_size` of them would blow the page byte limit by orders of +/// magnitude — the case the page-size fix targets. +fn create_large_string_bench_batch(size: usize, value_size: usize) -> Result { + let value = "x".repeat(value_size); + let array = Arc::new(StringArray::from_iter_values( + (0..size).map(|_| value.as_str()), + )) as _; + Ok(RecordBatch::try_from_iter([("col", array)])?) +} + fn create_string_and_binary_view_bench_batch( size: usize, null_density: f32, @@ -392,6 +415,16 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> { let batch = create_string_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); batches.push(("string", batch)); + let batch = create_short_string_bench_batch(BATCH_SIZE).unwrap(); + batches.push(("short_string_non_null", batch)); + + // 1024 rows × 256 KiB = 256 MiB total. With the default 1 MiB page byte + // limit, this is the case where the page-size fix kicks in: each value + // needs its own page, and `write_batch_size = 1024` would otherwise + // buffer all 256 MiB before the post-write check runs. + let batch = create_large_string_bench_batch(1024, 256 * 1024).unwrap(); + batches.push(("large_string_non_null", batch)); + let batch = create_string_and_binary_view_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); batches.push(("string_and_binary_view", batch)); diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index f56f9570adfb..409b6c8b001c 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; +use arrow_array::types::ByteArrayType; use arrow_array::{ Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray, - LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, + GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, }; +use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_schema::DataType; macro_rules! downcast_dict_impl { @@ -475,6 +477,90 @@ impl ColumnValueEncoder for ByteArrayEncoder { Ok(()) } + fn count_values_within_byte_budget_gather( + values: &Self::Values, + indices: &[usize], + byte_budget: usize, + ) -> Option { + // `ByteArrayEncoder` only ever writes via `write_gather`, so this + // is the relevant method. + // + // Two-stage walk for the simple offset-buffer byte array types: + // 1. If indices are contiguous, compute the total payload in + // O(1) via a single subtraction on the offsets buffer. + // When the total fits the budget — the overwhelmingly + // common "small values" case — return immediately. + // 2. Otherwise, walk per-value byte sizes from the offsets + // buffer (still cheap, no slice/UTF-8 construction) and + // exit at the first value that pushes the cumulative sum + // past the budget. This bounds skewed distributions: an + // outlier value is caught wherever it lands in the chunk. + let count = match values.data_type() { + DataType::Utf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeUtf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::Binary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeBinary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + // View arrays carry each value's length in the low 32 bits of + // its u128 view word, so lengths are scannable without touching + // any data buffer — and the common small-value case skips even + // that scan via an O(1) conservative bound. + DataType::Utf8View => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + DataType::BinaryView => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + // For arrow Dictionary input, treat every chunk as fitting + // and stay on the batched path. The arrow array being + // Dictionary-encoded in the first place implies its values + // are small enough that dedup is worthwhile, which is the + // opposite of the "5 MiB blob per row" case this fix + // targets. Doing a per-value walk through dict keys (each + // value lookup is keys[i] → values[key] → slice) on every + // chunk costs ~+30-80% vs `main` after writer-dict spill, + // and there is essentially nothing to bound. + DataType::Dictionary(_, _) => indices.len(), + // FixedSizeBinary falls through to the per-value walk via + // `ArrayAccessor::value`. + _ => downcast_op!( + values.data_type(), + values, + count_within_budget_accessor, + indices, + byte_budget + ), + }; + Some(count) + } + fn num_values(&self) -> usize { match &self.dict_encoder { Some(encoder) => encoder.indices.len(), @@ -587,6 +673,141 @@ where } } +/// Cumulative-scan fallback used for byte array types that don't expose +/// a single contiguous offsets buffer — view arrays, dictionary arrays, +/// fixed-size binary. Returns the largest `k` such that the first `k` +/// values picked out by `indices` encode to at most `byte_budget` bytes +/// (or `indices.len()` if they all fit, or `1` if a single value alone +/// exceeds the budget). +/// +/// Free function so it can be used with `downcast_op!`. +fn count_within_budget_accessor(values: T, indices: &[usize], byte_budget: usize) -> usize +where + T: ArrayAccessor + Copy, + T::Item: AsRef<[u8]>, +{ + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::(); + cum = cum.saturating_add(value_len); + if cum > byte_budget { + return i.max(1); + } + } + indices.len() +} + +/// Upper bound on any single value's byte length in a view array. +/// +/// An inline view stores at most 12 bytes; an +/// out-of-line view's data is a contiguous slice of exactly one data +/// buffer, so it cannot be longer than the largest data buffer. This is a +/// loose bound (a value is usually far smaller than a whole buffer) but it +/// is O(number of buffers) and always sound. +fn max_view_value_len(buffers: &[Buffer]) -> usize { + /// Bytes that fit inline in a u128 view word (the rest is len + prefix). + const MAX_INLINE_VIEW_LEN: usize = 12; + buffers + .iter() + .map(|b| b.len()) + .max() + .unwrap_or(0) + .max(MAX_INLINE_VIEW_LEN) +} + +/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`). +/// +/// 1. View arrays have no prefix-sum offsets buffer, so the exact O(1) +/// span subtraction `count_within_budget_offsets` uses is unavailable. +/// But a *conservative* O(1) bound is: every value is at most +/// `max_value_len` bytes, so the whole chunk fits the budget when +/// `n * (max_value_len + 4) <= byte_budget`. This skips the per-value +/// walk for the common small-value case — what view arrays are built +/// for, and exactly the case where there is nothing to bound. +/// 2. Otherwise scan per-value lengths from the low 32 bits of each u128 +/// view word (no data-buffer dereference) and stop at the first value +/// that pushes the cumulative sum past the budget. +fn count_within_budget_views( + views: &[u128], + indices: &[usize], + byte_budget: usize, + max_value_len: usize, +) -> usize { + // Stage 1: O(1) conservative upper bound. + let per_value = max_value_len + std::mem::size_of::(); + if indices.len().saturating_mul(per_value) <= byte_budget { + return indices.len(); + } + // Stage 2: exact per-value scan. + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (views[*idx] as u32) as usize; + cum = cum.saturating_add(len + std::mem::size_of::()); + if cum > byte_budget { + return i.max(1); + } + } + indices.len() +} + +/// Two-stage fast path for `GenericByteArray` +/// (Utf8/LargeUtf8/Binary/LargeBinary). +/// +/// `indices` are assumed sorted ascending — they always are here, since +/// they come from `non_null_indices`, which is built in array order. +/// +/// 1. The span `offsets[last+1] - offsets[first]` is an O(1) upper +/// bound on the chunk's payload: it covers every array position in +/// `[first, last]`, a superset of `indices`. For a non-null chunk +/// `indices` *is* that whole range; for a chunk drawn from a +/// nullable column the skipped positions are nulls, whose offset +/// delta is zero, so the span still equals the exact payload. +/// Either way, if the upper bound fits the budget every value +/// fits — return `indices.len()` with no per-value work. This +/// covers the overwhelmingly common "small values" case for both +/// non-null *and* nullable columns. +/// 2. Otherwise the chunk is genuinely near the budget: walk per-index +/// lengths from the offsets buffer directly (no slice/UTF-8 +/// construction) and stop at the first value that pushes the +/// cumulative sum past the budget. +fn count_within_budget_offsets( + values: &GenericByteArray, + indices: &[usize], + byte_budget: usize, +) -> usize { + if indices.is_empty() { + return 0; + } + let n = indices.len(); + let first = indices[0]; + let last = indices[n - 1]; + let offsets = values.value_offsets(); + let prefix_overhead = std::mem::size_of::(); + + // Stage 1: O(1) span upper bound. Skips Stage 2 in the common case — + // including nullable columns, whose `indices` are sparse. The earlier + // `last - first + 1 == n` contiguity gate forced every nullable chunk + // onto the O(n) Stage 2 walk even though the span check is valid for + // any sorted index set. + if last >= first { + let payload = (offsets[last + 1] - offsets[first]).as_usize(); + if payload + n * prefix_overhead <= byte_budget { + return n; + } + } + + // Stage 2: scan per-index lengths from the offsets buffer. + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead; + cum = cum.saturating_add(len); + if cum > byte_budget { + return i.max(1); + } + } + n +} + /// Computes the min and max for the provided array and indices /// /// This is a free function so it can be used with `downcast_op!` diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 04a798391429..bc038aa3b0c0 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4901,6 +4901,245 @@ mod tests { assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); } + #[test] + fn test_arrow_writer_caps_page_size_for_large_strings() { + // End-to-end coverage for `ByteArrayEncoder::estimated_value_bytes_gather` + // and the offsets-buffer fast path in `estimate_byte_size_offsets`. + // + // The standard column-writer regression test covers the same + // logic for the non-arrow path; this one drives writes through + // `ArrowWriter`, which is what real users hit and which uses + // `write_gather` with `non_null_indices` (always contiguous for + // a non-null column). + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32; + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + false, + )])); + let strings: Vec = (0..num_rows).map(|_| "x".repeat(value_size)).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(strings)) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(page_byte_limit) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let col_meta = metadata.row_group(0).column(0); + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + + let mut data_pages = Vec::new(); + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) { + data_pages.push(page.buffer().len()); + } + } + + assert!( + data_pages.len() >= num_rows / 2, + "expected at least {} data pages for {num_rows} large strings via ArrowWriter, got {} ({data_pages:?})", + num_rows / 2, + data_pages.len(), + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_arrow_writer_granular_mode_roundtrip() { + // Granular mode subdivides chunks and writes more pages than + // `main`. Make sure the data we write back is bit-identical to + // what went in — page-count assertions elsewhere only prove + // pages were cut, not that the encoded data is correct. + // + // Mix value sizes so that the cumulative-byte-budget cutoff + // lands mid-chunk, exercising both batched and granular paths + // within the same `write_batch_internal` call. + let small = "tiny".to_string(); + let big = "x".repeat(64 * 1024); + let strings: Vec = (0..256) + .map(|i| { + if i % 16 == 0 { + big.clone() + } else { + small.clone() + } + }) + .collect(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(strings.clone())) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap(); + let read = reader.next().unwrap().unwrap(); + assert!(reader.next().is_none(), "expected one batch"); + let col = read + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.len(), strings.len()); + for (i, expected) in strings.iter().enumerate() { + assert_eq!( + col.value(i), + expected.as_str(), + "value mismatch at index {i}" + ); + } + } + + #[test] + fn test_arrow_writer_caps_page_size_for_large_string_view() { + // Coverage for the view/dict fallback in + // `ByteArrayEncoder::count_values_within_byte_budget_gather`. + // `Utf8View` arrays don't expose a single contiguous offsets + // buffer, so the gather method falls back to the per-value walk + // via `ArrayAccessor::value`. + use arrow_array::StringViewArray; + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32usize; + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8View, + false, + )])); + let strings: Vec = (0..num_rows).map(|_| "x".repeat(value_size)).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringViewArray::from_iter_values( + strings.iter().map(|s| s.as_str()), + )) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(page_byte_limit) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let col_meta = metadata.row_group(0).column(0); + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + + let mut data_pages = Vec::new(); + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) { + data_pages.push(page.buffer().len()); + } + } + assert!( + data_pages.len() >= num_rows / 2, + "expected pages bounded by byte budget for Utf8View, got {data_pages:?}" + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_arrow_writer_all_null_string_column() { + // The `LevelDataRef::value_count` Uniform branch with + // `value != max_def` (entirely-null chunk) must return 0 so the + // sub-batch sizer short-circuits to batch mode without trying + // to estimate byte budgets for non-existent values. + let num_rows = 1024; + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + true, + )])); + let nulls: Vec> = vec![None; num_rows]; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(nulls)) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + // Re-parse the file: row group has one column, every row is + // null, all data pages report `num_rows / page_count` rows. + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let row_group = metadata.row_group(0); + let col_meta = row_group.column(0); + assert_eq!(row_group.num_rows() as usize, num_rows); + // Statistics record `null_count = num_rows` — proves every value + // was written as null. + if let Some(stats) = col_meta.statistics() { + assert_eq!( + stats.null_count_opt().unwrap_or(0) as usize, + num_rows, + "expected all-null column to report null_count = num_rows" + ); + } + + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + let mut total_values = 0u32; + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) { + total_values += page.num_values(); + } + } + assert_eq!( + total_values as usize, num_rows, + "expected every level position to be represented in some page" + ); + } + struct WriteBatchesShape { num_batches: usize, rows_per_batch: usize, diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs new file mode 100644 index 000000000000..d7276381e655 --- /dev/null +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Decides how many levels of a chunk to write as one mini-batch so that +//! the resulting data page stays within `data_page_size_limit`. +//! +//! The parquet column writer checks the data page byte limit only *after* +//! each mini-batch finishes writing. Mini-batches are sized in rows +//! (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose +//! values are large (e.g. multi-MiB blobs) a single mini-batch can buffer +//! GiB into one page before the limit is consulted. +//! +//! This module isolates the per-chunk decision that prevents that: given a +//! chunk's level data and the input values, pick the largest `sub_batch_size` +//! such that one mini-batch fits in one page byte budget. For the +//! overwhelmingly common case (small or fixed-width values) the answer is +//! just `chunk_size` and the decision is O(1) on the column type — only +//! when the input might overflow does the chunker consult the encoder's +//! byte estimate. + +use crate::basic::Type; +use crate::column::writer::LevelDataRef; +use crate::column::writer::encoder::ColumnValueEncoder; +use crate::file::properties::WriterProperties; +use crate::schema::types::ColumnDescriptor; + +/// Picks byte-budget-aware mini-batch sizes for one column. +pub(crate) struct ByteBudgetChunker { + /// Configured data page byte limit for the column. + page_byte_limit: usize, + /// Max definition level of the column; a level equal to this marks a + /// present (non-null) leaf value. Used to count values per chunk. + max_def_level: i16, + /// `true` when no chunk of `base_batch_size` values can ever overflow + /// `page_byte_limit` regardless of input. Set once at column open from + /// the physical type's known per-value byte size; lets the per-chunk + /// decision short-circuit with no work for every numeric, bool, or + /// narrow `FIXED_LEN_BYTE_ARRAY` column. + static_always_fits: bool, + /// Configured dictionary page byte limit for the column. + dict_page_byte_limit: usize, + /// As [`Self::static_always_fits`] but for the dictionary page: `true` + /// when one `base_batch_size` mini-batch of this fixed-width type cannot + /// overshoot `dict_page_byte_limit` by more than one mini-batch's worth. + static_dict_always_fits: bool, +} + +impl ByteBudgetChunker { + #[inline] + pub(crate) fn new( + descr: &ColumnDescriptor, + props: &WriterProperties, + base_batch_size: usize, + ) -> Self { + let page_byte_limit = props.column_data_page_size_limit(descr.path()); + let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path()); + let static_bytes_per_value = match descr.physical_type() { + Type::BOOLEAN => Some(1), + Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), + Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::()), + Type::INT96 => Some(12), + Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize), + Type::BYTE_ARRAY => None, + }; + let static_fits = |limit: usize| { + static_bytes_per_value + .map(|b| b.saturating_mul(base_batch_size) <= limit) + .unwrap_or(false) + }; + Self { + page_byte_limit, + max_def_level: descr.max_def_level(), + static_always_fits: static_fits(page_byte_limit), + dict_page_byte_limit, + static_dict_always_fits: static_fits(dict_page_byte_limit), + } + } + + /// Decide how many levels at the start of a chunk belong in one + /// mini-batch, so the mini-batch cannot overflow whichever page is + /// currently accumulating value bytes: the data page when plain-encoding, + /// or the *dictionary* page while dictionary-encoding. A returned value + /// smaller than `chunk_size` triggers granular sub-batching in + /// `write_batch_internal`. + /// + /// While dictionary-encoding, the data page holds only small RLE indices, + /// but the dictionary page accumulates the distinct values themselves — + /// so it is the dictionary page's remaining budget that must bound the + /// mini-batch. The per-mini-batch dictionary spill check would otherwise + /// let one mini-batch of large values balloon the dictionary page. + /// + /// Returns `chunk_size` immediately (no value inspection) when the chunk + /// is empty, or when the column is a fixed-width type whose mini-batches + /// statically cannot overshoot the relevant page. + /// + /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte + /// inspection lives in the out-of-line `byte_budget_sub_batch_size`. + #[inline] + pub(crate) fn pick_sub_batch_size( + &self, + encoder: &E, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + ) -> usize { + if chunk_size == 0 { + return chunk_size; + } + let budget = if encoder.has_dictionary() { + if self.static_dict_always_fits { + return chunk_size; + } + // Bound the mini-batch by the dictionary page's *remaining* + // budget (it accumulates across mini-batches until it spills). + match encoder.estimated_dict_page_size() { + Some(used) => self.dict_page_byte_limit.saturating_sub(used), + None => return chunk_size, + } + } else { + if self.static_always_fits { + return chunk_size; + } + self.page_byte_limit + }; + self.byte_budget_sub_batch_size::( + values, + value_indices, + chunk_def, + values_offset, + chunk_size, + budget, + ) + } + + /// Inspect value sizes to decide how many of the chunk's values fit in + /// `budget` bytes (the data page or dictionary page remaining budget). + /// + /// `#[inline(never)]` keeps this slow path out of the hot + /// `write_batch_internal` loop; numeric and bool columns never reach it. + #[inline(never)] + fn byte_budget_sub_batch_size( + &self, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + budget: usize, + ) -> usize { + // How many of this chunk's levels carry an actual value. For a + // non-nullable, unrepeated column every level is a value, so + // `value_count` is O(1) (`Absent`/`Uniform` def levels); only + // nullable or nested columns pay the O(chunk_size) def-level scan. + let vals_in_chunk = chunk_def.value_count(chunk_size, self.max_def_level); + if vals_in_chunk == 0 { + return chunk_size; + } + // Ask the encoder how many of the next values fit in one page byte + // budget. Dispatch on whether the caller supplied gather indices; + // this mirrors how `write_mini_batch` picks `write_gather` vs + // `write`. + let fit = match value_indices { + Some(idx) => { + let end = (values_offset + vals_in_chunk).min(idx.len()); + let start = values_offset.min(end); + E::count_values_within_byte_budget_gather(values, &idx[start..end], budget) + } + None => { + E::count_values_within_byte_budget(values, values_offset, vals_in_chunk, budget) + } + }; + match fit { + None => chunk_size, + Some(values_per_subbatch) => { + // Convert the value count back into a level count. For a + // non-nullable column this is a no-op; for nullable/nested + // columns scale by the chunk's observed value-to-level + // ratio. + let levels_per_subbatch = if vals_in_chunk == chunk_size { + values_per_subbatch + } else { + (values_per_subbatch * chunk_size) + .div_ceil(vals_in_chunk) + .max(1) + }; + chunk_size.min(levels_per_subbatch.max(1)) + } + } + } +} diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 2ea3376ae708..83c08e7423fa 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -90,6 +90,42 @@ pub trait ColumnValueEncoder { /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`] fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>; + /// Returns the largest `k` such that the first `k` values in + /// `values[offset..offset + len]` encode to at most `byte_budget` + /// bytes — i.e. how many values fit in a single page byte budget. + /// + /// Returns `len` if every value fits. Returns at least 1 if a single + /// value alone exceeds the budget, matching parquet's "at least one + /// value per data page" rule. + /// + /// `None` means "no cheap estimate available"; the caller stays on + /// the batched fast path and lets the post-write + /// `should_add_data_page` check handle bounding. + /// + /// Implementations should short-circuit aggressively: the typical + /// case is "everything fits, return `len`", and the next-most-common + /// case is "one wide value, return 1." The variable-width walk only + /// needs to be precise when the chunk is genuinely near the budget. + fn count_values_within_byte_budget( + _values: &Self::Values, + _offset: usize, + _len: usize, + _byte_budget: usize, + ) -> Option { + None + } + + /// As [`Self::count_values_within_byte_budget`] but using gather + /// `indices` rather than a contiguous range. Returns the number of + /// `indices` that fit, not the maximum index value. + fn count_values_within_byte_budget_gather( + _values: &Self::Values, + _indices: &[usize], + _byte_budget: usize, + ) -> Option { + None + } + /// Returns the number of buffered values fn num_values(&self) -> usize; @@ -247,6 +283,63 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { self.write_slice(&slice) } + fn count_values_within_byte_budget( + values: &[T::T], + offset: usize, + len: usize, + byte_budget: usize, + ) -> Option { + // Clamp so that a caller-supplied `len` that overruns the input + // (e.g. a level/value mismatch the encoder will reject later) + // returns an estimate instead of panicking here. + let end = (offset + len).min(values.len()); + let start = offset.min(end); + let n = end - start; + // Fixed-size physical types have a constant per-value byte cost, + // so the answer is one division — no need to walk the slice. + let phys = ::PHYSICAL_TYPE; + if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY { + let per = std::mem::size_of::().max(1); + let fits = (byte_budget / per).max(1); + return Some(fits.min(n)); + } + // Variable-width: scan, accumulate, exit at the first value that + // pushes us past the budget. This both bounds skewed + // distributions (one outlier among small values is caught when + // it lands, regardless of position) and short-circuits when an + // early value alone exceeds the budget. + let mut cum: usize = 0; + for (i, v) in values[start..end].iter().enumerate() { + cum = cum.saturating_add(plain_encoded_byte_size::(v)); + if cum > byte_budget { + return Some(i.max(1)); + } + } + Some(n) + } + + fn count_values_within_byte_budget_gather( + values: &[T::T], + indices: &[usize], + byte_budget: usize, + ) -> Option { + let phys = ::PHYSICAL_TYPE; + if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY { + let per = std::mem::size_of::().max(1); + let fits = (byte_budget / per).max(1); + return Some(fits.min(indices.len())); + } + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let Some(v) = values.get(*idx) else { continue }; + cum = cum.saturating_add(plain_encoded_byte_size::(v)); + if cum > byte_budget { + return Some(i.max(1)); + } + } + Some(indices.len()) + } + fn num_values(&self) -> usize { self.num_values } @@ -411,3 +504,38 @@ where } } } + +/// Plain-encoded byte cost of a single value of type `T::T`. +/// +/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a +/// parallel per-value-size hook to the trait. The components returned by +/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For +/// plain encoding the on-disk layout is: +/// +/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`. +/// - `FIXED_LEN_BYTE_ARRAY`: raw bytes only, taken from the type descriptor's +/// `type_length`. The value's own `dict_encoding_size` reports the length +/// prefix, which is irrelevant for plain FLBA encoding; the encoder passes +/// `type_length` directly. +/// - Everything else (numeric / bool): a constant per-value size; the caller +/// already short-circuits these via `mem::size_of::()` before +/// touching this function, so this branch is unreachable in practice and +/// we fall back to `overhead` defensively. +/// +/// See `dict_encoder.rs::push` (line ~52) for the matching dispatch. +/// +/// Placed at the end of the module deliberately. Inserting it above the +/// `ColumnValueEncoder` trait shifts the trait and `ColumnValueEncoderImpl` +/// within the compiled module enough to perturb downstream code placement, +/// which measurably regresses unrelated arrow-writer string benchmarks +/// (~5-9% on `string` / `string_and_binary_view`). Defining it last keeps +/// the hot encoder code at the offsets it has on `main`. +#[inline] +fn plain_encoded_byte_size(value: &T::T) -> usize { + let (overhead, bytes) = value.dict_encoding_size(); + match ::PHYSICAL_TYPE { + Type::BYTE_ARRAY => overhead + bytes, + Type::FIXED_LEN_BYTE_ARRAY => bytes, + _ => overhead, + } +} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 5d14ac6856f9..526801d00e0b 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -48,8 +48,11 @@ use crate::file::properties::{ use crate::file::statistics::{Statistics, ValueStatistics}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; +mod byte_budget_chunker; pub(crate) mod encoder; +use byte_budget_chunker::ByteBudgetChunker; + macro_rules! downcast_writer { ($e:expr, $i:ident, $b:expr) => { match $e { @@ -373,6 +376,24 @@ impl<'a> LevelDataRef<'a> { Self::Uniform { value, .. } => Self::Uniform { value, count: len }, } } + + /// Count of positions in this slice that represent an actual value + /// (definition level equal to `max_def`). `Absent` means the column has + /// `max_def == 0` and every position is a value, so the implicit count + /// is the caller-supplied `total`. + pub(crate) fn value_count(self, total: usize, max_def: i16) -> usize { + match self { + Self::Absent => total, + Self::Materialized(values) => values.iter().filter(|&&d| d == max_def).count(), + Self::Uniform { value, count } => { + if value == max_def { + count + } else { + 0 + } + } + } + } } /// Typed column writer for a primitive column. @@ -544,6 +565,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } else { self.props.write_batch_size() }; + let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size); while levels_offset < num_levels { let mut end_offset = num_levels.min(levels_offset + base_batch_size); @@ -554,14 +576,39 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } - values_offset += self.write_mini_batch( + let chunk_size = end_offset - levels_offset; + let chunk_def = def_levels.slice(levels_offset, chunk_size); + let chunk_rep = rep_levels.slice(levels_offset, chunk_size); + + let sub_batch_size = chunker.pick_sub_batch_size( + &self.encoder, values, - values_offset, value_indices, - end_offset - levels_offset, - def_levels.slice(levels_offset, end_offset - levels_offset), - rep_levels.slice(levels_offset, end_offset - levels_offset), - )?; + chunk_def, + values_offset, + chunk_size, + ); + + if sub_batch_size >= chunk_size { + values_offset += self.write_mini_batch( + values, + values_offset, + value_indices, + chunk_size, + chunk_def, + chunk_rep, + )?; + } else { + values_offset += self.write_granular_chunk( + values, + values_offset, + value_indices, + chunk_size, + chunk_def, + chunk_rep, + sub_batch_size, + )?; + } levels_offset = end_offset; } @@ -712,6 +759,78 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }) } + /// Writes a chunk in sub-batches sized by the caller-supplied + /// `sub_batch_size` so the post-write data page byte limit check + /// fires before the chunk can grossly overshoot + /// `data_page_size_limit`. + /// + /// For flat (unrepeated) columns sub-batches contain up to + /// `sub_batch_size` levels each. For repeated/nested columns + /// sub-batches step from one `rep == 0` boundary to the next so a + /// record never spans data pages, matching the parquet format rule. + /// + /// Returns the total number of values consumed across all sub-batches. + /// + /// `#[inline(never)]` keeps this slow path — only reached for + /// variable-width columns whose values need page splitting — out of + /// the hot `write_batch_internal` loop. + #[allow(clippy::too_many_arguments)] + #[inline(never)] + fn write_granular_chunk( + &mut self, + values: &E::Values, + values_offset: usize, + value_indices: Option<&[usize]>, + chunk_size: usize, + chunk_def: LevelDataRef<'_>, + chunk_rep: LevelDataRef<'_>, + sub_batch_size: usize, + ) -> Result { + let mut values_consumed = 0; + let mut sub_start = 0; + while sub_start < chunk_size { + let sub_end = match chunk_rep { + LevelDataRef::Materialized(levels) => { + // Pack up to `sub_batch_size` levels per mini-batch, then + // extend to the next record boundary (rep == 0) so a + // record never spans data pages. Packing whole records + // rather than stepping one record at a time avoids + // emitting a `write_mini_batch` per record: records + // average only a handful of levels, so the + // record-at-a-time loop issued roughly `sub_batch_size`× + // more mini-batches than necessary. + let mut e = (sub_start + sub_batch_size).min(chunk_size); + while e < chunk_size && levels[e] != 0 { + e += 1; + } + // `sub_batch_size` can be 0 when the chunker sizes a + // sub-batch below one record; always make progress by + // consuming at least the first whole record. + if e == sub_start { + e = sub_start + 1; + while e < chunk_size && levels[e] != 0 { + e += 1; + } + } + e + } + _ => (sub_start + sub_batch_size).min(chunk_size), + }; + let sub_len = sub_end - sub_start; + let written = self.write_mini_batch( + values, + values_offset + values_consumed, + value_indices, + sub_len, + chunk_def.slice(sub_start, sub_len), + chunk_rep.slice(sub_start, sub_len), + )?; + values_consumed += written; + sub_start = sub_end; + } + Ok(values_consumed) + } + /// Creates a new streaming level encoder appropriate for the writer version. fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder { match props.writer_version() { @@ -2675,6 +2794,500 @@ mod tests { assert_eq!(other_values, vec![10]); } + #[test] + fn test_column_writer_caps_page_size_for_large_byte_array_values() { + // Regression: the post-write data page byte limit check only fires + // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB + // BYTE_ARRAY values used to buffer multiple GiB into a single page + // before the limit was even consulted. With the threshold-based + // granular mode this batch should split into ~one page per value. + let value_size = 64 * 1024; // 64 KiB per value + let page_byte_limit = 16 * 1024; // 16 KiB page limit + let num_rows = 64; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + // Default write_batch_size (1024) — without the fix this + // buffers the entire input into a single ~4 MiB page. + .build(), + ); + + let mut data = Vec::with_capacity(num_rows); + for i in 0..num_rows { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_page_sizes = Vec::new(); + let mut data_page_value_counts = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_page_sizes.push(page.buffer().len()); + data_page_value_counts.push(page.num_values()); + } + } + + // Every value must end up somewhere. + assert_eq!( + data_page_value_counts.iter().sum::() as usize, + num_rows + ); + // Without the fix this assertion fired with one ~4 MiB page; the + // threshold splits the input so that no page holds more than a + // single oversized value's worth of bytes. + assert!( + data_page_sizes.len() >= num_rows / 2, + "expected pages to be cut close to one per value, got {} pages for sizes {:?}", + data_page_sizes.len(), + data_page_sizes, + ); + // Each page must be bounded by roughly one value's worth of bytes; + // parquet allows a single oversized value to occupy a page by + // itself but never lets us pile many of them together. + for size in &data_page_sizes { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound ({}B) — pages {:?}", + value_size + 64, + data_page_sizes, + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_for_large_values_in_list() { + // Coverage for the Materialized-rep branch of + // `write_granular_chunk`. The flat-column regression test + // exercises the per-level step; this exercises the + // record-by-record step used when rep levels are present. + // + // Column is `list` (max_def = 1, max_rep = 1) + // with 3 records of 3 large blobs each. The page byte limit is + // smaller than a single blob, so granular mode kicks in, and the + // Materialized-rep arm of `write_granular_chunk` steps from one + // `rep == 0` boundary to the next so a record never spans pages. + let value_size = 32 * 1024; + let page_byte_limit = 16 * 1024; + let values_per_record = 3; + let num_records = 3; + let num_values = values_per_record * num_records; + + // rep levels: 0, 1, 1, 0, 1, 1, 0, 1, 1 + let mut rep_levels = Vec::with_capacity(num_values); + for _ in 0..num_records { + rep_levels.push(0i16); + rep_levels.extend(std::iter::repeat_n(1i16, values_per_record - 1)); + } + let def_levels = vec![1i16; num_values]; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_values); + for i in 0..num_values { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 1, 1, props); + writer + .write_batch(&data, Some(&def_levels), Some(&rep_levels)) + .unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push((page.buffer().len(), page.num_values())); + } + } + + // The Materialized-rep arm groups levels by record, and each + // record's bytes blow the page byte limit on its own, so we get + // exactly one page per record. + assert_eq!( + data_pages.len(), + num_records, + "expected one data page per record, got {data_pages:?}" + ); + for (bytes, n_values) in &data_pages { + assert_eq!( + *n_values as usize, values_per_record, + "each page must hold a whole record's leaves, got {data_pages:?}" + ); + // Each page is one full record (its leaves cannot be split), + // so allow up to `values_per_record` blobs of payload plus a + // small fudge for level encoding overhead. + let upper_bound = values_per_record * (value_size + 16); + assert!( + *bytes <= upper_bound, + "page size {bytes} exceeds whole-record bound ({upper_bound}); pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_with_nullable_large_values() { + // Coverage for `LevelDataRef::value_count` on Materialized def + // levels: a nullable column with mixed nulls and large values. + // `value_count` must return the actual non-null count so the + // byte estimate reflects bytes that will actually be written, + // not the level count. + let value_size = 32 * 1024; + let page_byte_limit = 16 * 1024; + let num_levels = 32; + + // Alternating null / non-null: 16 nulls and 16 values. + let def_levels: Vec = (0..num_levels as i16).map(|i| i % 2).collect(); + let num_values = def_levels.iter().filter(|&&d| d == 1).count(); + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_values); + for i in 0..num_values { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 1, 0, props); + writer.write_batch(&data, Some(&def_levels), None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push(page.buffer().len()); + } + } + + // With 16 actual values of 32 KiB each and a 16 KiB page limit, + // every non-null value should get its own page (plus possibly + // adjacent nulls). At minimum, the number of pages must be + // roughly the value count, not 1 (which is what `main` produced). + assert!( + data_pages.len() >= num_values / 2, + "expected at least {} pages for {num_values} large values, got {} pages: {data_pages:?}", + num_values / 2, + data_pages.len(), + ); + // No page contains more than ~one value's worth of payload bytes. + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_dict_enabled_large_values_post_spill() { + // While dictionary encoding is active, `has_dictionary()` short- + // circuits `estimated_value_bytes` — the byte estimate is plain- + // encoded size but dict-encoded pages only store small RLE + // indices, so we'd otherwise shrink pages spuriously. Once the + // dictionary spills (each value is large + unique), plain + // encoding takes over and the byte-budget sub-batch kicks in. + // + // This test makes sure the writer survives that transition and + // produces bounded pages thereafter. + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(true) + // Force a small dict so it spills quickly even though + // each value here is unique. + .set_dictionary_page_size_limit(1024) + .set_data_page_size_limit(page_byte_limit) + // Small mini-batches so dict fallback happens part-way + // through the input, leaving subsequent mini-batches to + // exercise the post-spill plain-encoding path that the + // page-size fix actually targets. + .set_write_batch_size(4) + .build(), + ); + + let mut data = Vec::with_capacity(num_rows); + for i in 0..num_rows { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push(page.buffer().len()); + } + } + + // After spill, plain encoding writes one ~64 KiB value per page. + // Without the fix, post-spill writes still buffered all 32 + // values into a single ~2 MiB page. + assert!( + data_pages.len() >= num_rows / 2, + "expected >= {} data pages after dict spill, got {} ({data_pages:?})", + num_rows / 2, + data_pages.len(), + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_caps_dictionary_page_size() { + // A column of large *distinct* values with dictionary encoding on: + // the dictionary page accumulates the values themselves, and its + // spill check runs only once per mini-batch. Without bounding the + // dictionary-encoding mini-batch, one `write_batch_size` mini-batch + // would intern `write_batch_size * value_size` bytes into the + // dictionary page before the check fires (~16 MiB here). The chunker + // must sub-batch the dictionary-encoding phase too. + let value_size = 8 * 1024; + let dict_page_limit = 64 * 1024; + let num_rows = 2048; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(true) + .set_dictionary_page_size_limit(dict_page_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_rows); + for i in 0..num_rows { + // each value distinct, so the dictionary cannot dedup them + let mut v = vec![0u8; value_size]; + v[..8].copy_from_slice(&(i as u64).to_le_bytes()); + data.push(ByteArray::from(v)); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut dict_page_size = 0; + while let Some(page) = page_reader.get_next_page().unwrap() { + if page.page_type() == PageType::DICTIONARY_PAGE { + dict_page_size = dict_page_size.max(page.buffer().len()); + } + } + + assert!( + dict_page_size > 0, + "expected the column to dictionary-encode" + ); + // Bounded near the limit (~2x from the post-mini-batch check). Before + // the fix the dictionary page reached num_rows * value_size (~16 MiB, + // 256x the limit). + assert!( + dict_page_size <= 3 * dict_page_limit, + "dictionary page {dict_page_size} exceeds 3x the {dict_page_limit} limit", + ); + } + + #[test] + fn test_column_writer_caps_page_size_for_fixed_len_byte_array() { + // Coverage for `ParquetValueType::byte_size` override on + // `FixedLenByteArray`. With `type_length = 1`, each plain-encoded + // value is one byte, so a 4-byte page byte limit forces the + // sub-batch sizer to write ~4 values per page rather than one + // page for the whole batch. + let page_byte_limit = 4; + let num_values = 128; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_values); + for i in 0..num_values { + let mut fla = FixedLenByteArray::default(); + fla.set_data(Bytes::from(vec![i as u8])); + data.push(fla); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push(page.buffer().len()); + } + } + + // Without the fix this is a single 128-byte page; with the fix + // the byte budget caps each page at ~`page_byte_limit` bytes. + assert!( + data_pages.len() >= num_values / 8, + "expected pages capped by byte budget, got {data_pages:?}" + ); + for size in &data_pages { + assert!( + *size <= page_byte_limit * 4, + "page size {size} larger than expected; pages {data_pages:?}" + ); + } + } + #[test] fn test_bool_statistics() { let stats = statistics_roundtrip::(&[true, false, false, true]); @@ -4719,6 +5332,42 @@ mod tests { } } + #[test] + fn test_level_data_ref_value_count() { + // `value_count` is what the byte-budget chunker uses to convert a + // chunk's level span into a leaf-value count. It must work for any + // column shape — flat, nullable, or nested — because the leaf + // values array is decoupled from the rep/def level stream. + let max_def = 2; + // Non-nullable / unrepeated: no def levels materialized — every + // level is a value. + assert_eq!(LevelDataRef::Absent.value_count(64, max_def), 64); + // Uniform run of present values, and of nulls. + assert_eq!( + LevelDataRef::Uniform { + value: max_def, + count: 40 + } + .value_count(40, max_def), + 40 + ); + assert_eq!( + LevelDataRef::Uniform { + value: max_def - 1, + count: 40 + } + .value_count(40, max_def), + 0 + ); + // Materialized def levels (nullable / nested): only levels equal to + // `max_def` are values; empty-list / null levels are not. + let levels = [2i16, 0, 2, 1, 2, 2, 0]; + assert_eq!( + LevelDataRef::Materialized(&levels).value_count(levels.len(), max_def), + 4 + ); + } + #[test] fn test_uniform_def_levels_all_null() { // All-null column: def_level=0 (null) for every slot, no values written. diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index b9d997beb289..183590f9b847 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -408,16 +408,16 @@ fn test_string() { columns: vec![ColumnChunk { pages: vec![ Page { - rows: 130, + rows: 125, page_header_size: 38, - compressed_size: 138, + compressed_size: 114, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { - rows: 1250, + rows: 1255, page_header_size: 40, - compressed_size: 10000, + compressed_size: 10040, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }, @@ -429,10 +429,15 @@ fn test_string() { page_type: PageType::DATA_PAGE, }, ], + // The byte-budget chunker now sub-batches the + // dictionary-encoding phase, so the dictionary page is + // bounded at the 1000-byte limit instead of overshooting + // to 1040 — the dictionary spills one mini-batch earlier + // (125 rows rather than 130). dictionary_page: Some(Page { - rows: 130, + rows: 125, page_header_size: 38, - compressed_size: 1040, + compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, }),