Skip to content

Commit

Permalink
Enforce document retention (#23375)
Browse files Browse the repository at this point in the history
Add checks whenever we are reading directly from the `documents` log. This involved adding new methods to the `RetentionValidator` that allow us to validate a timestamp and passing through the `RetentionValidator` to all methods that access the documents log.

I also made `load_all_documents` a testing only feature.

GitOrigin-RevId: 57e6cde4ad7f6531c7aa7b531062d54fb5179478
  • Loading branch information
jordanhunt22 authored and Convex, Inc. committed Mar 13, 2024
1 parent 537864f commit 4b0e312
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 47 deletions.
67 changes: 50 additions & 17 deletions crates/common/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ pub trait RetentionValidator: Sync + Send {
/// Call validate_snapshot *after* reading at the snapshot, to confirm all
/// data in the snapshot is within retention.
async fn validate_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>;
/// Call validate_document_snapshot *after* reading at the snapshot, to
/// confirm the documents log is valid at this snapshot.
async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>;
async fn min_snapshot_ts(&self) -> anyhow::Result<Timestamp>;
async fn min_document_snapshot_ts(&self) -> anyhow::Result<Timestamp>;

Expand All @@ -326,6 +329,7 @@ pub trait PersistenceReader: Send + Sync + 'static {
range: TimestampRange,
order: Order,
page_size: u32,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_>;

/// Loads documents within the given table and the given timestamp range.
Expand All @@ -342,21 +346,13 @@ pub trait PersistenceReader: Send + Sync + 'static {
range: TimestampRange,
order: Order,
page_size: u32,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
self.load_documents(range, order, page_size)
self.load_documents(range, order, page_size, retention_validator)
.try_filter(move |(_, doc_id, _)| future::ready(*doc_id.table() == table_id))
.boxed()
}

/// Returns all timestamps in ascending (ts, id) order.
fn load_all_documents(&self) -> DocumentStream {
self.load_documents(
TimestampRange::all(),
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
)
}

/// Look up the previous revision of `(id, ts)`, returning a map where for
/// each `(id, ts)` we have...
///
Expand Down Expand Up @@ -433,7 +429,12 @@ pub trait PersistenceReader: Send + Sync + 'static {
async fn max_ts(&self) -> anyhow::Result<Option<Timestamp>> {
// Fetch the document with the maximum timestamp and also MaxRepeatableTimestamp
// in parallel.
let mut stream = self.load_documents(TimestampRange::all(), Order::Desc, 1);
let mut stream = self.load_documents(
TimestampRange::all(),
Order::Desc,
1,
Arc::new(NoopRetentionValidator),
);
let max_repeatable =
self.get_persistence_global(PersistenceGlobalKey::MaxRepeatableTimestamp);
let (max_committed, max_repeatable) = try_join!(stream.try_next(), max_repeatable)?;
Expand All @@ -444,6 +445,18 @@ pub trait PersistenceReader: Send + Sync + 'static {
}

fn version(&self) -> PersistenceVersion;

/// Returns all timestamps and documents in ascending (ts, table_id, id)
/// order. Only should be used for testing
#[cfg(any(test, feature = "testing"))]
fn load_all_documents(&self) -> DocumentStream {
self.load_documents(
TimestampRange::all(),
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
Arc::new(NoopRetentionValidator),
)
}
}

pub fn now_ts<RT: Runtime>(max_ts: Timestamp, rt: &RT) -> anyhow::Result<Timestamp> {
Expand Down Expand Up @@ -508,16 +521,32 @@ impl RepeatablePersistence {

/// Same as [`Persistence::load_all_documents`] but only including documents
/// in the snapshot range.
pub fn load_all_documents(&self, order: Order) -> DocumentStream<'_> {
self.load_documents(TimestampRange::snapshot(*self.upper_bound), order)
pub fn load_all_documents(
&self,
order: Order,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
self.load_documents(
TimestampRange::snapshot(*self.upper_bound),
order,
retention_validator,
)
}

/// Same as [`Persistence::load_documents`] but only including documents in
/// the snapshot range.
pub fn load_documents(&self, range: TimestampRange, order: Order) -> DocumentStream<'_> {
let stream = self
.reader
.load_documents(range, order, *DEFAULT_DOCUMENTS_PAGE_SIZE);
pub fn load_documents(
&self,
range: TimestampRange,
order: Order,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
let stream = self.reader.load_documents(
range,
order,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
retention_validator,
);
Box::pin(stream.try_filter(|(ts, ..)| future::ready(*ts <= *self.upper_bound)))
}

Expand Down Expand Up @@ -688,6 +717,10 @@ impl RetentionValidator for NoopRetentionValidator {
Ok(())
}

async fn validate_document_snapshot(&self, _ts: Timestamp) -> anyhow::Result<()> {
Ok(())
}

async fn min_snapshot_ts(&self) -> anyhow::Result<Timestamp> {
Ok(Timestamp::MIN)
}
Expand Down
10 changes: 8 additions & 2 deletions crates/common/src/testing/persistence_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,13 @@ pub async fn test_load_documents_from_table<P: Persistence>(
for page_size in 1..3 {
let docs: Vec<_> = p
.reader()
.load_documents_from_table(table_id, range, order, page_size)
.load_documents_from_table(
table_id,
range,
order,
page_size,
Arc::new(NoopRetentionValidator),
)
.try_collect()
.await?;
let docs: Vec<_> = docs.into_iter().collect();
Expand All @@ -611,7 +617,7 @@ pub async fn test_load_documents<P: Persistence>(
) -> anyhow::Result<()> {
let docs: Vec<_> = p
.reader()
.load_documents(range, order, 10)
.load_documents(range, order, 10, Arc::new(NoopRetentionValidator))
.try_collect()
.await?;
let docs: Vec<_> = docs
Expand Down
1 change: 1 addition & 0 deletions crates/common/src/testing/test_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl PersistenceReader for TestPersistence {
range: TimestampRange,
order: Order,
_page_size: u32,
_retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
let log = { self.inner.lock().log.clone() };

Expand Down
31 changes: 26 additions & 5 deletions crates/database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,18 @@ impl DatabaseSnapshot {
};
drop(load_indexes_into_memory_timer);

let (search_indexes, persistence_version) =
bootstrap_search(&index_registry, &repeatable_persistence, &table_mapping).await?;
let (search_indexes, persistence_version) = bootstrap_search(
&index_registry,
&repeatable_persistence,
&table_mapping,
retention_validator.clone(),
)
.await?;
let search = SearchIndexManager::from_bootstrap(search_indexes, persistence_version);
let vector = VectorIndexManager::bootstrap_index_metadata(&index_registry)?;
let vector = VectorIndexManager::bootstrap_index_metadata(
&index_registry,
retention_validator.clone(),
)?;

// Step 3: Stream document changes since the last table summary snapshot so they
// are up to date.
Expand Down Expand Up @@ -896,6 +904,7 @@ impl<RT: Runtime> Database<RT> {
vector_persistence,
table_mapping,
self.committer.clone(),
self.retention_validator(),
)
}

Expand Down Expand Up @@ -928,6 +937,7 @@ impl<RT: Runtime> Database<RT> {
timestamp_range,
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
self.retention_validator(),
)
.then(|val| async {
while let Err(not_until) = rate_limiter.check() {
Expand Down Expand Up @@ -1493,7 +1503,8 @@ impl<RT: Runtime> Database<RT> {
Some(ts) => TimestampRange::new((Bound::Excluded(ts), Bound::Unbounded))?,
None => TimestampRange::all(),
};
let mut document_stream = repeatable_persistence.load_documents(range, Order::Asc);
let mut document_stream =
repeatable_persistence.load_documents(range, Order::Asc, self.retention_validator());
// deltas accumulated in (ts, id) order to return.
let mut deltas = vec![];
// new_cursor is set once, when we know the final timestamp.
Expand All @@ -1502,7 +1513,17 @@ impl<RT: Runtime> Database<RT> {
// should request another page.
let mut has_more = false;
let mut rows_read = 0;
while let Some((ts, id, maybe_doc)) = document_stream.try_next().await? {
while let Some((ts, id, maybe_doc)) = match document_stream.try_next().await {
Ok::<_, Error>(doc) => doc,
Err(e) if e.is_out_of_retention() => {
// Throws a user error if the documents window is out of retention
anyhow::bail!(ErrorMetadata::bad_request(
"InvalidWindowToReadDocuments",
"Documents cannot be read at the given timestamp"
))
},
Err(e) => anyhow::bail!(e),
} {
rows_read += 1;
if let Some(new_cursor) = new_cursor
&& new_cursor < ts
Expand Down
2 changes: 1 addition & 1 deletion crates/database/src/index_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ impl<RT: Runtime> IndexWriter<RT> {
index_selector: &'a IndexSelector,
) -> impl Stream<Item = anyhow::Result<RevisionPair>> + 'a {
let document_stream = reader
.load_documents(range, order)
.load_documents(range, order, self.retention_validator.clone())
.try_filter(|(_, id, _)| future::ready(index_selector.filter_id(*id)));
stream_revision_pairs(document_stream, reader)
}
Expand Down
6 changes: 5 additions & 1 deletion crates/database/src/persistence_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::Arc;

use common::{
persistence::{
RepeatablePersistence,
RetentionValidator,
TimestampRange,
},
persistence_helpers::{
Expand Down Expand Up @@ -47,8 +50,9 @@ pub async fn stream_transactions<'a>(
// id)` space in the middle of a transaction boundary.
range: TimestampRange,
order: Order,
retention_validator: Arc<dyn RetentionValidator>,
) {
let document_stream = reader.load_documents(range, order);
let document_stream = reader.load_documents(range, order, retention_validator);
let revision_stream = stream_revision_pairs(document_stream, reader);
futures::pin_mut!(revision_stream);

Expand Down
59 changes: 53 additions & 6 deletions crates/database/src/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,19 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
min_snapshot_ts: Timestamp,
all_indexes: &BTreeMap<IndexId, (GenericIndexName<TableId>, IndexedFields)>,
persistence_version: PersistenceVersion,
retention_validator: Arc<dyn RetentionValidator>,
) {
tracing::trace!(
"expired_index_entries: reading expired index entries from {cursor:?} to {:?}",
min_snapshot_ts,
);
let reader_ = &reader;
let mut index_entry_chunks = reader
.load_documents(TimestampRange::new(cursor..min_snapshot_ts)?, Order::Asc)
.load_documents(
TimestampRange::new(cursor..min_snapshot_ts)?,
Order::Asc,
retention_validator,
)
.try_chunks(*RETENTION_READ_CHUNK)
.map(move |chunk| async move {
let chunk = chunk?.to_vec();
Expand Down Expand Up @@ -559,7 +564,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
let reader = persistence.reader();
let persistence_version = reader.version();
let snapshot_ts = new_static_repeatable_ts(min_snapshot_ts, reader.as_ref(), rt).await?;
let reader = RepeatablePersistence::new(reader, snapshot_ts, retention_validator);
let reader = RepeatablePersistence::new(reader, snapshot_ts, retention_validator.clone());

tracing::trace!("delete: about to grab chunks");
let expired_chunks = Self::expired_index_entries(
Expand All @@ -568,6 +573,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
min_snapshot_ts,
all_indexes,
persistence_version,
retention_validator.clone(),
)
.try_chunks(*RETENTION_DELETE_CHUNK);
pin_mut!(expired_chunks);
Expand Down Expand Up @@ -718,6 +724,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
&mut all_indexes,
&mut index_cursor,
index_table_id,
retention_validator.clone(),
)
.await?;
tracing::trace!("go_delete: Loaded initial indexes");
Expand All @@ -737,6 +744,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
&mut all_indexes,
&mut index_cursor,
index_table_id,
retention_validator.clone(),
)
.await?;
tracing::trace!("go_delete: loaded second round of indexes");
Expand Down Expand Up @@ -840,12 +848,14 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
all_indexes: &mut BTreeMap<IndexId, (GenericIndexName<TableId>, IndexedFields)>,
cursor: &mut Timestamp,
index_table_id: TableIdAndTableNumber,
retention_validator: Arc<dyn RetentionValidator>,
) -> anyhow::Result<()> {
let reader = persistence.reader();
let mut document_stream = reader.load_documents(
TimestampRange::greater_than(*cursor),
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
retention_validator,
);
while let Some((ts, _, maybe_doc)) = document_stream.try_next().await? {
Self::accumulate_index_document(maybe_doc, all_indexes, index_table_id)?;
Expand All @@ -863,7 +873,23 @@ impl<RT: Runtime> RetentionValidator for LeaderRetentionManager<RT> {
let min_snapshot_ts = self.bounds_reader.lock().min_snapshot_ts;
log_snapshot_verification_age(&self.rt, ts, min_snapshot_ts, false, true);
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(ts, min_snapshot_ts));
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Index
));
}
Ok(())
}

async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> {
let min_snapshot_ts = self.bounds_reader.lock().min_document_snapshot_ts;
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Document
));
}
Ok(())
}
Expand Down Expand Up @@ -962,7 +988,23 @@ impl<RT: Runtime> RetentionValidator for FollowerRetentionManager<RT> {
let min_snapshot_ts = self.min_snapshot_ts().await?;
log_snapshot_verification_age(&self.rt, ts, min_snapshot_ts, false, false);
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(ts, min_snapshot_ts));
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Index
));
}
Ok(())
}

async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> {
let min_snapshot_ts = self.min_document_snapshot_ts().await?;
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Document
));
}
Ok(())
}
Expand Down Expand Up @@ -1000,9 +1042,13 @@ impl<RT: Runtime> RetentionValidator for FollowerRetentionManager<RT> {
}
}

fn snapshot_invalid_error(ts: Timestamp, min_snapshot_ts: Timestamp) -> anyhow::Error {
fn snapshot_invalid_error(
ts: Timestamp,
min_snapshot_ts: Timestamp,
retention_type: RetentionType,
) -> anyhow::Error {
anyhow::anyhow!(ErrorMetadata::out_of_retention()).context(format!(
"Snapshot timestamp out of retention window: {ts} < {min_snapshot_ts}"
"{retention_type:?} snapshot timestamp out of retention window: {ts} < {min_snapshot_ts}"
))
}

Expand Down Expand Up @@ -1189,6 +1235,7 @@ mod tests {
min_snapshot_ts,
&all_indexes,
persistence_version,
retention_validator.clone(),
);
let expired: Vec<_> = expired_stream.try_collect().await?;

Expand Down
Loading

0 comments on commit 4b0e312

Please sign in to comment.