Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Scoped feature flags — durable persistence (step 2) implementation notes

These are implementation notes for **step 2** of the scoped feature flags design
(`20260609_scoped_feature_flags.md`, section *"Storage: a durable cache, written
solely by the sync loop"*). Steps 0, 1, and the persistence-ready
re-architecture (working copy in the coordinator) are already implemented on
this branch.

> Status: **not yet implemented.** This branch adds only the in-memory working
> copy (`ScopedParameters` in the coordinator) plus *inert stubs* for the durable
> `Transaction` accessors (`get/upsert/remove_replica_system_config` in
> `src/catalog/src/durable/transaction.rs`, and the `ReplicaSystemConfiguration`
> object type in `src/catalog/src/durable/objects.rs`). The durable collection
> itself is intentionally deferred, because completing it requires running the
> catalog migration tooling and test suite (golden encoding files) — see
> *"Why this isn't on the branch yet"*.
## Goal

Persist scoped overrides so they survive an `environmentd` restart and an LD
outage; fall back to the environment-wide value only on a *cold cache* (an
object never yet evaluated against LD). The sync loop is the **sole writer**.

## Schema decision

The design writes the durable key as `(Cluster(ClusterId) | Replica(ReplicaId),
parameter_name) -> value`. We implement this as **one durable collection per
scope** rather than a single collection with a sum-typed key, because the durable
proto model is flat (no oneof ergonomics) and each collection then mirrors the
existing `system_configurations` collection almost verbatim:

- `replica_system_configurations`, keyed by `(ReplicaId, name) -> String`
added in step 2 (this note).
- `cluster_system_configurations`, keyed by `(ClusterId, name) -> String`
added in step 3 (cluster-coherent), identical recipe with `ClusterId`.

Both are the per-object analog of `system_configurations` (the `ALTER SYSTEM`
durable collection). Object ids are never reused
(`USER_CLUSTER_ID_ALLOC_KEY`), so entries for a dropped object are inert and GC
is hygiene-only (lazy prune of ids absent from the catalog).

## Version-bump recipe (catalog snapshot + migration)

Adding a durable collection changes `src/catalog-protos/src/objects.rs`, which
requires a catalog version bump. The migration is a **no-op** because the change
only *adds* a `StateUpdateKind` variant and message types (JSON-compatible — the
same situation as `v84_to_v85`). Steps (mirroring the header doc in
`src/catalog/src/durable/upgrade.rs`):

1. `objects_v85.rs` already exists and equals the current `objects.rs`.
2. Bump `CATALOG_VERSION` 85 → 86 in `src/catalog-protos/src/lib.rs`; add
`pub mod objects_v86;`.
3. Make the proto changes in `src/catalog-protos/src/objects.rs` (below).
4. Copy the new `objects.rs` to `src/catalog-protos/src/objects_v86.rs`
verbatim (the snapshots are standalone module copies; v85 and the current
`objects.rs` are byte-identical today).
5. Update `src/catalog-protos/objects_hashes.json`: set `objects.rs` to its new
md5 (`md5sum src/catalog-protos/src/objects.rs`). `build.rs` auto-persists the
hash for the new `objects_v86.rs`, but `objects.rs`'s changed hash makes the
build hard-fail until updated by hand.
6. Add `v86` to the second group of the `objects!` macro in
`src/catalog/src/durable/upgrade.rs`.
7. Create `src/catalog/src/durable/upgrade/v85_to_v86.rs` — a no-op migration
(`Vec::new()`), like `v84_to_v85.rs`. Add `mod v85_to_v86;` and a `85 => {
run_versioned_upgrade(..., v85_to_v86::upgrade).await }` arm to `run_upgrade`.
8. Generate the encoding golden files:
`cargo test --package mz-catalog --lib durable::upgrade::tests::generate_missing_encodings -- --ignored`
then run the catalog test suite to validate round-trips.

## The ~27 collection sites (mirror `system_configurations` / `ServerConfiguration`)

For `replica_system_configurations`, keyed by `(ReplicaId, name) -> value`. The
proto `ReplicaId` enum (`System(u64) | User(u64)`) already exists and has a
`RustType` impl (used by `ClusterReplicaKey`).

**`src/catalog-protos/src/objects.rs`** (proto types):
- `ReplicaSystemConfigurationKey { replica_id: ReplicaId, name: String }`
- `ReplicaSystemConfigurationValue { value: String }`
- `StateUpdateKind::ReplicaSystemConfiguration(ReplicaSystemConfiguration)`
- wrapper `ReplicaSystemConfiguration { key, value }`
(then mirror all four into `objects_v86.rs`).

**`src/catalog/src/durable/objects.rs`**:
- `ReplicaSystemConfiguration { replica_id, name, value }` (added on this branch)
+ a `DurableType` impl (`Key = ReplicaSystemConfigurationKey`, `Value =
ReplicaSystemConfigurationValue`).
- add the `Snapshot.replica_system_configurations` field.

**`src/catalog/src/durable/objects/serialization.rs`**:
- `RustType<proto::ReplicaSystemConfigurationKey>` and `…Value` impls (the key
reuses the existing `ReplicaId <-> proto::ReplicaId` `RustType`).

**`src/catalog/src/durable/objects/state_update.rs`**:
- `StateUpdateKind::ReplicaSystemConfiguration(key, value)` variant.
- arms in `collection_type()`, `into_proto_owned()`, `from_proto()`.
- `TransactionBatch` destructure + `from_batch` + `.chain(...)`.

**`src/catalog/src/durable/transaction.rs`**:
- `Transaction.replica_system_configurations:
TableTransaction<ReplicaSystemConfigurationKey, ReplicaSystemConfigurationValue>`.
- `Transaction::new` Snapshot destructure + `TableTransaction::new(...)`.
- replace the **stub** accessors (added on this branch) with real
`set(...)`/`delete(...)`/`items()` bodies.
- `TransactionBatch.replica_system_configurations` field + `is_empty()` +
`pending()` + `current_items_proto()`.

**`src/catalog/src/durable/persist.rs`**: snapshot-apply arm + trace-building arm.

**`src/catalog/src/durable/debug.rs`**: `CollectionType` variant,
`collection_impl!` invocation, `Trace` field, `Trace::new()` init.

**`src/catalog/src/memory/objects.rs`**: `StateUpdateKind` and
`BootstrapStateUpdateKind` variants + the two conversion arms.

## Wiring (step 2b, against the real accessors)

Once the collection exists, replace the stub bodies and wire the coordinator:

- **Startup (bootstrap):** read `get_replica_system_configurations()` into
`Coordinator::scoped_system_parameters.replica`, then call
`reconcile_scoped_system_parameters` so values are in effect *before* the
first LD sync. (Today the working copy starts empty.)
- **Reconcile (sole writer):** in `reconcile_scoped_system_parameters`, after
replacing the working copy, persist the diff against the durable collection
via a catalog transaction — `upsert` changed entries, `remove` ones LD no
longer serves (the sync loop already sends the *complete* desired state, so
the coordinator diffs old-vs-new working copy). Prefer a dedicated
`catalog::Op` (mirroring `Op::UpdateSystemConfiguration`) so it flows through
`catalog_transact`.
- **LD slow/unavailable:** keep serving the loaded values (no revert).
- **Cold cache:** empty working copy ⇒ env-wide fallback (already the behavior).
- **Lazy GC:** on startup and each reconcile, drop persisted entries whose object
id is absent from the catalog.

## Why this isn't on the branch yet

The version bump regenerates encoding **golden files** (step 8) and is validated
by the catalog migration test suite. That tooling can't run in the current
remote environment (the toolchain only builds with `--ignore-rust-version`, and
the catalog test binary exhausts the container disk). A durable catalog
migration that compiles but whose golden files/round-trip tests are unrun is a
production hazard, so the collection is deferred to a session with the proper
toolchain, where steps 1–8 above are mechanical and the migration is a no-op.
28 changes: 28 additions & 0 deletions doc/user/content/reference/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,34 @@ The `mz_sessions` table contains a row for each active session in the system.
| `connected_at` | [`timestamp with time zone`] | The time at which the session connected to the system. |


## `mz_cluster_system_parameters`

The `mz_cluster_system_parameters` table contains a row for each cluster-coherent
system parameter whose value, as resolved from LaunchDarkly, differs from the
environment-wide value for a given cluster.

<!-- RELATION_SPEC mz_internal.mz_cluster_system_parameters -->
| Field | Type | Meaning |
| ------------ | -------- | -------- |
| `cluster_id` | [`text`] | The ID of the cluster. Corresponds to [`mz_clusters.id`](../mz_catalog/#mz_clusters). |
| `name` | [`text`] | The name of the cluster-coherent system parameter. |
| `value` | [`text`] | The cluster-scoped value of the system parameter, as resolved from LaunchDarkly. |


## `mz_replica_system_parameters`

The `mz_replica_system_parameters` table contains a row for each replica-local
system parameter whose value, as resolved from LaunchDarkly, differs from the
environment-wide value for a given cluster replica.

<!-- RELATION_SPEC mz_internal.mz_replica_system_parameters -->
| Field | Type | Meaning |
| ------------ | -------- | -------- |
| `replica_id` | [`text`] | The ID of the cluster replica. Corresponds to [`mz_cluster_replicas.id`](../mz_catalog/#mz_cluster_replicas). |
| `name` | [`text`] | The name of the replica-local system parameter. |
| `value` | [`text`] | The replica-scoped value of the system parameter, as resolved from LaunchDarkly. |


## `mz_network_policies`

The `mz_network_policies` table contains a row for each network policy in the
Expand Down
85 changes: 84 additions & 1 deletion src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ impl CatalogState {
StateUpdateKind::SystemConfiguration(system_configuration) => {
self.apply_system_configuration_update(system_configuration, diff, retractions);
}
StateUpdateKind::ClusterSystemConfiguration(cluster_system_configuration) => {
self.apply_cluster_system_configuration_update(cluster_system_configuration, diff);
}
StateUpdateKind::ReplicaSystemConfiguration(replica_system_configuration) => {
self.apply_replica_system_configuration_update(replica_system_configuration, diff);
}
StateUpdateKind::Cluster(cluster) => {
self.apply_cluster_update(cluster, diff, retractions);
}
Expand Down Expand Up @@ -524,6 +530,63 @@ impl CatalogState {
}
}

#[instrument(level = "debug")]
fn apply_cluster_system_configuration_update(
&mut self,
cfg: mz_catalog::durable::ClusterSystemConfiguration,
diff: StateDiff,
) {
// Retraction is conditional on the value matching, so a value change
// (retraction of the old value + addition of the new) is correct
// regardless of the order the two updates are applied in.
let cluster = &mut self.scoped_system_parameters.cluster;
match diff {
StateDiff::Addition => {
cluster
.entry(cfg.cluster_id)
.or_default()
.insert(cfg.name, cfg.value);
}
StateDiff::Retraction => {
if let Some(values) = cluster.get_mut(&cfg.cluster_id) {
if values.get(&cfg.name) == Some(&cfg.value) {
values.remove(&cfg.name);
if values.is_empty() {
cluster.remove(&cfg.cluster_id);
}
}
}
}
}
}

#[instrument(level = "debug")]
fn apply_replica_system_configuration_update(
&mut self,
cfg: mz_catalog::durable::ReplicaSystemConfiguration,
diff: StateDiff,
) {
let replica = &mut self.scoped_system_parameters.replica;
match diff {
StateDiff::Addition => {
replica
.entry(cfg.replica_id)
.or_default()
.insert(cfg.name, cfg.value);
}
StateDiff::Retraction => {
if let Some(values) = replica.get_mut(&cfg.replica_id) {
if values.get(&cfg.name) == Some(&cfg.value) {
values.remove(&cfg.name);
if values.is_empty() {
replica.remove(&cfg.replica_id);
}
}
}
}
}
}

#[instrument(level = "debug")]
fn apply_cluster_update(
&mut self,
Expand Down Expand Up @@ -1401,6 +1464,22 @@ impl CatalogState {
vec![self.pack_system_privileges_update(system_privilege, diff)]
}
StateUpdateKind::SystemConfiguration(_) => Vec::new(),
StateUpdateKind::ClusterSystemConfiguration(cfg) => {
vec![self.pack_cluster_system_parameter_update(
&cfg.cluster_id.to_string(),
&cfg.name,
&cfg.value,
diff,
)]
}
StateUpdateKind::ReplicaSystemConfiguration(cfg) => {
vec![self.pack_replica_system_parameter_update(
&cfg.replica_id.to_string(),
&cfg.name,
&cfg.value,
diff,
)]
}
StateUpdateKind::Cluster(cluster) => self.pack_cluster_update(&cluster.name, diff),
StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
self.pack_item_update(introspection_source_index.item_id, diff)
Expand Down Expand Up @@ -2198,8 +2277,10 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
&mut pre_cluster_additions,
),
StateUpdateKind::Cluster(_)
| StateUpdateKind::ClusterSystemConfiguration(_)
| StateUpdateKind::IntrospectionSourceIndex(_)
| StateUpdateKind::ClusterReplica(_) => push_update(
| StateUpdateKind::ClusterReplica(_)
| StateUpdateKind::ReplicaSystemConfiguration(_) => push_update(
update,
diff,
&mut cluster_retractions,
Expand Down Expand Up @@ -2545,6 +2626,8 @@ impl ApplyState {
| DefaultPrivilege(_)
| SystemPrivilege(_)
| SystemConfiguration(_)
| ClusterSystemConfiguration(_)
| ReplicaSystemConfiguration(_)
| Cluster(_)
| NetworkPolicy(_)
| ClusterReplica(_)
Expand Down
57 changes: 49 additions & 8 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
MZ_INDEX_COLUMNS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_SYSTEM_PARAMETERS, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_REPLICA_SYSTEM_PARAMETERS, MZ_ROLE_AUTH,
MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES,
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
Expand Down Expand Up @@ -1764,6 +1765,46 @@ impl CatalogState {
)
}

/// Packs a row for the `mz_cluster_system_parameters` introspection table:
/// the cluster-coherent scoped value of `name` on the cluster `cluster_id`.
pub fn pack_cluster_system_parameter_update(
&self,
cluster_id: &str,
name: &str,
value: &str,
diff: Diff,
) -> BuiltinTableUpdate<&'static BuiltinTable> {
BuiltinTableUpdate::row(
&*MZ_CLUSTER_SYSTEM_PARAMETERS,
Row::pack_slice(&[
Datum::String(cluster_id),
Datum::String(name),
Datum::String(value),
]),
diff,
)
}

/// Packs a row for the `mz_replica_system_parameters` introspection table:
/// the replica-local scoped value of `name` on the replica `replica_id`.
pub fn pack_replica_system_parameter_update(
&self,
replica_id: &str,
name: &str,
value: &str,
diff: Diff,
) -> BuiltinTableUpdate<&'static BuiltinTable> {
BuiltinTableUpdate::row(
&*MZ_REPLICA_SYSTEM_PARAMETERS,
Row::pack_slice(&[
Datum::String(replica_id),
Datum::String(name),
Datum::String(value),
]),
diff,
)
}

pub fn pack_default_privileges_update(
&self,
default_privilege_object: &DefaultPrivilegeObject,
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl Catalog {
role_auth_by_id: imbl::OrdMap::new(),
network_policies_by_name: imbl::OrdMap::new(),
system_configuration: Arc::new(system_configuration),
scoped_system_parameters: Default::default(),
default_privileges: Arc::new(DefaultPrivileges::default()),
system_privileges: Arc::new(PrivilegeMap::default()),
comments: Arc::new(CommentsMap::default()),
Expand Down Expand Up @@ -266,6 +267,8 @@ impl Catalog {
| BootstrapStateUpdateKind::DefaultPrivilege(_)
| BootstrapStateUpdateKind::SystemPrivilege(_)
| BootstrapStateUpdateKind::SystemConfiguration(_)
| BootstrapStateUpdateKind::ClusterSystemConfiguration(_)
| BootstrapStateUpdateKind::ReplicaSystemConfiguration(_)
| BootstrapStateUpdateKind::Cluster(_)
| BootstrapStateUpdateKind::NetworkPolicy(_)
| BootstrapStateUpdateKind::ClusterReplica(_) => {
Expand Down
Loading
Loading