diff --git a/.github/workflows/ci_integration_parquet.yml b/.github/workflows/ci_integration_parquet.yml
new file mode 100644
index 00000000000..421faa418d5
--- /dev/null
+++ b/.github/workflows/ci_integration_parquet.yml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Integration Parquet CI
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+    paths:
+      - "integrations/parquet/**"
+      - "core/**"
+      - ".github/workflows/ci_integration_parquet.yml"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  check_clippy:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Cargo clippy
+        working-directory: integrations/parquet
+        run: cargo clippy --all-targets --all-features -- -D warnings
+
+      - name: Cargo test
+        working-directory: integrations/parquet
+        run: cargo test
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index fbf561af63c..7b9d9e0dbaa 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -406,6 +406,29 @@ jobs:
          name: virtiofs-opendal-docs
          path: ./integrations/virtiofs/target/doc
 
+  build-parquet-opendal-doc:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Setup Rust Nightly
+        run: |
+          rustup toolchain install ${{ env.RUST_DOC_TOOLCHAIN }}
+
+      - name: Build parquet-opendal doc
+        working-directory: "integrations/parquet"
+        run: cargo +${{ env.RUST_DOC_TOOLCHAIN }} doc --lib --no-deps --all-features
+
+      - name: Upload docs
+        uses: actions/upload-artifact@v3
+        with:
+          name: parquet-opendal-docs
+          path: ./integrations/parquet/target/doc
+
   build-website:
     runs-on: ubuntu-latest
     needs:
@@ -423,6 +446,7 @@ jobs:
      - build-fuse3-opendal-doc
      - build-unftp-sbe-opendal-doc
      - build-virtiofs-opendal-doc
+      - build-parquet-opendal-doc
 
    steps:
      - uses: actions/checkout@v4
diff --git a/integrations/parquet/.gitignore b/integrations/parquet/.gitignore
new file mode 100644
index 00000000000..03314f77b5a
--- /dev/null
+++ b/integrations/parquet/.gitignore
@@ -0,0 +1 @@
+Cargo.lock
diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
new file mode 100644
index 00000000000..1efe19b75a8
--- /dev/null
+++ b/integrations/parquet/Cargo.toml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+description = "Parquet integration for Apache OpenDAL"
+name = "parquet_opendal"
+
+authors = ["Apache OpenDAL <dev@opendal.apache.org>"]
+edition = "2021"
+homepage = "https://opendal.apache.org/"
+license = "Apache-2.0"
+repository = "https://github.com/apache/opendal"
+rust-version = "1.75"
+version = "0.0.1"
+
+[dependencies]
+async-trait = "0.1"
+bytes = "1"
+futures = "0.3"
+opendal = { version = "0.48.0", path = "../../core" }
+parquet = { version = "52.0", default-features = false, features = [
+  "async",
+  "arrow",
+] }
+
+[dev-dependencies]
+opendal = { version = "0.48.0", path = "../../core", features = [
+  "services-memory",
+  "services-s3",
+] }
+rand = "0.8.5"
+tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
+arrow = { version = "52.0" }
+
+
+[[example]]
+name = "async_writer"
+path = "examples/async_writer.rs"
diff --git a/integrations/parquet/examples/async_writer.rs b/integrations/parquet/examples/async_writer.rs
new file mode 100644
index 00000000000..9f16f69eac5
--- /dev/null
+++ b/integrations/parquet/examples/async_writer.rs
@@ -0,0 +1,45 @@
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+
+use opendal::{services::S3Config, Operator};
+use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+use parquet_opendal::AsyncWriter;
+
+#[tokio::main]
+async fn main() {
+    let mut cfg = S3Config::default();
+    cfg.access_key_id = Some("my_access_key".to_string());
+    cfg.secret_access_key = Some("my_secret_key".to_string());
+    cfg.endpoint = Some("my_endpoint".to_string());
+    cfg.region = Some("my_region".to_string());
+    cfg.bucket = "my_bucket".to_string();
+
+    // Create a new operator
+    let operator = Operator::from_config(cfg).unwrap().finish();
+    let path = "/path/to/file.parquet";
+
+    // Create an async writer
+    let writer = AsyncWriter::new(
+        operator
+            .writer_with(path)
+            .chunk(32 * 1024 * 1024)
+            .concurrent(8)
+            .await
+            .unwrap(),
+    );
+
+    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+    writer.write(&to_write).await.unwrap();
+    writer.close().await.unwrap();
+
+    let buffer = operator.read(path).await.unwrap().to_bytes();
+    let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+        .unwrap()
+        .build()
+        .unwrap();
+    let read = reader.next().unwrap().unwrap();
+    assert_eq!(to_write, read);
+}
diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs
new file mode 100644
index 00000000000..027c9214c05
--- /dev/null
+++ b/integrations/parquet/src/async_writer.rs
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use parquet::arrow::async_writer::AsyncFileWriter;
+use parquet::errors::{ParquetError, Result};
+
+use futures::future::BoxFuture;
+use opendal::Writer;
+
+/// AsyncWriter implements the `AsyncFileWriter` trait on top of an opendal [`Writer`].
+///
+/// ```no_run
+/// use std::sync::Arc;
+///
+/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+///
+/// use opendal::{services::S3Config, Operator};
+/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+/// use parquet_opendal::AsyncWriter;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut cfg = S3Config::default();
+///     cfg.access_key_id = Some("my_access_key".to_string());
+///     cfg.secret_access_key = Some("my_secret_key".to_string());
+///     cfg.endpoint = Some("my_endpoint".to_string());
+///     cfg.region = Some("my_region".to_string());
+///     cfg.bucket = "my_bucket".to_string();
+///
+///     // Create a new operator
+///     let operator = Operator::from_config(cfg).unwrap().finish();
+///     let path = "/path/to/file.parquet";
+///
+///     // Create an async writer
+///     let writer = AsyncWriter::new(
+///         operator
+///             .writer_with(path)
+///             .chunk(32 * 1024 * 1024)
+///             .concurrent(8)
+///             .await
+///             .unwrap(),
+///     );
+///
+///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+///     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+///     writer.write(&to_write).await.unwrap();
+///     writer.close().await.unwrap();
+///
+///     let buffer = operator.read(path).await.unwrap().to_bytes();
+///     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+///         .unwrap()
+///         .build()
+///         .unwrap();
+///     let read = reader.next().unwrap().unwrap();
+///     assert_eq!(to_write, read);
+/// }
+/// ```
+pub struct AsyncWriter {
+    inner: Writer,
+}
+
+impl AsyncWriter {
+    /// Create an [`AsyncWriter`] from the given [`Writer`].
+    pub fn new(writer: Writer) -> Self {
+        Self { inner: writer }
+    }
+}
+
+impl AsyncFileWriter for AsyncWriter {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .write(bs)
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .close()
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+    use opendal::{services, Operator};
+    use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+
+    #[tokio::test]
+    async fn test_basic() {
+        let op = Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "data/test.txt";
+        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
+        let bytes = Bytes::from_static(b"hello, world!");
+        writer.write(bytes).await.unwrap();
+        let bytes = Bytes::from_static(b"hello, OpenDAL!");
+        writer.write(bytes).await.unwrap();
+        writer.complete().await.unwrap();
+
+        let bytes = op.read(path).await.unwrap().to_vec();
+        assert_eq!(bytes, b"hello, world!hello, OpenDAL!");
+    }
+
+    #[tokio::test]
+    async fn test_abort() {
+        let op = Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "data/test.txt";
+        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
+        let bytes = Bytes::from_static(b"hello, world!");
+        writer.write(bytes).await.unwrap();
+        let bytes = Bytes::from_static(b"hello, OpenDAL!");
+        writer.write(bytes).await.unwrap();
+        drop(writer);
+
+        let exist = op.is_exist(path).await.unwrap();
+        assert!(!exist);
+    }
+
+    #[tokio::test]
+    async fn test_async_writer() {
+        let operator = Operator::new(services::Memory::default()).unwrap().finish();
+        let path = "/path/to/file.parquet";
+
+        let writer = AsyncWriter::new(
+            operator
+                .writer_with(path)
+                .chunk(32 * 1024 * 1024)
+                .concurrent(8)
+                .await
+                .unwrap(),
+        );
+
+        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+        let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+        writer.write(&to_write).await.unwrap();
+        writer.close().await.unwrap();
+
+        let buffer = operator.read(path).await.unwrap().to_bytes();
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+            .unwrap()
+            .build()
+            .unwrap();
+        let read = reader.next().unwrap().unwrap();
+        assert_eq!(to_write, read);
+    }
+}
diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs
new file mode 100644
index 00000000000..ded082d8237
--- /dev/null
+++ b/integrations/parquet/src/lib.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! parquet_opendal provides Parquet I/O utilities for Apache OpenDAL.
+//!
+//! AsyncWriter implements the `AsyncFileWriter` trait on top of an opendal `Writer`.
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//!
+//! use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+//!
+//! use opendal::{services::S3Config, Operator};
+//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
+//! use parquet_opendal::AsyncWriter;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//!     let mut cfg = S3Config::default();
+//!     cfg.access_key_id = Some("my_access_key".to_string());
+//!     cfg.secret_access_key = Some("my_secret_key".to_string());
+//!     cfg.endpoint = Some("my_endpoint".to_string());
+//!     cfg.region = Some("my_region".to_string());
+//!     cfg.bucket = "my_bucket".to_string();
+//!
+//!     // Create a new operator
+//!     let operator = Operator::from_config(cfg).unwrap().finish();
+//!     let path = "/path/to/file.parquet";
+//!
+//!     // Create an async writer
+//!     let writer = AsyncWriter::new(
+//!         operator
+//!             .writer_with(path)
+//!             .chunk(32 * 1024 * 1024)
+//!             .concurrent(8)
+//!             .await
+//!             .unwrap(),
+//!     );
+//!
+//!     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
+//!     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+//!     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
+//!     writer.write(&to_write).await.unwrap();
+//!     writer.close().await.unwrap();
+//!
+//!     let buffer = operator.read(path).await.unwrap().to_bytes();
+//!     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
+//!         .unwrap()
+//!         .build()
+//!         .unwrap();
+//!     let read = reader.next().unwrap().unwrap();
+//!     assert_eq!(to_write, read);
+//! }
+//! ```
+
+mod async_writer;
+
+pub use async_writer::AsyncWriter;
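
The S3-based example above needs real credentials and an endpoint to run. For a credential-free smoke test, the same write-then-read-back flow works against OpenDAL's in-memory service, which the dev-dependencies already enable via the `services-memory` feature. This is a minimal sketch mirroring the `test_async_writer` case in the diff; the path is illustrative:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use opendal::{services, Operator};
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
use parquet_opendal::AsyncWriter;

#[tokio::main]
async fn main() {
    // In-memory backend: no credentials or network required.
    let operator = Operator::new(services::Memory::default()).unwrap().finish();
    let path = "/path/to/file.parquet";

    // Wrap opendal's Writer so parquet's AsyncArrowWriter can drive it.
    let writer = AsyncWriter::new(operator.writer(path).await.unwrap());

    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
    writer.write(&to_write).await.unwrap();
    writer.close().await.unwrap();

    // Read the object back and verify the round trip.
    let buffer = operator.read(path).await.unwrap().to_bytes();
    let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
        .unwrap()
        .build()
        .unwrap();
    let read = reader.next().unwrap().unwrap();
    assert_eq!(to_write, read);
}
```

Note that `AsyncFileWriter::complete` (driven by `AsyncArrowWriter::close`) is what persists the object; dropping the `AsyncWriter` early aborts the write instead, which is exactly what the `test_abort` case asserts by checking that no partial object is left behind.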