Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catalog: Simplify shard ID generation #30597

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@
use std::collections::BTreeMap;

use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::durable::{
shard_id, Transaction, BUILTIN_MIGRATION_SEED, BUILTIN_MIGRATION_SHARD_KEY,
EXPRESSION_CACHE_SEED, EXPRESSION_CACHE_SHARD_KEY,
};
use mz_catalog::memory::objects::StateUpdate;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql_parser::ast::{Raw, Statement};
use semver::Version;
use tracing::info;
use uuid::Uuid;
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
Expand Down Expand Up @@ -185,12 +190,42 @@ pub(crate) async fn migrate(

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
_tx: &mut Transaction,
tx: &mut Transaction,
organization_id: Uuid,
_boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
// Insert the builtin migration shard into the settings collection.
if tx.get_builtin_migration_shard().is_none() {
let builtin_migration_shard = builtin_migration_shard_id(organization_id);
tx.set_setting(
BUILTIN_MIGRATION_SHARD_KEY.to_string(),
Some(builtin_migration_shard.to_string()),
)?;
}

// Insert the expression cache shard into the settings collection.
if tx.get_expression_cache_shard().is_none() {
let expression_cache_shard = expression_cache_shard_id(organization_id);
tx.set_setting(
EXPRESSION_CACHE_SHARD_KEY.to_string(),
Some(expression_cache_shard.to_string()),
)?;
}
Ok(())
}

/// Deterministically generate a builtin table migration shard ID for the given
/// `organization_id`.
fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, BUILTIN_MIGRATION_SEED)
}

/// Deterministically generate an expression cache shard ID for the given
/// `organization_id`.
pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, EXPRESSION_CACHE_SEED)
}

// Add new migrations below their appropriate heading, and precede them with a
// short summary of the migration's purpose and optional additional commentary
// about safety or approach.
Expand Down
10 changes: 8 additions & 2 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ impl Catalog {

// Migrate/update durable data before we start loading the in-memory catalog.
let (migrated_builtins, new_builtin_collections) = {
migrate::durable_migrate(&mut txn, config.boot_ts)?;
migrate::durable_migrate(
&mut txn,
state.config.environment_id.organization_id(),
config.boot_ts,
)?;
// Overwrite and persist selected parameter values in `remote_system_parameters` that
// was pulled from a remote frontend (e.g. LaunchDarkly) if present.
if let Some(remote_system_parameters) = config.remote_system_parameters {
Expand Down Expand Up @@ -456,8 +460,10 @@ impl Catalog {
let dyncfgs = config.persist_client.dyncfgs().clone();
let expr_cache_config = ExpressionCacheConfig {
deploy_generation,
shard_id: txn
.get_expression_cache_shard()
.expect("expression cache shard should exist for opened catalogs"),
persist: config.persist_client,
organization_id: state.config.environment_id.organization_id(),
current_ids,
remove_prior_gens: !config.read_only,
compact_shard: config.read_only,
Expand Down
7 changes: 4 additions & 3 deletions src/adapter/src/catalog/open/builtin_item_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use mz_catalog::builtin::{BuiltinTable, Fingerprint, BUILTINS};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
builtin_migration_shard_id, DurableCatalogError, FenceError, SystemObjectDescription,
SystemObjectMapping, Transaction,
DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
Expand Down Expand Up @@ -233,7 +232,9 @@ async fn migrate_builtin_items_0dt(

// 1. Open migration shard.
let organization_id = state.config.environment_id.organization_id();
let shard_id = builtin_migration_shard_id(organization_id);
let shard_id = txn
.get_builtin_migration_shard()
.expect("builtin migration shard should exist for opened catalogs");
let diagnostics = Diagnostics {
shard_name: "builtin_migration".to_string(),
handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
Expand Down
4 changes: 3 additions & 1 deletion src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use crate::durable::objects::{
ReplicaLocation, Role, Schema, SourceReference, SourceReferences, StorageCollectionMetadata,
SystemConfiguration, SystemObjectDescription, SystemObjectMapping, UnfinalizedShard,
};
pub use crate::durable::persist::{builtin_migration_shard_id, expression_cache_shard_id};
pub use crate::durable::persist::{shard_id, BUILTIN_MIGRATION_SEED, EXPRESSION_CACHE_SEED};
use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
pub use crate::durable::transaction::Transaction;
use crate::durable::transaction::TransactionBatch;
Expand Down Expand Up @@ -67,6 +67,8 @@ pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
pub const OID_ALLOC_KEY: &str = "oid";
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";

#[derive(Clone, Debug)]
pub struct BootstrapArgs {
Expand Down
28 changes: 20 additions & 8 deletions src/catalog/src/durable/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use mz_controller::clusters::ReplicaLogging;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::HashSet;
use mz_ore::now::EpochMillis;
use mz_persist_types::ShardId;
use mz_pgrepr::oid::{
FIRST_USER_OID, NETWORK_POLICIES_DEFAULT_POLICY_OID, ROLE_PUBLIC_OID,
SCHEMA_INFORMATION_SCHEMA_OID, SCHEMA_MZ_CATALOG_OID, SCHEMA_MZ_CATALOG_UNSTABLE_OID,
Expand All @@ -45,10 +46,11 @@ use crate::durable::upgrade::CATALOG_VERSION;
use crate::durable::{
BootstrapArgs, CatalogError, ClusterConfig, ClusterVariant, ClusterVariantManaged,
DefaultPrivilege, ReplicaConfig, ReplicaLocation, Role, Schema, Transaction,
AUDIT_LOG_ID_ALLOC_KEY, CATALOG_CONTENT_VERSION_KEY, DATABASE_ID_ALLOC_KEY, OID_ALLOC_KEY,
SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
SYSTEM_REPLICA_ID_ALLOC_KEY, USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY,
USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY,
DATABASE_ID_ALLOC_KEY, EXPRESSION_CACHE_SHARD_KEY, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY,
STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY,
USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
USER_ROLE_ID_ALLOC_KEY,
};

/// The key within the "config" Collection that stores the version of the catalog.
Expand Down Expand Up @@ -742,10 +744,20 @@ pub(crate) async fn initialize(
tx.insert_config(key, value)?;
}

for (name, value) in [(
CATALOG_CONTENT_VERSION_KEY.to_string(),
catalog_content_version,
)] {
for (name, value) in [
(
CATALOG_CONTENT_VERSION_KEY.to_string(),
catalog_content_version,
),
(
BUILTIN_MIGRATION_SHARD_KEY.to_string(),
ShardId::new().to_string(),
),
(
EXPRESSION_CACHE_SHARD_KEY.to_string(),
ShardId::new().to_string(),
),
] {
tx.set_setting(name, Some(value))?;
}

Expand Down
18 changes: 3 additions & 15 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ const CATALOG_SEED: usize = 1;
/// race conditions where the shard version decreases after reading it.
const UPGRADE_SEED: usize = 2;
/// Seed used to generate the persist shard ID for builtin table migrations.
const BUILTIN_MIGRATION_SEED: usize = 3;
pub const BUILTIN_MIGRATION_SEED: usize = 3;
/// Seed used to generate the persist shard ID for the expression cache.
const EXPRESSION_CACHE_SEED: usize = 4;
pub const EXPRESSION_CACHE_SEED: usize = 4;

/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -1739,20 +1739,8 @@ impl DurableCatalogState for PersistCatalogState {
}
}

/// Deterministically generate a builtin table migration shard ID for the given
/// `organization_id`.
pub fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, BUILTIN_MIGRATION_SEED)
}

/// Deterministically generate an expression cache shard ID for the given
/// `organization_id`.
pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, EXPRESSION_CACHE_SEED)
}

/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
Expand Down
38 changes: 25 additions & 13 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ use crate::durable::objects::{
};
use crate::durable::{
CatalogError, DefaultPrivilege, DurableCatalogError, DurableCatalogState, NetworkPolicy,
Snapshot, SystemConfiguration, AUDIT_LOG_ID_ALLOC_KEY, CATALOG_CONTENT_VERSION_KEY,
DATABASE_ID_ALLOC_KEY, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY,
SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY,
USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
Snapshot, SystemConfiguration, AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY,
CATALOG_CONTENT_VERSION_KEY, DATABASE_ID_ALLOC_KEY, EXPRESSION_CACHE_SHARD_KEY, OID_ALLOC_KEY,
SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY,
USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

Expand Down Expand Up @@ -1629,11 +1630,7 @@ impl<'a> Transaction<'a> {
}

/// Set persisted setting.
pub(crate) fn set_setting(
&mut self,
name: String,
value: Option<String>,
) -> Result<(), CatalogError> {
pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
self.settings.set(
SettingKey { name },
value.map(|value| SettingValue { value }),
Expand Down Expand Up @@ -1730,11 +1727,26 @@ impl<'a> Transaction<'a> {

/// Get the value of a persisted config.
pub fn get_config(&self, key: String) -> Option<u64> {
let val = self
.configs
self.configs
.get(&ConfigKey { key })
.map(|entry| entry.value);
val
.map(|entry| entry.value)
}

/// Get the value of a persisted setting.
fn get_setting(&self, name: String) -> Option<String> {
self.settings
.get(&SettingKey { name })
.map(|entry| entry.value)
}

pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
}

pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
}

/// Updates the catalog `enable_0dt_deployment` "config" value to
Expand Down
Loading
Loading