From 009322a28c02e2a498024b3e57eedcf1728d315d Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Wed, 21 Aug 2024 17:11:06 -0400 Subject: [PATCH] make SchemaRegistry (#29075) Goal: keep parsed schemas in memory so we don't have to re-parse them on every `db.insert`. I copied the pattern of `TableRegistry`, which is pretty similar. Things to note: 1. There's a two-phase-commit-like step whenever we do a write. This `begin_update`/`apply`/`update` pattern was copied directly from `TableRegistry`. 2. We only store the Active/Pending/Validated schemas, and we only allow lookups by state. We could later add lookups by ID, but that doesn't seem necessary yet, since a schema is only looked up by ID when the schema is changing. 3. We need to track the read set directly, so I pushed that down into `Transaction::get_schema_by_state`. The goal is to make it hard to read a schema without taking the correct read dependency. We could pass a `&mut TransactionReadSet` into the `SchemaRegistry::get_by_state` method, which would be even safer but doesn't match the existing patterns. 4. The function runner wants to load registries from memory, but this is a little tricky: for schemas in components created after the in-memory indexes were bootstrapped, the `_schemas` table isn't in memory, so we failed to load registries.
to fix this, i added a call to `self.database.load_indexes_into_memory` and tests started passing again GitOrigin-RevId: 420efb6ed0898d3aaf1d241b655b85e84497defa --- crates/application/src/deploy_config.rs | 9 +- .../src/bootstrap_model/schema_state.rs | 11 +- .../src/bootstrap_model/schema/mod.rs | 45 +---- crates/database/src/database.rs | 23 ++- crates/database/src/lib.rs | 2 + crates/database/src/schema_registry.rs | 171 ++++++++++++++++++ crates/database/src/snapshot_manager.rs | 8 + crates/database/src/transaction.rs | 67 +++++-- crates/database/src/writes.rs | 2 + .../function_runner/src/in_memory_indexes.rs | 46 ++++- 10 files changed, 326 insertions(+), 58 deletions(-) create mode 100644 crates/database/src/schema_registry.rs diff --git a/crates/application/src/deploy_config.rs b/crates/application/src/deploy_config.rs index a8a260a9..5c7d7d3f 100644 --- a/crates/application/src/deploy_config.rs +++ b/crates/application/src/deploy_config.rs @@ -24,10 +24,14 @@ use common::{ NodeDependency, }, }; -use database::WriteSource; +use database::{ + WriteSource, + SCHEMAS_TABLE, +}; use errors::ErrorMetadata; use isolate::EvaluateAppDefinitionsResult; use keybroker::Identity; +use maplit::btreeset; use model::{ auth::types::AuthDiff, components::{ @@ -159,6 +163,9 @@ impl Application { .await?; if !dry_run { self.commit(tx, WriteSource::new("start_push")).await?; + self.database + .load_indexes_into_memory(btreeset! { SCHEMAS_TABLE.clone() }) + .await?; } schema_change }; diff --git a/crates/common/src/bootstrap_model/schema_state.rs b/crates/common/src/bootstrap_model/schema_state.rs index c9490bcf..cbdfb3bc 100644 --- a/crates/common/src/bootstrap_model/schema_state.rs +++ b/crates/common/src/bootstrap_model/schema_state.rs @@ -25,7 +25,7 @@ use value::codegen_convex_serialization; /// time. /// /// 2. At most one schema can be in the `Active` state at a time. 
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum SchemaState { Pending, @@ -38,6 +38,15 @@ pub enum SchemaState { Overwritten, } +impl SchemaState { + /// Indicates a schema should be cached because it can be used for writes, + /// and it can be cached by state because at most one schema can exist in + /// the state. + pub fn is_unique(&self) -> bool { + matches!(self, Self::Pending | Self::Validated | Self::Active) + } +} + #[derive(Serialize, Deserialize)] #[serde(tag = "state", rename_all = "camelCase")] pub enum SerializedSchemaState { diff --git a/crates/database/src/bootstrap_model/schema/mod.rs b/crates/database/src/bootstrap_model/schema/mod.rs index 80905b39..b581fc8a 100644 --- a/crates/database/src/bootstrap_model/schema/mod.rs +++ b/crates/database/src/bootstrap_model/schema/mod.rs @@ -19,8 +19,6 @@ use common::{ ResolvedDocument, }, query::{ - IndexRange, - IndexRangeExpression, Order, Query, }, @@ -33,8 +31,6 @@ use common::{ }; use errors::ErrorMetadata; use value::{ - val, - ConvexObject, FieldPath, NamespacedTableMapping, ResolvedDocumentId, @@ -216,35 +212,13 @@ impl<'a, RT: Runtime> SchemaModel<'a, RT> { &mut self, state: SchemaState, ) -> anyhow::Result> { - match state { - SchemaState::Pending | SchemaState::Validated | SchemaState::Active => {}, - SchemaState::Failed { .. } | SchemaState::Overwritten => anyhow::bail!( - "Getting schema by state is only permitted for Pending, Validated, or Active \ - states, since Failed or Overwritten states may have multiple documents." 
- ), - } - let state_value = val!(state); - let index_range = IndexRange { - index_name: SCHEMAS_STATE_INDEX.clone(), - range: vec![IndexRangeExpression::Eq( - SCHEMA_STATE_FIELD.clone(), - state_value.into(), - )], - order: Order::Asc, - }; - let query = Query::index_range(index_range); - let mut query_stream = ResolvedQuery::new(self.tx, self.namespace, query)?; - let schema = query_stream - .expect_at_most_one(self.tx) - .await? - .map(|doc| { - Ok::<(ResolvedDocumentId, DatabaseSchema), anyhow::Error>(( - doc.id().to_owned(), - parse_schema_traced(doc.into_value().into_value())?.schema, - )) - }) - .transpose()?; - Ok(schema) + anyhow::ensure!( + state.is_unique(), + "Getting schema by state is only permitted for Pending, Validated, or Active states, \ + since Failed or Overwritten states may have multiple documents." + ); + let schema_doc = self.tx.get_schema_by_state(self.namespace, state)?; + Ok(schema_doc.map(|doc| (doc.id(), doc.into_value().schema))) } pub async fn submit_pending( @@ -512,8 +486,3 @@ impl<'a, RT: Runtime> SchemaModel<'a, RT> { Ok(()) } } - -#[minitrace::trace] -fn parse_schema_traced(value: ConvexObject) -> anyhow::Result { - SchemaMetadata::try_from(value) -} diff --git a/crates/database/src/database.rs b/crates/database/src/database.rs index f062c414..d436e3bc 100644 --- a/crates/database/src/database.rs +++ b/crates/database/src/database.rs @@ -35,6 +35,7 @@ use common::{ TabletIndexMetadata, INDEX_TABLE, }, + schema::SchemaMetadata, tables::{ TableMetadata, TableState, @@ -167,6 +168,7 @@ use crate::{ verify_invariants_timer, }, retention::LeaderRetentionManager, + schema_registry::SchemaRegistry, search_index_bootstrap::SearchIndexBootstrapWorker, snapshot_manager::{ Snapshot, @@ -199,6 +201,7 @@ use crate::{ FollowerRetentionManager, TableIterator, Transaction, + SCHEMAS_TABLE, }; /// Controls the number of read set backtraces to show when debugging @@ -582,17 +585,34 @@ impl DatabaseSnapshot { tracing::info!("Bootstrapping table 
metadata..."); let table_registry = Self::load_table_registry( &persistence_snapshot, - table_mapping, + table_mapping.clone(), table_states, &index_registry, ) .await?; + let mut schema_docs = BTreeMap::new(); + for namespace in table_mapping.namespaces_for_name(&SCHEMAS_TABLE) { + let schema_tablet = + table_mapping.namespace(namespace).name_to_tablet()(SCHEMAS_TABLE.clone())?; + let by_id = index_registry.must_get_by_id(schema_tablet)?.id; + let schema_documents = Self::load_table_documents::( + &persistence_snapshot, + by_id, + schema_tablet, + ) + .await?; + schema_docs.insert(namespace, schema_documents); + } + + let schema_registry = SchemaRegistry::bootstrap(schema_docs); + Ok(Self { ts: persistence_snapshot.timestamp(), bootstrap_metadata, snapshot: Snapshot { table_registry, + schema_registry, table_summaries, index_registry, in_memory_indexes, @@ -1420,6 +1440,7 @@ impl Database { creation_time, transaction_index, snapshot.table_registry, + snapshot.schema_registry, count_snapshot, self.runtime.clone(), usage_tracker, diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index 4d1620c9..ae324108 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -47,6 +47,7 @@ mod write_limits; mod write_log; mod writes; +mod schema_registry; mod table_iteration; #[cfg(any(test, feature = "testing"))] pub mod test_helpers; @@ -67,6 +68,7 @@ pub use reads::{ TransactionReadSize, OVER_LIMIT_HELP, }; +pub use schema_registry::SchemaRegistry; pub use table_registry::TableRegistry; pub use token::{ SerializedToken, diff --git a/crates/database/src/schema_registry.rs b/crates/database/src/schema_registry.rs new file mode 100644 index 00000000..ad9db158 --- /dev/null +++ b/crates/database/src/schema_registry.rs @@ -0,0 +1,171 @@ +use std::collections::BTreeMap; + +use common::{ + bootstrap_model::{ + index::database_index::IndexedFields, + schema::{ + SchemaMetadata, + SchemaState, + }, + }, + document::{ + ParsedDocument, + 
ResolvedDocument, + }, + query::{ + IndexRange, + IndexRangeExpression, + Order, + }, + types::TabletIndexName, + value::ResolvedDocumentId, +}; +use imbl::OrdMap; +use value::{ + val, + TableMapping, + TableNamespace, + TabletId, +}; + +use crate::{ + TransactionReadSet, + SCHEMAS_STATE_INDEX, + SCHEMAS_TABLE, + SCHEMA_STATE_FIELD, +}; + +/// This structure is an index over the `_schemas` tables. +#[derive(Debug, Clone, PartialEq)] +pub struct SchemaRegistry { + // Stores schemas where state.is_unique() is true. + namespaced: OrdMap>>, +} + +impl SchemaRegistry { + pub fn bootstrap( + schema_docs: BTreeMap>>, + ) -> Self { + let namespaced = schema_docs + .into_iter() + .map(|(namespace, docs)| { + let schemas: OrdMap<_, _> = docs + .into_iter() + .filter(|schema| schema.state.is_unique()) + .map(|schema| (schema.state.clone(), schema)) + .collect(); + (namespace, schemas) + }) + .collect(); + Self { namespaced } + } + + pub(crate) fn update( + &mut self, + table_mapping: &TableMapping, + id: ResolvedDocumentId, + old_doc: Option<&ResolvedDocument>, + new_doc: Option<&ResolvedDocument>, + ) -> anyhow::Result<()> { + self.begin_update(table_mapping, id, old_doc, new_doc)? 
+ .apply(); + Ok(()) + } + + pub(crate) fn begin_update<'a>( + &'a mut self, + table_mapping: &TableMapping, + id: ResolvedDocumentId, + old_doc: Option<&ResolvedDocument>, + new_doc: Option<&ResolvedDocument>, + ) -> anyhow::Result> { + let mut schema_update = None; + let namespace = table_mapping.tablet_namespace(id.tablet_id)?; + if table_mapping + .namespace(namespace) + .tablet_matches_name(id.tablet_id, &SCHEMAS_TABLE) + { + let old_schema = match old_doc { + None => None, + Some(old_doc) => Some(ParsedDocument::try_from(old_doc.clone())?), + }; + let new_schema = match new_doc { + None => None, + Some(new_doc) => Some(ParsedDocument::try_from(new_doc.clone())?), + }; + schema_update = Some(SchemaUpdate { + namespace, + old_schema, + new_schema, + }); + } + Ok(Update { + registry: self, + update: schema_update, + }) + } + + pub fn get_by_state( + &self, + namespace: TableNamespace, + state: SchemaState, + schema_tablet: TabletId, + reads: &mut TransactionReadSet, + ) -> anyhow::Result>> { + // Reading from the schema_registry, so take read dependency directly. 
+ let state_value = val!(state.clone()); + let index_range = IndexRange { + index_name: SCHEMAS_STATE_INDEX.clone(), + range: vec![IndexRangeExpression::Eq( + SCHEMA_STATE_FIELD.clone(), + state_value.into(), + )], + order: Order::Asc, + }; + let fields = IndexedFields::try_from(vec![SCHEMA_STATE_FIELD.clone()])?; + let interval = index_range.compile(fields.clone())?; + reads.record_indexed_derived(TabletIndexName::by_id(schema_tablet), fields, interval); + + let schema = self + .namespaced + .get(&namespace) + .and_then(|registry| registry.get(&state)) + .cloned(); + Ok(schema) + } +} + +pub(crate) struct SchemaUpdate { + pub namespace: TableNamespace, + pub old_schema: Option>, + pub new_schema: Option>, +} + +pub(crate) struct Update<'a> { + registry: &'a mut SchemaRegistry, + update: Option, +} + +impl<'a> Update<'a> { + pub(crate) fn apply(self) { + if let Some(update) = self.update { + let namespaced_registry = self + .registry + .namespaced + .entry(update.namespace) + .or_default(); + if let Some(old_schema) = update.old_schema { + if let Some(cached) = namespaced_registry.get(&old_schema.state) + && cached == &old_schema + { + namespaced_registry.remove(&old_schema.state); + } + } + if let Some(new_schema) = update.new_schema { + if new_schema.state.is_unique() { + namespaced_registry.insert(new_schema.state.clone(), new_schema); + } + } + } + } +} diff --git a/crates/database/src/snapshot_manager.rs b/crates/database/src/snapshot_manager.rs index 856c3d3b..90260214 100644 --- a/crates/database/src/snapshot_manager.rs +++ b/crates/database/src/snapshot_manager.rs @@ -43,6 +43,7 @@ use vector::{ }; use crate::{ + schema_registry::SchemaRegistry, table_registry::{ TableUpdate, TableUpdateMode, @@ -182,6 +183,7 @@ impl TableSummaries { #[derive(Clone)] pub struct Snapshot { pub table_registry: TableRegistry, + pub schema_registry: SchemaRegistry, pub table_summaries: TableSummaries, pub index_registry: IndexRegistry, pub in_memory_indexes: 
BackendInMemoryIndexes, @@ -207,6 +209,12 @@ impl Snapshot { insertion.map(|d| &d.value().0), ) .context("Table registry update failed")?; + self.schema_registry.update( + self.table_registry.table_mapping(), + document_id, + removal, + insertion, + )?; self.table_summaries .update( document_id, diff --git a/crates/database/src/transaction.rs b/crates/database/src/transaction.rs index 35ffeef2..21938e21 100644 --- a/crates/database/src/transaction.rs +++ b/crates/database/src/transaction.rs @@ -19,6 +19,10 @@ use common::{ IndexMetadata, INDEX_TABLE, }, + schema::{ + SchemaMetadata, + SchemaState, + }, tables::{ TableMetadata, TABLES_TABLE, @@ -27,6 +31,7 @@ use common::{ document::{ CreationTime, DocumentUpdate, + ParsedDocument, ResolvedDocument, }, identity::InertIdentity, @@ -108,6 +113,7 @@ use crate::{ TableFilter, }, reads::TransactionReadSet, + schema_registry::SchemaRegistry, snapshot_manager::{ Snapshot, SnapshotManager, @@ -128,6 +134,7 @@ use crate::{ SystemMetadataModel, TableModel, TableRegistry, + SCHEMAS_TABLE, }; /// Safe default number of items to return for each list or filter operation @@ -149,6 +156,7 @@ pub struct Transaction { pub(crate) index: NestedWrites, pub(crate) metadata: NestedWrites, + pub(crate) schema_registry: NestedWrites, pub(crate) count_snapshot: Arc, /// The change in the number of documents in table that have had writes in /// this transaction. 
If there is no entry for a table, assume deltas @@ -182,7 +190,12 @@ pub trait TableCountSnapshot: Send + Sync + 'static { async fn count(&self, table: TabletId) -> anyhow::Result; } -pub type SubtransactionToken = [NestedWriteToken; 3]; +pub struct SubtransactionToken { + writes: NestedWriteToken, + index: NestedWriteToken, + tables: NestedWriteToken, + schema_registry: NestedWriteToken, +} impl Transaction { pub fn new( @@ -191,6 +204,7 @@ impl Transaction { creation_time: CreationTime, index: TransactionIndex, metadata: TableRegistry, + schema_registry: SchemaRegistry, count: Arc, runtime: RT, usage_tracker: FunctionUsageTracker, @@ -206,6 +220,7 @@ impl Transaction { scheduled_size: TransactionWriteSize::default(), index: NestedWrites::new(index), metadata: NestedWrites::new(metadata), + schema_registry: NestedWrites::new(schema_registry), count_snapshot: count, table_count_deltas: BTreeMap::new(), stats: BTreeMap::new(), @@ -306,24 +321,28 @@ impl Transaction { } pub fn begin_subtransaction(&mut self) -> SubtransactionToken { - [ - self.writes.begin_nested(), - self.index.begin_nested(), - self.metadata.begin_nested(), - ] + SubtransactionToken { + writes: self.writes.begin_nested(), + index: self.index.begin_nested(), + tables: self.metadata.begin_nested(), + schema_registry: self.schema_registry.begin_nested(), + } } pub fn commit_subtransaction(&mut self, tokens: SubtransactionToken) -> anyhow::Result<()> { - self.writes.commit_nested(tokens[0])?; - self.index.commit_nested(tokens[1])?; - self.metadata.commit_nested(tokens[2])?; + self.writes.commit_nested(tokens.writes)?; + self.index.commit_nested(tokens.index)?; + self.metadata.commit_nested(tokens.tables)?; + self.schema_registry.commit_nested(tokens.schema_registry)?; Ok(()) } pub fn rollback_subtransaction(&mut self, tokens: SubtransactionToken) -> anyhow::Result<()> { - self.writes.rollback_nested(tokens[0])?; - self.index.rollback_nested(tokens[1])?; - self.metadata.rollback_nested(tokens[2])?; + 
self.writes.rollback_nested(tokens.writes)?; + self.index.rollback_nested(tokens.index)?; + self.metadata.rollback_nested(tokens.tables)?; + self.schema_registry + .rollback_nested(tokens.schema_registry)?; Ok(()) } @@ -331,6 +350,7 @@ impl Transaction { self.writes.require_not_nested()?; self.index.require_not_nested()?; self.metadata.require_not_nested()?; + self.schema_registry.require_not_nested()?; Ok(()) } @@ -636,6 +656,22 @@ impl Transaction { .record_indexed_derived(tables_by_id, IndexedFields::by_id(), Interval::all()); } + /// Reads the schema from the cache, and records a read dependency. + /// Used by SchemaModel. + pub(crate) fn get_schema_by_state( + &mut self, + namespace: TableNamespace, + state: SchemaState, + ) -> anyhow::Result>> { + let schema_tablet = self + .table_mapping() + .namespace(namespace) + .id(&SCHEMAS_TABLE)? + .tablet_id; + self.schema_registry + .get_by_state(namespace, state, schema_tablet, &mut self.reads) + } + // XXX move to table model? #[cfg(any(test, feature = "testing"))] pub async fn create_system_table_testing( @@ -825,6 +861,12 @@ impl Transaction { let index_update = self .index .begin_update(old_document.clone(), new_document.clone())?; + let schema_update = self.schema_registry.begin_update( + self.metadata.table_mapping(), + id, + old_document.as_ref(), + new_document.as_ref(), + )?; let metadata_update = self.metadata.begin_update( index_update.registry(), id, @@ -869,6 +911,7 @@ impl Transaction { index_update.apply(); metadata_update.apply(); + schema_update.apply(); *self.table_count_deltas.entry(id.tablet_id).or_default() += delta; Ok(()) diff --git a/crates/database/src/writes.rs b/crates/database/src/writes.rs index 1fcdf5aa..8093edd5 100644 --- a/crates/database/src/writes.rs +++ b/crates/database/src/writes.rs @@ -46,6 +46,7 @@ use value::{ use crate::{ bootstrap_model::defaults::BootstrapTableIds, reads::TransactionReadSet, + schema_registry::SchemaRegistry, TableRegistry, }; @@ -60,6 +61,7 @@ pub trait 
PendingWrites: Clone {} impl PendingWrites for Writes {} impl PendingWrites for TableRegistry {} +impl PendingWrites for SchemaRegistry {} pub type NestedWriteToken = u32; diff --git a/crates/function_runner/src/in_memory_indexes.rs b/crates/function_runner/src/in_memory_indexes.rs index 2f4cb2d2..4ae6480b 100644 --- a/crates/function_runner/src/in_memory_indexes.rs +++ b/crates/function_runner/src/in_memory_indexes.rs @@ -43,12 +43,14 @@ use common::{ use database::{ BootstrapMetadata, DatabaseSnapshot, + SchemaRegistry, TableCountSnapshot, TableRegistry, Transaction, TransactionIdGenerator, TransactionIndex, TransactionTextSnapshot, + SCHEMAS_TABLE, }; use futures::{ FutureExt, @@ -91,6 +93,7 @@ fn make_transaction( existing_writes: FunctionWrites, rt: RT, table_registry: TableRegistry, + schema_registry: SchemaRegistry, index_registry: IndexRegistry, table_count_snapshot: Arc, database_index_snapshot: DatabaseIndexSnapshot, @@ -112,6 +115,7 @@ fn make_transaction( creation_time, transaction_index, table_registry, + schema_registry, table_count_snapshot, rt.clone(), usage_tracker, @@ -245,7 +249,7 @@ impl InMemoryIndexCache { Ok(index_map.0.into_iter().map(|(_k, (_ts, v))| v.unpack())) } - async fn load_table_and_index_registry( + async fn load_registries( &self, persistence_snapshot: PersistenceSnapshot, instance_name: String, @@ -256,7 +260,12 @@ impl InMemoryIndexCache { tables_tablet_id, index_tablet_id, }: BootstrapMetadata, - ) -> anyhow::Result<(TableRegistry, IndexRegistry, DatabaseIndexSnapshot)> { + ) -> anyhow::Result<( + TableRegistry, + SchemaRegistry, + IndexRegistry, + DatabaseIndexSnapshot, + )> { let index_documents_fut = self.must_get_or_load_unpacked( instance_name.clone(), index_by_id, @@ -290,6 +299,27 @@ impl InMemoryIndexCache { persistence_snapshot.persistence().version(), )?; DatabaseSnapshot::::verify_invariants(&table_registry, &index_registry)?; + let mut schema_docs = BTreeMap::new(); + for namespace in 
table_mapping.namespaces_for_name(&SCHEMAS_TABLE) { + let schema_tablet = + table_mapping.namespace(namespace).name_to_tablet()(SCHEMAS_TABLE.clone())?; + let index_id = index_registry.must_get_by_id(schema_tablet)?.id; + let schema_doc_iter = self + .must_get_or_load_unpacked( + instance_name.clone(), + index_id, + &in_memory_index_last_modified, + persistence_snapshot.clone(), + schema_tablet, + SCHEMAS_TABLE.clone(), + ) + .await?; + schema_docs.insert( + namespace, + schema_doc_iter.map(TryFrom::try_from).try_collect()?, + ); + } + let schema_registry = SchemaRegistry::bootstrap(schema_docs); let in_memory_indexes = FunctionRunnerInMemoryIndexes { cache: self.clone(), instance_name: instance_name.clone(), @@ -302,7 +332,12 @@ impl InMemoryIndexCache { table_mapping, persistence_snapshot, ); - Ok((table_registry, index_registry, database_index_snapshot)) + Ok(( + table_registry, + schema_registry, + index_registry, + database_index_snapshot, + )) } /// Loads table and index registry from cache or persistence snapshot. @@ -335,8 +370,8 @@ impl InMemoryIndexCache { let persistence_snapshot = repeatable_persistence.read_snapshot(repeatable_persistence.upper_bound())?; - let (table_registry, index_registry, database_index_snapshot) = self - .load_table_and_index_registry( + let (table_registry, schema_registry, index_registry, database_index_snapshot) = self + .load_registries( persistence_snapshot, instance_name, in_memory_index_last_modified, @@ -349,6 +384,7 @@ impl InMemoryIndexCache { existing_writes, self.rt.clone(), table_registry, + schema_registry, index_registry, table_count_snapshot, database_index_snapshot,