diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index fa532546b50..fc736afa6dc 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -34,6 +34,8 @@ use crate::dtype::Nullability; use crate::dtype::Nullability::NonNullable; use crate::dtype::PType; use crate::dtype::UnsignedPType; +use crate::expr::stats::Precision; +use crate::expr::stats::Stat; use crate::match_each_integer_ptype; use crate::match_each_unsigned_integer_ptype; use crate::scalar::PValue; @@ -175,16 +177,27 @@ impl Patches { // Perform validation of components when they are host-resident. // This is not possible to do eagerly when the data is on GPU memory. if indices.is_host() && values.is_host() { - let max = usize::try_from(&indices.execute_scalar( + let last = indices.execute_scalar( indices.len() - 1, &mut LEGACY_SESSION.create_execution_ctx(), - )?) - .map_err(|_| vortex_err!("indices must be a number"))?; + )?; + let max = + usize::try_from(&last).map_err(|_| vortex_err!("indices must be a number"))?; vortex_ensure!( max - offset < array_len, "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}" ); + // Seed Min/Max stats on indices so search_index can short-circuit + // out-of-range lookups, and so pruning/predicate-pushdown consumers + // see populated bounds. + let first = indices.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?; + if let (Some(min_value), Some(max_value)) = (first.value(), last.value()) { + let stats = indices.statistics(); + stats.set(Stat::Min, Precision::Exact(min_value.clone())); + stats.set(Stat::Max, Precision::Exact(max_value.clone())); + } + #[cfg(debug_assertions)] { use crate::VortexSessionExecute; @@ -394,6 +407,10 @@ impl Patches { /// [`SearchResult::Found(patch_idx)`]: SearchResult::Found /// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound pub fn search_index(&self, index: usize) -> VortexResult { + if let Some(result) = self.search_index_out_of_range(index) { + return Ok(result); + } + if self.chunk_offsets.is_some() { return self.search_index_chunked(index); } @@ -401,6 +418,28 @@ impl Patches { Self::search_index_binary_search(&self.indices, index + self.offset) } + #[inline] + fn search_index_out_of_range(&self, index: usize) -> Option { + let (min, max) = self.cached_bounds()?; + if index < min { + Some(SearchResult::NotFound(0)) + } else if index > max { + Some(SearchResult::NotFound(self.indices.len())) + } else { + None + } + } + + #[inline] + fn cached_bounds(&self) -> Option<(usize, usize)> { + let offset = self.offset; + self.indices.statistics().with_typed_stats_set(|typed| { + let raw_min = usize::try_from(typed.get_value(Stat::Min)?.as_exact()?).ok()?; + let raw_max = usize::try_from(typed.get_value(Stat::Max)?.as_exact()?).ok()?; + Some((raw_min.checked_sub(offset)?, raw_max.checked_sub(offset)?)) + }) + } + /// Binary searches for `needle` in the indices array. /// /// # Returns @@ -551,29 +590,34 @@ impl Patches { }) } - /// Returns the minimum patch index + /// Returns the minimum patch index. pub fn min_index(&self) -> VortexResult { - let first = self + if let Some((min, _)) = self.cached_bounds() { + return Ok(min); + } + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let raw: usize = self .indices - .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())? - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("index does not fit in usize"))?; - Ok(first - self.offset) + .statistics() + .compute_min(&mut ctx) + .ok_or_else(|| vortex_err!("min index unavailable"))?; + raw.checked_sub(self.offset) + .ok_or_else(|| vortex_err!("offset {} exceeds min index {}", self.offset, raw)) } - /// Returns the maximum patch index + /// Returns the maximum patch index. pub fn max_index(&self) -> VortexResult { - let last = self + if let Some((_, max)) = self.cached_bounds() { + return Ok(max); + } + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let raw: usize = self .indices - .execute_scalar( - self.indices.len() - 1, - &mut LEGACY_SESSION.create_execution_ctx(), - )? - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("index does not fit in usize"))?; - Ok(last - self.offset) + .statistics() + .compute_max(&mut ctx) + .ok_or_else(|| vortex_err!("max index unavailable"))?; + raw.checked_sub(self.offset) + .ok_or_else(|| vortex_err!("offset {} exceeds max index {}", self.offset, raw)) } /// Filter the patches by a mask, resulting in new patches for the filtered array. @@ -1750,6 +1794,66 @@ mod test { assert_eq!(patches.search_index(9).unwrap(), SearchResult::NotFound(3)); } + #[test] + fn test_search_index_out_of_range_fast_path() { + let patches = Patches::new( + 100, + 0, + buffer![10u64, 20, 30, 40].into_array(), + buffer![1i32, 2, 3, 4].into_array(), + None, + ) + .unwrap(); + + assert_eq!( + patches.search_index_out_of_range(0), + Some(SearchResult::NotFound(0)) + ); + assert_eq!( + patches.search_index_out_of_range(9), + Some(SearchResult::NotFound(0)) + ); + assert_eq!( + patches.search_index_out_of_range(41), + Some(SearchResult::NotFound(4)) + ); + assert_eq!( + patches.search_index_out_of_range(99), + Some(SearchResult::NotFound(4)) + ); + assert_eq!(patches.search_index_out_of_range(10), None); + assert_eq!(patches.search_index_out_of_range(25), None); + assert_eq!(patches.search_index_out_of_range(40), None); + + assert_eq!(patches.search_index(5).unwrap(), SearchResult::NotFound(0)); + assert_eq!(patches.search_index(50).unwrap(), SearchResult::NotFound(4)); + } + + #[test] + fn test_search_index_out_of_range_with_offset() { + let patches = Patches::new( + 100, + 0, + buffer![10u64, 50, 90].into_array(), + buffer![1i32, 2, 3].into_array(), + None, + ) + .unwrap(); + let sliced = patches.slice(40..95).unwrap().unwrap(); + + assert_eq!(sliced.min_index().unwrap(), 10); + assert_eq!(sliced.max_index().unwrap(), 50); + assert_eq!( + sliced.search_index_out_of_range(5), + Some(SearchResult::NotFound(0)) + ); + assert_eq!( + sliced.search_index_out_of_range(54), + Some(SearchResult::NotFound(2)) + ); + assert_eq!(sliced.search_index_out_of_range(30), None); + } + #[test] fn test_mask_boundary_patches() { // Test masking patches at array boundaries diff --git a/vortex-array/src/stats/array.rs b/vortex-array/src/stats/array.rs index 248ca3587f2..a7fcf843f8f 100644 --- a/vortex-array/src/stats/array.rs +++ b/vortex-array/src/stats/array.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use parking_lot::RwLock; +use arc_swap::ArcSwap; use vortex_array::ExecutionCtx; use vortex_error::VortexError; use vortex_error::VortexResult; @@ -32,9 +32,20 @@ use crate::scalar::ScalarValue; /// A shared [`StatsSet`] stored in an array. Can be shared by copies of the array and can also be mutated in place. // TODO(adamg): This is a very bad name. -#[derive(Clone, Default, Debug)] +#[derive(Clone, Debug)] pub struct ArrayStats { - inner: Arc>, + // Lock-free reads via copy-on-write. Writes are last-writer-wins; + // concurrent writers may lose updates, which is acceptable for stats + // (they're hints and can be recomputed). + inner: Arc>, +} + +impl Default for ArrayStats { + fn default() -> Self { + Self { + inner: Arc::new(ArcSwap::from_pointee(StatsSet::default())), + } + } } /// Reference to an array's [`StatsSet`]. Can be used to get and mutate the underlying stats. @@ -55,42 +66,49 @@ impl ArrayStats { } pub fn set(&self, stat: Stat, value: Precision) { - self.inner.write().set(stat, value); + let mut new_stats = (**self.inner.load()).clone(); + new_stats.set(stat, value); + self.inner.store(Arc::new(new_stats)); } pub fn clear(&self, stat: Stat) { - self.inner.write().clear(stat); + let mut new_stats = (**self.inner.load()).clone(); + new_stats.clear(stat); + self.inner.store(Arc::new(new_stats)); } pub fn retain(&self, stats: &[Stat]) { - self.inner.write().retain_only(stats); + let mut new_stats = (**self.inner.load()).clone(); + new_stats.retain_only(stats); + self.inner.store(Arc::new(new_stats)); } } impl From for ArrayStats { fn from(value: StatsSet) -> Self { Self { - inner: Arc::new(RwLock::new(value)), + inner: Arc::new(ArcSwap::from_pointee(value)), } } } impl From for StatsSet { fn from(value: ArrayStats) -> Self { - value.inner.read().clone() + (**value.inner.load()).clone() } } impl StatsSetRef<'_> { pub(crate) fn replace(&self, stats: StatsSet) { - *self.array_stats.inner.write() = stats; + self.array_stats.inner.store(Arc::new(stats)); } pub fn set_iter(&self, iter: StatsSetIntoIter) { - let mut guard = self.array_stats.inner.write(); + let mut new_stats = (**self.array_stats.inner.load()).clone(); for (stat, value) in iter { - guard.set(stat, value); + new_stats.set(stat, value); } + self.array_stats.inner.store(Arc::new(new_stats)); } pub fn inherit_from(&self, stats: StatsSetRef<'_>) { @@ -101,38 +119,33 @@ impl StatsSetRef<'_> { } pub fn inherit<'a>(&self, iter: impl Iterator)>) { - let mut guard = self.array_stats.inner.write(); + let mut new_stats = (**self.array_stats.inner.load()).clone(); for (stat, value) in iter { if !value.is_exact() { - if !guard.get(*stat).is_some_and(|v| v.is_exact()) { - guard.set(*stat, value.clone()); + if !new_stats.get(*stat).is_some_and(|v| v.is_exact()) { + new_stats.set(*stat, value.clone()); } } else { - guard.set(*stat, value.clone()); + new_stats.set(*stat, value.clone()); } } + self.array_stats.inner.store(Arc::new(new_stats)); } pub fn with_typed_stats_set U>(&self, apply: F) -> U { - apply( - self.array_stats - .inner - .read() - .as_typed_ref(self.dyn_array_ref.dtype()), - ) + let snapshot = self.array_stats.inner.load(); + apply(snapshot.as_typed_ref(self.dyn_array_ref.dtype())) } pub fn with_mut_typed_stats_set U>(&self, apply: F) -> U { - apply( - self.array_stats - .inner - .write() - .as_mut_typed_ref(self.dyn_array_ref.dtype()), - ) + let mut new_stats = (**self.array_stats.inner.load()).clone(); + let result = apply(new_stats.as_mut_typed_ref(self.dyn_array_ref.dtype())); + self.array_stats.inner.store(Arc::new(new_stats)); + result } pub fn to_owned(&self) -> StatsSet { - self.array_stats.inner.read().clone() + (**self.array_stats.inner.load()).clone() } /// Returns a clone of the underlying [`ArrayStats`]. @@ -149,8 +162,8 @@ impl StatsSetRef<'_> { &self, f: F, ) -> R { - let lock = self.array_stats.inner.read(); - f(&mut lock.iter()) + let snapshot = self.array_stats.inner.load(); + f(&mut snapshot.iter()) } pub fn compute_stat(&self, stat: Stat, ctx: &mut ExecutionCtx) -> VortexResult> { @@ -288,12 +301,12 @@ impl StatsProvider for StatsSetRef<'_> { fn get(&self, stat: Stat) -> Option> { self.array_stats .inner - .read() + .load() .as_typed_ref(self.dyn_array_ref.dtype()) .get(stat) } fn len(&self) -> usize { - self.array_stats.inner.read().len() + self.array_stats.inner.load().len() } } diff --git a/vortex-array/src/stats/stats_set.rs b/vortex-array/src/stats/stats_set.rs index a0ca186fed6..b82b4a077a8 100644 --- a/vortex-array/src/stats/stats_set.rs +++ b/vortex-array/src/stats/stats_set.rs @@ -114,6 +114,14 @@ impl StatsSet { .map(|(_, v)| v.clone()) } + /// Borrow the value for a given stat without cloning the underlying `ScalarValue`. + pub fn get_value(&self, stat: Stat) -> Option<&Precision> { + self.values + .iter() + .find(|(s, _)| *s == stat) + .map(|(_, v)| v) + } + /// Length of the stats set pub fn len(&self) -> usize { self.values.len() @@ -225,6 +233,13 @@ pub struct TypedStatsSetRef<'a, 'b> { pub dtype: &'b DType, } +impl<'a, 'b> TypedStatsSetRef<'a, 'b> { + /// Borrow the value for a given stat without constructing a [`Scalar`]. + pub fn get_value(&self, stat: Stat) -> Option> { + self.values.get_value(stat).map(|p| p.as_ref()) + } +} + impl StatsProvider for TypedStatsSetRef<'_, '_> { fn get(&self, stat: Stat) -> Option> { self.values.get(stat).map(|p| {