diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 70d3ce7cf9a9..12c3e192cdfc 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -110,11 +110,12 @@ pub const DEFAULT_BATCH_SIZE: usize = 1024; pub struct ArrowReaderBuilder { /// The "input" to read parquet data from. /// - /// Note in the case of the [`ParquetPushDecoderBuilder`], there - /// is no underlying input, which is indicated by a type parameter of [`NoInput`] + /// Note in the case of the [`ParquetPushDecoderBuilder`] there is no + /// underlying reader; the input is instead [`PushDecoderInput`], the buffer that + /// caller-pushed bytes accumulate in. /// /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder - /// [`NoInput`]: crate::arrow::push_decoder::NoInput + /// [`PushDecoderInput`]: crate::arrow::push_decoder::PushDecoderInput pub(crate) input: T, pub(crate) metadata: Arc, diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 6c7890b75c45..3bba746e740e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -54,7 +54,7 @@ pub use metadata::*; mod store; use crate::DecodeResult; -use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder}; +use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder, PushDecoderInput}; #[cfg(feature = "object_store")] pub use store::*; @@ -600,7 +600,7 @@ impl ParquetRecordBatchStreamBuilder { let projected_schema = Arc::new(Schema::new(projected_fields)); let decoder = ParquetPushDecoderBuilder { - input: NoInput, + input: PushDecoderInput::default(), metadata, schema, fields, diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index f905d6fb2ccb..6dc5520bb975 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -27,11 +27,11 @@ use crate::arrow::arrow_reader::{ }; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; -use crate::util::push_buffers::PushBuffers; +pub use crate::util::push_buffers::PushBuffers; use arrow_array::RecordBatch; use bytes::Bytes; -use reader_builder::{RowBudget, RowGroupReaderBuilder}; -use remaining::RemainingRowGroups; +use reader_builder::{RowBudget, RowGroupReaderBuilder, RowGroupReaderBuilderParts}; +use remaining::{RemainingRowGroups, RemainingRowGroupsParts}; use std::ops::Range; use std::sync::Arc; @@ -109,19 +109,100 @@ use std::sync::Arc; /// } /// } /// ``` -pub type ParquetPushDecoderBuilder = ArrowReaderBuilder; - -/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`] /// -/// There is no "input" for the push decoder by design (the idea is that -/// the caller pushes data to the decoder as needed).. +/// # Adaptive scans +/// +/// The scan strategy is not fixed once [`build`](Self::build) is called: it +/// can be changed *while decoding*, at row-group boundaries. +/// +/// The important API for this is [`ParquetPushDecoder::try_next_reader`]. +/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels +/// straight through row-group boundaries, `try_next_reader` returns once per +/// row group — leaving a clean window *between* row groups. At any such +/// boundary, [`ParquetPushDecoder::into_builder`] hands back a +/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any +/// option on it (projection, row filter, row selection policy, …) and +/// [`build`](Self::build) a fresh decoder that resumes from the next row +/// group. This is how a query engine promotes or demotes filters — for +/// example turning a row filter on or off — based on the selectivity observed +/// in the row groups decoded so far. /// -/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers, -/// which DO have an `input`. To support reusing the same builder code for -/// all three types of decoders, we define this `NoInput` for the push decoder to -/// denote in the type system there is no type. -#[derive(Debug, Clone, Copy)] -pub struct NoInput; +/// ``` +/// # use std::ops::Range; +/// # use std::sync::Arc; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::ProjectionMask; +/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::file::metadata::ParquetMetaDataPushDecoder; +/// # use parquet::file::properties::WriterProperties; +/// # let file_bytes = { +/// # let batch = record_batch!( +/// # ("a", Int32, [1, 2, 3, 4, 5, 6]), +/// # ("b", Int32, [6, 5, 4, 3, 2, 1]) +/// # ).unwrap(); +/// # // Small row groups so the test file has two of them. +/// # let props = WriterProperties::builder().set_max_row_group_row_count(Some(3)).build(); +/// # let mut buffer = vec![]; +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # let get_range = |r: &Range| file_bytes.slice(r.start as usize..r.end as usize); +/// # let file_length = file_bytes.len() as u64; +/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap(); +/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap(); +/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!() }; +/// # let parquet_metadata = Arc::new(parquet_metadata); +/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata) +/// .unwrap() +/// .build() +/// .unwrap(); +/// +/// // Drive the decoder one row group at a time with `try_next_reader`. +/// loop { +/// match decoder.try_next_reader().unwrap() { +/// DecodeResult::NeedsData(ranges) => { +/// // Fetch and hand over the bytes the decoder asked for. +/// let data = ranges.iter().map(|r| get_range(r)).collect(); +/// decoder.push_ranges(ranges, data).unwrap(); +/// } +/// DecodeResult::Data(reader) => { +/// // Decode this row group's batches. +/// for batch in reader { +/// assert!(batch.unwrap().num_rows() > 0); +/// } +/// // We are now at a row-group boundary. Based on whatever stats +/// // were gathered, optionally change strategy for the row groups +/// // still to come: drop or promote a row filter, narrow or widen +/// // the projection, etc. +/// if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 { +/// let builder = decoder.into_builder().unwrap(); +/// // e.g. column "b" turned out not to be needed. +/// let projection = ProjectionMask::columns(builder.parquet_schema(), ["a"]); +/// decoder = builder.with_projection(projection).build().unwrap(); +/// } +/// } +/// DecodeResult::Finished => break, +/// } +/// } +/// ``` +pub type ParquetPushDecoderBuilder = ArrowReaderBuilder; + +/// The `input` of a [`ParquetPushDecoderBuilder`]. +/// +/// The shared [`ArrowReaderBuilder`] is generic over an `input`. The sync and +/// async builders read from a file or async reader; the push decoder has no +/// reader, so its input is the [`PushBuffers`] that caller-pushed bytes +/// accumulate in (empty for a fresh builder). +#[derive(Debug, Default)] +pub struct PushDecoderInput { + /// Bytes pushed into the decoder, awaiting decode. + buffers: PushBuffers, +} /// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for /// more options that can be configured. @@ -156,15 +237,24 @@ impl ParquetPushDecoderBuilder { /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from /// the Parquet metadata and reader options. pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self { - Self::new_builder(NoInput, arrow_reader_metadata) + Self::new_builder(PushDecoderInput::default(), arrow_reader_metadata) + } + + /// Provide a preexisting [`PushBuffers`] for the built decoder to read + /// from, so bytes already fetched are not requested again. + pub fn with_buffers(self, buffers: PushBuffers) -> Self { + Self { + input: PushDecoderInput { buffers }, + ..self + } } /// Create a [`ParquetPushDecoder`] with the configured options pub fn build(self) -> Result { let Self { - input: NoInput, + input: PushDecoderInput { buffers }, metadata: parquet_metadata, - schema: _, + schema, fields, batch_size, row_groups, @@ -185,9 +275,9 @@ impl ParquetPushDecoderBuilder { .as_ref() .is_some_and(|filter| !filter.predicates.is_empty()); - // Prepare to build RowGroup readers - let file_len = 0; // not used in push decoder - let buffers = PushBuffers::new(file_len); + // Prepare to build RowGroup readers. `buffers` carries any bytes the + // caller already pushed (preserved across `into_builder`); a fresh + // builder supplies an empty `PushBuffers`. let row_group_reader_builder = RowGroupReaderBuilder::new( batch_size, projection, @@ -202,6 +292,7 @@ impl ParquetPushDecoderBuilder { // Initialize the decoder with the configured options let remaining_row_groups = RemainingRowGroups::new( + schema, parquet_metadata, row_groups, selection, @@ -218,6 +309,55 @@ impl ParquetPushDecoderBuilder { } } +/// Reassemble a [`ParquetPushDecoderBuilder`] from a decoder's not-yet-decoded +/// state — the inverse of [`ParquetPushDecoderBuilder::build`]. The rebuilt +/// builder pins the remaining row groups and carries the remaining row +/// selection, offset/limit budget, and buffered bytes. +fn builder_from_remaining(parts: RemainingRowGroupsParts) -> ParquetPushDecoderBuilder { + let RemainingRowGroupsParts { + metadata, + schema, + row_groups, + selection, + offset, + limit, + reader_builder, + } = parts; + let RowGroupReaderBuilderParts { + batch_size, + projection, + fields, + filter, + max_predicate_cache_size, + metrics, + row_selection_policy, + buffers, + } = reader_builder; + + ArrowReaderBuilder { + input: PushDecoderInput::default(), + metadata, + schema, + fields, + batch_size, + // The frontier tracks remaining row groups explicitly, so the rebuilt + // builder always pins them (even if the original left `row_groups` as + // `None` meaning "all"). + row_groups: Some(row_groups), + projection, + filter, + selection, + row_selection_policy, + limit, + offset, + metrics, + max_predicate_cache_size, + } + // Carry the decoder's already-fetched bytes across the rebuild so the new + // decoder does not re-request them. + .with_buffers(buffers) +} + /// A push based Parquet Decoder /// /// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder. @@ -377,6 +517,80 @@ impl ParquetPushDecoder { pub fn clear_all_ranges(&mut self) { self.state.clear_all_ranges(); } + + /// True iff the decoder is at a row-group boundary, where + /// [`Self::into_builder`] can reconfigure the scan. + /// + /// A boundary is "between row groups": the previous row group's + /// [`ParquetRecordBatchReader`] has been fully extracted (via + /// [`Self::try_next_reader`]) or fully drained (via [`Self::try_decode`]), + /// and the next row group has not yet been planned. While + /// [`Self::try_decode`] is iterating an active row group's reader this + /// returns `false`; with [`Self::try_next_reader`] there is a clean + /// window between two consecutive returns where this is `true`. + pub fn is_at_row_group_boundary(&self) -> bool { + self.state.is_at_row_group_boundary() + } + + /// Number of row groups left to decode after the one currently in flight. + /// Useful as a "should I bother reconfiguring the scan?" signal. + pub fn row_groups_remaining(&self) -> usize { + self.state.row_groups_remaining() + } + + /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the + /// row groups that have *not* yet been decoded. + /// + /// This is the API for *adaptive* scans. Drive the decoder with + /// [`Self::try_next_reader`]; at any row-group boundary, call + /// `into_builder` to recover a builder, adjust it with the usual + /// [`ParquetPushDecoderBuilder`] setters, and + /// [`build`](ParquetPushDecoderBuilder::build) a fresh decoder that resumes + /// from the next row group: + /// + /// ```no_run + /// # use parquet::arrow::push_decoder::ParquetPushDecoder; + /// # use parquet::arrow::arrow_reader::RowFilter; + /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() } + /// # fn new_filter() -> RowFilter { unimplemented!() } + /// let mut decoder = get_decoder(); + /// // ... drive `decoder.try_next_reader()` for a few row groups ... + /// if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 { + /// decoder = decoder + /// .into_builder() + /// .unwrap() + /// // any builder option can be changed here, e.g. promote a + /// // filter into a row filter based on observed selectivity + /// .with_row_filter(new_filter()) + /// .build() + /// .unwrap(); + /// } + /// ``` + /// + /// The returned builder pins the not-yet-decoded row groups (via + /// [`with_row_groups`](ArrowReaderBuilder::with_row_groups)) and carries the + /// not-yet-consumed row selection and offset/limit budget, so rows from + /// already-decoded row groups are not produced again. Every other option — + /// projection, row filter, row selection policy, batch size, metrics, + /// predicate-cache size — is left exactly as the decoder had it and can be + /// overridden before [`build`](ParquetPushDecoderBuilder::build). + /// + /// # Errors + /// + /// Returns `Err(ParquetError::General)` when the decoder is not at a + /// row-group boundary (check [`Self::is_at_row_group_boundary`] first) or + /// has already finished. The decoder is consumed either way. + /// + /// # Buffered bytes + /// + /// The decoder's buffered bytes are carried across the rebuild: bytes + /// already fetched for row groups the new configuration still reads are + /// not re-requested. Bytes the new configuration no longer needs stay + /// buffered until [`clear_all_ranges`](Self::clear_all_ranges) is called + /// or the rebuilt decoder is dropped. + pub fn into_builder(self) -> Result { + self.state.into_builder() + } } /// Internal state machine for the [`ParquetPushDecoder`] @@ -599,6 +813,59 @@ impl ParquetDecoderState { ParquetDecoderState::Finished => {} } } + + fn is_at_row_group_boundary(&self) -> bool { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.is_at_row_group_boundary(), + // Mid-row-group: the active reader holds an `ArrayReader` and + // `ReadPlan` keyed to the *current* projection/filter; rebuilding + // would require throwing that work away. + ParquetDecoderState::DecodingRowGroup { .. } => false, + ParquetDecoderState::Finished => false, + } + } + + fn row_groups_remaining(&self) -> usize { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.row_groups_remaining(), + ParquetDecoderState::DecodingRowGroup { + remaining_row_groups, + .. + } => remaining_row_groups.row_groups_remaining(), + ParquetDecoderState::Finished => 0, + } + } + + fn into_builder(self) -> Result { + let remaining_row_groups = match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups, + ParquetDecoderState::DecodingRowGroup { .. } => { + return Err(ParquetError::General( + "into_builder called while a row group is being decoded; \ + check is_at_row_group_boundary() first" + .to_string(), + )); + } + ParquetDecoderState::Finished => { + return Err(ParquetError::General( + "into_builder called on a finished decoder".to_string(), + )); + } + }; + if !remaining_row_groups.is_at_row_group_boundary() { + return Err(ParquetError::General( + "into_builder called mid-row-group; check is_at_row_group_boundary() first" + .to_string(), + )); + } + Ok(builder_from_remaining(remaining_row_groups.into_parts())) + } } #[cfg(test)] @@ -1476,6 +1743,270 @@ mod test { expect_finished(decoder.try_decode()); } + /// `into_builder` between row groups recovers a builder for the + /// not-yet-decoded row groups; rebuilding it with a new row filter + /// applies that filter to the subsequent row groups while leaving the + /// already-decoded row group's results untouched. + /// + /// See the "Adaptive scans" section of [`ParquetPushDecoderBuilder`] for + /// the high-level overview. + #[test] + fn test_into_builder_installs_filter_between_row_groups() { + let schema_descr = test_file_parquet_metadata() + .file_metadata() + .schema_descr_ptr(); + let mut decoder = prefetched_decoder(1024); + + // Reader for row group 0 — no filter. + let reader0 = expect_data(decoder.try_next_reader()); + let batches0: Vec<_> = reader0.collect::>().unwrap(); + let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap(); + assert_eq!(batch0, TEST_BATCH.slice(0, 200)); + + // We're between row groups now. Rebuild with a filter on column "a". + assert!(decoder.is_at_row_group_boundary()); + assert_eq!(decoder.row_groups_remaining(), 1); + let filter = + ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| { + gt(batch.column(0), &Int64Array::new_scalar(250)) + }); + let mut decoder = decoder + .into_builder() + .unwrap() + .with_row_filter(RowFilter::new(vec![Box::new(filter)])) + .build() + .unwrap(); + + // Reader for row group 1 — filter applied. The rebuilt decoder kept + // the buffered bytes (see `test_into_builder_preserves_buffered_bytes`) + // so no data needs to be re-supplied. Column "a" in RG1 has values + // 200..399; `a > 250` keeps 251..399 = 149 rows. + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::>().unwrap(); + let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap(); + assert_eq!(batch1, TEST_BATCH.slice(251, 149)); + expect_finished(decoder.try_next_reader()); + } + + /// `into_builder` is rejected while a row group's reader is being + /// drained (`DecodingRowGroup`); the error points at + /// `is_at_row_group_boundary`. + #[test] + fn test_into_builder_rejected_mid_row_group() { + let mut decoder = prefetched_decoder(50); + + // Decode one batch to land mid-row-group, inside `DecodingRowGroup` + // with an active reader — not a boundary. + expect_data(decoder.try_decode()); + assert!(!decoder.is_at_row_group_boundary()); + + let err = decoder.into_builder().unwrap_err(); + let err_msg = format!("{err}"); + assert!( + err_msg.contains("is_at_row_group_boundary"), + "unexpected error: {err_msg}" + ); + } + + /// `into_builder` is rejected once the decoder has finished. + #[test] + fn test_into_builder_rejected_on_finished_decoder() { + let mut decoder = prefetched_decoder(1024); + expect_data(decoder.try_decode()); + expect_data(decoder.try_decode()); + expect_finished(decoder.try_decode()); + assert!(!decoder.is_at_row_group_boundary()); + + let err = decoder.into_builder().unwrap_err(); + assert!( + format!("{err}").contains("finished"), + "unexpected error: {err}" + ); + } + + /// `try_next_reader` hands the active reader off to the caller and + /// transitions the decoder back to `ReadingRowGroup` — so the caller + /// can call `into_builder` even while still holding the returned + /// reader. (The handed-off reader has no link back to the decoder's + /// projection/filter; it has its own `ArrayReader` and `ReadPlan`.) + #[test] + fn test_into_builder_allowed_while_iterating_handed_off_reader() { + let mut decoder = prefetched_decoder(1024); + + let reader0 = expect_data(decoder.try_next_reader()); + // Decoder no longer owns the reader, so it considers itself + // "between row groups". + assert!(decoder.is_at_row_group_boundary()); + // Recovering the builder consumes the decoder but leaves `reader0` + // valid: iterating it is independent of the decoder's state. + let _builder = decoder.into_builder().unwrap(); + let batches: Vec<_> = reader0.collect::>().unwrap(); + let batch0 = concat_batches(&TEST_BATCH.schema(), &batches).unwrap(); + assert_eq!(batch0, TEST_BATCH.slice(0, 200)); + } + + /// `into_builder` recovers a builder for the *remaining* row groups and + /// carries the not-yet-consumed offset/limit budget, so a rebuilt + /// decoder resumes where the original left off rather than restarting. + #[test] + fn test_into_builder_resumes_remaining_budget() { + // limit = 250 spans both 200-row row groups: all 200 rows of RG0 + // plus the first 50 rows of RG1. + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_batch_size(1024) + .with_limit(250) + .build() + .unwrap(); + prefetch_test_file(&mut decoder); + + // RG0 contributes all 200 of its rows. + let reader0 = expect_data(decoder.try_next_reader()); + let batches0: Vec<_> = reader0.collect::>().unwrap(); + let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap(); + assert_eq!(batch0, TEST_BATCH.slice(0, 200)); + + // Rebuild without changing anything: the remaining 50-row limit and + // the not-yet-decoded RG1 must carry through (as do the buffers, so + // no data needs re-supplying). + assert!(decoder.is_at_row_group_boundary()); + let mut decoder = decoder.into_builder().unwrap().build().unwrap(); + + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::>().unwrap(); + let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap(); + // Only the first 50 rows of RG1 (200..249) — the rest of the limit. + assert_eq!(batch1, TEST_BATCH.slice(200, 50)); + expect_finished(decoder.try_next_reader()); + } + + /// `into_builder` carries the decoder's buffered bytes across the + /// rebuild: the rebuilt decoder keeps them and does not re-request data + /// it already holds. + #[test] + fn test_into_builder_preserves_buffered_bytes() { + let mut decoder = prefetched_decoder(1024); + assert_eq!(decoder.buffered_bytes(), test_file_len()); + + // Drain RG0. + let reader0 = expect_data(decoder.try_next_reader()); + let _: Vec<_> = reader0.collect::>().unwrap(); + // RG1's bytes are still staged inside the decoder. + let buffered = decoder.buffered_bytes(); + assert!(buffered > 0); + + // Rebuilding via into_builder keeps the staged bytes. + let mut decoder = decoder.into_builder().unwrap().build().unwrap(); + assert_eq!(decoder.buffered_bytes(), buffered); + + // RG1's bytes are already buffered, so it decodes without a + // `NeedsData` round-trip. + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::>().unwrap(); + let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap(); + assert_eq!(batch1, TEST_BATCH.slice(200, 200)); + expect_finished(decoder.try_next_reader()); + } + + /// Drive the decoder incrementally. Start with a narrow projection, + /// drain RG0, then `into_builder` and widen the projection to all three + /// columns. The rebuilt decoder's `NeedsData` for RG1 must request + /// bytes for *all three* columns, not just the originally-projected + /// "a". The expected ranges are hardcoded because `TEST_BATCH` and the + /// writer settings are static; this pins the layout cleanly without a + /// parallel reference decoder. + #[test] + fn test_into_builder_expand_projection_requests_new_bytes() { + let metadata = test_file_parquet_metadata(); + let schema_descr = metadata.file_metadata().schema_descr_ptr(); + + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata) + .unwrap() + .with_batch_size(1024) + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .build() + .unwrap(); + + // RG0: incrementally satisfy the narrow request — a single + // contiguous range for the "a"-only projection. + let ranges_rg0 = expect_needs_data(decoder.try_next_reader()); + assert_eq!(ranges_rg0, vec![4..1860]); + push_ranges_to_decoder(&mut decoder, ranges_rg0); + + let reader0 = expect_data(decoder.try_next_reader()); + let batches0: Vec<_> = reader0.collect::>().unwrap(); + let batch0 = concat_batches(&batches0[0].schema(), &batches0).unwrap(); + assert_eq!(batch0, TEST_BATCH.slice(0, 200).project(&[0]).unwrap()); + + // Widen the projection at the boundary. + assert!(decoder.is_at_row_group_boundary()); + let mut decoder = decoder + .into_builder() + .unwrap() + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b", "c"])) + .build() + .unwrap(); + + // RG1 now requests "a", "b", and "c" column chunks: ~1.8KiB each + // for "a" and "b", ~7.5KiB for the StringView column "c". + let ranges_rg1 = expect_needs_data(decoder.try_next_reader()); + assert_eq!(ranges_rg1, vec![11062..12918, 12918..14774, 14774..22230]); + push_ranges_to_decoder(&mut decoder, ranges_rg1); + + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::>().unwrap(); + let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap(); + assert_eq!(batch1, TEST_BATCH.slice(200, 200)); + expect_finished(decoder.try_next_reader()); + } + + /// Mirror of [`test_into_builder_expand_projection_requests_new_bytes`]: + /// start with the full projection, drain RG0, then `into_builder` and + /// narrow the projection to just column "a". RG1's `NeedsData` must + /// request only the single "a" column-chunk range, not the three a + /// wide projection would. + #[test] + fn test_into_builder_narrow_projection_requests_fewer_bytes() { + let metadata = test_file_parquet_metadata(); + let schema_descr = metadata.file_metadata().schema_descr_ptr(); + + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata) + .unwrap() + .with_batch_size(1024) + .build() + .unwrap(); + + // RG0 with the default (full) projection — three column ranges. + let ranges_rg0 = expect_needs_data(decoder.try_next_reader()); + assert_eq!(ranges_rg0, vec![4..1860, 1860..3716, 3716..11062]); + push_ranges_to_decoder(&mut decoder, ranges_rg0); + + let reader0 = expect_data(decoder.try_next_reader()); + let batches0: Vec<_> = reader0.collect::>().unwrap(); + let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap(); + assert_eq!(batch0, TEST_BATCH.slice(0, 200)); + + // Narrow the projection at the boundary. + assert!(decoder.is_at_row_group_boundary()); + let mut decoder = decoder + .into_builder() + .unwrap() + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .build() + .unwrap(); + + // RG1 now requests column "a" only — a single 1856-byte range. + let ranges_rg1 = expect_needs_data(decoder.try_next_reader()); + assert_eq!(ranges_rg1, vec![11062..12918]); + push_ranges_to_decoder(&mut decoder, ranges_rg1); + + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::>().unwrap(); + let batch1 = concat_batches(&batches1[0].schema(), &batches1).unwrap(); + assert_eq!(batch1, TEST_BATCH.slice(200, 200).project(&[0]).unwrap()); + expect_finished(decoder.try_next_reader()); + } + /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c" /// /// Note c is a different types (so the data page sizes will be different) @@ -1582,6 +2113,25 @@ mod test { metadata_decoder.push_ranges(ranges, data).unwrap(); } + /// Push the entire test file into `decoder`. + fn prefetch_test_file(decoder: &mut ParquetPushDecoder) { + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + } + + /// Build a decoder over the test file with the given batch size and + /// prefetch the whole file into it. + fn prefetched_decoder(batch_size: usize) -> ParquetPushDecoder { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_batch_size(batch_size) + .build() + .unwrap(); + prefetch_test_file(&mut decoder); + decoder + } + fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec>) { let data = ranges .iter() diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 0452cea436a3..dacf1a2caad9 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -104,6 +104,16 @@ impl RowBudget { matches!(self.limit, Some(0)) } + /// The offset still to be skipped before the next readable row group. + pub(crate) fn offset(self) -> Option { + self.offset + } + + /// The number of output rows still permitted across the remaining row groups. + pub(crate) fn limit(self) -> Option { + self.limit + } + /// Returns how many selected rows remain after applying this budget. pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize { let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0)); @@ -263,6 +273,25 @@ pub(crate) struct RowGroupReaderBuilder { buffers: PushBuffers, } +/// The parts of a [`RowGroupReaderBuilder`] needed to rebuild it, recovered by +/// [`RowGroupReaderBuilder::into_parts`]. +/// +/// `metadata` is not included: it is a whole-file property carried alongside +/// `schema` in `RemainingRowGroupsParts`. +#[derive(Debug)] +pub(crate) struct RowGroupReaderBuilderParts { + pub batch_size: usize, + pub projection: ProjectionMask, + pub fields: Option>, + pub filter: Option, + pub max_predicate_cache_size: usize, + pub metrics: ArrowReaderMetrics, + pub row_selection_policy: RowSelectionPolicy, + /// Bytes already pushed into the decoder, carried across a rebuild so they + /// are not re-requested. + pub buffers: PushBuffers, +} + impl RowGroupReaderBuilder { /// Create a new RowGroupReaderBuilder #[expect(clippy::too_many_arguments)] @@ -291,11 +320,49 @@ impl RowGroupReaderBuilder { } } + /// Decompose into [`RowGroupReaderBuilderParts`] so the builder can be + /// reconstructed. The runtime decode `state` is discarded; `metadata` is + /// recovered from the frontier instead (see `RemainingRowGroups::into_parts`). + pub(crate) fn into_parts(self) -> RowGroupReaderBuilderParts { + // If a new field is added to `RowGroupReaderBuilder`, it must be added here and in `RowGroupReaderBuilderParts`, + // or at least evaluate how it should be handled in the decomposition and reconstruction of the builder. + let Self { + batch_size, + projection, + metadata: _, + fields, + filter, + max_predicate_cache_size, + metrics, + row_selection_policy, + state: _, + buffers, + } = self; + RowGroupReaderBuilderParts { + batch_size, + projection, + fields, + filter, + max_predicate_cache_size, + metrics, + row_selection_policy, + buffers, + } + } + /// Push new data buffers that can be used to satisfy pending requests pub fn push_data(&mut self, ranges: Vec>, buffers: Vec) { self.buffers.push_ranges(ranges, buffers); } + /// True iff the inner state is `Finished`. This is the only state in + /// which it is safe to decompose the builder via [`Self::into_parts`], + /// because no `RowGroupInfo`, `FilterInfo`, or in-flight `DataRequest` + /// is referencing the row-group-scoped decode state. + pub(crate) fn is_finished(&self) -> bool { + matches!(self.state, Some(RowGroupDecoderState::Finished)) + } + /// Returns the total number of buffered bytes available pub fn buffered_bytes(&self) -> u64 { self.buffers.buffered_bytes() diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index 33e13abf9c12..d1070d2aa69f 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -18,10 +18,11 @@ use crate::DecodeResult; use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use crate::arrow::push_decoder::reader_builder::{ - RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, + RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts, }; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; +use arrow_schema::SchemaRef; use bytes::Bytes; use std::collections::VecDeque; use std::ops::Range; @@ -185,6 +186,11 @@ impl RowGroupFrontier { /// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group. #[derive(Debug)] pub(crate) struct RemainingRowGroups { + /// The arrow schema of the decoded output. Carried only so + /// [`Self::into_parts`] can hand it to a rebuilt builder; unused while + /// decoding. + schema: SchemaRef, + /// Cross-row-group scan state for queued work. frontier: RowGroupFrontier, @@ -192,8 +198,30 @@ pub(crate) struct RemainingRowGroups { row_group_reader_builder: RowGroupReaderBuilder, } +/// The state recovered from a [`RemainingRowGroups`] by +/// [`RemainingRowGroups::into_parts`], describing the row groups *not* yet +/// decoded so a builder reconstructed from it resumes where the decoder left off. +#[derive(Debug)] +pub(crate) struct RemainingRowGroupsParts { + /// The arrow schema of the decoded output. + pub schema: SchemaRef, + /// The Parquet file metadata. + pub metadata: Arc, + /// Row groups not yet handed to the reader builder. + pub row_groups: Vec, + /// The not-yet-consumed slice of the global row selection. + pub selection: Option, + /// Offset still to be skipped before the next readable row group. + pub offset: Option, + /// Output rows still permitted across the remaining row groups. + pub limit: Option, + /// Builder-configurable parts of the inner row-group reader builder. + pub reader_builder: RowGroupReaderBuilderParts, +} + impl RemainingRowGroups { pub fn new( + schema: SchemaRef, parquet_metadata: Arc, row_groups: Vec, selection: Option, @@ -202,6 +230,7 @@ impl RemainingRowGroups { row_group_reader_builder: RowGroupReaderBuilder, ) -> Self { Self { + schema, frontier: RowGroupFrontier::new( parquet_metadata, row_groups, @@ -213,6 +242,36 @@ impl RemainingRowGroups { } } + /// Decompose into [`RemainingRowGroupsParts`]. + /// + /// Must be called at a row-group boundary (see + /// [`Self::is_at_row_group_boundary`]). The inner reader builder's runtime + /// decode state is discarded; its buffered bytes are carried through. + pub(crate) fn into_parts(self) -> RemainingRowGroupsParts { + let Self { + schema, + frontier, + row_group_reader_builder, + } = self; + // `has_predicates` is recomputed by `build()` from the filter. + let RowGroupFrontier { + parquet_metadata, + row_groups, + selection, + budget, + has_predicates: _, + } = frontier; + RemainingRowGroupsParts { + schema, + metadata: parquet_metadata, + row_groups: Vec::from(row_groups), + selection, + offset: budget.offset(), + limit: budget.limit(), + reader_builder: row_group_reader_builder.into_parts(), + } + } + /// Push new data buffers that can be used to satisfy pending requests pub fn push_data(&mut self, ranges: Vec>, buffers: Vec) { self.row_group_reader_builder.push_data(ranges, buffers); @@ -228,6 +287,18 @@ impl RemainingRowGroups { self.row_group_reader_builder.clear_all_ranges(); } + /// True iff the inner row-group reader is between row groups (state + /// `Finished`). Forward to [`RowGroupReaderBuilder::is_finished`]. + pub fn is_at_row_group_boundary(&self) -> bool { + self.row_group_reader_builder.is_finished() + } + + /// Number of row groups remaining (not including the one currently + /// being decoded). + pub fn row_groups_remaining(&self) -> usize { + self.frontier.row_groups.len() + } + /// returns [`ParquetRecordBatchReader`] suitable for reading the next /// group of rows from the Parquet data, or the list of data ranges still /// needed to proceed diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs index b8225ab3a1db..be245b3359ab 100644 --- a/parquet/src/util/push_buffers.rs +++ b/parquet/src/util/push_buffers.rs @@ -21,9 +21,12 @@ use bytes::Bytes; use std::fmt::Display; use std::ops::Range; -/// Holds multiple buffers of data +/// Holds multiple non-contiguous, caller-provided buffers of file data. /// -/// This is the in-memory buffer for the ParquetDecoder and ParquetMetadataDecoders +/// This is the in-memory buffer used by the push-based Parquet decoders +/// (`ParquetPushDecoder` and `ParquetMetaDataPushDecoder`). It can be +/// constructed up front and handed to a builder so the decoder reuses bytes +/// that have already been fetched. /// /// Features: /// 1. Zero copy @@ -38,8 +41,8 @@ use std::ops::Range; /// /// Thus, the implementation defers to the caller to coalesce subsequent requests /// if desired. -#[derive(Debug, Clone)] -pub(crate) struct PushBuffers { +#[derive(Debug, Clone, Default)] +pub struct PushBuffers { /// the virtual "offset" of this buffers (added to any request) offset: u64, /// The total length of the file being decoded @@ -75,7 +78,11 @@ impl Display for PushBuffers { } impl PushBuffers { - /// Create a new Buffers instance with the given file length + /// Create a new, empty `PushBuffers` for a file of the given length. + /// + /// Use [`PushBuffers::default`] when the file length is unknown or + /// irrelevant (e.g. the push decoder, which tracks ranges by absolute + /// offset and never consults `file_len`). pub fn new(file_len: u64) -> Self { Self { offset: 0, @@ -109,7 +116,7 @@ impl PushBuffers { } /// Returns true if the Buffers contains data for the given range - pub fn has_range(&self, range: &Range) -> bool { + pub(crate) fn has_range(&self, range: &Range) -> bool { self.ranges .iter() .any(|r| r.start <= range.start && r.end >= range.end) @@ -120,25 +127,25 @@ impl PushBuffers { } /// return the file length of the Parquet file being read - pub fn file_len(&self) -> u64 { + pub(crate) fn file_len(&self) -> u64 { self.file_len } /// Specify a new offset - pub fn with_offset(mut self, offset: u64) -> Self { + fn with_offset(mut self, offset: u64) -> Self { self.offset = offset; self } /// Return the total of all buffered ranges #[cfg(feature = "arrow")] - pub fn buffered_bytes(&self) -> u64 { + pub(crate) fn buffered_bytes(&self) -> u64 { self.ranges.iter().map(|r| r.end - r.start).sum() } /// Clear any range and corresponding buffer that is exactly in the ranges_to_clear #[cfg(feature = "arrow")] - pub fn clear_ranges(&mut self, ranges_to_clear: &[Range]) { + pub(crate) fn clear_ranges(&mut self, ranges_to_clear: &[Range]) { let mut new_ranges = Vec::new(); let mut new_buffers = Vec::new(); @@ -156,7 +163,7 @@ impl PushBuffers { } /// Clear all buffered ranges and their corresponding data - pub fn clear_all_ranges(&mut self) { + pub(crate) fn clear_all_ranges(&mut self) { self.ranges.clear(); self.buffers.clear(); }