Skip to content

Commit

Permalink
pass RepeatableTimestamp through vector/text index backfill (#30292)
Browse files Browse the repository at this point in the history
Currently we downgrade a RepeatableTimestamp (which is a Timestamp together with a proof that it is repeatable) to a plain Timestamp, and then try to upgrade it again by waiting for the committer to sync the proof to persistence. Instead, we can pass the RepeatableTimestamp through directly, and that should work.

In the case where we read a Timestamp out of the database, we know that it was created from a `tx.begin_timestamp()`, so it is repeatable; we can therefore prove it with `tx.begin_timestamp().prior_ts()` using the in-memory Database, with no need to wait for it to be synced to persistence.

GitOrigin-RevId: 55fde9a5c7652d6afbac73cf66dfb2082009fba1
  • Loading branch information
ldanilek authored and Convex, Inc. committed Oct 4, 2024
1 parent e779f2c commit 2428b08
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
4 changes: 2 additions & 2 deletions crates/database/src/index_workers/search_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
let data = SnapshotData::MultiSegment(new_and_updated_parts);

Ok(IndexBuildResult {
snapshot_ts: *new_ts,
snapshot_ts: new_ts,
data,
total_stats,
new_segment_stats,
Expand Down Expand Up @@ -615,7 +615,7 @@ pub(crate) struct IndexBuild<T: SearchIndex> {

#[derive(Debug)]
pub struct IndexBuildResult<T: SearchIndex> {
pub snapshot_ts: Timestamp,
pub snapshot_ts: RepeatableTimestamp,
pub data: SnapshotData<T::Segment>,
pub total_stats: T::Statistics,
pub new_segment_stats: Option<T::Statistics>,
Expand Down
26 changes: 14 additions & 12 deletions crates/database/src/index_workers/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use common::{
document::ParsedDocument,
knobs::DEFAULT_DOCUMENTS_PAGE_SIZE,
persistence::{
new_static_repeatable_ts,
PersistenceReader,
RepeatablePersistence,
TimestampRange,
Expand All @@ -30,7 +29,10 @@ use common::{
Mutex,
MutexGuard,
},
types::TabletIndexName,
types::{
RepeatableTimestamp,
TabletIndexName,
},
};
use governor::Quota;
use itertools::Itertools;
Expand Down Expand Up @@ -287,6 +289,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {

let (developer_config, state) = T::extract_metadata(metadata)?;
let snapshot_ts = *state.ts().context("Compacted a segment without a ts?")?;
let snapshot_ts = tx.begin_timestamp().prior_ts(snapshot_ts)?;
let mut current_segments = state.segments().clone();

let is_merge_required =
Expand Down Expand Up @@ -403,7 +406,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
async fn commit_backfill_flush(
&self,
job: &IndexBuild<T>,
backfill_complete_ts: Timestamp,
backfill_complete_ts: RepeatableTimestamp,
mut new_and_modified_segments: Vec<T::Segment>,
new_segment_id: Option<String>,
backfill_result: MultiSegmentBackfillResult,
Expand Down Expand Up @@ -441,7 +444,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
developer_config,
if backfill_result.is_backfill_complete {
SearchOnDiskState::Backfilled(SearchSnapshot {
ts: backfill_complete_ts,
ts: *backfill_complete_ts,
data: SnapshotData::MultiSegment(new_and_modified_segments),
})
} else {
Expand All @@ -450,7 +453,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
cursor: backfill_result
.new_cursor
.map(|cursor| cursor.internal_id()),
backfill_snapshot_ts: Some(backfill_complete_ts),
backfill_snapshot_ts: Some(*backfill_complete_ts),
})
},
)
Expand All @@ -463,7 +466,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
async fn commit_snapshot_flush(
&self,
job: &IndexBuild<T>,
new_ts: Timestamp,
new_ts: RepeatableTimestamp,
mut new_and_modified_segments: Vec<T::Segment>,
new_segment_id: Option<String>,
schema: T::Schema,
Expand Down Expand Up @@ -530,7 +533,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
job.metadata_id,
job.index_name.clone(),
developer_config,
current_disk_state.with_updated_snapshot(new_ts, new_and_modified_segments)?,
current_disk_state.with_updated_snapshot(*new_ts, new_and_modified_segments)?,
)
.await?;

Expand All @@ -549,7 +552,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
&self,
segments_to_update: Vec<T::Segment>,
start_ts: Timestamp,
current_ts: Timestamp,
current_ts: RepeatableTimestamp,
index_name: TabletIndexName,
rate_limit_pages_per_second: NonZeroU32,
schema: T::Schema,
Expand Down Expand Up @@ -585,7 +588,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
reader: Arc<dyn PersistenceReader>,
segments_to_update: Vec<T::Segment>,
start_ts: Timestamp,
current_ts: Timestamp,
current_ts: RepeatableTimestamp,
index_name: TabletIndexName,
storage: Arc<dyn Storage>,
rate_limit_pages_per_second: NonZeroU32,
Expand All @@ -604,14 +607,13 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
T::download_previous_segments(&runtime, storage.clone(), segments_to_update).await?;
let documents = database.load_documents_in_table(
*index_name.table(),
TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(current_ts)))?,
TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(*current_ts)))?,
Order::Asc,
&row_rate_limiter,
);

let ts = new_static_repeatable_ts(current_ts, reader.as_ref(), &runtime).await?;
let repeatable_persistence =
RepeatablePersistence::new(reader, ts, database.retention_validator());
RepeatablePersistence::new(reader, current_ts, database.retention_validator());
T::merge_deletes(
&runtime,
&mut previous_segments,
Expand Down

0 comments on commit 2428b08

Please sign in to comment.