diff --git a/crates/common/src/persistence.rs b/crates/common/src/persistence.rs index 2049e2fb..205ed9b4 100644 --- a/crates/common/src/persistence.rs +++ b/crates/common/src/persistence.rs @@ -307,6 +307,9 @@ pub trait RetentionValidator: Sync + Send { /// Call validate_snapshot *after* reading at the snapshot, to confirm all /// data in the snapshot is within retention. async fn validate_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>; + /// Call validate_document_snapshot *after* reading at the snapshot, to + /// confirm the documents log is valid at this snapshot. + async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>; async fn min_snapshot_ts(&self) -> anyhow::Result; async fn min_document_snapshot_ts(&self) -> anyhow::Result; @@ -326,6 +329,7 @@ pub trait PersistenceReader: Send + Sync + 'static { range: TimestampRange, order: Order, page_size: u32, + retention_validator: Arc, ) -> DocumentStream<'_>; /// Loads documents within the given table and the given timestamp range. @@ -342,21 +346,13 @@ pub trait PersistenceReader: Send + Sync + 'static { range: TimestampRange, order: Order, page_size: u32, + retention_validator: Arc, ) -> DocumentStream<'_> { - self.load_documents(range, order, page_size) + self.load_documents(range, order, page_size, retention_validator) .try_filter(move |(_, doc_id, _)| future::ready(*doc_id.table() == table_id)) .boxed() } - /// Returns all timestamps in ascending (ts, id) order. - fn load_all_documents(&self) -> DocumentStream { - self.load_documents( - TimestampRange::all(), - Order::Asc, - *DEFAULT_DOCUMENTS_PAGE_SIZE, - ) - } - /// Look up the previous revision of `(id, ts)`, returning a map where for /// each `(id, ts)` we have... /// @@ -433,7 +429,12 @@ pub trait PersistenceReader: Send + Sync + 'static { async fn max_ts(&self) -> anyhow::Result> { // Fetch the document with the maximum timestamp and also MaxRepeatableTimestamp // in parallel. - let mut stream = self.load_documents(TimestampRange::all(), Order::Desc, 1); + let mut stream = self.load_documents( + TimestampRange::all(), + Order::Desc, + 1, + Arc::new(NoopRetentionValidator), + ); let max_repeatable = self.get_persistence_global(PersistenceGlobalKey::MaxRepeatableTimestamp); let (max_committed, max_repeatable) = try_join!(stream.try_next(), max_repeatable)?; @@ -444,6 +445,18 @@ pub trait PersistenceReader: Send + Sync + 'static { } fn version(&self) -> PersistenceVersion; + + /// Returns all timestamps and documents in ascending (ts, table_id, id) + /// order. Only should be used for testing + #[cfg(any(test, feature = "testing"))] + fn load_all_documents(&self) -> DocumentStream { + self.load_documents( + TimestampRange::all(), + Order::Asc, + *DEFAULT_DOCUMENTS_PAGE_SIZE, + Arc::new(NoopRetentionValidator), + ) + } } pub fn now_ts(max_ts: Timestamp, rt: &RT) -> anyhow::Result { @@ -508,16 +521,32 @@ impl RepeatablePersistence { /// Same as [`Persistence::load_all_documents`] but only including documents /// in the snapshot range. - pub fn load_all_documents(&self, order: Order) -> DocumentStream<'_> { - self.load_documents(TimestampRange::snapshot(*self.upper_bound), order) + pub fn load_all_documents( + &self, + order: Order, + retention_validator: Arc, + ) -> DocumentStream<'_> { + self.load_documents( + TimestampRange::snapshot(*self.upper_bound), + order, + retention_validator, + ) } /// Same as [`Persistence::load_documents`] but only including documents in /// the snapshot range. - pub fn load_documents(&self, range: TimestampRange, order: Order) -> DocumentStream<'_> { - let stream = self - .reader - .load_documents(range, order, *DEFAULT_DOCUMENTS_PAGE_SIZE); + pub fn load_documents( + &self, + range: TimestampRange, + order: Order, + retention_validator: Arc, + ) -> DocumentStream<'_> { + let stream = self.reader.load_documents( + range, + order, + *DEFAULT_DOCUMENTS_PAGE_SIZE, + retention_validator, + ); Box::pin(stream.try_filter(|(ts, ..)| future::ready(*ts <= *self.upper_bound))) } @@ -688,6 +717,10 @@ impl RetentionValidator for NoopRetentionValidator { Ok(()) } + async fn validate_document_snapshot(&self, _ts: Timestamp) -> anyhow::Result<()> { + Ok(()) + } + async fn min_snapshot_ts(&self) -> anyhow::Result { Ok(Timestamp::MIN) } diff --git a/crates/common/src/testing/persistence_test_suite.rs b/crates/common/src/testing/persistence_test_suite.rs index 773ebda0..ae7d2018 100644 --- a/crates/common/src/testing/persistence_test_suite.rs +++ b/crates/common/src/testing/persistence_test_suite.rs @@ -593,7 +593,13 @@ pub async fn test_load_documents_from_table( for page_size in 1..3 { let docs: Vec<_> = p .reader() - .load_documents_from_table(table_id, range, order, page_size) + .load_documents_from_table( + table_id, + range, + order, + page_size, + Arc::new(NoopRetentionValidator), + ) .try_collect() .await?; let docs: Vec<_> = docs.into_iter().collect(); @@ -611,7 +617,7 @@ pub async fn test_load_documents( ) -> anyhow::Result<()> { let docs: Vec<_> = p .reader() - .load_documents(range, order, 10) + .load_documents(range, order, 10, Arc::new(NoopRetentionValidator)) .try_collect() .await?; let docs: Vec<_> = docs diff --git a/crates/common/src/testing/test_persistence.rs b/crates/common/src/testing/test_persistence.rs index 178625de..7672cc6e 100644 --- a/crates/common/src/testing/test_persistence.rs +++ b/crates/common/src/testing/test_persistence.rs @@ -266,6 +266,7 @@ impl PersistenceReader for TestPersistence { range: TimestampRange, order: Order, _page_size: u32, + _retention_validator: Arc, ) -> DocumentStream<'_> { let log = { self.inner.lock().log.clone() }; diff --git a/crates/database/src/database.rs b/crates/database/src/database.rs index 32fd8b8c..681cf80f 100644 --- a/crates/database/src/database.rs +++ b/crates/database/src/database.rs @@ -602,10 +602,18 @@ impl DatabaseSnapshot { }; drop(load_indexes_into_memory_timer); - let (search_indexes, persistence_version) = - bootstrap_search(&index_registry, &repeatable_persistence, &table_mapping).await?; + let (search_indexes, persistence_version) = bootstrap_search( + &index_registry, + &repeatable_persistence, + &table_mapping, + retention_validator.clone(), + ) + .await?; let search = SearchIndexManager::from_bootstrap(search_indexes, persistence_version); - let vector = VectorIndexManager::bootstrap_index_metadata(&index_registry)?; + let vector = VectorIndexManager::bootstrap_index_metadata( + &index_registry, + retention_validator.clone(), + )?; // Step 3: Stream document changes since the last table summary snapshot so they // are up to date. @@ -896,6 +904,7 @@ impl Database { vector_persistence, table_mapping, self.committer.clone(), + self.retention_validator(), ) } @@ -928,6 +937,7 @@ impl Database { timestamp_range, Order::Asc, *DEFAULT_DOCUMENTS_PAGE_SIZE, + self.retention_validator(), ) .then(|val| async { while let Err(not_until) = rate_limiter.check() { @@ -1493,7 +1503,8 @@ impl Database { Some(ts) => TimestampRange::new((Bound::Excluded(ts), Bound::Unbounded))?, None => TimestampRange::all(), }; - let mut document_stream = repeatable_persistence.load_documents(range, Order::Asc); + let mut document_stream = + repeatable_persistence.load_documents(range, Order::Asc, self.retention_validator()); // deltas accumulated in (ts, id) order to return. let mut deltas = vec![]; // new_cursor is set once, when we know the final timestamp. @@ -1502,7 +1513,17 @@ impl Database { // should request another page. let mut has_more = false; let mut rows_read = 0; - while let Some((ts, id, maybe_doc)) = document_stream.try_next().await? { + while let Some((ts, id, maybe_doc)) = match document_stream.try_next().await { + Ok::<_, Error>(doc) => doc, + Err(e) if e.is_out_of_retention() => { + // Throws a user error if the documents window is out of retention + anyhow::bail!(ErrorMetadata::bad_request( + "InvalidWindowToReadDocuments", + "Documents cannot be read at the given timestamp" + )) + }, + Err(e) => anyhow::bail!(e), + } { rows_read += 1; if let Some(new_cursor) = new_cursor && new_cursor < ts diff --git a/crates/database/src/index_worker.rs b/crates/database/src/index_worker.rs index 74a7c73d..e271afd5 100644 --- a/crates/database/src/index_worker.rs +++ b/crates/database/src/index_worker.rs @@ -740,7 +740,7 @@ impl IndexWriter { index_selector: &'a IndexSelector, ) -> impl Stream> + 'a { let document_stream = reader - .load_documents(range, order) + .load_documents(range, order, self.retention_validator.clone()) .try_filter(|(_, id, _)| future::ready(index_selector.filter_id(*id))); stream_revision_pairs(document_stream, reader) } diff --git a/crates/database/src/persistence_helpers.rs b/crates/database/src/persistence_helpers.rs index adefc278..fb94fd78 100644 --- a/crates/database/src/persistence_helpers.rs +++ b/crates/database/src/persistence_helpers.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + use common::{ persistence::{ RepeatablePersistence, + RetentionValidator, TimestampRange, }, persistence_helpers::{ @@ -47,8 +50,9 @@ pub async fn stream_transactions<'a>( // id)` space in the middle of a transaction boundary. range: TimestampRange, order: Order, + retention_validator: Arc, ) { - let document_stream = reader.load_documents(range, order); + let document_stream = reader.load_documents(range, order, retention_validator); let revision_stream = stream_revision_pairs(document_stream, reader); futures::pin_mut!(revision_stream); diff --git a/crates/database/src/retention.rs b/crates/database/src/retention.rs index e4b4ec90..5b320755 100644 --- a/crates/database/src/retention.rs +++ b/crates/database/src/retention.rs @@ -442,6 +442,7 @@ impl LeaderRetentionManager { min_snapshot_ts: Timestamp, all_indexes: &BTreeMap, IndexedFields)>, persistence_version: PersistenceVersion, + retention_validator: Arc, ) { tracing::trace!( "expired_index_entries: reading expired index entries from {cursor:?} to {:?}", @@ -449,7 +450,11 @@ impl LeaderRetentionManager { ); let reader_ = &reader; let mut index_entry_chunks = reader - .load_documents(TimestampRange::new(cursor..min_snapshot_ts)?, Order::Asc) + .load_documents( + TimestampRange::new(cursor..min_snapshot_ts)?, + Order::Asc, + retention_validator, + ) .try_chunks(*RETENTION_READ_CHUNK) .map(move |chunk| async move { let chunk = chunk?.to_vec(); @@ -559,7 +564,7 @@ impl LeaderRetentionManager { let reader = persistence.reader(); let persistence_version = reader.version(); let snapshot_ts = new_static_repeatable_ts(min_snapshot_ts, reader.as_ref(), rt).await?; - let reader = RepeatablePersistence::new(reader, snapshot_ts, retention_validator); + let reader = RepeatablePersistence::new(reader, snapshot_ts, retention_validator.clone()); tracing::trace!("delete: about to grab chunks"); let expired_chunks = Self::expired_index_entries( @@ -568,6 +573,7 @@ impl LeaderRetentionManager { min_snapshot_ts, all_indexes, persistence_version, + retention_validator.clone(), ) .try_chunks(*RETENTION_DELETE_CHUNK); pin_mut!(expired_chunks); @@ -718,6 +724,7 @@ impl LeaderRetentionManager { &mut all_indexes, &mut index_cursor, index_table_id, + retention_validator.clone(), ) .await?; tracing::trace!("go_delete: Loaded initial indexes"); @@ -737,6 +744,7 @@ impl LeaderRetentionManager { &mut all_indexes, &mut index_cursor, index_table_id, + retention_validator.clone(), ) .await?; tracing::trace!("go_delete: loaded second round of indexes"); @@ -840,12 +848,14 @@ impl LeaderRetentionManager { all_indexes: &mut BTreeMap, IndexedFields)>, cursor: &mut Timestamp, index_table_id: TableIdAndTableNumber, + retention_validator: Arc, ) -> anyhow::Result<()> { let reader = persistence.reader(); let mut document_stream = reader.load_documents( TimestampRange::greater_than(*cursor), Order::Asc, *DEFAULT_DOCUMENTS_PAGE_SIZE, + retention_validator, ); while let Some((ts, _, maybe_doc)) = document_stream.try_next().await? { Self::accumulate_index_document(maybe_doc, all_indexes, index_table_id)?; @@ -863,7 +873,23 @@ impl RetentionValidator for LeaderRetentionManager { let min_snapshot_ts = self.bounds_reader.lock().min_snapshot_ts; log_snapshot_verification_age(&self.rt, ts, min_snapshot_ts, false, true); if ts < min_snapshot_ts { - anyhow::bail!(snapshot_invalid_error(ts, min_snapshot_ts)); + anyhow::bail!(snapshot_invalid_error( + ts, + min_snapshot_ts, + RetentionType::Index + )); + } + Ok(()) + } + + async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> { + let min_snapshot_ts = self.bounds_reader.lock().min_document_snapshot_ts; + if ts < min_snapshot_ts { + anyhow::bail!(snapshot_invalid_error( + ts, + min_snapshot_ts, + RetentionType::Document + )); } Ok(()) } @@ -962,7 +988,23 @@ impl RetentionValidator for FollowerRetentionManager { let min_snapshot_ts = self.min_snapshot_ts().await?; log_snapshot_verification_age(&self.rt, ts, min_snapshot_ts, false, false); if ts < min_snapshot_ts { - anyhow::bail!(snapshot_invalid_error(ts, min_snapshot_ts)); + anyhow::bail!(snapshot_invalid_error( + ts, + min_snapshot_ts, + RetentionType::Index + )); + } + Ok(()) + } + + async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> { + let min_snapshot_ts = self.min_document_snapshot_ts().await?; + if ts < min_snapshot_ts { + anyhow::bail!(snapshot_invalid_error( + ts, + min_snapshot_ts, + RetentionType::Document + )); } Ok(()) } @@ -1000,9 +1042,13 @@ impl RetentionValidator for FollowerRetentionManager { } } -fn snapshot_invalid_error(ts: Timestamp, min_snapshot_ts: Timestamp) -> anyhow::Error { +fn snapshot_invalid_error( + ts: Timestamp, + min_snapshot_ts: Timestamp, + retention_type: RetentionType, +) -> anyhow::Error { anyhow::anyhow!(ErrorMetadata::out_of_retention()).context(format!( - "Snapshot timestamp out of retention window: {ts} < {min_snapshot_ts}" + "{retention_type:?} snapshot timestamp out of retention window: {ts} < {min_snapshot_ts}" )) } @@ -1189,6 +1235,7 @@ mod tests { min_snapshot_ts, &all_indexes, persistence_version, + retention_validator.clone(), ); let expired: Vec<_> = expired_stream.try_collect().await?; diff --git a/crates/database/src/table_iteration.rs b/crates/database/src/table_iteration.rs index 00542844..f730d035 100644 --- a/crates/database/src/table_iteration.rs +++ b/crates/database/src/table_iteration.rs @@ -302,7 +302,11 @@ impl TableIterator { let repeatable_persistence = RepeatablePersistence::new(reader, end_ts, self.retention_validator.clone()); let documents = repeatable_persistence - .load_documents(TimestampRange::new(start_ts.succ()?..=*end_ts)?, Order::Asc) + .load_documents( + TimestampRange::new(start_ts.succ()?..=*end_ts)?, + Order::Asc, + self.retention_validator.clone(), + ) .try_chunks(self.page_size); pin_mut!(documents); let mut skipped_revs = BTreeSet::new(); diff --git a/crates/database/src/table_summary.rs b/crates/database/src/table_summary.rs index bed8447c..20286c48 100644 --- a/crates/database/src/table_summary.rs +++ b/crates/database/src/table_summary.rs @@ -510,8 +510,13 @@ pub async fn bootstrap( cmp::max(base_snapshot_ts, target_ts), retention_validator.clone(), ); - let transaction_stream = - stream_transactions(bootstrap_tables, &repeatable_persistence, range, order); + let transaction_stream = stream_transactions( + bootstrap_tables, + &repeatable_persistence, + range, + order, + retention_validator.clone(), + ); futures::pin_mut!(transaction_stream); let mut num_added = 0; diff --git a/crates/database/src/text_search_bootstrap.rs b/crates/database/src/text_search_bootstrap.rs index d83d6312..c81ce41c 100644 --- a/crates/database/src/text_search_bootstrap.rs +++ b/crates/database/src/text_search_bootstrap.rs @@ -2,6 +2,7 @@ use std::{ cmp::max, future, ops::Bound, + sync::Arc, }; use common::{ @@ -14,6 +15,7 @@ use common::{ }, persistence::{ RepeatablePersistence, + RetentionValidator, TimestampRange, }, persistence_helpers::stream_revision_pairs, @@ -43,6 +45,7 @@ pub async fn bootstrap_search( registry: &IndexRegistry, persistence: &RepeatablePersistence, table_mapping: &TableMapping, + retention_validator: Arc, ) -> anyhow::Result<(OrdMap, PersistenceVersion)> { let timer = crate::metrics::search::bootstrap_timer(); let mut num_revisions = 0; @@ -107,7 +110,11 @@ pub async fn bootstrap_search( Bound::Included(*persistence.upper_bound()), ); let document_stream = persistence - .load_documents(TimestampRange::new(range)?, Order::Asc) + .load_documents( + TimestampRange::new(range)?, + Order::Asc, + retention_validator.clone(), + ) .try_filter(|(_, id, _)| future::ready(id.table() == index.name.table())); let revision_stream = stream_revision_pairs(document_stream, persistence); futures::pin_mut!(revision_stream); @@ -233,8 +240,13 @@ mod tests { RepeatablePersistence::new(tp.reader(), tx.begin_timestamp(), retention_validator); let snapshot = db.snapshot(tx.begin_timestamp())?; - let (indexes, _) = - bootstrap_search(&snapshot.index_registry, &persistence, tx.table_mapping()).await?; + let (indexes, _) = bootstrap_search( + &snapshot.index_registry, + &persistence, + tx.table_mapping(), + Arc::new(NoopRetentionValidator), + ) + .await?; Ok(indexes) } diff --git a/crates/database/src/transaction_index.rs b/crates/database/src/transaction_index.rs index 777a3c18..bf1a0a0f 100644 --- a/crates/database/src/transaction_index.rs +++ b/crates/database/src/transaction_index.rs @@ -820,6 +820,7 @@ mod tests { persistence::{ now_ts, ConflictStrategy, + NoopRetentionValidator, Persistence, RepeatablePersistence, }, @@ -914,8 +915,13 @@ mod tests { )?; let index = BackendInMemoryIndexes::bootstrap(&index_registry, index_documents, ts)?; - let (indexes, version) = - bootstrap_search(&index_registry, &persistence, id_generator).await?; + let (indexes, version) = bootstrap_search( + &index_registry, + &persistence, + id_generator, + Arc::new(NoopRetentionValidator), + ) + .await?; let search = SearchIndexManager::from_bootstrap(indexes, version); Ok((index_registry, index, search, index_id_by_name)) diff --git a/crates/database/src/vector_bootstrap.rs b/crates/database/src/vector_bootstrap.rs index 9e95dbcf..14962ba4 100644 --- a/crates/database/src/vector_bootstrap.rs +++ b/crates/database/src/vector_bootstrap.rs @@ -5,6 +5,7 @@ use std::{ }, collections::BTreeMap, ops::Bound, + sync::Arc, time::Duration, }; @@ -20,6 +21,7 @@ use common::{ knobs::UDF_EXECUTOR_OCC_MAX_RETRIES, persistence::{ RepeatablePersistence, + RetentionValidator, TimestampRange, }, persistence_helpers::stream_revision_pairs, @@ -63,6 +65,7 @@ pub struct VectorBootstrapWorker { table_mapping: TableMapping, committer_client: CommitterClient, backoff: Backoff, + retention_validator: Arc, } const INITIAL_BACKOFF: Duration = Duration::from_millis(10); @@ -84,6 +87,7 @@ impl VectorBootstrapWorker { persistence: RepeatablePersistence, table_mapping: TableMapping, committer_client: CommitterClient, + retention_validator: Arc, ) -> Self { { Self { @@ -93,6 +97,7 @@ impl VectorBootstrapWorker { persistence, committer_client, backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), + retention_validator, } } } @@ -150,7 +155,12 @@ impl VectorBootstrapWorker { indexes_with_fast_forward_ts.push((index, fast_forward_ts)); } - Self::bootstrap(&self.persistence, indexes_with_fast_forward_ts).await + Self::bootstrap( + &self.persistence, + indexes_with_fast_forward_ts, + self.retention_validator.clone(), + ) + .await } fn vector_indexes_to_bootstrap( @@ -214,6 +224,7 @@ impl VectorBootstrapWorker { ParsedDocument, Option, )>, + retention_validator: Arc, ) -> anyhow::Result { let _status = log_worker_starting("VectorBootstrap"); let timer = vector::metrics::bootstrap_timer(); @@ -234,7 +245,11 @@ impl VectorBootstrapWorker { Bound::Excluded(oldest_index_ts), Bound::Included(*upper_bound), ); - let document_stream = persistence.load_documents(TimestampRange::new(range)?, Order::Asc); + let document_stream = persistence.load_documents( + TimestampRange::new(range)?, + Order::Asc, + retention_validator.clone(), + ); let revision_stream = stream_revision_pairs(document_stream, persistence); futures::pin_mut!(revision_stream); @@ -277,7 +292,10 @@ impl VectorBootstrapWorker { }) .collect(), ); - Ok(VectorIndexManager { indexes }) + Ok(VectorIndexManager { + indexes, + retention_validator: retention_validator.clone(), + }) } async fn finish_bootstrap( diff --git a/crates/sqlite/src/lib.rs b/crates/sqlite/src/lib.rs index 936b42a2..a5effd0c 100644 --- a/crates/sqlite/src/lib.rs +++ b/crates/sqlite/src/lib.rs @@ -111,6 +111,16 @@ impl SqlitePersistence { retention_validator.validate_snapshot(ts).await?; } + #[allow(clippy::needless_lifetimes)] + #[try_stream(ok = T, error = anyhow::Error)] + async fn validate_document_snapshot( + &self, + ts: Timestamp, + retention_validator: Arc, + ) { + retention_validator.validate_document_snapshot(ts).await?; + } + fn _index_scan_inner( &self, index_id: IndexId, @@ -486,6 +496,7 @@ impl PersistenceReader for SqlitePersistence { range: TimestampRange, order: Order, _page_size: u32, + retention_validator: Arc, ) -> DocumentStream<'_> { let triples = try { let connection = &self.inner.lock().connection; @@ -514,8 +525,12 @@ impl PersistenceReader for SqlitePersistence { } triples }; + // load_documents isn't async so we have to validate snapshot as part of the + // stream. + let validate = + self.validate_document_snapshot(range.min_timestamp_inclusive(), retention_validator); match triples { - Ok(s) => stream::iter(s).boxed(), + Ok(s) => (validate.chain(stream::iter(s))).boxed(), Err(e) => stream::once(async { Err(e) }).boxed(), } } diff --git a/crates/vector/src/vector_index_manager.rs b/crates/vector/src/vector_index_manager.rs index 1943e72f..4ccdce9d 100644 --- a/crates/vector/src/vector_index_manager.rs +++ b/crates/vector/src/vector_index_manager.rs @@ -20,6 +20,7 @@ use common::{ }, persistence::{ RepeatablePersistence, + RetentionValidator, TimestampRange, }, persistence_helpers::stream_revision_pairs, @@ -73,6 +74,7 @@ use crate::{ #[derive(Clone)] pub struct VectorIndexManager { pub indexes: IndexState, + pub retention_validator: Arc, } #[derive(Clone)] @@ -166,7 +168,11 @@ impl VectorIndexManager { anyhow::ensure!(!self.is_backfilling()); let range = (Bound::Excluded(bootstrap_ts), Bound::Unbounded); - let document_stream = persistence.load_documents(TimestampRange::new(range)?, Order::Asc); + let document_stream = persistence.load_documents( + TimestampRange::new(range)?, + Order::Asc, + self.retention_validator.clone(), + ); let revision_stream = stream_revision_pairs(document_stream, &persistence); futures::pin_mut!(revision_stream); @@ -185,11 +191,17 @@ impl VectorIndexManager { matches!(self.indexes, IndexState::Bootstrapping(..)) } - pub fn bootstrap_index_metadata(registry: &IndexRegistry) -> anyhow::Result { + pub fn bootstrap_index_metadata( + registry: &IndexRegistry, + retention_validator: Arc, + ) -> anyhow::Result { let _timer = bootstrap_vector_indexes_timer(); let vector_indexes_and_metadata = get_vector_index_states(registry)?; let indexes = IndexState::Bootstrapping(vector_indexes_and_metadata); - Ok(Self { indexes }) + Ok(Self { + indexes, + retention_validator, + }) } pub fn backfilled_and_enabled_index_sizes(