diff --git a/parquet-testing b/parquet-testing index a3d96a65e11e..550368ca77b9 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3 +Subproject commit 550368ca77b97231efead39251a96bd6f8f08c6e diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index d1ada01c3773..b3a4927d3a8c 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -35,6 +35,7 @@ ring = { version = "0.17", default-features = false, features = ["wasm32_unknown [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +snappy_src = { version = "0.2", default-features = false, optional = true } [dependencies] arrow-array = { workspace = true, optional = true } @@ -81,7 +82,6 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } criterion = { workspace = true, default-features = false, features = ["async_futures"] } -snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } insta = "1.43.1" brotli = { version = "8.0", default-features = false, features = ["std"] } @@ -101,6 +101,8 @@ all-features = true [features] default = ["arrow", "snap", "brotli", "flate2-zlib-rs", "lz4", "zstd", "base64", "simdutf8"] +# Enable snappy compression (uses C++ snappy library via snappy_src, plus snap crate for CLI framed format) +snap = ["snappy_src", "dep:snap"] # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index fe2fb59c5b8c..eb814013be04 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -196,21 +196,111 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result Self { + Self + } + } + + impl Codec for SnappyCodec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec, + uncompress_size: Option, + ) -> Result { + let len = match uncompress_size { + Some(size) => size, + None => { + let mut result: usize = 0; + let status = unsafe { + snappy_uncompressed_length( + input_buf.as_ptr() as *const c_char, + input_buf.len(), + &mut result, + ) + }; + if status != snappy_status_SNAPPY_OK { + return Err(general_err!("snappy: unable to get uncompressed length")); + } + result + } + }; + let offset = output_buf.len(); + output_buf.reserve(len); + // SAFETY: we just reserved `len` bytes and snappy will write exactly `len` bytes + unsafe { output_buf.set_len(offset + len) }; + let mut out_len = len; + let status = unsafe { + snappy_uncompress( + input_buf.as_ptr() as *const c_char, + input_buf.len(), + output_buf[offset..].as_mut_ptr() as *mut c_char, + &mut out_len, + ) + }; + if status != snappy_status_SNAPPY_OK { + output_buf.truncate(offset); + return Err(general_err!("snappy: decompress error")); + } + Ok(out_len) + } + + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { + let output_buf_len = output_buf.len(); + let required_len = + unsafe { snappy_max_compressed_length(input_buf.len()) }; + output_buf.reserve(required_len); + // SAFETY: we just reserved `required_len` bytes + unsafe { output_buf.set_len(output_buf_len + required_len) }; + let mut out_len = required_len; + let status = unsafe { + snappy_compress( + input_buf.as_ptr() as *const c_char, + input_buf.len(), + output_buf[output_buf_len..].as_mut_ptr() as *mut c_char, + &mut out_len, + ) + }; + if status != snappy_status_SNAPPY_OK { + output_buf.truncate(output_buf_len); + return Err(general_err!("snappy: compress error")); + } + output_buf.truncate(output_buf_len + out_len); + Ok(()) + } + } +} + +/// Snappy codec using pure-Rust snap crate (wasm32 fallback) +#[cfg(all(any(feature = "snap", test), target_arch = "wasm32"))] mod snappy_codec { - use snap::raw::{Decoder, Encoder, decompress_len, max_compress_len}; + use snap::raw::{Decoder, Encoder}; use crate::compression::Codec; use crate::errors::Result; - /// Codec for Snappy compression format. pub struct SnappyCodec { decoder: Decoder, encoder: Encoder, } impl SnappyCodec { - /// Creates new Snappy compression codec. pub(crate) fn new() -> Self { Self { decoder: Decoder::new(), @@ -226,29 +316,44 @@ mod snappy_codec { output_buf: &mut Vec, uncompress_size: Option, ) -> Result { - let len = match uncompress_size { - Some(size) => size, - None => decompress_len(input_buf)?, - }; + let len = uncompress_size.unwrap_or_else(|| { + snap::raw::decompress_len(input_buf).unwrap_or(0) + }); let offset = output_buf.len(); output_buf.resize(offset + len, 0); - self.decoder - .decompress(input_buf, &mut output_buf[offset..]) - .map_err(|e| e.into()) + match self.decoder.decompress(input_buf, &mut output_buf[offset..]) { + Ok(n) => { + output_buf.truncate(offset + n); + Ok(n) + } + Err(e) => { + output_buf.truncate(offset); + Err(general_err!("snappy decompress error: {}", e)) + } + } } fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { let output_buf_len = output_buf.len(); - let required_len = max_compress_len(input_buf.len()); + let required_len = snap::raw::max_compress_len(input_buf.len()); output_buf.resize(output_buf_len + required_len, 0); - let n = self + match self .encoder - .compress(input_buf, &mut output_buf[output_buf_len..])?; - output_buf.truncate(output_buf_len + n); - Ok(()) + .compress(input_buf, &mut output_buf[output_buf_len..]) + { + Ok(n) => { + output_buf.truncate(output_buf_len + n); + Ok(()) + } + Err(e) => { + output_buf.truncate(output_buf_len); + Err(general_err!("snappy compress error: {}", e)) + } + } } } } + #[cfg(any(feature = "snap", test))] pub use snappy_codec::*; diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index 0533d7662c5f..e2294b34e093 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -101,13 +101,6 @@ impl From for ParquetError { } } -#[cfg(any(feature = "snap", test))] -impl From for ParquetError { - fn from(e: snap::Error) -> ParquetError { - ParquetError::External(Box::new(e)) - } -} - impl From for ParquetError { fn from(e: thrift::Error) -> ParquetError { ParquetError::External(Box::new(e))