From 50ff8bc13622521c895b7530dced6c46b6bcc759 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Tue, 26 Nov 2024 16:01:14 -0500 Subject: [PATCH] Reuse a BytesMut buffer for upload chunk data --- .../src/partition/snapshots/repository.rs | 60 +++++++++++++------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 06ad37b07..7ac5315c1 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -16,8 +16,9 @@ use async_trait::async_trait; use aws_config::default_provider::credentials::DefaultCredentialsChain; use aws_config::BehaviorVersion; use aws_credential_types::provider::ProvideCredentials; +use bytes::BytesMut; use object_store::aws::AmazonS3Builder; -use object_store::{GetOptions, MultipartUpload, ObjectStore, PutPayload}; +use object_store::{MultipartUpload, ObjectStore, PutPayload}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use tokio::io::AsyncReadExt; @@ -299,7 +300,7 @@ impl SnapshotRepository { /// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an /// API error to attempt a multipart put below this size, apart from the final segment. -const MULTIPART_UPLOAD_THRESHOLD_BYTES: usize = 5 * 1024 * 1024; +const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024; async fn put_snapshot_object( snapshot_path: &Path, @@ -308,28 +309,49 @@ async fn put_snapshot_object( ) -> anyhow::Result { let mut snapshot = tokio::fs::File::open(snapshot_path).await?; - if snapshot.metadata().await?.len() < MULTIPART_UPLOAD_THRESHOLD_BYTES as u64 { + if snapshot.metadata().await?.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES as u64 { let payload = PutPayload::from(tokio::fs::read(snapshot_path).await?); - object_store.put(key, payload).await.map_err(|e| e.into()) - } else { - let mut upload = object_store.put_multipart(key).await?; + return object_store.put(key, payload).await.map_err(|e| e.into()); + } + + debug!("Performing multipart upload for {key}"); + let mut upload = object_store.put_multipart(key).await?; + + let mut buf = BytesMut::new(); + let result: anyhow::Result<_> = async { loop { - let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES]; - let n = snapshot.read(&mut buf).await?; - if n == 0 { + let mut len = 0; + buf.reserve(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES); + + // Ensure full buffer unless at EOF + while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES { + len = snapshot.read_buf(&mut buf).await?; + if len == 0 { + break; + } + } + + if !buf.is_empty() { + upload + .put_part(PutPayload::from_bytes(buf.split().freeze())) + .await?; + } + + if len == 0 { break; } - let part = PutPayload::from(buf); - upload - .put_part(part) - .await - .context("Failed to put snapshot part in repository")?; - trace!("Uploaded chunk of {} bytes", n); } - upload - .complete() - .await - .context("Failed to put snapshot in repository") + upload.complete().await.map_err(|e| anyhow!(e)) + } + .await; + + match result { + Ok(r) => Ok(r), + Err(e) => { + debug!("Aborting failed multipart upload"); + upload.abort().await?; + Err(e) + } } }