Skip to content

Commit

Permalink
Revert "Make search bootstrap async" (#23530)
Browse files Browse the repository at this point in the history
Reverting #23436 and #23377

GitOrigin-RevId: c2775a7e410cb31c3c7af5276ee65d5511696eb8
  • Loading branch information
emmaling27 authored and Convex, Inc. committed Mar 15, 2024
1 parent 2a8c8a5 commit 5845e4f
Show file tree
Hide file tree
Showing 22 changed files with 1,322 additions and 1,415 deletions.
11 changes: 5 additions & 6 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ pub struct Application<RT: Runtime> {
index_worker: Arc<Mutex<RT::Handle>>,
fast_forward_worker: Arc<Mutex<RT::Handle>>,
search_worker: Arc<Mutex<RT::Handle>>,
search_and_vector_bootstrap_worker: Arc<Mutex<RT::Handle>>,
vector_bootstrap_worker: Arc<Mutex<RT::Handle>>,
table_summary_worker: TableSummaryClient<RT>,
schema_worker: Arc<Mutex<RT::Handle>>,
snapshot_import_worker: Arc<Mutex<RT::Handle>>,
Expand Down Expand Up @@ -442,7 +442,7 @@ impl<RT: Runtime> Clone for Application<RT> {
index_worker: self.index_worker.clone(),
fast_forward_worker: self.fast_forward_worker.clone(),
search_worker: self.search_worker.clone(),
search_and_vector_bootstrap_worker: self.search_and_vector_bootstrap_worker.clone(),
vector_bootstrap_worker: self.vector_bootstrap_worker.clone(),
table_summary_worker: self.table_summary_worker.clone(),
schema_worker: self.schema_worker.clone(),
snapshot_import_worker: self.snapshot_import_worker.clone(),
Expand Down Expand Up @@ -535,8 +535,7 @@ impl<RT: Runtime> Application<RT> {
searcher,
);
let search_worker = Arc::new(Mutex::new(runtime.spawn("search_worker", search_worker)));
let search_and_vector_bootstrap_worker =
Arc::new(Mutex::new(database.start_search_and_vector_bootstrap()));
let vector_bootstrap_worker = Arc::new(Mutex::new(database.start_vector_bootstrap()));
let table_summary_worker =
TableSummaryWorker::start(runtime.clone(), database.clone(), persistence.clone());
let schema_worker = Arc::new(Mutex::new(runtime.spawn(
Expand Down Expand Up @@ -642,7 +641,7 @@ impl<RT: Runtime> Application<RT> {
index_worker,
fast_forward_worker,
search_worker,
search_and_vector_bootstrap_worker,
vector_bootstrap_worker,
table_summary_worker,
schema_worker,
export_worker,
Expand Down Expand Up @@ -2475,7 +2474,7 @@ impl<RT: Runtime> Application<RT> {
self.schema_worker.lock().shutdown();
self.index_worker.lock().shutdown();
self.search_worker.lock().shutdown();
self.search_and_vector_bootstrap_worker.lock().shutdown();
self.vector_bootstrap_worker.lock().shutdown();
self.export_worker.lock().shutdown();
self.snapshot_import_worker.lock().shutdown();
self.runner.shutdown().await?;
Expand Down
95 changes: 15 additions & 80 deletions crates/database/src/committer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
cmp,
collections::BTreeSet,
ops::Bound,
sync::Arc,
};

Expand Down Expand Up @@ -33,10 +32,7 @@ use common::{
PersistenceReader,
RepeatablePersistence,
RetentionValidator,
TimestampRange,
},
persistence_helpers::stream_revision_pairs,
query::Order,
runtime::{
Runtime,
RuntimeInstant,
Expand All @@ -57,7 +53,6 @@ use common::{
DatabaseIndexValue,
RepeatableTimestamp,
Timestamp,
WriteTimestamp,
},
value::ResolvedDocumentId,
};
Expand All @@ -75,12 +70,9 @@ use futures::{
stream::FuturesOrdered,
FutureExt,
StreamExt,
TryStreamExt,
};
use indexing::index_registry::IndexRegistry;
use parking_lot::Mutex;
use prometheus::VMHistogram;
use search::SearchIndexManager;
use usage_tracking::{
DocInVectorIndex,
FunctionUsageTracker,
Expand Down Expand Up @@ -269,16 +261,11 @@ impl<RT: Runtime> Committer<RT> {
Some(CommitterMessage::BumpMaxRepeatableTs { result }) => {
self.bump_max_repeatable_ts(result);
},
Some(CommitterMessage::FinishSearchAndVectorBootstrap {
search_index_manager,
Some(CommitterMessage::FinishVectorBootstrap {
vector_index_manager, bootstrap_ts, result,
}) => {
self.finish_search_and_vector_bootstrap(
search_index_manager,
vector_index_manager,
bootstrap_ts,
result
).await;
self.finish_vector_bootstrap(vector_index_manager, bootstrap_ts, result)
.await;
},
Some(CommitterMessage::LoadIndexesIntoMemory {
tables, result
Expand All @@ -292,52 +279,8 @@ impl<RT: Runtime> Committer<RT> {
}
}

async fn update_indexes_since_bootstrap(
search_index_manager: &mut SearchIndexManager,
vector_index_manager: &mut VectorIndexManager,
bootstrap_ts: Timestamp,
persistence: RepeatablePersistence,
registry: &IndexRegistry,
retention_validator: Arc<dyn RetentionValidator>,
) -> anyhow::Result<()> {
anyhow::ensure!(
!search_index_manager.is_bootstrapping(),
"Trying to update search index while it's still bootstrapping"
);
anyhow::ensure!(
!vector_index_manager.is_bootstrapping(),
"Trying to update vector index while it's still bootstrapping"
);
let range = (Bound::Excluded(bootstrap_ts), Bound::Unbounded);

let document_stream = persistence.load_documents(
TimestampRange::new(range)?,
Order::Asc,
retention_validator,
);
let revision_stream = stream_revision_pairs(document_stream, &persistence);
futures::pin_mut!(revision_stream);

while let Some(revision_pair) = revision_stream.try_next().await? {
search_index_manager.update(
registry,
revision_pair.prev_document(),
revision_pair.document(),
WriteTimestamp::Committed(revision_pair.ts().succ()?),
)?;
vector_index_manager.update(
registry,
revision_pair.prev_document(),
revision_pair.document(),
WriteTimestamp::Committed(revision_pair.ts().succ()?),
)?;
}
Ok(())
}

async fn finish_search_and_vector_bootstrap(
async fn finish_vector_bootstrap(
&mut self,
mut search_index_manager: SearchIndexManager,
mut vector_index_manager: VectorIndexManager,
bootstrap_ts: RepeatableTimestamp,
result: oneshot::Sender<anyhow::Result<()>>,
Expand All @@ -356,15 +299,13 @@ impl<RT: Runtime> Committer<RT> {
self.retention_validator.clone(),
);

let res = Self::update_indexes_since_bootstrap(
&mut search_index_manager,
&mut vector_index_manager,
*bootstrap_ts,
repeatable_persistence,
&last_snapshot.index_registry,
self.retention_validator.clone(),
)
.await;
let res = vector_index_manager
.read_updates_since_bootstrap(
*bootstrap_ts,
repeatable_persistence,
&last_snapshot.index_registry,
)
.await;
if res.is_err() {
let _ = result.send(res);
return;
Expand All @@ -376,10 +317,7 @@ impl<RT: Runtime> Committer<RT> {
if latest_ts != snapshot_manager.latest_ts() {
panic!("Snapshots were changed concurrently during commit?");
}
snapshot_manager.overwrite_last_snapshot_search_and_vector_indexes(
search_index_manager,
vector_index_manager,
);
snapshot_manager.overwrite_last_snapshot_vectors(vector_index_manager);
tracing::info!("Committed backfilled vector indexes");
let _ = result.send(Ok(()));
}
Expand Down Expand Up @@ -818,15 +756,13 @@ impl<RT: Runtime> Clone for CommitterClient<RT> {
}

impl<RT: Runtime> CommitterClient<RT> {
pub async fn finish_search_and_vector_bootstrap(
pub async fn finish_vector_bootstrap(
&self,
search_index_manager: SearchIndexManager,
vector_index_manager: VectorIndexManager,
bootstrap_ts: RepeatableTimestamp,
) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
let message = CommitterMessage::FinishSearchAndVectorBootstrap {
search_index_manager,
let message = CommitterMessage::FinishVectorBootstrap {
vector_index_manager,
bootstrap_ts,
result: tx,
Expand Down Expand Up @@ -982,8 +918,7 @@ enum CommitterMessage {
tables: BTreeSet<TableName>,
result: oneshot::Sender<anyhow::Result<()>>,
},
FinishSearchAndVectorBootstrap {
search_index_manager: SearchIndexManager,
FinishVectorBootstrap {
vector_index_manager: VectorIndexManager,
bootstrap_ts: RepeatableTimestamp,
result: oneshot::Sender<anyhow::Result<()>>,
Expand Down
34 changes: 17 additions & 17 deletions crates/database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ use pb::funrun::BootstrapMetadata as BootstrapMetadataProto;
use search::{
query::RevisionWithKeys,
SearchIndexManager,
SearchIndexManagerState,
Searcher,
};
use storage::Storage;
Expand Down Expand Up @@ -178,7 +177,6 @@ use crate::{
verify_invariants_timer,
},
retention::LeaderRetentionManager,
search_and_vector_bootstrap::SearchAndVectorIndexBootstrapWorker,
snapshot_manager::{
Snapshot,
SnapshotManager,
Expand All @@ -195,12 +193,14 @@ use crate::{
self,
TableSummarySnapshot,
},
text_search_bootstrap::bootstrap_search,
token::Token,
transaction_id_generator::TransactionIdGenerator,
transaction_index::{
SearchIndexManagerSnapshot,
TransactionIndex,
},
vector_bootstrap::VectorBootstrapWorker,
write_log::{
new_write_log,
LogReader,
Expand Down Expand Up @@ -602,10 +602,14 @@ impl DatabaseSnapshot {
};
drop(load_indexes_into_memory_timer);

let search = SearchIndexManager::new(
SearchIndexManagerState::Bootstrapping,
persistence.version(),
);
let (search_indexes, persistence_version) = bootstrap_search(
&index_registry,
&repeatable_persistence,
&table_mapping,
retention_validator.clone(),
)
.await?;
let search = SearchIndexManager::from_bootstrap(search_indexes, persistence_version);
let vector = VectorIndexManager::bootstrap_index_metadata(
&index_registry,
retention_validator.clone(),
Expand Down Expand Up @@ -878,27 +882,23 @@ impl<RT: Runtime> Database<RT> {
tracing::info!("Set search storage to {search_storage:?}");
}

pub fn start_search_and_vector_bootstrap(&self) -> RT::Handle {
let worker = self.new_search_and_vector_bootstrap_worker();
pub fn start_vector_bootstrap(&self) -> RT::Handle {
let worker = self.new_vector_bootstrap_worker();
self.runtime
.spawn("search_and_vector_bootstrap", async move {
worker.start().await
})
.spawn("vector_bootstrap", async move { worker.start().await })
}

#[cfg(test)]
pub fn new_search_and_vector_bootstrap_worker_for_testing(
&self,
) -> SearchAndVectorIndexBootstrapWorker<RT> {
self.new_search_and_vector_bootstrap_worker()
pub fn new_vector_bootstrap_worker_for_testing(&self) -> VectorBootstrapWorker<RT> {
self.new_vector_bootstrap_worker()
}

fn new_search_and_vector_bootstrap_worker(&self) -> SearchAndVectorIndexBootstrapWorker<RT> {
fn new_vector_bootstrap_worker(&self) -> VectorBootstrapWorker<RT> {
let (ts, snapshot) = self.snapshot_manager.lock().latest();
let vector_persistence =
RepeatablePersistence::new(self.reader.clone(), ts, self.retention_validator());
let table_mapping = snapshot.table_mapping().clone();
SearchAndVectorIndexBootstrapWorker::new(
VectorBootstrapWorker::new(
self.runtime.clone(),
snapshot.index_registry,
vector_persistence,
Expand Down
3 changes: 2 additions & 1 deletion crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ mod preloaded;
pub mod query;
mod reads;
mod retention;
mod search_and_vector_bootstrap;
mod snapshot_manager;
mod stack_traces;
pub mod subscription;
mod table_registry;
pub mod table_summary;
mod text_search_bootstrap;
mod token;
mod transaction;
mod transaction_id_generator;
mod transaction_index;
mod vector_bootstrap;
pub mod vector_index_worker;
mod virtual_tables;
mod write_limits;
Expand Down
Loading

0 comments on commit 5845e4f

Please sign in to comment.