5 changes: 5 additions & 0 deletions vortex-array/src/builders/dict/bytes.rs
@@ -9,6 +9,7 @@ use vortex_buffer::BitBufferMut;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::DType;
use vortex_dtype::PType;
use vortex_dtype::UnsignedPType;
use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
@@ -195,6 +196,10 @@ impl<Code: UnsignedPType> DictEncoder for BytesDictBuilder<Code> {
.into_array()
}
}

fn codes_ptype(&self) -> PType {
Code::PTYPE
}
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions vortex-array/src/builders/dict/mod.rs
@@ -3,6 +3,7 @@

use bytes::bytes_dict_builder;
use primitive::primitive_dict_builder;
use vortex_dtype::PType;
use vortex_dtype::match_each_native_ptype;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
@@ -37,6 +38,9 @@ pub trait DictEncoder: Send {

/// Clear the encoder state to make it ready for a new round of encoding.
fn reset(&mut self) -> ArrayRef;

/// Returns the PType of the codes this encoder produces.
fn codes_ptype(&self) -> PType;
}

pub fn dict_encoder(array: &dyn Array, constraints: &DictConstraints) -> Box<dyn DictEncoder> {
5 changes: 5 additions & 0 deletions vortex-array/src/builders/dict/primitive.rs
@@ -9,6 +9,7 @@ use vortex_buffer::BitBufferMut;
use vortex_buffer::BufferMut;
use vortex_dtype::NativePType;
use vortex_dtype::Nullability;
use vortex_dtype::PType;
use vortex_dtype::UnsignedPType;
use vortex_error::vortex_panic;
use vortex_utils::aliases::hash_map::Entry;
@@ -145,6 +146,10 @@ where
)
.into_array()
}

fn codes_ptype(&self) -> PType {
Code::PTYPE
}
}

#[cfg(test)]
198 changes: 169 additions & 29 deletions vortex-layout/src/layouts/dict/writer.rs
Expand Up @@ -27,9 +27,10 @@ use vortex_array::builders::dict::DictEncoder;
use vortex_array::builders::dict::dict_encoder;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_dtype::DType;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::Nullability;
use vortex_dtype::PType;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_io::kanal_ext::KanalExt;
@@ -49,10 +50,22 @@ use crate::sequence::SequentialStream;
use crate::sequence::SequentialStreamAdapter;
use crate::sequence::SequentialStreamExt;

/// Constraints for dictionary layout encoding.
///
/// Note that [`max_len`](Self::max_len) is limited to `u16` (65,535 entries) by design. Since
/// layout chunks are typically ~8k elements, having more than 64k unique values in a dictionary
/// means dictionary encoding provides little compression benefit. If a column has very high
/// cardinality, the fallback encoding strategy should be used instead.
Contributor comment on lines +53 to +58: might be worth saying you can still use a DictArray?

#[derive(Clone)]
pub struct DictLayoutConstraints {
/// Maximum size of the dictionary in bytes.
pub max_bytes: usize,
// Dict layout codes currently only support u16 codes
/// Maximum dictionary length. Limited to `u16` because dictionaries with more than 64k unique
/// values provide diminishing compression returns given typical chunk sizes (~8k elements).
///
/// The codes dtype is determined upfront from this constraint:
/// - [`PType::U8`] when max_len <= 255
/// - [`PType::U16`] when max_len > 255
pub max_len: u16,
}
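
For reference, the dtype selection documented on `max_len` above reduces to a single branch. A minimal sketch of that rule, assuming a hypothetical `codes_ptype_for` helper that is not part of this PR:

use vortex_dtype::PType;

// Hypothetical helper, not part of this PR: codes fit in u8 when the
// dictionary can never exceed 255 entries, otherwise u16 is needed.
fn codes_ptype_for(max_len: u16) -> PType {
    if max_len <= u8::MAX as u16 {
        PType::U8
    } else {
        PType::U16
    }
}

Under this rule a constraint of `max_len: 100` yields U8 codes and `max_len: 1000` yields U16, which is exactly what the regression tests added at the bottom of this file assert at the stream level.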

@@ -226,7 +239,11 @@ impl LayoutStrategy for DictStrategy {
}

enum DictionaryChunk {
Codes((SequenceId, ArrayRef)),
Codes {
seq_id: SequenceId,
codes: ArrayRef,
codes_ptype: PType,
},
Values((SequenceId, ArrayRef)),
}

@@ -288,26 +305,33 @@ impl DictStreamState {
match self.encoder.take() {
None => match start_encoding(&self.constraints, &remaining) {
EncodingState::Continue((encoder, encoded)) => {
res.push(labeler.codes(encoded));
let ptype = encoder.codes_ptype();
res.push(labeler.codes(encoded, ptype));
self.encoder = Some(encoder);
}
EncodingState::Done((values, encoded, unencoded)) => {
res.push(labeler.codes(encoded));
// Encoder was created and consumed within start_encoding
let ptype = PType::try_from(encoded.dtype())
.vortex_expect("codes should be primitive");
res.push(labeler.codes(encoded, ptype));
res.push(labeler.values(values));
to_be_encoded = Some(unencoded);
}
},
Some(encoder) => match encode_chunk(encoder, &remaining) {
EncodingState::Continue((encoder, encoded)) => {
res.push(labeler.codes(encoded));
self.encoder = Some(encoder);
}
EncodingState::Done((values, encoded, unencoded)) => {
res.push(labeler.codes(encoded));
res.push(labeler.values(values));
to_be_encoded = Some(unencoded);
Some(encoder) => {
let ptype = encoder.codes_ptype();
match encode_chunk(encoder, &remaining) {
EncodingState::Continue((encoder, encoded)) => {
res.push(labeler.codes(encoded, ptype));
self.encoder = Some(encoder);
}
EncodingState::Done((values, encoded, unencoded)) => {
res.push(labeler.codes(encoded, ptype));
res.push(labeler.values(values));
to_be_encoded = Some(unencoded);
}
}
},
}
}
}
res
@@ -331,8 +355,12 @@ impl DictChunkLabeler {
Self { sequence_pointer }
}

fn codes(&mut self, chunk: ArrayRef) -> DictionaryChunk {
DictionaryChunk::Codes((self.sequence_pointer.advance(), chunk))
fn codes(&mut self, chunk: ArrayRef, ptype: PType) -> DictionaryChunk {
DictionaryChunk::Codes {
seq_id: self.sequence_pointer.advance(),
codes: chunk,
codes_ptype: ptype,
}
}

fn values(&mut self, chunk: ArrayRef) -> DictionaryChunk {
@@ -387,7 +415,11 @@ impl Stream for DictionaryTransformer {
}

match self.input.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(DictionaryChunk::Codes(codes)))) => {
Poll::Ready(Some(Ok(DictionaryChunk::Codes {
seq_id,
codes,
codes_ptype,
}))) => {
if self.active_codes_tx.is_none() {
// Start a new group
let (codes_tx, codes_rx) = kanal::bounded_async::<SequencedChunk>(1);
@@ -396,13 +428,18 @@
self.active_codes_tx = Some(codes_tx.clone());
self.active_values_tx = Some(values_tx);

// Send first codes
// Use passed codes_ptype instead of getting from array
let codes_dtype = DType::Primitive(codes_ptype, Nullability::NonNullable);

// Send first codes.
self.pending_send =
Some(Box::pin(async move { codes_tx.send(Ok(codes)).await }));
Some(Box::pin(
async move { codes_tx.send(Ok((seq_id, codes))).await },
));

// Create output streams
// Create output streams.
let codes_stream = SequentialStreamAdapter::new(
DType::Primitive(PType::U16, NonNullable),
codes_dtype,
codes_rx.into_stream().boxed(),
)
.sendable();
@@ -416,13 +453,13 @@
.boxed();

return Poll::Ready(Some((codes_stream, values_future)));
} else {
// Continue streaming codes to existing group
if let Some(tx) = &self.active_codes_tx {
let tx = tx.clone();
self.pending_send =
Some(Box::pin(async move { tx.send(Ok(codes)).await }));
}
}

// Continue streaming codes to existing group
if let Some(tx) = &self.active_codes_tx {
let tx = tx.clone();
self.pending_send =
Some(Box::pin(async move { tx.send(Ok((seq_id, codes))).await }));
}
}
Poll::Ready(Some(Ok(DictionaryChunk::Values(values)))) => {
@@ -514,3 +551,106 @@ fn encode_chunk(mut encoder: Box<dyn DictEncoder>, chunk: &dyn Array) -> EncodingState {
fn remainder(array: &dyn Array, encoded_len: usize) -> Option<ArrayRef> {
(encoded_len < array.len()).then(|| array.slice(encoded_len..array.len()))
}

#[cfg(test)]
mod tests {
use futures::StreamExt;
use vortex_array::IntoArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::builders::dict::DictConstraints;
use vortex_dtype::DType;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::PType;

use super::DictionaryTransformer;
use super::dict_encode_stream;
use crate::sequence::SequenceId;
use crate::sequence::SequentialStream;
use crate::sequence::SequentialStreamAdapter;
use crate::sequence::SequentialStreamExt;

/// Regression test for a bug where the codes stream dtype was hardcoded to U16 instead of
/// using the actual codes dtype from the array. When `max_len <= 255`, the dict encoder
/// produces U8 codes, but the stream was incorrectly typed as U16, causing a dtype mismatch
/// assertion failure in [`SequentialStreamAdapter`].
#[tokio::test]
async fn test_dict_transformer_uses_u8_for_small_dictionaries() {
// Use max_len = 100 to force U8 codes (since 100 <= 255).
let constraints = DictConstraints {
max_bytes: 1024 * 1024,
max_len: 100,
};

// Create a simple string array with a few unique values.
let arr = VarBinArray::from(vec!["hello", "world", "hello", "world"]).into_array();

// Wrap into a sequential stream.
let mut pointer = SequenceId::root();
let input_stream = SequentialStreamAdapter::new(
arr.dtype().clone(),
futures::stream::once(async move { Ok((pointer.advance(), arr)) }),
)
.sendable();

// Encode into dict chunks.
let dict_stream = dict_encode_stream(input_stream, constraints);

// Transform into codes/values streams.
let mut transformer = DictionaryTransformer::new(dict_stream);

// Get the first (and only) run.
let (codes_stream, _values_fut) = transformer
.next()
.await
.expect("expected at least one dictionary run");

// The key assertion: codes stream dtype should be U8, not U16.
assert_eq!(
codes_stream.dtype(),
&DType::Primitive(PType::U8, NonNullable),
"codes stream should use U8 dtype for small dictionaries, not U16"
);
}

/// Test that the codes stream uses U16 dtype when the dictionary has more than 255 entries.
#[tokio::test]
async fn test_dict_transformer_uses_u16_for_large_dictionaries() {
// Use max_len = 1000 to allow U16 codes (since 1000 > 255).
let constraints = DictConstraints {
max_bytes: 1024 * 1024,
max_len: 1000,
};

// Create an array with more than 255 distinct values to force U16 codes.
let values: Vec<String> = (0..300).map(|i| format!("value_{i}")).collect();
let arr =
VarBinArray::from(values.iter().map(|s| s.as_str()).collect::<Vec<_>>()).into_array();

// Wrap into a sequential stream.
let mut pointer = SequenceId::root();
let input_stream = SequentialStreamAdapter::new(
arr.dtype().clone(),
futures::stream::once(async move { Ok((pointer.advance(), arr)) }),
)
.sendable();

// Encode into dict chunks.
let dict_stream = dict_encode_stream(input_stream, constraints);

// Transform into codes/values streams.
let mut transformer = DictionaryTransformer::new(dict_stream);

// Get the first (and only) run.
let (codes_stream, _values_fut) = transformer
.next()
.await
.expect("expected at least one dictionary run");

// Codes stream dtype should be U16 since we have more than 255 distinct values.
assert_eq!(
codes_stream.dtype(),
&DType::Primitive(PType::U16, NonNullable),
"codes stream should use U16 dtype for dictionaries with >255 entries"
);
}
}