Skip to content

Commit 1b87bac

Browse files
Shubham8287Centril
andauthored
Views: ephemeral tables (#3670)
# Description of Changes Make View backing tables and related St tables not persistent. 1. Modifies `CommittedState` to hold set of ephemeral tables. 2. Update `TxData` to contain a subset of ephemeral tables which has been modified in current transaction. `do_durability` filter those table out before writting the transaction to commitlog. depends on: #3651 # API and ABI breaking changes NA # Expected complexity level and risk 2.5. looks simple but changes comes in the hotpath, I ensured we don't do unneccessary heap allocations but patch has the potential to regress perfomance. # Testing - unit test. --------- Signed-off-by: Shubham Mishra <[email protected]> Co-authored-by: Mazdak Farrokhzad <[email protected]>
1 parent f1b6328 commit 1b87bac

File tree

6 files changed

+198
-5
lines changed

6 files changed

+198
-5
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -821,9 +821,18 @@ impl RelationalDB {
821821
Txdata,
822822
};
823823

824+
let is_not_ephemeral_table = |table_id: &TableId| -> bool {
825+
tx_data
826+
.ephemeral_tables()
827+
.map(|etables| !etables.contains(table_id))
828+
.unwrap_or(true)
829+
};
830+
824831
if tx_data.tx_offset().is_some() {
825832
let inserts: Box<_> = tx_data
826833
.inserts()
834+
// Skip ephemeral tables
835+
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
827836
.map(|(table_id, rowdata)| Ops {
828837
table_id: *table_id,
829838
rowdata: rowdata.clone(),
@@ -834,6 +843,7 @@ impl RelationalDB {
834843

835844
let deletes: Box<_> = tx_data
836845
.deletes()
846+
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
837847
.map(|(table_id, rowdata)| Ops {
838848
table_id: *table_id,
839849
rowdata: rowdata.clone(),
@@ -842,6 +852,8 @@ impl RelationalDB {
842852
.filter(|ops| !truncates.contains(&ops.table_id))
843853
.collect();
844854

855+
let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();
856+
845857
let inputs = reducer_context.map(|rcx| rcx.into());
846858

847859
let txdata = Txdata {
@@ -850,7 +862,7 @@ impl RelationalDB {
850862
mutations: Some(Mutations {
851863
inserts,
852864
deletes,
853-
truncates: truncates.into_iter().collect(),
865+
truncates,
854866
}),
855867
};
856868

@@ -2408,6 +2420,27 @@ mod tests {
24082420
TableSchema::from_module_def(&def, table, (), TableId::SENTINEL)
24092421
}
24102422

2423+
fn view_module_def() -> ModuleDef {
2424+
let mut builder = RawModuleDefV9Builder::new();
2425+
2426+
let return_type_ref = builder.add_algebraic_type(
2427+
[],
2428+
"my_view_return_type",
2429+
AlgebraicType::product([("b", AlgebraicType::U8)]),
2430+
true,
2431+
);
2432+
builder.add_view(
2433+
"my_view",
2434+
0,
2435+
true,
2436+
false,
2437+
ProductType::unit(),
2438+
AlgebraicType::array(AlgebraicType::Ref(return_type_ref)),
2439+
);
2440+
let raw = builder.finish();
2441+
raw.try_into().expect("table validation failed")
2442+
}
2443+
24112444
fn table_auto_inc() -> TableSchema {
24122445
table(
24132446
"MyTable",
@@ -2492,6 +2525,89 @@ mod tests {
24922525
Ok(())
24932526
}
24942527

2528+
fn setup_view(stdb: &TestDB) -> ResultTest<(ViewId, TableId, ModuleDef, ViewDef)> {
2529+
let module_def = view_module_def();
2530+
let view_def = module_def.view("my_view").unwrap();
2531+
2532+
let mut tx = begin_mut_tx(stdb);
2533+
let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
2534+
stdb.commit_tx(tx)?;
2535+
2536+
Ok((view_id, table_id, module_def.clone(), view_def.clone()))
2537+
}
2538+
2539+
fn insert_view_row(
2540+
stdb: &TestDB,
2541+
view_id: ViewId,
2542+
table_id: TableId,
2543+
typespace: &Typespace,
2544+
row_type: AlgebraicTypeRef,
2545+
sender: Identity,
2546+
v: u8,
2547+
) -> ResultTest<()> {
2548+
let to_bsatn = |pv: &ProductValue| {
2549+
Bytes::from(bsatn::to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed"))
2550+
};
2551+
2552+
let row_pv = |v: u8| product![v];
2553+
2554+
let mut tx = begin_mut_tx(stdb);
2555+
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
2556+
stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bsatn(&row_pv(v)), typespace)?;
2557+
stdb.commit_tx(tx)?;
2558+
2559+
Ok(())
2560+
}
2561+
2562+
fn project_views(stdb: &TestDB, table_id: TableId, sender: Identity) -> Vec<ProductValue> {
2563+
let tx = begin_tx(stdb);
2564+
2565+
stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into())
2566+
.unwrap()
2567+
.map(|row| {
2568+
let pv = row.to_product_value();
2569+
ProductValue {
2570+
elements: pv.elements.iter().skip(1).cloned().collect(),
2571+
}
2572+
})
2573+
.collect()
2574+
}
2575+
2576+
#[test]
2577+
fn test_view_tables_are_ephemeral() -> ResultTest<()> {
2578+
let stdb = TestDB::durable()?;
2579+
2580+
let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
2581+
let row_type = view_def.product_type_ref;
2582+
let typespace = module_def.typespace();
2583+
2584+
// Write some rows (reusing the same helper)
2585+
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ONE, 10)?;
2586+
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ZERO, 20)?;
2587+
2588+
assert!(
2589+
!project_views(&stdb, table_id, Identity::ZERO).is_empty(),
2590+
"View table should NOT be empty after insert"
2591+
);
2592+
2593+
// Reopen the database — view tables must not persist
2594+
let stdb = stdb.reopen()?;
2595+
2596+
// Validate that the view's backing table has been removed
2597+
assert!(
2598+
project_views(&stdb, table_id, Identity::ZERO).is_empty(),
2599+
"View table should be empty after reopening the database"
2600+
);
2601+
2602+
let tx = begin_mut_tx(&stdb);
2603+
let subs_rows = tx.lookup_st_view_subs(view_id)?;
2604+
assert!(
2605+
subs_rows.is_empty(),
2606+
"st_view_subs should be empty after reopening the database"
2607+
);
2608+
Ok(())
2609+
}
2610+
24952611
#[test]
24962612
fn test_table_name() -> ResultTest<()> {
24972613
let stdb = TestDB::durable()?;

crates/datastore/src/error.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::system_tables::SystemTable;
22
use enum_as_inner::EnumAsInner;
33
use spacetimedb_lib::db::raw_def::{v9::RawSql, RawIndexDefV8};
4-
use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId};
4+
use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId, ViewId};
55
use spacetimedb_sats::buffer::DecodeError;
66
use spacetimedb_sats::{product_value::InvalidFieldError, satn::Satn};
77
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
@@ -41,6 +41,8 @@ pub enum DatastoreError {
4141
pub enum ViewError {
4242
#[error("view '{0}' not found")]
4343
NotFound(Box<str>),
44+
#[error("Table backing View '{0}' not found")]
45+
TableNotFound(ViewId),
4446
#[error("failed to deserialize view arguments from row")]
4547
DeserializeArgs,
4648
#[error("failed to deserialize view return value: {0}")]

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@ use super::{
88
};
99
use crate::{
1010
db_metrics::DB_METRICS,
11-
error::{DatastoreError, IndexError, TableError},
11+
error::{DatastoreError, IndexError, TableError, ViewError},
1212
execution_context::ExecutionContext,
1313
locking_tx_datastore::{mut_tx::ViewReadSets, state_view::iter_st_column_for_table},
1414
system_tables::{
1515
system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, StTableFields,
16-
StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME,
16+
StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME,
1717
ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME,
1818
ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID,
1919
ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID,
2020
ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
2121
},
22-
traits::TxData,
22+
traits::{EphemeralTables, TxData},
2323
};
2424
use crate::{
2525
locking_tx_datastore::ViewCallInfo,
@@ -80,6 +80,12 @@ pub struct CommittedState {
8080
/// Any overlap will trigger a re-evaluation of the affected view,
8181
/// and its read set will be updated accordingly.
8282
read_sets: ViewReadSets,
83+
84+
/// Tables which do not need to be made persistent.
85+
/// These include:
86+
/// - system tables: `st_view_sub`, `st_view_arg`
87+
/// - Tables which back views.
88+
pub(super) ephemeral_tables: EphemeralTables,
8389
}
8490

8591
impl CommittedState {
@@ -99,6 +105,7 @@ impl MemoryUsage for CommittedState {
99105
page_pool: _,
100106
table_dropped,
101107
read_sets,
108+
ephemeral_tables,
102109
} = self;
103110
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
104111
next_tx_offset.heap_usage()
@@ -107,6 +114,7 @@ impl MemoryUsage for CommittedState {
107114
+ index_id_map.heap_usage()
108115
+ table_dropped.heap_usage()
109116
+ read_sets.heap_usage()
117+
+ ephemeral_tables.heap_usage()
110118
}
111119
}
112120

@@ -171,6 +179,7 @@ impl CommittedState {
171179
table_dropped: <_>::default(),
172180
read_sets: <_>::default(),
173181
page_pool,
182+
ephemeral_tables: <_>::default(),
174183
}
175184
}
176185

@@ -518,6 +527,32 @@ impl CommittedState {
518527
Ok(())
519528
}
520529

530+
pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
531+
self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect();
532+
Ok(())
533+
}
534+
535+
fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
536+
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];
537+
538+
let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
539+
return Ok(tables);
540+
};
541+
let backing_tables = st_view
542+
.scan_rows(&self.blob_store)
543+
.map(|row_ref| {
544+
let view_row = StViewRow::try_from(row_ref)?;
545+
view_row
546+
.table_id
547+
.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id)))
548+
})
549+
.collect::<Result<Vec<_>>>()?;
550+
551+
tables.extend(backing_tables);
552+
553+
Ok(tables)
554+
}
555+
521556
/// After replaying all old transactions,
522557
/// inserts and deletes into the system tables
523558
/// might not be reflected in the schemas of the built tables.
@@ -675,6 +710,8 @@ impl CommittedState {
675710
self.next_tx_offset += 1;
676711
}
677712

713+
tx_data.set_ephemeral_tables(&self.ephemeral_tables);
714+
678715
tx_data
679716
}
680717

@@ -847,17 +884,25 @@ impl CommittedState {
847884
}
848885
// A table was removed. Add it back.
849886
TableRemoved(table_id, table) => {
887+
let is_view_table = table.schema.is_view();
850888
// We don't need to deal with sub-components.
851889
// That is, we don't need to add back indices and such.
852890
// Instead, there will be separate pending schema changes like `IndexRemoved`.
853891
self.tables.insert(table_id, table);
892+
893+
// Incase, the table was ephemeral, add it back to that set as well.
894+
if is_view_table {
895+
self.ephemeral_tables.insert(table_id);
896+
}
854897
}
855898
// A table was added. Remove it.
856899
TableAdded(table_id) => {
857900
// We don't need to deal with sub-components.
858901
// That is, we don't need to remove indices and such.
859902
// Instead, there will be separate pending schema changes like `IndexAdded`.
860903
self.tables.remove(&table_id);
904+
// Incase, the table was ephemeral, remove it from that set as well.
905+
self.ephemeral_tables.remove(&table_id);
861906
}
862907
// A table's access was changed. Change back to the old one.
863908
TableAlterAccess(table_id, access) => {

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ impl Locking {
146146
committed_state.build_indexes()?;
147147
// Figure out where to pick up for each sequence.
148148
*self.sequence_state.lock() = committed_state.build_sequence_state()?;
149+
150+
committed_state.collect_ephemeral_tables()?;
149151
Ok(())
150152
}
151153

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@ impl MutTxId {
384384
self.insert_into_st_view_param(view_id, param_columns)?;
385385
self.insert_into_st_view_column(view_id, return_columns)?;
386386

387+
self.committed_state_write_lock.ephemeral_tables.insert(table_id);
388+
387389
Ok((view_id, table_id))
388390
}
389391

crates/datastore/src/traits.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ pub enum IsolationLevel {
166166
Serializable,
167167
}
168168

169+
pub type EphemeralTables = IntSet<TableId>;
170+
169171
/// A record of all the operations within a transaction.
170172
///
171173
/// Some extra information is embedded here
@@ -191,6 +193,11 @@ pub struct TxData {
191193
/// `None` implies that `inserts` and `deletes` are both empty,
192194
/// but `Some` does not necessarily imply that either is non-empty.
193195
tx_offset: Option<u64>,
196+
197+
/// Set of ephemeral tables modified in this transaction (only populated when a view is executed).
198+
/// These tables do not need to be persisted to disk.
199+
/// Every table listed here must appear in either `inserts` or `deletes`.
200+
ephemeral_tables: Option<EphemeralTables>,
194201
}
195202

196203
impl TxData {
@@ -226,6 +233,25 @@ impl TxData {
226233
self.truncates.extend(truncated_tables);
227234
}
228235

236+
/// Determines which ephemeral tables were modified in this transaction.
237+
///
238+
/// Iterates over the tables updated in this transaction and records those that
239+
/// also appear in `all_ephemeral_tables`.
240+
/// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified.
241+
pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) {
242+
for tid in self.tables.keys() {
243+
if all_ephemeral_tables.contains(tid) {
244+
self.ephemeral_tables
245+
.get_or_insert_with(EphemeralTables::default)
246+
.insert(*tid);
247+
}
248+
}
249+
}
250+
251+
pub fn ephemeral_tables(&self) -> Option<&EphemeralTables> {
252+
self.ephemeral_tables.as_ref()
253+
}
254+
229255
/// Obtain an iterator over the inserted rows per table.
230256
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
231257
self.inserts.iter()

0 commit comments

Comments
 (0)