diff --git a/vortex-array/src/builders/dict/bytes.rs b/vortex-array/src/builders/dict/bytes.rs index d6343e62e99..a66f8ed5efa 100644 --- a/vortex-array/src/builders/dict/bytes.rs +++ b/vortex-array/src/builders/dict/bytes.rs @@ -9,6 +9,7 @@ use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; use vortex_buffer::ByteBufferMut; use vortex_dtype::DType; +use vortex_dtype::PType; use vortex_dtype::UnsignedPType; use vortex_error::VortexExpect; use vortex_error::vortex_panic; @@ -195,6 +196,10 @@ impl DictEncoder for BytesDictBuilder { .into_array() } } + + fn codes_ptype(&self) -> PType { + Code::PTYPE + } } #[cfg(test)] diff --git a/vortex-array/src/builders/dict/mod.rs b/vortex-array/src/builders/dict/mod.rs index 8d4c2306bb7..b7b3a2bc1c2 100644 --- a/vortex-array/src/builders/dict/mod.rs +++ b/vortex-array/src/builders/dict/mod.rs @@ -3,6 +3,7 @@ use bytes::bytes_dict_builder; use primitive::primitive_dict_builder; +use vortex_dtype::PType; use vortex_dtype::match_each_native_ptype; use vortex_error::VortexResult; use vortex_error::vortex_bail; @@ -37,6 +38,9 @@ pub trait DictEncoder: Send { /// Clear the encoder state to make it ready for a new round of decoding. fn reset(&mut self) -> ArrayRef; + + /// Returns the PType of the codes this encoder produces. 
+ fn codes_ptype(&self) -> PType; } pub fn dict_encoder(array: &dyn Array, constraints: &DictConstraints) -> Box<dyn DictEncoder> { diff --git a/vortex-array/src/builders/dict/primitive.rs b/vortex-array/src/builders/dict/primitive.rs index 374c5f2b6c9..99163e49192 100644 --- a/vortex-array/src/builders/dict/primitive.rs +++ b/vortex-array/src/builders/dict/primitive.rs @@ -9,6 +9,7 @@ use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; use vortex_dtype::NativePType; use vortex_dtype::Nullability; +use vortex_dtype::PType; use vortex_dtype::UnsignedPType; use vortex_error::vortex_panic; use vortex_utils::aliases::hash_map::Entry; @@ -145,6 +146,10 @@ where ) .into_array() } + + fn codes_ptype(&self) -> PType { + Code::PTYPE + } } #[cfg(test)] diff --git a/vortex-layout/src/layouts/dict/writer.rs b/vortex-layout/src/layouts/dict/writer.rs index 9b6a714c254..57cafbf7ba3 100644 --- a/vortex-layout/src/layouts/dict/writer.rs +++ b/vortex-layout/src/layouts/dict/writer.rs @@ -27,9 +27,10 @@ use vortex_array::builders::dict::DictEncoder; use vortex_array::builders::dict::dict_encoder; use vortex_btrblocks::BtrBlocksCompressor; use vortex_dtype::DType; -use vortex_dtype::Nullability::NonNullable; +use vortex_dtype::Nullability; use vortex_dtype::PType; use vortex_error::VortexError; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::kanal_ext::KanalExt; @@ -49,10 +50,22 @@ use crate::sequence::SequentialStream; use crate::sequence::SequentialStreamAdapter; use crate::sequence::SequentialStreamExt; +/// Constraints for dictionary layout encoding. +/// +/// Note that [`max_len`](Self::max_len) is limited to `u16` (65,535 entries) by design. Since +/// layout chunks are typically ~8k elements, having more than 64k unique values in a dictionary +/// means dictionary encoding provides little compression benefit. If a column has very high +/// cardinality, the fallback encoding strategy should be used instead. 
#[derive(Clone)] pub struct DictLayoutConstraints { + /// Maximum size of the dictionary in bytes. pub max_bytes: usize, - // Dict layout codes currently only support u16 codes + /// Maximum dictionary length. Limited to `u16` because dictionaries with more than 64k unique + /// values provide diminishing compression returns given typical chunk sizes (~8k elements). + /// + /// The codes dtype is determined upfront from this constraint: + /// - [`PType::U8`] when max_len <= 255 + /// - [`PType::U16`] when max_len > 255 pub max_len: u16, } @@ -226,7 +239,11 @@ impl LayoutStrategy for DictStrategy { } enum DictionaryChunk { - Codes((SequenceId, ArrayRef)), + Codes { + seq_id: SequenceId, + codes: ArrayRef, + codes_ptype: PType, + }, Values((SequenceId, ArrayRef)), } @@ -288,26 +305,33 @@ impl DictStreamState { match self.encoder.take() { None => match start_encoding(&self.constraints, &remaining) { EncodingState::Continue((encoder, encoded)) => { - res.push(labeler.codes(encoded)); + let ptype = encoder.codes_ptype(); + res.push(labeler.codes(encoded, ptype)); self.encoder = Some(encoder); } EncodingState::Done((values, encoded, unencoded)) => { - res.push(labeler.codes(encoded)); + // Encoder was created and consumed within start_encoding + let ptype = PType::try_from(encoded.dtype()) + .vortex_expect("codes should be primitive"); + res.push(labeler.codes(encoded, ptype)); res.push(labeler.values(values)); to_be_encoded = Some(unencoded); } }, - Some(encoder) => match encode_chunk(encoder, &remaining) { - EncodingState::Continue((encoder, encoded)) => { - res.push(labeler.codes(encoded)); - self.encoder = Some(encoder); - } - EncodingState::Done((values, encoded, unencoded)) => { - res.push(labeler.codes(encoded)); - res.push(labeler.values(values)); - to_be_encoded = Some(unencoded); + Some(encoder) => { + let ptype = encoder.codes_ptype(); + match encode_chunk(encoder, &remaining) { + EncodingState::Continue((encoder, encoded)) => { + 
res.push(labeler.codes(encoded, ptype)); + self.encoder = Some(encoder); + } + EncodingState::Done((values, encoded, unencoded)) => { + res.push(labeler.codes(encoded, ptype)); + res.push(labeler.values(values)); + to_be_encoded = Some(unencoded); + } } - }, + } } } res @@ -331,8 +355,12 @@ impl DictChunkLabeler { Self { sequence_pointer } } - fn codes(&mut self, chunk: ArrayRef) -> DictionaryChunk { - DictionaryChunk::Codes((self.sequence_pointer.advance(), chunk)) + fn codes(&mut self, chunk: ArrayRef, ptype: PType) -> DictionaryChunk { + DictionaryChunk::Codes { + seq_id: self.sequence_pointer.advance(), + codes: chunk, + codes_ptype: ptype, + } } fn values(&mut self, chunk: ArrayRef) -> DictionaryChunk { @@ -387,7 +415,11 @@ impl Stream for DictionaryTransformer { } match self.input.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(DictionaryChunk::Codes(codes)))) => { + Poll::Ready(Some(Ok(DictionaryChunk::Codes { + seq_id, + codes, + codes_ptype, + }))) => { if self.active_codes_tx.is_none() { // Start a new group let (codes_tx, codes_rx) = kanal::bounded_async::(1); @@ -396,13 +428,18 @@ impl Stream for DictionaryTransformer { self.active_codes_tx = Some(codes_tx.clone()); self.active_values_tx = Some(values_tx); - // Send first codes + // Use passed codes_ptype instead of getting from array + let codes_dtype = DType::Primitive(codes_ptype, Nullability::NonNullable); + + // Send first codes. self.pending_send = - Some(Box::pin(async move { codes_tx.send(Ok(codes)).await })); + Some(Box::pin( + async move { codes_tx.send(Ok((seq_id, codes))).await }, + )); - // Create output streams + // Create output streams. 
let codes_stream = SequentialStreamAdapter::new( - DType::Primitive(PType::U16, NonNullable), + codes_dtype, codes_rx.into_stream().boxed(), ) .sendable(); @@ -416,13 +453,13 @@ impl Stream for DictionaryTransformer { .boxed(); return Poll::Ready(Some((codes_stream, values_future))); - } else { - // Continue streaming codes to existing group - if let Some(tx) = &self.active_codes_tx { - let tx = tx.clone(); - self.pending_send = - Some(Box::pin(async move { tx.send(Ok(codes)).await })); - } + } + + // Continue streaming codes to existing group + if let Some(tx) = &self.active_codes_tx { + let tx = tx.clone(); + self.pending_send = + Some(Box::pin(async move { tx.send(Ok((seq_id, codes))).await })); } } Poll::Ready(Some(Ok(DictionaryChunk::Values(values)))) => { @@ -514,3 +551,106 @@ fn encode_chunk(mut encoder: Box, chunk: &dyn Array) -> Encodin fn remainder(array: &dyn Array, encoded_len: usize) -> Option { (encoded_len < array.len()).then(|| array.slice(encoded_len..array.len())) } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use vortex_array::IntoArray; + use vortex_array::arrays::VarBinArray; + use vortex_array::builders::dict::DictConstraints; + use vortex_dtype::DType; + use vortex_dtype::Nullability::NonNullable; + use vortex_dtype::PType; + + use super::DictionaryTransformer; + use super::dict_encode_stream; + use crate::sequence::SequenceId; + use crate::sequence::SequentialStream; + use crate::sequence::SequentialStreamAdapter; + use crate::sequence::SequentialStreamExt; + + /// Regression test for a bug where the codes stream dtype was hardcoded to U16 instead of + /// using the actual codes dtype from the array. When `max_len <= 255`, the dict encoder + /// produces U8 codes, but the stream was incorrectly typed as U16, causing a dtype mismatch + /// assertion failure in [`SequentialStreamAdapter`]. 
+ #[tokio::test] + async fn test_dict_transformer_uses_u8_for_small_dictionaries() { + // Use max_len = 100 to force U8 codes (since 100 <= 255). + let constraints = DictConstraints { + max_bytes: 1024 * 1024, + max_len: 100, + }; + + // Create a simple string array with a few unique values. + let arr = VarBinArray::from(vec!["hello", "world", "hello", "world"]).into_array(); + + // Wrap into a sequential stream. + let mut pointer = SequenceId::root(); + let input_stream = SequentialStreamAdapter::new( + arr.dtype().clone(), + futures::stream::once(async move { Ok((pointer.advance(), arr)) }), + ) + .sendable(); + + // Encode into dict chunks. + let dict_stream = dict_encode_stream(input_stream, constraints); + + // Transform into codes/values streams. + let mut transformer = DictionaryTransformer::new(dict_stream); + + // Get the first (and only) run. + let (codes_stream, _values_fut) = transformer + .next() + .await + .expect("expected at least one dictionary run"); + + // The key assertion: codes stream dtype should be U8, not U16. + assert_eq!( + codes_stream.dtype(), + &DType::Primitive(PType::U8, NonNullable), + "codes stream should use U8 dtype for small dictionaries, not U16" + ); + } + + /// Test that the codes stream uses U16 dtype when the dictionary has more than 255 entries. + #[tokio::test] + async fn test_dict_transformer_uses_u16_for_large_dictionaries() { + // Use max_len = 1000 to allow U16 codes (since 1000 > 255). + let constraints = DictConstraints { + max_bytes: 1024 * 1024, + max_len: 1000, + }; + + // Create an array with more than 255 distinct values to force U16 codes. + let values: Vec<String> = (0..300).map(|i| format!("value_{i}")).collect(); + let arr = + VarBinArray::from(values.iter().map(|s| s.as_str()).collect::<Vec<_>>()).into_array(); + + // Wrap into a sequential stream. 
+ let mut pointer = SequenceId::root(); + let input_stream = SequentialStreamAdapter::new( + arr.dtype().clone(), + futures::stream::once(async move { Ok((pointer.advance(), arr)) }), + ) + .sendable(); + + // Encode into dict chunks. + let dict_stream = dict_encode_stream(input_stream, constraints); + + // Transform into codes/values streams. + let mut transformer = DictionaryTransformer::new(dict_stream); + + // Get the first (and only) run. + let (codes_stream, _values_fut) = transformer + .next() + .await + .expect("expected at least one dictionary run"); + + // Codes stream dtype should be U16 since we have more than 255 distinct values. + assert_eq!( + codes_stream.dtype(), + &DType::Primitive(PType::U16, NonNullable), + "codes stream should use U16 dtype for dictionaries with >255 entries" + ); + } +}