diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index d8fd638adc8..75de633abd9 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -174,7 +174,7 @@ impl State { IoRequest::new_single(request) }), Some(window) => self.next_coalesced(window).map(|request| { - match request.requests.len() { + match request.requests().len() { 1 => self.metrics.individual_requests.add(1), num_requests => { self.metrics.coalesced_requests.add(1); @@ -311,11 +311,14 @@ impl State { current_end - aligned_start, ); - Some(CoalescedRequest { - range: aligned_start..current_end, - alignment: self.coalesced_buffer_alignment, - requests, - }) + Some( + CoalescedRequest::try_new( + aligned_start..current_end, + self.coalesced_buffer_alignment, + requests, + ) + .vortex_expect("each request is correctly constructed and range.start <= range.end"), + ) } } @@ -438,8 +441,8 @@ mod tests { match outputs[0].inner() { IoRequestInner::Coalesced(coalesced) => { - assert_eq!(coalesced.range, 0..30); - assert_eq!(coalesced.requests.len(), 3); + assert_eq!(*coalesced.range(), 0..30); + assert_eq!(coalesced.requests().len(), 3); } _ => panic!("Expected coalesced request"), } @@ -467,7 +470,7 @@ mod tests { .await; assert_eq!(outputs.len(), 1); match outputs[0].inner() { - IoRequestInner::Coalesced(c) => assert_eq!(c.requests.len(), 2), + IoRequestInner::Coalesced(c) => assert_eq!(c.requests().len(), 2), _ => panic!("Expected coalesced"), } } @@ -512,10 +515,10 @@ mod tests { assert_eq!(outputs.len(), 1); match outputs[0].inner() { IoRequestInner::Coalesced(coalesced) => { - assert_eq!(coalesced.range.start, 4); - assert_eq!(coalesced.alignment, Alignment::new(4)); - for req in &coalesced.requests { - let rel = req.offset - coalesced.range.start; + assert_eq!(coalesced.range().start, 4); + assert_eq!(coalesced.alignment(), Alignment::new(4)); + for req in coalesced.requests() { + let rel = req.offset - coalesced.range().start; assert_eq!(rel % *req.alignment as u64, 0); } } @@ -657,12 +660,12 @@ mod tests { match outputs[0].inner() { IoRequestInner::Coalesced(coalesced) => { - assert_eq!(coalesced.range, 0..110); - assert_eq!(coalesced.requests.len(), 3); + assert_eq!(*coalesced.range(), 0..110); + assert_eq!(coalesced.requests().len(), 3); // Should be sorted by offset - assert_eq!(coalesced.requests[0].offset, 0); - assert_eq!(coalesced.requests[1].offset, 50); - assert_eq!(coalesced.requests[2].offset, 100); + assert_eq!(coalesced.requests()[0].offset, 0); + assert_eq!(coalesced.requests()[1].offset, 50); + assert_eq!(coalesced.requests()[2].offset, 100); } _ => panic!("Expected coalesced request"), } diff --git a/vortex-file/src/read/request.rs b/vortex-file/src/read/request.rs index 7caaa08d3d8..233accf47c7 100644 --- a/vortex-file/src/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -12,8 +12,10 @@ use vortex_buffer::Alignment; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_ensure; /// An I/O request, either a single read or a coalesced set of reads. +#[derive(Debug)] pub(crate) struct IoRequest(IoRequestInner); impl IoRequest { @@ -37,8 +39,10 @@ impl IoRequest { pub fn len(&self) -> usize { match &self.0 { IoRequestInner::Single(r) => r.length, - IoRequestInner::Coalesced(r) => usize::try_from(r.range.end - r.range.start) - .vortex_expect("range too big for usize"), + IoRequestInner::Coalesced(r) => { + usize::try_from(r.range.end.saturating_sub(r.range.start)) + .vortex_expect("range too big for usize") + } } } @@ -78,6 +82,7 @@ impl IoRequest { } } +#[derive(Debug)] pub(crate) enum IoRequestInner { Single(ReadRequest), Coalesced(CoalescedRequest), @@ -115,9 +120,9 @@ impl ReadRequest { /// A set of I/O requests that have been coalesced into a single larger request. pub(crate) struct CoalescedRequest { - pub(crate) range: Range, - pub(crate) alignment: Alignment, // Global max segment alignment used for the coalesced range. - pub(crate) requests: Vec, // TODO(ngates): we could have enum of Single/Many to avoid Vec. + range: Range, + alignment: Alignment, // Global max segment alignment used for the coalesced range. + requests: Vec, // TODO(ngates): we could have enum of Single/Many to avoid Vec. } impl Debug for CoalescedRequest { @@ -132,6 +137,58 @@ impl Debug for CoalescedRequest { } impl CoalescedRequest { + pub fn try_new( + range: Range, + alignment: Alignment, + requests: Vec, + ) -> VortexResult { + vortex_ensure!( + range.start <= range.end, + "CoalescedRequest: range.start, {}, must be less than or equal to range.end, {}.", + range.start, + range.end, + ); + for req in requests.iter() { + vortex_ensure!( + req.offset >= range.start, + "CoalescedRequest: sub-request for length {} at file offset {} precedes coalesced range: {}..{}. {:?}", + req.length, + req.offset, + range.start, + range.end, + req, + ); + vortex_ensure!( + req.offset.saturating_add(req.length as u64) <= range.end, + "CoalescedRequest: sub-request for length {} at file offset {} exceeds the coalesced range: {}..{}. {:?}", + req.length, + req.offset, + range.start, + range.end, + req, + ); + } + Ok(Self { + range, + alignment, + requests, + }) + } + + #[allow(unused)] + pub fn range(&self) -> &Range { + &self.range + } + + #[allow(unused)] + pub fn alignment(&self) -> Alignment { + self.alignment + } + + pub fn requests(&self) -> &[ReadRequest] { + &self.requests + } + pub fn resolve(self, result: VortexResult) { match result { Ok(buffer) => { diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..277441eb5db 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -16,6 +16,7 @@ use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_io::VortexReadAt; @@ -115,6 +116,18 @@ impl FileSegmentSource { let result = reader .read_at(req.offset(), req.len(), req.alignment()) .await; + let result = result.and_then(|buffer| { + if req.len() != buffer.len() { + vortex_bail!( + "FileSegmentSource: expected buffer of length {} but received {}. {:?}", + req.len(), + buffer.len(), + req + ) + } + Ok(buffer) + }); + req.resolve(result); } })