Skip to content

Commit

Permalink
Prevent latest snapshot pointer from moving backwards
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 27, 2024
1 parent 36dab65 commit 7fbab4e
Showing 1 changed file with 61 additions and 7 deletions.
68 changes: 61 additions & 7 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, Context};
use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::AmazonS3Builder;
use object_store::{MultipartUpload, ObjectStore, PutPayload};
use object_store::{GetOptions, MultipartUpload, ObjectStore, PutPayload};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::io::AsyncReadExt;
use tracing::{debug, instrument, trace};
use tracing::{debug, info, instrument, trace};
use url::Url;

use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
Expand Down Expand Up @@ -57,6 +57,9 @@ pub struct LatestSnapshot {

pub partition_id: PartitionId,

/// Restate cluster name which produced the snapshot.
pub cluster_name: String,

/// Node that produced this snapshot.
pub node_name: String,

Expand Down Expand Up @@ -193,8 +196,10 @@ impl SnapshotRepository {
);
}

let metadata_key =
object_store::path::Path::from(format!("{}/metadata.json", full_snapshot_path.as_str()));
let metadata_key = object_store::path::Path::from(format!(
"{}/metadata.json",
full_snapshot_path.as_str()
));
let metadata_json_payload = PutPayload::from(
serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"),
);
Expand All @@ -210,10 +215,11 @@ impl SnapshotRepository {

let latest = LatestSnapshot {
version: snapshot.version,
partition_id: snapshot.partition_id,
cluster_name: snapshot.cluster_name.clone(),
node_name: snapshot.node_name.clone(),
created_at: snapshot.created_at.clone(),
partition_id: snapshot.partition_id,
snapshot_id: snapshot.snapshot_id,
created_at: snapshot.created_at.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
path: relative_snapshot_path,
};
Expand All @@ -222,6 +228,54 @@ impl SnapshotRepository {
prefix = self.prefix,
partition_id = snapshot.partition_id,
));

// We can not do atomic CAS, but we can try to prevent the pointer from moving backwards!
// This does not help with different nodes updating the pointer concurrently but that is acceptable.
// We expect this path to not be contended, this check just serves as a correctness backstop.
let maybe_stored = match self.object_store.get(&latest_path).await {
Ok(result) => {
let parse_result: serde_json::Result<LatestSnapshot> =
serde_json::from_slice(result.bytes().await?.iter().as_slice());
parse_result
.inspect_err(|e| {
info!(
repository_latest_lsn = "unknown",
new_snapshot_lsn = ?snapshot.min_applied_lsn,
"Failed to parse stored latest snapshot pointer, will update it: {}",
e
)
})
.ok()
}
Err(object_store::Error::NotFound { .. }) => {
debug!(
repository_latest_lsn = "none",
new_snapshot_lsn = ?snapshot.min_applied_lsn,
"No latest snapshot pointer found, will create one"
);
None
}
Err(e) => {
bail!("Failed to get latest snapshot pointer: {}", e);
}
};

if maybe_stored
.as_ref()
.is_some_and(|stored| stored.min_applied_lsn >= snapshot.min_applied_lsn)
{
let repository_latest_lsn = maybe_stored.expect("is some").min_applied_lsn;
info!(
?repository_latest_lsn,
new_snapshot_lsn = ?snapshot.min_applied_lsn,
"Newly created snapshot is not newer than the latest stored snapshot, will not update latest pointer"
);
return Err(anyhow!(
"Snapshot repository already contains snapshot at LSN {}",
repository_latest_lsn,
));
}

let latest_json_payload = PutPayload::from(serde_json::to_string_pretty(&latest)?);
let put_result = self
.object_store
Expand Down

0 comments on commit 7fbab4e

Please sign in to comment.