Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 17 additions & 114 deletions datafusion/datasource/src/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
//! compliant with the `SendableRecordBatchStream` trait.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -98,10 +97,6 @@ impl FileStream {
self
}

/// Begin opening the next file in parallel while decoding the current file in FileStream.
///
/// Since file opening is mostly IO (and may involve a
/// bunch of sequential IO), it can be parallelized with decoding.
fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
let part_file = self.file_iter.pop_front()?;
Some(self.file_opener.open(part_file))
Expand All @@ -110,50 +105,24 @@ impl FileStream {
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
self.file_stream_metrics.time_opening.start();

match self.start_next_file().transpose() {
Ok(Some(future)) => self.state = FileStreamState::Open { future },
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
FileStreamState::Idle => match self.start_next_file().transpose() {
Ok(Some(future)) => {
self.file_stream_metrics.time_opening.start();
self.state = FileStreamState::Open { future };
}
}
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
self.file_stream_metrics.files_opened.add(1);
// include time needed to start opening in `start_next_file`
self.file_stream_metrics.time_opening.stop();
let next = {
let scanning_total_metric = self
.file_stream_metrics
.time_scanning_total
.metrics
.clone();
let _timer = scanning_total_metric.timer();
self.start_next_file().transpose()
Copy link
Contributor Author

@Dandandan Dandandan Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this, @alamb, is what I was mostly talking about. It will read the footer (which is what we want), but AFAIK it will also:

  • build the pruning predicate (I think this is suboptimal — it happens too early)
  • prune row groups
  • optionally load the page index
  • return the stream (without driving it forward)

We should be able to do this much better with the IO/CPU separation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me 👍🏻

};
self.file_stream_metrics.time_scanning_until_data.start();
self.file_stream_metrics.time_scanning_total.start();

match next {
Ok(Some(next_future)) => {
self.state = FileStreamState::Scan {
reader,
next: Some(NextOpen::Pending(next_future)),
};
}
Ok(None) => {
self.state = FileStreamState::Scan { reader, next: None };
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
self.state = FileStreamState::Scan { reader };
}
Err(e) => {
self.file_stream_metrics.file_open_errors.add(1);
Expand All @@ -170,14 +139,7 @@ impl FileStream {
}
}
},
FileStreamState::Scan { reader, next } => {
// We need to poll the next `FileOpenFuture` here to drive it forward
if let Some(next_open_future) = next
&& let NextOpen::Pending(f) = next_open_future
&& let Poll::Ready(reader) = f.as_mut().poll(cx)
{
*next_open_future = NextOpen::Ready(reader);
}
FileStreamState::Scan { reader } => {
match ready!(reader.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.file_stream_metrics.time_scanning_until_data.stop();
Expand All @@ -189,12 +151,9 @@ impl FileStream {
batch
} else {
let batch = batch.slice(0, *remain);
// Count this file, the prefetched next file
// (if any), and all remaining files we will
// never open.
let done = 1
+ self.file_iter.len()
+ usize::from(next.is_some());
// Count this file and all remaining files
// we will never open.
let done = 1 + self.file_iter.len();
self.file_stream_metrics
.files_processed
.add(done);
Expand All @@ -214,29 +173,9 @@ impl FileStream {
self.file_stream_metrics.time_scanning_total.stop();

match self.on_error {
// If `OnError::Skip` we skip the file as soon as we hit the first error
OnError::Skip => {
self.file_stream_metrics.files_processed.add(1);
match mem::take(next) {
Some(future) => {
self.file_stream_metrics.time_opening.start();

match future {
NextOpen::Pending(future) => {
self.state =
FileStreamState::Open { future }
}
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(
std::future::ready(reader),
),
}
}
}
}
None => return Poll::Ready(None),
}
self.state = FileStreamState::Idle;
}
OnError::Fail => {
self.state = FileStreamState::Error;
Expand All @@ -248,26 +187,7 @@ impl FileStream {
self.file_stream_metrics.files_processed.add(1);
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();

match mem::take(next) {
Some(future) => {
self.file_stream_metrics.time_opening.start();

match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open { future }
}
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(
reader,
)),
}
}
}
}
None => return Poll::Ready(None),
}
self.state = FileStreamState::Idle;
}
}
}
Expand Down Expand Up @@ -323,14 +243,6 @@ pub trait FileOpener: Unpin + Send + Sync {
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}

/// Represents the state of the next `FileOpenFuture`. Since we need to poll
/// this future while scanning the current file, we need to store the result if it
/// is ready
pub enum NextOpen {
Pending(FileOpenFuture),
Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}

pub enum FileStreamState {
/// The idle state, no file is currently being read
Idle,
Expand All @@ -345,10 +257,6 @@ pub enum FileStreamState {
Scan {
/// The reader instance
reader: BoxStream<'static, Result<RecordBatch>>,
/// A [`FileOpenFuture`] for the next file to be processed.
/// This allows the next file to be opened in parallel while the
/// current file is read.
next: Option<NextOpen>,
},
/// Encountered an error
Error,
Expand Down Expand Up @@ -388,11 +296,6 @@ pub struct FileStreamMetrics {
///
/// Time between when [`FileOpener::open`] is called and when the
/// [`FileStream`] receives a stream for reading.
///
/// If there are multiple files being scanned, the stream
/// will open the next file in the background while scanning the
/// current file. This metric will only capture time spent opening
/// while not also scanning.
/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
pub time_opening: StartableTime,
/// Wall clock time elapsed for file scanning + first record batch of decompression + decoding
Expand Down
Loading