Skip to content

Commit 00de3b0

Browse files
committed
Add support for time-based snapshot intervals
1 parent 31716d7 commit 00de3b0

File tree

9 files changed

+131
-45
lines changed

9 files changed

+131
-45
lines changed

crates/partition-store/src/partition_db.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ use restate_types::partitions::{CfName, Partition};
3030

3131
use crate::durable_lsn_tracking::{AppliedLsnCollectorFactory, DurableLsnEventListener};
3232
use crate::memory::MemoryBudget;
33-
use crate::snapshots::LocalPartitionSnapshot;
33+
use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot};
3434

3535
type SmartString = smartstring::SmartString<smartstring::LazyCompact>;
3636

3737
#[derive(Clone)]
3838
pub struct PartitionDb {
3939
meta: Arc<Partition>,
4040
durable_lsn: watch::Sender<Option<Lsn>>,
41-
archived_lsn: watch::Sender<Option<Lsn>>,
41+
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
4242
// Note: Rust will drop the fields in the order they are declared in the struct.
4343
// It's crucial to keep the column family and the database in this exact order.
4444
cf: PartitionBoundCfHandle,
@@ -48,7 +48,7 @@ pub struct PartitionDb {
4848
impl PartitionDb {
4949
pub(crate) fn new(
5050
meta: Arc<Partition>,
51-
archived_lsn: watch::Sender<Option<Lsn>>,
51+
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
5252
rocksdb: Arc<RocksDb>,
5353
cf: Arc<BoundColumnFamily<'_>>,
5454
) -> Self {
@@ -94,10 +94,10 @@ impl PartitionDb {
9494
.await
9595
}
9696

97-
pub(crate) fn note_archived_lsn(&self, lsn: Lsn) -> bool {
97+
pub(crate) fn note_archived_lsn(&self, archived_lsn: ArchivedLsn) -> bool {
9898
self.archived_lsn.send_if_modified(|current| {
99-
if current.is_none_or(|c| lsn > c) {
100-
*current = Some(lsn);
99+
if current.as_mut().is_none_or(|c| &archived_lsn > c) {
100+
*current = Some(archived_lsn);
101101
true
102102
} else {
103103
false
@@ -106,11 +106,11 @@ impl PartitionDb {
106106
}
107107

108108
/// The last (locally) known archived LSN for this partition
109-
pub fn get_archived_lsn(&self) -> Option<Lsn> {
109+
pub fn get_archived_lsn(&self) -> Option<ArchivedLsn> {
110110
*self.archived_lsn.borrow()
111111
}
112112

113-
pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<Lsn>> {
113+
pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<ArchivedLsn>> {
114114
self.archived_lsn.subscribe()
115115
}
116116

@@ -171,7 +171,7 @@ impl PartitionBoundCfHandle {
171171

172172
pub(crate) struct PartitionCell {
173173
meta: Arc<Partition>,
174-
archived_lsn: watch::Sender<Option<Lsn>>,
174+
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
175175
durable_lsn: RwLock<Option<watch::Sender<Option<Lsn>>>>,
176176
pub(crate) inner: AsyncRwLock<State>,
177177
}

crates/partition-store/src/partition_store_manager.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use restate_types::partitions::Partition;
2626
use crate::SnapshotError;
2727
use crate::memory::MemoryController;
2828
use crate::partition_db::{AllDataCf, PartitionCell, PartitionDb, RocksConfigurator};
29-
use crate::snapshots::{LocalPartitionSnapshot, Snapshots};
29+
use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot, Snapshots};
3030
use crate::{BuildError, OpenError, PartitionStore, SnapshotErrorKind};
3131

3232
const PARTITION_CF_PREFIX: &str = "data-";
@@ -183,7 +183,10 @@ impl PartitionStoreManager {
183183
self.snapshots.is_repository_configured()
184184
}
185185

186-
pub async fn refresh_latest_archived_lsn(&self, partition_id: PartitionId) -> Option<Lsn> {
186+
pub async fn refresh_latest_archived_lsn(
187+
&self,
188+
partition_id: PartitionId,
189+
) -> Option<ArchivedLsn> {
187190
let db = self.get_partition_db(partition_id).await?;
188191
self.snapshots.refresh_latest_archived_lsn(db).await
189192
}

crates/partition-store/src/snapshots.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ use std::sync::Arc;
1818
use crate::{PartitionDb, PartitionStore, SnapshotError, SnapshotErrorKind};
1919

2020
pub use self::metadata::*;
21-
pub use self::repository::SnapshotRepository;
21+
pub use self::repository::{ArchivedLsn, SnapshotRepository};
2222
pub use self::snapshot_task::*;
2323

2424
use tokio::sync::Semaphore;
2525
use tracing::{debug, instrument, warn};
2626

2727
use restate_types::config::Configuration;
2828
use restate_types::identifiers::{PartitionId, SnapshotId};
29-
use restate_types::logs::{Lsn, SequenceNumber};
29+
use restate_types::logs::Lsn;
3030

3131
#[derive(Clone)]
3232
pub struct Snapshots {
@@ -85,7 +85,7 @@ impl Snapshots {
8585
})
8686
}
8787

88-
pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<Lsn> {
88+
pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<ArchivedLsn> {
8989
let Some(repository) = &self.repository else {
9090
return None;
9191
};
@@ -94,10 +94,9 @@ impl Snapshots {
9494
let archived_lsn = repository
9595
.get_latest_archived_lsn(partition_id)
9696
.await
97-
.inspect(|lsn| debug!(?partition_id, "Latest archived LSN: {}", lsn))
9897
.inspect_err(|err| warn!(?partition_id, "Unable to get latest archived LSN: {}", err))
9998
.ok()
100-
.unwrap_or(Lsn::INVALID);
99+
.unwrap_or(ArchivedLsn::None);
101100
db.note_archived_lsn(archived_lsn);
102101
Some(archived_lsn)
103102
}

crates/partition-store/src/snapshots/repository.rs

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use std::collections::HashMap;
1212
use std::path::{Path, PathBuf};
1313
use std::sync::Arc;
14+
use std::time::{Duration, SystemTime, SystemTimeError};
1415

1516
use anyhow::{Context, anyhow, bail};
1617
use bytes::BytesMut;
@@ -152,6 +153,54 @@ impl LatestSnapshot {
152153
}
153154
}
154155

156+
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
157+
pub enum ArchivedLsn {
158+
None,
159+
Snapshot {
160+
// Ordering is intentional: LSN takes priority over elapsed wall clock time for comparisons
161+
min_applied_lsn: Lsn,
162+
created_at: SystemTime,
163+
},
164+
}
165+
166+
impl ArchivedLsn {
167+
pub fn get_min_applied_lsn(&self) -> Lsn {
168+
match self {
169+
ArchivedLsn::None => Lsn::INVALID,
170+
ArchivedLsn::Snapshot {
171+
min_applied_lsn, ..
172+
} => *min_applied_lsn,
173+
}
174+
}
175+
176+
pub fn get_age(&self) -> Result<Duration, SystemTimeError> {
177+
match self {
178+
ArchivedLsn::None => Ok(Duration::MAX),
179+
ArchivedLsn::Snapshot { created_at, .. } => {
180+
SystemTime::now().duration_since(*created_at)
181+
}
182+
}
183+
}
184+
}
185+
186+
impl From<&LatestSnapshot> for ArchivedLsn {
187+
fn from(latest: &LatestSnapshot) -> Self {
188+
ArchivedLsn::Snapshot {
189+
min_applied_lsn: latest.min_applied_lsn,
190+
created_at: latest.created_at.into(),
191+
}
192+
}
193+
}
194+
195+
impl From<&PartitionSnapshotMetadata> for ArchivedLsn {
196+
fn from(metadata: &PartitionSnapshotMetadata) -> Self {
197+
ArchivedLsn::Snapshot {
198+
min_applied_lsn: metadata.min_applied_lsn,
199+
created_at: metadata.created_at.into(),
200+
}
201+
}
202+
}
203+
155204
struct UniqueSnapshotKey {
156205
lsn: Lsn,
157206
snapshot_id: SnapshotId,
@@ -543,14 +592,17 @@ impl SnapshotRepository {
543592

544593
/// Retrieve the latest known LSN to be archived to the snapshot repository.
545594
/// Response of `Ok(ArchivedLsn::None)` indicates no existing snapshot for the partition.
546-
pub async fn get_latest_archived_lsn(&self, partition_id: PartitionId) -> anyhow::Result<Lsn> {
595+
pub async fn get_latest_archived_lsn(
596+
&self,
597+
partition_id: PartitionId,
598+
) -> anyhow::Result<ArchivedLsn> {
547599
let latest_path = self.get_latest_snapshot_pointer(partition_id);
548600

549601
let latest = match self.object_store.get(&latest_path).await {
550602
Ok(result) => result,
551603
Err(object_store::Error::NotFound { .. }) => {
552604
debug!("Latest snapshot data not found in repository");
553-
return Ok(Lsn::INVALID);
605+
return Ok(ArchivedLsn::None);
554606
}
555607
Err(e) => {
556608
return Err(anyhow::Error::new(e).context(format!(
@@ -562,7 +614,7 @@ impl SnapshotRepository {
562614
let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?;
563615
debug!(partition_id = %partition_id, snapshot_id = %latest.snapshot_id, "Latest snapshot metadata: {:?}", latest);
564616

565-
Ok(latest.min_applied_lsn)
617+
Ok(ArchivedLsn::from(&latest))
566618
}
567619

568620
async fn get_latest_snapshot_metadata_for_update(

crates/partition-store/src/snapshots/snapshot_task.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use super::{
2424
SnapshotFormatVersion, SnapshotRepository,
2525
};
2626
use crate::PartitionStoreManager;
27+
use crate::snapshots::ArchivedLsn;
2728

2829
/// Creates a partition store snapshot along with Restate snapshot metadata.
2930
pub struct SnapshotPartitionTask {
@@ -91,7 +92,7 @@ impl SnapshotPartitionTask {
9192
.get_partition_db(self.partition_id)
9293
.await
9394
{
94-
db.note_archived_lsn(metadata.min_applied_lsn);
95+
db.note_archived_lsn(ArchivedLsn::from(&metadata));
9596
}
9697

9798
Ok(metadata)

crates/types/src/config/worker.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -434,11 +434,12 @@ impl Default for StorageOptions {
434434
}
435435
}
436436

437-
/// # Snapshot options.
437+
/// # Snapshot options
438438
///
439-
/// Partition store snapshotting settings. At a minimum, set `destination` and
440-
/// `snapshot-interval-num-records` to enable snapshotting. For a complete example, see
441-
/// [Snapshots](https://docs.restate.dev/operate/snapshots).
439+
/// Partition store object-store snapshotting settings. At a minimum, set `destination` to enable
440+
/// manual snapshotting via `restatectl`. Additionally, `snapshot-interval` and
441+
/// `snapshot-interval-num-records` can be used to configure automated periodic snapshots. For a
442+
/// complete example, see [Snapshots](https://docs.restate.dev/operate/snapshots).
442443
#[serde_as]
443444
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
444445
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
@@ -448,20 +449,35 @@ impl Default for StorageOptions {
448449
pub struct SnapshotsOptions {
449450
/// # Snapshot destination URL
450451
///
451-
/// Base URL for cluster snapshots. Supports `s3://` and `file://` protocol scheme.
452+
/// Base URL for cluster snapshots. Currently only supports the `s3://` protocol scheme.
452453
/// S3-compatible object stores must support ETag-based conditional writes.
453454
///
454455
/// Default: `None`
455456
pub destination: Option<String>,
456457

457-
/// # Automatic snapshot creation frequency
458+
/// # Automatic snapshot time interval
459+
///
460+
/// A time interval at which partition snapshots will be created. If
461+
/// `snapshot-interval-num-records` is also set, it will be treated as an additional requirement
462+
/// before a snapshot is taken. Use both time-based and record-based intervals to reduce the
463+
/// number of snapshots created during times of low activity.
464+
///
465+
/// Snapshot intervals are calculated based on the wall clock timestamps reported by cluster
466+
/// nodes, assuming a basic level of clock synchronization within the cluster.
467+
///
468+
/// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
469+
///
470+
/// Default: `None` - time-based automatic snapshots are disabled
471+
#[serde(skip_serializing_if = "Option::is_none", default)]
472+
pub snapshot_interval: Option<FriendlyDuration>,
473+
474+
/// # Automatic snapshot minimum records
458475
///
459476
/// Number of log records that trigger a snapshot to be created.
460477
///
461478
/// As snapshots are created asynchronously, the actual number of new records that will trigger
462479
/// a snapshot will vary. The counter for the subsequent snapshot begins from the LSN at which
463-
/// the previous snapshot export was initiated. Only leader Partition Processors will take
464-
/// snapshots for a given partition.
480+
/// the previous snapshot export was initiated.
465481
///
466482
/// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
467483
///
@@ -481,6 +497,7 @@ impl Default for SnapshotsOptions {
481497
fn default() -> Self {
482498
Self {
483499
destination: None,
500+
snapshot_interval: None,
484501
snapshot_interval_num_records: None,
485502
object_store: Default::default(),
486503
object_store_retry_policy: Self::default_retry_policy(),

crates/worker/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ impl Worker {
126126
SubscriptionControllerHandle::new(ingress_kafka.create_command_sender());
127127

128128
let snapshots_options = &config.worker.snapshots;
129-
if snapshots_options.snapshot_interval_num_records.is_some()
129+
if (snapshots_options.snapshot_interval.is_some()
130+
|| snapshots_options.snapshot_interval_num_records.is_some())
130131
&& snapshots_options.destination.is_none()
131132
{
132133
return Err(BuildError::SnapshotRepository(anyhow::anyhow!(

crates/worker/src/partition/leadership/durability_tracker.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::task::Poll;
1313
use std::time::Duration;
1414

1515
use futures::{Stream, StreamExt};
16+
use restate_partition_store::snapshots::ArchivedLsn;
1617
use tokio::sync::watch;
1718
use tokio::time::{Instant, MissedTickBehavior};
1819
use tokio_stream::wrappers::{IntervalStream, WatchStream};
@@ -39,7 +40,7 @@ pub struct DurabilityTracker {
3940
partition_id: PartitionId,
4041
last_reported_durable_lsn: Lsn,
4142
replica_set_states: PartitionReplicaSetStates,
42-
archived_lsn_watch: WatchStream<Option<Lsn>>,
43+
archived_lsn_watch: WatchStream<Option<ArchivedLsn>>,
4344
check_timer: IntervalStream,
4445
last_warning_at: Instant,
4546
/// cache of the last archived_lsn
@@ -52,7 +53,7 @@ impl DurabilityTracker {
5253
partition_id: PartitionId,
5354
last_reported_durable_lsn: Option<Lsn>,
5455
replica_set_states: PartitionReplicaSetStates,
55-
archived_lsn_watch: watch::Receiver<Option<Lsn>>,
56+
archived_lsn_watch: watch::Receiver<Option<ArchivedLsn>>,
5657
check_interval: Duration,
5758
) -> Self {
5859
let mut check_timer =
@@ -166,7 +167,9 @@ impl Stream for DurabilityTracker {
166167
self.terminated = true;
167168
return Poll::Ready(None);
168169
}
169-
(Poll::Ready(Some(archived)), _) => archived.unwrap_or(Lsn::INVALID),
170+
(Poll::Ready(Some(archived)), _) => archived
171+
.map(|a| a.get_min_applied_lsn())
172+
.unwrap_or(Lsn::INVALID),
170173
(_, Poll::Ready(_)) => self.last_archived,
171174
(Poll::Pending, Poll::Pending) => return Poll::Pending,
172175
};

0 commit comments

Comments
 (0)