From 2428b088b71b59115ab2af2a60121a0fde58b570 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Fri, 4 Oct 2024 11:09:03 -0400 Subject: [PATCH] pass RepeatableTimestamp through vector/text index backfill (#30292) Currently, we downgrade RepeatableTimestamp (which is a Timestamp with a proof that it is repeatable) to a plain Timestamp and then try to upgrade it again by waiting for the committer to sync the proof to persistence. We can pass through the RepeatableTimestamp and that should work. In the case where we read a Timestamp out of the database, we know that it was created from a `tx.begin_timestamp()`, so it's repeatable and we can use the in-memory Database (it doesn't need to be synced to persistence) to prove it with `tx.begin_timestamp().prior_ts()`. GitOrigin-RevId: 55fde9a5c7652d6afbac73cf66dfb2082009fba1 --- .../src/index_workers/search_flusher.rs | 4 +-- crates/database/src/index_workers/writer.rs | 26 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/crates/database/src/index_workers/search_flusher.rs b/crates/database/src/index_workers/search_flusher.rs index b956636b..3a044ea1 100644 --- a/crates/database/src/index_workers/search_flusher.rs +++ b/crates/database/src/index_workers/search_flusher.rs @@ -425,7 +425,7 @@ impl SearchFlusher { let data = SnapshotData::MultiSegment(new_and_updated_parts); Ok(IndexBuildResult { - snapshot_ts: *new_ts, + snapshot_ts: new_ts, data, total_stats, new_segment_stats, @@ -615,7 +615,7 @@ pub(crate) struct IndexBuild { #[derive(Debug)] pub struct IndexBuildResult { - pub snapshot_ts: Timestamp, + pub snapshot_ts: RepeatableTimestamp, pub data: SnapshotData, pub total_stats: T::Statistics, pub new_segment_stats: Option, diff --git a/crates/database/src/index_workers/writer.rs b/crates/database/src/index_workers/writer.rs index 4aa9a0bd..1c4c2cf1 100644 --- a/crates/database/src/index_workers/writer.rs +++ b/crates/database/src/index_workers/writer.rs @@ -16,7 +16,6 @@ use common::{ 
document::ParsedDocument, knobs::DEFAULT_DOCUMENTS_PAGE_SIZE, persistence::{ - new_static_repeatable_ts, PersistenceReader, RepeatablePersistence, TimestampRange, @@ -30,7 +29,10 @@ use common::{ Mutex, MutexGuard, }, - types::TabletIndexName, + types::{ + RepeatableTimestamp, + TabletIndexName, + }, }; use governor::Quota; use itertools::Itertools; @@ -287,6 +289,7 @@ impl Inner { let (developer_config, state) = T::extract_metadata(metadata)?; let snapshot_ts = *state.ts().context("Compacted a segment without a ts?")?; + let snapshot_ts = tx.begin_timestamp().prior_ts(snapshot_ts)?; let mut current_segments = state.segments().clone(); let is_merge_required = @@ -403,7 +406,7 @@ impl Inner { async fn commit_backfill_flush( &self, job: &IndexBuild, - backfill_complete_ts: Timestamp, + backfill_complete_ts: RepeatableTimestamp, mut new_and_modified_segments: Vec, new_segment_id: Option, backfill_result: MultiSegmentBackfillResult, @@ -441,7 +444,7 @@ impl Inner { developer_config, if backfill_result.is_backfill_complete { SearchOnDiskState::Backfilled(SearchSnapshot { - ts: backfill_complete_ts, + ts: *backfill_complete_ts, data: SnapshotData::MultiSegment(new_and_modified_segments), }) } else { @@ -450,7 +453,7 @@ impl Inner { cursor: backfill_result .new_cursor .map(|cursor| cursor.internal_id()), - backfill_snapshot_ts: Some(backfill_complete_ts), + backfill_snapshot_ts: Some(*backfill_complete_ts), }) }, ) @@ -463,7 +466,7 @@ impl Inner { async fn commit_snapshot_flush( &self, job: &IndexBuild, - new_ts: Timestamp, + new_ts: RepeatableTimestamp, mut new_and_modified_segments: Vec, new_segment_id: Option, schema: T::Schema, @@ -530,7 +533,7 @@ impl Inner { job.metadata_id, job.index_name.clone(), developer_config, - current_disk_state.with_updated_snapshot(new_ts, new_and_modified_segments)?, + current_disk_state.with_updated_snapshot(*new_ts, new_and_modified_segments)?, ) .await?; @@ -549,7 +552,7 @@ impl Inner { &self, segments_to_update: Vec, start_ts: 
Timestamp, - current_ts: Timestamp, + current_ts: RepeatableTimestamp, index_name: TabletIndexName, rate_limit_pages_per_second: NonZeroU32, schema: T::Schema, @@ -585,7 +588,7 @@ impl Inner { reader: Arc, segments_to_update: Vec, start_ts: Timestamp, - current_ts: Timestamp, + current_ts: RepeatableTimestamp, index_name: TabletIndexName, storage: Arc, rate_limit_pages_per_second: NonZeroU32, @@ -604,14 +607,13 @@ impl Inner { T::download_previous_segments(&runtime, storage.clone(), segments_to_update).await?; let documents = database.load_documents_in_table( *index_name.table(), - TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(current_ts)))?, + TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(*current_ts)))?, Order::Asc, &row_rate_limiter, ); - let ts = new_static_repeatable_ts(current_ts, reader.as_ref(), &runtime).await?; let repeatable_persistence = - RepeatablePersistence::new(reader, ts, database.retention_validator()); + RepeatablePersistence::new(reader, current_ts, database.retention_validator()); T::merge_deletes( &runtime, &mut previous_segments,