Skip to content

Commit

Permalink
catalog: Simplify shard ID generation (#30597)
Browse files Browse the repository at this point in the history
The catalog has functionality to deterministically generate a random
shard ID for a given environment. It is used when we need a shard before
the catalog is fully opened. It is strictly worse than generating a
totally random shard because we limit the amount of randomness.

The catalog also has an un-migratable collection called "settings" which
maps string keys to string values. This collection is accessible
immediately after reading in a snapshot of the catalog, before we finish
opening the catalog. Any shard ID used after reading in the snapshot can
be generated fully randomly and stored in the settings collection.

Two shards fit into this category:

  - The builtin migration shard.
  - The expression cache shard.

This commit adds a migration to move both those shards to the settings
collection for existing environments. New environments will generate the
shards randomly and stash the IDs in the settings collection.
  • Loading branch information
jkosh44 authored Nov 25, 2024
1 parent d0f68ce commit 2141dfd
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 85 deletions.
39 changes: 37 additions & 2 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@
use std::collections::BTreeMap;

use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::durable::{
shard_id, Transaction, BUILTIN_MIGRATION_SEED, BUILTIN_MIGRATION_SHARD_KEY,
EXPRESSION_CACHE_SEED, EXPRESSION_CACHE_SHARD_KEY,
};
use mz_catalog::memory::objects::StateUpdate;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql_parser::ast::{Raw, Statement};
use semver::Version;
use tracing::info;
use uuid::Uuid;
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
Expand Down Expand Up @@ -185,12 +190,42 @@ pub(crate) async fn migrate(

/// Migrations that run only on the durable catalog before any data is loaded into memory.
///
/// Ensures that both the builtin migration shard ID and the expression cache shard ID are
/// present in the settings collection. A missing ID is filled in with the value derived
/// deterministically from `organization_id`; an ID that is already stored is left untouched.
///
/// # Errors
///
/// Returns an error if writing either setting to the transaction fails.
pub(crate) fn durable_migrate(
_tx: &mut Transaction,
tx: &mut Transaction,
organization_id: Uuid,
_boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
// Insert the builtin migration shard into the settings collection.
// Only written when absent, so a previously stored ID is never overwritten.
if tx.get_builtin_migration_shard().is_none() {
let builtin_migration_shard = builtin_migration_shard_id(organization_id);
tx.set_setting(
BUILTIN_MIGRATION_SHARD_KEY.to_string(),
Some(builtin_migration_shard.to_string()),
)?;
}

// Insert the expression cache shard into the settings collection.
// Same rule as above: only fill in the ID when the setting is missing.
if tx.get_expression_cache_shard().is_none() {
let expression_cache_shard = expression_cache_shard_id(organization_id);
tx.set_setting(
EXPRESSION_CACHE_SHARD_KEY.to_string(),
Some(expression_cache_shard.to_string()),
)?;
}
Ok(())
}

/// Deterministically generate a builtin table migration shard ID for the given
/// `organization_id`.
///
/// The ID is derived from `organization_id` and `BUILTIN_MIGRATION_SEED` via `shard_id`, so
/// repeated calls with the same `organization_id` produce the same shard ID.
fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, BUILTIN_MIGRATION_SEED)
}

/// Deterministically generate an expression cache shard ID for the given
/// `organization_id`.
///
/// The ID is derived from `organization_id` and `EXPRESSION_CACHE_SEED` via `shard_id`, so
/// repeated calls with the same `organization_id` produce the same shard ID.
pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, EXPRESSION_CACHE_SEED)
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
Expand Down
10 changes: 8 additions & 2 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ impl Catalog {

// Migrate/update durable data before we start loading the in-memory catalog.
let (migrated_builtins, new_builtin_collections) = {
migrate::durable_migrate(&mut txn, config.boot_ts)?;
migrate::durable_migrate(
&mut txn,
state.config.environment_id.organization_id(),
config.boot_ts,
)?;
// Overwrite and persist selected parameter values in `remote_system_parameters` that
// was pulled from a remote frontend (e.g. LaunchDarkly) if present.
if let Some(remote_system_parameters) = config.remote_system_parameters {
Expand Down Expand Up @@ -456,8 +460,10 @@ impl Catalog {
let dyncfgs = config.persist_client.dyncfgs().clone();
let expr_cache_config = ExpressionCacheConfig {
deploy_generation,
shard_id: txn
.get_expression_cache_shard()
.expect("expression cache shard should exist for opened catalogs"),
persist: config.persist_client,
organization_id: state.config.environment_id.organization_id(),
current_ids,
remove_prior_gens: !config.read_only,
compact_shard: config.read_only,
Expand Down
7 changes: 4 additions & 3 deletions src/adapter/src/catalog/open/builtin_item_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use mz_catalog::builtin::{BuiltinTable, Fingerprint, BUILTINS};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
builtin_migration_shard_id, DurableCatalogError, FenceError, SystemObjectDescription,
SystemObjectMapping, Transaction,
DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
Expand Down Expand Up @@ -233,7 +232,9 @@ async fn migrate_builtin_items_0dt(

// 1. Open migration shard.
let organization_id = state.config.environment_id.organization_id();
let shard_id = builtin_migration_shard_id(organization_id);
let shard_id = txn
.get_builtin_migration_shard()
.expect("builtin migration shard should exist for opened catalogs");
let diagnostics = Diagnostics {
shard_name: "builtin_migration".to_string(),
handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
Expand Down
4 changes: 3 additions & 1 deletion src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use crate::durable::objects::{
ReplicaLocation, Role, Schema, SourceReference, SourceReferences, StorageCollectionMetadata,
SystemConfiguration, SystemObjectDescription, SystemObjectMapping, UnfinalizedShard,
};
pub use crate::durable::persist::{builtin_migration_shard_id, expression_cache_shard_id};
pub use crate::durable::persist::{shard_id, BUILTIN_MIGRATION_SEED, EXPRESSION_CACHE_SEED};
use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
pub use crate::durable::transaction::Transaction;
use crate::durable::transaction::TransactionBatch;
Expand Down Expand Up @@ -67,6 +67,8 @@ pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
pub const OID_ALLOC_KEY: &str = "oid";
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";

#[derive(Clone, Debug)]
pub struct BootstrapArgs {
Expand Down
28 changes: 20 additions & 8 deletions src/catalog/src/durable/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use mz_controller::clusters::ReplicaLogging;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::HashSet;
use mz_ore::now::EpochMillis;
use mz_persist_types::ShardId;
use mz_pgrepr::oid::{
FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
Expand All @@ -45,10 +46,11 @@ use crate::durable::upgrade::CATALOG_VERSION;
use crate::durable::{
BootstrapArgs, CatalogError, ClusterConfig, ClusterVariant, ClusterVariantManaged,
DefaultPrivilege, ReplicaConfig, ReplicaLocation, Role, Schema, Transaction,
AUDIT_LOG_ID_ALLOC_KEY, CATALOG_CONTENT_VERSION_KEY, DATABASE_ID_ALLOC_KEY, OID_ALLOC_KEY,
SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
SYSTEM_REPLICA_ID_ALLOC_KEY, USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY,
USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY,
DATABASE_ID_ALLOC_KEY, EXPRESSION_CACHE_SHARD_KEY, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY,
STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY,
USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
USER_ROLE_ID_ALLOC_KEY,
};

/// The key within the "config" Collection that stores the version of the catalog.
Expand Down Expand Up @@ -742,10 +744,20 @@ pub(crate) async fn initialize(
tx.insert_config(key, value)?;
}

for (name, value) in [(
CATALOG_CONTENT_VERSION_KEY.to_string(),
catalog_content_version,
)] {
for (name, value) in [
(
CATALOG_CONTENT_VERSION_KEY.to_string(),
catalog_content_version,
),
(
BUILTIN_MIGRATION_SHARD_KEY.to_string(),
ShardId::new().to_string(),
),
(
EXPRESSION_CACHE_SHARD_KEY.to_string(),
ShardId::new().to_string(),
),
] {
tx.set_setting(name, Some(value))?;
}

Expand Down
18 changes: 3 additions & 15 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ const CATALOG_SEED: usize = 1;
/// race conditions where the shard version decreases after reading it.
const UPGRADE_SEED: usize = 2;
/// Seed used to generate the persist shard ID for builtin table migrations.
const BUILTIN_MIGRATION_SEED: usize = 3;
pub const BUILTIN_MIGRATION_SEED: usize = 3;
/// Seed used to generate the persist shard ID for the expression cache.
const EXPRESSION_CACHE_SEED: usize = 4;
pub const EXPRESSION_CACHE_SEED: usize = 4;

/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -1739,20 +1739,8 @@ impl DurableCatalogState for PersistCatalogState {
}
}

/// Deterministically generate a builtin table migration shard ID for the given
/// `organization_id`.
pub fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, BUILTIN_MIGRATION_SEED)
}

/// Deterministically generate an expression cache shard ID for the given
/// `organization_id`.
pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, EXPRESSION_CACHE_SEED)
}

/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
Expand Down
38 changes: 25 additions & 13 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ use crate::durable::objects::{
};
use crate::durable::{
CatalogError, DefaultPrivilege, DurableCatalogError, DurableCatalogState, NetworkPolicy,
Snapshot, SystemConfiguration, AUDIT_LOG_ID_ALLOC_KEY, CATALOG_CONTENT_VERSION_KEY,
DATABASE_ID_ALLOC_KEY, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY,
SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY,
USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
Snapshot, SystemConfiguration, AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY,
CATALOG_CONTENT_VERSION_KEY, DATABASE_ID_ALLOC_KEY, EXPRESSION_CACHE_SHARD_KEY, OID_ALLOC_KEY,
SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY,
USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

Expand Down Expand Up @@ -1629,11 +1630,7 @@ impl<'a> Transaction<'a> {
}

/// Set persisted setting.
pub(crate) fn set_setting(
&mut self,
name: String,
value: Option<String>,
) -> Result<(), CatalogError> {
pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
self.settings.set(
SettingKey { name },
value.map(|value| SettingValue { value }),
Expand Down Expand Up @@ -1730,11 +1727,26 @@ impl<'a> Transaction<'a> {

/// Get the value of a persisted config.
pub fn get_config(&self, key: String) -> Option<u64> {
let val = self
.configs
self.configs
.get(&ConfigKey { key })
.map(|entry| entry.value);
val
.map(|entry| entry.value)
}

/// Look up the persisted setting stored under `name`.
///
/// Returns `None` when the settings collection has no entry for `name`.
fn get_setting(&self, name: String) -> Option<String> {
    let key = SettingKey { name };
    let entry = self.settings.get(&key)?;
    Some(entry.value)
}

/// Read the builtin migration shard ID from the settings collection.
///
/// Returns `None` when no value is stored under `BUILTIN_MIGRATION_SHARD_KEY`.
///
/// # Panics
///
/// Panics if the stored value cannot be parsed as a `ShardId`.
pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
    let raw = self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())?;
    Some(raw.parse().expect("valid ShardId"))
}

/// Read the expression cache shard ID from the settings collection.
///
/// Returns `None` when no value is stored under `EXPRESSION_CACHE_SHARD_KEY`.
///
/// # Panics
///
/// Panics if the stored value cannot be parsed as a `ShardId`.
pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
    let raw = self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())?;
    Some(raw.parse().expect("valid ShardId"))
}

/// Updates the catalog `enable_0dt_deployment` "config" value to
Expand Down
Loading

0 comments on commit 2141dfd

Please sign in to comment.