Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: Create deterministic log index IDs #30603

Merged
merged 12 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 52 additions & 11 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,14 +518,51 @@ impl Catalog {
Fut: Future<Output = T>,
{
let persist_client = PersistClient::new_for_tests().await;
let environmentd_id = Uuid::new_v4();
let organization_id = Uuid::new_v4();
let bootstrap_args = test_bootstrap_args();
let catalog = Self::open_debug_catalog(persist_client, environmentd_id, &bootstrap_args)
let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
.await
.expect("can open debug catalog");
f(catalog).await
}

/// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
/// in progress.
pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
where
F: FnOnce(Catalog) -> Fut,
Fut: Future<Output = T>,
{
let persist_client = PersistClient::new_for_tests().await;
let organization_id = Uuid::new_v4();
let bootstrap_args = test_bootstrap_args();
let mut catalog =
Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
.await
.expect("can open debug catalog");

// Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
let now = SYSTEM_TIME.clone();
let openable_storage = TestCatalogStateBuilder::new(persist_client)
.with_organization_id(organization_id)
.with_default_deploy_generation()
.build()
.await
.expect("can create durable catalog");
let mut storage = openable_storage
.open(now().into(), &bootstrap_args)
.await
.expect("can open durable catalog");
// Drain updates.
let _ = storage
.sync_to_current_updates()
.await
.expect("can sync to current updates");
catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));

f(catalog).await
}

/// Opens a debug catalog.
///
/// See [`Catalog::with_debug`].
Expand Down Expand Up @@ -733,13 +770,17 @@ impl Catalog {
commit_ts: mz_repr::Timestamp,
) -> Result<(CatalogItemId, GlobalId), Error> {
use mz_ore::collections::CollectionExt;
self.storage()
.await
.allocate_system_ids(1, commit_ts)
.await
.maybe_terminate("allocating system ids")
.map(|ids| ids.into_element())
.err_into()

let mut storage = self.storage().await;
let mut txn = storage.transaction().await?;
let id = txn
.allocate_system_item_ids(1)
.maybe_terminate("allocating system ids")?
.into_element();
// Drain transaction.
let _ = txn.get_and_commit_op_updates();
txn.commit(commit_ts).await?;
Ok(id)
}

/// Get the next system item ID without allocating it.
Expand Down Expand Up @@ -2647,7 +2688,7 @@ mod tests {
assert_eq!(
mz_sql::catalog::ObjectType::ClusterReplica,
conn_catalog.get_object_type(&ObjectId::ClusterReplica((
ClusterId::User(1),
ClusterId::user(1).expect("1 is a valid ID"),
ReplicaId::User(1)
)))
);
Expand All @@ -2669,7 +2710,7 @@ mod tests {
assert_eq!(
None,
conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
ClusterId::User(1),
ClusterId::user(1).expect("1 is a valid ID"),
ReplicaId::User(1),
))))
);
Expand Down
29 changes: 13 additions & 16 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
use mz_catalog::durable::objects::{
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
};
use mz_catalog::durable::{
ClusterVariant, ClusterVariantManaged, Transaction, SYSTEM_CLUSTER_ID_ALLOC_KEY,
};
use mz_catalog::durable::{ClusterVariant, ClusterVariantManaged, Transaction};
use mz_catalog::expr_cache::{
ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
};
Expand Down Expand Up @@ -631,6 +629,8 @@ impl Catalog {
);
}

catalog.storage().await.mark_bootstrap_complete();

Ok(OpenCatalogResult {
catalog,
storage_collections_to_drop,
Expand Down Expand Up @@ -773,6 +773,10 @@ impl Catalog {

let (new_item_id, new_global_id) = match id {
CatalogItemId::System(_) => txn.allocate_system_item_ids(1)?.into_element(),
CatalogItemId::IntrospectionSourceIndex(id) => (
CatalogItemId::IntrospectionSourceIndex(id),
GlobalId::IntrospectionSourceIndex(id),
),
CatalogItemId::User(_) => txn.allocate_user_item_ids(1)?.into_element(),
_ => unreachable!("can't migrate id: {id}"),
};
Expand Down Expand Up @@ -1223,10 +1227,7 @@ fn add_new_builtin_clusters_migration(
if !cluster_names.contains(builtin_cluster.name) {
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_size)?;
let id = txn.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
let id = ClusterId::System(id);
txn.insert_system_cluster(
id,
builtin_cluster.name,
vec![],
builtin_cluster.privileges.to_vec(),
Expand Down Expand Up @@ -1266,13 +1267,9 @@ fn add_new_remove_old_builtin_introspection_source_migration(
}
}

let new_ids = txn.allocate_system_item_ids(usize_to_u64(new_logs.len()))?;
let new_entries = new_logs
.into_iter()
.zip_eq(new_ids)
.map(|(log, (item_id, gid))| (log, item_id, gid));

for (log, item_id, gid) in new_entries {
for log in new_logs {
let (item_id, gid) =
Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
}

Expand Down Expand Up @@ -1647,7 +1644,7 @@ mod builtin_migration_tests {
.with_key(vec![0])
.finish(),
resolved_ids: resolved_ids.into_iter().collect(),
cluster_id: ClusterId::User(1),
cluster_id: ClusterId::user(1).expect("1 is a valid ID"),
non_null_assertions: vec![],
custom_logical_compaction_window: None,
refresh_schedule: None,
Expand All @@ -1664,7 +1661,7 @@ mod builtin_migration_tests {
keys: Default::default(),
conn_id: None,
resolved_ids: [(on_item_id, on_gid)].into_iter().collect(),
cluster_id: ClusterId::User(1),
cluster_id: ClusterId::user(1).expect("1 is a valid ID"),
custom_logical_compaction_window: None,
is_retained_metrics_object: false,
})
Expand Down Expand Up @@ -1760,7 +1757,7 @@ mod builtin_migration_tests {
}

async fn run_test_case(test_case: BuiltinMigrationTestCase) {
Catalog::with_debug(|mut catalog| async move {
Catalog::with_debug_in_bootstrap(|mut catalog| async move {
let mut item_id_mapping = BTreeMap::new();
let mut name_mapping = BTreeMap::new();

Expand Down
14 changes: 10 additions & 4 deletions src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use mz_catalog::memory::objects::{
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::usize_to_u64;
use mz_ore::collections::HashSet;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
Expand Down Expand Up @@ -903,12 +902,19 @@ impl Catalog {
let privileges: Vec<_> =
merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
.collect();
let introspection_source_ids =
tx.allocate_system_item_ids(usize_to_u64(introspection_sources.len()))?;
let introspection_source_ids: Vec<_> = introspection_sources
.iter()
.map(|introspection_source| {
Transaction::allocate_introspection_source_index_id(
&id,
introspection_source.variant,
)
})
.collect();

let introspection_sources = introspection_sources
.into_iter()
.zip_eq(introspection_source_ids.into_iter())
.zip_eq(introspection_source_ids)
.map(|(log, (item_id, gid))| (log, item_id, gid))
.collect();

Expand Down
5 changes: 3 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,8 @@ impl Coordinator {
let id_too_large = match id {
CatalogItemId::System(id) => *id >= next_system_item_id,
CatalogItemId::User(id) => *id >= next_user_item_id,
CatalogItemId::Transient(_) => false,
CatalogItemId::IntrospectionSourceIndex(_)
| CatalogItemId::Transient(_) => false,
};
if id_too_large {
info!(
Expand Down Expand Up @@ -3624,7 +3625,7 @@ impl Coordinator {

// An arbitrary compute instance ID to satisfy the function calls below. Note that
// this only works because this function will never run.
let compute_instance = ComputeInstanceId::User(1);
let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");

let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ mod tests {
let PlanValidity::Checks { cluster_id, .. } = validity else {
panic!();
};
*cluster_id = Some(ClusterId::User(3));
*cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
}),
Box::new(|res| {
assert_contains!(
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v72.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v73.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
6 changes: 5 additions & 1 deletion src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.proto",
"md5": "2d781c72c4a56b13dfb1b4215f3614f0"
"md5": "65c8ec9661c8a207bc9eb5af098fa98f"
},
{
"name": "objects_v67.proto",
Expand All @@ -26,5 +26,9 @@
{
"name": "objects_v72.proto",
"md5": "b21cb2b1b41649c78405731e53560d59"
},
{
"name": "objects_v73.proto",
"md5": "d5d1a8c6b1aa8212245cfd343a3b8417"
}
]
18 changes: 15 additions & 3 deletions src/catalog/protos/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ message ClusterIntrospectionSourceIndexKey {
}

message ClusterIntrospectionSourceIndexValue {
// TODO(parkmycar): Ideally this is a SystemCatalogItemId but making this change panics 0dt
// TODO(parkmycar): Ideally this is a IntrospectionSourceCatalogItemId but making this change panics 0dt
// upgrades if there were new builtin objects added since the older version of Materialize
// doesn't know how to read the new SystemCatalogItemId type.
// doesn't know how to read the new IntrospectionSourceCatalogItemId type.
uint64 index_id = 1;
uint32 oid = 2;
SystemGlobalId global_id = 3;
IntrospectionSourceIndexGlobalId global_id = 3;
}

message ClusterReplicaKey {
Expand Down Expand Up @@ -307,6 +307,7 @@ message CatalogItemId {
uint64 system = 1;
uint64 user = 2;
uint64 transient = 3;
uint64 introspection_source_index = 4;
}
}

Expand All @@ -315,12 +316,18 @@ message SystemCatalogItemId {
uint64 value = 1;
}

/// A newtype wrapper for a `CatalogItemId` that is always in the "introspection source index" namespace.
message IntrospectionSourceIndexCatalogItemId {
uint64 value = 1;
}

message GlobalId {
oneof value {
uint64 system = 1;
uint64 user = 2;
uint64 transient = 3;
Empty explain = 4;
uint64 introspection_source_index = 5;
}
}

Expand All @@ -329,6 +336,11 @@ message SystemGlobalId {
uint64 value = 1;
}

/// A newtype wrapper for a `GlobalId` that is always in the "introspection source index" namespace.
message IntrospectionSourceIndexGlobalId {
uint64 value = 1;
}

message ClusterId {
oneof value {
uint64 system = 1;
Expand Down
Loading