Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 126 additions & 16 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,17 +397,26 @@ impl ByteArrayDecoderPlain {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

// Stop early rather than overflow the 32-bit offset buffer.
// self.offset is NOT advanced, so the next read() call resumes
// from this value in the following batch.
if output.would_overflow(len as usize) {
break;
}

output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;

self.offset = end_offset;
read += 1;
}
self.max_remaining_values -= to_read;
// Use actual reads, not the requested amount, so max_remaining_values
// stays correct when we stop early.
self.max_remaining_values -= read;

if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
output.check_valid_utf8(initial_values_length)?
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if self.validate_utf8 { ... } block isn’t terminated with a semicolon before Ok(read), which makes this function fail to compile. Add a trailing ; after the if block (or after the check_valid_utf8(...)? inside it).

Suggested change
output.check_valid_utf8(initial_values_length)?
output.check_valid_utf8(initial_values_length)?;

Copilot uses AI. Check for mistakes.
}
Ok(to_read)
Ok(read)
}

pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
Expand Down Expand Up @@ -489,22 +498,33 @@ impl ByteArrayDecoderDeltaLength {
output.values.reserve(total_bytes);

let mut current_offset = self.data_offset;
let mut read = 0;
for length in src_lengths {
let end_offset = current_offset + *length as usize;
let value_len = *length as usize;
let end_offset = current_offset + value_len;

// Stop early rather than overflow the 32-bit offset buffer.
// length_offset / data_offset are only advanced by what was
// actually consumed, so the next read() resumes correctly.
if output.would_overflow(value_len) {
break;
}

output.try_push(
&self.data.as_ref()[current_offset..end_offset],
self.validate_utf8,
)?;
current_offset = end_offset;
read += 1;
}

self.data_offset = current_offset;
self.length_offset += to_read;
self.length_offset += read;

if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
output.check_valid_utf8(initial_values_length)?
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if self.validate_utf8 { ... } block is missing a terminating semicolon before Ok(read), so this won’t compile. Add ; after the if block (or after check_valid_utf8(...)? inside the block).

Suggested change
output.check_valid_utf8(initial_values_length)?
output.check_valid_utf8(initial_values_length)?;

Copilot uses AI. Check for mistakes.
}
Ok(to_read)
Ok(read)
}

fn skip(&mut self, to_skip: usize) -> Result<usize> {
Expand Down Expand Up @@ -542,13 +562,35 @@ impl ByteArrayDecoderDelta {
let initial_values_length = output.values.len();
output.offsets.reserve(len.min(self.decoder.remaining()));

let read = self
.decoder
.read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;

if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
let mut read = 0;
let mut overflow = false;
let validate_utf8 = self.validate_utf8;

let result = self.decoder.read(len, |bytes| {
// Stop early rather than overflow the 32-bit offset buffer.
// Returning Err leaves the DeltaByteArrayDecoder positioned at
// this value, so the next read() resumes correctly.
if output.would_overflow(bytes.len()) {
overflow = true;
return Err(general_err!("index overflow decoding byte array"));
}
output.try_push(bytes, validate_utf8)?;
read += 1;
Ok(())
});

match result {
Ok(_) => {
if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
}
}
// Overflow is expected – decoder is correctly positioned at the
// value that didn't fit, ready for the next batch.
Err(_) if overflow => {}
Err(e) => return Err(e),
}

Ok(read)
}

Expand Down Expand Up @@ -580,9 +622,45 @@ impl ByteArrayDecoderDictionary {
return Ok(0);
}

self.decoder.read(len, |keys| {
output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
})
let dict_offsets = dict.offsets.as_slice();
let dict_values = dict.values.as_slice();
let mut total_read = 0;

// Process one key at a time so we can stop cleanly when the output
// buffer would overflow. When the closure returns Err the
// DictIndexDecoder does NOT advance its position, so the same key will
// be retried in the next batch – no values are lost or skipped.
while total_read < len {
let mut overflow = false;
let n = self.decoder.read(1, |keys| {
let key = keys[0] as usize;
if key + 1 >= dict_offsets.len() {
return Err(general_err!(
"dictionary key beyond bounds of dictionary: 0..{}",
dict_offsets.len().saturating_sub(1)
));
}
Comment on lines +635 to +642
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let key = keys[0] as usize; can wrap negative dictionary indices to huge usize values, and key + 1 can overflow (e.g. key == -1) causing a panic in debug builds. Convert with usize::try_from(keys[0]) (returning an error on negatives) and avoid key + 1 overflow by comparing key >= dict_offsets.len().saturating_sub(1) or using checked_add(1).

Copilot uses AI. Check for mistakes.
let start = dict_offsets[key].as_usize();
let end = dict_offsets[key + 1].as_usize();
let value = &dict_values[start..end];

if output.would_overflow(value.len()) {
overflow = true;
return Err(general_err!("index overflow decoding byte array"));
}
// Dictionary values were validated at dictionary-page decode time.
output.try_push(value, false)
});
Comment on lines +629 to +653
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dictionary path now calls self.decoder.read(1, ...) in a loop, which adds substantial per-value overhead compared to decoding indices in batches. If possible, consider decoding a larger chunk of keys at a time and stopping without losing already-consumed keys (e.g., by tracking how many keys in the provided slice were successfully processed and advancing the decoder by that amount), or otherwise document/justify the performance tradeoff since dictionary pages are often hot paths.

Copilot uses AI. Check for mistakes.

match n {
Ok(0) => break, // no more values in this page
Ok(_) => total_read += 1, // successfully pushed one key
Err(_) if overflow => break, // would overflow – stop for this batch
Err(e) => return Err(e), // real error
}
}

Ok(total_read)
}

fn skip<I: OffsetSizeTrait>(
Expand All @@ -607,6 +685,38 @@ mod tests {
use arrow_array::{Array, StringArray};
use arrow_buffer::Buffer;

/// Verify that `ByteArrayDecoderPlain` transparently splits a page across
/// multiple `read()` calls when asked for fewer values than the page holds,
/// and that no values are lost between calls.
#[test]
fn test_plain_decoder_partial_read() {
let mut page: Vec<u8> = Vec::new();
for s in [b"foo" as &[u8], b"bar", b"baz"] {
page.extend_from_slice(&(s.len() as u32).to_le_bytes());
page.extend_from_slice(s);
}

let mut decoder = ByteArrayDecoderPlain::new(bytes::Bytes::from(page), 3, Some(3), false);

let mut output = OffsetBuffer::<i32>::default();

// First read: ask for 2 out of 3 values.
let n = decoder.read(&mut output, 2).unwrap();
assert_eq!(n, 2);
assert_eq!(&output.values, b"foobar");
assert_eq!(output.offsets.as_slice(), &[0, 3, 6]);

// Second read: gets the remaining value.
let n2 = decoder.read(&mut output, 2).unwrap();
assert_eq!(n2, 1);
assert_eq!(&output.values, b"foobarbaz");
assert_eq!(output.offsets.as_slice(), &[0, 3, 6, 9]);

// No more values.
let n3 = decoder.read(&mut output, 1).unwrap();
assert_eq!(n3, 0);
}

#[test]
fn test_byte_array_decoder() {
let (pages, encoded_dictionary) =
Expand Down
37 changes: 37 additions & 0 deletions parquet/src/arrow/buffer/offset_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
self.len() == 0
}

/// Returns `true` if appending `data_len` bytes would overflow the offset type `I`.
///
/// Used by decoders to stop filling a batch early rather than returning an error,
/// allowing the remaining values to be emitted in a subsequent batch.
#[inline]
pub fn would_overflow(&self, data_len: usize) -> bool {
// Use checked_add to handle the case where the sum itself overflows usize.
match self.values.len().checked_add(data_len) {
Some(total) => I::from_usize(total).is_none(),
None => true, // usize addition overflowed → definitely can't fit
}
}

/// If `validate_utf8` this verifies that the first character of `data` is
/// the start of a UTF-8 codepoint
///
Expand Down Expand Up @@ -318,6 +331,30 @@ mod tests {
buffer.check_valid_utf8(12).unwrap_err();
}

#[test]
fn test_would_overflow() {
// Buffer with 5 bytes already written.
let mut buf = OffsetBuffer::<i32>::default();
buf.try_push(b"hello", false).unwrap(); // values.len() == 5

// Within i32::MAX – should not report overflow.
assert!(!buf.would_overflow(0));
assert!(!buf.would_overflow(1));
// 5 + (i32::MAX - 5) == i32::MAX, still representable.
assert!(!buf.would_overflow(i32::MAX as usize - 5));
// 5 + (i32::MAX - 4) == i32::MAX + 1, overflows i32.
assert!(buf.would_overflow(i32::MAX as usize - 4));
assert!(buf.would_overflow(i32::MAX as usize));
// usize::MAX must be caught without panicking.
assert!(buf.would_overflow(usize::MAX));

// i64 offset type: the i32 boundary is fine.
let mut buf64 = OffsetBuffer::<i64>::default();
buf64.try_push(b"hello", false).unwrap();
assert!(!buf64.would_overflow(i32::MAX as usize - 4));
assert!(!buf64.would_overflow(i32::MAX as usize));
}

#[test]
fn test_pad_nulls_empty() {
let mut buffer = OffsetBuffer::<i32>::default();
Expand Down
Loading