feat: introduce opendal AsyncWriter for parquet integrations (#4958)
* chore: init parquet crate

* feat: implement the `OpendalAsyncWriter`

* chore: apply suggestions from CR

* chore: remove arrow dep from default

* chore(ci): add ci for opendal_parquet

* test: add test for async writer

* chore: remove arrow dep

* chore(ci): add doc test

* Update .github/workflows/ci_integration_parquet.yml

* chore(ci): run cargo test

---------

Co-authored-by: Xuanwo <[email protected]>
WenyXu and Xuanwo authored Aug 5, 2024
1 parent fac74d4 commit b37191d
Showing 7 changed files with 417 additions and 0 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/ci_integration_parquet.yml
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Integration Parquet CI

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main
    paths:
      - "integrations/parquet/**"
      - "core/**"
      - ".github/workflows/ci_integration_parquet.yml"

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
  cancel-in-progress: true

jobs:
  check_clippy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Rust toolchain
        uses: ./.github/actions/setup

      - name: Cargo clippy
        working-directory: integrations/parquet
        run: cargo clippy --all-targets --all-features -- -D warnings

      - name: Cargo test
        working-directory: integrations/parquet
        run: cargo test
24 changes: 24 additions & 0 deletions .github/workflows/docs.yml
@@ -406,6 +406,29 @@ jobs:
          name: virtiofs-opendal-docs
          path: ./integrations/virtiofs/target/doc

  build-parquet-opendal-doc:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Setup Rust toolchain
        uses: ./.github/actions/setup

      - name: Setup Rust Nightly
        run: |
          rustup toolchain install ${{ env.RUST_DOC_TOOLCHAIN }}
      - name: Build parquet-opendal doc
        working-directory: "integrations/parquet"
        run: cargo +${{ env.RUST_DOC_TOOLCHAIN }} doc --lib --no-deps --all-features

      - name: Upload docs
        uses: actions/upload-artifact@v3
        with:
          name: parquet-opendal-docs
          path: ./integrations/parquet/target/doc

  build-website:
    runs-on: ubuntu-latest
    needs:
@@ -423,6 +446,7 @@
      - build-fuse3-opendal-doc
      - build-unftp-sbe-opendal-doc
      - build-virtiofs-opendal-doc
      - build-parquet-opendal-doc

steps:
- uses: actions/checkout@v4
1 change: 1 addition & 0 deletions integrations/parquet/.gitignore
@@ -0,0 +1 @@
Cargo.lock
52 changes: 52 additions & 0 deletions integrations/parquet/Cargo.toml
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
description = "parquet Integration for Apache OpenDAL"
name = "parquet_opendal"

authors = ["Apache OpenDAL <[email protected]>"]
edition = "2021"
homepage = "https://opendal.apache.org/"
license = "Apache-2.0"
repository = "https://github.com/apache/opendal"
rust-version = "1.75"
version = "0.0.1"

[dependencies]
async-trait = "0.1"
bytes = "1"
futures = "0.3"
opendal = { version = "0.48.0", path = "../../core" }
parquet = { version = "52.0", default-features = false, features = [
    "async",
    "arrow",
] }

[dev-dependencies]
opendal = { version = "0.48.0", path = "../../core", features = [
    "services-memory",
    "services-s3",
] }
rand = "0.8.5"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
arrow = { version = "52.0" }

[[example]]
name = "async_writer"
path = "examples/async_writer.rs"
45 changes: 45 additions & 0 deletions integrations/parquet/examples/async_writer.rs
@@ -0,0 +1,45 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};

use opendal::{services::S3Config, Operator};
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
use parquet_opendal::AsyncWriter;

#[tokio::main]
async fn main() {
    let mut cfg = S3Config::default();
    cfg.access_key_id = Some("my_access_key".to_string());
    cfg.secret_access_key = Some("my_secret_key".to_string());
    cfg.endpoint = Some("my_endpoint".to_string());
    cfg.region = Some("my_region".to_string());
    cfg.bucket = "my_bucket".to_string();

    // Create a new operator
    let operator = Operator::from_config(cfg).unwrap().finish();
    let path = "/path/to/file.parquet";

    // Create an async writer
    let writer = AsyncWriter::new(
        operator
            .writer_with(path)
            .chunk(32 * 1024 * 1024)
            .concurrent(8)
            .await
            .unwrap(),
    );

    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
    writer.write(&to_write).await.unwrap();
    writer.close().await.unwrap();

    let buffer = operator.read(path).await.unwrap().to_bytes();
    let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
        .unwrap()
        .build()
        .unwrap();
    let read = reader.next().unwrap().unwrap();
    assert_eq!(to_write, read);
}
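
To try this example without S3 credentials, the same flow can run against opendal's in-memory service (already available through the `services-memory` dev feature above). A minimal sketch, changing only the operator construction:

```rust
use opendal::{services, Operator};

// Swap the S3 operator for an in-memory one; the rest of the example
// is unchanged.
let operator = Operator::new(services::Memory::default()).unwrap().finish();
```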
172 changes: 172 additions & 0 deletions integrations/parquet/src/async_writer.rs
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::errors::{ParquetError, Result};

use futures::future::BoxFuture;
use opendal::Writer;

/// AsyncWriter implements the [`AsyncFileWriter`] trait by delegating to an opendal [`Writer`].
///
/// ```no_run
/// use std::sync::Arc;
///
/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
///
/// use opendal::{services::S3Config, Operator};
/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
/// use parquet_opendal::AsyncWriter;
///
/// #[tokio::main]
/// async fn main() {
///     let mut cfg = S3Config::default();
///     cfg.access_key_id = Some("my_access_key".to_string());
///     cfg.secret_access_key = Some("my_secret_key".to_string());
///     cfg.endpoint = Some("my_endpoint".to_string());
///     cfg.region = Some("my_region".to_string());
///     cfg.bucket = "my_bucket".to_string();
///
///     // Create a new operator
///     let operator = Operator::from_config(cfg).unwrap().finish();
///     let path = "/path/to/file.parquet";
///
///     // Create an async writer
///     let writer = AsyncWriter::new(
///         operator
///             .writer_with(path)
///             .chunk(32 * 1024 * 1024)
///             .concurrent(8)
///             .await
///             .unwrap(),
///     );
///
///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
///     writer.write(&to_write).await.unwrap();
///     writer.close().await.unwrap();
///
///     let buffer = operator.read(path).await.unwrap().to_bytes();
///     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
///         .unwrap()
///         .build()
///         .unwrap();
///     let read = reader.next().unwrap().unwrap();
///     assert_eq!(to_write, read);
/// }
/// ```
pub struct AsyncWriter {
    inner: Writer,
}

impl AsyncWriter {
    /// Create an [`AsyncWriter`] from the given [`Writer`].
    pub fn new(writer: Writer) -> Self {
        Self { inner: writer }
    }
}

impl AsyncFileWriter for AsyncWriter {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            // Hand the buffer to the underlying opendal writer, mapping
            // opendal errors into parquet's external error variant.
            self.inner
                .write(bs)
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            // Close the writer to commit the file; for multipart backends
            // nothing becomes visible until this succeeds.
            self.inner
                .close()
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))
        })
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use arrow::array::{ArrayRef, Int64Array, RecordBatch};
    use opendal::{services, Operator};
    use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};

    #[tokio::test]
    async fn test_basic() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "data/test.txt";
        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
        let bytes = Bytes::from_static(b"hello, world!");
        writer.write(bytes).await.unwrap();
        let bytes = Bytes::from_static(b"hello, OpenDAL!");
        writer.write(bytes).await.unwrap();
        writer.complete().await.unwrap();

        let bytes = op.read(path).await.unwrap().to_vec();
        assert_eq!(bytes, b"hello, world!hello, OpenDAL!");
    }

    #[tokio::test]
    async fn test_abort() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "data/test.txt";
        let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
        let bytes = Bytes::from_static(b"hello, world!");
        writer.write(bytes).await.unwrap();
        let bytes = Bytes::from_static(b"hello, OpenDAL!");
        writer.write(bytes).await.unwrap();
        // Dropping the writer without calling `complete` aborts the
        // pending write, so nothing should be persisted.
        drop(writer);

        let exist = op.is_exist(path).await.unwrap();
        assert!(!exist);
    }

    #[tokio::test]
    async fn test_async_writer() {
        let operator = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "/path/to/file.parquet";

        let writer = AsyncWriter::new(
            operator
                .writer_with(path)
                .chunk(32 * 1024 * 1024)
                .concurrent(8)
                .await
                .unwrap(),
        );

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = operator.read(path).await.unwrap().to_bytes();
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();
        assert_eq!(to_write, read);
    }
}
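
Because `AsyncWriter` is simply an implementation of parquet's `AsyncFileWriter` trait, it can be handed to anything generic over that trait, not just `AsyncArrowWriter` constructed inline as above. A minimal sketch (the `write_batch` helper is illustrative, not part of this commit):

```rust
use arrow::array::RecordBatch;
use parquet::arrow::{async_writer::AsyncFileWriter, AsyncArrowWriter};
use parquet::errors::Result;

// Hypothetical helper: stream one batch through any AsyncFileWriter,
// such as parquet_opendal::AsyncWriter.
async fn write_batch<W: AsyncFileWriter>(sink: W, batch: &RecordBatch) -> Result<()> {
    let mut writer = AsyncArrowWriter::try_new(sink, batch.schema(), None)?;
    writer.write(batch).await?;
    writer.close().await?;
    Ok(())
}
```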