Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
e8c48c5
bench(parquet): add short and large string write benches
adriangb May 14, 2026
57952c8
fix(parquet): bound data page byte size for large variable-width values
adriangb May 14, 2026
e0d85fa
perf(parquet): O(1)-per-value count_values_within_byte_budget for vie…
adriangb May 14, 2026
81d81a3
perf(parquet): cut byte-budget cost on dict input and sparse-null lists
adriangb May 14, 2026
53d61aa
refactor(parquet): drop byte_size, extract chunk decision into ByteBu…
adriangb May 14, 2026
ac56d99
perf(parquet): inline ByteBudgetChunker entry points
adriangb May 14, 2026
63eacaf
perf(parquet): skip per-chunk vals_in_chunk computation when all valu…
adriangb May 14, 2026
9f443d1
perf(parquet): force-inline ByteBudgetChunker hot path, split cold pa…
adriangb May 14, 2026
77ebc07
perf(parquet): mark cold paths #[cold] so they move out of hot icache
adriangb May 15, 2026
4b92635
fix(parquet): correct nested-column value counting in ByteBudgetChunker
adriangb May 15, 2026
beb5fc2
refactor(parquet): drop ValueCountStrategy, count values via def levels
adriangb May 15, 2026
5806635
style(parquet): cargo fmt
adriangb May 15, 2026
97e8a45
refactor(parquet): drop #[cold] hints, keep slow paths #[inline(never)]
adriangb May 15, 2026
6980026
fix(parquet): bound dictionary page size during dict encoding
adriangb May 17, 2026
0590686
fix(parquet): update arrow_writer_layout for tightened dictionary page
adriangb May 17, 2026
7a3c97c
perf(parquet): O(1) fast path for view-array page budgeting
adriangb May 17, 2026
2e24fb9
fix(parquet): drop broken intra-doc link in max_view_value_len
adriangb May 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion parquet/benches/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::sync::Arc;
use arrow::datatypes::*;
use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array};
use arrow::{record_batch::RecordBatch, util::data_gen::*};
use arrow_array::RecordBatchOptions;
use arrow_array::{RecordBatchOptions, StringArray};
use parquet::errors::Result;
use parquet::file::properties::{CdcOptions, WriterProperties, WriterVersion};

Expand Down Expand Up @@ -100,6 +100,29 @@ fn create_string_bench_batch(
)?)
}

/// Builds a single-column batch of `size` short strings.
///
/// Each row is its own index rendered as a zero-padded decimal of width 8,
/// so every value is 8 bytes long for indices below 10^8. This exercises the
/// BYTE_ARRAY hot path for values small enough that byte-budget based
/// sub-batch sizing in `write_batch_internal` should always resolve to the
/// full chunk (no granular splitting, no regression vs. current behavior).
fn create_short_string_bench_batch(size: usize) -> Result<RecordBatch> {
    let rows = (0..size).map(|row| format!("{row:08}"));
    let column = Arc::new(StringArray::from_iter_values(rows)) as _;
    let batch = RecordBatch::try_from_iter([("col", column)])?;
    Ok(batch)
}

/// Builds a single-column batch of `size` identical strings, each
/// `value_size` bytes long.
///
/// Exercises the BYTE_ARRAY path where individual values are large enough
/// that batching the default `write_batch_size` of them would blow the page
/// byte limit by orders of magnitude — the case the page-size fix targets.
fn create_large_string_bench_batch(size: usize, value_size: usize) -> Result<RecordBatch> {
    // One backing string, borrowed for every row; the array copies the bytes.
    let payload = "x".repeat(value_size);
    let rows = std::iter::repeat(payload.as_str()).take(size);
    let column = Arc::new(StringArray::from_iter_values(rows)) as _;
    let batch = RecordBatch::try_from_iter([("col", column)])?;
    Ok(batch)
}

fn create_string_and_binary_view_bench_batch(
size: usize,
null_density: f32,
Expand Down Expand Up @@ -392,6 +415,16 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> {
let batch = create_string_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap();
batches.push(("string", batch));

let batch = create_short_string_bench_batch(BATCH_SIZE).unwrap();
batches.push(("short_string_non_null", batch));

// 1024 rows × 256 KiB = 256 MiB total. With the default 1 MiB page byte
// limit, this is the case where the page-size fix kicks in: each value
// needs its own page, and `write_batch_size = 1024` would otherwise
// buffer all 256 MiB before the post-write check runs.
let batch = create_large_string_bench_batch(1024, 256 * 1024).unwrap();
batches.push(("large_string_non_null", batch));

let batch = create_string_and_binary_view_bench_batch(BATCH_SIZE, 0.25, 0.75).unwrap();
batches.push(("string_and_binary_view", batch));

Expand Down
223 changes: 222 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::types::ByteArrayType;
use arrow_array::{
Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
};
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_schema::DataType;

macro_rules! downcast_dict_impl {
Expand Down Expand Up @@ -475,6 +477,90 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Ok(())
}

/// Reports how many of the leading entries of `indices` can be written
/// before their encoded size (payload plus a 4-byte length prefix per
/// value) exceeds `byte_budget`.
///
/// Always returns `Some`; the count is computed by a helper chosen from the
/// array's data type.
fn count_values_within_byte_budget_gather(
    values: &Self::Values,
    indices: &[usize],
    byte_budget: usize,
) -> Option<usize> {
    // `ByteArrayEncoder` only ever writes via `write_gather`, so this is
    // the relevant budgeting method.
    let erased = values.as_any();
    let fitting = match values.data_type() {
        // Offset-buffer byte arrays: an O(1) span check on the offsets
        // buffer answers the common "small values" case; otherwise the
        // helper walks per-value lengths from the offsets (no slice/UTF-8
        // construction) and stops at the first value that pushes the
        // cumulative sum past the budget, so an outlier is caught wherever
        // it lands in the chunk.
        DataType::Utf8 => {
            let array = erased.downcast_ref::<StringArray>().unwrap();
            count_within_budget_offsets(array, indices, byte_budget)
        }
        DataType::LargeUtf8 => {
            let array = erased.downcast_ref::<LargeStringArray>().unwrap();
            count_within_budget_offsets(array, indices, byte_budget)
        }
        DataType::Binary => {
            let array = erased.downcast_ref::<BinaryArray>().unwrap();
            count_within_budget_offsets(array, indices, byte_budget)
        }
        DataType::LargeBinary => {
            let array = erased.downcast_ref::<LargeBinaryArray>().unwrap();
            count_within_budget_offsets(array, indices, byte_budget)
        }
        // View arrays carry each value's length in the low 32 bits of its
        // u128 view word, so lengths are scannable without touching any
        // data buffer — and the common small-value case skips even that
        // scan via an O(1) conservative bound.
        DataType::Utf8View => {
            let array = erased.downcast_ref::<StringViewArray>().unwrap();
            let longest = max_view_value_len(array.data_buffers());
            count_within_budget_views(array.views(), indices, byte_budget, longest)
        }
        DataType::BinaryView => {
            let array = erased.downcast_ref::<BinaryViewArray>().unwrap();
            let longest = max_view_value_len(array.data_buffers());
            count_within_budget_views(array.views(), indices, byte_budget, longest)
        }
        // Arrow Dictionary input: treat every chunk as fitting and stay on
        // the batched path. An array that is Dictionary-encoded in the
        // first place implies values small enough that dedup is
        // worthwhile — the opposite of the "5 MiB blob per row" case this
        // budgeting targets. A per-value walk through dict keys
        // (keys[i] → values[key] → slice) on every chunk costs ~+30-80%
        // vs `main` after writer-dict spill, with essentially nothing to
        // bound.
        DataType::Dictionary(_, _) => indices.len(),
        // Everything else (FixedSizeBinary) uses the per-value walk via
        // `ArrayAccessor::value`.
        _ => downcast_op!(
            values.data_type(),
            values,
            count_within_budget_accessor,
            indices,
            byte_budget
        ),
    };
    Some(fitting)
}

fn num_values(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.indices.len(),
Expand Down Expand Up @@ -587,6 +673,141 @@ where
}
}

/// Generic cumulative-scan budget count for any [`ArrayAccessor`] whose
/// items expose their bytes.
///
/// Each value is charged its byte length plus a 4-byte length prefix.
/// Returns the largest `k` such that the first `k` values picked out by
/// `indices` stay within `byte_budget`:
///
/// * `indices.len()` when every value fits (including `0` for no indices),
/// * otherwise the position of the first value that overshoots, but never
///   less than `1`, so a single over-budget value still makes progress.
///
/// Free function so it can be used with `downcast_op!`.
fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget: usize) -> usize
where
    T: ArrayAccessor + Copy,
    T::Item: AsRef<[u8]>,
{
    let mut running: usize = 0;
    // Position of the first value whose inclusion exceeds the budget, if any.
    let first_over = indices.iter().position(|&idx| {
        let encoded_len = values.value(idx).as_ref().len() + std::mem::size_of::<u32>();
        running = running.saturating_add(encoded_len);
        running > byte_budget
    });
    first_over.map_or(indices.len(), |at| at.max(1))
}

/// Conservative ceiling on the byte length of any one value in a view array.
///
/// An inline view stores at most 12 bytes, and an out-of-line view's data
/// is a contiguous slice of exactly one data buffer, so no value can be
/// longer than the largest data buffer. The bound is loose (a value is
/// usually far smaller than a whole buffer) but costs only one pass over
/// the buffer list and is always sound.
fn max_view_value_len(buffers: &[Buffer]) -> usize {
    /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
    const MAX_INLINE_VIEW_LEN: usize = 12;
    // Start from the inline limit so the result is never below it, even
    // when there are no data buffers at all.
    buffers
        .iter()
        .fold(MAX_INLINE_VIEW_LEN, |longest, buffer| longest.max(buffer.len()))
}

/// Budget count for view arrays (`Utf8View`, `BinaryView`), in two stages.
///
/// Every value is charged its byte length plus a 4-byte length prefix.
///
/// 1. View arrays have no prefix-sum offsets buffer, so an exact O(1)
///    payload size is unavailable. A *conservative* O(1) bound is: no value
///    exceeds `max_value_len` bytes, so the whole chunk fits when
///    `indices.len() * (max_value_len + 4) <= byte_budget`. This skips the
///    per-value walk in the common small-value case.
/// 2. Otherwise read each value's length from the low 32 bits of its u128
///    view word (no data-buffer dereference) and stop at the first value
///    that pushes the running total past the budget.
///
/// Returns `indices.len()` when everything fits; otherwise the position of
/// the first overshooting value, clamped to at least `1` so a single
/// over-budget value is still written on its own.
///
/// Panics if an entry of `indices` is out of bounds for `views`.
fn count_within_budget_views(
    views: &[u128],
    indices: &[usize],
    byte_budget: usize,
    max_value_len: usize,
) -> usize {
    const LEN_PREFIX: usize = std::mem::size_of::<u32>();
    let total = indices.len();

    // Stage 1: worst-case bound, no per-value work.
    let worst_case_per_value = max_value_len + LEN_PREFIX;
    if total.saturating_mul(worst_case_per_value) <= byte_budget {
        return total;
    }

    // Stage 2: exact lengths straight from the view words.
    let mut running: usize = 0;
    let first_over = indices.iter().position(|&idx| {
        let value_len = (views[idx] as u32) as usize;
        running = running.saturating_add(value_len + LEN_PREFIX);
        running > byte_budget
    });
    match first_over {
        Some(at) => at.max(1),
        None => total,
    }
}

/// Budget count for offset-buffer byte arrays
/// (Utf8/LargeUtf8/Binary/LargeBinary), in two stages.
///
/// Every value is charged its byte length plus a 4-byte length prefix.
/// `indices` are assumed sorted ascending — they come from
/// `non_null_indices`, which is built in array order.
///
/// 1. `offsets[last + 1] - offsets[first]` covers every array position in
///    `[first, last]`, a superset of `indices`, so it is an O(1) upper
///    bound on the chunk's payload (and exact whenever the skipped
///    positions occupy no bytes). If that bound plus the prefixes fits the
///    budget, every value fits and `indices.len()` is returned with no
///    per-value work. This holds for any sorted index set, so sparse
///    selections from nullable columns take the fast path too.
/// 2. Otherwise walk per-index lengths directly from the offsets buffer
///    (no slice/UTF-8 construction) and stop at the first value that
///    pushes the running total past the budget.
///
/// Returns `0` for empty `indices`; otherwise at least `1`, so a single
/// over-budget value is still written on its own.
fn count_within_budget_offsets<T: ByteArrayType>(
    values: &GenericByteArray<T>,
    indices: &[usize],
    byte_budget: usize,
) -> usize {
    const LEN_PREFIX: usize = std::mem::size_of::<u32>();

    let (first, last) = match indices {
        [] => return 0,
        [only] => (*only, *only),
        [head, .., tail] => (*head, *tail),
    };
    let total = indices.len();
    let offsets = values.value_offsets();

    // Stage 1: span upper bound. The `last >= first` guard keeps the
    // offset subtraction from going negative on a non-ascending pair.
    let span_fits = last >= first
        && (offsets[last + 1] - offsets[first]).as_usize() + total * LEN_PREFIX <= byte_budget;
    if span_fits {
        return total;
    }

    // Stage 2: exact per-index lengths from the offsets buffer.
    let mut running: usize = 0;
    let first_over = indices.iter().position(|&idx| {
        let encoded_len = (offsets[idx + 1] - offsets[idx]).as_usize() + LEN_PREFIX;
        running = running.saturating_add(encoded_len);
        running > byte_budget
    });
    first_over.map_or(total, |at| at.max(1))
}

/// Computes the min and max for the provided array and indices
///
/// This is a free function so it can be used with `downcast_op!`
Expand Down
Loading
Loading