Skip to content

Commit

Permalink
Reuse a BytesMut buffer for upload chunk data
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 27, 2024
1 parent 7fbab4e commit 50ff8bc
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use bytes::BytesMut;
use object_store::aws::AmazonS3Builder;
use object_store::{GetOptions, MultipartUpload, ObjectStore, PutPayload};
use object_store::{MultipartUpload, ObjectStore, PutPayload};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -299,7 +300,7 @@ impl SnapshotRepository {

/// S3 and other stores require a minimum size for each part of a multipart upload. It is an
/// API error to upload a part smaller than this size, except for the final part.
const MULTIPART_UPLOAD_THRESHOLD_BYTES: usize = 5 * 1024 * 1024;
const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024;

async fn put_snapshot_object(
snapshot_path: &Path,
Expand All @@ -308,28 +309,49 @@ async fn put_snapshot_object(
) -> anyhow::Result<object_store::PutResult> {
let mut snapshot = tokio::fs::File::open(snapshot_path).await?;

if snapshot.metadata().await?.len() < MULTIPART_UPLOAD_THRESHOLD_BYTES as u64 {
if snapshot.metadata().await?.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES as u64 {
let payload = PutPayload::from(tokio::fs::read(snapshot_path).await?);
object_store.put(key, payload).await.map_err(|e| e.into())
} else {
let mut upload = object_store.put_multipart(key).await?;
return object_store.put(key, payload).await.map_err(|e| e.into());
}

debug!("Performing multipart upload for {key}");
let mut upload = object_store.put_multipart(key).await?;

let mut buf = BytesMut::new();
let result: anyhow::Result<_> = async {
loop {
let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
let n = snapshot.read(&mut buf).await?;
if n == 0 {
let mut len = 0;
buf.reserve(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES);

// Ensure full buffer unless at EOF
while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES {
len = snapshot.read_buf(&mut buf).await?;
if len == 0 {
break;
}
}

if !buf.is_empty() {
upload
.put_part(PutPayload::from_bytes(buf.split().freeze()))
.await?;
}

if len == 0 {
break;
}
let part = PutPayload::from(buf);
upload
.put_part(part)
.await
.context("Failed to put snapshot part in repository")?;
trace!("Uploaded chunk of {} bytes", n);
}
upload
.complete()
.await
.context("Failed to put snapshot in repository")
upload.complete().await.map_err(|e| anyhow!(e))
}
.await;

match result {
Ok(r) => Ok(r),
Err(e) => {
debug!("Aborting failed multipart upload");
upload.abort().await?;
Err(e)
}
}
}

Expand Down

0 comments on commit 50ff8bc

Please sign in to comment.