Skip to content

Commit

Permalink
Update callers to use trait methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 committed Sep 17, 2023
1 parent 6112120 commit 553fce7
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 65 deletions.
80 changes: 54 additions & 26 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub const LINKED_CLUSTER_REPLICA_NAME: &str = "linked";
pub struct Catalog {
state: CatalogState,
plans: CatalogPlans,
storage: Arc<tokio::sync::Mutex<mz_catalog::Connection>>,
storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::DurableCatalogState>>>,
transient_revision: u64,
}

Expand Down Expand Up @@ -3617,7 +3617,7 @@ impl Catalog {
let boot_ts = {
let mut storage = catalog.storage().await;
let previous_ts = storage
.try_get_persisted_timestamp(&Timeline::EpochMilliseconds)
.get_timestamp(&Timeline::EpochMilliseconds)
.await?
.expect("missing EpochMilliseconds timeline");
let boot_ts = timeline::monotonic_now(config.now, previous_ts);
Expand All @@ -3633,7 +3633,7 @@ impl Catalog {

catalog.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;

let databases = catalog.storage().await.load_databases().await?;
let databases = catalog.storage().await.get_databases().await?;
for mz_catalog::Database {
id,
name,
Expand All @@ -3660,7 +3660,7 @@ impl Catalog {
.insert(name.clone(), id.clone());
}

let schemas = catalog.storage().await.load_schemas().await?;
let schemas = catalog.storage().await.get_schemas().await?;
for mz_catalog::Schema {
id,
name,
Expand Down Expand Up @@ -3707,7 +3707,7 @@ impl Catalog {
schemas_by_name.insert(name.clone(), id);
}

let roles = catalog.storage().await.load_roles().await?;
let roles = catalog.storage().await.get_roles().await?;
for mz_catalog::Role {
id,
name,
Expand All @@ -3729,15 +3729,15 @@ impl Catalog {
);
}

let default_privileges = catalog.storage().await.load_default_privileges().await?;
let default_privileges = catalog.storage().await.get_default_privileges().await?;
for (default_privilege_object, default_privilege) in default_privileges {
catalog
.state
.default_privileges
.grant(default_privilege_object, default_privilege);
}

let system_privileges = catalog.storage().await.load_system_privileges().await?;
let system_privileges = catalog.storage().await.get_system_privileges().await?;
catalog.state.system_privileges.grant_all(system_privileges);

catalog
Expand All @@ -3754,7 +3754,7 @@ impl Catalog {
mz_repr::VARIABLE_LENGTH_ROW_ENCODING
.store(variable_length_row_encoding, atomic::Ordering::SeqCst);

let comments = catalog.storage().await.load_comments().await?;
let comments = catalog.storage().await.get_comments().await?;
for (object_id, sub_component, comment) in comments {
catalog
.state
Expand All @@ -3772,7 +3772,23 @@ impl Catalog {

catalog.load_builtin_types().await?;

let persisted_builtin_ids = catalog.storage().await.load_system_gids().await?;
let persisted_builtin_ids: BTreeMap<_, _> = catalog
.storage()
.await
.get_system_items()
.await?
.into_iter()
.map(|mapping| {
(
(
mapping.schema_name,
mapping.object_type,
mapping.object_name,
),
(mapping.id, mapping.fingerprint),
)
})
.collect();
let AllocatedBuiltinSystemIds {
all_builtins,
new_builtins,
Expand Down Expand Up @@ -3960,7 +3976,7 @@ impl Catalog {
}
}

let clusters = catalog.storage().await.load_clusters().await?;
let clusters = catalog.storage().await.get_clusters().await?;
let mut cluster_azs = BTreeMap::new();
for mz_catalog::Cluster {
id,
Expand All @@ -3974,7 +3990,7 @@ impl Catalog {
let introspection_source_index_gids = catalog
.storage()
.await
.load_introspection_source_index_gids(id)
.get_introspection_source_indexes(id)
.await?;

let AllocatedBuiltinSystemIds {
Expand All @@ -3994,7 +4010,7 @@ impl Catalog {
catalog
.storage()
.await
.set_introspection_source_index_gids(
.set_introspection_source_indexes(
new_indexes
.iter()
.map(|(log, index_id)| (id, log.name, *index_id))
Expand All @@ -4017,7 +4033,7 @@ impl Catalog {
);
}

let replicas = catalog.storage().await.load_cluster_replicas().await?;
let replicas = catalog.storage().await.get_cluster_replicas().await?;
for mz_catalog::ClusterReplica {
cluster_id,
replica_id,
Expand Down Expand Up @@ -4127,7 +4143,7 @@ impl Catalog {
catalog
.storage()
.await
.set_system_object_mapping(new_system_id_mappings)
.set_system_items(new_system_id_mappings)
.await?;

let last_seen_version = catalog
Expand Down Expand Up @@ -4291,7 +4307,7 @@ impl Catalog {
_ => unreachable!("all operators must be scalar functions"),
}
}
let audit_logs = catalog.storage().await.load_audit_log().await?;
let audit_logs = catalog.storage().await.get_audit_logs().await?;
for event in audit_logs {
builtin_table_updates.push(catalog.state.pack_audit_log_update(&event)?);
}
Expand All @@ -4302,7 +4318,7 @@ impl Catalog {
let storage_usage_events = catalog
.storage()
.await
.fetch_and_prune_storage_usage(config.storage_usage_retention_period, boot_ts)
.get_and_prune_storage_usage(config.storage_usage_retention_period, boot_ts)
.await?;
for event in storage_usage_events {
builtin_table_updates.push(catalog.state.pack_storage_usage_update(&event)?);
Expand Down Expand Up @@ -4344,7 +4360,7 @@ impl Catalog {
system_parameter_sync_config: Option<SystemParameterSyncConfig>,
boot_ts: mz_repr::Timestamp,
) -> Result<(), AdapterError> {
let system_config = self.storage().await.load_system_configuration().await?;
let system_config = self.storage().await.get_system_configurations().await?;

for (name, value) in &system_parameter_defaults {
match self
Expand Down Expand Up @@ -4459,7 +4475,23 @@ impl Catalog {
/// resolve all references.
#[tracing::instrument(level = "info", skip_all)]
async fn load_builtin_types(&mut self) -> Result<(), Error> {
let persisted_builtin_ids = self.storage().await.load_system_gids().await?;
let persisted_builtin_ids: BTreeMap<_, _> = self
.storage()
.await
.get_system_items()
.await?
.into_iter()
.map(|mapping| {
(
(
mapping.schema_name,
mapping.object_type,
mapping.object_name,
),
(mapping.id, mapping.fingerprint),
)
})
.collect();

let AllocatedBuiltinSystemIds {
all_builtins,
Expand Down Expand Up @@ -4542,7 +4574,7 @@ impl Catalog {
.collect();
self.storage()
.await
.set_system_object_mapping(new_system_id_mappings)
.set_system_items(new_system_id_mappings)
.await?;

Ok(())
Expand Down Expand Up @@ -5092,7 +5124,7 @@ impl Catalog {
"false"
};
let (catalog, _, _, _) = Catalog::open(Config {
storage,
storage: Box::new(storage),
unsafe_mode: true,
all_features: false,
build_info: &DUMMY_BUILD_INFO,
Expand Down Expand Up @@ -5179,7 +5211,7 @@ impl Catalog {
Self::for_sessionless_user_state(state, MZ_SYSTEM_ROLE_ID)
}

async fn storage<'a>(&'a self) -> MutexGuard<'a, mz_catalog::Connection> {
async fn storage<'a>(&'a self) -> MutexGuard<'a, Box<dyn mz_catalog::DurableCatalogState>> {
self.storage.lock().await
}

Expand Down Expand Up @@ -5275,11 +5307,7 @@ impl Catalog {
pub async fn get_all_persisted_timestamps(
&self,
) -> Result<BTreeMap<Timeline, mz_repr::Timestamp>, Error> {
self.storage()
.await
.get_all_persisted_timestamps()
.await
.err_into()
self.storage().await.get_timestamps().await.err_into()
}

/// Get the next system replica id without allocating it.
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::config::SystemParameterSyncConfig;
#[derive(Debug)]
pub struct Config<'a> {
/// The connection to the stash.
pub storage: mz_catalog::Connection,
pub storage: Box<dyn mz_catalog::DurableCatalogState>,
/// Whether to enable unsafe mode.
pub unsafe_mode: bool,
/// Whether the build is a local dev build.
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ impl PlanValidity {
/// Configures a coordinator.
pub struct Config {
pub dataflow_client: mz_controller::Controller,
pub storage: mz_catalog::Connection,
pub storage: Box<dyn mz_catalog::DurableCatalogState>,
pub unsafe_mode: bool,
pub all_features: bool,
pub build_info: &'static BuildInfo,
Expand Down
7 changes: 5 additions & 2 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
use async_trait::async_trait;
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroI64;
use std::time::Duration;

Expand Down Expand Up @@ -181,7 +181,7 @@ pub struct BootstrapArgs {

/// A read only API for the durable catalog state.
#[async_trait]
pub trait ReadOnlyDurableCatalogState {
pub trait ReadOnlyDurableCatalogState: Debug + Send {
/// Reports if the catalog state has been initialized.
async fn is_initialized(&self) -> Result<bool, Error>;

Expand All @@ -201,6 +201,9 @@ pub trait ReadOnlyDurableCatalogState {
/// it will fail to open in read only mode.
async fn open_read_only(&mut self) -> Result<(), Error>;*/

/// Returns true if the catalog is opened in read only mode, false otherwise.
fn is_read_only(&self) -> bool;

/// Returns the epoch of the current durable catalog state. The epoch acts as
/// a fencing token to prevent split brain issues across two
/// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
Expand Down
Loading

0 comments on commit 553fce7

Please sign in to comment.