diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index b75e66849b7a1..76279ab9ffa19 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -22,7 +22,6 @@ //! compliant with the `SendableRecordBatchStream` trait. use std::collections::VecDeque; -use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -98,10 +97,6 @@ impl FileStream { self } - /// Begin opening the next file in parallel while decoding the current file in FileStream. - /// - /// Since file opening is mostly IO (and may involve a - /// bunch of sequential IO), it can be parallelized with decoding. fn start_next_file(&mut self) -> Option> { let part_file = self.file_iter.pop_front()?; Some(self.file_opener.open(part_file)) @@ -110,50 +105,24 @@ impl FileStream { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match &mut self.state { - FileStreamState::Idle => { - self.file_stream_metrics.time_opening.start(); - - match self.start_next_file().transpose() { - Ok(Some(future)) => self.state = FileStreamState::Open { future }, - Ok(None) => return Poll::Ready(None), - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } + FileStreamState::Idle => match self.start_next_file().transpose() { + Ok(Some(future)) => { + self.file_stream_metrics.time_opening.start(); + self.state = FileStreamState::Open { future }; } - } + Ok(None) => return Poll::Ready(None), + Err(e) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + }, FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { self.file_stream_metrics.files_opened.add(1); - // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); - let next = { - let scanning_total_metric = self - .file_stream_metrics - .time_scanning_total - .metrics - .clone(); - let _timer = scanning_total_metric.timer(); - self.start_next_file().transpose() - }; self.file_stream_metrics.time_scanning_until_data.start(); self.file_stream_metrics.time_scanning_total.start(); - - match next { - Ok(Some(next_future)) => { - self.state = FileStreamState::Scan { - reader, - next: Some(NextOpen::Pending(next_future)), - }; - } - Ok(None) => { - self.state = FileStreamState::Scan { reader, next: None }; - } - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - } + self.state = FileStreamState::Scan { reader }; } Err(e) => { self.file_stream_metrics.file_open_errors.add(1); @@ -170,14 +139,7 @@ impl FileStream { } } }, - FileStreamState::Scan { reader, next } => { - // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some(next_open_future) = next - && let NextOpen::Pending(f) = next_open_future - && let Poll::Ready(reader) = f.as_mut().poll(cx) - { - *next_open_future = NextOpen::Ready(reader); - } + FileStreamState::Scan { reader } => { match ready!(reader.poll_next_unpin(cx)) { Some(Ok(batch)) => { self.file_stream_metrics.time_scanning_until_data.stop(); @@ -189,12 +151,9 @@ impl FileStream { batch } else { let batch = batch.slice(0, *remain); - // Count this file, the prefetched next file - // (if any), and all remaining files we will - // never open. - let done = 1 - + self.file_iter.len() - + usize::from(next.is_some()); + // Count this file and all remaining files + // we will never open. + let done = 1 + self.file_iter.len(); self.file_stream_metrics .files_processed .add(done); @@ -214,29 +173,9 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); match self.on_error { - // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => { self.file_stream_metrics.files_processed.add(1); - match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = - FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin( - std::future::ready(reader), - ), - } - } - } - } - None => return Poll::Ready(None), - } + self.state = FileStreamState::Idle; } OnError::Fail => { self.state = FileStreamState::Error; @@ -248,26 +187,7 @@ impl FileStream { self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); - - match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin(std::future::ready( - reader, - )), - } - } - } - } - None => return Poll::Ready(None), - } + self.state = FileStreamState::Idle; } } } @@ -323,14 +243,6 @@ pub trait FileOpener: Unpin + Send + Sync { fn open(&self, partitioned_file: PartitionedFile) -> Result; } -/// Represents the state of the next `FileOpenFuture`. Since we need to poll -/// this future while scanning the current file, we need to store the result if it -/// is ready -pub enum NextOpen { - Pending(FileOpenFuture), - Ready(Result>>), -} - pub enum FileStreamState { /// The idle state, no file is currently being read Idle, @@ -345,10 +257,6 @@ pub enum FileStreamState { Scan { /// The reader instance reader: BoxStream<'static, Result>, - /// A [`FileOpenFuture`] for the next file to be processed. - /// This allows the next file to be opened in parallel while the - /// current file is read. - next: Option, }, /// Encountered an error Error, @@ -388,11 +296,6 @@ pub struct FileStreamMetrics { /// /// Time between when [`FileOpener::open`] is called and when the /// [`FileStream`] receives a stream for reading. - /// - /// If there are multiple files being scanned, the stream - /// will open the next file in the background while scanning the - /// current file. This metric will only capture time spent opening - /// while not also scanning. /// [`FileStream`]: pub time_opening: StartableTime, /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding