
Introduce SnapshotRepository and object store integration #2310

Merged: 22 commits into main from feat/snapshot-upload on Dec 6, 2024

Conversation

@pcholakov (Contributor) commented Nov 18, 2024

This change introduces a SnapshotRepository responsible for uploading snapshots to a remote object store.

Sample usage

Configuration:

[worker.snapshots]
destination = "file:///Users/pavel/test-cluster-snapshots"

Currently only s3:// and file:// URLs are supported; both work as expected.
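For example, an S3-backed destination could be configured the same way (the bucket name and prefix below are placeholders, not values from this PR):

[worker.snapshots]
destination = "s3://my-snapshots-bucket/cluster-snapshots"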

Snapshot creation:

> tree ~/test-cluster-snapshots/
/Users/pavel/test-cluster-snapshots/
├── 0
│   ├── latest.json
│   ├── lsn_85
│   │   ├── 000499.sst
│   │   ├── 000559.sst
│   │   ├── 000578.sst
│   │   └── metadata.json
│   └── lsn_88
│       ├── 000579.sst
│       ├── 000602.sst
│       └── metadata.json
├── 1
│   ├── latest.json
│   ├── lsn_94
│   │   ├── 000491.sst
│   │   ├── 000555.sst
│   │   ├── 000586.sst
│   │   └── metadata.json
│   └── lsn_96
│       ├── 000587.sst
│       ├── 000603.sst
│       └── metadata.json
└── 2
    ├── latest.json
    ├── lsn_71
    │   ├── 000475.sst
    │   ├── 000547.sst
    │   ├── 000594.sst
    │   └── metadata.json
    └── lsn_72
        ├── 000595.sst
        ├── 000604.sst
        └── metadata.json

10 directories, 24 files

> cat ~/test-cluster-snapshots/1/latest.json
{
  "version": "V1",
  "lsn": 96,
  "partition_id": 1,
  "node_name": "Pavels-MacBook-Pro.local",
  "created_at": "2024-11-21T19:17:15.755049000Z",
  "snapshot_id": "snap_17IyaexCTaiY7ZvMJcsPdg5",
  "min_applied_lsn": 96,
  "path": "lsn_96"
}

> cat ~/test-cluster-snapshots/1/lsn_96/metadata.json
{
  "version": "V1",
  "cluster_name": "localcluster",
  "partition_id": 1,
  "node_name": "Pavels-MacBook-Pro.local",
  "created_at": "2024-11-21T19:17:15.755049000Z",
  "snapshot_id": "snap_17IyaexCTaiY7ZvMJcsPdg5",
  "key_range": {
    "start": 768614336404564651,
    "end": 1537228672809129301
  },
  "min_applied_lsn": 96,
  "db_comparator_name": "leveldb.BytewiseComparator",
  "files": [
    {
      "column_family_name": "",
      "name": "/000603.sst",
      "directory": "/Users/pavel/restate/restate/restate-data/Pavels-MacBook-Pro.local/db-snapshots/1/snap_17IyaexCTaiY7ZvMJcsPdg5",
      "size": 1268,
      "level": 0,
      "start_key": "64650000000000000001010453454c46",
      "end_key": "667300000000000000010000000000000002",
      "smallest_seqno": 6063,
      "largest_seqno": 6064,
      "num_entries": 0,
      "num_deletions": 0
    },
    {
      "column_family_name": "",
      "name": "/000587.sst",
      "directory": "/Users/pavel/restate/restate/restate-data/Pavels-MacBook-Pro.local/db-snapshots/1/snap_17IyaexCTaiY7ZvMJcsPdg5",
      "size": 1143,
      "level": 6,
      "start_key": "64650000000000000001010453454c46",
      "end_key": "667300000000000000010000000000000002",
      "smallest_seqno": 0,
      "largest_seqno": 0,
      "num_entries": 0,
      "num_deletions": 0
    }
  ]
}

Future work:

  • Implement fetching and bootstrapping from snapshot
  • Implement parallel multi-part upload
  • Implement trim-gap handling using snapshots

Closes: #2197

github-actions bot commented Nov 18, 2024

Test Results

  7 files  ±0    7 suites  ±0   4m 31s ⏱️ +13s
 47 tests ±0   46 ✅ ±0  1 💤 ±0  0 ❌ ±0 
182 runs  ±0  179 ✅ ±0  3 💤 ±0  0 ❌ ±0 

Results for commit 24abfde. ± Comparison against base commit 7a62c27.

♻️ This comment has been updated with latest results.

muhamadazmy previously approved these changes Nov 18, 2024

@muhamadazmy (Contributor) left a comment

Thank you @pcholakov for creating this PR. It looks good to me! I left two very minor comments.

.into_string()
.map(|path| format!("file://{path}"))
})
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
Contributor:

Suggested change:
- .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
+ .context("Unable to convert path to string")?;

This will still include the 'inner' error in the output string when printed.

Contributor Author:

That approach doesn't work here because OsString::into_string() returns Result<String, OsString>, which doesn't meet Anyhow's trait bounds :-)

crates/worker/src/partition/snapshots/repository.rs Outdated Show resolved Hide resolved
crates/worker/src/partition/snapshots/repository.rs Outdated Show resolved Hide resolved
@tillrohrmann (Contributor) left a comment

Thanks for creating this PR @pcholakov. The changes look really good. The one question I have is whether there is a way to avoid materializing the tarball and re-reading it into memory. It would be awesome if we could stream the tarballing into the object-store upload.

/// Write a partition snapshot to the snapshot repository.
pub(crate) async fn put(
    &self,
    partition_id: PartitionId,
Contributor:

Isn't partition_id already part of PartitionSnapshotMetadata?

Contributor Author:

Removed :-)

Comment on lines 156 to 157
// todo(pavel): don't buffer the entire snapshot in memory!
let payload = PutPayload::from(tokio::fs::read(tarball.path()).await?);
Contributor:

That would indeed be great, especially once we have larger snapshots.

@muhamadazmy (Contributor) commented Nov 20, 2024

ObjectStore already supports multipart upload; you can use that to upload the tar in chunks instead.

Contributor Author:

Implemented in the latest revision! 🎉

// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());

// The snapshot data / metadata key format is: [<base_prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar
Contributor:

What's the idea for distinguishing full from incremental snapshots in the future? Would the latter have a completely different path or contain a marker file that denotes them as incremental?

Contributor Author:

I'm about to introduce this shortly to this PR - the key idea is to upload the tar archives and metadata JSON files separately, so that interested nodes can easily query just the metadata. We can gradually introduce additional attributes to the metadata JSON schema to support referencing the constituent parts of an incremental snapshot. The snapshot format version field within the metadata blob will allow nodes to know how to interpret it - or fail loudly if the Restate server is an older version that doesn't understand it.

The paths will be something like:

  • [<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
  • [<prefix>/]snapshot/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.tar

I imagine that at some point we'll add incremental snapshots and the repository format will then look something along the lines of:

  • [<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json (V2)
  • [<prefix>/]files/<partition_id>/<snapshot_id>-{filename}.sst

In this world, there will no longer be a 1:1 metadata-to-snapshot correspondence but rather a 1:n relationship. Additionally, we may want to write some sort of index metadata to make it cheaper to garbage collect disused SSTs - but I haven't thought too much about that yet.
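As a rough sketch of the key layout described above (the helper name, separators, and types are illustrative only, not the final format):

use object_store::path::Path;

// Hypothetical helper: builds a metadata key for a snapshot. The inverted LSN sort key
// means a plain list of the partition's metadata prefix returns the newest snapshot first.
fn metadata_key(base_prefix: &str, partition_id: u64, lsn: u64, snapshot_id: &str) -> Path {
    let sort_key = format!("{:016x}", u64::MAX - lsn);
    Path::from(format!(
        "{base_prefix}/metadata/{partition_id}/{sort_key}-{snapshot_id}-{lsn}.json"
    ))
}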

@pcholakov pcholakov force-pushed the refactor/snapshots-to-ppm branch from 9f6d162 to d686e7e on November 20, 2024 16:57
@pcholakov (Author) left a comment

Thanks @tillrohrmann and @muhamadazmy for your early input, it was really valuable! I've pushed a new revision but I still want to remove tar archiving before I mark it ready for review.

let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?);
debug!(
    "Creating snapshot tarball of {:?} in: {:?}...",
    &staging_path,
Contributor Author:

I've renamed staging_path to local_snapshot_path for clarity - that's the raw RocksDB column family export directory with the SSTs plus our own metadata JSON blob. We then tar that directory up into an archive at the path snapshot_archive_path.

@pcholakov pcholakov force-pushed the feat/snapshot-upload branch from 768bddf to 56e659f on November 20, 2024 20:05
Base automatically changed from refactor/snapshots-to-ppm to main on November 21, 2024 10:54
@pcholakov pcholakov force-pushed the feat/snapshot-upload branch from 56e659f to cee99e6 on November 21, 2024 19:18
@pcholakov pcholakov changed the title from "Introduce SnapshotsRepository backed by object_store" to "Introduce SnapshotRepository and object store integration" on November 21, 2024
@pcholakov pcholakov marked this pull request as ready for review on November 21, 2024 19:22
@pcholakov pcholakov force-pushed the feat/snapshot-upload branch from 76f4843 to 38268d6 on November 22, 2024 12:58
@pcholakov pcholakov dismissed muhamadazmy's stale review on November 22, 2024 13:33

Substantial changes since initial revision

@tillrohrmann (Contributor) left a comment

Thanks for creating this PR @pcholakov. The changes look really nice. I left a few minor comments. The one question I had was whether concurrent modifications of a snapshot metadata.json or the latest.json can be a problem (e.g. if an old and new leader upload a snapshot at the same time)?

Comment on lines 58 to 59
/// Restate cluster name which produced the snapshot.
pub lsn: Lsn,
Contributor:

Comment seems to be a bit off.

@pcholakov (Author) commented Nov 25, 2024

The entire field is redundant! (We have min_applied_lsn below.)

Comment on lines 153 to 159
debug!(
    %lsn,
    "Publishing partition snapshot to: {}",
    self.destination,
);
Contributor:

You can instrument put via #[instrument()] and include the lsn, snapshot id, etc.
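For illustration, a minimal sketch with the tracing crate (u64 stands in for Lsn, and the signature is simplified from the real put()):

use tracing::{debug, instrument};

// Sketch only: arguments that aren't skipped (lsn, snapshot_id) are recorded as span fields.
#[instrument(level = "debug", skip(snapshot))]
async fn put(lsn: u64, snapshot_id: &str, snapshot: Vec<u8>) {
    // Events emitted inside this span automatically carry the lsn and snapshot_id fields.
    debug!(len = snapshot.len(), "publishing partition snapshot");
}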

Comment on lines 196 to 202
let put_result = self
    .object_store
    .put(&metadata_key, metadata_json_payload)
    .await?;
Contributor:

Is there a possibility for two processes taking a snapshot for the same lsn (e.g. an old leader and a new one) which aren't exactly the same because the effective lsn is different? If this is possible, is this a problem?

Contributor Author:

Definitely! This is partly why I'm still on the fence about the exact snapshot naming scheme. One simple solution is to use snapshot IDs to disambiguate snapshots for the same LSN as they must be (modulo ULID collision) unique across nodes. I'd combine that with conditional put (only succeed if file does not exist) and complain loudly if it ever fails.
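A minimal sketch of such a conditional put using object_store's PutMode::Create (the helper name, key handling, and error handling are illustrative, not this PR's exact code):

use object_store::{path::Path, Error as ObjectStoreError, ObjectStore, PutMode, PutOptions, PutPayload};

// Hypothetical helper: refuse to overwrite an existing snapshot object.
async fn put_if_absent(
    store: &dyn ObjectStore,
    key: &Path,
    payload: PutPayload,
) -> anyhow::Result<()> {
    let opts = PutOptions {
        mode: PutMode::Create, // fail if the key already exists
        ..Default::default()
    };
    match store.put_opts(key, payload, opts).await {
        Ok(_) => Ok(()),
        // Another node raced us to this key; complain loudly instead of overwriting.
        Err(ObjectStoreError::AlreadyExists { path, .. }) => {
            anyhow::bail!("snapshot object {path} already exists")
        }
        Err(e) => Err(e.into()),
    }
}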

Comment on lines 222 to 292
let put_result = self
    .object_store
    .put(&latest_path, latest_json_payload)
    .await?;
Contributor:

Same question here but for different LSNs. How are we gonna use the latest.json? I could imagine how a slow old leader completes a snapshot after a new snapshot has been completed.

@pcholakov (Author) commented Nov 25, 2024

I have an idea here that hasn't made it into the PR just yet: just download the previous pointer and check that we aren't moving backwards. This should be enough to prevent the worst case of some node going to sleep mid-snapshot and wreaking havoc.

Contributor Author:

Since I wrote that comment, we have been blessed with proper S3 conditional put, so I rewrote the update logic to perform a CAS 🎉 I'm not doing this preemptively since this path should be uncontended, but the check is there as a defensive measure against going backwards and overwriting something we didn't mean to.
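As a rough sketch of that CAS, assuming object_store's PutMode::Update and a latest.json key (not the exact code in this PR):

use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion};

// Hypothetical helper: bump latest.json only if nobody changed it since we read it.
async fn bump_latest_pointer(
    store: &dyn ObjectStore,
    latest: &Path,
    new_pointer: PutPayload,
) -> anyhow::Result<()> {
    let current = store.get(latest).await?;
    let expected = UpdateVersion {
        e_tag: current.meta.e_tag.clone(),
        version: current.meta.version.clone(),
    };
    // The put succeeds only while latest.json still matches the version read above;
    // a concurrent writer triggers a precondition failure instead of a silent overwrite.
    let opts = PutOptions {
        mode: PutMode::Update(expected),
        ..Default::default()
    };
    store.put_opts(latest, new_pointer, opts).await?;
    Ok(())
}

On a precondition failure we could re-read latest.json and, as discussed below, return the newer snapshot to the caller rather than failing outright.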

Contributor:

Sounds good :-)

Comment on lines 171 to 227
for file in &snapshot.files {
    let filename = file.name.trim_start_matches("/");
    let key = object_store::path::Path::from(format!(
        "{}/{}",
        snapshot_prefix.as_str(),
        filename
    ));
    let put_result = put_snapshot_object(
        local_snapshot_path.join(filename).as_path(),
        &key,
        &self.object_store,
    )
    .await?;
    debug!(
        etag = put_result.e_tag.unwrap_or_default(),
        ?key,
        "Put snapshot data file completed",
    );
}
Contributor:

Uploading multiple files concurrently will probably only cause higher and less predictable resource utilization. And we aren't in a rush, I guess.

Contributor Author:

Agreed! It was easier to be more predictable with a single upload stream. The impact I'm most concerned about is the memory overhead. S3 advises using fairly large chunks - on the order of 100 MB - for maximum throughput, so maybe it's worth looking into memory-mapped IO down the line.

@pcholakov pcholakov force-pushed the feat/snapshot-upload branch 3 times, most recently from defc6ee to 7291ede on November 26, 2024 21:35
@pcholakov (Author):

@tillrohrmann if you could take another look please, that would be great! I think I've covered all the comments:

  • added a unique snapshot ID to the snapshot path to guarantee uniqueness
  • eliminated Error suffix from various error variants
  • reusing a BytesMut buffer to minimize allocation (still sequential multipart upload though; ran out of time - let's park that as a future improvement if desired)
  • now performing an S3 CAS on pointer bump 🎉
  • a partial multipart upload is cleaned up on error

The partition snapshot prefix looks like this in S3 with the latest changes:

[S3 console screenshots showing the partition snapshot prefix contents]

@pcholakov pcholakov force-pushed the feat/snapshot-upload branch from 7291ede to ff6d9ce on November 27, 2024 12:43
@tillrohrmann (Contributor) left a comment

Thanks a lot for updating the PR @pcholakov. It looks really good to me. I think we are very close to merging it.

The last remaining questions I had were around resource management in case of failures when uploading snapshots. In particular, who is cleaning up partial snapshot artifacts (ssts) and when are we deleting the local snapshot files.

When conditionally updating the latest.json, should we retry in case there was a concurrent modification?

I was also wondering whether we shouldn't configure a local snapshot directory if no destination was specified. That way users can control the snapshotting by configuring a valid destination.

Comment on lines 386 to 387
#[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
pub additional_options: HashMap<String, String>,
Contributor:

With serde(flatten), this will act a bit as a catch-all for fields specified under SnapshotOptions in the TOML that aren't explicitly defined in SnapshotOptions?

Contributor Author:

Ah, totally didn't see that because I haven't tested this option at all! I'm not sure how to handle it with serde but I'll find a way to make it work.

Contributor:

Did you test it, and does it work?

Contributor:

We don't seem to use this field anymore.

Contributor Author:

I clearly did all the precursor work but then forgot to delete the field - sorry about that! This is now removed.

Comment on lines 90 to 95
base_dir
    .join("pp-snapshots")
    .into_os_string()
    .into_string()
    .map(|path| format!("file://{path}"))
    .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?
Contributor:

If we snapshot to local disk by default, which is not accessible by every PP process, then we have to make the trim logic conditional on this fact. It would also require us to implement a snapshot exchange mechanism, because once we trim and then start new PPs, they need some way to obtain those snapshots. Instead, I would suggest only creating the SnapshotRepository if a destination is configured, and making it the user's responsibility to ensure that the destination is accessible by all nodes.

Contributor Author:

This is sensible - it became quite apparent when I started working on the auto-trim-by-archived-LSN PR earlier. Partly this was motivated by wanting the CreateSnapshot RPC to just work out of the box, but it's not worth the potential confusion. I still think I would prefer automated trimming to be opt-in regardless of the existence of a valid SnapshotRepository but we can have that discussion in a separate PR.

// locations just work. This makes object_store behave similarly to the Lambda invoker.
let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3"
    && destination.query().is_none()
    && snapshots_options.additional_options.is_empty()
Contributor:

What if additional_options contains some other settings but not the credentials and is therefore not empty?

Contributor Author:

I can make that work; there are a handful of config keys (region + access key) that are in conflict with the AWS config provider. It's a reasonable expectation to merge the configs.

// SDK credentials provider so that the conventional environment variables and config
// locations just work. This makes object_store behave similarly to the Lambda invoker.
let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3"
    && destination.query().is_none()
Contributor:

Why isn't it a good idea to rely on default credentials if the destination contains a query part?

Contributor Author:

The query part is how object_store configuration typically works - you can pass API keys and other bits of config as URL parameters. I explicitly didn't want to deal with merging config from two completely different config providers; I'm not even sure it's possible to do it in a sane way. The logic is that if you wish to override the config, then you own setting up all of it. I think that's reasonable behavior, except I see that I've completely neglected to mention that in the SnapshotsOptions docs - I'll fix that.
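For illustration, such a destination might look like this (bucket, prefix, and query keys are placeholders for whatever object_store accepts):

[worker.snapshots]
destination = "s3://my-bucket/snapshots?region=eu-central-1"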

&key,
&self.object_store,
)
.await?;
Contributor:

What happens with already uploaded files if we fail at this point? Will this leave an incomplete snapshot behind?

Contributor Author:

Correct; my thinking was that we have to deal with pruning the repository separately anyway and I would handle it there - but I can make a best-effort attempt at cleanup on upload. In general, I've altogether skipped hardening the snapshot path with things like retries. I was planning to do that as a follow-up but can certainly address it now if the PR is not getting too big.

None
}
Err(e) => {
bail!("Failed to get latest snapshot pointer: {}", e);
Contributor:

Wondering whether a warn log might be good enough. Technically we did complete the snapshot; it just might be unused because latest.json isn't updated. How would the caller handle an error compared to an Ok(())?

Contributor:

Alternatively, we might wanna clean up the snapshot, since it wouldn't be used if latest.json hasn't been updated.

Contributor Author:

I probably would rather make a best-effort attempt at cleaning up partially uploaded keys, and return an error. As far as response semantics go, what I hope to achieve is that the caller has an unambiguous confirmation that a snapshot exists at a given LSN (I know technically we don't yet return the LSN in CreateSnapshotResponse, but still). I think this will become important in the future where the cluster controller might want to orchestrate snapshot/restore sequences across nodes.

I have one more follow-up to make here - currently we return an error if the LSN is unchanged from the latest archived-LSN snapshot in the repository. That should instead be a no-op that returns success with the ID of the existing latest snapshot, making it idempotent to call repeatedly even if the log is not moving.

Comment on lines 291 to 292
let put_result = self
    .object_store
    .put_opts(&latest_path, latest_json, conditions)
    .await?;
@tillrohrmann (Contributor) commented Nov 28, 2024

What's the contract of the put method w.r.t. updating the latest.json? I've left a few comments regarding this. Is your intention that a successfully uploaded snapshot must update the latest.json, and that if this fails, the whole put method fails?

Contributor:

If there was a concurrent update, shouldn't we retry until we know that there is a newer snapshot?

Contributor Author:

Up till your comment, I was thinking of the put contract as returning the status of whether the specific create-snapshot request succeeded, or not. But if you zoom out a bit, the caller really only cares that a snapshot exists, at some LSN, in the shared repository.

Partly also, in the current world, we don't expect any contention on latest.json so this is mainly just a paranoid defensive line against a node experiencing a really long pause between starting a snapshot, and trying to bump the latest pointer - long enough that another processor has become the leader, and taken over snapshotting.

I think a perfectly reasonable fallback here is that, if there is a concurrent update, we just read the latest value and return that to the caller. I'll update the code to behave like that.

debug!("Performing multipart upload for {key}");
let mut upload = object_store.put_multipart(key).await?;

let mut buf = BytesMut::new();
Contributor:

If you pass this buffer into this method, then you can reuse it across uploading multiple files and don't have to reallocate it for every file.

Contributor Author:

Good call! Initially, I was thinking of doing multiple puts in parallel, so I deliberately did not reuse this.

Subsequently, I changed my thinking around concurrency: I think we should optimise the restore path for maximum throughput as we want a cold Partition Processor to get up to speed ASAP, but on the create snapshot path we should rather optimise for minimal disruption to the ongoing Restate request processing. Let me know if you disagree with the thinking here.

Comment on lines 332 to 485
loop {
    let mut len = 0;
    buf.reserve(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES);

    // Ensure full buffer unless at EOF
    while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES {
        len = snapshot.read_buf(&mut buf).await?;
        if len == 0 {
            break;
        }
    }

    if !buf.is_empty() {
        upload
            .put_part(PutPayload::from_bytes(buf.split().freeze()))
            .await?;
    }

    if len == 0 {
        break;
    }
}
Contributor:

The logic looks sound to me :-)

Contributor Author:

Great! I had some extra logging to make sure we are definitely not allocating more into the buffer but it's a new API I haven't worked with before 😅

@pcholakov (Author):

@tillrohrmann asked: "Have to ask again about the CAS loop because I don't understand how it currently will work."

I was answering something slightly different, my apologies! I think this is the relevant answer: #2310 (comment) :-)

@pcholakov pcholakov force-pushed the feat/snapshot-upload branch from 3a87e07 to aee1a12 on December 5, 2024 17:06
@pcholakov (Author) left a comment

Ok, I really think this is everything covered now 😅

);
snapshot2.min_applied_lsn = snapshot1.min_applied_lsn.next();

repository.put(&snapshot2, source_dir).await?;
Contributor Author:

No words. 🤦‍♂️

I added coverage and made the LatestSnapshot struct construction a lot nicer in the process. Thank you for flagging this!

@@ -139,6 +140,7 @@ metrics-exporter-prometheus = { version = "0.15", default-features = false, feat
 "async-runtime",
 ] }
 moka = "0.12.5"
+object_store = { version = "0.11.1", features = ["aws"] }
Contributor Author:

Since we are not officially supporting anything other than S3, it was easier to just not compile the other providers. The file provider is always enabled, it seems.

@@ -23,7 +23,7 @@ arc-swap = { workspace = true }
 futures = { workspace = true }
 futures-util = { workspace = true }
 http = { workspace = true }
-pprof = { version = "0.13", features = ["criterion", "flamegraph"] }
+pprof = { version = "0.14", features = ["criterion", "flamegraph"] }
Contributor Author:

Unrelated RUSTSEC update.

Contributor:

Thanks for fixing this :-)

@tillrohrmann (Contributor) left a comment

Great work @pcholakov. The changes look good to me. +1 for merging :-)


@pcholakov pcholakov merged commit 1cdb6f8 into main Dec 6, 2024
13 checks passed
@pcholakov pcholakov deleted the feat/snapshot-upload branch December 6, 2024 08:49