Skip to content

Commit 8627e09

Browse files
committed
WIP
1 parent 0633f88 commit 8627e09

23 files changed

+1610
-65
lines changed

src/adapter/src/catalog.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -718,13 +718,14 @@ impl Catalog {
718718
#[cfg(test)]
719719
pub async fn allocate_system_id(&self) -> Result<(CatalogItemId, GlobalId), Error> {
720720
use mz_ore::collections::CollectionExt;
721-
self.storage()
722-
.await
723-
.allocate_system_ids(1)
724-
.await
725-
.maybe_terminate("allocating system ids")
726-
.map(|ids| ids.into_element())
727-
.err_into()
721+
722+
let mut txn = self.storage().await.transaction().await?;
723+
let id = txn
724+
.allocate_system_item_ids(1)
725+
.maybe_terminate("allocating system ids")?
726+
.into_element();
727+
txn.commit().await?;
728+
Ok(id)
728729
}
729730

730731
/// Get the next system item ID without allocating it.

src/adapter/src/catalog/open.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
2828
use mz_catalog::durable::objects::{
2929
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
3030
};
31-
use mz_catalog::durable::{
32-
ClusterVariant, ClusterVariantManaged, Transaction, SYSTEM_CLUSTER_ID_ALLOC_KEY,
33-
};
31+
use mz_catalog::durable::{ClusterVariant, ClusterVariantManaged, Transaction};
3432
use mz_catalog::expr_cache::{
3533
ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
3634
};
@@ -626,6 +624,8 @@ impl Catalog {
626624
);
627625
}
628626

627+
catalog.storage().await.bootstrap_complete();
628+
629629
Ok(OpenCatalogResult {
630630
catalog,
631631
storage_collections_to_drop,
@@ -766,6 +766,10 @@ impl Catalog {
766766

767767
let (new_item_id, new_global_id) = match id {
768768
CatalogItemId::System(_) => txn.allocate_system_item_ids(1)?.into_element(),
769+
CatalogItemId::IntrospectionSourceIndex(id) => (
770+
CatalogItemId::IntrospectionSourceIndex(id),
771+
GlobalId::IntrospectionSourceIndex(id),
772+
),
769773
CatalogItemId::User(_) => txn.allocate_user_item_ids(1)?.into_element(),
770774
_ => unreachable!("can't migrate id: {id}"),
771775
};
@@ -1216,10 +1220,7 @@ fn add_new_builtin_clusters_migration(
12161220
if !cluster_names.contains(builtin_cluster.name) {
12171221
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
12181222
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_size)?;
1219-
let id = txn.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
1220-
let id = ClusterId::System(id);
12211223
txn.insert_system_cluster(
1222-
id,
12231224
builtin_cluster.name,
12241225
vec![],
12251226
builtin_cluster.privileges.to_vec(),
@@ -1259,13 +1260,9 @@ fn add_new_remove_old_builtin_introspection_source_migration(
12591260
}
12601261
}
12611262

1262-
let new_ids = txn.allocate_system_item_ids(usize_to_u64(new_logs.len()))?;
1263-
let new_entries = new_logs
1264-
.into_iter()
1265-
.zip_eq(new_ids)
1266-
.map(|(log, (item_id, gid))| (log, item_id, gid));
1267-
1268-
for (log, item_id, gid) in new_entries {
1263+
for log in new_logs {
1264+
let (item_id, gid) =
1265+
Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
12691266
new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
12701267
}
12711268

src/adapter/src/catalog/transact.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use mz_catalog::memory::objects::{
3232
use mz_catalog::SYSTEM_CONN_ID;
3333
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
3434
use mz_controller_types::{ClusterId, ReplicaId};
35-
use mz_ore::cast::usize_to_u64;
3635
use mz_ore::collections::HashSet;
3736
use mz_ore::instrument;
3837
use mz_ore::now::EpochMillis;
@@ -56,6 +55,7 @@ use mz_sql::session::vars::{Value as VarValue, VarInput};
5655
use mz_sql::{rbac, DEFAULT_SCHEMA};
5756
use mz_sql_parser::ast::{QualifiedReplica, Value};
5857
use mz_storage_client::controller::StorageController;
58+
use timely::Container;
5959
use tracing::{info, trace};
6060

6161
use crate::catalog::{
@@ -903,12 +903,19 @@ impl Catalog {
903903
let privileges: Vec<_> =
904904
merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
905905
.collect();
906-
let introspection_source_ids =
907-
tx.allocate_system_item_ids(usize_to_u64(introspection_sources.len()))?;
906+
let introspection_source_ids: Vec<_> = introspection_sources
907+
.iter()
908+
.map(|introspection_source| {
909+
Transaction::allocate_introspection_source_index_id(
910+
&id,
911+
introspection_source.variant,
912+
)
913+
})
914+
.collect();
908915

909916
let introspection_sources = introspection_sources
910917
.into_iter()
911-
.zip_eq(introspection_source_ids.into_iter())
918+
.zip_eq(introspection_source_ids)
912919
.map(|(log, (item_id, gid))| (log, item_id, gid))
913920
.collect();
914921

src/adapter/src/coord.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2301,7 +2301,8 @@ impl Coordinator {
23012301
let id_too_large = match id {
23022302
CatalogItemId::System(id) => *id >= next_system_item_id,
23032303
CatalogItemId::User(id) => *id >= next_user_item_id,
2304-
CatalogItemId::Transient(_) => false,
2304+
CatalogItemId::IntrospectionSourceIndex(_)
2305+
| CatalogItemId::Transient(_) => false,
23052306
};
23062307
if id_too_large {
23072308
info!(

src/catalog/protos/hashes.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[
22
{
33
"name": "objects.proto",
4-
"md5": "2d781c72c4a56b13dfb1b4215f3614f0"
4+
"md5": "4db86834a7af4b5a310e4a8a1ba92e0e"
55
},
66
{
77
"name": "objects_v67.proto",
@@ -26,5 +26,9 @@
2626
{
2727
"name": "objects_v72.proto",
2828
"md5": "b21cb2b1b41649c78405731e53560d59"
29+
},
30+
{
31+
"name": "objects_v73.proto",
32+
"md5": "65c222651e901fac2391423a4f034fcc"
2933
}
3034
]

src/catalog/protos/objects.proto

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ message ClusterIntrospectionSourceIndexValue {
8383
// doesn't know how to read the new SystemCatalogItemId type.
8484
uint64 index_id = 1;
8585
uint32 oid = 2;
86-
SystemGlobalId global_id = 3;
86+
IntrospectionSourceIndexGlobalId global_id = 3;
8787
}
8888

8989
message ClusterReplicaKey {
@@ -307,6 +307,7 @@ message CatalogItemId {
307307
uint64 system = 1;
308308
uint64 user = 2;
309309
uint64 transient = 3;
310+
uint64 introspection_source_index = 4;
310311
}
311312
}
312313

@@ -321,6 +322,7 @@ message GlobalId {
321322
uint64 user = 2;
322323
uint64 transient = 3;
323324
Empty explain = 4;
325+
uint64 introspection_source_index = 5;
324326
}
325327
}
326328

@@ -329,10 +331,15 @@ message SystemGlobalId {
329331
uint64 value = 1;
330332
}
331333

334+
/// A newtype wrapper for a `GlobalId` that is always in the "introspection source index" namespace.
335+
message IntrospectionSourceIndexGlobalId {
336+
uint64 value = 1;
337+
}
338+
332339
message ClusterId {
333340
oneof value {
334-
uint64 system = 1;
335-
uint64 user = 2;
341+
uint32 system = 1;
342+
uint32 user = 2;
336343
}
337344
}
338345

0 commit comments

Comments
 (0)