Skip to content

Commit 247655b

Browse files
Add system table for tracking view subscribers (#3482)
# Description of Changes <!-- Please describe your change, mention any related tickets, and so on here. --> Adds two new system tables for views. One tracks view arguments, the other tracks view subscribers. `st_view_arg` generates a unique id for each unique argument instantiation. This id will act as a foreign key that both read sets and backing tables can reference. `st_view_client` is needed so that we can drop views when clients unsubscribe or disconnect. Note this disconnect logic is not implemented in this patch. Eventually both of these tables should be ephemeral. There's no reason they need to write to the commitlog. Note also that the schema of a view's backing table has been updated. It no longer stores a view's argument values but rather the `id` from `st_view_arg`. # API and ABI breaking changes <!-- If this is an API or ABI breaking change, please apply the corresponding GitHub label. --> None # Expected complexity level and risk <!-- How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. This complexity rating applies not only to the complexity apparent in the diff, but also to its interactions with existing and future code. If you answered more than a 2, explain what is complex about the PR, and what other components it interacts with in potentially concerning ways. --> 1 # Testing <!-- Describe any testing you've done, and any testing you'd like your reviewers to do, so that you're confident that all the changes work as expected! --> Tests will be added in the patch that computes read sets --------- Signed-off-by: joshua-spacetime <[email protected]> Co-authored-by: Shubham Mishra <[email protected]>
1 parent 3f1de9e commit 247655b

File tree

4 files changed

+146
-60
lines changed

4 files changed

+146
-60
lines changed

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ use crate::{
1717
ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME,
1818
ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID,
1919
ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID,
20-
ST_VAR_IDX,
20+
ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
2121
},
2222
traits::TxData,
2323
};
2424
use crate::{
2525
locking_tx_datastore::mut_tx::ReadSet,
2626
system_tables::{
27-
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
28-
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
27+
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_IDX,
28+
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
2929
},
3030
};
3131
use anyhow::anyhow;
@@ -304,6 +304,8 @@ impl CommittedState {
304304
self.create_table(ST_VIEW_ID, schemas[ST_VIEW_IDX].clone());
305305
self.create_table(ST_VIEW_PARAM_ID, schemas[ST_VIEW_PARAM_IDX].clone());
306306
self.create_table(ST_VIEW_COLUMN_ID, schemas[ST_VIEW_COLUMN_IDX].clone());
307+
self.create_table(ST_VIEW_CLIENT_ID, schemas[ST_VIEW_CLIENT_IDX].clone());
308+
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());
307309

308310
// Insert the sequences into `st_sequences`
309311
let (st_sequences, blob_store, pool) =

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,12 +1252,13 @@ mod tests {
12521252
use crate::system_tables::{
12531253
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
12541254
StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields,
1255-
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewFields, ST_CLIENT_NAME, ST_COLUMN_ID,
1256-
ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID,
1255+
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_NAME,
1256+
ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID,
12571257
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
12581258
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
1259-
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID,
1260-
ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME,
1259+
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID,
1260+
ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,
1261+
ST_VIEW_PARAM_NAME,
12611262
};
12621263
use crate::traits::{IsolationLevel, MutTx};
12631264
use crate::Result;
@@ -1713,6 +1714,8 @@ mod tests {
17131714
TableRow { id: ST_VIEW_ID.into(), name: ST_VIEW_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewFields::ViewId.into()) },
17141715
TableRow { id: ST_VIEW_PARAM_ID.into(), name: ST_VIEW_PARAM_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
17151716
TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
1717+
TableRow { id: ST_VIEW_CLIENT_ID.into(), name: ST_VIEW_CLIENT_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
1718+
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },
17161719

17171720
]));
17181721
#[rustfmt::skip]
@@ -1788,6 +1791,14 @@ mod tests {
17881791
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 1, name: "col_pos", ty: ColId::get_type() },
17891792
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String },
17901793
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() },
1794+
1795+
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
1796+
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 1, name: "arg_id", ty: AlgebraicType::U64 },
1797+
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
1798+
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 3, name: "connection_id", ty: AlgebraicType::U128 },
1799+
1800+
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 },
1801+
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },
17911802
]));
17921803
#[rustfmt::skip]
17931804
assert_eq!(query.scan_st_indexes()?, map_array([
@@ -1808,6 +1819,10 @@ mod tests {
18081819
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
18091820
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
18101821
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
1822+
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
1823+
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
1824+
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
1825+
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
18111826
]));
18121827
let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1;
18131828
#[rustfmt::skip]
@@ -1819,6 +1834,7 @@ mod tests {
18191834
SequenceRow { id: 3, table: ST_CONSTRAINT_ID.into(), col_pos: 0, name: "st_constraint_constraint_id_seq", start },
18201835
SequenceRow { id: 4, table: ST_SCHEDULED_ID.into(), col_pos: 0, name: "st_scheduled_schedule_id_seq", start },
18211836
SequenceRow { id: 6, table: ST_VIEW_ID.into(), col_pos: 0, name: "st_view_view_id_seq", start },
1837+
SequenceRow { id: 7, table: ST_VIEW_ARG_ID.into(), col_pos: 0, name: "st_view_arg_id_seq", start },
18221838
],
18231839
|row| StSequenceRow {
18241840
allocated: start - 1,
@@ -1843,6 +1859,8 @@ mod tests {
18431859
ConstraintRow { constraint_id: 14, table_id: ST_VIEW_ID.into(), unique_columns: col(1), constraint_name: "st_view_view_name_key", },
18441860
ConstraintRow { constraint_id: 15, table_id: ST_VIEW_PARAM_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_view_param_view_id_param_pos_key", },
18451861
ConstraintRow { constraint_id: 16, table_id: ST_VIEW_COLUMN_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_view_column_view_id_col_pos_key", },
1862+
ConstraintRow { constraint_id: 17, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(0), constraint_name: "st_view_arg_id_key", },
1863+
ConstraintRow { constraint_id: 18, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(1), constraint_name: "st_view_arg_bytes_key", },
18461864
]));
18471865

18481866
// Verify we get back the tables correctly with the proper ids...
@@ -2263,6 +2281,10 @@ mod tests {
22632281
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
22642282
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
22652283
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
2284+
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
2285+
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
2286+
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
2287+
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
22662288
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },
22672289
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
22682290
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },

crates/datastore/src/system_tables.rs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ pub const ST_VIEW_ID: TableId = TableId(12);
7171
pub const ST_VIEW_PARAM_ID: TableId = TableId(13);
7272
/// The static ID of the table that tracks view columns
7373
pub const ST_VIEW_COLUMN_ID: TableId = TableId(14);
74+
/// The static ID of the table that tracks the clients subscribed to each view
75+
pub const ST_VIEW_CLIENT_ID: TableId = TableId(15);
76+
/// The static ID of the table that tracks view arguments
77+
pub const ST_VIEW_ARG_ID: TableId = TableId(16);
7478

7579
pub(crate) const ST_CONNECTION_CREDENTIALS_NAME: &str = "st_connection_credentials";
7680
pub const ST_TABLE_NAME: &str = "st_table";
@@ -86,6 +90,8 @@ pub(crate) const ST_ROW_LEVEL_SECURITY_NAME: &str = "st_row_level_security";
8690
pub(crate) const ST_VIEW_NAME: &str = "st_view";
8791
pub(crate) const ST_VIEW_PARAM_NAME: &str = "st_view_param";
8892
pub(crate) const ST_VIEW_COLUMN_NAME: &str = "st_view_column";
93+
pub(crate) const ST_VIEW_CLIENT_NAME: &str = "st_view_client";
94+
pub(crate) const ST_VIEW_ARG_NAME: &str = "st_view_arg";
8995
/// Reserved range of sequence values used for system tables.
9096
///
9197
/// Ids for user-created tables will start at `ST_RESERVED_SEQUENCE_RANGE`.
@@ -115,7 +121,7 @@ pub enum SystemTable {
115121
st_row_level_security,
116122
}
117123

118-
pub fn system_tables() -> [TableSchema; 14] {
124+
pub fn system_tables() -> [TableSchema; 16] {
119125
[
120126
// The order should match the `id` of the system table, that start with [ST_TABLE_IDX].
121127
st_table_schema(),
@@ -132,6 +138,8 @@ pub fn system_tables() -> [TableSchema; 14] {
132138
st_view_schema(),
133139
st_view_param_schema(),
134140
st_view_column_schema(),
141+
st_view_client_schema(),
142+
st_view_arg_schema(),
135143
]
136144
}
137145

@@ -174,6 +182,8 @@ pub(crate) const ST_CONNECTION_CREDENTIALS_IDX: usize = 10;
174182
pub(crate) const ST_VIEW_IDX: usize = 11;
175183
pub(crate) const ST_VIEW_PARAM_IDX: usize = 12;
176184
pub(crate) const ST_VIEW_COLUMN_IDX: usize = 13;
185+
pub(crate) const ST_VIEW_CLIENT_IDX: usize = 14;
186+
pub(crate) const ST_VIEW_ARG_IDX: usize = 15;
177187

178188
macro_rules! st_fields_enum {
179189
($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => {
@@ -240,6 +250,18 @@ st_fields_enum!(enum StViewColumnFields {
240250
"col_type", ColType = 3,
241251
});
242252
// WARNING: For a stable schema, don't change the field names and discriminants.
253+
st_fields_enum!(enum StViewClientFields {
254+
"view_id", ViewId = 0,
255+
"arg_id", ArgId = 1,
256+
"identity", Identity = 2,
257+
"connection_id", ConnectionId = 3,
258+
});
259+
// WARNING: For a stable schema, don't change the field names and discriminants.
260+
st_fields_enum!(enum StViewArgFields {
261+
"id", Id = 0,
262+
"bytes", Bytes = 1,
263+
});
264+
// WARNING: For a stable schema, don't change the field names and discriminants.
243265
st_fields_enum!(enum StViewParamFields {
244266
"view_id", ViewId = 0,
245267
"param_pos", ParamPos = 1,
@@ -376,6 +398,25 @@ fn system_module_def() -> ModuleDef {
376398
.with_unique_constraint(st_view_param_unique_cols)
377399
.with_index_no_accessor_name(btree(st_view_param_unique_cols));
378400

401+
let st_view_client_type = builder.add_type::<StViewClientRow>();
402+
builder
403+
.build_table(
404+
ST_VIEW_CLIENT_NAME,
405+
*st_view_client_type.as_ref().expect("should be ref"),
406+
)
407+
.with_type(TableType::System)
408+
.with_index_no_accessor_name(btree([StViewClientFields::ViewId, StViewClientFields::ArgId]))
409+
.with_index_no_accessor_name(btree([StViewClientFields::Identity, StViewClientFields::ConnectionId]));
410+
411+
let st_view_arg_type = builder.add_type::<StViewArgRow>();
412+
builder
413+
.build_table(ST_VIEW_ARG_NAME, *st_view_arg_type.as_ref().expect("should be ref"))
414+
.with_type(TableType::System)
415+
.with_auto_inc_primary_key(StViewArgFields::Id)
416+
.with_index_no_accessor_name(btree(StViewArgFields::Id))
417+
.with_unique_constraint(StViewArgFields::Bytes)
418+
.with_index_no_accessor_name(btree(StViewArgFields::Bytes));
419+
379420
let st_index_type = builder.add_type::<StIndexRow>();
380421
builder
381422
.build_table(ST_INDEX_NAME, *st_index_type.as_ref().expect("should be ref"))
@@ -475,6 +516,8 @@ fn system_module_def() -> ModuleDef {
475516
validate_system_table::<StViewFields>(&result, ST_VIEW_NAME);
476517
validate_system_table::<StViewParamFields>(&result, ST_VIEW_PARAM_NAME);
477518
validate_system_table::<StViewColumnFields>(&result, ST_VIEW_COLUMN_NAME);
519+
validate_system_table::<StViewClientFields>(&result, ST_VIEW_CLIENT_NAME);
520+
validate_system_table::<StViewArgFields>(&result, ST_VIEW_ARG_NAME);
478521

479522
result
480523
}
@@ -514,6 +557,8 @@ lazy_static::lazy_static! {
514557
m.insert("st_view_view_name_key", ConstraintId(14));
515558
m.insert("st_view_param_view_id_param_pos_key", ConstraintId(15));
516559
m.insert("st_view_column_view_id_col_pos_key", ConstraintId(16));
560+
m.insert("st_view_arg_id_key", ConstraintId(17));
561+
m.insert("st_view_arg_bytes_key", ConstraintId(18));
517562
m
518563
};
519564
}
@@ -540,6 +585,10 @@ lazy_static::lazy_static! {
540585
m.insert("st_view_view_name_idx_btree", IndexId(15));
541586
m.insert("st_view_param_view_id_param_pos_idx_btree", IndexId(16));
542587
m.insert("st_view_column_view_id_col_pos_idx_btree", IndexId(17));
588+
m.insert("st_view_client_view_id_arg_id_idx_btree", IndexId(18));
589+
m.insert("st_view_client_identity_connection_id_idx_btree", IndexId(19));
590+
m.insert("st_view_arg_id_idx_btree", IndexId(20));
591+
m.insert("st_view_arg_bytes_idx_btree", IndexId(21));
543592
m
544593
};
545594
}
@@ -555,6 +604,7 @@ lazy_static::lazy_static! {
555604
m.insert("st_scheduled_schedule_id_seq", SequenceId(4));
556605
m.insert("st_sequence_sequence_id_seq", SequenceId(5));
557606
m.insert("st_view_view_id_seq", SequenceId(6));
607+
m.insert("st_view_arg_id_seq", SequenceId(7));
558608
m
559609
};
560610
}
@@ -667,6 +717,14 @@ pub fn st_view_column_schema() -> TableSchema {
667717
st_schema(ST_VIEW_COLUMN_NAME, ST_VIEW_COLUMN_ID)
668718
}
669719

720+
pub fn st_view_client_schema() -> TableSchema {
721+
st_schema(ST_VIEW_CLIENT_NAME, ST_VIEW_CLIENT_ID)
722+
}
723+
724+
pub fn st_view_arg_schema() -> TableSchema {
725+
st_schema(ST_VIEW_ARG_NAME, ST_VIEW_ARG_ID)
726+
}
727+
670728
/// If `table_id` refers to a known system table, return its schema.
671729
///
672730
/// Used when restoring from a snapshot; system tables are reinstantiated with this schema,
@@ -689,6 +747,8 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option<TableSchema> {
689747
ST_VIEW_ID => Some(st_view_schema()),
690748
ST_VIEW_PARAM_ID => Some(st_view_param_schema()),
691749
ST_VIEW_COLUMN_ID => Some(st_view_column_schema()),
750+
ST_VIEW_CLIENT_ID => Some(st_view_client_schema()),
751+
ST_VIEW_ARG_ID => Some(st_view_arg_schema()),
692752
_ => None,
693753
}
694754
}
@@ -863,6 +923,32 @@ pub struct StViewParamRow {
863923
pub param_type: AlgebraicTypeViaBytes,
864924
}
865925

926+
/// System table [ST_VIEW_CLIENT_NAME]
927+
///
928+
/// | view_id | arg_id | identity | connection_id |
929+
/// |---------|--------|----------|---------------|
930+
/// | 1 | 2 | 0x... | 0x... |
931+
#[derive(Debug, Clone, Eq, PartialEq, SpacetimeType)]
932+
#[sats(crate = spacetimedb_lib)]
933+
pub struct StViewClientRow {
934+
pub view_id: ViewId,
935+
pub arg_id: u64,
936+
pub identity: IdentityViaU256,
937+
pub connection_id: ConnectionIdViaU128,
938+
}
939+
940+
/// System table [ST_VIEW_ARG_NAME]
941+
///
942+
/// | id | bytes |
943+
/// |----|---------|
944+
/// | 1 | <bytes> |
945+
#[derive(Debug, Clone, Eq, PartialEq, SpacetimeType)]
946+
#[sats(crate = spacetimedb_lib)]
947+
pub struct StViewArgRow {
948+
pub id: u64,
949+
pub bytes: Box<[u8]>,
950+
}
951+
866952
/// System Table [ST_INDEX_NAME]
867953
///
868954
/// | index_id | table_id | index_name | index_algorithm |

0 commit comments

Comments
 (0)