Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion encodings/parquet-variant/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ readme = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }
version = { workspace = true }
publish = false
publish = true

[lints]
workspace = true
Expand Down
8 changes: 8 additions & 0 deletions vortex-btrblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ version = { workspace = true }
[dependencies]
itertools = { workspace = true }
num-traits = { workspace = true }
parquet-variant-compute = { workspace = true, optional = true }
pco = { workspace = true, optional = true }
rand = { workspace = true }
rustc-hash = { workspace = true }
Expand All @@ -30,6 +31,7 @@ vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
vortex-fsst = { workspace = true }
vortex-mask = { workspace = true }
vortex-parquet-variant = { workspace = true, optional = true }
vortex-pco = { workspace = true, optional = true }
vortex-runend = { workspace = true }
vortex-sequence = { workspace = true }
Expand All @@ -41,6 +43,7 @@ vortex-zstd = { workspace = true, optional = true }

[dev-dependencies]
divan = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
test-with = { workspace = true }
vortex-array = { workspace = true, features = ["_test-harness"] }
Expand All @@ -49,6 +52,11 @@ vortex-session = { workspace = true }
[features]
# This feature enables unstable encodings for which we don't guarantee stability.
unstable_encodings = ["dep:vortex-tensor", "vortex-zstd?/unstable_encodings"]
parquet-variant = [
"dep:vortex-parquet-variant",
"dep:parquet-variant-compute",
"zstd",
]
pco = ["dep:pco", "dep:vortex-pco"]
zstd = ["dep:vortex-zstd"]

Expand Down
14 changes: 11 additions & 3 deletions vortex-btrblocks/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
// Binary schemes.
////////////////////////////////////////////////////////////////////////////////////////////////
&binary::BinaryDictScheme,
&binary::BinaryFSSTScheme,
&binary::BinaryConstantScheme,
// Decimal schemes.
&decimal::DecimalScheme,
Expand Down Expand Up @@ -142,7 +143,9 @@ impl BtrBlocksCompressorBuilder {
/// Panics if any of the compact schemes are already present.
#[cfg(feature = "zstd")]
pub fn with_compact(self) -> Self {
let builder = self.with_new_scheme(&string::ZstdScheme);
let builder = self
.with_new_scheme(&string::ZstdScheme)
.with_new_scheme(&binary::BinaryZstdScheme);

#[cfg(feature = "pco")]
let builder = builder
Expand Down Expand Up @@ -182,12 +185,17 @@ impl BtrBlocksCompressorBuilder {
string::StringDictScheme.id(),
string::FSSTScheme.id(),
binary::BinaryDictScheme.id(),
binary::BinaryFSSTScheme.id(),
]);

#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
let builder = builder.with_new_scheme(&string::ZstdBuffersScheme);
let builder = builder
.with_new_scheme(&string::ZstdBuffersScheme)
.with_new_scheme(&binary::BinaryZstdBuffersScheme);
#[cfg(all(feature = "zstd", not(feature = "unstable_encodings")))]
let builder = builder.with_new_scheme(&string::ZstdScheme);
let builder = builder
.with_new_scheme(&string::ZstdScheme)
.with_new_scheme(&binary::BinaryZstdScheme);

builder
}
Expand Down
2 changes: 2 additions & 0 deletions vortex-btrblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ mod builder;
mod canonical_compressor;
/// Compression scheme implementations.
pub mod schemes;
#[cfg(feature = "parquet-variant")]
pub mod variant;

// Re-export framework types from vortex-compressor for backwards compatibility.
// Btrblocks-specific exports.
Expand Down
259 changes: 258 additions & 1 deletion vortex-btrblocks/src/schemes/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,263 @@

//! Binary compression schemes.

// Re-export builtin schemes from vortex-compressor.
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::varbin::VarBinArrayExt;
pub use vortex_compressor::builtins::BinaryConstantScheme;
pub use vortex_compressor::builtins::BinaryDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_error::VortexResult;
use vortex_fsst::FSST;
use vortex_fsst::FSSTArrayExt;
use vortex_fsst::fsst_compress;
use vortex_fsst::fsst_train_compressor;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;
use crate::SchemeExt;

/// Marker type selecting FSST compression for binary-typed arrays.
///
/// The type is stateless; all behavior lives in its `Scheme` implementation,
/// which registers under the name `vortex.binary.fsst`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BinaryFSSTScheme;

/// Marker type selecting Zstd compression for binary-typed arrays.
///
/// Only compiled when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BinaryZstdScheme;

/// Marker type selecting buffer-level Zstd compression for binary-typed arrays.
///
/// Only compiled when both the `zstd` and `unstable_encodings` features are enabled.
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BinaryZstdBuffersScheme;

impl Scheme for BinaryFSSTScheme {
    /// Name under which this scheme is identified.
    fn scheme_name(&self) -> &'static str {
        "vortex.binary.fsst"
    }

    /// Accepts only canonical arrays whose dtype is binary.
    fn matches(&self, canonical: &Canonical) -> bool {
        canonical.dtype().is_binary()
    }

    /// Two cascaded children: slot 0 is the uncompressed lengths, slot 1 is the code offsets.
    fn num_children(&self) -> usize {
        2
    }

    /// Always defers the estimate, requesting `DeferredEstimate::Sample`.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// FSST-encodes the binary input and cascades compression into its integer children.
    ///
    /// Steps, in order:
    /// 1. Train an FSST compressor on the input and encode the input with it.
    /// 2. Compress the uncompressed-lengths array as child 0.
    /// 3. Compress the code offsets as child 1 and rebuild the codes `VarBinArray`
    ///    around them, reusing the encoded bytes, dtype and validity.
    /// 4. Reassemble the FSST array from the original symbols / symbol lengths and
    ///    the two compressed children.
    ///
    /// # Errors
    ///
    /// Propagates any error from executing or narrowing the children, from
    /// `compress_child`, or from the `VarBinArray` / `FSST` constructors.
    fn compress(
        &self,
        compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let values = data.array_as_varbinview().into_owned();
        let trained = fsst_train_compressor(&values);
        let encoded = fsst_compress(&values, values.len(), values.dtype(), &trained, exec_ctx);

        // Child 0: materialize the uncompressed lengths as a primitive array and narrow it
        // (presumably to the smallest fitting integer width -- confirm) before cascading.
        let lengths = encoded
            .uncompressed_lengths()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow(exec_ctx)?
            .into_array();
        let lengths_child =
            compressor.compress_child(&lengths, &compress_ctx, self.id(), 0, exec_ctx)?;

        // Child 1: same treatment for the offsets into the FSST code bytes.
        let code_offsets = encoded
            .codes()
            .offsets()
            .clone()
            .execute::<PrimitiveArray>(exec_ctx)?
            .narrow(exec_ctx)?
            .into_array();
        let offsets_child =
            compressor.compress_child(&code_offsets, &compress_ctx, self.id(), 1, exec_ctx)?;

        // Only the offsets are swapped out; bytes, dtype and validity come from the
        // freshly encoded codes unchanged.
        let codes_child = VarBinArray::try_new(
            offsets_child,
            encoded.codes().bytes().clone(),
            encoded.codes().dtype().clone(),
            encoded.codes().validity()?,
        )?;

        let rebuilt = FSST::try_new(
            encoded.dtype().clone(),
            encoded.symbols().clone(),
            encoded.symbol_lengths().clone(),
            codes_child,
            lengths_child,
            exec_ctx,
        )?;

        Ok(rebuilt.into_array())
    }
}

#[cfg(feature = "zstd")]
impl Scheme for BinaryZstdScheme {
    /// Name under which this scheme is identified.
    fn scheme_name(&self) -> &'static str {
        "vortex.binary.zstd"
    }

    /// Accepts only canonical arrays whose dtype is binary.
    fn matches(&self, canonical: &Canonical) -> bool {
        canonical.dtype().is_binary()
    }

    /// Always defers the estimate, requesting `DeferredEstimate::Sample`.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// Compacts the input's buffers and Zstd-encodes the result without a dictionary.
    ///
    /// # Errors
    ///
    /// Propagates failures from `compact_buffers` and from the Zstd encoder.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let owned_view = data.array_as_varbinview().into_owned();
        let compacted = owned_view.compact_buffers()?;

        // NOTE(review): `3` is presumably the zstd compression level and `8192` the
        // number of values per frame -- confirm against `vortex_zstd`.
        let zstd_array =
            vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)?;

        Ok(zstd_array.into_array())
    }
}

#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
impl Scheme for BinaryZstdBuffersScheme {
    /// Name under which this scheme is identified.
    fn scheme_name(&self) -> &'static str {
        "vortex.binary.zstd_buffers"
    }

    /// Accepts only canonical arrays whose dtype is binary.
    fn matches(&self, canonical: &Canonical) -> bool {
        canonical.dtype().is_binary()
    }

    /// Always defers the estimate, requesting `DeferredEstimate::Sample`.
    fn expected_compression_ratio(
        &self,
        _data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        _exec_ctx: &mut ExecutionCtx,
    ) -> CompressionEstimate {
        let deferred = DeferredEstimate::Sample;
        CompressionEstimate::Deferred(deferred)
    }

    /// Hands the input array to `ZstdBuffers::compress` using the execution context's session.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by `ZstdBuffers::compress`.
    fn compress(
        &self,
        _compressor: &CascadingCompressor,
        data: &ArrayAndStats,
        _compress_ctx: CompressorContext,
        exec_ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrayRef> {
        let input = data.array();
        let session = exec_ctx.session();

        // NOTE(review): `3` is presumably the zstd compression level -- confirm.
        let buffers = vortex_zstd::ZstdBuffers::compress(input, 3, session)?;

        Ok(buffers.into_array())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_fsst::FSST;
    use vortex_session::VortexSession;

    use crate::BtrBlocksCompressor;

    /// Session shared by every test in this module, built lazily on first use.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Builds 1024 non-nullable binary values that share a long common pattern and
    /// differ only in a zero-padded index.
    fn binary_fsst_data() -> VarBinViewArray {
        let rows = (0..1024)
            .map(|idx| format!("variant-key-{idx:04}-invoice-total-line-items"))
            .map(|key| Some(key.into_bytes()));

        VarBinViewArray::from_iter(rows, DType::Binary(Nullability::NonNullable))
    }

    /// The default compressor should choose FSST for this data, shrink it, and round-trip it.
    #[test]
    fn default_compressor_uses_fsst_for_binary_data() -> VortexResult<()> {
        let input = binary_fsst_data().into_array();
        let output =
            BtrBlocksCompressor::default().compress(&input, &mut SESSION.create_execution_ctx())?;

        assert!(
            output.is::<FSST>(),
            "expected binary data to be FSST-compressed, got {}",
            output.encoding_id(),
        );
        assert!(output.nbytes() < input.nbytes());

        let round_tripped = output.execute::<ArrayRef>(&mut SESSION.create_execution_ctx())?;
        assert_arrays_eq!(input, round_tripped);

        Ok(())
    }

    /// With the compact schemes enabled, the same data should end up Zstd-compressed,
    /// smaller than the input, and still round-trip.
    #[cfg(feature = "zstd")]
    #[test]
    fn compact_compressor_uses_zstd_for_binary_data() -> VortexResult<()> {
        let input = binary_fsst_data().into_array();
        let compact = crate::BtrBlocksCompressorBuilder::default()
            .with_compact()
            .build();
        let output = compact.compress(&input, &mut SESSION.create_execution_ctx())?;

        assert!(
            output.is::<vortex_zstd::Zstd>(),
            "expected compact binary data to be Zstd-compressed, got {}",
            output.encoding_id(),
        );
        assert!(output.nbytes() < input.nbytes());

        let round_tripped = output.execute::<ArrayRef>(&mut SESSION.create_execution_ctx())?;
        assert_arrays_eq!(input, round_tripped);

        Ok(())
    }
}
3 changes: 1 addition & 2 deletions vortex-btrblocks/src/schemes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

pub mod binary;
pub mod bool;
pub mod decimal;
pub mod float;
pub mod integer;
pub mod string;

pub mod decimal;
pub mod temporal;

pub(crate) mod patches;
Expand Down
Loading
Loading