diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index e847de39a6065..ec020d53a22dd 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -3624,7 +3624,7 @@ impl Catalog { if !storage.is_read_only() { // IMPORTANT: we durably record the new timestamp before using it. storage - .persist_timestamp(&Timeline::EpochMilliseconds, boot_ts) + .set_timestamp(&Timeline::EpochMilliseconds, boot_ts) .await?; } @@ -5336,7 +5336,7 @@ impl Catalog { ) -> Result<(), Error> { self.storage() .await - .persist_timestamp(timeline, timestamp) + .set_timestamp(timeline, timestamp) .await .err_into() } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 23863618a12d5..684788ed42580 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -183,7 +183,7 @@ pub struct BootstrapArgs { #[async_trait] pub trait ReadOnlyDurableCatalogState: Debug + Send { /// Reports if the catalog state has been initialized. - async fn is_initialized(&self) -> Result; + async fn is_initialized(&mut self) -> Result; // TODO(jkosh44) add and implement open methods to be implementation agnostic. /* /// Checks to see if opening the catalog would be @@ -289,8 +289,9 @@ pub trait ReadOnlyDurableCatalogState: Debug + Send { self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await } - /// Dumps the entire catalog contents in human readable JSON. - async fn dump(&self) -> Result; + // TODO(jkosh44) Implement this for the catalog debug tool. + /* /// Dumps the entire catalog contents in human readable JSON. + async fn dump(&self) -> Result;*/ } /// A read-write API for the durable catalog state. @@ -351,7 +352,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState { ) -> Result<(), Error>; /// Persist new global timestamp for a timeline. - async fn persist_timestamp( + async fn set_timestamp( &mut self, timeline: &Timeline, timestamp: mz_repr::Timestamp, diff --git a/src/catalog/src/stash.rs b/src/catalog/src/stash.rs index 7a70aee9c07dc..a4088b91dbd5f 100644 --- a/src/catalog/src/stash.rs +++ b/src/catalog/src/stash.rs @@ -20,16 +20,15 @@ use itertools::Itertools; use mz_audit_log::{VersionedEvent, VersionedStorageUsage}; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_ore::collections::CollectionExt; use mz_ore::now::NowFn; +use mz_ore::result::ResultExt; use mz_ore::retry::Retry; use mz_proto::{ProtoType, RustType}; use mz_repr::adt::mz_acl_item::MzAclItem; use mz_repr::role_id::RoleId; use mz_repr::{GlobalId, Timestamp}; use mz_sql::catalog::{ - CatalogError as SqlCatalogError, CatalogItemType, DefaultPrivilegeAclItem, - DefaultPrivilegeObject, + CatalogError as SqlCatalogError, DefaultPrivilegeAclItem, DefaultPrivilegeObject, }; use mz_sql::names::CommentObjectId; use mz_stash::objects::proto; @@ -54,10 +53,9 @@ use crate::{ AUDIT_LOG_COLLECTION, CLUSTER_COLLECTION, CLUSTER_INTROSPECTION_SOURCE_INDEX_COLLECTION, CLUSTER_REPLICA_COLLECTION, COMMENTS_COLLECTION, CONFIG_COLLECTION, DATABASES_COLLECTION, DEFAULT_PRIVILEGES_COLLECTION, ID_ALLOCATOR_COLLECTION, ITEM_COLLECTION, ROLES_COLLECTION, - SCHEMAS_COLLECTION, SETTING_COLLECTION, STORAGE_USAGE_COLLECTION, SYSTEM_CLUSTER_ID_ALLOC_KEY, + SCHEMAS_COLLECTION, SETTING_COLLECTION, STORAGE_USAGE_COLLECTION, SYSTEM_CONFIGURATION_COLLECTION, SYSTEM_GID_MAPPING_COLLECTION, SYSTEM_PRIVILEGES_COLLECTION, - SYSTEM_REPLICA_ID_ALLOC_KEY, TIMESTAMP_COLLECTION, USER_CLUSTER_ID_ALLOC_KEY, - USER_REPLICA_ID_ALLOC_KEY, + TIMESTAMP_COLLECTION, }; /// A [`Connection`] represent an open connection to the stash. It exposes optimized methods for @@ -149,7 +147,7 @@ impl Connection { if !conn.stash.is_readonly() { if let Some(deploy_generation) = deploy_generation { - match conn.persist_deploy_generation(deploy_generation).await { + match conn.set_deploy_generation(deploy_generation).await { Ok(()) => {} Err(e) => { return Err((conn.stash, e)); @@ -193,14 +191,6 @@ impl Connection { Ok(conn) } - async fn set_connect_timeout(&mut self, connect_timeout: Duration) { - self.stash.set_connect_timeout(connect_timeout).await; - } - - fn is_read_only(&self) -> bool { - self.stash.is_readonly() - } - async fn get_setting(&mut self, key: &str) -> Result, Error> { let v = SETTING_COLLECTION .peek_key_one( @@ -225,70 +215,29 @@ impl Connection { .await .map_err(|e| e.into()) } +} - async fn get_catalog_content_version(&mut self) -> Result, Error> { - self.get_setting("catalog_content_version").await - } - - async fn set_catalog_content_version(&mut self, new_version: &str) -> Result<(), Error> { - self.set_setting("catalog_content_version", new_version) - .await +#[async_trait] +impl ReadOnlyDurableCatalogState for Connection { + #[tracing::instrument(level = "debug", skip(self))] + async fn is_initialized(&mut self) -> Result { + self.stash.is_initialized().await.err_into() } - #[tracing::instrument(level = "info", skip_all)] - async fn load_databases(&mut self) -> Result, Error> { - let entries = DATABASES_COLLECTION.peek_one(&mut self.stash).await?; - let databases = entries - .into_iter() - .map(RustType::from_proto) - .map_ok(|(k, v): (DatabaseKey, DatabaseValue)| Database { - id: k.id, - name: v.name, - owner_id: v.owner_id, - privileges: v.privileges, - }) - .collect::>()?; - - Ok(databases) + fn is_read_only(&self) -> bool { + self.stash.is_readonly() } - #[tracing::instrument(level = "info", skip_all)] - async fn load_schemas(&mut self) -> Result, Error> { - let entries = SCHEMAS_COLLECTION.peek_one(&mut self.stash).await?; - let schemas = entries - .into_iter() - .map(RustType::from_proto) - .map_ok(|(k, v): (SchemaKey, SchemaValue)| Schema { - id: k.id, - name: v.name, - database_id: v.database_id, - owner_id: v.owner_id, - privileges: v.privileges, - }) - .collect::>()?; - - Ok(schemas) + fn epoch(&mut self) -> Option { + self.stash.epoch() } - #[tracing::instrument(level = "info", skip_all)] - async fn load_roles(&mut self) -> Result, Error> { - let entries = ROLES_COLLECTION.peek_one(&mut self.stash).await?; - let roles = entries - .into_iter() - .map(RustType::from_proto) - .map_ok(|(k, v): (RoleKey, RoleValue)| Role { - id: k.id, - name: v.name, - attributes: v.attributes, - membership: v.membership, - }) - .collect::>()?; - - Ok(roles) + async fn get_catalog_content_version(&mut self) -> Result, Error> { + self.get_setting("catalog_content_version").await } #[tracing::instrument(level = "info", skip_all)] - async fn load_clusters(&mut self) -> Result, Error> { + async fn get_clusters(&mut self) -> Result, Error> { let entries = CLUSTER_COLLECTION.peek_one(&mut self.stash).await?; let clusters = entries .into_iter() @@ -307,7 +256,7 @@ impl Connection { } #[tracing::instrument(level = "info", skip_all)] - async fn load_cluster_replicas(&mut self) -> Result, Error> { + async fn get_cluster_replicas(&mut self) -> Result, Error> { let entries = CLUSTER_REPLICA_COLLECTION.peek_one(&mut self.stash).await?; let replicas = entries .into_iter() @@ -327,84 +276,63 @@ impl Connection { } #[tracing::instrument(level = "info", skip_all)] - async fn load_audit_log(&mut self) -> Result, Error> { - let entries = AUDIT_LOG_COLLECTION.peek_one(&mut self.stash).await?; - let logs: Vec<_> = entries - .into_keys() - .map(AuditLogKey::from_proto) - .map_ok(|e| e.event) + async fn get_databases(&mut self) -> Result, Error> { + let entries = DATABASES_COLLECTION.peek_one(&mut self.stash).await?; + let databases = entries + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v): (DatabaseKey, DatabaseValue)| Database { + id: k.id, + name: v.name, + owner_id: v.owner_id, + privileges: v.privileges, + }) .collect::>()?; - Ok(logs.into_iter()) + Ok(databases) } - /// Loads storage usage events and permanently deletes from the stash those - /// that happened more than the retention period ago from boot_ts. #[tracing::instrument(level = "info", skip_all)] - async fn fetch_and_prune_storage_usage( - &mut self, - retention_period: Option, - boot_ts: mz_repr::Timestamp, - ) -> Result, Error> { - // If no usage retention period is set, set the cutoff to MIN so nothing - // is removed. - let cutoff_ts = match retention_period { - None => u128::MIN, - Some(period) => u128::from(boot_ts).saturating_sub(period.as_millis()), - }; - let is_read_only = self.is_read_only(); - Ok(self - .stash - .with_transaction(move |tx| { - Box::pin(async move { - let collection = STORAGE_USAGE_COLLECTION.from_tx(&tx).await?; - let rows = tx.peek_one(collection).await?; - let mut events = Vec::with_capacity(rows.len()); - let mut batch = collection.make_batch_tx(&tx).await?; - for ev in rows.into_keys() { - let event: StorageUsageKey = ev.clone().into_rust()?; - if u128::from(event.metric.timestamp()) >= cutoff_ts { - events.push(event.metric); - } else if retention_period.is_some() { - collection.append_to_batch(&mut batch, &ev, &(), -1); - } - } - // Delete things only if a retention period is - // specified (otherwise opening readonly catalogs - // can fail). - if retention_period.is_some() && !is_read_only { - tx.append(vec![batch]).await?; - } - Ok(events) - }) + async fn get_schemas(&mut self) -> Result, Error> { + let entries = SCHEMAS_COLLECTION.peek_one(&mut self.stash).await?; + let schemas = entries + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v): (SchemaKey, SchemaValue)| Schema { + id: k.id, + name: v.name, + database_id: v.database_id, + owner_id: v.owner_id, + privileges: v.privileges, }) - .await?) + .collect::>()?; + + Ok(schemas) } - /// Load the persisted mapping of system object to global ID. Key is (schema-name, object-name). #[tracing::instrument(level = "info", skip_all)] - async fn load_system_gids( - &mut self, - ) -> Result, Error> { + async fn get_system_items(&mut self) -> Result, Error> { let entries = SYSTEM_GID_MAPPING_COLLECTION .peek_one(&mut self.stash) .await?; - let system_gid_mappings = entries + let system_item = entries .into_iter() .map(RustType::from_proto) - .map_ok(|(k, v): (GidMappingKey, GidMappingValue)| { - ( - (k.schema_name, k.object_type, k.object_name), - (GlobalId::System(v.id), v.fingerprint), - ) - }) + .map_ok( + |(k, v): (GidMappingKey, GidMappingValue)| SystemObjectMapping { + schema_name: k.schema_name, + object_type: k.object_type, + object_name: k.object_name, + id: GlobalId::System(v.id), + fingerprint: v.fingerprint, + }, + ) .collect::>()?; - - Ok(system_gid_mappings) + Ok(system_item) } #[tracing::instrument(level = "info", skip_all)] - async fn load_introspection_source_index_gids( + async fn get_introspection_source_indexes( &mut self, cluster_id: ClusterId, ) -> Result, Error> { @@ -431,9 +359,25 @@ impl Connection { Ok(sources) } - /// Load the persisted default privileges. #[tracing::instrument(level = "info", skip_all)] - async fn load_default_privileges( + async fn get_roles(&mut self) -> Result, Error> { + let entries = ROLES_COLLECTION.peek_one(&mut self.stash).await?; + let roles = entries + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v): (RoleKey, RoleValue)| Role { + id: k.id, + name: v.name, + attributes: v.attributes, + membership: v.membership, + }) + .collect::>()?; + + Ok(roles) + } + + #[tracing::instrument(level = "info", skip_all)] + async fn get_default_privileges( &mut self, ) -> Result, Error> { Ok(DEFAULT_PRIVILEGES_COLLECTION @@ -455,9 +399,8 @@ impl Connection { .collect::>()?) } - /// Load the persisted system privileges. #[tracing::instrument(level = "info", skip_all)] - async fn load_system_privileges(&mut self) -> Result, Error> { + async fn get_system_privileges(&mut self) -> Result, Error> { Ok(SYSTEM_PRIVILEGES_COLLECTION .peek_one(&mut self.stash) .await? @@ -473,9 +416,8 @@ impl Connection { .collect::>()?) } - /// Load the persisted server configurations. #[tracing::instrument(level = "info", skip_all)] - async fn load_system_configuration(&mut self) -> Result, Error> { + async fn get_system_configurations(&mut self) -> Result, Error> { SYSTEM_CONFIGURATION_COLLECTION .peek_one(&mut self.stash) .await? @@ -484,9 +426,8 @@ impl Connection { .collect() } - /// Load all comments. #[tracing::instrument(level = "info", skip_all)] - async fn load_comments( + async fn get_comments( &mut self, ) -> Result, String)>, Error> { let comments = COMMENTS_COLLECTION @@ -500,183 +441,22 @@ impl Connection { Ok(comments) } - /// Persist mapping from system objects to global IDs and fingerprints. - /// - /// Panics if provided id is not a system id. - async fn set_system_object_mapping( - &mut self, - mappings: Vec, - ) -> Result<(), Error> { - if mappings.is_empty() { - return Ok(()); - } - - let mappings = mappings - .into_iter() - .map(|mapping| { - let id = if let GlobalId::System(id) = mapping.id { - id - } else { - panic!("non-system id provided") - }; - ( - GidMappingKey { - schema_name: mapping.schema_name, - object_type: mapping.object_type, - object_name: mapping.object_name, - }, - GidMappingValue { - id, - fingerprint: mapping.fingerprint, - }, - ) - }) - .map(|e| RustType::into_proto(&e)); - SYSTEM_GID_MAPPING_COLLECTION - .upsert(&mut self.stash, mappings) - .await - .map_err(|e| e.into()) - } - - /// Panics if provided id is not a system id - async fn set_introspection_source_index_gids( - &mut self, - mappings: Vec<(ClusterId, &str, GlobalId)>, - ) -> Result<(), Error> { - if mappings.is_empty() { - return Ok(()); - } - - let mappings = mappings + #[tracing::instrument(level = "info", skip_all)] + async fn get_timestamps(&mut self) -> Result, Error> { + let entries = TIMESTAMP_COLLECTION.peek_one(&mut self.stash).await?; + let timestamps = entries .into_iter() - .map(|(cluster_id, name, index_id)| { - let index_id = if let GlobalId::System(id) = index_id { - id - } else { - panic!("non-system id provided") - }; - ( - ClusterIntrospectionSourceIndexKey { - cluster_id, - name: name.to_string(), - }, - ClusterIntrospectionSourceIndexValue { index_id }, - ) + .map(RustType::from_proto) + .map_ok(|(k, v): (TimestampKey, TimestampValue)| { + (k.id.parse().expect("invalid timeline persisted"), v.ts) }) - .map(|e| RustType::into_proto(&e)); - CLUSTER_INTROSPECTION_SOURCE_INDEX_COLLECTION - .upsert(&mut self.stash, mappings) - .await - .map_err(|e| e.into()) - } - - /// Set the configuration of a replica. - /// This accepts only one item, as we currently use this only for the default cluster - async fn set_replica_config( - &mut self, - replica_id: ReplicaId, - cluster_id: ClusterId, - name: String, - config: ReplicaConfig, - owner_id: RoleId, - ) -> Result<(), Error> { - let key = ClusterReplicaKey { id: replica_id }.into_proto(); - let val = ClusterReplicaValue { - cluster_id, - name, - config, - owner_id, - } - .into_proto(); - CLUSTER_REPLICA_COLLECTION - .upsert_key(&mut self.stash, key, |_| Ok::<_, Error>(val)) - .await??; - Ok(()) - } - - async fn allocate_system_ids(&mut self, amount: u64) -> Result, Error> { - let id = self.allocate_id("system", amount).await?; - - Ok(id.into_iter().map(GlobalId::System).collect()) - } - - async fn allocate_user_id(&mut self) -> Result { - let id = self.allocate_id("user", 1).await?; - let id = id.into_element(); - Ok(GlobalId::User(id)) - } - - async fn allocate_system_cluster_id(&mut self) -> Result { - let id = self.allocate_id(SYSTEM_CLUSTER_ID_ALLOC_KEY, 1).await?; - let id = id.into_element(); - Ok(ClusterId::System(id)) - } - - async fn allocate_user_cluster_id(&mut self) -> Result { - let id = self.allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1).await?; - let id = id.into_element(); - Ok(ClusterId::User(id)) - } - - async fn allocate_user_replica_id(&mut self) -> Result { - let id = self.allocate_id(USER_REPLICA_ID_ALLOC_KEY, 1).await?; - let id = id.into_element(); - Ok(ReplicaId::User(id)) - } - - /// Get the next system replica id without allocating it. - async fn get_next_system_replica_id(&mut self) -> Result { - self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await - } - - /// Get the next user replica id without allocating it. - async fn get_next_user_replica_id(&mut self) -> Result { - self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await - } - - async fn get_next_id(&mut self, id_type: &str) -> Result { - ID_ALLOCATOR_COLLECTION - .peek_key_one( - &mut self.stash, - IdAllocKey { - name: id_type.to_string(), - } - .into_proto(), - ) - .await - .map(|x| x.expect("must exist").next_id) - .map_err(Into::into) - } + .collect::>()?; - #[tracing::instrument(level = "debug", skip(self))] - async fn allocate_id(&mut self, id_type: &str, amount: u64) -> Result, Error> { - if amount == 0 { - return Ok(Vec::new()); - } - let key = IdAllocKey { - name: id_type.to_string(), - } - .into_proto(); - let (prev, next) = ID_ALLOCATOR_COLLECTION - .upsert_key(&mut self.stash, key, move |prev| { - let id = prev.expect("must exist").next_id; - match id.checked_add(amount) { - Some(next_gid) => Ok(IdAllocValue { next_id: next_gid }.into_proto()), - None => Err(Error::from(SqlCatalogError::IdExhaustion)), - } - }) - .await??; - let id = prev.expect("must exist").next_id; - Ok((id..next.next_id).collect()) + Ok(timestamps) } - /// Gets a global timestamp for a timeline that has been persisted to disk. - /// - /// Returns `None` if no persisted timestamp for the specified timeline exists. - async fn try_get_persisted_timestamp( - &mut self, - timeline: &Timeline, - ) -> Result, Error> { + #[tracing::instrument(level = "info", skip_all)] + async fn get_timestamp(&mut self, timeline: &Timeline) -> Result, Error> { let key = proto::TimestampKey { id: timeline.to_string(), }; @@ -689,63 +469,38 @@ impl Connection { Ok(val.map(|v| v.ts)) } - /// Get all global timestamps that has been persisted to disk. - async fn get_all_persisted_timestamps( - &mut self, - ) -> Result, Error> { - let entries = TIMESTAMP_COLLECTION.peek_one(&mut self.stash).await?; - let timestamps = entries - .into_iter() - .map(RustType::from_proto) - .map_ok(|(k, v): (TimestampKey, TimestampValue)| { - (k.id.parse().expect("invalid timeline persisted"), v.ts) - }) + #[tracing::instrument(level = "info", skip_all)] + async fn get_audit_logs(&mut self) -> Result, Error> { + let entries = AUDIT_LOG_COLLECTION.peek_one(&mut self.stash).await?; + let logs: Vec<_> = entries + .into_keys() + .map(AuditLogKey::from_proto) + .map_ok(|e| e.event) .collect::>()?; - Ok(timestamps) + Ok(logs) } - /// Persist new global timestamp for a timeline to disk. #[tracing::instrument(level = "debug", skip(self))] - async fn persist_timestamp( - &mut self, - timeline: &Timeline, - timestamp: mz_repr::Timestamp, - ) -> Result<(), Error> { - let key = proto::TimestampKey { - id: timeline.to_string(), - }; - let (prev, next) = TIMESTAMP_COLLECTION - .upsert_key(&mut self.stash, key, move |_| { - Ok::<_, Error>(TimestampValue { ts: timestamp }.into_proto()) - }) - .await??; - if let Some(prev) = prev { - assert!(next >= prev, "global timestamp must always go up"); - } - Ok(()) - } - - async fn persist_deploy_generation(&mut self, deploy_generation: u64) -> Result<(), Error> { - CONFIG_COLLECTION - .upsert_key( + async fn get_next_id(&mut self, id_type: &str) -> Result { + ID_ALLOCATOR_COLLECTION + .peek_key_one( &mut self.stash, - proto::ConfigKey { - key: DEPLOY_GENERATION.into(), - }, - move |_| { - Ok::<_, Error>(proto::ConfigValue { - value: deploy_generation, - }) - }, + IdAllocKey { + name: id_type.to_string(), + } + .into_proto(), ) - .await??; - Ok(()) + .await + .map(|x| x.expect("must exist").next_id) + .map_err(Into::into) } +} - /// Creates a new [`Transaction`]. +#[async_trait] +impl DurableCatalogState for Connection { #[tracing::instrument(name = "storage::transaction", level = "debug", skip_all)] - async fn transaction<'a>(&'a mut self) -> Result, Error> { + async fn transaction(&mut self) -> Result { let ( databases, schemas, @@ -817,12 +572,8 @@ impl Connection { ) } - /// Confirms that this [`Connection`] is connected as the stash leader. - async fn confirm_leadership(&mut self) -> Result<(), Error> { - Ok(self.stash.confirm_leadership().await?) - } - - async fn commit(&mut self, txn_batch: TransactionBatch) -> Result<(), Error> { + #[tracing::instrument(name = "storage::transaction", level = "debug", skip_all)] + async fn commit_transaction(&mut self, txn_batch: TransactionBatch) -> Result<(), Error> { async fn add_batch<'tx, K, V>( tx: &'tx mz_stash::Transaction<'tx>, batches: &mut Vec, @@ -951,158 +702,129 @@ impl Connection { Ok(()) } -} - -#[async_trait] -impl ReadOnlyDurableCatalogState for Connection { - async fn is_initialized(&self) -> Result { - Connection::is_initialized(self).await - } - - fn is_read_only(&self) -> bool { - Connection::is_read_only(self) - } - - fn is_read_only(&self) -> bool { - Connection::is_read_only(self) - } - - fn epoch(&mut self) -> Option { - self.stash.epoch() - } - - async fn get_catalog_content_version(&mut self) -> Result, Error> { - Connection::get_catalog_content_version(self).await - } - - async fn get_clusters(&mut self) -> Result, Error> { - Connection::load_clusters(self).await - } - - async fn get_cluster_replicas(&mut self) -> Result, Error> { - Connection::load_cluster_replicas(self).await - } - - async fn get_databases(&mut self) -> Result, Error> { - Connection::load_databases(self).await - } - - async fn get_schemas(&mut self) -> Result, Error> { - Connection::load_schemas(self).await - } - - async fn get_system_items(&mut self) -> Result, Error> { - Ok(self - .load_system_gids() - .await? - .into_iter() - .map( - |((schema_name, object_type, object_name), (id, fingerprint))| { - SystemObjectMapping { - schema_name, - object_type, - object_name, - id, - fingerprint, - } - }, - ) - .collect()) - } - - async fn get_introspection_source_indexes( - &mut self, - cluster_id: ClusterId, - ) -> Result, Error> { - Connection::load_introspection_source_index_gids(self, cluster_id).await - } - - async fn get_roles(&mut self) -> Result, Error> { - Connection::load_roles(self).await - } - - async fn get_default_privileges( - &mut self, - ) -> Result, Error> { - Connection::load_default_privileges(self).await - } - - async fn get_system_privileges(&mut self) -> Result, Error> { - Connection::load_system_privileges(self).await - } - - async fn get_system_configurations(&mut self) -> Result, Error> { - Connection::load_system_configuration(self).await - } - - async fn get_comments( - &mut self, - ) -> Result, String)>, Error> { - Connection::load_comments(self).await - } - - async fn get_timestamps(&mut self) -> Result, Error> { - Connection::get_all_persisted_timestamps(self).await - } - - async fn get_timestamp(&mut self, timeline: &Timeline) -> Result, Error> { - Connection::try_get_persisted_timestamp(self, timeline).await - } - - async fn get_audit_logs(&mut self) -> Result, Error> { - Ok(Connection::load_audit_log(self).await?.collect()) - } - - async fn get_next_id(&mut self, id_type: &str) -> Result { - Connection::get_next_id(self, id_type).await - } - - async fn dump(&self) -> Result { - Connection::dump(self).await - } -} - -#[async_trait] -impl DurableCatalogState for Connection { - async fn transaction(&mut self) -> Result { - Connection::transaction(self).await - } - - async fn commit_transaction(&mut self, txn_batch: TransactionBatch) -> Result<(), Error> { - Connection::commit(self, txn_batch).await - } + #[tracing::instrument(level = "debug", skip(self))] async fn confirm_leadership(&mut self) -> Result<(), Error> { - Connection::confirm_leadership(self).await + Ok(self.stash.confirm_leadership().await?) } async fn set_connect_timeout(&mut self, connect_timeout: Duration) { - Connection::set_connect_timeout(self, connect_timeout).await + self.stash.set_connect_timeout(connect_timeout).await; } + #[tracing::instrument(level = "debug", skip(self))] async fn set_catalog_content_version(&mut self, new_version: &str) -> Result<(), Error> { - Connection::set_catalog_content_version(self, new_version).await + self.set_setting("catalog_content_version", new_version) + .await } + #[tracing::instrument(level = "info", skip_all)] async fn get_and_prune_storage_usage( &mut self, retention_period: Option, boot_ts: Timestamp, ) -> Result, Error> { - Connection::get_and_prune_storage_usage(self, retention_period, boot_ts).await + // If no usage retention period is set, set the cutoff to MIN so nothing + // is removed. + let cutoff_ts = match retention_period { + None => u128::MIN, + Some(period) => u128::from(boot_ts).saturating_sub(period.as_millis()), + }; + let is_read_only = self.is_read_only(); + Ok(self + .stash + .with_transaction(move |tx| { + Box::pin(async move { + let collection = STORAGE_USAGE_COLLECTION.from_tx(&tx).await?; + let rows = tx.peek_one(collection).await?; + let mut events = Vec::with_capacity(rows.len()); + let mut batch = collection.make_batch_tx(&tx).await?; + for ev in rows.into_keys() { + let event: StorageUsageKey = ev.clone().into_rust()?; + if u128::from(event.metric.timestamp()) >= cutoff_ts { + events.push(event.metric); + } else if retention_period.is_some() { + collection.append_to_batch(&mut batch, &ev, &(), -1); + } + } + // Delete things only if a retention period is + // specified (otherwise opening readonly catalogs + // can fail). + if retention_period.is_some() && !is_read_only { + tx.append(vec![batch]).await?; + } + Ok(events) + }) + }) + .await?) } + #[tracing::instrument(level = "debug", skip(self))] async fn set_system_items(&mut self, mappings: Vec) -> Result<(), Error> { - Connection::set_system_object_mapping(self, mappings).await + if mappings.is_empty() { + return Ok(()); + } + + let mappings = mappings + .into_iter() + .map(|mapping| { + let id = if let GlobalId::System(id) = mapping.id { + id + } else { + panic!("non-system id provided") + }; + ( + GidMappingKey { + schema_name: mapping.schema_name, + object_type: mapping.object_type, + object_name: mapping.object_name, + }, + GidMappingValue { + id, + fingerprint: mapping.fingerprint, + }, + ) + }) + .map(|e| RustType::into_proto(&e)); + SYSTEM_GID_MAPPING_COLLECTION + .upsert(&mut self.stash, mappings) + .await + .map_err(|e| e.into()) } + #[tracing::instrument(level = "debug", skip(self))] async fn set_introspection_source_indexes( &mut self, mappings: Vec<(ClusterId, &str, GlobalId)>, ) -> Result<(), Error> { - Connection::set_introspection_source_index_gids(self, mappings).await + if mappings.is_empty() { + return Ok(()); + } + + let mappings = mappings + .into_iter() + .map(|(cluster_id, name, index_id)| { + let index_id = if let GlobalId::System(id) = index_id { + id + } else { + panic!("non-system id provided") + }; + ( + ClusterIntrospectionSourceIndexKey { + cluster_id, + name: name.to_string(), + }, + ClusterIntrospectionSourceIndexValue { index_id }, + ) + }) + .map(|e| RustType::into_proto(&e)); + CLUSTER_INTROSPECTION_SOURCE_INDEX_COLLECTION + .upsert(&mut self.stash, mappings) + .await + .map_err(|e| e.into()) } + #[tracing::instrument(level = "debug", skip(self))] async fn set_replica_config( &mut self, replica_id: ReplicaId, @@ -1111,23 +833,78 @@ impl DurableCatalogState for Connection { config: ReplicaConfig, owner_id: RoleId, ) -> Result<(), Error> { - Connection::set_replica_config(self, replica_id, cluster_id, name, config, owner_id).await + let key = ClusterReplicaKey { id: replica_id }.into_proto(); + let val = ClusterReplicaValue { + cluster_id, + name, + config, + owner_id, + } + .into_proto(); + CLUSTER_REPLICA_COLLECTION + .upsert_key(&mut self.stash, key, |_| Ok::<_, Error>(val)) + .await??; + Ok(()) } - async fn persist_timestamp( + #[tracing::instrument(level = "debug", skip(self))] + async fn set_timestamp( &mut self, timeline: &Timeline, timestamp: Timestamp, ) -> Result<(), Error> { - Connection::persist_timestamp(self, timeline, timestamp).await + let key = proto::TimestampKey { + id: timeline.to_string(), + }; + let (prev, next) = TIMESTAMP_COLLECTION + .upsert_key(&mut self.stash, key, move |_| { + Ok::<_, Error>(TimestampValue { ts: timestamp }.into_proto()) + }) + .await??; + if let Some(prev) = prev { + assert!(next >= prev, "global timestamp must always go up"); + } + Ok(()) } + #[tracing::instrument(level = "debug", skip(self))] async fn set_deploy_generation(&mut self, deploy_generation: u64) -> Result<(), Error> { - Connection::set_deploy_generation(self, deploy_generation).await + CONFIG_COLLECTION + .upsert_key( + &mut self.stash, + proto::ConfigKey { + key: DEPLOY_GENERATION.into(), + }, + move |_| { + Ok::<_, Error>(proto::ConfigValue { + value: deploy_generation, + }) + }, + ) + .await??; + Ok(()) } + #[tracing::instrument(level = "debug", skip(self))] async fn allocate_id(&mut self, id_type: &str, amount: u64) -> Result, Error> { - Connection::allocate_id(self, id_type, amount).await + if amount == 0 { + return Ok(Vec::new()); + } + let key = IdAllocKey { + name: id_type.to_string(), + } + .into_proto(); + let (prev, next) = ID_ALLOCATOR_COLLECTION + .upsert_key(&mut self.stash, key, move |prev| { + let id = prev.expect("must exist").next_id; + match id.checked_add(amount) { + Some(next_gid) => Ok(IdAllocValue { next_id: next_gid }.into_proto()), + None => Err(Error::from(SqlCatalogError::IdExhaustion)), + } + }) + .await??; + let id = prev.expect("must exist").next_id; + Ok((id..next.next_id).collect()) } }