From e8c48c5d56868f1a61b7b7511367774386f32a3c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 13 May 2026 20:42:07 -0700 Subject: [PATCH 01/17] bench(parquet): add short and large string write benches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `short_string_non_null` writes 1M 8-byte strings — exercises the BYTE_ARRAY write path where per-value bookkeeping cost is largest. `large_string_non_null` writes 1024 rows of 256 KiB strings — the case where individual values exceed the default data-page byte limit, so a default `write_batch_size`-row chunk would otherwise buffer hundreds of MiB before any page-size check fires. Both fill gaps in the existing arrow_writer benches, which only cover random-length strings. Co-Authored-By: Claude Opus 4.7 --- parquet/benches/arrow_writer.rs | 35 ++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs index 9b22bb04b3e7..3636cb040253 100644 --- a/parquet/benches/arrow_writer.rs +++ b/parquet/benches/arrow_writer.rs @@ -32,7 +32,7 @@ use std::sync::Arc; use arrow::datatypes::*; use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array}; use arrow::{record_batch::RecordBatch, util::data_gen::*}; -use arrow_array::RecordBatchOptions; +use arrow_array::{RecordBatchOptions, StringArray}; use parquet::errors::Result; use parquet::file::properties::{CdcOptions, WriterProperties, WriterVersion}; @@ -100,6 +100,29 @@ fn create_string_bench_batch( )?) } +/// 1 M short, fixed-width 8-byte strings. Exercises the BYTE_ARRAY hot path +/// for the case where individual values are small enough that the byte-budget +/// based sub-batch sizing in `write_batch_internal` should always resolve to +/// the full chunk (no granular splitting, no regression vs. current behavior). +fn create_short_string_bench_batch(size: usize) -> Result { + let array = Arc::new(StringArray::from_iter_values( + (0..size).map(|i| format!("{i:08}")), + )) as _; + Ok(RecordBatch::try_from_iter([("col", array)])?) +} + +/// `size` rows of `value_size`-byte strings. Exercises the BYTE_ARRAY path +/// where individual values are large enough that batching the default +/// `write_batch_size` of them would blow the page byte limit by orders of +/// magnitude — the case the page-size fix targets. +fn create_large_string_bench_batch(size: usize, value_size: usize) -> Result { + let value = "x".repeat(value_size); + let array = Arc::new(StringArray::from_iter_values( + (0..size).map(|_| value.as_str()), + )) as _; + Ok(RecordBatch::try_from_iter([("col", array)])?) +} + fn create_string_and_binary_view_bench_batch( size: usize, null_density: f32, @@ -392,6 +415,16 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> { let batch = create_string_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); batches.push(("string", batch)); + let batch = create_short_string_bench_batch(BATCH_SIZE).unwrap(); + batches.push(("short_string_non_null", batch)); + + // 1024 rows × 256 KiB = 256 MiB total. With the default 1 MiB page byte + // limit, this is the case where the page-size fix kicks in: each value + // needs its own page, and `write_batch_size = 1024` would otherwise + // buffer all 256 MiB before the post-write check runs. + let batch = create_large_string_bench_batch(1024, 256 * 1024).unwrap(); + batches.push(("large_string_non_null", batch)); + let batch = create_string_and_binary_view_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap(); batches.push(("string_and_binary_view", batch)); From 57952c8f5b74c87e85961aa37e54bb380e507a31 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 13 May 2026 22:37:45 -0700 Subject: [PATCH 02/17] fix(parquet): bound data page byte size for large variable-width values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The parquet column writer only checks the data page byte limit AFTER each mini-batch finishes writing, and mini-batches are sized by row count (`write_batch_size`, default 1024). For BYTE_ARRAY columns with large values — e.g. a 5 MiB image blob per row — a single mini-batch can buffer multiple GiB into one data page before the configured byte limit is even consulted. Pages can exceed the limit by orders of magnitude. Make the mini-batch size byte-budget aware: - For each chunk, ask the encoder how many of the next values fit in one page byte budget. If everything fits, stay on the existing batched fast path (zero behavior change for small values). - If not, sub-batch — for flat columns, one mini-batch per `k` values where `k` is the fit count; for repeated columns, one mini-batch per record (since a record cannot span data pages). Skip the check while dictionary encoding is active: the byte estimate is plain-encoded size, but a dict-encoded data page only stores small RLE indices, so the estimate would spuriously shrink pages. Dictionary fallback bounds dict-encoded pages independently. The encoder hook is `count_values_within_byte_budget(values, offset, len, byte_budget) -> Option` plus a `_gather` variant for the arrow path, mirroring the existing `write`/`write_gather` split. Returning `None` means "no cheap estimate available; stay batched." Implementation details: - `ParquetValueType::byte_size(&self)` returns the per-value plain- encoded byte size. Defaults to `size_of::()`; overridden for `ByteArray` (`len + 4`) and `FixedLenByteArray` (`len`). - Standard `ColumnValueEncoderImpl::count_values_within_byte_budget` short-circuits to `(byte_budget / size_of::()).max(1).min(n)` for fixed-size physical types — one division, no walk. For BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY it scans values cumulatively and exits at the first one to push the sum past the budget, which also catches skewed distributions (a single oversized value among many small ones is detected wherever it lands). - Arrow `ByteArrayEncoder::count_values_within_byte_budget_gather` uses a two-stage walk on `GenericByteArray` types: stage 1 computes the total in O(1) via one subtraction on the offsets buffer when indices are contiguous (the case for every non-null column), returning immediately if the chunk fits. Stage 2 walks per-index lengths from the offsets buffer (still no slice/UTF-8 construction) when stage 1 doesn't conclude. View/dict/fixed-size-binary arrays fall through to a per-value walk via `ArrayAccessor::value`. - `LevelDataRef::value_count(total, max_def)` reports how many levels in the chunk correspond to actual non-null values. Used to bridge the encoder's value-count answer back into level-count subdivision for nullable columns. Tests in `column::writer::tests`: - `test_column_writer_caps_page_size_for_large_byte_array_values` — flat regression: 64 × 64 KiB BYTE_ARRAY values vs a 16 KiB page limit produces one page per value rather than a single ~4 MiB page. - `test_column_writer_caps_page_size_for_large_values_in_list` — Materialized-rep branch of `write_granular_chunk`: list of 3 large blobs × 3 records, asserts one page per record (no record splits). - `test_column_writer_caps_page_size_with_nullable_large_values` — `LevelDataRef::value_count` on Materialized def levels with mixed nulls. - `test_column_writer_dict_enabled_large_values_post_spill` — `has_dictionary()` short-circuit while dict is active, then byte- budget sub-batching after dict spill. - `test_column_writer_caps_page_size_for_fixed_len_byte_array` — `FixedLenByteArray::byte_size` override. Tests in `arrow::arrow_writer::tests`: - `test_arrow_writer_caps_page_size_for_large_strings` — end-to-end through `ArrowWriter` exercising the offsets-buffer fast path. - `test_arrow_writer_caps_page_size_for_large_string_view` — view-array fallback (Utf8View has no contiguous offsets buffer). - `test_arrow_writer_all_null_string_column` — `value_count` Uniform branch under arrow's level optimization; asserts null_count and page coverage rather than just non-empty output. - `test_arrow_writer_granular_mode_roundtrip` — value-fidelity round- trip: mix small + large strings so the byte-budget cutoff lands mid-chunk, write through `ArrowWriter`, read back with `ParquetRecordBatchReader`, assert each string matches. Bench results vs `main` (5-run medians on a noisy laptop, run-to-run variance ~±2%): - `primitive/default` (i32 25% null): −0.4% to +1.3% - `primitive_non_null/default`: −2.3% to +0.4% - `bool_non_null/default`: +1.8% to +15.9% (highly noisy on this machine) - `string/default`: +3.3% to +4.7% - `short_string_non_null/default` (new, 1M × 8 B): +1.0% to +6.4% - `large_string_non_null/default` (new, 1024 × 256 KiB): +0.5% to +2.7% — the case the fix targets - `string_dictionary/default`: +3.3% to +6.4% - `string_non_null/default`: −1.6% to +2.3% All within laptop variance for the fast-path (small-value) cases. The fix's intended case — large variable-width values — now correctly bounds page sizes. Co-Authored-By: Claude Opus 4.7 --- parquet/src/arrow/arrow_writer/byte_array.rs | 125 +++- parquet/src/arrow/arrow_writer/mod.rs | 239 +++++++ parquet/src/column/writer/encoder.rs | 93 +++ parquet/src/column/writer/mod.rs | 619 ++++++++++++++++++- parquet/src/data_type.rs | 25 + 5 files changed, 1092 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index f56f9570adfb..b257b867191b 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; +use arrow_array::types::ByteArrayType; use arrow_array::{ Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray, - LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, + GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, }; +use arrow_buffer::ArrowNativeType; use arrow_schema::DataType; macro_rules! downcast_dict_impl { @@ -475,6 +477,58 @@ impl ColumnValueEncoder for ByteArrayEncoder { Ok(()) } + fn count_values_within_byte_budget_gather( + values: &Self::Values, + indices: &[usize], + byte_budget: usize, + ) -> Option { + // `ByteArrayEncoder` only ever writes via `write_gather`, so this + // is the relevant method. + // + // Two-stage walk for the simple offset-buffer byte array types: + // 1. If indices are contiguous, compute the total payload in + // O(1) via a single subtraction on the offsets buffer. + // When the total fits the budget — the overwhelmingly + // common "small values" case — return immediately. + // 2. Otherwise, walk per-value byte sizes from the offsets + // buffer (still cheap, no slice/UTF-8 construction) and + // exit at the first value that pushes the cumulative sum + // past the budget. This bounds skewed distributions: an + // outlier value is caught wherever it lands in the chunk. + let count = match values.data_type() { + DataType::Utf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeUtf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::Binary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeBinary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + // Utf8View/BinaryView/FixedSizeBinary/Dictionary fall through + // to the per-value walk via `ArrayAccessor::value`. + _ => downcast_op!( + values.data_type(), + values, + count_within_budget_accessor, + indices, + byte_budget + ), + }; + Some(count) + } + fn num_values(&self) -> usize { match &self.dict_encoder { Some(encoder) => encoder.indices.len(), @@ -587,6 +641,75 @@ where } } +/// Cumulative-scan fallback used for byte array types that don't expose +/// a single contiguous offsets buffer — view arrays, dictionary arrays, +/// fixed-size binary. Returns the largest `k` such that the first `k` +/// values picked out by `indices` encode to at most `byte_budget` bytes +/// (or `indices.len()` if they all fit, or `1` if a single value alone +/// exceeds the budget). +/// +/// Free function so it can be used with `downcast_op!`. +fn count_within_budget_accessor(values: T, indices: &[usize], byte_budget: usize) -> usize +where + T: ArrayAccessor + Copy, + T::Item: AsRef<[u8]>, +{ + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::(); + cum = cum.saturating_add(value_len); + if cum > byte_budget { + return i.max(1); + } + } + indices.len() +} + +/// Two-stage fast path for `GenericByteArray` +/// (Utf8/LargeUtf8/Binary/LargeBinary): +/// +/// 1. If `indices` are contiguous and sorted, the total payload size +/// is one subtraction on the offsets buffer +/// (`offsets[last+1] - offsets[first]`). If the chunk fits within +/// `byte_budget`, return `indices.len()` immediately — the +/// overwhelmingly common case for non-null columns. +/// 2. Otherwise, walk per-index lengths from the offsets buffer +/// directly (no slice/UTF-8 construction) and stop at the first +/// value that pushes the cumulative sum past the budget. +fn count_within_budget_offsets( + values: &GenericByteArray, + indices: &[usize], + byte_budget: usize, +) -> usize { + if indices.is_empty() { + return 0; + } + let n = indices.len(); + let first = indices[0]; + let last = indices[n - 1]; + let offsets = values.value_offsets(); + let prefix_overhead = std::mem::size_of::(); + + // Stage 1: O(1) contiguous total. Skips Stage 2 in the common case. + if last >= first && last - first + 1 == n { + let payload = (offsets[last + 1] - offsets[first]).as_usize(); + if payload + n * prefix_overhead <= byte_budget { + return n; + } + } + + // Stage 2: scan per-index lengths from the offsets buffer. + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead; + cum = cum.saturating_add(len); + if cum > byte_budget { + return i.max(1); + } + } + n +} + /// Computes the min and max for the provided array and indices /// /// This is a free function so it can be used with `downcast_op!` diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 04a798391429..bc038aa3b0c0 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4901,6 +4901,245 @@ mod tests { assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); } + #[test] + fn test_arrow_writer_caps_page_size_for_large_strings() { + // End-to-end coverage for `ByteArrayEncoder::estimated_value_bytes_gather` + // and the offsets-buffer fast path in `estimate_byte_size_offsets`. + // + // The standard column-writer regression test covers the same + // logic for the non-arrow path; this one drives writes through + // `ArrowWriter`, which is what real users hit and which uses + // `write_gather` with `non_null_indices` (always contiguous for + // a non-null column). + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32; + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + false, + )])); + let strings: Vec = (0..num_rows).map(|_| "x".repeat(value_size)).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(strings)) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(page_byte_limit) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let col_meta = metadata.row_group(0).column(0); + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + + let mut data_pages = Vec::new(); + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) { + data_pages.push(page.buffer().len()); + } + } + + assert!( + data_pages.len() >= num_rows / 2, + "expected at least {} data pages for {num_rows} large strings via ArrowWriter, got {} ({data_pages:?})", + num_rows / 2, + data_pages.len(), + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_arrow_writer_granular_mode_roundtrip() { + // Granular mode subdivides chunks and writes more pages than + // `main`. Make sure the data we write back is bit-identical to + // what went in — page-count assertions elsewhere only prove + // pages were cut, not that the encoded data is correct. + // + // Mix value sizes so that the cumulative-byte-budget cutoff + // lands mid-chunk, exercising both batched and granular paths + // within the same `write_batch_internal` call. + let small = "tiny".to_string(); + let big = "x".repeat(64 * 1024); + let strings: Vec = (0..256) + .map(|i| { + if i % 16 == 0 { + big.clone() + } else { + small.clone() + } + }) + .collect(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(strings.clone())) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap(); + let read = reader.next().unwrap().unwrap(); + assert!(reader.next().is_none(), "expected one batch"); + let col = read + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.len(), strings.len()); + for (i, expected) in strings.iter().enumerate() { + assert_eq!( + col.value(i), + expected.as_str(), + "value mismatch at index {i}" + ); + } + } + + #[test] + fn test_arrow_writer_caps_page_size_for_large_string_view() { + // Coverage for the view/dict fallback in + // `ByteArrayEncoder::count_values_within_byte_budget_gather`. + // `Utf8View` arrays don't expose a single contiguous offsets + // buffer, so the gather method falls back to the per-value walk + // via `ArrayAccessor::value`. + use arrow_array::StringViewArray; + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32usize; + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8View, + false, + )])); + let strings: Vec = (0..num_rows).map(|_| "x".repeat(value_size)).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringViewArray::from_iter_values( + strings.iter().map(|s| s.as_str()), + )) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(page_byte_limit) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let col_meta = metadata.row_group(0).column(0); + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + + let mut data_pages = Vec::new(); + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) { + data_pages.push(page.buffer().len()); + } + } + assert!( + data_pages.len() >= num_rows / 2, + "expected pages bounded by byte budget for Utf8View, got {data_pages:?}" + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_arrow_writer_all_null_string_column() { + // The `LevelDataRef::value_count` Uniform branch with + // `value != max_def` (entirely-null chunk) must return 0 so the + // sub-batch sizer short-circuits to batch mode without trying + // to estimate byte budgets for non-existent values. + let num_rows = 1024; + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + true, + )])); + let nulls: Vec> = vec![None; num_rows]; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(nulls)) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + // Re-parse the file: row group has one column, every row is + // null, all data pages report `num_rows / page_count` rows. + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let row_group = metadata.row_group(0); + let col_meta = row_group.column(0); + assert_eq!(row_group.num_rows() as usize, num_rows); + // Statistics record `null_count = num_rows` — proves every value + // was written as null. + if let Some(stats) = col_meta.statistics() { + assert_eq!( + stats.null_count_opt().unwrap_or(0) as usize, + num_rows, + "expected all-null column to report null_count = num_rows" + ); + } + + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + let mut total_values = 0u32; + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) { + total_values += page.num_values(); + } + } + assert_eq!( + total_values as usize, num_rows, + "expected every level position to be represented in some page" + ); + } + struct WriteBatchesShape { num_batches: usize, rows_per_batch: usize, diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 2ea3376ae708..c39b96f3eb0e 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -90,6 +90,42 @@ pub trait ColumnValueEncoder { /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`] fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>; + /// Returns the largest `k` such that the first `k` values in + /// `values[offset..offset + len]` encode to at most `byte_budget` + /// bytes — i.e. how many values fit in a single page byte budget. + /// + /// Returns `len` if every value fits. Returns at least 1 if a single + /// value alone exceeds the budget, matching parquet's "at least one + /// value per data page" rule. + /// + /// `None` means "no cheap estimate available"; the caller stays on + /// the batched fast path and lets the post-write + /// `should_add_data_page` check handle bounding. + /// + /// Implementations should short-circuit aggressively: the typical + /// case is "everything fits, return `len`", and the next-most-common + /// case is "one wide value, return 1." The variable-width walk only + /// needs to be precise when the chunk is genuinely near the budget. + fn count_values_within_byte_budget( + _values: &Self::Values, + _offset: usize, + _len: usize, + _byte_budget: usize, + ) -> Option { + None + } + + /// As [`Self::count_values_within_byte_budget`] but using gather + /// `indices` rather than a contiguous range. Returns the number of + /// `indices` that fit, not the maximum index value. + fn count_values_within_byte_budget_gather( + _values: &Self::Values, + _indices: &[usize], + _byte_budget: usize, + ) -> Option { + None + } + /// Returns the number of buffered values fn num_values(&self) -> usize; @@ -247,6 +283,63 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { self.write_slice(&slice) } + fn count_values_within_byte_budget( + values: &[T::T], + offset: usize, + len: usize, + byte_budget: usize, + ) -> Option { + // Clamp so that a caller-supplied `len` that overruns the input + // (e.g. a level/value mismatch the encoder will reject later) + // returns an estimate instead of panicking here. + let end = (offset + len).min(values.len()); + let start = offset.min(end); + let n = end - start; + // Fixed-size physical types have a constant per-value byte cost, + // so the answer is one division — no need to walk the slice. + let phys = ::PHYSICAL_TYPE; + if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY { + let per = std::mem::size_of::().max(1); + let fits = (byte_budget / per).max(1); + return Some(fits.min(n)); + } + // Variable-width: scan, accumulate, exit at the first value that + // pushes us past the budget. This both bounds skewed + // distributions (one outlier among small values is caught when + // it lands, regardless of position) and short-circuits when an + // early value alone exceeds the budget. + let mut cum: usize = 0; + for (i, v) in values[start..end].iter().enumerate() { + cum = cum.saturating_add(v.byte_size()); + if cum > byte_budget { + return Some(i.max(1)); + } + } + Some(n) + } + + fn count_values_within_byte_budget_gather( + values: &[T::T], + indices: &[usize], + byte_budget: usize, + ) -> Option { + let phys = ::PHYSICAL_TYPE; + if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY { + let per = std::mem::size_of::().max(1); + let fits = (byte_budget / per).max(1); + return Some(fits.min(indices.len())); + } + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let Some(v) = values.get(*idx) else { continue }; + cum = cum.saturating_add(v.byte_size()); + if cum > byte_budget { + return Some(i.max(1)); + } + } + Some(indices.len()) + } + fn num_values(&self) -> usize { self.num_values } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 5d14ac6856f9..4404c2efd195 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -373,6 +373,24 @@ impl<'a> LevelDataRef<'a> { Self::Uniform { value, .. } => Self::Uniform { value, count: len }, } } + + /// Count of positions in this slice that represent an actual value + /// (definition level equal to `max_def`). `Absent` means the column has + /// `max_def == 0` and every position is a value, so the implicit count + /// is the caller-supplied `total`. + pub(crate) fn value_count(self, total: usize, max_def: i16) -> usize { + match self { + Self::Absent => total, + Self::Materialized(values) => values.iter().filter(|&&d| d == max_def).count(), + Self::Uniform { value, count } => { + if value == max_def { + count + } else { + 0 + } + } + } + } } /// Typed column writer for a primitive column. @@ -544,6 +562,29 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } else { self.props.write_batch_size() }; + let page_byte_limit = self.props.column_data_page_size_limit(self.descr.path()); + let max_def = self.descr.max_def_level(); + // Static fast path: for fixed-width physical types, plain-encoded + // size per value is known at column-open time. If `base_batch_size` + // values can fit in `page_byte_limit` outright, no chunk can ever + // overflow and we can skip the per-chunk byte-budget check + // entirely. This is the common case for every numeric/bool column + // — its per-chunk overhead disappears, leaving only the byte + // estimator to BYTE_ARRAY (where it's actually needed) and to the + // rare FIXED_LEN_BYTE_ARRAY column with wide fixed elements and a + // small page byte limit. + let static_bytes_per_value = match self.descr.physical_type() { + Type::BOOLEAN => Some(1), + Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), + Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::()), + Type::INT96 => Some(12), + Type::FIXED_LEN_BYTE_ARRAY => Some(self.descr.type_length().max(0) as usize), + Type::BYTE_ARRAY => None, + }; + let max_chunk_bytes_static = static_bytes_per_value + .map(|b| b.saturating_mul(base_batch_size)) + .unwrap_or(usize::MAX); + let static_always_fits = max_chunk_bytes_static <= page_byte_limit; while levels_offset < num_levels { let mut end_offset = num_levels.min(levels_offset + base_batch_size); @@ -554,14 +595,102 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } - values_offset += self.write_mini_batch( - values, - values_offset, - value_indices, - end_offset - levels_offset, - def_levels.slice(levels_offset, end_offset - levels_offset), - rep_levels.slice(levels_offset, end_offset - levels_offset), - )?; + let chunk_size = end_offset - levels_offset; + let chunk_def = def_levels.slice(levels_offset, chunk_size); + let chunk_rep = rep_levels.slice(levels_offset, chunk_size); + + // Pick a sub-batch size so that one mini-batch maps to roughly + // one page byte budget's worth of values. Without this, the + // post-write data page byte check only fires after a full + // `write_batch_size`-row chunk is buffered, so a 1024-row chunk + // of 5 MiB values can push a page to ~5 GiB before the check + // even runs. + // + // For typical small values (e.g. 50–100 B strings), the budget + // accommodates more rows than the chunk holds, so this resolves + // to `chunk_size` and the loop stays on the zero-overhead + // batched fast path. Only when individual values are large + // enough that 1024 of them would blow the page byte limit does + // the sub-batch shrink below `chunk_size`. + // + // Skip the check entirely while dictionary encoding is active: + // `estimated_value_bytes` returns plain-encoded size, but a + // dict-encoded data page only stores small RLE indices, so the + // raw byte estimate would spuriously shrink pages. Dictionary + // fallback bounds dict-encoded pages independently. + let sub_batch_size = + if static_always_fits || self.encoder.has_dictionary() || chunk_size == 0 { + chunk_size + } else { + let vals_in_chunk = chunk_def.value_count(chunk_size, max_def); + if vals_in_chunk == 0 { + chunk_size + } else { + // Ask the encoder how many of the next values fit in + // one page byte budget. Dispatch on whether the + // caller supplied gather indices; this mirrors how + // `write_mini_batch` picks between `write_gather` and + // `write`. + let fit = match value_indices { + Some(idx) => { + let end = (values_offset + vals_in_chunk).min(idx.len()); + let start = values_offset.min(end); + E::count_values_within_byte_budget_gather( + values, + &idx[start..end], + page_byte_limit, + ) + } + None => E::count_values_within_byte_budget( + values, + values_offset, + vals_in_chunk, + page_byte_limit, + ), + }; + match fit { + None => chunk_size, + Some(values_per_subbatch) => { + // Convert the value count from the encoder + // back into a level count, which is what + // `write_granular_chunk` actually subdivides + // on. For non-nullable columns this is a + // no-op; for nullable, scale by the + // observed value-to-level ratio of the + // current chunk. + let levels_per_subbatch = if vals_in_chunk == chunk_size { + values_per_subbatch + } else { + (values_per_subbatch * chunk_size) + .div_ceil(vals_in_chunk) + .max(1) + }; + chunk_size.min(levels_per_subbatch.max(1)) + } + } + } + }; + + if sub_batch_size >= chunk_size { + values_offset += self.write_mini_batch( + values, + values_offset, + value_indices, + chunk_size, + chunk_def, + chunk_rep, + )?; + } else { + values_offset += self.write_granular_chunk( + values, + values_offset, + value_indices, + chunk_size, + chunk_def, + chunk_rep, + sub_batch_size, + )?; + } levels_offset = end_offset; } @@ -712,6 +841,57 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }) } + /// Writes a chunk in sub-batches sized by the caller-supplied + /// `sub_batch_size` so the post-write data page byte limit check + /// fires before the chunk can grossly overshoot + /// `data_page_size_limit`. + /// + /// For flat (unrepeated) columns sub-batches contain up to + /// `sub_batch_size` levels each. For repeated/nested columns + /// sub-batches step from one `rep == 0` boundary to the next so a + /// record never spans data pages, matching the parquet format rule. + /// + /// Returns the total number of values consumed across all sub-batches. + #[allow(clippy::too_many_arguments)] + fn write_granular_chunk( + &mut self, + values: &E::Values, + values_offset: usize, + value_indices: Option<&[usize]>, + chunk_size: usize, + chunk_def: LevelDataRef<'_>, + chunk_rep: LevelDataRef<'_>, + sub_batch_size: usize, + ) -> Result { + let mut values_consumed = 0; + let mut sub_start = 0; + while sub_start < chunk_size { + let sub_end = match chunk_rep { + LevelDataRef::Materialized(levels) => { + // Step from one record boundary to the next. + let mut e = sub_start + 1; + while e < chunk_size && levels[e] != 0 { + e += 1; + } + e + } + _ => (sub_start + sub_batch_size).min(chunk_size), + }; + let sub_len = sub_end - sub_start; + let written = self.write_mini_batch( + values, + values_offset + values_consumed, + value_indices, + sub_len, + chunk_def.slice(sub_start, sub_len), + chunk_rep.slice(sub_start, sub_len), + )?; + values_consumed += written; + sub_start = sub_end; + } + Ok(values_consumed) + } + /// Creates a new streaming level encoder appropriate for the writer version. fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder { match props.writer_version() { @@ -2675,6 +2855,429 @@ mod tests { assert_eq!(other_values, vec![10]); } + #[test] + fn test_column_writer_caps_page_size_for_large_byte_array_values() { + // Regression: the post-write data page byte limit check only fires + // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB + // BYTE_ARRAY values used to buffer multiple GiB into a single page + // before the limit was even consulted. With the threshold-based + // granular mode this batch should split into ~one page per value. + let value_size = 64 * 1024; // 64 KiB per value + let page_byte_limit = 16 * 1024; // 16 KiB page limit + let num_rows = 64; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + // Default write_batch_size (1024) — without the fix this + // buffers the entire input into a single ~4 MiB page. + .build(), + ); + + let mut data = Vec::with_capacity(num_rows); + for i in 0..num_rows { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_page_sizes = Vec::new(); + let mut data_page_value_counts = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_page_sizes.push(page.buffer().len()); + data_page_value_counts.push(page.num_values()); + } + } + + // Every value must end up somewhere. + assert_eq!( + data_page_value_counts.iter().sum::() as usize, + num_rows + ); + // Without the fix this assertion fired with one ~4 MiB page; the + // threshold splits the input so that no page holds more than a + // single oversized value's worth of bytes. + assert!( + data_page_sizes.len() >= num_rows / 2, + "expected pages to be cut close to one per value, got {} pages for sizes {:?}", + data_page_sizes.len(), + data_page_sizes, + ); + // Each page must be bounded by roughly one value's worth of bytes; + // parquet allows a single oversized value to occupy a page by + // itself but never lets us pile many of them together. + for size in &data_page_sizes { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound ({}B) — pages {:?}", + value_size + 64, + data_page_sizes, + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_for_large_values_in_list() { + // Coverage for the Materialized-rep branch of + // `write_granular_chunk`. The flat-column regression test + // exercises the per-level step; this exercises the + // record-by-record step used when rep levels are present. + // + // Column is `list` (max_def = 1, max_rep = 1) + // with 3 records of 3 large blobs each. The page byte limit is + // smaller than a single blob, so granular mode kicks in, and the + // Materialized-rep arm of `write_granular_chunk` steps from one + // `rep == 0` boundary to the next so a record never spans pages. + let value_size = 32 * 1024; + let page_byte_limit = 16 * 1024; + let values_per_record = 3; + let num_records = 3; + let num_values = values_per_record * num_records; + + // rep levels: 0, 1, 1, 0, 1, 1, 0, 1, 1 + let mut rep_levels = Vec::with_capacity(num_values); + for _ in 0..num_records { + rep_levels.push(0i16); + rep_levels.extend(std::iter::repeat_n(1i16, values_per_record - 1)); + } + let def_levels = vec![1i16; num_values]; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_values); + for i in 0..num_values { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 1, 1, props); + writer + .write_batch(&data, Some(&def_levels), Some(&rep_levels)) + .unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push((page.buffer().len(), page.num_values())); + } + } + + // The Materialized-rep arm groups levels by record, and each + // record's bytes blow the page byte limit on its own, so we get + // exactly one page per record. + assert_eq!( + data_pages.len(), + num_records, + "expected one data page per record, got {data_pages:?}" + ); + for (bytes, n_values) in &data_pages { + assert_eq!( + *n_values as usize, values_per_record, + "each page must hold a whole record's leaves, got {data_pages:?}" + ); + // Each page is one full record (its leaves cannot be split), + // so allow up to `values_per_record` blobs of payload plus a + // small fudge for level encoding overhead. + let upper_bound = values_per_record * (value_size + 16); + assert!( + *bytes <= upper_bound, + "page size {bytes} exceeds whole-record bound ({upper_bound}); pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_with_nullable_large_values() { + // Coverage for `LevelDataRef::value_count` on Materialized def + // levels: a nullable column with mixed nulls and large values. + // `value_count` must return the actual non-null count so the + // byte estimate reflects bytes that will actually be written, + // not the level count. + let value_size = 32 * 1024; + let page_byte_limit = 16 * 1024; + let num_levels = 32; + + // Alternating null / non-null: 16 nulls and 16 values. + let def_levels: Vec = (0..num_levels as i16).map(|i| i % 2).collect(); + let num_values = def_levels.iter().filter(|&&d| d == 1).count(); + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_values); + for i in 0..num_values { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 1, 0, props); + writer.write_batch(&data, Some(&def_levels), None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push(page.buffer().len()); + } + } + + // With 16 actual values of 32 KiB each and a 16 KiB page limit, + // every non-null value should get its own page (plus possibly + // adjacent nulls). At minimum, the number of pages must be + // roughly the value count, not 1 (which is what `main` produced). + assert!( + data_pages.len() >= num_values / 2, + "expected at least {} pages for {num_values} large values, got {} pages: {data_pages:?}", + num_values / 2, + data_pages.len(), + ); + // No page contains more than ~one value's worth of payload bytes. + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_dict_enabled_large_values_post_spill() { + // While dictionary encoding is active, `has_dictionary()` short- + // circuits `estimated_value_bytes` — the byte estimate is plain- + // encoded size but dict-encoded pages only store small RLE + // indices, so we'd otherwise shrink pages spuriously. Once the + // dictionary spills (each value is large + unique), plain + // encoding takes over and the byte-budget sub-batch kicks in. + // + // This test makes sure the writer survives that transition and + // produces bounded pages thereafter. + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(true) + // Force a small dict so it spills quickly even though + // each value here is unique. + .set_dictionary_page_size_limit(1024) + .set_data_page_size_limit(page_byte_limit) + // Small mini-batches so dict fallback happens part-way + // through the input, leaving subsequent mini-batches to + // exercise the post-spill plain-encoding path that the + // page-size fix actually targets. + .set_write_batch_size(4) + .build(), + ); + + let mut data = Vec::with_capacity(num_rows); + for i in 0..num_rows { + data.push(ByteArray::from(vec![i as u8; value_size])); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push(page.buffer().len()); + } + } + + // After spill, plain encoding writes one ~64 KiB value per page. + // Without the fix, post-spill writes still buffered all 32 + // values into a single ~2 MiB page. + assert!( + data_pages.len() >= num_rows / 2, + "expected >= {} data pages after dict spill, got {} ({data_pages:?})", + num_rows / 2, + data_pages.len(), + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_for_fixed_len_byte_array() { + // Coverage for `ParquetValueType::byte_size` override on + // `FixedLenByteArray`. With `type_length = 1`, each plain-encoded + // value is one byte, so a 4-byte page byte limit forces the + // sub-batch sizer to write ~4 values per page rather than one + // page for the whole batch. + let page_byte_limit = 4; + let num_values = 128; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_values); + for i in 0..num_values { + let mut fla = FixedLenByteArray::default(); + fla.set_data(Bytes::from(vec![i as u8])); + data.push(fla); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut data_pages = Vec::new(); + while let Some(page) = page_reader.get_next_page().unwrap() { + if matches!( + page.page_type(), + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 + ) { + data_pages.push(page.buffer().len()); + } + } + + // Without the fix this is a single 128-byte page; with the fix + // the byte budget caps each page at ~`page_byte_limit` bytes. + assert!( + data_pages.len() >= num_values / 8, + "expected pages capped by byte budget, got {data_pages:?}" + ); + for size in &data_pages { + assert!( + *size <= page_byte_limit * 4, + "page size {size} larger than expected; pages {data_pages:?}" + ); + } + } + #[test] fn test_bool_statistics() { let stats = statistics_roundtrip::(&[true, false, false, true]); diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index d8c7b9201389..5ce0253690bd 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -726,6 +726,17 @@ pub(crate) mod private { (std::mem::size_of::(), 1) } + /// Estimated encoded byte size of this value when serialized into a + /// plain-encoded data page. Used by the column writer to decide + /// whether to mini-batch a chunk in one call or value-by-value, so + /// that a single mini-batch of very large `BYTE_ARRAY` values can't + /// push a page far past the configured page byte limit before the + /// post-write size check fires. + #[inline] + fn byte_size(&self) -> usize { + std::mem::size_of::() + } + /// Return the number of variable length bytes in a given slice of data /// /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types @@ -1076,6 +1087,13 @@ pub(crate) mod private { (std::mem::size_of::(), self.len()) } + #[inline] + fn byte_size(&self) -> usize { + // Plain encoding writes a 4-byte length prefix followed by the + // raw bytes for each value. + self.len() + std::mem::size_of::() + } + #[inline] fn as_any(&self) -> &dyn std::any::Any { self @@ -1176,6 +1194,13 @@ pub(crate) mod private { (std::mem::size_of::(), self.len()) } + #[inline] + fn byte_size(&self) -> usize { + // FIXED_LEN_BYTE_ARRAY plain encoding writes the raw bytes only, + // no length prefix. + self.len() + } + #[inline] fn as_any(&self) -> &dyn std::any::Any { self From e0d85faaeae273011167990c85ee2a9fafba7b66 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 08:02:59 -0700 Subject: [PATCH 03/17] perf(parquet): O(1)-per-value count_values_within_byte_budget for view arrays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The byte-budget check on `Utf8View` / `BinaryView` columns previously fell through to a per-value walk via `ArrayAccessor::value`, which constructs a `&str`/`&[u8]` slice for each index — chasing the buffer pointer through the view's u128 word, then slicing `data_buffers[i]`. At ~1 µs per chunk over ~1000 chunks on the 1 M-row `string_and_binary_view` bench, that was a consistent ~+3–5 % regression vs `main` in both GKE benchmark runs. View arrays store each value's length in the low 32 bits of its u128 view, so we can scan lengths with no data-buffer dereferences: ``` let len = (views[idx] as u32) as usize; ``` Add a dedicated fast path for `Utf8View` and `BinaryView` that walks the views buffer directly. Falls through to the per-value walk only for `FixedSizeBinary` and `Dictionary` — the latter still needs the dictionary-keys indirection. Co-Authored-By: Claude Opus 4.7 --- parquet/src/arrow/arrow_writer/byte_array.rs | 40 +++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index b257b867191b..93045d16faca 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -516,8 +516,29 @@ impl ColumnValueEncoder for ByteArrayEncoder { indices, byte_budget, ), - // Utf8View/BinaryView/FixedSizeBinary/Dictionary fall through - // to the per-value walk via `ArrayAccessor::value`. + // View arrays carry the length in the low 32 bits of each + // u128 view, so we can scan lengths without dereferencing + // any data buffer. + DataType::Utf8View => count_within_budget_views( + values + .as_any() + .downcast_ref::() + .unwrap() + .views(), + indices, + byte_budget, + ), + DataType::BinaryView => count_within_budget_views( + values + .as_any() + .downcast_ref::() + .unwrap() + .views(), + indices, + byte_budget, + ), + // FixedSizeBinary/Dictionary fall through to the per-value + // walk via `ArrayAccessor::value`. _ => downcast_op!( values.data_type(), values, @@ -665,6 +686,21 @@ where indices.len() } +/// Fast path for view arrays (`Utf8View`, `BinaryView`). The view layout +/// stores each value's length in the low 32 bits of its u128 view word, +/// so we can scan lengths with no data-buffer dereferences. +fn count_within_budget_views(views: &[u128], indices: &[usize], byte_budget: usize) -> usize { + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (views[*idx] as u32) as usize; + cum = cum.saturating_add(len + std::mem::size_of::()); + if cum > byte_budget { + return i.max(1); + } + } + indices.len() +} + /// Two-stage fast path for `GenericByteArray` /// (Utf8/LargeUtf8/Binary/LargeBinary): /// From 81d81a3bf23d655b907406dc1e70f679867bd9f5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 09:25:31 -0700 Subject: [PATCH 04/17] perf(parquet): cut byte-budget cost on dict input and sparse-null lists MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two targeted regressions surfaced in the GKE benchmark sweep: 1. `string_dictionary/*` regressed +30-89 % vs `main` after writer-dict spill. The arrow Dictionary input falls through to the per-value walk via `ArrayAccessor::value`, which dereferences the dict (keys[idx] → values[key] → slice construction) for every index in every chunk. The whole point of the byte-budget check is to bound pages of large BYTE_ARRAY values, but an arrow column that's already Dictionary-encoded at the arrow layer implies its values are small enough that dedup is worthwhile — the opposite shape. Treat Dictionary input as "everything fits" and skip the check. 2. `list_primitive_sparse_99pct_null` regressed ~+8 % across props. The cost was `LevelDataRef::value_count`'s O(N) def-level scan on the 20 000-row compact-levels chunks the list path uses. The arrow path already has the answer cheaper: `value_indices` is the sorted list of non-null positions in the batch, so the count of indices falling in the current chunk's level range is a binary search (one `partition_point`). Use that when `value_indices` is `Some` and fall back to the def-level scan only on the non-arrow path. Co-Authored-By: Claude Opus 4.7 --- parquet/src/arrow/arrow_writer/byte_array.rs | 14 ++++++++++++-- parquet/src/column/writer/mod.rs | 19 ++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 93045d16faca..52d02ffbd175 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -537,8 +537,18 @@ impl ColumnValueEncoder for ByteArrayEncoder { indices, byte_budget, ), - // FixedSizeBinary/Dictionary fall through to the per-value - // walk via `ArrayAccessor::value`. + // For arrow Dictionary input, treat every chunk as fitting + // and stay on the batched path. The arrow array being + // Dictionary-encoded in the first place implies its values + // are small enough that dedup is worthwhile, which is the + // opposite of the "5 MiB blob per row" case this fix + // targets. Doing a per-value walk through dict keys (each + // value lookup is keys[i] → values[key] → slice) on every + // chunk costs ~+30-80% vs `main` after writer-dict spill, + // and there is essentially nothing to bound. + DataType::Dictionary(_, _) => indices.len(), + // FixedSizeBinary falls through to the per-value walk via + // `ArrayAccessor::value`. _ => downcast_op!( values.data_type(), values, diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 4404c2efd195..05562f78c59e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -622,7 +622,24 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if static_always_fits || self.encoder.has_dictionary() || chunk_size == 0 { chunk_size } else { - let vals_in_chunk = chunk_def.value_count(chunk_size, max_def); + // Compute how many values belong to this chunk. Two + // paths so this stays O(log n) instead of O(n) on + // wide compact-levels chunks (the cost is + // significant for sparse lists where + // `data_page_row_count_limit` is 20 000): + // + // - When the caller supplied gather indices, + // `value_indices` is the sorted list of non-null + // positions in the batch. The count of indices + // falling in `[levels_offset, end_offset)` is a + // binary search; `values_offset` is the prefix + // already consumed. + // - Otherwise (non-arrow direct write), fall back + // to the def-level scan. + let vals_in_chunk = match value_indices { + Some(idx) => idx[values_offset..].partition_point(|&i| i < end_offset), + None => chunk_def.value_count(chunk_size, max_def), + }; if vals_in_chunk == 0 { chunk_size } else { From 53d61aad432f1ee60dfe6fcc6e8bb4f4d934291e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 11:51:52 -0700 Subject: [PATCH 05/17] refactor(parquet): drop byte_size, extract chunk decision into ByteBudgetChunker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two small structural cleanups in response to PR review: - Remove `ParquetValueType::byte_size`. It overlapped with `dict_encoding_size`, which @etseidl pointed out is being renamed and generalized in #9700. Instead, compute the per-value plain- encoded byte cost inline in `ColumnValueEncoderImpl::count_values_within_byte_budget` from `dict_encoding_size`'s components, dispatched on the physical type (same dispatch shape as `DictEncoder::push` in `encodings/encoding/dict_encoder.rs:52`). No new trait method. - Lift the byte-budget mini-batch sizing decision out of `write_batch_internal` into a new `ByteBudgetChunker` struct (`column/writer/byte_budget_chunker.rs`). The chunker captures the column-open-time facts (page byte limit, static-fits flag, max_def_level) once and exposes one `pick_sub_batch_size` method. `write_batch_internal`'s inner loop is now ~25 lines shorter and reads as: compute chunk boundary → ask chunker for sub_batch_size → write_mini_batch or write_granular_chunk. This is the lightweight version of the "make it a chunker like CDC" suggestion. A full CDC-style pre-compute would emit all chunk boundaries upfront, but the byte budget decision depends on the encoder's live `has_dictionary()` state, which changes mid-batch when the writer's dictionary spills. Querying that per chunk (as this refactor does) preserves the existing dict-active short- circuit; a precomputed plan would force a choice between losing that short-circuit or losing correctness when dict spills mid-batch on large-value columns. No behavior change. Tests still pass and `cargo bench` shows the same deltas as before the refactor. Co-Authored-By: Claude Opus 4.7 --- .../src/column/writer/byte_budget_chunker.rs | 160 ++++++++++++++++++ parquet/src/column/writer/encoder.rs | 32 +++- parquet/src/column/writer/mod.rs | 124 ++------------ parquet/src/data_type.rs | 25 --- 4 files changed, 203 insertions(+), 138 deletions(-) create mode 100644 parquet/src/column/writer/byte_budget_chunker.rs diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs new file mode 100644 index 000000000000..c22ca607c38b --- /dev/null +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Decides how many levels of a chunk to write as one mini-batch so that +//! the resulting data page stays within `data_page_size_limit`. +//! +//! The parquet column writer checks the data page byte limit only *after* +//! each mini-batch finishes writing. Mini-batches are sized in rows +//! (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose +//! values are large (e.g. multi-MiB blobs), a single mini-batch can buffer +//! GiB into one page before the limit is consulted. +//! +//! This module isolates the per-chunk decision that prevents that: given a +//! chunk's level data and the input values, pick the largest `sub_batch_size` +//! such that one mini-batch will fit in one page byte budget. For the +//! overwhelmingly common case (small values), the answer is just +//! `chunk_size` and the decision is O(1) on the column type. Only when the +//! input might overflow does the chunker consult the encoder's byte +//! estimate. + +use crate::basic::Type; +use crate::column::writer::LevelDataRef; +use crate::column::writer::encoder::ColumnValueEncoder; +use crate::file::properties::WriterProperties; +use crate::schema::types::ColumnDescriptor; + +/// Per-column-open chunker that picks byte-budget-aware mini-batch sizes. +pub(crate) struct ByteBudgetChunker { + /// Configured data page byte limit for the column. + page_byte_limit: usize, + /// `true` when no chunk of `base_batch_size` values can ever overflow + /// `page_byte_limit` regardless of input. Set once at column open from + /// the physical type's known per-value byte size; lets the per-chunk + /// decision short-circuit with no work for every numeric, bool, or + /// narrow `FIXED_LEN_BYTE_ARRAY` column. + static_always_fits: bool, + /// Column's `max_def_level`, needed by `LevelDataRef::value_count` for + /// the non-arrow path where we don't have a sorted `non_null_indices`. + max_def_level: i16, +} + +impl ByteBudgetChunker { + pub(crate) fn new( + descr: &ColumnDescriptor, + props: &WriterProperties, + base_batch_size: usize, + ) -> Self { + let page_byte_limit = props.column_data_page_size_limit(descr.path()); + let static_bytes_per_value = match descr.physical_type() { + Type::BOOLEAN => Some(1), + Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), + Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::()), + Type::INT96 => Some(12), + Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize), + Type::BYTE_ARRAY => None, + }; + let static_always_fits = static_bytes_per_value + .map(|b| b.saturating_mul(base_batch_size) <= page_byte_limit) + .unwrap_or(false); + Self { + page_byte_limit, + static_always_fits, + max_def_level: descr.max_def_level(), + } + } + + /// Decide how many levels at the start of `chunk_def` belong in one + /// mini-batch. + /// + /// Returns `chunk_size` when the whole chunk fits in one page byte + /// budget. A smaller number triggers granular sub-batching in + /// `write_batch_internal`'s `write_granular_chunk` arm. + /// + /// Bypasses: + /// - When `static_always_fits` is true (fixed-width type with a + /// safe `base_batch_size`), return `chunk_size`. + /// - When the encoder is currently dictionary-encoding, + /// `estimated_value_bytes` would return plain-encoded bytes while + /// the actual page only stores small RLE indices, so the budget + /// would shrink pages spuriously. Return `chunk_size` and let + /// dictionary fallback bound dict-encoded pages independently. + /// - When `chunk_size == 0`, there's nothing to size. + #[allow(clippy::too_many_arguments)] + pub(crate) fn pick_sub_batch_size( + &self, + encoder: &E, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + end_offset: usize, + ) -> usize { + if self.static_always_fits || encoder.has_dictionary() || chunk_size == 0 { + return chunk_size; + } + // Count how many values fall in this chunk's level range. The + // arrow path passes a sorted `non_null_indices`, so this is one + // binary search; otherwise we walk the def-level slice. + let vals_in_chunk = match value_indices { + Some(idx) => idx[values_offset..].partition_point(|&i| i < end_offset), + None => chunk_def.value_count(chunk_size, self.max_def_level), + }; + if vals_in_chunk == 0 { + return chunk_size; + } + // Ask the encoder how many of the next values fit in one page + // byte budget. Dispatch on whether the caller supplied gather + // indices; this mirrors how `write_mini_batch` picks between + // `write_gather` and `write`. + let fit = match value_indices { + Some(idx) => { + let end = (values_offset + vals_in_chunk).min(idx.len()); + let start = values_offset.min(end); + E::count_values_within_byte_budget_gather( + values, + &idx[start..end], + self.page_byte_limit, + ) + } + None => E::count_values_within_byte_budget( + values, + values_offset, + vals_in_chunk, + self.page_byte_limit, + ), + }; + match fit { + None => chunk_size, + Some(values_per_subbatch) => { + // Convert the value count from the encoder back into a + // level count. For non-nullable columns this is a no-op; + // for nullable, scale by the observed value-to-level + // ratio of the current chunk. + let levels_per_subbatch = if vals_in_chunk == chunk_size { + values_per_subbatch + } else { + (values_per_subbatch * chunk_size) + .div_ceil(vals_in_chunk) + .max(1) + }; + chunk_size.min(levels_per_subbatch.max(1)) + } + } + } +} diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index c39b96f3eb0e..537bf39353a9 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -32,6 +32,34 @@ use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accu use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; +/// Plain-encoded byte cost of a single value of type `T::T`. +/// +/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a +/// parallel per-value-size hook to the trait. The components returned by +/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For +/// plain encoding the on-disk layout is: +/// +/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`. +/// - `FIXED_LEN_BYTE_ARRAY`: raw bytes only, taken from the type descriptor's +/// `type_length`. The value's own `dict_encoding_size` reports the length +/// prefix, which is irrelevant for plain FLBA encoding; the encoder passes +/// `type_length` directly. +/// - Everything else (numeric / bool): a constant per-value size; the caller +/// already short-circuits these via `mem::size_of::()` before +/// touching this function, so this branch is unreachable in practice and +/// we fall back to `overhead` defensively. +/// +/// See `dict_encoder.rs::push` (line ~52) for the matching dispatch. +#[inline] +fn plain_encoded_byte_size(value: &T::T) -> usize { + let (overhead, bytes) = value.dict_encoding_size(); + match ::PHYSICAL_TYPE { + Type::BYTE_ARRAY => overhead + bytes, + Type::FIXED_LEN_BYTE_ARRAY => bytes, + _ => overhead, + } +} + /// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`] pub trait ColumnValues { /// The number of values in this collection @@ -310,7 +338,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { // early value alone exceeds the budget. let mut cum: usize = 0; for (i, v) in values[start..end].iter().enumerate() { - cum = cum.saturating_add(v.byte_size()); + cum = cum.saturating_add(plain_encoded_byte_size::(v)); if cum > byte_budget { return Some(i.max(1)); } @@ -332,7 +360,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { let mut cum: usize = 0; for (i, idx) in indices.iter().enumerate() { let Some(v) = values.get(*idx) else { continue }; - cum = cum.saturating_add(v.byte_size()); + cum = cum.saturating_add(plain_encoded_byte_size::(v)); if cum > byte_budget { return Some(i.max(1)); } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 05562f78c59e..53662e1b2990 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -48,8 +48,11 @@ use crate::file::properties::{ use crate::file::statistics::{Statistics, ValueStatistics}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; +mod byte_budget_chunker; pub(crate) mod encoder; +use byte_budget_chunker::ByteBudgetChunker; + macro_rules! downcast_writer { ($e:expr, $i:ident, $b:expr) => { match $e { @@ -562,29 +565,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } else { self.props.write_batch_size() }; - let page_byte_limit = self.props.column_data_page_size_limit(self.descr.path()); - let max_def = self.descr.max_def_level(); - // Static fast path: for fixed-width physical types, plain-encoded - // size per value is known at column-open time. If `base_batch_size` - // values can fit in `page_byte_limit` outright, no chunk can ever - // overflow and we can skip the per-chunk byte-budget check - // entirely. This is the common case for every numeric/bool column - // — its per-chunk overhead disappears, leaving only the byte - // estimator to BYTE_ARRAY (where it's actually needed) and to the - // rare FIXED_LEN_BYTE_ARRAY column with wide fixed elements and a - // small page byte limit. - let static_bytes_per_value = match self.descr.physical_type() { - Type::BOOLEAN => Some(1), - Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), - Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::()), - Type::INT96 => Some(12), - Type::FIXED_LEN_BYTE_ARRAY => Some(self.descr.type_length().max(0) as usize), - Type::BYTE_ARRAY => None, - }; - let max_chunk_bytes_static = static_bytes_per_value - .map(|b| b.saturating_mul(base_batch_size)) - .unwrap_or(usize::MAX); - let static_always_fits = max_chunk_bytes_static <= page_byte_limit; + let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size); while levels_offset < num_levels { let mut end_offset = num_levels.min(levels_offset + base_batch_size); @@ -599,94 +580,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let chunk_def = def_levels.slice(levels_offset, chunk_size); let chunk_rep = rep_levels.slice(levels_offset, chunk_size); - // Pick a sub-batch size so that one mini-batch maps to roughly - // one page byte budget's worth of values. Without this, the - // post-write data page byte check only fires after a full - // `write_batch_size`-row chunk is buffered, so a 1024-row chunk - // of 5 MiB values can push a page to ~5 GiB before the check - // even runs. - // - // For typical small values (e.g. 50–100 B strings), the budget - // accommodates more rows than the chunk holds, so this resolves - // to `chunk_size` and the loop stays on the zero-overhead - // batched fast path. Only when individual values are large - // enough that 1024 of them would blow the page byte limit does - // the sub-batch shrink below `chunk_size`. - // - // Skip the check entirely while dictionary encoding is active: - // `estimated_value_bytes` returns plain-encoded size, but a - // dict-encoded data page only stores small RLE indices, so the - // raw byte estimate would spuriously shrink pages. Dictionary - // fallback bounds dict-encoded pages independently. - let sub_batch_size = - if static_always_fits || self.encoder.has_dictionary() || chunk_size == 0 { - chunk_size - } else { - // Compute how many values belong to this chunk. Two - // paths so this stays O(log n) instead of O(n) on - // wide compact-levels chunks (the cost is - // significant for sparse lists where - // `data_page_row_count_limit` is 20 000): - // - // - When the caller supplied gather indices, - // `value_indices` is the sorted list of non-null - // positions in the batch. The count of indices - // falling in `[levels_offset, end_offset)` is a - // binary search; `values_offset` is the prefix - // already consumed. - // - Otherwise (non-arrow direct write), fall back - // to the def-level scan. - let vals_in_chunk = match value_indices { - Some(idx) => idx[values_offset..].partition_point(|&i| i < end_offset), - None => chunk_def.value_count(chunk_size, max_def), - }; - if vals_in_chunk == 0 { - chunk_size - } else { - // Ask the encoder how many of the next values fit in - // one page byte budget. Dispatch on whether the - // caller supplied gather indices; this mirrors how - // `write_mini_batch` picks between `write_gather` and - // `write`. - let fit = match value_indices { - Some(idx) => { - let end = (values_offset + vals_in_chunk).min(idx.len()); - let start = values_offset.min(end); - E::count_values_within_byte_budget_gather( - values, - &idx[start..end], - page_byte_limit, - ) - } - None => E::count_values_within_byte_budget( - values, - values_offset, - vals_in_chunk, - page_byte_limit, - ), - }; - match fit { - None => chunk_size, - Some(values_per_subbatch) => { - // Convert the value count from the encoder - // back into a level count, which is what - // `write_granular_chunk` actually subdivides - // on. For non-nullable columns this is a - // no-op; for nullable, scale by the - // observed value-to-level ratio of the - // current chunk. - let levels_per_subbatch = if vals_in_chunk == chunk_size { - values_per_subbatch - } else { - (values_per_subbatch * chunk_size) - .div_ceil(vals_in_chunk) - .max(1) - }; - chunk_size.min(levels_per_subbatch.max(1)) - } - } - } - }; + let sub_batch_size = chunker.pick_sub_batch_size( + &self.encoder, + values, + value_indices, + chunk_def, + values_offset, + chunk_size, + end_offset, + ); if sub_batch_size >= chunk_size { values_offset += self.write_mini_batch( diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 5ce0253690bd..d8c7b9201389 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -726,17 +726,6 @@ pub(crate) mod private { (std::mem::size_of::(), 1) } - /// Estimated encoded byte size of this value when serialized into a - /// plain-encoded data page. Used by the column writer to decide - /// whether to mini-batch a chunk in one call or value-by-value, so - /// that a single mini-batch of very large `BYTE_ARRAY` values can't - /// push a page far past the configured page byte limit before the - /// post-write size check fires. - #[inline] - fn byte_size(&self) -> usize { - std::mem::size_of::() - } - /// Return the number of variable length bytes in a given slice of data /// /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types @@ -1087,13 +1076,6 @@ pub(crate) mod private { (std::mem::size_of::(), self.len()) } - #[inline] - fn byte_size(&self) -> usize { - // Plain encoding writes a 4-byte length prefix followed by the - // raw bytes for each value. - self.len() + std::mem::size_of::() - } - #[inline] fn as_any(&self) -> &dyn std::any::Any { self @@ -1194,13 +1176,6 @@ pub(crate) mod private { (std::mem::size_of::(), self.len()) } - #[inline] - fn byte_size(&self) -> usize { - // FIXED_LEN_BYTE_ARRAY plain encoding writes the raw bytes only, - // no length prefix. - self.len() - } - #[inline] fn as_any(&self) -> &dyn std::any::Any { self From ac56d99dac0546ffff2814fbc7ecb7dcaec2f1c3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 13:39:22 -0700 Subject: [PATCH 06/17] perf(parquet): inline ByteBudgetChunker entry points GKE bench shows string_dictionary regresses ~+90% on the branch even though `pick_sub_batch_size` should short-circuit instantly when the encoder's dictionary is still active (single struct-field load + virtual call into `has_dictionary()`). Local laptop benches don't reproduce the regression, suggesting it's an architecture-specific inlining/code-layout effect on the GKE aarch64 runner. Marking `new` and `pick_sub_batch_size` `#[inline]` to give the compiler a clear hint that these should fold into `write_batch_internal`'s hot loop. Local laptop bench is unchanged (~+3% on string_dictionary, ~+5% on string_and_binary_view, both within noise); pushing to see whether GKE moves. Co-Authored-By: Claude Opus 4.7 --- parquet/src/column/writer/byte_budget_chunker.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index c22ca607c38b..e353a8c5e655 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -54,6 +54,7 @@ pub(crate) struct ByteBudgetChunker { } impl ByteBudgetChunker { + #[inline] pub(crate) fn new( descr: &ColumnDescriptor, props: &WriterProperties, @@ -94,7 +95,13 @@ impl ByteBudgetChunker { /// would shrink pages spuriously. Return `chunk_size` and let /// dictionary fallback bound dict-encoded pages independently. /// - When `chunk_size == 0`, there's nothing to size. + /// + /// Inlined because the hot path through the bypass conditions reduces + /// to one struct-field load + one virtual call into the encoder and + /// keeping it across the module boundary stops the compiler from + /// folding it into the per-chunk loop's register schedule. #[allow(clippy::too_many_arguments)] + #[inline] pub(crate) fn pick_sub_batch_size( &self, encoder: &E, From 63eacafb3c2f27f7b0cc64caa9edad6c57a9740b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 14:57:56 -0700 Subject: [PATCH 07/17] perf(parquet): skip per-chunk vals_in_chunk computation when all values are non-null MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunker's per-chunk `partition_point` (arrow path) or `LevelDataRef::value_count` (non-arrow path) returns `chunk_size` by construction whenever the column has no nulls. The GKE bench showed ~+12–27% regressions on `list_primitive_non_null/*` and `string_non_null/*` consistent with that walk dominating: ~50 K chunks × a binary search through a 50 M-entry `non_null_indices` buffer means cold cache reads on every chunk. Compute a `ValueCountStrategy` once at `write_batch_internal` entry: - `AllPresent` — set when the arrow caller passed `non_null_indices.len() == num_levels`, or when the column has `max_def_level == 0`. The chunker uses `chunk_size` directly with no per-chunk work. - `Sorted(&[usize])` — arrow nullable path; binary-search the indices. - `DefLevelScan(max_def)` — non-arrow nullable path; def-level scan. For the bench's `list_primitive_non_null` (all-non-null lists with a 50 M-entry leaf), this drops the per-chunk binary search entirely; expected to bring those rows back near noise. Co-Authored-By: Claude Opus 4.7 --- .../src/column/writer/byte_budget_chunker.rs | 61 ++++++++++++++++--- parquet/src/column/writer/mod.rs | 3 + 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index e353a8c5e655..c32b7797433b 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -38,6 +38,25 @@ use crate::column::writer::encoder::ColumnValueEncoder; use crate::file::properties::WriterProperties; use crate::schema::types::ColumnDescriptor; +/// Strategy for counting how many values fall in a chunk's level range. +/// Computed once per `write_batch_internal` call rather than per chunk so +/// `partition_point` and `LevelDataRef::value_count` don't run when their +/// answer is statically known to be `chunk_size`. +#[derive(Clone, Copy)] +pub(crate) enum ValueCountStrategy<'a> { + /// Every level corresponds to a non-null value, so the answer is + /// always `chunk_size`. Either the column has `max_def_level == 0` + /// or the arrow caller's `non_null_indices.len() == num_levels`. + AllPresent, + /// Arrow nullable path: binary-search the sorted `non_null_indices` + /// for the chunk's level range. O(log n) per chunk. + Sorted(&'a [usize]), + /// Non-arrow nullable path: scan the def-level slice for entries + /// matching `max_def`. O(n) per chunk; only used when no sorted + /// `value_indices` were supplied. + DefLevelScan(i16), +} + /// Per-column-open chunker that picks byte-budget-aware mini-batch sizes. pub(crate) struct ByteBudgetChunker { /// Configured data page byte limit for the column. @@ -48,9 +67,6 @@ pub(crate) struct ByteBudgetChunker { /// decision short-circuit with no work for every numeric, bool, or /// narrow `FIXED_LEN_BYTE_ARRAY` column. static_always_fits: bool, - /// Column's `max_def_level`, needed by `LevelDataRef::value_count` for - /// the non-arrow path where we don't have a sorted `non_null_indices`. - max_def_level: i16, } impl ByteBudgetChunker { @@ -75,7 +91,29 @@ impl ByteBudgetChunker { Self { page_byte_limit, static_always_fits, - max_def_level: descr.max_def_level(), + } + } + + /// Pick the cheapest strategy for `vals_in_chunk` queries for this + /// `write_batch_internal` call. Computed once and reused per chunk so + /// we don't repeat the check on every iteration. + #[inline] + pub(crate) fn value_count_strategy<'a>( + descr: &ColumnDescriptor, + value_indices: Option<&'a [usize]>, + num_levels: usize, + ) -> ValueCountStrategy<'a> { + match value_indices { + // Arrow path. If every level has a non-null value, the gather + // index is the trivial `0..num_levels` and we don't need to + // walk it per chunk — `vals_in_chunk == chunk_size` by + // construction. + Some(idx) if idx.len() == num_levels => ValueCountStrategy::AllPresent, + Some(idx) => ValueCountStrategy::Sorted(idx), + // Non-arrow path. `max_def_level == 0` means the column has + // no nullability, so again `vals_in_chunk == chunk_size`. + None if descr.max_def_level() == 0 => ValueCountStrategy::AllPresent, + None => ValueCountStrategy::DefLevelScan(descr.max_def_level()), } } @@ -108,6 +146,7 @@ impl ByteBudgetChunker { values: &E::Values, value_indices: Option<&[usize]>, chunk_def: LevelDataRef<'_>, + strategy: ValueCountStrategy<'_>, values_offset: usize, chunk_size: usize, end_offset: usize, @@ -116,11 +155,15 @@ impl ByteBudgetChunker { return chunk_size; } // Count how many values fall in this chunk's level range. The - // arrow path passes a sorted `non_null_indices`, so this is one - // binary search; otherwise we walk the def-level slice. - let vals_in_chunk = match value_indices { - Some(idx) => idx[values_offset..].partition_point(|&i| i < end_offset), - None => chunk_def.value_count(chunk_size, self.max_def_level), + // strategy was picked once per `write_batch_internal` call so + // the common all-non-null case (every level has a value) skips + // the per-chunk binary search and def-level scan entirely. + let vals_in_chunk = match strategy { + ValueCountStrategy::AllPresent => chunk_size, + ValueCountStrategy::Sorted(idx) => { + idx[values_offset..].partition_point(|&i| i < end_offset) + } + ValueCountStrategy::DefLevelScan(max_def) => chunk_def.value_count(chunk_size, max_def), }; if vals_in_chunk == 0 { return chunk_size; diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 53662e1b2990..786f7597672d 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -566,6 +566,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.props.write_batch_size() }; let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size); + let value_count_strategy = + ByteBudgetChunker::value_count_strategy(&self.descr, value_indices, num_levels); while levels_offset < num_levels { let mut end_offset = num_levels.min(levels_offset + base_batch_size); @@ -585,6 +587,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { values, value_indices, chunk_def, + value_count_strategy, values_offset, chunk_size, end_offset, From 9f443d1d926afb366eb43c1ad9bb49b06af05753 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 16:13:56 -0700 Subject: [PATCH 08/17] perf(parquet): force-inline ByteBudgetChunker hot path, split cold path out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous `#[inline]` hint was no longer enough once `pick_sub_batch_size` grew the `ValueCountStrategy` match — LLVM silently stopped inlining and the most recent GKE bench bounced `string_dictionary/*` back to +46–81% (`default` +81%, `parquet_2` +86%, `bloom_filter` +46%). Fix: 1. Mark `pick_sub_batch_size` `#[inline(always)]`. The hot path is just `if static_always_fits || has_dictionary || chunk_size == 0 { return chunk_size; }` — one struct-field load + one virtual call — so unconditional inlining is the right call, not a heuristic suggestion. 2. Pull the byte-budget computation out into a separate `byte_budget_sub_batch_size` method marked `#[inline(never)]`. This keeps the inlined fast path small even as the slow path grows; the slow path is paid for explicitly when bypasses don't fire, not smuggled into every chunk's inline body. Same behavior, just compiler-friendlier code layout. Co-Authored-By: Claude Opus 4.7 --- .../src/column/writer/byte_budget_chunker.rs | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index c32b7797433b..63c34cf2d3d0 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -134,12 +134,14 @@ impl ByteBudgetChunker { /// dictionary fallback bound dict-encoded pages independently. /// - When `chunk_size == 0`, there's nothing to size. /// - /// Inlined because the hot path through the bypass conditions reduces - /// to one struct-field load + one virtual call into the encoder and - /// keeping it across the module boundary stops the compiler from - /// folding it into the per-chunk loop's register schedule. + /// Hot path: when one of the bypass conditions fires this returns + /// `chunk_size` with one struct-field load and one virtual call into + /// the encoder. Marked `#[inline(always)]` because LLVM's heuristic + /// would otherwise refuse to inline now that the slow path lives + /// nearby — the GKE bench showed a +80% regression on + /// `string_dictionary/*` when the hint was just `#[inline]`. #[allow(clippy::too_many_arguments)] - #[inline] + #[inline(always)] pub(crate) fn pick_sub_batch_size( &self, encoder: &E, @@ -154,6 +156,33 @@ impl ByteBudgetChunker { if self.static_always_fits || encoder.has_dictionary() || chunk_size == 0 { return chunk_size; } + self.byte_budget_sub_batch_size::( + values, + value_indices, + chunk_def, + strategy, + values_offset, + chunk_size, + end_offset, + ) + } + + /// Cold path: the encoder is plain-encoding and the bypass conditions + /// didn't fire, so we have to look at value sizes to decide whether + /// the chunk fits. Pulled out of `pick_sub_batch_size` and marked + /// `#[inline(never)]` so the inlined fast path stays small. + #[allow(clippy::too_many_arguments)] + #[inline(never)] + fn byte_budget_sub_batch_size( + &self, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + strategy: ValueCountStrategy<'_>, + values_offset: usize, + chunk_size: usize, + end_offset: usize, + ) -> usize { // Count how many values fall in this chunk's level range. The // strategy was picked once per `write_batch_internal` call so // the common all-non-null case (every level has a value) skips From 77ebc07be9b3e377bcbf8feb1768adb2dc8f4118 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 14 May 2026 22:10:36 -0700 Subject: [PATCH 09/17] perf(parquet): mark cold paths #[cold] so they move out of hot icache The GKE bench shows `string_dictionary/*` consistently ~+80% across every branch commit, even though the chunker's fast path returns `chunk_size` with a single struct-field load while `has_dictionary()` is true (which it is for the entire `string_dictionary` bench since `create_random_batch` produces a low-cardinality dict that doesn't spill the writer's encoder). Working hypothesis: the regression is icache pressure from the new code's mere presence. The cold path (`byte_budget_sub_batch_size`, `write_granular_chunk`) is never executed for `string_dictionary` but sits inline near the encoder's hot path and pushes hot bytes out of L1i. Mark both cold paths `#[cold]` so LLVM places them in a separate text section. The hot encoder loop should stay tighter in icache. This is a hypothesis-driven attempt; if GKE doesn't move it tells us the regression source is somewhere else and we keep digging. Co-Authored-By: Claude Opus 4.7 --- parquet/src/column/writer/byte_budget_chunker.rs | 5 ++++- parquet/src/column/writer/mod.rs | 7 +++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index 63c34cf2d3d0..07bc5da16738 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -170,9 +170,12 @@ impl ByteBudgetChunker { /// Cold path: the encoder is plain-encoding and the bypass conditions /// didn't fire, so we have to look at value sizes to decide whether /// the chunk fits. Pulled out of `pick_sub_batch_size` and marked - /// `#[inline(never)]` so the inlined fast path stays small. + /// `#[inline(never)]` + `#[cold]` so the inlined fast path stays + /// small and the dead-code placement signal pushes this body + /// physically away from the hot encoder loop's icache footprint. #[allow(clippy::too_many_arguments)] #[inline(never)] + #[cold] fn byte_budget_sub_batch_size( &self, values: &E::Values, diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 786f7597672d..0b0e0cf63865 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -774,7 +774,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// record never spans data pages, matching the parquet format rule. /// /// Returns the total number of values consumed across all sub-batches. + /// + /// Marked `#[cold]` because the byte-budget path that calls this + /// fires only on columns whose values are individually larger than + /// `data_page_size_limit / write_batch_size` (e.g. multi-MiB + /// BYTE_ARRAY blobs). Keeping it out of the hot section lets the + /// hot `write_mini_batch` path keep its icache locality. #[allow(clippy::too_many_arguments)] + #[cold] fn write_granular_chunk( &mut self, values: &E::Values, From 4b926357d37541c87c4ca6d81a3bcdf82d640c6f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 15 May 2026 15:13:50 +0000 Subject: [PATCH 10/17] fix(parquet): correct nested-column value counting in ByteBudgetChunker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `value_count_strategy` picked the `Sorted` strategy for any arrow column with `value_indices`. `Sorted` runs `partition_point(|&i| i < end_offset)`, comparing leaf-value indices against a level offset — coordinate spaces that only coincide for flat columns. For repeated/nested columns the leaf values array is decoupled from the rep/def level stream, so `vals_in_chunk` drifts upward without bound as empty-list / sub-`max_def` levels accumulate, spuriously triggering granular sub-batching on columns whose values are small. This was the consistent `list_primitive_non_null` regression (+18-32% across CI runs). Repeated columns (`max_rep_level > 0`) now count values via `DefLevelScan`. Also in this commit: - `count_within_budget_offsets`: drop the Stage-1 contiguity gate. The offset span is a valid O(1) upper bound for any sorted index set, so nullable offset columns skip the O(n) per-chunk walk too. - `write_granular_chunk`: pack whole records up to `sub_batch_size` per mini-batch instead of one record per mini-batch (~25x fewer `write_mini_batch` calls when granular mode fires on lists of large values). - Move `plain_encoded_byte_size` to the end of `encoder.rs`: defining it above the `ColumnValueEncoder` trait shifted downstream compiled code and perturbed unrelated string-writer benchmark layout. Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/arrow/arrow_writer/byte_array.rs | 35 +++++--- .../src/column/writer/byte_budget_chunker.rs | 87 +++++++++++++++++-- parquet/src/column/writer/encoder.rs | 63 ++++++++------ parquet/src/column/writer/mod.rs | 20 ++++- 4 files changed, 159 insertions(+), 46 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 52d02ffbd175..5c34eadea343 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -712,16 +712,25 @@ fn count_within_budget_views(views: &[u128], indices: &[usize], byte_budget: usi } /// Two-stage fast path for `GenericByteArray` -/// (Utf8/LargeUtf8/Binary/LargeBinary): +/// (Utf8/LargeUtf8/Binary/LargeBinary). /// -/// 1. If `indices` are contiguous and sorted, the total payload size -/// is one subtraction on the offsets buffer -/// (`offsets[last+1] - offsets[first]`). If the chunk fits within -/// `byte_budget`, return `indices.len()` immediately — the -/// overwhelmingly common case for non-null columns. -/// 2. Otherwise, walk per-index lengths from the offsets buffer -/// directly (no slice/UTF-8 construction) and stop at the first -/// value that pushes the cumulative sum past the budget. +/// `indices` are assumed sorted ascending — they always are here, since +/// they come from `non_null_indices`, which is built in array order. +/// +/// 1. The span `offsets[last+1] - offsets[first]` is an O(1) upper +/// bound on the chunk's payload: it covers every array position in +/// `[first, last]`, a superset of `indices`. For a non-null chunk +/// `indices` *is* that whole range; for a chunk drawn from a +/// nullable column the skipped positions are nulls, whose offset +/// delta is zero, so the span still equals the exact payload. +/// Either way, if the upper bound fits the budget every value +/// fits — return `indices.len()` with no per-value work. This +/// covers the overwhelmingly common "small values" case for both +/// non-null *and* nullable columns. +/// 2. Otherwise the chunk is genuinely near the budget: walk per-index +/// lengths from the offsets buffer directly (no slice/UTF-8 +/// construction) and stop at the first value that pushes the +/// cumulative sum past the budget. fn count_within_budget_offsets( values: &GenericByteArray, indices: &[usize], @@ -736,8 +745,12 @@ fn count_within_budget_offsets( let offsets = values.value_offsets(); let prefix_overhead = std::mem::size_of::(); - // Stage 1: O(1) contiguous total. Skips Stage 2 in the common case. - if last >= first && last - first + 1 == n { + // Stage 1: O(1) span upper bound. Skips Stage 2 in the common case — + // including nullable columns, whose `indices` are sparse. The earlier + // `last - first + 1 == n` contiguity gate forced every nullable chunk + // onto the O(n) Stage 2 walk even though the span check is valid for + // any sorted index set. + if last >= first { let payload = (offsets[last + 1] - offsets[first]).as_usize(); if payload + n * prefix_overhead <= byte_budget { return n; diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index 07bc5da16738..35b3d97b2503 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -48,12 +48,15 @@ pub(crate) enum ValueCountStrategy<'a> { /// always `chunk_size`. Either the column has `max_def_level == 0` /// or the arrow caller's `non_null_indices.len() == num_levels`. AllPresent, - /// Arrow nullable path: binary-search the sorted `non_null_indices` - /// for the chunk's level range. O(log n) per chunk. + /// Flat (`max_rep_level == 0`) arrow nullable path: `non_null_indices` + /// hold row positions, which coincide with level offsets, so + /// `partition_point` over the chunk's level range counts values + /// directly. O(log n) per chunk. Sorted(&'a [usize]), - /// Non-arrow nullable path: scan the def-level slice for entries - /// matching `max_def`. O(n) per chunk; only used when no sorted - /// `value_indices` were supplied. + /// Scan the chunk's def-level slice for entries matching `max_def`. + /// O(n) per chunk. Used for the non-arrow nullable path and for + /// repeated/nested columns, where `value_indices` index into the + /// decoupled leaf values array rather than the level stream. DefLevelScan(i16), } @@ -109,6 +112,19 @@ impl ByteBudgetChunker { // walk it per chunk — `vals_in_chunk == chunk_size` by // construction. Some(idx) if idx.len() == num_levels => ValueCountStrategy::AllPresent, + // Repeated/nested arrow columns: `value_indices` index into the + // leaf values array, which is decoupled from the rep/def level + // stream. A `partition_point` of those indices against a level + // offset is meaningless — it makes `vals_in_chunk` drift away + // from the true per-chunk value count (it grows without bound + // as empty-list / sub-`max_def` levels accumulate, eventually + // forcing spurious granular sub-batching). Count via def levels + // instead. The `Sorted` fast path is only valid for flat + // columns, where `non_null_indices` are row positions that + // coincide with level offsets. + Some(_) if descr.max_rep_level() > 0 => { + ValueCountStrategy::DefLevelScan(descr.max_def_level()) + } Some(idx) => ValueCountStrategy::Sorted(idx), // Non-arrow path. `max_def_level == 0` means the column has // no nullability, so again `vals_in_chunk == chunk_size`. @@ -240,3 +256,64 @@ impl ByteBudgetChunker { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::basic::Type as PhysicalType; + use crate::schema::types::{ColumnPath, Type as SchemaType}; + use std::sync::Arc; + + fn descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor { + let tpe = SchemaType::primitive_type_builder("col", PhysicalType::BYTE_ARRAY) + .build() + .unwrap(); + ColumnDescriptor::new( + Arc::new(tpe), + max_def_level, + max_rep_level, + ColumnPath::from("col"), + ) + } + + #[test] + fn value_count_strategy_uses_def_scan_for_repeated_columns() { + // Regression: for a repeated/nested column the arrow `value_indices` + // index into the leaf values array, which is decoupled from the + // level stream. The `Sorted` strategy's `partition_point` against a + // level offset is meaningless there and makes `vals_in_chunk` drift + // without bound, spuriously triggering granular sub-batching. + // A repeated column with `idx.len() != num_levels` must resolve to + // `DefLevelScan`, never `Sorted`. + let d = descr(1, 1); + let indices = [0usize, 1, 2, 3]; + let strategy = + ByteBudgetChunker::value_count_strategy(&d, Some(&indices), /* num_levels */ 6); + assert!( + matches!(strategy, ValueCountStrategy::DefLevelScan(1)), + "repeated column must count values via def levels, not the \ + level-offset partition_point" + ); + } + + #[test] + fn value_count_strategy_keeps_sorted_for_flat_nullable_columns() { + // Flat (`max_rep_level == 0`) nullable columns keep the cheap + // `Sorted` strategy: there `non_null_indices` are row positions, + // which do coincide with level offsets. + let d = descr(1, 0); + let indices = [0usize, 2, 5]; + let strategy = + ByteBudgetChunker::value_count_strategy(&d, Some(&indices), /* num_levels */ 8); + assert!(matches!(strategy, ValueCountStrategy::Sorted(_))); + } + + #[test] + fn value_count_strategy_all_present_when_every_level_has_a_value() { + let d = descr(1, 1); + let indices = [0usize, 1, 2, 3]; + let strategy = + ByteBudgetChunker::value_count_strategy(&d, Some(&indices), /* num_levels */ 4); + assert!(matches!(strategy, ValueCountStrategy::AllPresent)); + } +} diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 537bf39353a9..83c08e7423fa 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -32,34 +32,6 @@ use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accu use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; -/// Plain-encoded byte cost of a single value of type `T::T`. -/// -/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a -/// parallel per-value-size hook to the trait. The components returned by -/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For -/// plain encoding the on-disk layout is: -/// -/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`. -/// - `FIXED_LEN_BYTE_ARRAY`: raw bytes only, taken from the type descriptor's -/// `type_length`. The value's own `dict_encoding_size` reports the length -/// prefix, which is irrelevant for plain FLBA encoding; the encoder passes -/// `type_length` directly. -/// - Everything else (numeric / bool): a constant per-value size; the caller -/// already short-circuits these via `mem::size_of::()` before -/// touching this function, so this branch is unreachable in practice and -/// we fall back to `overhead` defensively. -/// -/// See `dict_encoder.rs::push` (line ~52) for the matching dispatch. -#[inline] -fn plain_encoded_byte_size(value: &T::T) -> usize { - let (overhead, bytes) = value.dict_encoding_size(); - match ::PHYSICAL_TYPE { - Type::BYTE_ARRAY => overhead + bytes, - Type::FIXED_LEN_BYTE_ARRAY => bytes, - _ => overhead, - } -} - /// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`] pub trait ColumnValues { /// The number of values in this collection @@ -532,3 +504,38 @@ where } } } + +/// Plain-encoded byte cost of a single value of type `T::T`. +/// +/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a +/// parallel per-value-size hook to the trait. The components returned by +/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For +/// plain encoding the on-disk layout is: +/// +/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`. +/// - `FIXED_LEN_BYTE_ARRAY`: raw bytes only, taken from the type descriptor's +/// `type_length`. The value's own `dict_encoding_size` reports the length +/// prefix, which is irrelevant for plain FLBA encoding; the encoder passes +/// `type_length` directly. +/// - Everything else (numeric / bool): a constant per-value size; the caller +/// already short-circuits these via `mem::size_of::()` before +/// touching this function, so this branch is unreachable in practice and +/// we fall back to `overhead` defensively. +/// +/// See `dict_encoder.rs::push` (line ~52) for the matching dispatch. +/// +/// Placed at the end of the module deliberately. Inserting it above the +/// `ColumnValueEncoder` trait shifts the trait and `ColumnValueEncoderImpl` +/// within the compiled module enough to perturb downstream code placement, +/// which measurably regresses unrelated arrow-writer string benchmarks +/// (~5-9% on `string` / `string_and_binary_view`). Defining it last keeps +/// the hot encoder code at the offsets it has on `main`. +#[inline] +fn plain_encoded_byte_size(value: &T::T) -> usize { + let (overhead, bytes) = value.dict_encoding_size(); + match ::PHYSICAL_TYPE { + Type::BYTE_ARRAY => overhead + bytes, + Type::FIXED_LEN_BYTE_ARRAY => bytes, + _ => overhead, + } +} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 0b0e0cf63865..e6b776931c6c 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -797,11 +797,27 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { while sub_start < chunk_size { let sub_end = match chunk_rep { LevelDataRef::Materialized(levels) => { - // Step from one record boundary to the next. - let mut e = sub_start + 1; + // Pack up to `sub_batch_size` levels per mini-batch, then + // extend to the next record boundary (rep == 0) so a + // record never spans data pages. Packing whole records + // rather than stepping one record at a time avoids + // emitting a `write_mini_batch` per record: records + // average only a handful of levels, so the + // record-at-a-time loop issued roughly `sub_batch_size`× + // more mini-batches than necessary. + let mut e = (sub_start + sub_batch_size).min(chunk_size); while e < chunk_size && levels[e] != 0 { e += 1; } + // `sub_batch_size` can be 0 when the chunker sizes a + // sub-batch below one record; always make progress by + // consuming at least the first whole record. + if e == sub_start { + e = sub_start + 1; + while e < chunk_size && levels[e] != 0 { + e += 1; + } + } e } _ => (sub_start + sub_batch_size).min(chunk_size), From beb5fc25f0c196619102dfe5af5f79ee75dddff8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 15 May 2026 15:47:31 +0000 Subject: [PATCH 11/17] refactor(parquet): drop ValueCountStrategy, count values via def levels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ValueCountStrategy` was a 3-way precomputed enum (`AllPresent` / `Sorted` / `DefLevelScan`) for answering "how many of this chunk's levels carry a value". `LevelDataRef::value_count` already answers that correctly for every column shape — `Absent`/`Uniform` def levels resolve in O(1), and the O(n) scan only runs for genuinely materialized (nullable/nested) def levels, on the variable-width slow path the chunker is already on. The `Sorted` variant — `partition_point` of leaf-value indices against a level offset — was only ever valid for flat columns; for nested columns those indices live in a different coordinate space, which is what made `vals_in_chunk` drift and spuriously trigger granular sub-batching (`list_primitive_non_null` regression). Deleting the enum removes that bug class structurally rather than guarding against it. Net effect: the chunker module drops from ~320 to ~173 lines, the `'a` lifetime and two parameters disappear from the chunker API, and `ByteBudgetChunker` just stores `max_def_level`. `pick_sub_batch_size` goes back to a plain `#[inline]` (the `#[inline(always)]` was added chasing a `string_dictionary` swing later confirmed to be code-layout noise, not an inlining effect). Perf-neutral — `value_count` vs the old `partition_point` is negligible and only on the post-dict-spill path. `LevelDataRef::value_count` gains a unit test as the now load-bearing value-counting primitive. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/column/writer/byte_budget_chunker.rs | 240 ++++-------------- parquet/src/column/writer/mod.rs | 40 ++- 2 files changed, 83 insertions(+), 197 deletions(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index 35b3d97b2503..73b16266f9cf 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -21,16 +21,16 @@ //! The parquet column writer checks the data page byte limit only *after* //! each mini-batch finishes writing. Mini-batches are sized in rows //! (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose -//! values are large (e.g. multi-MiB blobs), a single mini-batch can buffer +//! values are large (e.g. multi-MiB blobs) a single mini-batch can buffer //! GiB into one page before the limit is consulted. //! //! This module isolates the per-chunk decision that prevents that: given a //! chunk's level data and the input values, pick the largest `sub_batch_size` -//! such that one mini-batch will fit in one page byte budget. For the -//! overwhelmingly common case (small values), the answer is just -//! `chunk_size` and the decision is O(1) on the column type. Only when the -//! input might overflow does the chunker consult the encoder's byte -//! estimate. +//! such that one mini-batch fits in one page byte budget. For the +//! overwhelmingly common case (small or fixed-width values) the answer is +//! just `chunk_size` and the decision is O(1) on the column type — only +//! when the input might overflow does the chunker consult the encoder's +//! byte estimate. use crate::basic::Type; use crate::column::writer::LevelDataRef; @@ -38,32 +38,13 @@ use crate::column::writer::encoder::ColumnValueEncoder; use crate::file::properties::WriterProperties; use crate::schema::types::ColumnDescriptor; -/// Strategy for counting how many values fall in a chunk's level range. -/// Computed once per `write_batch_internal` call rather than per chunk so -/// `partition_point` and `LevelDataRef::value_count` don't run when their -/// answer is statically known to be `chunk_size`. -#[derive(Clone, Copy)] -pub(crate) enum ValueCountStrategy<'a> { - /// Every level corresponds to a non-null value, so the answer is - /// always `chunk_size`. Either the column has `max_def_level == 0` - /// or the arrow caller's `non_null_indices.len() == num_levels`. - AllPresent, - /// Flat (`max_rep_level == 0`) arrow nullable path: `non_null_indices` - /// hold row positions, which coincide with level offsets, so - /// `partition_point` over the chunk's level range counts values - /// directly. O(log n) per chunk. - Sorted(&'a [usize]), - /// Scan the chunk's def-level slice for entries matching `max_def`. - /// O(n) per chunk. Used for the non-arrow nullable path and for - /// repeated/nested columns, where `value_indices` index into the - /// decoupled leaf values array rather than the level stream. - DefLevelScan(i16), -} - -/// Per-column-open chunker that picks byte-budget-aware mini-batch sizes. +/// Picks byte-budget-aware mini-batch sizes for one column. pub(crate) struct ByteBudgetChunker { /// Configured data page byte limit for the column. page_byte_limit: usize, + /// Max definition level of the column; a level equal to this marks a + /// present (non-null) leaf value. Used to count values per chunk. + max_def_level: i16, /// `true` when no chunk of `base_batch_size` values can ever overflow /// `page_byte_limit` regardless of input. Set once at column open from /// the physical type's known per-value byte size; lets the per-chunk @@ -93,103 +74,49 @@ impl ByteBudgetChunker { .unwrap_or(false); Self { page_byte_limit, + max_def_level: descr.max_def_level(), static_always_fits, } } - /// Pick the cheapest strategy for `vals_in_chunk` queries for this - /// `write_batch_internal` call. Computed once and reused per chunk so - /// we don't repeat the check on every iteration. - #[inline] - pub(crate) fn value_count_strategy<'a>( - descr: &ColumnDescriptor, - value_indices: Option<&'a [usize]>, - num_levels: usize, - ) -> ValueCountStrategy<'a> { - match value_indices { - // Arrow path. If every level has a non-null value, the gather - // index is the trivial `0..num_levels` and we don't need to - // walk it per chunk — `vals_in_chunk == chunk_size` by - // construction. - Some(idx) if idx.len() == num_levels => ValueCountStrategy::AllPresent, - // Repeated/nested arrow columns: `value_indices` index into the - // leaf values array, which is decoupled from the rep/def level - // stream. A `partition_point` of those indices against a level - // offset is meaningless — it makes `vals_in_chunk` drift away - // from the true per-chunk value count (it grows without bound - // as empty-list / sub-`max_def` levels accumulate, eventually - // forcing spurious granular sub-batching). Count via def levels - // instead. The `Sorted` fast path is only valid for flat - // columns, where `non_null_indices` are row positions that - // coincide with level offsets. - Some(_) if descr.max_rep_level() > 0 => { - ValueCountStrategy::DefLevelScan(descr.max_def_level()) - } - Some(idx) => ValueCountStrategy::Sorted(idx), - // Non-arrow path. `max_def_level == 0` means the column has - // no nullability, so again `vals_in_chunk == chunk_size`. - None if descr.max_def_level() == 0 => ValueCountStrategy::AllPresent, - None => ValueCountStrategy::DefLevelScan(descr.max_def_level()), - } - } - - /// Decide how many levels at the start of `chunk_def` belong in one - /// mini-batch. - /// - /// Returns `chunk_size` when the whole chunk fits in one page byte - /// budget. A smaller number triggers granular sub-batching in - /// `write_batch_internal`'s `write_granular_chunk` arm. + /// Decide how many levels at the start of a chunk belong in one + /// mini-batch. Returns `chunk_size` when the whole chunk fits in one + /// page byte budget; a smaller value triggers granular sub-batching in + /// `write_batch_internal`. /// - /// Bypasses: - /// - When `static_always_fits` is true (fixed-width type with a - /// safe `base_batch_size`), return `chunk_size`. - /// - When the encoder is currently dictionary-encoding, - /// `estimated_value_bytes` would return plain-encoded bytes while - /// the actual page only stores small RLE indices, so the budget - /// would shrink pages spuriously. Return `chunk_size` and let - /// dictionary fallback bound dict-encoded pages independently. - /// - When `chunk_size == 0`, there's nothing to size. + /// Returns `chunk_size` immediately (no value inspection) when: + /// - the column is a fixed-width type that statically cannot overflow + /// (`static_always_fits`); + /// - the encoder is currently dictionary-encoding — a dict-encoded data + /// page only stores small RLE indices, so a plain-encoded byte + /// estimate would shrink pages spuriously; dictionary fallback bounds + /// those pages independently; + /// - the chunk is empty. /// - /// Hot path: when one of the bypass conditions fires this returns - /// `chunk_size` with one struct-field load and one virtual call into - /// the encoder. Marked `#[inline(always)]` because LLVM's heuristic - /// would otherwise refuse to inline now that the slow path lives - /// nearby — the GKE bench showed a +80% regression on - /// `string_dictionary/*` when the hint was just `#[inline]`. - #[allow(clippy::too_many_arguments)] - #[inline(always)] + /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte + /// inspection lives in the out-of-line `byte_budget_sub_batch_size`. + #[inline] pub(crate) fn pick_sub_batch_size( &self, encoder: &E, values: &E::Values, value_indices: Option<&[usize]>, chunk_def: LevelDataRef<'_>, - strategy: ValueCountStrategy<'_>, values_offset: usize, chunk_size: usize, - end_offset: usize, ) -> usize { if self.static_always_fits || encoder.has_dictionary() || chunk_size == 0 { return chunk_size; } - self.byte_budget_sub_batch_size::( - values, - value_indices, - chunk_def, - strategy, - values_offset, - chunk_size, - end_offset, - ) + self.byte_budget_sub_batch_size::(values, value_indices, chunk_def, values_offset, chunk_size) } - /// Cold path: the encoder is plain-encoding and the bypass conditions - /// didn't fire, so we have to look at value sizes to decide whether - /// the chunk fits. Pulled out of `pick_sub_batch_size` and marked - /// `#[inline(never)]` + `#[cold]` so the inlined fast path stays - /// small and the dead-code placement signal pushes this body - /// physically away from the hot encoder loop's icache footprint. - #[allow(clippy::too_many_arguments)] + /// Inspect value sizes to decide how much of the chunk fits in a page. + /// + /// Reached once per chunk for variable-width (`BYTE_ARRAY`) columns + /// while plain-encoding — numeric, bool and dictionary-encoded columns + /// never get here, so it is `#[cold]` / `#[inline(never)]`: keeping it + /// out of line keeps the hot `write_batch_internal` loop small. #[inline(never)] #[cold] fn byte_budget_sub_batch_size( @@ -197,38 +124,26 @@ impl ByteBudgetChunker { values: &E::Values, value_indices: Option<&[usize]>, chunk_def: LevelDataRef<'_>, - strategy: ValueCountStrategy<'_>, values_offset: usize, chunk_size: usize, - end_offset: usize, ) -> usize { - // Count how many values fall in this chunk's level range. The - // strategy was picked once per `write_batch_internal` call so - // the common all-non-null case (every level has a value) skips - // the per-chunk binary search and def-level scan entirely. - let vals_in_chunk = match strategy { - ValueCountStrategy::AllPresent => chunk_size, - ValueCountStrategy::Sorted(idx) => { - idx[values_offset..].partition_point(|&i| i < end_offset) - } - ValueCountStrategy::DefLevelScan(max_def) => chunk_def.value_count(chunk_size, max_def), - }; + // How many of this chunk's levels carry an actual value. For a + // non-nullable, unrepeated column every level is a value, so + // `value_count` is O(1) (`Absent`/`Uniform` def levels); only + // nullable or nested columns pay the O(chunk_size) def-level scan. + let vals_in_chunk = chunk_def.value_count(chunk_size, self.max_def_level); if vals_in_chunk == 0 { return chunk_size; } - // Ask the encoder how many of the next values fit in one page - // byte budget. Dispatch on whether the caller supplied gather - // indices; this mirrors how `write_mini_batch` picks between - // `write_gather` and `write`. + // Ask the encoder how many of the next values fit in one page byte + // budget. Dispatch on whether the caller supplied gather indices; + // this mirrors how `write_mini_batch` picks `write_gather` vs + // `write`. let fit = match value_indices { Some(idx) => { let end = (values_offset + vals_in_chunk).min(idx.len()); let start = values_offset.min(end); - E::count_values_within_byte_budget_gather( - values, - &idx[start..end], - self.page_byte_limit, - ) + E::count_values_within_byte_budget_gather(values, &idx[start..end], self.page_byte_limit) } None => E::count_values_within_byte_budget( values, @@ -240,10 +155,10 @@ impl ByteBudgetChunker { match fit { None => chunk_size, Some(values_per_subbatch) => { - // Convert the value count from the encoder back into a - // level count. For non-nullable columns this is a no-op; - // for nullable, scale by the observed value-to-level - // ratio of the current chunk. + // Convert the value count back into a level count. For a + // non-nullable column this is a no-op; for nullable/nested + // columns scale by the chunk's observed value-to-level + // ratio. let levels_per_subbatch = if vals_in_chunk == chunk_size { values_per_subbatch } else { @@ -256,64 +171,3 @@ impl ByteBudgetChunker { } } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::basic::Type as PhysicalType; - use crate::schema::types::{ColumnPath, Type as SchemaType}; - use std::sync::Arc; - - fn descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor { - let tpe = SchemaType::primitive_type_builder("col", PhysicalType::BYTE_ARRAY) - .build() - .unwrap(); - ColumnDescriptor::new( - Arc::new(tpe), - max_def_level, - max_rep_level, - ColumnPath::from("col"), - ) - } - - #[test] - fn value_count_strategy_uses_def_scan_for_repeated_columns() { - // Regression: for a repeated/nested column the arrow `value_indices` - // index into the leaf values array, which is decoupled from the - // level stream. The `Sorted` strategy's `partition_point` against a - // level offset is meaningless there and makes `vals_in_chunk` drift - // without bound, spuriously triggering granular sub-batching. - // A repeated column with `idx.len() != num_levels` must resolve to - // `DefLevelScan`, never `Sorted`. - let d = descr(1, 1); - let indices = [0usize, 1, 2, 3]; - let strategy = - ByteBudgetChunker::value_count_strategy(&d, Some(&indices), /* num_levels */ 6); - assert!( - matches!(strategy, ValueCountStrategy::DefLevelScan(1)), - "repeated column must count values via def levels, not the \ - level-offset partition_point" - ); - } - - #[test] - fn value_count_strategy_keeps_sorted_for_flat_nullable_columns() { - // Flat (`max_rep_level == 0`) nullable columns keep the cheap - // `Sorted` strategy: there `non_null_indices` are row positions, - // which do coincide with level offsets. - let d = descr(1, 0); - let indices = [0usize, 2, 5]; - let strategy = - ByteBudgetChunker::value_count_strategy(&d, Some(&indices), /* num_levels */ 8); - assert!(matches!(strategy, ValueCountStrategy::Sorted(_))); - } - - #[test] - fn value_count_strategy_all_present_when_every_level_has_a_value() { - let d = descr(1, 1); - let indices = [0usize, 1, 2, 3]; - let strategy = - ByteBudgetChunker::value_count_strategy(&d, Some(&indices), /* num_levels */ 4); - assert!(matches!(strategy, ValueCountStrategy::AllPresent)); - } -} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index e6b776931c6c..c5c9c64c2137 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -566,8 +566,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.props.write_batch_size() }; let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size); - let value_count_strategy = - ByteBudgetChunker::value_count_strategy(&self.descr, value_indices, num_levels); while levels_offset < num_levels { let mut end_offset = num_levels.min(levels_offset + base_batch_size); @@ -587,10 +585,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { values, value_indices, chunk_def, - value_count_strategy, values_offset, chunk_size, - end_offset, ); if sub_batch_size >= chunk_size { @@ -5267,6 +5263,42 @@ mod tests { } } + #[test] + fn test_level_data_ref_value_count() { + // `value_count` is what the byte-budget chunker uses to convert a + // chunk's level span into a leaf-value count. It must work for any + // column shape — flat, nullable, or nested — because the leaf + // values array is decoupled from the rep/def level stream. + let max_def = 2; + // Non-nullable / unrepeated: no def levels materialized — every + // level is a value. + assert_eq!(LevelDataRef::Absent.value_count(64, max_def), 64); + // Uniform run of present values, and of nulls. + assert_eq!( + LevelDataRef::Uniform { + value: max_def, + count: 40 + } + .value_count(40, max_def), + 40 + ); + assert_eq!( + LevelDataRef::Uniform { + value: max_def - 1, + count: 40 + } + .value_count(40, max_def), + 0 + ); + // Materialized def levels (nullable / nested): only levels equal to + // `max_def` are values; empty-list / null levels are not. + let levels = [2i16, 0, 2, 1, 2, 2, 0]; + assert_eq!( + LevelDataRef::Materialized(&levels).value_count(levels.len(), max_def), + 4 + ); + } + #[test] fn test_uniform_def_levels_all_null() { // All-null column: def_level=0 (null) for every slot, no values written. From 5806635aaec4cda8eb5a49d567caeafd3e09ba23 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 15 May 2026 15:56:03 +0000 Subject: [PATCH 12/17] style(parquet): cargo fmt Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/column/writer/byte_budget_chunker.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index 73b16266f9cf..4794b2c1622a 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -108,7 +108,13 @@ impl ByteBudgetChunker { if self.static_always_fits || encoder.has_dictionary() || chunk_size == 0 { return chunk_size; } - self.byte_budget_sub_batch_size::(values, value_indices, chunk_def, values_offset, chunk_size) + self.byte_budget_sub_batch_size::( + values, + value_indices, + chunk_def, + values_offset, + chunk_size, + ) } /// Inspect value sizes to decide how much of the chunk fits in a page. @@ -143,7 +149,11 @@ impl ByteBudgetChunker { Some(idx) => { let end = (values_offset + vals_in_chunk).min(idx.len()); let start = values_offset.min(end); - E::count_values_within_byte_budget_gather(values, &idx[start..end], self.page_byte_limit) + E::count_values_within_byte_budget_gather( + values, + &idx[start..end], + self.page_byte_limit, + ) } None => E::count_values_within_byte_budget( values, From 97e8a4513220e2c6180133afd046205a653ddabb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 15 May 2026 17:14:46 +0000 Subject: [PATCH 13/17] refactor(parquet): drop #[cold] hints, keep slow paths #[inline(never)] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A 4-round alternating A/B (main vs branch-with-#[cold] vs branch-without, 8 benches) showed `#[cold]` is not load-bearing: removing it moved every bench by ≤1% with no consistent direction — pure noise. The comment justifying `#[cold]` on `byte_budget_sub_batch_size` was also wrong: it claimed the path "fires only on columns whose values are individually larger than data_page_size_limit / write_batch_size", when in fact it runs once per chunk for *every* variable-width column once dictionary encoding is abandoned (e.g. the small-string benchmarks). Dropped both `#[cold]` hints and the benchmark-archaeology comments. `#[inline(never)]` is kept on both slow-path helpers. The symbol table confirms it is doing real work — without it `byte_budget_sub_batch_size` and `write_granular_chunk` are inlined bodily into the hot `write_batch_internal` loop (0-1 vs 7 out-of-line copies). Keeping a ~40-line rarely-taken helper out of the hot loop is standard slow-path outlining; the A/B shows it costs nothing. Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/column/writer/byte_budget_chunker.rs | 8 +++----- parquet/src/column/writer/mod.rs | 10 ++++------ 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index 4794b2c1622a..67d2fe25132a 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -119,12 +119,10 @@ impl ByteBudgetChunker { /// Inspect value sizes to decide how much of the chunk fits in a page. /// - /// Reached once per chunk for variable-width (`BYTE_ARRAY`) columns - /// while plain-encoding — numeric, bool and dictionary-encoded columns - /// never get here, so it is `#[cold]` / `#[inline(never)]`: keeping it - /// out of line keeps the hot `write_batch_internal` loop small. + /// `#[inline(never)]` keeps this slow path out of the hot + /// `write_batch_internal` loop; numeric, bool and dictionary-encoded + /// columns never reach it. #[inline(never)] - #[cold] fn byte_budget_sub_batch_size( &self, values: &E::Values, diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index c5c9c64c2137..ce41db60cb51 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -771,13 +771,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// /// Returns the total number of values consumed across all sub-batches. /// - /// Marked `#[cold]` because the byte-budget path that calls this - /// fires only on columns whose values are individually larger than - /// `data_page_size_limit / write_batch_size` (e.g. multi-MiB - /// BYTE_ARRAY blobs). Keeping it out of the hot section lets the - /// hot `write_mini_batch` path keep its icache locality. + /// `#[inline(never)]` keeps this slow path — only reached for + /// variable-width columns whose values need page splitting — out of + /// the hot `write_batch_internal` loop. #[allow(clippy::too_many_arguments)] - #[cold] + #[inline(never)] fn write_granular_chunk( &mut self, values: &E::Values, From 6980026937baf7535c393df22ba4e84ae0418b35 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 04:31:38 +0000 Subject: [PATCH 14/17] fix(parquet): bound dictionary page size during dict encoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ByteBudgetChunker` sub-batches mini-batches so a data page cannot overflow `data_page_size_limit`, but it short-circuited entirely while the encoder was dictionary-encoding. The dictionary page accumulates the distinct values themselves, and its spill check runs only once per mini-batch — so a single `write_batch_size` mini-batch of large values could buffer `write_batch_size * value_size` bytes into the dictionary page before the check fired (e.g. a 1024-row mini-batch of 256 KiB values produced a 256 MiB dictionary page against a 1 MiB limit). Drop the dict-encoding short-circuit: the chunker now sub-batches the dictionary-encoding phase against the dictionary page's remaining budget, symmetric with how it already bounds the data page. Fixed-width columns still short-circuit via `static_dict_always_fits`. callgrind shows +0.09% on the dictionary-encoding hot path. Adds `test_column_writer_caps_dictionary_page_size`. Co-Authored-By: Claude Opus 4.7 --- .../src/column/writer/byte_budget_chunker.rs | 83 ++++++++++++------- parquet/src/column/writer/mod.rs | 71 ++++++++++++++++ 2 files changed, 125 insertions(+), 29 deletions(-) diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs index 67d2fe25132a..d7276381e655 100644 --- a/parquet/src/column/writer/byte_budget_chunker.rs +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -51,6 +51,12 @@ pub(crate) struct ByteBudgetChunker { /// decision short-circuit with no work for every numeric, bool, or /// narrow `FIXED_LEN_BYTE_ARRAY` column. static_always_fits: bool, + /// Configured dictionary page byte limit for the column. + dict_page_byte_limit: usize, + /// As [`Self::static_always_fits`] but for the dictionary page: `true` + /// when one `base_batch_size` mini-batch of this fixed-width type cannot + /// overshoot `dict_page_byte_limit` by more than one mini-batch's worth. + static_dict_always_fits: bool, } impl ByteBudgetChunker { @@ -61,6 +67,7 @@ impl ByteBudgetChunker { base_batch_size: usize, ) -> Self { let page_byte_limit = props.column_data_page_size_limit(descr.path()); + let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path()); let static_bytes_per_value = match descr.physical_type() { Type::BOOLEAN => Some(1), Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), @@ -69,29 +76,36 @@ impl ByteBudgetChunker { Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize), Type::BYTE_ARRAY => None, }; - let static_always_fits = static_bytes_per_value - .map(|b| b.saturating_mul(base_batch_size) <= page_byte_limit) - .unwrap_or(false); + let static_fits = |limit: usize| { + static_bytes_per_value + .map(|b| b.saturating_mul(base_batch_size) <= limit) + .unwrap_or(false) + }; Self { page_byte_limit, max_def_level: descr.max_def_level(), - static_always_fits, + static_always_fits: static_fits(page_byte_limit), + dict_page_byte_limit, + static_dict_always_fits: static_fits(dict_page_byte_limit), } } /// Decide how many levels at the start of a chunk belong in one - /// mini-batch. Returns `chunk_size` when the whole chunk fits in one - /// page byte budget; a smaller value triggers granular sub-batching in + /// mini-batch, so the mini-batch cannot overflow whichever page is + /// currently accumulating value bytes: the data page when plain-encoding, + /// or the *dictionary* page while dictionary-encoding. A returned value + /// smaller than `chunk_size` triggers granular sub-batching in /// `write_batch_internal`. /// - /// Returns `chunk_size` immediately (no value inspection) when: - /// - the column is a fixed-width type that statically cannot overflow - /// (`static_always_fits`); - /// - the encoder is currently dictionary-encoding — a dict-encoded data - /// page only stores small RLE indices, so a plain-encoded byte - /// estimate would shrink pages spuriously; dictionary fallback bounds - /// those pages independently; - /// - the chunk is empty. + /// While dictionary-encoding, the data page holds only small RLE indices, + /// but the dictionary page accumulates the distinct values themselves — + /// so it is the dictionary page's remaining budget that must bound the + /// mini-batch. The per-mini-batch dictionary spill check would otherwise + /// let one mini-batch of large values balloon the dictionary page. + /// + /// Returns `chunk_size` immediately (no value inspection) when the chunk + /// is empty, or when the column is a fixed-width type whose mini-batches + /// statically cannot overshoot the relevant page. /// /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte /// inspection lives in the out-of-line `byte_budget_sub_batch_size`. @@ -105,23 +119,40 @@ impl ByteBudgetChunker { values_offset: usize, chunk_size: usize, ) -> usize { - if self.static_always_fits || encoder.has_dictionary() || chunk_size == 0 { + if chunk_size == 0 { return chunk_size; } + let budget = if encoder.has_dictionary() { + if self.static_dict_always_fits { + return chunk_size; + } + // Bound the mini-batch by the dictionary page's *remaining* + // budget (it accumulates across mini-batches until it spills). + match encoder.estimated_dict_page_size() { + Some(used) => self.dict_page_byte_limit.saturating_sub(used), + None => return chunk_size, + } + } else { + if self.static_always_fits { + return chunk_size; + } + self.page_byte_limit + }; self.byte_budget_sub_batch_size::( values, value_indices, chunk_def, values_offset, chunk_size, + budget, ) } - /// Inspect value sizes to decide how much of the chunk fits in a page. + /// Inspect value sizes to decide how many of the chunk's values fit in + /// `budget` bytes (the data page or dictionary page remaining budget). /// /// `#[inline(never)]` keeps this slow path out of the hot - /// `write_batch_internal` loop; numeric, bool and dictionary-encoded - /// columns never reach it. + /// `write_batch_internal` loop; numeric and bool columns never reach it. #[inline(never)] fn byte_budget_sub_batch_size( &self, @@ -130,6 +161,7 @@ impl ByteBudgetChunker { chunk_def: LevelDataRef<'_>, values_offset: usize, chunk_size: usize, + budget: usize, ) -> usize { // How many of this chunk's levels carry an actual value. For a // non-nullable, unrepeated column every level is a value, so @@ -147,18 +179,11 @@ impl ByteBudgetChunker { Some(idx) => { let end = (values_offset + vals_in_chunk).min(idx.len()); let start = values_offset.min(end); - E::count_values_within_byte_budget_gather( - values, - &idx[start..end], - self.page_byte_limit, - ) + E::count_values_within_byte_budget_gather(values, &idx[start..end], budget) + } + None => { + E::count_values_within_byte_budget(values, values_offset, vals_in_chunk, budget) } - None => E::count_values_within_byte_budget( - values, - values_offset, - vals_in_chunk, - self.page_byte_limit, - ), }; match fit { None => chunk_size, diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index ce41db60cb51..526801d00e0b 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -3145,6 +3145,77 @@ mod tests { } } + #[test] + fn test_column_writer_caps_dictionary_page_size() { + // A column of large *distinct* values with dictionary encoding on: + // the dictionary page accumulates the values themselves, and its + // spill check runs only once per mini-batch. Without bounding the + // dictionary-encoding mini-batch, one `write_batch_size` mini-batch + // would intern `write_batch_size * value_size` bytes into the + // dictionary page before the check fires (~16 MiB here). The chunker + // must sub-batch the dictionary-encoding phase too. + let value_size = 8 * 1024; + let dict_page_limit = 64 * 1024; + let num_rows = 2048; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new( + WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(true) + .set_dictionary_page_size_limit(dict_page_limit) + .build(), + ); + + let mut data = Vec::with_capacity(num_rows); + for i in 0..num_rows { + // each value distinct, so the dictionary cannot dedup them + let mut v = vec![0u8; value_size]; + v[..8].copy_from_slice(&(i as u64).to_le_bytes()); + data.push(ByteArray::from(v)); + } + + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(&data, None, None).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + + let mut dict_page_size = 0; + while let Some(page) = page_reader.get_next_page().unwrap() { + if page.page_type() == PageType::DICTIONARY_PAGE { + dict_page_size = dict_page_size.max(page.buffer().len()); + } + } + + assert!( + dict_page_size > 0, + "expected the column to dictionary-encode" + ); + // Bounded near the limit (~2x from the post-mini-batch check). Before + // the fix the dictionary page reached num_rows * value_size (~16 MiB, + // 256x the limit). + assert!( + dict_page_size <= 3 * dict_page_limit, + "dictionary page {dict_page_size} exceeds 3x the {dict_page_limit} limit", + ); + } + #[test] fn test_column_writer_caps_page_size_for_fixed_len_byte_array() { // Coverage for `ParquetValueType::byte_size` override on From 0590686259fc2d2da3b16b7593e3ff09c8902347 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 17:38:19 +0000 Subject: [PATCH 15/17] fix(parquet): update arrow_writer_layout for tightened dictionary page Commit 6980026 ("fix(parquet): bound dictionary page size during dict encoding") made ByteBudgetChunker sub-batch the dictionary-encoding phase, so the dictionary page is bounded at its configured limit instead of overshooting by up to one mini-batch. In the `test_string` dictionary-spill case this tightens the dictionary page from 1040 to exactly the 1000-byte limit and spills one mini-batch earlier (125 rows rather than 130) -- the intended effect of that commit, but its hardcoded layout expectations were not updated, so the `arrow_writer_layout` integration test failed. Update the expected layout to match. Test-only change. Co-Authored-By: Claude Opus 4.7 --- parquet/tests/arrow_writer_layout.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index b9d997beb289..183590f9b847 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -408,16 +408,16 @@ fn test_string() { columns: vec![ColumnChunk { pages: vec![ Page { - rows: 130, + rows: 125, page_header_size: 38, - compressed_size: 138, + compressed_size: 114, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { - rows: 1250, + rows: 1255, page_header_size: 40, - compressed_size: 10000, + compressed_size: 10040, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }, @@ -429,10 +429,15 @@ fn test_string() { page_type: PageType::DATA_PAGE, }, ], + // The byte-budget chunker now sub-batches the + // dictionary-encoding phase, so the dictionary page is + // bounded at the 1000-byte limit instead of overshooting + // to 1040 — the dictionary spills one mini-batch earlier + // (125 rows rather than 130). dictionary_page: Some(Page { - rows: 130, + rows: 125, page_header_size: 38, - compressed_size: 1040, + compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, }), From 7a3c97c49f13076678c5e855a4d77aec93a49c59 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 17:38:29 +0000 Subject: [PATCH 16/17] perf(parquet): O(1) fast path for view-array page budgeting ByteBudgetChunker inspects per-value byte sizes to keep a data or dictionary page within its budget. count_within_budget_offsets (Utf8, Binary) has a Stage-1 O(1) span check that skips the per-value walk when the whole chunk provably fits -- but the view-array path (count_within_budget_views) had no equivalent and always did a full O(n) scan, even for tiny values that cannot come close to a page limit. View arrays have no prefix-sum offsets buffer, so an exact O(1) span is unavailable, but a conservative one is: a view value's byte length is at most max(12, largest data buffer length), so n values are guaranteed to fit when n * (max_value_len + 4) <= byte_budget. This skips the scan for the common small-value case -- what view arrays are built for, and exactly the case where there is nothing to bound. Columns with genuinely large values still fall through to the exact per-value scan and are still bounded precisely. callgrind on the string_and_binary_view writer benchmark (1M rows): instructions were +4.49% vs main before this change, +0.42% after. Co-Authored-By: Claude Opus 4.7 --- parquet/src/arrow/arrow_writer/byte_array.rs | 91 ++++++++++++++------ 1 file changed, 65 insertions(+), 26 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 5c34eadea343..dd6b05012478 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -35,7 +35,7 @@ use arrow_array::{ Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray, GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, }; -use arrow_buffer::ArrowNativeType; +use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_schema::DataType; macro_rules! downcast_dict_impl { @@ -516,27 +516,28 @@ impl ColumnValueEncoder for ByteArrayEncoder { indices, byte_budget, ), - // View arrays carry the length in the low 32 bits of each - // u128 view, so we can scan lengths without dereferencing - // any data buffer. - DataType::Utf8View => count_within_budget_views( - values - .as_any() - .downcast_ref::() - .unwrap() - .views(), - indices, - byte_budget, - ), - DataType::BinaryView => count_within_budget_views( - values - .as_any() - .downcast_ref::() - .unwrap() - .views(), - indices, - byte_budget, - ), + // View arrays carry each value's length in the low 32 bits of + // its u128 view word, so lengths are scannable without touching + // any data buffer — and the common small-value case skips even + // that scan via an O(1) conservative bound. + DataType::Utf8View => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + DataType::BinaryView => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } // For arrow Dictionary input, treat every chunk as fitting // and stay on the batched path. The arrow array being // Dictionary-encoded in the first place implies its values @@ -696,10 +697,48 @@ where indices.len() } -/// Fast path for view arrays (`Utf8View`, `BinaryView`). The view layout -/// stores each value's length in the low 32 bits of its u128 view word, -/// so we can scan lengths with no data-buffer dereferences. -fn count_within_budget_views(views: &[u128], indices: &[usize], byte_budget: usize) -> usize { +/// Upper bound on any single value's byte length in a view array. +/// +/// An inline view stores at most [`MAX_INLINE_VIEW_LEN`] bytes; an +/// out-of-line view's data is a contiguous slice of exactly one data +/// buffer, so it cannot be longer than the largest data buffer. This is a +/// loose bound (a value is usually far smaller than a whole buffer) but it +/// is O(number of buffers) and always sound. +fn max_view_value_len(buffers: &[Buffer]) -> usize { + /// Bytes that fit inline in a u128 view word (the rest is len + prefix). + const MAX_INLINE_VIEW_LEN: usize = 12; + buffers + .iter() + .map(|b| b.len()) + .max() + .unwrap_or(0) + .max(MAX_INLINE_VIEW_LEN) +} + +/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`). +/// +/// 1. View arrays have no prefix-sum offsets buffer, so the exact O(1) +/// span subtraction `count_within_budget_offsets` uses is unavailable. +/// But a *conservative* O(1) bound is: every value is at most +/// `max_value_len` bytes, so the whole chunk fits the budget when +/// `n * (max_value_len + 4) <= byte_budget`. This skips the per-value +/// walk for the common small-value case — what view arrays are built +/// for, and exactly the case where there is nothing to bound. +/// 2. Otherwise scan per-value lengths from the low 32 bits of each u128 +/// view word (no data-buffer dereference) and stop at the first value +/// that pushes the cumulative sum past the budget. +fn count_within_budget_views( + views: &[u128], + indices: &[usize], + byte_budget: usize, + max_value_len: usize, +) -> usize { + // Stage 1: O(1) conservative upper bound. + let per_value = max_value_len + std::mem::size_of::(); + if indices.len().saturating_mul(per_value) <= byte_budget { + return indices.len(); + } + // Stage 2: exact per-value scan. let mut cum: usize = 0; for (i, idx) in indices.iter().enumerate() { let len = (views[*idx] as u32) as usize; From 2e24fb9fd3ed7d7a0da7569194acb227995dc6a0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 22:02:24 +0000 Subject: [PATCH 17/17] fix(parquet): drop broken intra-doc link in max_view_value_len `MAX_INLINE_VIEW_LEN` is a function-local const; rustdoc cannot resolve it as an intra-doc link, so the `-Dwarnings` docs CI job failed with "unresolved link to `MAX_INLINE_VIEW_LEN`". Reference the value (12) directly in prose instead. Co-Authored-By: Claude Opus 4.7 --- parquet/src/arrow/arrow_writer/byte_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index dd6b05012478..409b6c8b001c 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -699,7 +699,7 @@ where /// Upper bound on any single value's byte length in a view array. /// -/// An inline view stores at most [`MAX_INLINE_VIEW_LEN`] bytes; an +/// An inline view stores at most 12 bytes; an /// out-of-line view's data is a contiguous slice of exactly one data /// buffer, so it cannot be longer than the largest data buffer. This is a /// loose bound (a value is usually far smaller than a whole buffer) but it