Skip to content
52 changes: 41 additions & 11 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ enum LevelInfoBuilder {
Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}

/// Minimum sub-range length before the bulk-fill fast path in `write_leaf`
/// becomes profitable for null-heavy leaf columns. Below this, per-call
/// slice + popcount overhead regresses list/struct paths that call
/// `write_leaf` many times with tiny ranges. Picked via threshold sweep;
/// see <https://github.com/apache/arrow-rs/pull/9967> for the rationale.
const BULK_FILL_MIN_LEN: usize = 64;

impl LevelInfoBuilder {
/// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
Expand Down Expand Up @@ -645,20 +652,43 @@ impl LevelInfoBuilder {
match &info.logical_nulls {
Some(nulls) => {
assert!(range.end <= nulls.len());
let nulls = nulls.inner();
info.def_levels.extend_from_iter(range.clone().map(|i| {
// Safety: range.end was asserted to be in bounds earlier
let valid = unsafe { nulls.value_unchecked(i) };
max_def_level - (!valid as i16)
}));
info.non_null_indices.reserve(len);
info.non_null_indices.extend(
BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
.map(|i| i + range.start),
);
// Bulk-fill is profitable only on null-heavy ranges long enough to
// amortize the slice/popcount cost; see `BULK_FILL_MIN_LEN` and the
// PR description for the threshold sweep. The gate uses the cached
// buffer-wide `null_count` (O(1)) to stay cheap on the cold path.
if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() {
let range_nulls = nulls.slice(range.start, len);
let valid_in_range = len - range_nulls.null_count();
let null_def_level = max_def_level - 1;
let buf = info
.def_levels
.materialize_mut()
.expect("definition levels present");
let base = buf.len();
buf.resize(base + len, null_def_level);
for i in range_nulls.valid_indices() {
buf[base + i] = max_def_level;
}
info.non_null_indices.reserve(valid_in_range);
info.non_null_indices
.extend(range_nulls.valid_indices().map(|i| i + range.start));
} else {
let bits = nulls.inner();
info.def_levels.extend_from_iter(range.clone().map(|i| {
// Safety: range.end was asserted to be in bounds earlier
let valid = unsafe { bits.value_unchecked(i) };
max_def_level - (!valid as i16)
}));
info.non_null_indices.reserve(len);
info.non_null_indices.extend(
BitIndexIterator::new(bits.inner(), bits.offset() + range.start, len)
.map(|i| i + range.start),
);
}
}
None => {
info.append_def_level_run(max_def_level, len);
info.non_null_indices.reserve(len);
info.non_null_indices.extend(range.clone());
}
}
Expand Down
Loading