diff --git a/Cargo.lock b/Cargo.lock index 27020ab35..6b6187d30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -633,9 +633,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.35.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc3ef4ee9cdd19ec6e8b10d963b79637844bbf41c31177b77a188eaa941e69f7" +checksum = "6acca681c53374bf1d9af0e317a41d12a44902ca0f2d1e10e5cb5bb98ed74f35" dependencies = [ "aws-credential-types", "aws-runtime", @@ -655,9 +655,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527f3da450ea1f09f95155dba6153bd0d83fe0923344a12e1944dfa5d0b32064" +checksum = "b79c6bdfe612503a526059c05c9ccccbf6bd9530b003673cb863e547fd7c0c9a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -796,13 +796,17 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.1", "httparse", + "hyper 0.14.30", + "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", "pin-utils", + "rustls 0.21.12", "tokio", "tracing", ] @@ -3058,6 +3062,25 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.5" @@ -3265,6 +3288,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -3287,7 +3311,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -3529,24 +3553,23 @@ checksum = 
"b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.5.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "idna_adapter", + "smallvec", + "utf8_iter", ] [[package]] -name = "idna" -version = "1.0.2" +name = "idna_adapter" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd69211b9b519e98303c015e21a007e293db403b6c85b9b124e133d25e242cdd" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" dependencies = [ "icu_normalizer", "icu_properties", - "smallvec", - "utf8_iter", ] [[package]] @@ -3622,7 +3645,7 @@ dependencies = [ "log", "num-format", "once_cell", - "quick-xml", + "quick-xml 0.26.0", "rgb", "str_stack", ] @@ -3815,7 +3838,7 @@ dependencies = [ "email_address", "fancy-regex", "fraction", - "idna 1.0.2", + "idna", "itoa", "num-cmp", "once_cell", @@ -4606,18 +4629,27 @@ dependencies = [ [[package]] name = "object_store" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" +checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3" dependencies = [ "async-trait", + "base64 0.22.0", "bytes", "chrono", "futures", "humantime", + "hyper 1.4.1", "itertools 0.13.0", + "md-5", "parking_lot", "percent-encoding", + "quick-xml 0.36.2", + "rand", + "reqwest", + "ring", + "serde", + "serde_json", "snafu 0.8.5", "tokio", "tracing", @@ -5458,6 +5490,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick-xml" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.5" @@ -5770,7 +5812,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -5786,6 +5828,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.11", + "rustls-native-certs 0.7.1", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", @@ -6628,7 +6671,7 @@ dependencies = [ "bytestring", "derive_builder", "futures", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body-util", "http-serde", @@ -6904,6 +6947,7 @@ dependencies = [ "tracing-opentelemetry", "typify", "ulid", + "url", "xxhash-rust", ] @@ -6966,6 +7010,9 @@ dependencies = [ "anyhow", "assert2", "async-channel", + "async-trait", + "aws-config", + "aws-credential-types", "bytes", "bytestring", "codederror", @@ -6976,6 +7023,7 @@ dependencies = [ "humantime", "itertools 0.13.0", "metrics", + "object_store", "opentelemetry", "parking_lot", "pin-project", @@ -7008,6 +7056,7 @@ dependencies = [ "serde_json", "serde_with", "strum 0.26.2", + "tar", "tempfile", "test-log", "thiserror", @@ -7018,6 +7067,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "ulid", + "url", ] [[package]] @@ -8024,6 +8074,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c65998313f8e17d0d553d28f91a0df93e4dbbbf770279c7bc21ca0f09ea1a1f6" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.10.1" @@ -8392,7 +8453,7 @@ dependencies = [ "base64 0.22.0", "bytes", "flate2", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -8816,12 +8877,12 @@ checksum = 
"8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.0" +version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" dependencies = [ "form_urlencoded", - "idna 0.5.0", + "idna", "percent-encoding", "serde", ] @@ -9310,6 +9371,17 @@ dependencies = [ "tap", ] +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys", + "rustix", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/Cargo.toml b/Cargo.toml index 129e82ba5..160cf3fe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,8 @@ arrow = { version = "53.1.0", default-features = false } assert2 = "0.3.11" async-channel = "2.1.1" async-trait = "0.1.73" +aws-config = { version = "1.5.4", default-features = false, features = ["rt-tokio", "sso", "rustls"] } +aws-credential-types = { version = "1.2.0", default-features = false } axum = { version = "0.7.5", default-features = false } base64 = "0.22" bitflags = { version = "2.6.0" } @@ -134,6 +136,7 @@ metrics-exporter-prometheus = { version = "0.15", default-features = false, feat "async-runtime", ] } moka = "0.12.5" +object_store = { version = "0.11.1", features = ["aws"] } once_cell = "1.18" opentelemetry = { version = "0.24.0" } opentelemetry-http = { version = "0.13.0" } diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs index d84eb7c3a..5b42395a5 100644 --- a/crates/core/src/worker_api/partition_processor_manager.rs +++ b/crates/core/src/worker_api/partition_processor_manager.rs @@ -82,6 +82,8 @@ pub enum SnapshotError { SnapshotExportError(PartitionId, 
#[source] anyhow::Error), #[error("Snapshot failed for partition {0}: {1}")] SnapshotMetadataHeaderError(PartitionId, #[source] io::Error), + #[error("Error putting partition id {0} snapshot into repository: {1}")] + RepositoryPutError(PartitionId, #[source] anyhow::Error), #[error("Internal error creating snapshot for partition {0}: {1}")] Internal(PartitionId, String), } @@ -94,6 +96,7 @@ impl SnapshotError { SnapshotError::InvalidState(partition_id) => *partition_id, SnapshotError::SnapshotExportError(partition_id, _) => *partition_id, SnapshotError::SnapshotMetadataHeaderError(partition_id, _) => *partition_id, + SnapshotError::RepositoryPutError(partition_id, _) => *partition_id, SnapshotError::Internal(partition_id, _) => *partition_id, } } diff --git a/crates/service-client/Cargo.toml b/crates/service-client/Cargo.toml index c3835836b..08e8f1d7b 100644 --- a/crates/service-client/Cargo.toml +++ b/crates/service-client/Cargo.toml @@ -13,8 +13,8 @@ options_schema = ["dep:schemars", "restate-types/schemars"] [dependencies] arc-swap = { workspace = true } -aws-config = { version = "1.5.4", default-features = false, features = ["rt-tokio", "sso"] } -aws-credential-types = {version = "1.2.0", default-features = false} +aws-config = { workspace = true } +aws-credential-types = { workspace = true } aws-sdk-lambda = {version = "1.36.0", default-features = false, features = ["rt-tokio"]} aws-sdk-sts = {version = "1.35.0", default-features = false, features = ["rt-tokio"]} base64 = { workspace = true } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 7d87ff31a..04486bc5a 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -74,6 +74,7 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } xxhash-rust = { workspace = true, features = ["xxh3"] } +url = "2.5.3" [dev-dependencies] restate-test-util = { workspace = true } diff --git a/crates/types/src/config/worker.rs 
b/crates/types/src/config/worker.rs index 187f1818b..48dc03c1b 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -8,18 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use serde::{Deserialize, Serialize}; -use serde_with::serde_as; +use std::collections::HashMap; use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; use std::path::PathBuf; use std::time::Duration; -use tracing::warn; -use restate_serde_util::NonZeroByteCount; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use tracing::warn; use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; use crate::identifiers::PartitionId; use crate::retries::RetryPolicy; +use restate_serde_util::NonZeroByteCount; /// # Worker options #[serde_as] @@ -359,6 +360,32 @@ impl Default for StorageOptions { #[serde(rename_all = "kebab-case")] #[builder(default)] pub struct SnapshotsOptions { + /// # Destination + /// + /// Where snapshots are moved after they get created. This property supports URLs with either + /// `s3://` or `file://` protocol scheme. The URL is parsed with `Url::parse` and then passed to `object_store`. + /// + /// Example: `s3://snapshots-bucket/restate/cluster` will send snapshots to the specified + /// bucket, prefixing keys with the URL path. + /// + /// Default: `None` + pub destination: Option, + + /// # Additional options + /// + /// Extra configuration options that will be passed to `object_store`. Supported keys include: + /// + /// - `region` + /// - `access_key_id` + /// - `secret_access_key` + /// - `session_token` + /// - `endpoint_url` + /// + /// For the full list of available options, please refer to the [Object Store configuration system reference](https://docs.rs/object_store/latest/object_store/index.html#configuration-system) + /// and the specific provider's documentation, e.g. [S3](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html). 
+ #[serde(flatten, skip_serializing_if = "HashMap::is_empty")] + pub additional_options: HashMap, + /// # Automatic snapshot creation frequency /// /// Number of log records that trigger a snapshot to be created. diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 214383ac3..cac209993 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -45,6 +45,9 @@ restate-wal-protocol = { workspace = true } anyhow = { workspace = true } assert2 = { workspace = true } async-channel = { workspace = true } +async-trait = { workspace = true } +aws-config = { workspace = true } +aws-credential-types = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } codederror = { workspace = true } @@ -54,6 +57,7 @@ futures = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } metrics = { workspace = true } +object_store = { workspace = true } opentelemetry = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } @@ -63,6 +67,8 @@ serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } strum = { workspace = true } +tar = "0.4.43" +tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } @@ -70,6 +76,7 @@ tokio-util = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } +url = { workspace = true } [dev-dependencies] restate-bifrost = { workspace = true, features = ["test-util"] } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 1469a72e9..dc50c9c44 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -46,6 +46,7 @@ use restate_types::live::Live; use restate_types::protobuf::common::WorkerStatus; use crate::partition::invoker_storage_reader::InvokerStorageReader; +use crate::partition::snapshots::SnapshotRepository; use 
crate::partition_processor_manager::PartitionProcessorManager; pub use self::error::*; @@ -79,6 +80,9 @@ pub enum BuildError { ), #[code(unknown)] Invoker(#[from] restate_invoker_impl::BuildError), + #[error("failed opening partition store: {0}")] + #[code(unknown)] + SnapshotsRepository(#[from] anyhow::Error), } #[derive(Debug, thiserror::Error, CodedError)] @@ -144,6 +148,9 @@ impl Worker { partition_store_manager.clone(), router_builder, bifrost, + SnapshotRepository::create(config.common.base_dir(), &config.worker.snapshots) + .await + .map_err(BuildError::SnapshotsRepository)?, ); // handle RPCs diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 9e7e00099..2a58fbb81 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -74,6 +74,7 @@ mod cleaner; pub mod invoker_storage_reader; mod leadership; pub mod shuffle; +pub mod snapshots; mod state_machine; pub mod types; diff --git a/crates/worker/src/partition/snapshots/mod.rs b/crates/worker/src/partition/snapshots/mod.rs new file mode 100644 index 000000000..e368b03a6 --- /dev/null +++ b/crates/worker/src/partition/snapshots/mod.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +mod repository; +mod snapshot_task; + +pub use repository::SnapshotRepository; +pub use snapshot_task::*; diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs new file mode 100644 index 000000000..1710a1b88 --- /dev/null +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -0,0 +1,206 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use async_trait::async_trait; +use aws_config::default_provider::credentials::DefaultCredentialsChain; +use aws_config::BehaviorVersion; +use aws_credential_types::provider::ProvideCredentials; +use object_store::aws::AmazonS3Builder; +use object_store::{ObjectStore, PutPayload}; +use tempfile::NamedTempFile; +use tracing::{debug, trace_span, warn}; +use url::Url; + +use restate_core::task_center; +use restate_partition_store::snapshots::PartitionSnapshotMetadata; +use restate_types::config::SnapshotsOptions; +use restate_types::identifiers::PartitionId; + +/// Provides read and write access to the long-term partition snapshot storage destination. 
+#[derive(Clone)] +pub struct SnapshotRepository { + object_store: Arc, + destination: Url, + prefix: String, + staging_path: PathBuf, +} + +impl SnapshotRepository { + pub async fn create( + base_dir: PathBuf, + snapshots_options: &SnapshotsOptions, + ) -> anyhow::Result { + let destination = snapshots_options + .destination + .as_ref() + .map(|s| Ok(s.clone())) + .unwrap_or_else(|| { + base_dir + .join("pp-snapshots") + .into_os_string() + .into_string() + .map(|path| format!("file://{path}")) + }) + .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?; + let destination = + Url::parse(&destination).context("Failed parsing snapshot repository URL")?; + + // AWS-specific ergonomics optimization: without explicit configuration, we use the AWS SDK + // detected region and default credentials provider. This makes object_store behave + // similarly to the Lambda invoker, respecting AWS_PROFILE and available session creds. + let object_store: Arc = if destination.scheme() == "s3" + && destination.query().is_none() + && snapshots_options.additional_options.is_empty() + { + let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28()) + .await + .region() + .context("Unable to determine AWS region to use with S3")? + .clone(); + + let store = AmazonS3Builder::new() + .with_url(destination.clone()) + .with_region(aws_region.to_string()) + .with_credentials(Arc::new(AwsSdkCredentialsProvider { + credentials_provider: DefaultCredentialsChain::builder().build().await, + })) + .build()?; + + Arc::new(store) + } else { + object_store::parse_url_opts(&destination, &snapshots_options.additional_options)? + .0 + .into() + }; + + let prefix = destination.path().into(); + Ok(SnapshotRepository { + object_store, + destination, + prefix, + staging_path: base_dir.clone().join("snapshot-staging"), + }) + } + + /// Write a partition snapshot to the snapshot repository. 
+ pub(crate) async fn put( + &self, + partition_id: PartitionId, + metadata: &PartitionSnapshotMetadata, + snapshot_path: &Path, + ) -> anyhow::Result<()> { + let snapshot_id = metadata.snapshot_id; + let lsn = metadata.min_applied_lsn; + + debug!( + %snapshot_id, + partition_id = ?partition_id, + %lsn, + "Publishing partition snapshot to: {}", + self.destination, + ); + + // All common object stores list objects in lexicographical order, with no option for + // reverse order. We inject an explicit sort key into the snapshot prefix to make sure that + // the latest snapshot is always first. + let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64()); + + // The snapshot data / metadata key format is: [<prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar + let snapshot_key = match self.prefix.as_str() { + "" | "/" => format!( + "{partition_id}/{sk}_{lsn}_{snapshot_id}.tar", + sk = inverted_sort_key, + lsn = metadata.min_applied_lsn, + ), + prefix => format!( + "{trimmed_prefix}/{partition_id}/{sk}_{lsn}_{snapshot_id}.tar", + trimmed_prefix = prefix.trim_start_matches('/').trim_end_matches('/'), + sk = inverted_sort_key, + ), + }; + + let staging_path = self.staging_path.clone(); + tokio::fs::create_dir_all(&staging_path).await?; + + let snapshot_path = snapshot_path.to_path_buf(); + let packaging_task = task_center().spawn_blocking_unmanaged( + "package-snapshot", + Some(partition_id), + async move { + trace_span!("package-snapshot", %snapshot_id).in_scope(|| { + let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?); + debug!( + "Creating snapshot tarball of {:?} in: {:?}...", + &staging_path, + tarball.get_ref() + ); + tarball.append_dir_all(".", snapshot_path)?; + tarball.finish()?; + tarball.into_inner() + }) + }, + ); + let tarball = packaging_task.await??; + + // todo(pavel): don't buffer the entire snapshot in memory! 
+ let payload = PutPayload::from(tokio::fs::read(tarball.path()).await?); + + let upload = self + .object_store + .put( + &object_store::path::Path::from(snapshot_key.clone()), + payload, + ) + .await + .context("Failed to put snapshot in repository")?; + + debug!( + %snapshot_id, + etag = upload.e_tag.unwrap_or_default(), + "Successfully published snapshot to repository as: {}", + snapshot_key, + ); + Ok(()) + } +} + +#[derive(Debug)] +struct AwsSdkCredentialsProvider { + credentials_provider: DefaultCredentialsChain, +} + +#[async_trait] +impl object_store::CredentialProvider for AwsSdkCredentialsProvider { + type Credential = object_store::aws::AwsCredential; + + async fn get_credential(&self) -> object_store::Result> { + let creds = self + .credentials_provider + .provide_credentials() + .await + .map_err(|e| { + warn!(error = ?e, "Failed to get AWS credentials from credentials provider"); + object_store::Error::Generic { + store: "snapshot repository store", + source: e.into(), + } + })?; + + Ok(Arc::new(object_store::aws::AwsCredential { + key_id: creds.access_key_id().to_string(), + secret_key: creds.secret_access_key().to_string(), + token: creds.session_token().map(|t| t.to_string()), + })) + } +} diff --git a/crates/worker/src/partition_processor_manager/snapshot_task.rs b/crates/worker/src/partition/snapshots/snapshot_task.rs similarity index 90% rename from crates/worker/src/partition_processor_manager/snapshot_task.rs rename to crates/worker/src/partition/snapshots/snapshot_task.rs index b8bb9cd23..bfe7ca264 100644 --- a/crates/worker/src/partition_processor_manager/snapshot_task.rs +++ b/crates/worker/src/partition/snapshots/snapshot_task.rs @@ -22,6 +22,8 @@ use restate_partition_store::PartitionStoreManager; use restate_types::identifiers::{PartitionId, SnapshotId}; use restate_types::logs::Lsn; +use crate::partition::snapshots::SnapshotRepository; + /// Handle to an outstanding [`SnapshotPartitionTask`] that has been spawned, including a reference 
/// to notify the requester. pub struct PendingSnapshotTask { @@ -36,6 +38,7 @@ pub struct SnapshotPartitionTask { pub snapshot_base_path: PathBuf, pub partition_store_manager: PartitionStoreManager, pub archived_lsn_sender: watch::Sender>, + pub snapshot_repository: SnapshotRepository, } impl SnapshotPartitionTask { @@ -52,6 +55,7 @@ impl SnapshotPartitionTask { self.partition_id, self.snapshot_base_path, self.archived_lsn_sender, + self.snapshot_repository, ) .await; @@ -84,6 +88,7 @@ async fn create_snapshot_inner( partition_id: PartitionId, snapshot_base_path: PathBuf, archived_lsn_sender: watch::Sender>, + snapshot_repository: SnapshotRepository, ) -> Result { let snapshot_id = SnapshotId::new(); let snapshot = partition_store_manager @@ -96,11 +101,14 @@ async fn create_snapshot_inner( cluster_name, node_name, partition_id, - snapshot, + &snapshot, ) .await?; - // todo(pavel): SnapshotRepository integration will go in here in a future PR + snapshot_repository + .put(partition_id, &metadata, snapshot.base_dir.as_path()) + .await + .map_err(|e| SnapshotError::RepositoryPutError(partition_id, e))?; archived_lsn_sender .send(Some(metadata.min_applied_lsn)) @@ -116,7 +124,7 @@ async fn write_snapshot_metadata_header( cluster_name: String, node_name: String, partition_id: PartitionId, - snapshot: LocalPartitionSnapshot, + snapshot: &LocalPartitionSnapshot, ) -> Result { let snapshot_meta = PartitionSnapshotMetadata { version: SnapshotFormatVersion::V1, @@ -125,7 +133,7 @@ async fn write_snapshot_metadata_header( partition_id, created_at: humantime::Timestamp::from(SystemTime::now()), snapshot_id, - key_range: snapshot.key_range, + key_range: snapshot.key_range.clone(), min_applied_lsn: snapshot.min_applied_lsn, db_comparator_name: snapshot.db_comparator_name.clone(), files: snapshot.files.clone(), diff --git a/crates/worker/src/partition_processor_manager/mod.rs b/crates/worker/src/partition_processor_manager/mod.rs index f54bd6896..714ecdb40 100644 --- 
a/crates/worker/src/partition_processor_manager/mod.rs +++ b/crates/worker/src/partition_processor_manager/mod.rs @@ -11,7 +11,6 @@ mod message_handler; mod persisted_lsn_watchdog; mod processor_state; -mod snapshot_task; mod spawn_processor_task; use std::collections::hash_map::Entry; @@ -36,14 +35,12 @@ use crate::metric_definitions::PARTITION_LAST_APPLIED_LOG_LSN; use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; +use crate::partition::snapshots::{PendingSnapshotTask, SnapshotPartitionTask, SnapshotRepository}; use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; use crate::partition_processor_manager::processor_state::{ LeaderEpochToken, ProcessorState, StartedProcessor, }; -use crate::partition_processor_manager::snapshot_task::{ - PendingSnapshotTask, SnapshotPartitionTask, -}; use crate::partition_processor_manager::spawn_processor_task::SpawnPartitionProcessorTask; use restate_bifrost::Bifrost; use restate_core::network::{Incoming, MessageRouterBuilder, MessageStream}; @@ -106,6 +103,7 @@ pub struct PartitionProcessorManager { pending_snapshots: HashMap, snapshot_export_tasks: FuturesUnordered>>, + snapshot_repository: SnapshotRepository, } #[derive(Debug, thiserror::Error)] @@ -172,6 +170,7 @@ impl PartitionProcessorManager { partition_store_manager: PartitionStoreManager, router_builder: &mut MessageRouterBuilder, bifrost: Bifrost, + snapshot_repository: SnapshotRepository, ) -> Self { let incoming_update_processors = router_builder.subscribe_to_stream(2); let incoming_partition_processor_rpc = router_builder.subscribe_to_stream(128); @@ -198,6 +197,7 @@ impl PartitionProcessorManager { asynchronous_operations: JoinSet::default(), snapshot_export_tasks: 
FuturesUnordered::default(), pending_snapshots: HashMap::default(), + snapshot_repository, } } @@ -606,7 +606,12 @@ impl PartitionProcessorManager { } }; - self.spawn_create_snapshot_task(partition_id, sender, archived_lsn_sender); + self.spawn_create_snapshot_task( + partition_id, + sender, + archived_lsn_sender, + self.snapshot_repository.clone(), + ); } fn on_control_processors(&mut self) { @@ -784,6 +789,7 @@ impl PartitionProcessorManager { partition_id: PartitionId, result_sender: oneshot::Sender, archived_lsn_sender: watch::Sender>, + snapshot_repository: SnapshotRepository, ) { if let Entry::Vacant(entry) = self.pending_snapshots.entry(partition_id) { let config = self.updateable_config.live_load(); @@ -797,6 +803,7 @@ impl PartitionProcessorManager { partition_store_manager: self.partition_store_manager.clone(), snapshot_base_path, archived_lsn_sender, + snapshot_repository, }; let snapshot_span = tracing::info_span!("create-snapshot"); @@ -911,7 +918,9 @@ enum EventKind { #[cfg(test)] mod tests { + use crate::partition::snapshots::SnapshotRepository; use crate::partition_processor_manager::PartitionProcessorManager; + use crate::BuildError; use googletest::IntoTestResult; use restate_bifrost::providers::memory_loglet; use restate_bifrost::BifrostService; @@ -919,7 +928,9 @@ mod tests { use restate_core::{TaskKind, TestCoreEnvBuilder}; use restate_partition_store::PartitionStoreManager; use restate_rocksdb::RocksDbManager; - use restate_types::config::{CommonOptions, Configuration, RocksDbOptions, StorageOptions}; + use restate_types::config::{ + CommonOptions, Configuration, RocksDbOptions, SnapshotsOptions, StorageOptions, + }; use restate_types::health::HealthStatus; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::{Constant, Live}; @@ -931,6 +942,7 @@ mod tests { use restate_types::protobuf::node::Header; use restate_types::{GenerationalNodeId, Version}; use std::time::Duration; + use tempfile::TempDir; use 
test_log::test; /// This test ensures that the lifecycle of partition processors is properly managed by the @@ -970,6 +982,7 @@ mod tests { ) .await?; + let snapshots_options = SnapshotsOptions::default(); let partition_processor_manager = PartitionProcessorManager::new( env_builder.tc.clone(), health_status, @@ -979,6 +992,9 @@ mod tests { partition_store_manager, &mut env_builder.router_builder, bifrost, + SnapshotRepository::create(TempDir::new()?.into_path(), &snapshots_options) + .await + .map_err(BuildError::SnapshotsRepository)?, ); let env = env_builder.build().await;