Skip to content

Commit

Permalink
Add PUT conditions to make the latest.json pointer update a CAS opera…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
pcholakov committed Nov 27, 2024
1 parent 50ff8bc commit ff6d9ce
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use bytes::BytesMut;
use object_store::aws::AmazonS3Builder;
use object_store::{MultipartUpload, ObjectStore, PutPayload};
use object_store::{MultipartUpload, ObjectStore, PutMode, PutOptions, PutPayload};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -230,11 +230,10 @@ impl SnapshotRepository {
partition_id = snapshot.partition_id,
));

// We can not do atomic CAS, but we can try to prevent the pointer from moving backwards!
// This does not help with different nodes updating the pointer concurrently but that is acceptable.
// We expect this path to not be contended, this check just serves as a correctness backstop.
// By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates.
let maybe_stored = match self.object_store.get(&latest_path).await {
Ok(result) => {
let attributes = result.attributes.clone();
let parse_result: serde_json::Result<LatestSnapshot> =
serde_json::from_slice(result.bytes().await?.iter().as_slice());
parse_result
Expand All @@ -247,6 +246,7 @@ impl SnapshotRepository {
)
})
.ok()
.map(|metadata| (attributes, metadata))
}
Err(object_store::Error::NotFound { .. }) => {
debug!(
Expand All @@ -263,9 +263,9 @@ impl SnapshotRepository {

if maybe_stored
.as_ref()
.is_some_and(|stored| stored.min_applied_lsn >= snapshot.min_applied_lsn)
.is_some_and(|(_, stored)| stored.min_applied_lsn >= snapshot.min_applied_lsn)
{
let repository_latest_lsn = maybe_stored.expect("is some").min_applied_lsn;
let repository_latest_lsn = maybe_stored.expect("is some").1.min_applied_lsn;
info!(
?repository_latest_lsn,
new_snapshot_lsn = ?snapshot.min_applied_lsn,
Expand All @@ -277,10 +277,20 @@ impl SnapshotRepository {
));
}

let latest_json_payload = PutPayload::from(serde_json::to_string_pretty(&latest)?);
let latest_json = PutPayload::from(serde_json::to_string_pretty(&latest)?);
let conditions = maybe_stored
.map(|(attributes, _)| PutOptions {
mode: PutMode::Overwrite,
attributes,
..PutOptions::default()
})
.unwrap_or_else(|| PutOptions {
mode: PutMode::Create,
..PutOptions::default()
});
let put_result = self
.object_store
.put(&latest_path, latest_json_payload)
.put_opts(&latest_path, latest_json, conditions)
.await?;
debug!(
etag = put_result.e_tag.unwrap_or_default(),
Expand Down

0 comments on commit ff6d9ce

Please sign in to comment.