Skip to content

Commit

Permalink
make SchemaRegistry (#29075)
Browse files Browse the repository at this point in the history
goal: keep parsed schemas in memory so we don't have to re-parse them on every `db.insert`

i copied the pattern of TableRegistry which is pretty similar. things to note:

1. there's a two-phase-commit-like thing whenever we do a write. this `begin_update`/`apply`/`update` pattern was copied directly from TableRegistry
2. we only store the Active/Pending/Validated schema, and we only allow lookups by status. we could later add lookups by ID but that doesn't seem necessary yet, since it's only looked up by ID when the schema is changing.
3. We need to track the read set directly, so I pushed that down into `Transaction::get_schema_by_state`. The goal is to make it hard to read a schema without taking the correct read dependency. We could pass a `&mut TransactionReadSet` into the `SchemaRegistry::get_by_state` method, which would be even safer but doesn't match the existing patterns.
4. The function runner wants to load registries from memory, but this is a little tricky: for schemas in components created after the in-memory indexes were bootstrapped, the `_schemas` table isn't in memory, so loading registries failed. To fix this, I added a call to `self.database.load_indexes_into_memory`, and tests started passing again.

GitOrigin-RevId: 420efb6ed0898d3aaf1d241b655b85e84497defa
  • Loading branch information
ldanilek authored and Convex, Inc. committed Aug 21, 2024
1 parent 4371ebd commit 009322a
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 58 deletions.
9 changes: 8 additions & 1 deletion crates/application/src/deploy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ use common::{
NodeDependency,
},
};
use database::WriteSource;
use database::{
WriteSource,
SCHEMAS_TABLE,
};
use errors::ErrorMetadata;
use isolate::EvaluateAppDefinitionsResult;
use keybroker::Identity;
use maplit::btreeset;
use model::{
auth::types::AuthDiff,
components::{
Expand Down Expand Up @@ -159,6 +163,9 @@ impl<RT: Runtime> Application<RT> {
.await?;
if !dry_run {
self.commit(tx, WriteSource::new("start_push")).await?;
self.database
.load_indexes_into_memory(btreeset! { SCHEMAS_TABLE.clone() })
.await?;
}
schema_change
};
Expand Down
11 changes: 10 additions & 1 deletion crates/common/src/bootstrap_model/schema_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use value::codegen_convex_serialization;
/// time.
///
/// 2. At most one schema can be in the `Active` state at a time.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum SchemaState {
Pending,
Expand All @@ -38,6 +38,15 @@ pub enum SchemaState {
Overwritten,
}

impl SchemaState {
    /// Indicates a schema should be cached because it can be used for writes,
    /// and it can be cached by state because at most one schema can exist in
    /// the state.
    pub fn is_unique(&self) -> bool {
        match self {
            Self::Pending | Self::Validated | Self::Active => true,
            // All other states (e.g. Failed/Overwritten) may have many
            // documents at once, so they cannot be keyed by state.
            _ => false,
        }
    }
}

#[derive(Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "camelCase")]
pub enum SerializedSchemaState {
Expand Down
45 changes: 7 additions & 38 deletions crates/database/src/bootstrap_model/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use common::{
ResolvedDocument,
},
query::{
IndexRange,
IndexRangeExpression,
Order,
Query,
},
Expand All @@ -33,8 +31,6 @@ use common::{
};
use errors::ErrorMetadata;
use value::{
val,
ConvexObject,
FieldPath,
NamespacedTableMapping,
ResolvedDocumentId,
Expand Down Expand Up @@ -216,35 +212,13 @@ impl<'a, RT: Runtime> SchemaModel<'a, RT> {
&mut self,
state: SchemaState,
) -> anyhow::Result<Option<(ResolvedDocumentId, DatabaseSchema)>> {
match state {
SchemaState::Pending | SchemaState::Validated | SchemaState::Active => {},
SchemaState::Failed { .. } | SchemaState::Overwritten => anyhow::bail!(
"Getting schema by state is only permitted for Pending, Validated, or Active \
states, since Failed or Overwritten states may have multiple documents."
),
}
let state_value = val!(state);
let index_range = IndexRange {
index_name: SCHEMAS_STATE_INDEX.clone(),
range: vec![IndexRangeExpression::Eq(
SCHEMA_STATE_FIELD.clone(),
state_value.into(),
)],
order: Order::Asc,
};
let query = Query::index_range(index_range);
let mut query_stream = ResolvedQuery::new(self.tx, self.namespace, query)?;
let schema = query_stream
.expect_at_most_one(self.tx)
.await?
.map(|doc| {
Ok::<(ResolvedDocumentId, DatabaseSchema), anyhow::Error>((
doc.id().to_owned(),
parse_schema_traced(doc.into_value().into_value())?.schema,
))
})
.transpose()?;
Ok(schema)
anyhow::ensure!(
state.is_unique(),
"Getting schema by state is only permitted for Pending, Validated, or Active states, \
since Failed or Overwritten states may have multiple documents."
);
let schema_doc = self.tx.get_schema_by_state(self.namespace, state)?;
Ok(schema_doc.map(|doc| (doc.id(), doc.into_value().schema)))
}

pub async fn submit_pending(
Expand Down Expand Up @@ -512,8 +486,3 @@ impl<'a, RT: Runtime> SchemaModel<'a, RT> {
Ok(())
}
}

#[minitrace::trace]
fn parse_schema_traced(value: ConvexObject) -> anyhow::Result<SchemaMetadata> {
SchemaMetadata::try_from(value)
}
23 changes: 22 additions & 1 deletion crates/database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use common::{
TabletIndexMetadata,
INDEX_TABLE,
},
schema::SchemaMetadata,
tables::{
TableMetadata,
TableState,
Expand Down Expand Up @@ -167,6 +168,7 @@ use crate::{
verify_invariants_timer,
},
retention::LeaderRetentionManager,
schema_registry::SchemaRegistry,
search_index_bootstrap::SearchIndexBootstrapWorker,
snapshot_manager::{
Snapshot,
Expand Down Expand Up @@ -199,6 +201,7 @@ use crate::{
FollowerRetentionManager,
TableIterator,
Transaction,
SCHEMAS_TABLE,
};

/// Controls the number of read set backtraces to show when debugging
Expand Down Expand Up @@ -582,17 +585,34 @@ impl<RT: Runtime> DatabaseSnapshot<RT> {
tracing::info!("Bootstrapping table metadata...");
let table_registry = Self::load_table_registry(
&persistence_snapshot,
table_mapping,
table_mapping.clone(),
table_states,
&index_registry,
)
.await?;

let mut schema_docs = BTreeMap::new();
for namespace in table_mapping.namespaces_for_name(&SCHEMAS_TABLE) {
let schema_tablet =
table_mapping.namespace(namespace).name_to_tablet()(SCHEMAS_TABLE.clone())?;
let by_id = index_registry.must_get_by_id(schema_tablet)?.id;
let schema_documents = Self::load_table_documents::<SchemaMetadata>(
&persistence_snapshot,
by_id,
schema_tablet,
)
.await?;
schema_docs.insert(namespace, schema_documents);
}

let schema_registry = SchemaRegistry::bootstrap(schema_docs);

Ok(Self {
ts: persistence_snapshot.timestamp(),
bootstrap_metadata,
snapshot: Snapshot {
table_registry,
schema_registry,
table_summaries,
index_registry,
in_memory_indexes,
Expand Down Expand Up @@ -1420,6 +1440,7 @@ impl<RT: Runtime> Database<RT> {
creation_time,
transaction_index,
snapshot.table_registry,
snapshot.schema_registry,
count_snapshot,
self.runtime.clone(),
usage_tracker,
Expand Down
2 changes: 2 additions & 0 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod write_limits;
mod write_log;
mod writes;

mod schema_registry;
mod table_iteration;
#[cfg(any(test, feature = "testing"))]
pub mod test_helpers;
Expand All @@ -67,6 +68,7 @@ pub use reads::{
TransactionReadSize,
OVER_LIMIT_HELP,
};
pub use schema_registry::SchemaRegistry;
pub use table_registry::TableRegistry;
pub use token::{
SerializedToken,
Expand Down
171 changes: 171 additions & 0 deletions crates/database/src/schema_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::collections::BTreeMap;

use common::{
bootstrap_model::{
index::database_index::IndexedFields,
schema::{
SchemaMetadata,
SchemaState,
},
},
document::{
ParsedDocument,
ResolvedDocument,
},
query::{
IndexRange,
IndexRangeExpression,
Order,
},
types::TabletIndexName,
value::ResolvedDocumentId,
};
use imbl::OrdMap;
use value::{
val,
TableMapping,
TableNamespace,
TabletId,
};

use crate::{
TransactionReadSet,
SCHEMAS_STATE_INDEX,
SCHEMAS_TABLE,
SCHEMA_STATE_FIELD,
};

/// This structure is an index over the `_schemas` tables.
///
/// It caches parsed schema documents so lookups by state do not need to
/// re-parse the underlying `_schemas` document on every read.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaRegistry {
    // Stores schemas where state.is_unique() is true.
    // Outer map: one entry per table namespace; inner map: at most one
    // schema document per unique state (Pending/Validated/Active).
    namespaced: OrdMap<TableNamespace, OrdMap<SchemaState, ParsedDocument<SchemaMetadata>>>,
}

impl SchemaRegistry {
pub fn bootstrap(
schema_docs: BTreeMap<TableNamespace, Vec<ParsedDocument<SchemaMetadata>>>,
) -> Self {
let namespaced = schema_docs
.into_iter()
.map(|(namespace, docs)| {
let schemas: OrdMap<_, _> = docs
.into_iter()
.filter(|schema| schema.state.is_unique())
.map(|schema| (schema.state.clone(), schema))
.collect();
(namespace, schemas)
})
.collect();
Self { namespaced }
}

pub(crate) fn update(
&mut self,
table_mapping: &TableMapping,
id: ResolvedDocumentId,
old_doc: Option<&ResolvedDocument>,
new_doc: Option<&ResolvedDocument>,
) -> anyhow::Result<()> {
self.begin_update(table_mapping, id, old_doc, new_doc)?
.apply();
Ok(())
}

pub(crate) fn begin_update<'a>(
&'a mut self,
table_mapping: &TableMapping,
id: ResolvedDocumentId,
old_doc: Option<&ResolvedDocument>,
new_doc: Option<&ResolvedDocument>,
) -> anyhow::Result<Update<'a>> {
let mut schema_update = None;
let namespace = table_mapping.tablet_namespace(id.tablet_id)?;
if table_mapping
.namespace(namespace)
.tablet_matches_name(id.tablet_id, &SCHEMAS_TABLE)
{
let old_schema = match old_doc {
None => None,
Some(old_doc) => Some(ParsedDocument::try_from(old_doc.clone())?),
};
let new_schema = match new_doc {
None => None,
Some(new_doc) => Some(ParsedDocument::try_from(new_doc.clone())?),
};
schema_update = Some(SchemaUpdate {
namespace,
old_schema,
new_schema,
});
}
Ok(Update {
registry: self,
update: schema_update,
})
}

pub fn get_by_state(
&self,
namespace: TableNamespace,
state: SchemaState,
schema_tablet: TabletId,
reads: &mut TransactionReadSet,
) -> anyhow::Result<Option<ParsedDocument<SchemaMetadata>>> {
// Reading from the schema_registry, so take read dependency directly.
let state_value = val!(state.clone());
let index_range = IndexRange {
index_name: SCHEMAS_STATE_INDEX.clone(),
range: vec![IndexRangeExpression::Eq(
SCHEMA_STATE_FIELD.clone(),
state_value.into(),
)],
order: Order::Asc,
};
let fields = IndexedFields::try_from(vec![SCHEMA_STATE_FIELD.clone()])?;
let interval = index_range.compile(fields.clone())?;
reads.record_indexed_derived(TabletIndexName::by_id(schema_tablet), fields, interval);

let schema = self
.namespaced
.get(&namespace)
.and_then(|registry| registry.get(&state))
.cloned();
Ok(schema)
}
}

/// A staged change to the [`SchemaRegistry`], produced by
/// `SchemaRegistry::begin_update` and consumed by `Update::apply`.
pub(crate) struct SchemaUpdate {
    // Namespace whose `_schemas` table the changed document lives in.
    pub namespace: TableNamespace,
    // Previously stored version of the document, if any.
    pub old_schema: Option<ParsedDocument<SchemaMetadata>>,
    // New version of the document; `None` when the document is deleted.
    pub new_schema: Option<ParsedDocument<SchemaMetadata>>,
}

/// Second half of the two-phase registry update: holds the already-parsed
/// change so that committing it via `apply` is infallible.
pub(crate) struct Update<'a> {
    // Registry that will be mutated when `apply` runs.
    registry: &'a mut SchemaRegistry,
    // `None` when the written document was not in a `_schemas` table,
    // in which case `apply` is a no-op.
    update: Option<SchemaUpdate>,
}

impl<'a> Update<'a> {
pub(crate) fn apply(self) {
if let Some(update) = self.update {
let namespaced_registry = self
.registry
.namespaced
.entry(update.namespace)
.or_default();
if let Some(old_schema) = update.old_schema {
if let Some(cached) = namespaced_registry.get(&old_schema.state)
&& cached == &old_schema
{
namespaced_registry.remove(&old_schema.state);
}
}
if let Some(new_schema) = update.new_schema {
if new_schema.state.is_unique() {
namespaced_registry.insert(new_schema.state.clone(), new_schema);
}
}
}
}
}
Loading

0 comments on commit 009322a

Please sign in to comment.