diff --git a/Cargo.lock b/Cargo.lock index 0f7a33f3d14..af4afb92b5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1238,6 +1238,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "cardinality-estimator" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a7bc4e9a7ab9239a4a46df35d75101cee57defc8e4255a1b2b6f8eab8de87e" +dependencies = [ + "enum_dispatch", + "wyhash", +] + [[package]] name = "cargo-platform" version = "0.3.3" @@ -3760,6 +3770,18 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "env_filter" version = "1.0.1" @@ -10279,6 +10301,7 @@ dependencies = [ name = "vortex-compressor" version = "0.1.0" dependencies = [ + "cardinality-estimator", "codspeed-divan-compat", "itertools 0.14.0", "num-traits", @@ -11732,6 +11755,15 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" +[[package]] +name = "wyhash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf6e163c25e3fac820b4b453185ea2dea3b6a3e0a721d4d23d75bd33734c295" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 35179561e14..77114026ab4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ bit-vec = "0.9.0" bitvec = "1.0.1" bytes = "1.11.1" bzip2 = "0.6.0" +cardinality-estimator = "1.0.3" cargo_metadata = "0.23.1" cbindgen = "0.29.0" cc = "1.2" diff --git a/encodings/fastlanes/src/rle/array/mod.rs b/encodings/fastlanes/src/rle/array/mod.rs index aeaba0ef42e..8793179c7d3 100644 --- a/encodings/fastlanes/src/rle/array/mod.rs +++ b/encodings/fastlanes/src/rle/array/mod.rs @@ -473,7 +473,7 @@ mod tests { .indices() .clone() .execute::(&mut ctx)? - .narrow()?; + .narrow(&mut ctx)?; let re_encoded = RLEData::encode(indices_prim.as_view(), &mut ctx)?; // Reconstruct the outer RLE with re-encoded indices. diff --git a/encodings/runend/src/compress.rs b/encodings/runend/src/compress.rs index b490076c355..ba46588d88b 100644 --- a/encodings/runend/src/compress.rs +++ b/encodings/runend/src/compress.rs @@ -84,7 +84,9 @@ pub fn runend_encode( } }; - let ends = ends.narrow().vortex_expect("Ends must succeed downcasting"); + let ends = ends + .narrow(ctx) + .vortex_expect("Ends must succeed downcasting"); ends.statistics() .set(Stat::IsStrictSorted, Precision::Exact(true.into())); diff --git a/vortex-array/benches/dict_compare.rs b/vortex-array/benches/dict_compare.rs index 602ad8638f7..b37974f6323 100644 --- a/vortex-array/benches/dict_compare.rs +++ b/vortex-array/benches/dict_compare.rs @@ -4,6 +4,7 @@ #![expect(clippy::unwrap_used)] use std::str::from_utf8; +use std::sync::LazyLock; use vortex_array::Canonical; use vortex_array::IntoArray; @@ -21,6 +22,7 @@ use vortex_array::expr::eq; use vortex_array::expr::lit; use vortex_array::expr::root; use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_array::session::ArraySession; use vortex_session::VortexSession; fn main() { @@ -45,15 +47,21 @@ const LENGTH_AND_UNIQUE_VALUES: &[(usize, usize)] = &[ (100_000, 2048), ]; +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let primitive_arr = gen_primitive_for_dict::(len, uniqueness); - let dict = dict_encode(&primitive_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let value = primitive_arr.as_slice::()[0]; - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .into_array() @@ -67,13 +75,16 @@ fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, u #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let varbin_arr = VarBinArray::from(gen_varbin_words(len, uniqueness)); - let dict = dict_encode(&varbin_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .into_array() @@ -87,13 +98,16 @@ fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usiz #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbinview(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, uniqueness)); - let dict = dict_encode(&varbinview_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let bytes = varbinview_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .into_array() @@ -122,13 +136,16 @@ fn bench_compare_sliced_dict_primitive( (codes_len, values_len): (usize, usize), ) { let primitive_arr = gen_primitive_for_dict::(codes_len.max(values_len), values_len); - let dict = dict_encode(&primitive_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let dict = dict.into_array().slice(0..codes_len).unwrap(); let value = primitive_arr.as_slice::()[0]; - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .apply(&eq(root(), lit(value))) @@ -144,14 +161,17 @@ fn bench_compare_sliced_dict_varbinview( (codes_len, values_len): (usize, usize), ) { let varbin_arr = VarBinArray::from(gen_varbin_words(codes_len.max(values_len), values_len)); - let dict = dict_encode(&varbin_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let dict = dict.into_array().slice(0..codes_len).unwrap(); let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); - let session = VortexSession::empty(); bencher - .with_inputs(|| (&dict, session.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| { dict.clone() .apply(&eq(root(), lit(value))) diff --git a/vortex-array/benches/dict_compress.rs b/vortex-array/benches/dict_compress.rs index 4f220ccaa23..26cedd00a25 100644 --- a/vortex-array/benches/dict_compress.rs +++ b/vortex-array/benches/dict_compress.rs @@ -3,12 +3,13 @@ #![expect(clippy::unwrap_used)] +use std::sync::LazyLock; + use divan::Bencher; use rand::distr::Distribution; use rand::distr::StandardUniform; use vortex_array::Canonical; use vortex_array::IntoArray; -use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; @@ -16,6 +17,8 @@ use vortex_array::arrays::dict_test::gen_primitive_for_dict; use vortex_array::arrays::dict_test::gen_varbin_words; use vortex_array::builders::dict::dict_encode; use vortex_array::dtype::NativePType; +use vortex_array::session::ArraySession; +use vortex_session::VortexSession; fn main() { divan::main(); @@ -35,35 +38,39 @@ const BENCH_ARGS: &[(usize, usize)] = &[ (10_000, 512), ]; +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)] fn encode_primitives(bencher: Bencher, (len, unique_values): (usize, usize)) where T: NativePType, StandardUniform: Distribution, { - let primitive_arr = gen_primitive_for_dict::(len, unique_values); + let primitive_arr = gen_primitive_for_dict::(len, unique_values).into_array(); bencher - .with_inputs(|| &primitive_arr) - .bench_refs(|arr| dict_encode(&arr.clone().into_array())); + .with_inputs(|| (&primitive_arr, SESSION.create_execution_ctx())) + .bench_refs(|(arr, ctx)| dict_encode(arr, ctx)); } #[divan::bench(args = BENCH_ARGS)] fn encode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)); + let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)).into_array(); bencher - .with_inputs(|| &varbin_arr) - .bench_refs(|arr| dict_encode(&arr.clone().into_array())); + .with_inputs(|| (&varbin_arr, SESSION.create_execution_ctx())) + .bench_refs(|(arr, ctx)| dict_encode(arr, ctx)); } #[divan::bench(args = BENCH_ARGS)] fn encode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)); + let varbinview_arr = + VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)).into_array(); bencher - .with_inputs(|| &varbinview_arr) - .bench_refs(|arr| dict_encode(&arr.clone().into_array())); + .with_inputs(|| (&varbinview_arr, SESSION.create_execution_ctx())) + .bench_refs(|(arr, ctx)| dict_encode(arr, ctx)); } #[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)] @@ -72,34 +79,37 @@ where T: NativePType, StandardUniform: Distribution, { - let primitive_arr = gen_primitive_for_dict::(len, unique_values); - let dict = dict_encode(&primitive_arr.into_array()) + let primitive_arr = gen_primitive_for_dict::(len, unique_values).into_array(); + let dict = dict_encode(&primitive_arr, &mut SESSION.create_execution_ctx()) .unwrap() .into_array(); bencher - .with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| (**dict).clone().execute::(ctx)); } #[divan::bench(args = BENCH_ARGS)] fn decode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)); - let dict = dict_encode(&varbin_arr.into_array()).unwrap().into_array(); + let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)).into_array(); + let dict = dict_encode(&varbin_arr, &mut SESSION.create_execution_ctx()) + .unwrap() + .into_array(); bencher - .with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| (**dict).clone().execute::(ctx)); } #[divan::bench(args = BENCH_ARGS)] fn decode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) { - let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)); - let dict = dict_encode(&varbinview_arr.into_array()) + let varbinview_arr = + VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)).into_array(); + let dict = dict_encode(&varbinview_arr, &mut SESSION.create_execution_ctx()) .unwrap() .into_array(); bencher - .with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx())) + .with_inputs(|| (&dict, SESSION.create_execution_ctx())) .bench_refs(|(dict, ctx)| (**dict).clone().execute::(ctx)); } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b6de5284907..d7252f6f7b5 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -4038,7 +4038,7 @@ pub trait vortex_array::arrays::primitive::PrimitiveArrayExt: vortex_array::Type pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::buffer_handle(&self) -> &vortex_array::buffer::BufferHandle -pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::narrow(&self) -> vortex_error::VortexResult +pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::narrow(&self, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::nullability(&self) -> vortex_array::dtype::Nullability @@ -4054,7 +4054,7 @@ impl> vortex_arr pub fn T::buffer_handle(&self) -> &vortex_array::buffer::BufferHandle -pub fn T::narrow(&self) -> vortex_error::VortexResult +pub fn T::narrow(&self, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn T::nullability(&self) -> vortex_array::dtype::Nullability @@ -7440,13 +7440,13 @@ pub trait vortex_array::builders::dict::DictEncoder: core::marker::Send pub fn vortex_array::builders::dict::DictEncoder::codes_ptype(&self) -> vortex_array::dtype::PType -pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, &vortex_array::ArrayRef) -> vortex_array::ArrayRef +pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::builders::dict::DictEncoder::reset(&mut self) -> vortex_array::ArrayRef -pub fn vortex_array::builders::dict::dict_encode(&vortex_array::ArrayRef) -> vortex_error::VortexResult +pub fn vortex_array::builders::dict::dict_encode(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult -pub fn vortex_array::builders::dict::dict_encode_with_constraints(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints) -> vortex_error::VortexResult +pub fn vortex_array::builders::dict::dict_encode_with_constraints(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::builders::dict::dict_encoder(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints) -> alloc::boxed::Box diff --git a/vortex-array/src/arrays/dict/array.rs b/vortex-array/src/arrays/dict/array.rs index eafbc9f6642..b3af2e47f34 100644 --- a/vortex-array/src/arrays/dict/array.rs +++ b/vortex-array/src/arrays/dict/array.rs @@ -4,6 +4,7 @@ use std::fmt::Display; use std::fmt::Formatter; +use num_traits::AsPrimitive; use vortex_buffer::BitBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -133,7 +134,7 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { } let referenced_mask = self.compute_referenced_values_mask(true)?; - let all_referenced = referenced_mask.iter().all(|v| v); + let all_referenced = referenced_mask.true_count() == referenced_mask.len(); vortex_ensure!(all_referenced, "value in dict not referenced"); } @@ -157,13 +158,9 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { match codes_validity.bit_buffer() { AllOr::All => { match_each_integer_ptype!(codes_primitive.ptype(), |P| { - #[allow( - clippy::cast_possible_truncation, - clippy::cast_sign_loss, - reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" - )] - for &idx in codes_primitive.as_slice::

() { - values_vec[idx as usize] = referenced_value; + for idx in codes_primitive.as_slice::

() { + let idxu: usize = idx.as_(); + values_vec[idxu] = referenced_value; } }); } @@ -171,14 +168,9 @@ pub trait DictArrayExt: TypedArrayRef + DictArraySlotsExt { AllOr::Some(mask) => { match_each_integer_ptype!(codes_primitive.ptype(), |P| { let codes = codes_primitive.as_slice::

(); - - #[allow( - clippy::cast_possible_truncation, - clippy::cast_sign_loss, - reason = "codes are non-negative indices; a negative signed code would wrap to a large usize and panic on the bounds-checked array index" - )] mask.set_indices().for_each(|idx| { - values_vec[codes[idx] as usize] = referenced_value; + let idxu: usize = codes[idx].as_(); + values_vec[idxu] = referenced_value; }); }); } diff --git a/vortex-array/src/arrays/dict/compute/cast.rs b/vortex-array/src/arrays/dict/compute/cast.rs index ee80a1140b4..da4b04f94b8 100644 --- a/vortex-array/src/arrays/dict/compute/cast.rs +++ b/vortex-array/src/arrays/dict/compute/cast.rs @@ -46,13 +46,18 @@ impl CastReduce for Dict { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::buffer; + use vortex_session::VortexSession; use crate::IntoArray; #[expect(deprecated)] use crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::arrays::Dict; + use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::arrays::dict::DictArraySlotsExt; use crate::assert_arrays_eq; @@ -62,11 +67,19 @@ mod tests { use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn encode_dict(array: &crate::ArrayRef) -> DictArray { + dict_encode(array, &mut SESSION.create_execution_ctx()).unwrap() + } #[test] fn test_cast_dict_to_wider_type() { let values = buffer![1i32, 2, 3, 2, 1].into_array(); - let dict = dict_encode(&values).unwrap(); + let dict = encode_dict(&values); let casted = dict .into_array() @@ -86,7 +99,7 @@ mod tests { fn test_cast_dict_nullable() { let values = PrimitiveArray::from_option_iter([Some(10i32), None, Some(20), Some(10), None]); - let dict = dict_encode(&values.into_array()).unwrap(); + let dict = encode_dict(&values.into_array()); let casted = dict .into_array() @@ -102,7 +115,7 @@ mod tests { fn test_cast_dict_allvalid_to_nonnullable_and_back() { // Create an AllValid dict array (no nulls) let values = buffer![10i32, 20, 30, 40].into_array(); - let dict = dict_encode(&values).unwrap(); + let dict = encode_dict(&values); // Verify initial state - codes should be NonNullable, values should be NonNullable assert_eq!(dict.codes().dtype().nullability(), Nullability::NonNullable); @@ -171,10 +184,10 @@ mod tests { } #[rstest] - #[case(dict_encode(&buffer![1i32, 2, 3, 2, 1, 3].into_array()).unwrap().into_array())] - #[case(dict_encode(&buffer![100u32, 200, 100, 300, 200].into_array()).unwrap().into_array())] - #[case(dict_encode(&PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array()).unwrap().into_array())] - #[case(dict_encode(&buffer![1.5f32, 2.5, 1.5, 3.5].into_array()).unwrap().into_array())] + #[case(encode_dict(&buffer![1i32, 2, 3, 2, 1, 3].into_array()).into_array())] + #[case(encode_dict(&buffer![100u32, 200, 100, 300, 200].into_array()).into_array())] + #[case(encode_dict(&PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array()).into_array())] + #[case(encode_dict(&buffer![1.5f32, 2.5, 1.5, 3.5].into_array()).into_array())] fn test_cast_dict_conformance(#[case] array: crate::ArrayRef) { test_cast_conformance(&array); } diff --git a/vortex-array/src/arrays/dict/compute/min_max.rs b/vortex-array/src/arrays/dict/compute/min_max.rs index b390005adcf..47a1f0eb8ee 100644 --- a/vortex-array/src/arrays/dict/compute/min_max.rs +++ b/vortex-array/src/arrays/dict/compute/min_max.rs @@ -60,21 +60,27 @@ impl DynAggregateKernel for DictMinMaxKernel { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::buffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; - use crate::LEGACY_SESSION; use crate::VortexSessionExecute; use crate::aggregate_fn::fns::min_max::min_max; use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::builders::dict::dict_encode; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); fn assert_min_max(array: &ArrayRef, expected: Option<(i32, i32)>) -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); match (min_max(array, &mut ctx)?, expected) { (Some(result), Some((expected_min, expected_max))) => { assert_eq!(i32::try_from(&result.min)?, expected_min); @@ -117,7 +123,11 @@ mod tests { } fn dict_single() -> DictArray { - dict_encode(&buffer![42i32].into_array()).expect("valid single-value dictionary") + dict_encode( + &buffer![42i32].into_array(), + &mut SESSION.create_execution_ctx(), + ) + .expect("valid single-value dictionary") } fn dict_nullable_codes() -> DictArray { @@ -132,6 +142,7 @@ mod tests { dict_encode( &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]) .into_array(), + &mut SESSION.create_execution_ctx(), ) .expect("valid nullable-value dictionary") } @@ -166,7 +177,7 @@ mod tests { #[test] fn test_sliced_dict() -> VortexResult<()> { let reference = PrimitiveArray::from_iter([1, 5, 10, 50, 100]); - let dict = dict_encode(&reference.into_array())?; + let dict = dict_encode(&reference.into_array(), &mut SESSION.create_execution_ctx())?; let sliced = dict.slice(1..3)?; assert_min_max(&sliced, Some((5, 10))) } diff --git a/vortex-array/src/arrays/dict/compute/mod.rs b/vortex-array/src/arrays/dict/compute/mod.rs index c56cc8ef367..492cf92fbc5 100644 --- a/vortex-array/src/arrays/dict/compute/mod.rs +++ b/vortex-array/src/arrays/dict/compute/mod.rs @@ -54,16 +54,21 @@ impl FilterReduce for Dict { #[cfg(test)] mod test { + use std::sync::LazyLock; + #[expect(unused_imports)] use itertools::Itertools; use vortex_buffer::buffer; + use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; #[expect(deprecated)] use crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::ConstantArray; + use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; use crate::arrays::VarBinViewArray; @@ -77,6 +82,15 @@ mod test { use crate::dtype::Nullability; use crate::dtype::PType::I32; use crate::scalar_fn::fns::operators::Operator; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn encode_dict(array: &ArrayRef) -> DictArray { + dict_encode(array, &mut SESSION.create_execution_ctx()).unwrap() + } + #[test] fn canonicalise_nullable_primitive() { let values: Vec> = (0..65) @@ -88,8 +102,7 @@ mod test { }) .collect(); - let dict = - dict_encode(&PrimitiveArray::from_option_iter(values.clone()).into_array()).unwrap(); + let dict = encode_dict(&PrimitiveArray::from_option_iter(values.clone()).into_array()); #[expect(deprecated)] let actual = dict.as_array().to_primitive(); @@ -103,7 +116,7 @@ mod test { let unique_values: Vec = (0..32).collect(); let expected = PrimitiveArray::from_iter((0..1000).map(|i| unique_values[i % 32])); - let dict = dict_encode(&expected.clone().into_array()).unwrap(); + let dict = encode_dict(&expected.clone().into_array()); #[expect(deprecated)] let actual = dict.as_array().to_primitive(); @@ -115,7 +128,7 @@ mod test { let unique_values: Vec = (0..100).collect(); let expected = PrimitiveArray::from_iter((0..1000).map(|i| unique_values[i % 100])); - let dict = dict_encode(&expected.clone().into_array()).unwrap(); + let dict = encode_dict(&expected.clone().into_array()); #[expect(deprecated)] let actual = dict.as_array().to_primitive(); @@ -129,7 +142,7 @@ mod test { DType::Utf8(Nullability::Nullable), ); assert_eq!(reference.len(), 6); - let dict = dict_encode(&reference.clone().into_array()).unwrap(); + let dict = encode_dict(&reference.clone().into_array()); #[expect(deprecated)] let flattened_dict = dict.as_array().to_varbinview(); assert_eq!( @@ -151,7 +164,7 @@ mod test { Some(1), Some(5), ]); - let dict = dict_encode(&reference.into_array()).unwrap(); + let dict = encode_dict(&reference.into_array()); dict.slice(1..4).unwrap() } @@ -169,17 +182,16 @@ mod test { #[test] fn test_mask_dict_array() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = encode_dict(&buffer![2, 0, 2, 0, 10].into_array()); test_mask_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .into_array(), - ) - .unwrap(); + ); test_mask_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &VarBinArray::from_iter( [ Some("hello"), @@ -191,24 +203,22 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), - ) - .unwrap(); + ); test_mask_conformance(&array.into_array()); } #[test] fn test_filter_dict_array() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = encode_dict(&buffer![2, 0, 2, 0, 10].into_array()); test_filter_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .into_array(), - ) - .unwrap(); + ); test_filter_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &VarBinArray::from_iter( [ Some("hello"), @@ -220,14 +230,13 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), - ) - .unwrap(); + ); test_filter_conformance(&array.into_array()); } #[test] fn test_take_dict() { - let array = dict_encode(&buffer![1, 2].into_array()).unwrap(); + let array = encode_dict(&buffer![1, 2].into_array()); assert_eq!( array @@ -240,17 +249,16 @@ mod test { #[test] fn test_take_dict_conformance() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = encode_dict(&buffer![2, 0, 2, 0, 10].into_array()); test_take_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .into_array(), - ) - .unwrap(); + ); test_take_conformance(&array.into_array()); - let array = dict_encode( + let array = encode_dict( &VarBinArray::from_iter( [ Some("hello"), @@ -262,18 +270,22 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), - ) - .unwrap(); + ); test_take_conformance(&array.into_array()); } } #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::buffer; + use vortex_session::VortexSession; + use crate::ArrayRef; use crate::IntoArray; + use crate::VortexSessionExecute; use crate::arrays::DictArray; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; @@ -281,35 +293,43 @@ mod tests { use crate::compute::conformance::consistency::test_array_consistency; use crate::dtype::DType; use crate::dtype::Nullability; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn encode_dict(array: &ArrayRef) -> DictArray { + dict_encode(array, &mut SESSION.create_execution_ctx()).unwrap() + } #[rstest] // Primitive arrays - #[case::dict_i32(dict_encode(&buffer![1i32, 2, 3, 2, 1].into_array()).unwrap())] + #[case::dict_i32(encode_dict(&buffer![1i32, 2, 3, 2, 1].into_array()))] #[case::dict_nullable_codes(DictArray::try_new( buffer![0u32, 1, 2, 2, 0].into_array(), PrimitiveArray::from_option_iter([Some(10), Some(20), None]).into_array(), ).unwrap())] - #[case::dict_nullable_values(dict_encode( + #[case::dict_nullable_values(encode_dict( &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array() - ).unwrap())] - #[case::dict_u64(dict_encode(&buffer![100u64, 200, 100, 300, 200].into_array()).unwrap())] + ))] + #[case::dict_u64(encode_dict(&buffer![100u64, 200, 100, 300, 200].into_array()))] // String arrays - #[case::dict_str(dict_encode( + #[case::dict_str(encode_dict( &VarBinArray::from_iter( ["hello", "world", "hello", "test", "world"].map(Some), DType::Utf8(Nullability::NonNullable), ).into_array() - ).unwrap())] - #[case::dict_nullable_str(dict_encode( + ))] + #[case::dict_nullable_str(encode_dict( &VarBinArray::from_iter( [Some("hello"), None, Some("world"), Some("hello"), None], DType::Utf8(Nullability::Nullable), ).into_array() - ).unwrap())] + ))] // Edge cases - #[case::dict_single(dict_encode(&buffer![42i32].into_array()).unwrap())] - #[case::dict_all_same(dict_encode(&buffer![5i32, 5, 5, 5, 5].into_array()).unwrap())] - #[case::dict_large(dict_encode(&PrimitiveArray::from_iter((0..1000).map(|i| i % 10)).into_array()).unwrap())] + #[case::dict_single(encode_dict(&buffer![42i32].into_array()))] + #[case::dict_all_same(encode_dict(&buffer![5i32, 5, 5, 5, 5].into_array()))] + #[case::dict_large(encode_dict(&PrimitiveArray::from_iter((0..1000).map(|i| i % 10)).into_array()))] fn test_dict_consistency(#[case] array: DictArray) { test_array_consistency(&array.into_array()); } diff --git a/vortex-array/src/arrays/primitive/array/cast.rs b/vortex-array/src/arrays/primitive/array/cast.rs index 0023ea71fb7..0d9db991d26 100644 --- a/vortex-array/src/arrays/primitive/array/cast.rs +++ b/vortex-array/src/arrays/primitive/array/cast.rs @@ -37,17 +37,25 @@ impl PrimitiveData { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use rstest::rstest; use vortex_buffer::Buffer; use vortex_buffer::buffer; + use vortex_session::VortexSession; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::arrays::primitive::PrimitiveArrayExt; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; + use crate::session::ArraySession; use crate::validity::Validity; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_downcast_all_invalid() { let array = PrimitiveArray::new( @@ -55,7 +63,7 @@ mod tests { Validity::AllInvalid, ); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!( result.dtype(), &DType::Primitive(PType::U8, Nullability::Nullable) @@ -74,7 +82,7 @@ mod tests { #[case(vec![i32::MIN as i64, i32::MAX as i64], PType::I32)] fn test_downcast_signed(#[case] values: Vec, #[case] expected_ptype: PType) { let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), expected_ptype); } @@ -86,21 +94,21 @@ mod tests { #[case(vec![0_u64, u32::MAX as u64], PType::U32)] fn test_downcast_unsigned(#[case] values: Vec, #[case] expected_ptype: PType) { let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), expected_ptype); } #[test] fn test_downcast_keeps_original_if_too_large() { let array = PrimitiveArray::from_iter(vec![0_u64, u64::MAX]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), PType::U64); } #[test] fn test_downcast_preserves_nullability() { let array = PrimitiveArray::from_option_iter([Some(0_i32), None, Some(127)]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!( result.dtype(), &DType::Primitive(PType::U8, Nullability::Nullable) @@ -113,7 +121,7 @@ mod tests { fn test_downcast_preserves_values() { let values = vec![-100_i16, 0, 100]; let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), PType::I8); // Check that the values were properly downscaled @@ -124,14 +132,14 @@ mod tests { #[test] fn test_downcast_with_mixed_signs_chooses_signed() { let array = PrimitiveArray::from_iter(vec![-1_i32, 200]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); assert_eq!(result.ptype(), PType::I16); } #[test] fn test_downcast_floats() { let array = PrimitiveArray::from_iter(vec![1.0_f32, 2.0, 3.0]); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); // Floats should remain unchanged since they can't be downscaled to integers assert_eq!(result.ptype(), PType::F32); } @@ -139,9 +147,9 @@ mod tests { #[test] fn test_downcast_empty_array() { let array = PrimitiveArray::new(Buffer::::empty(), Validity::AllInvalid); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut SESSION.create_execution_ctx()).unwrap(); let array2 = PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable); - let result2 = array2.narrow().unwrap(); + let result2 = array2.narrow(&mut SESSION.create_execution_ctx()).unwrap(); // Empty arrays should not have their validity changed assert!(matches!(result.validity(), Ok(Validity::AllInvalid))); assert!(matches!(result2.validity(), Ok(Validity::NonNullable))); diff --git a/vortex-array/src/arrays/primitive/array/mod.rs b/vortex-array/src/arrays/primitive/array/mod.rs index 254c4b0baee..d0225ab532e 100644 --- a/vortex-array/src/arrays/primitive/array/mod.rs +++ b/vortex-array/src/arrays/primitive/array/mod.rs @@ -15,10 +15,9 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_error::vortex_panic; -use crate::LEGACY_SESSION; +use crate::ExecutionCtx; #[expect(deprecated)] use crate::ToCanonical as _; -use crate::VortexSessionExecute; use crate::array::Array; use crate::array::ArrayParts; use crate::array::TypedArrayRef; @@ -148,13 +147,12 @@ pub trait PrimitiveArrayExt: TypedArrayRef { } /// Narrow the array to the smallest possible integer type that can represent all values. - fn narrow(&self) -> VortexResult { + fn narrow(&self, ctx: &mut ExecutionCtx) -> VortexResult { if !self.ptype().is_int() { return Ok(self.to_owned()); } - let mut ctx = LEGACY_SESSION.create_execution_ctx(); - let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else { + let Some(min_max) = min_max(self.as_ref(), ctx)? else { return Ok(PrimitiveArray::new( Buffer::::zeroed(self.len()), self.validity(), @@ -183,58 +181,46 @@ pub trait PrimitiveArrayExt: TypedArrayRef { if min < 0 || max < 0 { // Signed if min >= i8::MIN as i64 && max <= i8::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::I8, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if min >= i16::MIN as i64 && max <= i16::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::I16, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if min >= i32::MIN as i64 && max <= i32::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::I32, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } } else { // Unsigned if max <= u8::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::U8, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if max <= u16::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::U16, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } if max <= u32::MAX as i64 { - #[expect(deprecated)] - let result = self + return self .as_ref() .cast(DType::Primitive(PType::U32, nullability))? - .to_primitive(); - return Ok(result); + .execute::(ctx); } } @@ -493,18 +479,18 @@ impl Array { let buffer = match &validity { Validity::NonNullable | Validity::AllValid => { - BufferMut::::from_iter(buf_iter.zip(iter::repeat(true)).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(iter::repeat(true)).map(f)) } Validity::AllInvalid => { - BufferMut::::from_iter(buf_iter.zip(iter::repeat(false)).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(iter::repeat(false)).map(f)) } Validity::Array(val) => { #[expect(deprecated)] let val = val.to_bool().into_bit_buffer(); - BufferMut::::from_iter(buf_iter.zip(val.iter()).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(val.iter()).map(f)) } }; - Ok(PrimitiveArray::new(buffer.freeze(), validity)) + Ok(PrimitiveArray::new(buffer, validity)) } } diff --git a/vortex-array/src/arrays/varbin/vtable/mod.rs b/vortex-array/src/arrays/varbin/vtable/mod.rs index b72ac5eaa48..2def49ecb3b 100644 --- a/vortex-array/src/arrays/varbin/vtable/mod.rs +++ b/vortex-array/src/arrays/varbin/vtable/mod.rs @@ -22,6 +22,7 @@ use crate::array::VTable; use crate::arrays::varbin::VarBinArrayExt; use crate::arrays::varbin::VarBinData; use crate::arrays::varbin::array::NUM_SLOTS; +use crate::arrays::varbin::array::OFFSETS_SLOT; use crate::arrays::varbin::array::SLOT_NAMES; use crate::buffer::BufferHandle; use crate::dtype::DType; @@ -90,7 +91,7 @@ impl VTable for VarBin { "VarBinArray expected {NUM_SLOTS} slots, found {}", slots.len() ); - let offsets = slots[crate::arrays::varbin::array::OFFSETS_SLOT] + let offsets = slots[OFFSETS_SLOT] .as_ref() .vortex_expect("VarBinArray offsets slot"); vortex_ensure!( @@ -138,7 +139,6 @@ impl VTable for VarBin { dtype: &DType, len: usize, metadata: &[u8], - buffers: &[BufferHandle], children: &dyn ArrayChildren, _session: &VortexSession, diff --git a/vortex-array/src/builders/dict/bytes.rs b/vortex-array/src/builders/dict/bytes.rs index d66870d6ba5..2aa81e5b82b 100644 --- a/vortex-array/src/builders/dict/bytes.rs +++ b/vortex-array/src/builders/dict/bytes.rs @@ -1,15 +1,21 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cell::OnceCell; use std::hash::BuildHasher; use std::mem; use std::sync::Arc; +use itertools::Itertools; +use num_traits::AsPrimitive; +use vortex_array::ExecutionCtx; use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; use vortex_buffer::ByteBufferMut; use vortex_error::VortexExpect; +use vortex_error::VortexResult; use vortex_error::vortex_panic; +use vortex_mask::AllOr; use vortex_utils::aliases::hash_map::DefaultHashBuilder; use vortex_utils::aliases::hash_map::HashTable; use vortex_utils::aliases::hash_map::HashTableEntry; @@ -18,23 +24,24 @@ use vortex_utils::aliases::hash_map::RandomState; use super::DictConstraints; use super::DictEncoder; use crate::ArrayRef; +use crate::ArrayView; use crate::IntoArray; -use crate::accessor::ArrayAccessor; use crate::arrays::PrimitiveArray; use crate::arrays::VarBin; use crate::arrays::VarBinView; use crate::arrays::VarBinViewArray; +use crate::arrays::varbin::VarBinArrayExt; use crate::arrays::varbinview::build_views::BinaryView; -#[expect(deprecated)] -use crate::canonical::ToCanonical as _; use crate::dtype::DType; use crate::dtype::PType; use crate::dtype::UnsignedPType; +use crate::match_each_integer_ptype; use crate::validity::Validity; /// Dictionary encode varbin array. Specializes for primitive byte arrays to avoid double copying -pub struct BytesDictBuilder { - lookup: Option>, +pub struct BytesDictBuilder { + lookup: Option>, + null_code: OnceCell, views: BufferMut, values: ByteBufferMut, values_nulls: BitBufferMut, @@ -58,6 +65,7 @@ impl BytesDictBuilder { Self { lookup: Some(HashTable::new()), views: BufferMut::::empty(), + null_code: OnceCell::new(), values: BufferMut::empty(), values_nulls: BitBufferMut::empty(), hasher: DefaultHashBuilder::default(), @@ -71,18 +79,16 @@ impl BytesDictBuilder { self.views.len() * size_of::() + self.values.len() } - fn lookup_bytes(&self, idx: usize) -> Option<&[u8]> { - self.values_nulls.value(idx).then(|| { - let bin_view = &self.views[idx]; - if bin_view.is_inlined() { - bin_view.as_inlined().value() - } else { - &self.values[bin_view.as_view().as_range()] - } - }) + fn lookup_bytes(&self, idx: usize) -> &[u8] { + let bin_view = &self.views[idx]; + if bin_view.is_inlined() { + bin_view.as_inlined().value() + } else { + &self.values[bin_view.as_view().as_range()] + } } - fn encode_value(&mut self, lookup: &mut HashTable, val: Option<&[u8]>) -> Option { + fn encode_value(&mut self, lookup: &mut HashTable, val: &[u8]) -> Option { match lookup.entry( self.hasher.hash_one(val), |idx| val == self.lookup_bytes(idx.as_()), @@ -95,35 +101,25 @@ impl BytesDictBuilder { } let next_code = self.views.len(); - match val { - None => { - // Null value - self.views.push(BinaryView::default()); - self.values_nulls.append_false(); - } - Some(val) => { - let view = BinaryView::make_view( - val, - 0, - u32::try_from(self.values.len()) - .vortex_expect("values length must fit in u32"), - ); - let additional_bytes = if view.is_inlined() { - size_of::() - } else { - size_of::() + val.len() - }; + let view = BinaryView::make_view( + val, + 0, + u32::try_from(self.values.len()).vortex_expect("values length must fit in u32"), + ); + let additional_bytes = if view.is_inlined() { + size_of::() + } else { + size_of::() + val.len() + }; - if self.dict_bytes() + additional_bytes > self.max_dict_bytes { - return None; - } + if self.dict_bytes() + additional_bytes > self.max_dict_bytes { + return None; + } - self.views.push(view); - self.values_nulls.append_true(); - if !view.is_inlined() { - self.values.extend_from_slice(val); - } - } + self.views.push(view); + self.values_nulls.append_true(); + if !view.is_inlined() { + self.values.extend_from_slice(val); } let next_code = Code::from_usize(next_code).unwrap_or_else(|| { @@ -134,29 +130,162 @@ impl BytesDictBuilder { } } - fn encode_bytes>(&mut self, accessor: &A, len: usize) -> ArrayRef { + #[expect(clippy::cognitive_complexity)] + fn encode_varbin( + &mut self, + var_bin: ArrayView, + ctx: &mut ExecutionCtx, + ) -> VortexResult { let mut local_lookup = self.lookup.take().vortex_expect("Must have a lookup dict"); - let mut codes: BufferMut = BufferMut::with_capacity(len); + let mut codes: BufferMut = BufferMut::with_capacity(var_bin.len()); - accessor.with_iterator(|it| { - for value in it { - let Some(code) = self.encode_value(&mut local_lookup, value) else { - break; - }; - // SAFETY: we reserved capacity in the buffer for `len` elements - unsafe { codes.push_unchecked(code) } + let offsets = var_bin.offsets().clone().execute::(ctx)?; + let bytes = var_bin.bytes(); + let validity_mask = var_bin.validity()?.execute_mask(var_bin.len(), ctx)?; + + match validity_mask.bit_buffer() { + AllOr::All => { + match_each_integer_ptype!(offsets.ptype(), |P| { + let slice_offsets = offsets.as_slice::

(); + for w in slice_offsets.windows(2) { + let start = w[0].as_(); + let end = w[1].as_(); + let Some(code) = self.encode_value(&mut local_lookup, &bytes[start..end]) + else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + }) } - }); + AllOr::None => { + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + unsafe { + codes.push_n_unchecked( + Code::from_usize(0).vortex_expect("must fit 0"), + var_bin.len(), + ) + } + } + AllOr::Some(b) => { + match_each_integer_ptype!(offsets.ptype(), |P| { + let slice_offsets = offsets.as_slice::

(); + for (w, valid) in slice_offsets.windows(2).zip_eq(b.iter()) { + if !valid { + let code = self.null_code.get_or_init(|| { + let code = self.views.len(); + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + Code::from_usize(code).unwrap_or_else(|| { + vortex_panic!("{} has to fit into {}", code, Code::PTYPE) + }) + }); + unsafe { codes.push_unchecked(*code) } + } else { + let start = w[0].as_(); + let end = w[1].as_(); + let Some(code) = + self.encode_value(&mut local_lookup, &bytes[start..end]) + else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + } + }) + } + } // Restore lookup dictionary back into the struct self.lookup = Some(local_lookup); - PrimitiveArray::new(codes, Validity::NonNullable).into_array() + Ok(PrimitiveArray::new(codes, Validity::NonNullable)) + } + + fn encode_varbinview( + &mut self, + var_bin_view: ArrayView, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let mut local_lookup = self.lookup.take().vortex_expect("Must have a lookup dict"); + let mut codes: BufferMut = BufferMut::with_capacity(var_bin_view.len()); + + let views = var_bin_view.views(); + let buffers = var_bin_view + .data_buffers() + .iter() + .map(|b| b.as_host()) + .collect::>(); + let validity_mask = var_bin_view + .validity()? + .execute_mask(var_bin_view.len(), ctx)?; + + match validity_mask.bit_buffer() { + AllOr::All => { + for view in views { + let value = if view.is_inlined() { + view.as_inlined().value() + } else { + &buffers[view.as_view().buffer_index as usize][view.as_view().as_range()] + }; + let Some(code) = self.encode_value(&mut local_lookup, value) else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + } + AllOr::None => { + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + unsafe { + codes.push_n_unchecked( + Code::from_usize(0).vortex_expect("must fit 0"), + var_bin_view.len(), + ) + } + } + AllOr::Some(b) => { + for (view, valid) in views.iter().zip_eq(b.iter()) { + if !valid { + let code = self.null_code.get_or_init(|| { + let code = self.views.len(); + self.views.push(BinaryView::default()); + self.values_nulls.append_false(); + Code::from_usize(code).unwrap_or_else(|| { + vortex_panic!("{} has to fit into {}", code, Code::PTYPE) + }) + }); + unsafe { codes.push_unchecked(*code) } + } else { + let value = if view.is_inlined() { + view.as_inlined().value() + } else { + &buffers[view.as_view().buffer_index as usize] + [view.as_view().as_range()] + }; + let Some(code) = self.encode_value(&mut local_lookup, value) else { + break; + }; + // SAFETY: we reserved capacity in the buffer for `len` elements + unsafe { codes.push_unchecked(code) } + } + } + } + } + + // Restore lookup dictionary back into the struct + self.lookup = Some(local_lookup); + + Ok(PrimitiveArray::new(codes, Validity::NonNullable)) } } impl DictEncoder for BytesDictBuilder { - fn encode(&mut self, array: &ArrayRef) -> ArrayRef { + fn encode(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { debug_assert_eq!( &self.dtype, array.dtype(), @@ -165,17 +294,18 @@ impl DictEncoder for BytesDictBuilder { self.dtype ); - let len = array.len(); if let Some(varbinview) = array.as_opt::() { - self.encode_bytes(&varbinview.into_owned(), len) + self.encode_varbinview(varbinview, ctx) } else if let Some(varbin) = array.as_opt::() { - self.encode_bytes(&varbin.into_owned(), len) + self.encode_varbin(varbin, ctx) } else { // NOTE(aduffy): it is very rare that this path would be taken, only e.g. // if we're performing dictionary encoding downstream of some other compression. - #[expect(deprecated)] - let varbinview = array.to_varbinview(); - self.encode_bytes(&varbinview, len) + let vbv_array = array.clone().execute::(ctx)?.into_array(); + let varbinview = vbv_array + .as_opt::() + .vortex_expect("Must be a VarBinView"); + self.encode_varbinview(varbinview, ctx) } } @@ -205,24 +335,38 @@ impl DictEncoder for BytesDictBuilder { #[cfg(test)] mod test { use std::str; + use std::sync::LazyLock; + + use vortex_array::arrays::PrimitiveArray; + use vortex_session::VortexSession; use crate::IntoArray; - #[expect(deprecated)] - use crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::VarBinArray; + use crate::arrays::VarBinViewArray; use crate::arrays::dict::DictArraySlotsExt; use crate::builders::dict::dict_encode; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); #[test] fn encode_varbin() { - let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]); - let dict = dict_encode(&arr.into_array()).unwrap(); - #[expect(deprecated)] - let codes = dict.codes().to_primitive(); + let arr = VarBinViewArray::from_iter_str(vec!["hello", "world", "hello", "again", "world"]); + let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); + let codes = dict + .codes() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(codes.as_slice::(), &[0, 1, 0, 2, 1]); - #[expect(deprecated)] - let values = dict.values().to_varbinview(); + let values = dict + .values() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); values.with_iterator(|iter| { assert_eq!( iter.flatten() @@ -235,7 +379,7 @@ mod test { #[test] fn encode_varbin_nulls() { - let arr: VarBinArray = vec![ + let arr: VarBinViewArray = vec![ Some("hello"), None, Some("world"), @@ -247,12 +391,18 @@ mod test { ] .into_iter() .collect(); - let dict = dict_encode(&arr.into_array()).unwrap(); - #[expect(deprecated)] - let codes = dict.codes().to_primitive(); + let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); + let codes = dict + .codes() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(codes.as_slice::(), &[0, 1, 2, 0, 1, 3, 2, 1]); - #[expect(deprecated)] - let values = dict.values().to_varbinview(); + let values = dict + .values() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); values.with_iterator(|iter| { assert_eq!( iter.map(|b| b.map(|v| unsafe { str::from_utf8_unchecked(v) })) @@ -265,9 +415,12 @@ mod test { #[test] fn repeated_values() { let arr = VarBinArray::from(vec!["a", "a", "b", "b", "a", "b", "a", "b"]); - let dict = dict_encode(&arr.into_array()).unwrap(); - #[expect(deprecated)] - let values = dict.values().to_varbinview(); + let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); + let values = dict + .values() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); values.with_iterator(|iter| { assert_eq!( iter.flatten() @@ -276,8 +429,11 @@ mod test { vec!["a", "b"] ); }); - #[expect(deprecated)] - let codes = dict.codes().to_primitive(); + let codes = dict + .codes() + .clone() + .execute::(&mut SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(codes.as_slice::(), &[0, 0, 1, 1, 0, 1, 0, 1]); } } diff --git a/vortex-array/src/builders/dict/mod.rs b/vortex-array/src/builders/dict/mod.rs index 7cfcfbbfb8a..4bfb89bb582 100644 --- a/vortex-array/src/builders/dict/mod.rs +++ b/vortex-array/src/builders/dict/mod.rs @@ -3,16 +3,16 @@ use bytes::bytes_dict_builder; use primitive::primitive_dict_builder; +use vortex_array::ExecutionCtx; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; use crate::ArrayRef; use crate::IntoArray; -#[expect(deprecated)] -use crate::ToCanonical as _; use crate::arrays::DictArray; use crate::arrays::Primitive; +use crate::arrays::PrimitiveArray; use crate::arrays::VarBin; use crate::arrays::VarBinView; use crate::arrays::primitive::PrimitiveArrayExt; @@ -35,7 +35,7 @@ pub const UNCONSTRAINED: DictConstraints = DictConstraints { pub trait DictEncoder: Send { /// Assign dictionary codes to the given input array. - fn encode(&mut self, array: &ArrayRef) -> ArrayRef; + fn encode(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult; /// Clear the encoder state to make it ready for a new round of decoding. fn reset(&mut self) -> ArrayRef; @@ -65,11 +65,10 @@ pub fn dict_encoder(array: &ArrayRef, constraints: &DictConstraints) -> Box VortexResult { let mut encoder = dict_encoder(array, constraints); - let encoded = encoder.encode(array); - #[expect(deprecated)] - let codes = encoded.to_primitive().narrow()?; + let codes = encoder.encode(array, ctx)?.narrow(ctx)?; // SAFETY: The encoding process will produce a value set of codes and values // All values in the dictionary are guaranteed to be referenced by at least one code // since we build the dictionary from the codes we observe during encoding @@ -81,8 +80,8 @@ pub fn dict_encode_with_constraints( } } -pub fn dict_encode(array: &ArrayRef) -> VortexResult { - let dict_array = dict_encode_with_constraints(array, &UNCONSTRAINED)?; +pub fn dict_encode(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let dict_array = dict_encode_with_constraints(array, &UNCONSTRAINED, ctx)?; if dict_array.len() != array.len() { vortex_bail!( "must have encoded all {} elements, but only encoded {}", diff --git a/vortex-array/src/builders/dict/primitive.rs b/vortex-array/src/builders/dict/primitive.rs index 03d930dbef1..ac1af105ec6 100644 --- a/vortex-array/src/builders/dict/primitive.rs +++ b/vortex-array/src/builders/dict/primitive.rs @@ -1,23 +1,25 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cell::OnceCell; use std::hash::Hash; use std::mem; use rustc_hash::FxBuildHasher; use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; use vortex_error::vortex_panic; +use vortex_mask::Mask; use vortex_utils::aliases::hash_map::Entry; use vortex_utils::aliases::hash_map::HashMap; use super::DictConstraints; use super::DictEncoder; use crate::ArrayRef; +use crate::ExecutionCtx; use crate::IntoArray; -#[expect(deprecated)] -use crate::ToCanonical as _; -use crate::accessor::ArrayAccessor; use crate::arrays::PrimitiveArray; use crate::arrays::primitive::NativeValue; use crate::dtype::NativePType; @@ -72,6 +74,7 @@ where .min(constraints.max_bytes / T::PTYPE.byte_width()); Self { lookup: HashMap::with_hasher(FxBuildHasher), + null_code: OnceCell::new(), values: BufferMut::::empty(), values_nulls: BitBufferMut::empty(), nullability, @@ -79,8 +82,8 @@ where } } - fn encode_value(&mut self, v: Option) -> Option { - match self.lookup.entry(v.map(NativeValue)) { + fn encode_value(&mut self, v: T) -> Option { + match self.lookup.entry(NativeValue(v)) { Entry::Occupied(o) => Some(*o.get()), Entry::Vacant(vac) => { if self.values.len() >= self.max_dict_len { @@ -89,18 +92,9 @@ where let next_code = Code::from_usize(self.values.len()).unwrap_or_else(|| { vortex_panic!("{} has to fit into {}", self.values.len(), Code::PTYPE) }); - vac.insert(next_code); - match v { - None => { - self.values.push(T::default()); - self.values_nulls.append_false(); - } - Some(v) => { - self.values.push(v); - self.values_nulls.append_true(); - } - } - Some(next_code) + self.values.push(v); + self.values_nulls.append_true(); + Some(*vac.insert(next_code)) } } } @@ -110,7 +104,8 @@ where /// /// Null values are stored in the values of the dictionary such that codes are always non-null. pub struct PrimitiveDictBuilder { - lookup: HashMap>, Code, FxBuildHasher>, + lookup: HashMap, Code, FxBuildHasher>, + null_code: OnceCell, values: BufferMut, values_nulls: BitBufferMut, nullability: Nullability, @@ -123,21 +118,53 @@ where NativeValue: Hash + Eq, Code: UnsignedPType, { - fn encode(&mut self, array: &ArrayRef) -> ArrayRef { + fn encode(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { let mut codes = BufferMut::::with_capacity(array.len()); - #[expect(deprecated)] - let prim = array.to_primitive(); - prim.with_iterator(|it| { - for value in it { - let Some(code) = self.encode_value(value.copied()) else { - break; - }; - unsafe { codes.push_unchecked(code) } + let prim = array.clone().execute::(ctx)?; + match prim.validity()?.execute_mask(array.len(), ctx)? { + Mask::AllTrue(_) => { + for &value in prim.as_slice::() { + let Some(code) = self.encode_value(value) else { + break; + }; + unsafe { codes.push_unchecked(code) } + } + } + Mask::AllFalse(_) => { + self.values.push(T::default()); + self.values_nulls.append_false(); + unsafe { + codes.push_n_unchecked( + Code::from_usize(0).vortex_expect("must fit 0"), + array.len(), + ) + } } - }); + Mask::Values(v) => { + let bit_buff = v.bit_buffer(); + for (&value, valid) in prim.as_slice::().iter().zip(bit_buff) { + if !valid { + let code = self.null_code.get_or_init(|| { + let code = self.values.len(); + self.values.push(T::default()); + self.values_nulls.append_false(); + Code::from_usize(code).unwrap_or_else(|| { + vortex_panic!("{} has to fit into {}", code, Code::PTYPE) + }) + }); + unsafe { codes.push_unchecked(*code) } + } else { + let Some(code) = self.encode_value(value) else { + break; + }; + unsafe { codes.push_unchecked(code) } + } + } + } + } - PrimitiveArray::new(codes, Validity::NonNullable).into_array() + Ok(PrimitiveArray::new(codes, Validity::NonNullable)) } fn reset(&mut self) -> ArrayRef { @@ -155,20 +182,26 @@ where #[cfg(test)] mod test { - #[expect(unused_imports)] - use itertools::Itertools; + use std::sync::LazyLock; + use vortex_buffer::buffer; + use vortex_session::VortexSession; use crate::IntoArray as _; + use crate::VortexSessionExecute; use crate::arrays::dict::DictArraySlotsExt; use crate::assert_arrays_eq; use crate::builders::dict::dict_encode; use crate::builders::dict::primitive::PrimitiveArray; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); #[test] fn encode_primitive() { let arr = buffer![1, 1, 3, 3, 3].into_array(); - let dict = dict_encode(&arr).unwrap(); + let dict = dict_encode(&arr, &mut SESSION.create_execution_ctx()).unwrap(); let expected_codes = buffer![0u8, 0, 1, 1, 1].into_array(); assert_arrays_eq!(dict.codes(), expected_codes); @@ -188,8 +221,9 @@ mod test { None, Some(3), None, - ]); - let dict = dict_encode(&arr.into_array()).unwrap(); + ]) + .into_array(); + let dict = dict_encode(&arr, &mut SESSION.create_execution_ctx()).unwrap(); let expected_codes = buffer![0u8, 0, 1, 2, 2, 1, 2, 1].into_array(); assert_arrays_eq!(dict.codes(), expected_codes); diff --git a/vortex-array/src/scalar/arrow.rs b/vortex-array/src/scalar/arrow.rs index 811417144f3..f8277ecf9a6 100644 --- a/vortex-array/src/scalar/arrow.rs +++ b/vortex-array/src/scalar/arrow.rs @@ -556,10 +556,10 @@ mod tests { #[rstest] #[case(TimeUnit::Nanoseconds, "UTC", 1234567890000000000i64)] - #[case(TimeUnit::Microseconds, "EST", 1234567890000000i64)] + #[case(TimeUnit::Microseconds, "America/New_York", 1234567890000000i64)] #[case(TimeUnit::Microseconds, "Asia/Qatar", 1234567890000000i64)] #[case(TimeUnit::Microseconds, "Australia/Sydney", 1234567890000000i64)] - #[case(TimeUnit::Milliseconds, "HST", 1234567890000i64)] + #[case(TimeUnit::Milliseconds, "Pacific/Honolulu", 1234567890000i64)] #[case(TimeUnit::Seconds, "GMT", 1234567890i64)] fn test_temporal_timestamp_tz_to_arrow( #[case] time_unit: TimeUnit, diff --git a/vortex-btrblocks/src/schemes/float.rs b/vortex-btrblocks/src/schemes/float.rs index 8c55b38d345..687d79444ac 100644 --- a/vortex-btrblocks/src/schemes/float.rs +++ b/vortex-btrblocks/src/schemes/float.rs @@ -267,7 +267,7 @@ impl Scheme for NullDominatedSparseScheme { .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_indices = compressor.compress_child( &indices.into_array(), &compress_ctx, diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index dfd61deb80b..25dfb9fe444 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -553,7 +553,7 @@ impl Scheme for SparseScheme { .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_indices = compressor.compress_child( &indices.into_array(), @@ -723,11 +723,16 @@ impl Scheme for SequenceScheme { return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - // If the distinct_values_count was computed, and not all values are unique, then this - // cannot be encoded as a sequence array. - if stats - .distinct_count() - .is_some_and(|count| count as usize != data.array_len()) + // If the distinct_values_count was computed, and the array has clearly fewer distinct + // values than its length, then this cannot be encoded as a sequence array. The distinct + // count is now sourced from a cardinality estimator, so we allow a small tolerance to + // account for its approximation error (~1-2% for typical inputs). Arrays that are truly + // sequences will fall through to the deferred callback which validates them exactly. + let distinct_count = stats.distinct_count(); + let array_len = data.array_len(); + let tolerance = array_len.div_ceil(16); + if distinct_count + .is_some_and(|count| (count as usize).saturating_add(tolerance) < array_len) { return CompressionEstimate::Verdict(EstimateVerdict::Skip); } @@ -848,7 +853,7 @@ pub(crate) fn rle_compress( .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; try_compress_delta( compressor, &rle_indices_primitive.into_array(), @@ -865,7 +870,7 @@ pub(crate) fn rle_compress( .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; compressor.compress_child( &rle_indices_primitive.into_array(), &compress_ctx, @@ -879,7 +884,7 @@ pub(crate) fn rle_compress( .values_idx_offsets() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_offsets = compressor.compress_child( &rle_offsets_primitive.into_array(), &compress_ctx, diff --git a/vortex-btrblocks/src/schemes/patches.rs b/vortex-btrblocks/src/schemes/patches.rs index 0d9854c3b23..69ca8450f12 100644 --- a/vortex-btrblocks/src/schemes/patches.rs +++ b/vortex-btrblocks/src/schemes/patches.rs @@ -18,7 +18,7 @@ pub fn compress_patches(patches: Patches, ctx: &mut ExecutionCtx) -> VortexResul .indices() .clone() .execute::(ctx)? - .narrow()? + .narrow(ctx)? .into_array(); // Check if the values are constant. @@ -39,7 +39,7 @@ pub fn compress_patches(patches: Patches, ctx: &mut ExecutionCtx) -> VortexResul let offsets_primitive = offsets .clone() .execute::(ctx)? - .narrow()? + .narrow(ctx)? .into_array(); Ok::(offsets_primitive) }) diff --git a/vortex-btrblocks/src/schemes/string.rs b/vortex-btrblocks/src/schemes/string.rs index 0df5a268157..ef7886e8584 100644 --- a/vortex-btrblocks/src/schemes/string.rs +++ b/vortex-btrblocks/src/schemes/string.rs @@ -95,7 +95,7 @@ impl Scheme for FSSTScheme { .uncompressed_lengths() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_original_lengths = compressor.compress_child( &uncompressed_lengths_primitive.into_array(), &compress_ctx, @@ -109,7 +109,7 @@ impl Scheme for FSSTScheme { .offsets() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_codes_offsets = compressor.compress_child( &codes_offsets_primitive.into_array(), &compress_ctx, @@ -206,7 +206,7 @@ impl Scheme for NullDominatedSparseScheme { .indices() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_indices = compressor.compress_child( &indices.into_array(), &compress_ctx, diff --git a/vortex-btrblocks/src/schemes/temporal.rs b/vortex-btrblocks/src/schemes/temporal.rs index 73aa9eedfb4..c5c0abc46aa 100644 --- a/vortex-btrblocks/src/schemes/temporal.rs +++ b/vortex-btrblocks/src/schemes/temporal.rs @@ -98,7 +98,7 @@ impl Scheme for TemporalScheme { subseconds, } = split_temporal(temporal_array, exec_ctx)?; - let days_primitive = days.execute::(exec_ctx)?.narrow()?; + let days_primitive = days.execute::(exec_ctx)?.narrow(exec_ctx)?; let days = compressor.compress_child( &days_primitive.into_array(), &compress_ctx, @@ -106,7 +106,9 @@ impl Scheme for TemporalScheme { 0, exec_ctx, )?; - let seconds_primitive = seconds.execute::(exec_ctx)?.narrow()?; + let seconds_primitive = seconds + .execute::(exec_ctx)? + .narrow(exec_ctx)?; let seconds = compressor.compress_child( &seconds_primitive.into_array(), &compress_ctx, @@ -114,7 +116,9 @@ impl Scheme for TemporalScheme { 1, exec_ctx, )?; - let subseconds_primitive = subseconds.execute::(exec_ctx)?.narrow()?; + let subseconds_primitive = subseconds + .execute::(exec_ctx)? + .narrow(exec_ctx)?; let subseconds = compressor.compress_child( &subseconds_primitive.into_array(), &compress_ctx, diff --git a/vortex-buffer/public-api.lock b/vortex-buffer/public-api.lock index f6c52f9df89..d3443a5d035 100644 --- a/vortex-buffer/public-api.lock +++ b/vortex-buffer/public-api.lock @@ -122,7 +122,7 @@ impl vortex_buffer::trusted_len::Trus impl vortex_buffer::trusted_len::TrustedLen for core::iter::adapters::skip::Skip where I: vortex_buffer::trusted_len::TrustedLen -impl vortex_buffer::trusted_len::TrustedLen for core::iter::adapters::zip::Zip where T: vortex_buffer::trusted_len::TrustedLen, U: vortex_buffer::trusted_len::TrustedLen +impl vortex_buffer::trusted_len::TrustedLen for core::iter::adapters::zip::Zip where T: vortex_buffer::trusted_len::TrustedLen, U: core::iter::traits::iterator::Iterator impl vortex_buffer::trusted_len::TrustedLen for core::array::iter::IntoIter diff --git a/vortex-buffer/src/trusted_len.rs b/vortex-buffer/src/trusted_len.rs index 13cf25d0546..7ef6f682be4 100644 --- a/vortex-buffer/src/trusted_len.rs +++ b/vortex-buffer/src/trusted_len.rs @@ -1,8 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use itertools::ProcessResults; - /// Trait for all types which have a known upper-bound. /// /// Functions that receive a `TrustedLen` iterator can assume that it's `size_hint` is exact, @@ -146,7 +144,7 @@ unsafe impl TrustedLen for crate::Iter<'_, T> {} unsafe impl TrustedLen for crate::BufferIterator {} // ProcessResults -unsafe impl<'a, I, T: 'a, E: 'a> TrustedLen for ProcessResults<'a, I, E> where +unsafe impl<'a, I, T: 'a, E: 'a> TrustedLen for itertools::ProcessResults<'a, I, E> where I: TrustedLen> { } @@ -158,7 +156,7 @@ unsafe impl TrustedLen for std::iter::Enumerate where I: TrustedLen TrustedLen for std::iter::Zip where T: TrustedLen, - U: TrustedLen, + U: Iterator, { } diff --git a/vortex-compressor/Cargo.toml b/vortex-compressor/Cargo.toml index 2d28d86a5e6..49870bea026 100644 --- a/vortex-compressor/Cargo.toml +++ b/vortex-compressor/Cargo.toml @@ -14,6 +14,7 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +cardinality-estimator = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } parking_lot = { workspace = true } diff --git a/vortex-compressor/benches/dict_encode.rs b/vortex-compressor/benches/dict_encode.rs index 02eaa906cb1..64612e3b664 100644 --- a/vortex-compressor/benches/dict_encode.rs +++ b/vortex-compressor/benches/dict_encode.rs @@ -3,17 +3,21 @@ #![expect(clippy::unwrap_used)] +use std::sync::LazyLock; + use divan::Bencher; use vortex_array::IntoArray; -use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::BoolArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::builders::dict::dict_encode; +use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::BufferMut; -use vortex_compressor::builtins::integer_dictionary_encode; -use vortex_compressor::stats::IntegerStats; +use vortex_session::VortexSession; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); fn make_array() -> PrimitiveArray { let values: BufferMut = (0..50).cycle().take(64_000).collect(); @@ -34,18 +38,8 @@ fn make_array() -> PrimitiveArray { fn encode_generic(bencher: Bencher) { let array = make_array().into_array(); bencher - .with_inputs(|| &array) - .bench_refs(|array| dict_encode(array).unwrap()); -} - -#[cfg(not(codspeed))] -#[divan::bench] -fn encode_specialized(bencher: Bencher) { - let array = make_array(); - let stats = IntegerStats::generate(&array, &mut LEGACY_SESSION.create_execution_ctx()); - bencher - .with_inputs(|| &stats) - .bench_refs(|stats| integer_dictionary_encode(array.as_view(), stats)); + .with_inputs(|| (&array, SESSION.create_execution_ctx())) + .bench_refs(|(array, ctx)| dict_encode(array, ctx).unwrap()); } fn main() { diff --git a/vortex-compressor/public-api.lock b/vortex-compressor/public-api.lock index cbdc65377f0..cd822932baf 100644 --- a/vortex-compressor/public-api.lock +++ b/vortex-compressor/public-api.lock @@ -268,10 +268,6 @@ pub fn vortex_compressor::builtins::StringDictScheme::scheme_name(&self) -> &'st pub fn vortex_compressor::builtins::StringDictScheme::stats_options(&self) -> vortex_compressor::stats::GenerateStatsOptions -pub fn vortex_compressor::builtins::float_dictionary_encode(vortex_array::array::view::ArrayView<'_, vortex_array::arrays::primitive::vtable::Primitive>, &vortex_compressor::stats::FloatStats) -> vortex_error::VortexResult - -pub fn vortex_compressor::builtins::integer_dictionary_encode(vortex_array::array::view::ArrayView<'_, vortex_array::arrays::primitive::vtable::Primitive>, &vortex_compressor::stats::IntegerStats) -> vortex_error::VortexResult - pub fn vortex_compressor::builtins::is_float_primitive(&vortex_array::canonical::Canonical) -> bool pub fn vortex_compressor::builtins::is_integer_primitive(&vortex_array::canonical::Canonical) -> bool @@ -754,10 +750,6 @@ pub fn vortex_compressor::stats::BoolStats::fmt(&self, &mut core::fmt::Formatter pub struct vortex_compressor::stats::FloatDistinctInfo -impl vortex_compressor::stats::FloatDistinctInfo - -pub fn vortex_compressor::stats::FloatDistinctInfo::distinct_values(&self) -> &vortex_utils::aliases::hash_set::HashSet, rustc_hash::FxBuildHasher> - impl core::clone::Clone for vortex_compressor::stats::FloatDistinctInfo pub fn vortex_compressor::stats::FloatDistinctInfo::clone(&self) -> vortex_compressor::stats::FloatDistinctInfo @@ -844,10 +836,6 @@ impl core::marker::Copy for vortex_compressor::stats::GenerateStatsOptions pub struct vortex_compressor::stats::IntegerDistinctInfo -impl vortex_compressor::stats::IntegerDistinctInfo - -pub fn vortex_compressor::stats::IntegerDistinctInfo::distinct_values(&self) -> &vortex_utils::aliases::hash_map::HashMap, u32, rustc_hash::FxBuildHasher> - impl core::clone::Clone for vortex_compressor::stats::IntegerDistinctInfo pub fn vortex_compressor::stats::IntegerDistinctInfo::clone(&self) -> vortex_compressor::stats::IntegerDistinctInfo diff --git a/vortex-compressor/src/builtins/dict/float.rs b/vortex-compressor/src/builtins/dict/float.rs index 51d553b591f..90a93cd5568 100644 --- a/vortex-compressor/src/builtins/dict/float.rs +++ b/vortex-compressor/src/builtins/dict/float.rs @@ -7,19 +7,15 @@ //! external compatibility. use vortex_array::ArrayRef; -use vortex_array::ArrayView; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::DictArray; -use vortex_array::arrays::Primitive; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArrayExt; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::arrays::primitive::PrimitiveArrayExt; -use vortex_array::dtype::half::f16; -use vortex_array::validity::Validity; -use vortex_buffer::Buffer; +use vortex_array::builders::dict::dict_encode; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -36,8 +32,6 @@ use crate::scheme::DescendantExclusion; use crate::scheme::Scheme; use crate::scheme::SchemeExt; use crate::stats::ArrayAndStats; -use crate::stats::FloatErasedStats; -use crate::stats::FloatStats; use crate::stats::GenerateStatsOptions; impl Scheme for FloatDictScheme { @@ -113,8 +107,7 @@ impl Scheme for FloatDictScheme { compress_ctx: CompressorContext, exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let stats = data.float_stats(exec_ctx); - let dict = dictionary_encode(data.array_as_primitive(), &stats)?; + let dict = dict_encode(data.array(), exec_ctx)?; let has_all_values_referenced = dict.has_all_values_referenced(); @@ -127,7 +120,7 @@ impl Scheme for FloatDictScheme { .codes() .clone() .execute::(exec_ctx)? - .narrow()? + .narrow(exec_ctx)? .into_array(); let compressed_codes = compressor.compress_child(&narrowed_codes, &compress_ctx, self.id(), 1, exec_ctx)?; @@ -143,106 +136,6 @@ impl Scheme for FloatDictScheme { } } -/// Encodes a typed float array into a [`DictArray`] using the pre-computed distinct values. -macro_rules! typed_encode { - ($source_array:ident, $stats:ident, $typed:ident, $typ:ty) => {{ - let distinct = $typed.distinct().vortex_expect( - "this must be present since `DictScheme` declared that we need distinct values", - ); - - let values_validity = match $source_array.validity()? { - Validity::NonNullable => Validity::NonNullable, - _ => Validity::AllValid, - }; - let codes_validity = $source_array.validity()?; - - let values: Buffer<$typ> = distinct.distinct_values().iter().map(|x| x.0).collect(); - - let max_code = values.len(); - let codes = if max_code <= u8::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else if max_code <= u16::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - }; - - let values = PrimitiveArray::new(values, values_validity).into_array(); - // SAFETY: enforced by the DictEncoder. - Ok(unsafe { DictArray::new_unchecked(codes, values).set_all_values_referenced(true) }) - }}; -} - -/// Compresses a floating-point array into a dictionary array according to attached stats. -/// -/// # Errors -/// -/// Returns an error if unable to compute validity. -pub fn dictionary_encode( - array: ArrayView<'_, Primitive>, - stats: &FloatStats, -) -> VortexResult { - match stats.erased() { - FloatErasedStats::F16(typed) => typed_encode!(array, stats, typed, f16), - FloatErasedStats::F32(typed) => typed_encode!(array, stats, typed, f32), - FloatErasedStats::F64(typed) => typed_encode!(array, stats, typed, f64), - } -} - -/// Stateless encoder that maps values to dictionary codes via a `HashMap`. -struct DictEncoder; - -/// Trait for encoding values of type `T` into codes of type `I`. -trait Encode { - /// Using the distinct value set, turn the values into a set of codes. - fn encode(distinct: &[T], values: &[T]) -> Buffer; -} - -/// Implements [`Encode`] for a float type using its bit representation as the hash key. -macro_rules! impl_encode { - ($typ:ty, $utyp:ty) => { impl_encode!($typ, $utyp, u8, u16, u32); }; - ($typ:ty, $utyp:ty, $($ityp:ty),+) => { - $( - impl Encode<$typ, $ityp> for DictEncoder { - #[expect(clippy::cast_possible_truncation)] - fn encode(distinct: &[$typ], values: &[$typ]) -> Buffer<$ityp> { - let mut codes = - vortex_utils::aliases::hash_map::HashMap::<$utyp, $ityp>::with_capacity( - distinct.len(), - ); - for (code, &value) in distinct.iter().enumerate() { - codes.insert(value.to_bits(), code as $ityp); - } - - let mut output = vortex_buffer::BufferMut::with_capacity(values.len()); - for value in values { - // Any code lookups which fail are for nulls, so their value does not matter. - output.push(codes.get(&value.to_bits()).copied().unwrap_or_default()); - } - - output.freeze() - } - } - )* - }; -} - -impl_encode!(f16, u16); -impl_encode!(f32, u32); -impl_encode!(f64, u64); - #[cfg(test)] mod tests { use vortex_array::IntoArray; @@ -251,16 +144,13 @@ mod tests { use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::assert_arrays_eq; + use vortex_array::builders::dict::dict_encode; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_session::VortexSession; - use super::dictionary_encode; - use crate::stats::FloatStats; - use crate::stats::GenerateStatsOptions; - #[test] fn test_float_dict_encode() -> VortexResult<()> { let mut ctx = VortexSession::empty() @@ -269,21 +159,14 @@ mod tests { let values = buffer![1f32, 2f32, 2f32, 0f32, 1f32]; let validity = Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()); - let array = PrimitiveArray::new(values, validity); + let array = PrimitiveArray::new(values, validity).into_array(); - let stats = FloatStats::generate_opts( - &array, - GenerateStatsOptions { - count_distinct_values: true, - }, - &mut ctx, - ); - let dict_array = dictionary_encode(array.as_view(), &stats)?; - assert_eq!(dict_array.values().len(), 2); + let dict_array = dict_encode(&array, &mut ctx)?; + assert_eq!(dict_array.values().len(), 3); assert_eq!(dict_array.codes().len(), 5); let expected = PrimitiveArray::new( - buffer![1f32, 2f32, 2f32, 1f32, 1f32], + buffer![1f32, 2f32, 2f32, 0f32, 1f32], Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()), ) .into_array(); diff --git a/vortex-compressor/src/builtins/dict/integer.rs b/vortex-compressor/src/builtins/dict/integer.rs index 140afdcebf1..a5cab56e97d 100644 --- a/vortex-compressor/src/builtins/dict/integer.rs +++ b/vortex-compressor/src/builtins/dict/integer.rs @@ -7,18 +7,15 @@ //! for external compatibility. use vortex_array::ArrayRef; -use vortex_array::ArrayView; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::DictArray; -use vortex_array::arrays::Primitive; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArrayExt; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::arrays::primitive::PrimitiveArrayExt; -use vortex_array::validity::Validity; -use vortex_buffer::Buffer; +use vortex_array::builders::dict::dict_encode; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -32,8 +29,6 @@ use crate::scheme::Scheme; use crate::scheme::SchemeExt; use crate::stats::ArrayAndStats; use crate::stats::GenerateStatsOptions; -use crate::stats::IntegerErasedStats; -use crate::stats::IntegerStats; impl Scheme for IntDictScheme { fn scheme_name(&self) -> &'static str { @@ -107,8 +102,7 @@ impl Scheme for IntDictScheme { compress_ctx: CompressorContext, exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let stats = data.integer_stats(exec_ctx); - let dict = dictionary_encode(data.array_as_primitive(), &stats)?; + let dict = dict_encode(data.array(), exec_ctx)?; // Values = child 0. let compressed_values = @@ -119,7 +113,7 @@ impl Scheme for IntDictScheme { .codes() .clone() .execute::(exec_ctx)? - .narrow()? + .narrow(exec_ctx)? .into_array(); let compressed_codes = compressor.compress_child(&narrowed_codes, &compress_ctx, self.id(), 1, exec_ctx)?; @@ -135,121 +129,6 @@ impl Scheme for IntDictScheme { } } -/// Encodes a typed integer array into a [`DictArray`] using the pre-computed distinct values. -macro_rules! typed_encode { - ($source_array:ident, $stats:ident, $typed:ident, $typ:ty) => {{ - let distinct = $typed.distinct().vortex_expect( - "this must be present since `DictScheme` declared that we need distinct values", - ); - - let values_validity = match $source_array.validity()? { - Validity::NonNullable => Validity::NonNullable, - _ => Validity::AllValid, - }; - let codes_validity = $source_array.validity()?; - - let values: Buffer<$typ> = distinct.distinct_values().keys().map(|x| x.0).collect(); - - let max_code = values.len(); - let codes = if max_code <= u8::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else if max_code <= u16::MAX as usize { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - } else { - let buf = >::encode( - &values, - $source_array.as_slice::<$typ>(), - ); - PrimitiveArray::new(buf, codes_validity).into_array() - }; - - let values = PrimitiveArray::new(values, values_validity).into_array(); - // SAFETY: invariants enforced in DictEncoder. - Ok(unsafe { DictArray::new_unchecked(codes, values).set_all_values_referenced(true) }) - }}; -} - -/// Compresses an integer array into a dictionary array according to attached stats. -/// -/// # Errors -/// -/// Returns an error if unable to compute validity. -#[expect( - clippy::cognitive_complexity, - reason = "complexity from match on all integer types" -)] -pub fn dictionary_encode( - array: ArrayView<'_, Primitive>, - stats: &IntegerStats, -) -> VortexResult { - match stats.erased() { - IntegerErasedStats::U8(typed) => typed_encode!(array, stats, typed, u8), - IntegerErasedStats::U16(typed) => typed_encode!(array, stats, typed, u16), - IntegerErasedStats::U32(typed) => typed_encode!(array, stats, typed, u32), - IntegerErasedStats::U64(typed) => typed_encode!(array, stats, typed, u64), - IntegerErasedStats::I8(typed) => typed_encode!(array, stats, typed, i8), - IntegerErasedStats::I16(typed) => typed_encode!(array, stats, typed, i16), - IntegerErasedStats::I32(typed) => typed_encode!(array, stats, typed, i32), - IntegerErasedStats::I64(typed) => typed_encode!(array, stats, typed, i64), - } -} - -/// Stateless encoder that maps values to dictionary codes via a `HashMap`. -struct DictEncoder; - -/// Trait for encoding values of type `T` into codes of type `I`. -trait Encode { - /// Using the distinct value set, turn the values into a set of codes. - fn encode(distinct: &[T], values: &[T]) -> Buffer; -} - -/// Implements [`Encode`] for an integer type with all code width variants (u8, u16, u32). -macro_rules! impl_encode { - ($typ:ty) => { impl_encode!($typ, u8, u16, u32); }; - ($typ:ty, $($ityp:ty),+) => { - $( - impl Encode<$typ, $ityp> for DictEncoder { - #[expect(clippy::cast_possible_truncation)] - fn encode(distinct: &[$typ], values: &[$typ]) -> Buffer<$ityp> { - let mut codes = - vortex_utils::aliases::hash_map::HashMap::<$typ, $ityp>::with_capacity( - distinct.len(), - ); - for (code, &value) in distinct.iter().enumerate() { - codes.insert(value, code as $ityp); - } - - let mut output = vortex_buffer::BufferMut::with_capacity(values.len()); - for value in values { - // Any code lookups which fail are for nulls, so their value does not matter. - // SAFETY: we have exactly sized output to be as large as values. - unsafe { output.push_unchecked(codes.get(value).copied().unwrap_or_default()) }; - } - - output.freeze() - } - } - )* - }; -} - -impl_encode!(u8); -impl_encode!(u16); -impl_encode!(u32); -impl_encode!(u64); -impl_encode!(i8); -impl_encode!(i16); -impl_encode!(i32); -impl_encode!(i64); - #[cfg(test)] mod tests { use vortex_array::IntoArray; @@ -258,15 +137,13 @@ mod tests { use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::assert_arrays_eq; + use vortex_array::builders::dict::dict_encode; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_session::VortexSession; - use super::dictionary_encode; - use crate::stats::IntegerStats; - #[test] fn test_dict_encode_integer_stats() -> VortexResult<()> { let mut ctx = VortexSession::empty() @@ -275,21 +152,14 @@ mod tests { let data = buffer![100i32, 200, 100, 0, 100]; let validity = Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()); - let array = PrimitiveArray::new(data, validity); + let array = PrimitiveArray::new(data, validity).into_array(); - let stats = IntegerStats::generate_opts( - &array, - crate::stats::GenerateStatsOptions { - count_distinct_values: true, - }, - &mut ctx, - ); - let dict_array = dictionary_encode(array.as_view(), &stats)?; - assert_eq!(dict_array.values().len(), 2); + let dict_array = dict_encode(&array, &mut ctx)?; + assert_eq!(dict_array.values().len(), 3); assert_eq!(dict_array.codes().len(), 5); let expected = PrimitiveArray::new( - buffer![100i32, 200, 100, 100, 100], + buffer![100i32, 200, 100, 0, 100], Validity::Array(BoolArray::from_iter([true, true, true, false, true]).into_array()), ) .into_array(); diff --git a/vortex-compressor/src/builtins/dict/mod.rs b/vortex-compressor/src/builtins/dict/mod.rs index c8e573b4fbc..77d6a020196 100644 --- a/vortex-compressor/src/builtins/dict/mod.rs +++ b/vortex-compressor/src/builtins/dict/mod.rs @@ -18,6 +18,3 @@ pub struct StringDictScheme; mod float; mod integer; mod string; - -pub use float::dictionary_encode as float_dictionary_encode; -pub use integer::dictionary_encode as integer_dictionary_encode; diff --git a/vortex-compressor/src/builtins/dict/string.rs b/vortex-compressor/src/builtins/dict/string.rs index ac6affdf854..94be23fbd64 100644 --- a/vortex-compressor/src/builtins/dict/string.rs +++ b/vortex-compressor/src/builtins/dict/string.rs @@ -98,7 +98,7 @@ impl Scheme for StringDictScheme { compress_ctx: CompressorContext, exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let dict = dict_encode(data.array())?; + let dict = dict_encode(data.array(), exec_ctx)?; // Values = child 0. let compressed_values = @@ -109,7 +109,7 @@ impl Scheme for StringDictScheme { .codes() .clone() .execute::(exec_ctx)? - .narrow()? + .narrow(exec_ctx)? .into_array(); let compressed_codes = compressor.compress_child(&narrowed_codes, &compress_ctx, self.id(), 1, exec_ctx)?; diff --git a/vortex-compressor/src/builtins/mod.rs b/vortex-compressor/src/builtins/mod.rs index c5bd9f343f5..3dff9fe0f87 100644 --- a/vortex-compressor/src/builtins/mod.rs +++ b/vortex-compressor/src/builtins/mod.rs @@ -37,8 +37,6 @@ mod dict; pub use dict::FloatDictScheme; pub use dict::IntDictScheme; pub use dict::StringDictScheme; -pub use dict::float_dictionary_encode; -pub use dict::integer_dictionary_encode; mod constant; diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index d3ad0d9d5c3..4f10b9d540d 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -500,7 +500,7 @@ impl CascadingCompressor { .offsets() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_offsets = self.compress_canonical( Canonical::Primitive(list_offsets_primitive), offset_ctx, @@ -530,7 +530,7 @@ impl CascadingCompressor { .offsets() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_offsets = self.compress_canonical( Canonical::Primitive(list_view_offsets_primitive), offset_ctx, @@ -542,7 +542,7 @@ impl CascadingCompressor { .sizes() .clone() .execute::(exec_ctx)? - .narrow()?; + .narrow(exec_ctx)?; let compressed_sizes = self.compress_canonical( Canonical::Primitive(list_view_sizes_primitive), sizes_ctx, diff --git a/vortex-compressor/src/stats/bool.rs b/vortex-compressor/src/stats/bool.rs index 8825ec8a7f6..8ea9f318536 100644 --- a/vortex-compressor/src/stats/bool.rs +++ b/vortex-compressor/src/stats/bool.rs @@ -92,18 +92,24 @@ impl BoolStats { #[cfg(test)] mod tests { - use vortex_array::LEGACY_SESSION; + use std::sync::LazyLock; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::BoolArray; + use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::BitBuffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use super::BoolStats; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_all_true() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![true, true, true]), Validity::NonNullable, @@ -118,7 +124,7 @@ mod tests { #[test] fn test_all_false() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![false, false, false]), Validity::NonNullable, @@ -133,7 +139,7 @@ mod tests { #[test] fn test_mixed() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![true, false, true]), Validity::NonNullable, @@ -148,7 +154,7 @@ mod tests { #[test] fn test_with_nulls() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = BoolArray::new( BitBuffer::from(vec![true, false, true]), Validity::from_iter([true, false, true]), diff --git a/vortex-compressor/src/stats/float.rs b/vortex-compressor/src/stats/float.rs index d968e8d368f..c103d252580 100644 --- a/vortex-compressor/src/stats/float.rs +++ b/vortex-compressor/src/stats/float.rs @@ -4,10 +4,11 @@ //! Float compression statistics. use std::hash::Hash; +use std::marker::PhantomData; +use cardinality_estimator::CardinalityEstimator; use itertools::Itertools; use num_traits::Float; -use rustc_hash::FxBuildHasher; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::primitive::NativeValue; @@ -19,24 +20,19 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_mask::AllOr; -use vortex_utils::aliases::hash_set::HashSet; use super::GenerateStatsOptions; /// Information about the distinct values in a float array. +/// +/// The distinct count is an estimate produced by Cloudflare's cardinality estimator, which is +/// exact for small cardinalities and approximate beyond that. #[derive(Debug, Clone)] pub struct DistinctInfo { - /// The set of distinct float values. - distinct_values: HashSet, FxBuildHasher>, - /// The count of unique values. This _must_ be non-zero. + /// The estimated count of unique values. This _must_ be non-zero. distinct_count: u32, -} - -impl DistinctInfo { - /// Returns a reference to the distinct values set. - pub fn distinct_values(&self) -> &HashSet, FxBuildHasher> { - &self.distinct_values - } + /// Phantom marker for the float element type. + _marker: PhantomData, } /// Typed statistics for a specific float type. @@ -188,8 +184,8 @@ where average_run_length: 0, erased: TypedStats { distinct: Some(DistinctInfo { - distinct_values: HashSet::with_capacity_and_hasher(0, FxBuildHasher), distinct_count: 0, + _marker: PhantomData, }), } .into(), @@ -202,13 +198,9 @@ where .ok_or_else(|| vortex_err!("Failed to compute null_count"))?; let value_count = array.len() - null_count; - // Keep a HashMap of T, then convert the keys into PValue afterward since value is - // so much more efficient to hash and search for. - let mut distinct_values = if count_distinct_values { - HashSet::with_capacity_and_hasher(array.len() / 2, FxBuildHasher) - } else { - HashSet::with_hasher(FxBuildHasher) - }; + // Cloudflare's cardinality estimator gives us a bounded-memory approximation of the + // number of distinct values, replacing the previous exact `HashSet`. + let mut estimator: CardinalityEstimator> = CardinalityEstimator::new(); let validity = array .as_ref() @@ -227,7 +219,7 @@ where AllOr::All => { for value in first_valid_buff { if count_distinct_values { - distinct_values.insert(NativeValue(value)); + estimator.insert(&NativeValue(value)); } if value != prev { @@ -244,7 +236,7 @@ where { if valid { if count_distinct_values { - distinct_values.insert(NativeValue(value)); + estimator.insert(&NativeValue(value)); } if value != prev { @@ -260,9 +252,10 @@ where let value_count = u32::try_from(value_count)?; let distinct = count_distinct_values.then(|| DistinctInfo { - distinct_count: u32::try_from(distinct_values.len()) - .vortex_expect("more than u32::MAX distinct values"), - distinct_values, + distinct_count: u32::try_from(estimator.estimate()) + .vortex_expect("more than u32::MAX distinct values") + .max(1), + _marker: PhantomData, }); Ok(FloatStats { @@ -275,19 +268,25 @@ where #[cfg(test)] mod tests { + use std::sync::LazyLock; + use vortex_array::IntoArray; - use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; + use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use super::FloatStats; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_float_stats() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let floats = buffer![0.0f32, 1.0f32, 2.0f32].into_array(); let floats = floats.execute::(&mut ctx)?; @@ -308,7 +307,7 @@ mod tests { #[test] fn test_float_stats_leading_nulls() { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let floats = PrimitiveArray::new( buffer![0.0f32, 1.0f32, 2.0f32], Validity::from_iter([false, true, true]), diff --git a/vortex-compressor/src/stats/integer.rs b/vortex-compressor/src/stats/integer.rs index 64345ac5f06..ee8020e3b53 100644 --- a/vortex-compressor/src/stats/integer.rs +++ b/vortex-compressor/src/stats/integer.rs @@ -5,8 +5,8 @@ use std::hash::Hash; +use cardinality_estimator::CardinalityEstimator; use num_traits::PrimInt; -use rustc_hash::FxBuildHasher; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::primitive::NativeValue; @@ -20,30 +20,27 @@ use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_mask::AllOr; -use vortex_utils::aliases::hash_map::HashMap; use super::GenerateStatsOptions; /// Information about the distinct values in an integer array. +/// +/// The `distinct_count` is an estimate computed using Cloudflare's cardinality estimator, which +/// yields exact counts for small cardinalities (<= 128 for the default parameters) and a +/// HyperLogLog++ approximation for larger cardinalities. The most frequent value is tracked using +/// the Boyer-Moore majority candidate algorithm, so `most_frequent_value` and `top_frequency` are +/// only guaranteed to reflect the true majority element when some value accounts for more than +/// half of the non-null entries; otherwise they are treated as a best-effort estimate. #[derive(Debug, Clone)] pub struct DistinctInfo { - /// The unique values and their occurrences. - distinct_values: HashMap, u32, FxBuildHasher>, - /// The count of unique values. This _must_ be non-zero. + /// The estimated count of unique values. This _must_ be non-zero. distinct_count: u32, - /// The most frequent value. + /// The most frequent value (Boyer-Moore majority candidate). most_frequent_value: T, - /// The number of times the most frequent value occurs. + /// The exact number of times `most_frequent_value` occurs in the array. top_frequency: u32, } -impl DistinctInfo { - /// Returns a reference to the distinct values map. - pub fn distinct_values(&self) -> &HashMap, u32, FxBuildHasher> { - &self.distinct_values - } -} - /// Typed statistics for a specific integer type. #[derive(Debug, Clone)] pub struct TypedStats { @@ -346,7 +343,6 @@ where min: T::max_value(), max: T::min_value(), distinct: Some(DistinctInfo { - distinct_values: HashMap::with_capacity_and_hasher(0, FxBuildHasher), distinct_count: 0, most_frequent_value: T::zero(), top_frequency: 0, @@ -370,12 +366,10 @@ where let buffer = array.to_buffer::(); let head = buffer[head_idx]; - let mut loop_state = LoopState { - distinct_values: if count_distinct_values { - HashMap::with_capacity_and_hasher(array.len() / 2, FxBuildHasher) - } else { - HashMap::with_hasher(FxBuildHasher) - }, + let mut loop_state = LoopState:: { + estimator: CardinalityEstimator::new(), + bm_candidate: head, + bm_votes: 0, prev: head, runs: 1, }; @@ -450,18 +444,25 @@ where .vortex_expect("max should be computed"); let distinct = count_distinct_values.then(|| { - let (&top_value, &top_count) = loop_state - .distinct_values - .iter() - .max_by_key(|&(_, &count)| count) - .vortex_expect("we know this is non-empty"); + // The cardinality estimator is exact for small cardinalities and approximate beyond. + // We clamp to at least 1 because we are inside the non-empty/non-all-null branch. + let distinct_count = u32::try_from(loop_state.estimator.estimate()) + .vortex_expect("there are more than `u32::MAX` distinct values") + .max(1); + + // Count the Boyer-Moore majority candidate exactly via a second pass. If any value + // accounts for more than half of the non-null entries, this counts that value; otherwise + // the returned count is a best-effort estimate for whichever candidate survived. + let top_frequency = count_occurrences::( + buffer.as_slice(), + validity.bit_buffer(), + loop_state.bm_candidate, + ); DistinctInfo { - distinct_count: u32::try_from(loop_state.distinct_values.len()) - .vortex_expect("there are more than `u32::MAX` distinct values"), - most_frequent_value: top_value.0, - top_frequency: top_count, - distinct_values: loop_state.distinct_values, + distinct_count, + most_frequent_value: loop_state.bm_candidate, + top_frequency, } }); @@ -479,13 +480,54 @@ where } /// Internal loop state for integer stats computation. -struct LoopState { +struct LoopState +where + T: IntegerPType, + NativeValue: Eq + Hash, +{ /// The previous value seen. prev: T, /// The run count. runs: u32, - /// The distinct values map. - distinct_values: HashMap, u32, FxBuildHasher>, + /// Cloudflare's cardinality estimator, used to approximate the number of distinct values + /// without materializing an exact hash map. + estimator: CardinalityEstimator>, + /// Boyer-Moore majority candidate; holds the current candidate for the most frequent value. + bm_candidate: T, + /// Boyer-Moore vote counter for `bm_candidate`. + bm_votes: u32, +} + +/// Updates the Boyer-Moore majority-vote state for a single value. +#[inline(always)] +fn boyer_moore_observe(state: &mut LoopState, value: T) +where + T: IntegerPType, + NativeValue: Eq + Hash, +{ + if state.bm_votes == 0 { + state.bm_candidate = value; + state.bm_votes = 1; + } else if value == state.bm_candidate { + state.bm_votes += 1; + } else { + state.bm_votes -= 1; + } +} + +/// Counts exact occurrences of `needle` in `buffer`, restricted to valid positions according to +/// `validity`. +fn count_occurrences(buffer: &[T], validity: AllOr<&BitBuffer>, needle: T) -> u32 { + let count = match validity { + AllOr::All => buffer.iter().filter(|&&v| v == needle).count(), + AllOr::None => 0, + AllOr::Some(mask) => buffer + .iter() + .enumerate() + .filter(|&(idx, &v)| mask.value(idx) && v == needle) + .count(), + }; + u32::try_from(count).vortex_expect("occurrences cannot exceed `u32::MAX`") } /// Inner loop for non-null chunks of 64 values. @@ -499,7 +541,8 @@ fn inner_loop_nonnull( { for &value in values { if count_distinct_values { - *state.distinct_values.entry(NativeValue(value)).or_insert(0) += 1; + state.estimator.insert(&NativeValue(value)); + boyer_moore_observe(state, value); } if value != state.prev { @@ -522,7 +565,8 @@ fn inner_loop_nullable( for (idx, &value) in values.iter().enumerate() { if is_valid.value(idx) { if count_distinct_values { - *state.distinct_values.entry(NativeValue(value)).or_insert(0) += 1; + state.estimator.insert(&NativeValue(value)); + boyer_moore_observe(state, value); } if value != state.prev { @@ -546,7 +590,8 @@ fn inner_loop_naive( for (idx, &value) in values.iter().enumerate() { if is_valid.value(idx) { if count_distinct_values { - *state.distinct_values.entry(NativeValue(value)).or_insert(0) += 1; + state.estimator.insert(&NativeValue(value)); + boyer_moore_observe(state, value); } if value != state.prev { @@ -560,22 +605,27 @@ fn inner_loop_naive( #[cfg(test)] mod tests { use std::iter; + use std::sync::LazyLock; - use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; + use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::BitBuffer; use vortex_buffer::Buffer; use vortex_buffer::buffer; use vortex_error::VortexResult; + use vortex_session::VortexSession; use super::IntegerStats; use super::typed_int_stats; + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + #[test] fn test_naive_count_distinct_values() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new(buffer![217u8, 0], Validity::NonNullable); let stats = typed_int_stats::(&array, true, &mut ctx)?; assert_eq!(stats.distinct_count().unwrap(), 2); @@ -584,7 +634,7 @@ mod tests { #[test] fn test_naive_count_distinct_values_nullable() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new( buffer![217u8, 0], Validity::from(BitBuffer::from(vec![true, false])), @@ -596,7 +646,7 @@ mod tests { #[test] fn test_count_distinct_values() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new((0..128u8).collect::>(), Validity::NonNullable); let stats = typed_int_stats::(&array, true, &mut ctx)?; assert_eq!(stats.distinct_count().unwrap(), 128); @@ -605,7 +655,7 @@ mod tests { #[test] fn test_count_distinct_values_nullable() -> VortexResult<()> { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new( (0..128u8).collect::>(), Validity::from(BitBuffer::from_iter( @@ -619,7 +669,7 @@ mod tests { #[test] fn test_integer_stats_leading_nulls() { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let ints = PrimitiveArray::new(buffer![0, 1, 2], Validity::from_iter([false, true, true])); let stats = IntegerStats::generate_opts( @@ -635,4 +685,40 @@ mod tests { assert_eq!(stats.average_run_length, 1); assert_eq!(stats.distinct_count().unwrap(), 2); } + + #[test] + fn test_most_frequent_value_dominates() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // A value that appears in 95% of the array must be recovered exactly by the + // Boyer-Moore tracking plus second-pass count. + let top = -1i32; + let mut data: Vec = vec![top; 950]; + data.extend(0..50i32); + let array = PrimitiveArray::new(Buffer::copy_from(&data), Validity::NonNullable); + let stats = typed_int_stats::(&array, true, &mut ctx)?; + let (top_value, top_count) = stats + .erased() + .most_frequent_value_and_count() + .expect("distinct info must be present"); + assert_eq!(top_value, top.into()); + assert_eq!(top_count, 950); + Ok(()) + } + + #[test] + fn test_cardinality_estimate_large_unique() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + // For 1024 distinct values the estimator falls back to HyperLogLog++; verify the + // estimate is within the expected error bound (~1.6% for the default P/W). + let array = + PrimitiveArray::new((0..1024u32).collect::>(), Validity::NonNullable); + let stats = typed_int_stats::(&array, true, &mut ctx)?; + let estimated = stats.distinct_count().unwrap(); + let error_ratio = (estimated as f64 - 1024.0).abs() / 1024.0; + assert!( + error_ratio < 0.05, + "estimator error {error_ratio} exceeds 5% for 1024 distinct values" + ); + Ok(()) + } } diff --git a/vortex-compressor/src/stats/string.rs b/vortex-compressor/src/stats/string.rs index 8613aa5cc37..fdc4aa08cc5 100644 --- a/vortex-compressor/src/stats/string.rs +++ b/vortex-compressor/src/stats/string.rs @@ -3,12 +3,12 @@ //! String compression statistics. +use cardinality_estimator::CardinalityEstimator; use vortex_array::ExecutionCtx; use vortex_array::arrays::VarBinViewArray; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; -use vortex_utils::aliases::hash_set::HashSet; use super::GenerateStatsOptions; @@ -24,23 +24,27 @@ pub struct StringStats { null_count: u32, } -/// Estimate the number of distinct strings in the var bin view array. +/// Estimate the number of distinct strings in the var bin view array using Cloudflare's +/// cardinality estimator. +/// +/// The signal used for each string is the 64-bit combination of its length and first 4-byte +/// prefix: two strings that are equal must agree on both. This remains an approximation for +/// strings that share a 4-byte prefix and length, but is exact for distinct prefixes/lengths. +/// The cardinality estimator itself is exact for small cardinalities and falls back to +/// HyperLogLog++ for larger ones. fn estimate_distinct_count(strings: &VarBinViewArray) -> VortexResult { let views = strings.views(); - // Iterate the views. Two strings which are equal must have the same first 8-bytes. - // NOTE: there are cases where this performs pessimally, e.g. when we have strings that all - // share a 4-byte prefix and have the same length. - let mut distinct = HashSet::with_capacity(views.len() / 2); - views.iter().for_each(|&view| { + let mut estimator: CardinalityEstimator = CardinalityEstimator::new(); + for &view in views.iter() { #[expect( clippy::cast_possible_truncation, reason = "approximate uniqueness with view prefix" )] let len_and_prefix = view.as_u128() as u64; - distinct.insert(len_and_prefix); - }); + estimator.insert(&len_and_prefix); + } - Ok(u32::try_from(distinct.len())?) + Ok(u32::try_from(estimator.estimate())?) } impl StringStats { diff --git a/vortex-ffi/src/data_source.rs b/vortex-ffi/src/data_source.rs index 33ebe937b3a..a4b6e60293d 100644 --- a/vortex-ffi/src/data_source.rs +++ b/vortex-ffi/src/data_source.rs @@ -179,7 +179,7 @@ mod tests { assert_error(error); assert!(ds.is_null()); - opts.paths = c"*.vortex".as_ptr(); + opts.paths = c"definitely-missing-dir/*.vortex".as_ptr(); let ds = vx_data_source_new(session, &raw const opts, &raw mut error); assert_error(error); assert!(ds.is_null()); diff --git a/vortex-layout/src/layouts/dict/writer.rs b/vortex-layout/src/layouts/dict/writer.rs index 7013becd4df..f83c426b55b 100644 --- a/vortex-layout/src/layouts/dict/writer.rs +++ b/vortex-layout/src/layouts/dict/writer.rs @@ -20,6 +20,8 @@ use futures::stream::once; use futures::try_join; use vortex_array::ArrayContext; use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; use vortex_array::arrays::Dict; use vortex_array::builders::dict::DictConstraints; @@ -557,7 +559,9 @@ fn encode_chunk( mut encoder: Box, chunk: &ArrayRef, ) -> VortexResult { - let encoded = encoder.encode(chunk); + let encoded = encoder + .encode(chunk, &mut LEGACY_SESSION.create_execution_ctx())? + .into_array(); match remainder(chunk, encoded.len())? { None => Ok(EncodingState::Continue((encoder, encoded))), Some(unencoded) => Ok(EncodingState::Done((encoder.reset(), encoded, unencoded))), diff --git a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs index add08b5f8e3..0e3a9d5a089 100644 --- a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs +++ b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/dict.rs @@ -5,14 +5,17 @@ use vortex::array::ArrayId; use vortex::array::ArrayRef; use vortex::array::ArrayVTable; use vortex::array::IntoArray; +use vortex::array::VortexSessionExecute; use vortex::array::arrays::Dict; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::VarBinArray; use vortex::array::builders::dict::dict_encode; use vortex::array::dtype::FieldNames; +use vortex::array::session::ArraySession; use vortex::array::validity::Validity; use vortex::error::VortexResult; +use vortex::session::VortexSession; use super::N; use crate::fixtures::FlatLayoutFixture; @@ -83,6 +86,9 @@ impl FlatLayoutFixture for DictFixture { .map(|i| insertion_values[(i * 7 + 3) % insertion_values.len()]) .collect(); let insertion_ordered_col = VarBinArray::from_strs(insertion_ordered); + let mut ctx = VortexSession::empty() + .with::() + .create_execution_ctx(); let arr = StructArray::try_new( FieldNames::from([ @@ -100,18 +106,18 @@ impl FlatLayoutFixture for DictFixture { "insertion_ordered", ]), vec![ - dict_encode(&str_col.into_array())?.into_array(), - dict_encode(&int_col.into_array())?.into_array(), - dict_encode(&nullable_col.into_array())?.into_array(), - dict_encode(&single_col.into_array())?.into_array(), - dict_encode(&bool_cat_col.into_array())?.into_array(), - dict_encode(&all_null_col.into_array())?.into_array(), - dict_encode(&single_non_null_col.into_array())?.into_array(), - dict_encode(&threshold_255_col.into_array())?.into_array(), - dict_encode(&threshold_256_col.into_array())?.into_array(), - dict_encode(&threshold_257_col.into_array())?.into_array(), - dict_encode(&long_col.into_array())?.into_array(), - dict_encode(&insertion_ordered_col.into_array())?.into_array(), + dict_encode(&str_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&int_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&nullable_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&single_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&bool_cat_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&all_null_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&single_non_null_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&threshold_255_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&threshold_256_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&threshold_257_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&long_col.into_array(), &mut ctx)?.into_array(), + dict_encode(&insertion_ordered_col.into_array(), &mut ctx)?.into_array(), ], N, Validity::NonNullable, diff --git a/vortex/benches/single_encoding_throughput.rs b/vortex/benches/single_encoding_throughput.rs index be253187956..f3b1d39911f 100644 --- a/vortex/benches/single_encoding_throughput.rs +++ b/vortex/benches/single_encoding_throughput.rs @@ -213,16 +213,21 @@ fn bench_for_decompress_i32(bencher: Bencher) { #[divan::bench(name = "dict_compress_u32")] fn bench_dict_compress_u32(bencher: Bencher) { let (uint_array, ..) = setup_primitive_arrays(); + let array = uint_array.into_array(); with_byte_counter(bencher, NUM_VALUES * 4) - .with_inputs(|| &uint_array) - .bench_refs(|a| dict_encode(&a.clone().into_array()).unwrap()); + .with_inputs(|| (&array, SESSION.create_execution_ctx())) + .bench_refs(|(a, ctx)| dict_encode(a, ctx).unwrap()); } #[divan::bench(name = "dict_decompress_u32")] fn bench_dict_decompress_u32(bencher: Bencher) { let (uint_array, ..) = setup_primitive_arrays(); - let compressed = dict_encode(&uint_array.into_array()).unwrap(); + let compressed = dict_encode( + &uint_array.into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| (&compressed, SESSION.create_execution_ctx())) @@ -393,17 +398,22 @@ fn bench_dict_compress_string(bencher: Bencher) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(NUM_VALUES as usize, 0.00005)); let nbytes = varbinview_arr.nbytes() as u64; + let array = varbinview_arr.into_array(); with_byte_counter(bencher, nbytes) - .with_inputs(|| &varbinview_arr) - .bench_refs(|a| dict_encode(&a.clone().into_array()).unwrap()); + .with_inputs(|| (&array, SESSION.create_execution_ctx())) + .bench_refs(|(a, ctx)| dict_encode(a, ctx).unwrap()); } #[divan::bench(name = "dict_decompress_string")] fn bench_dict_decompress_string(bencher: Bencher) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(NUM_VALUES as usize, 0.00005)); - let dict = dict_encode(&varbinview_arr.clone().into_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.clone().into_array(), + &mut SESSION.create_execution_ctx(), + ) + .unwrap(); let nbytes = varbinview_arr.into_array().nbytes() as u64; with_byte_counter(bencher, nbytes)