Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,50 @@ impl RelationalDB {
}
}

/// Duration after which expired unused views are cleaned up.
/// Value is chosen arbitrarily; can be tuned later if needed.
const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10 * 60);

/// Duration to budget for each view cleanup job,
/// so that it doesn't hold the transaction lock for too long.
//TODO: Make this value configurable
const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10);

/// Starts a Tokio task that repeatedly sweeps expired views out of `db`.
///
/// Each iteration runs [`MutTxId::clear_expired_views`] inside an auto-committed
/// `Workload::Internal` transaction, using `VIEWS_EXPIRATION` as the expiry age and
/// `VIEW_CLEANUP_BUDGET` as the time budget, then sleeps for `VIEWS_EXPIRATION`.
///
/// The loop never terminates on its own; the returned [`tokio::task::AbortHandle`]
/// is the way to stop it.
pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandle {
    // One synchronous cleanup pass, including reporting of its outcome.
    fn sweep_once(db: &RelationalDB) {
        let outcome = db.with_auto_commit(Workload::Internal, |tx| {
            tx.clear_expired_views(VIEWS_EXPIRATION, VIEW_CLEANUP_BUDGET)
                .map_err(DBError::from)
        });

        match outcome {
            // Everything that had expired was cleared: nothing worth reporting.
            Ok((cleared, total_expired)) if cleared == total_expired => {}
            // Some expired views were left behind by this pass.
            Ok((cleared, total_expired)) => {
                //TODO: Report it as metric
                log::info!(
                    "[{}] DATABASE: cleared {} expired views ({} remaining)",
                    db.database_identity(),
                    cleared,
                    total_expired - cleared
                );
            }
            Err(err) => {
                log::error!(
                    "[{}] DATABASE: failed to clear expired views: {}",
                    db.database_identity(),
                    err
                );
            }
        }
    }

    let cleanup_task = tokio::spawn(async move {
        loop {
            sweep_once(&db);
            tokio::time::sleep(VIEWS_EXPIRATION).await;
        }
    });

    cleanup_task.abort_handle()
}

impl RelationalDB {
pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
Ok(self.inner.create_table_mut_tx(tx, schema)?)
Expand Down Expand Up @@ -2368,6 +2412,7 @@ mod tests {
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::rc::Rc;
use std::time::Instant;

use super::tests_utils::begin_mut_tx;
use super::*;
Expand Down Expand Up @@ -2608,6 +2653,88 @@ mod tests {
Ok(())
}

#[test]
/// Exercises view rows and their `st_view_subs` bookkeeping for two senders:
/// insertion, a database reopen, re-insertion, and an explicit expiry sweep.
fn test_views() -> ResultTest<()> {
let stdb = TestDB::durable()?;

// `setup_view` is a helper defined elsewhere in this test module.
let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();

let sender1 = Identity::ONE;

// Sender 1 insert
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?;

assert_eq!(
project_views(&stdb, table_id, sender1)[0],
product![42u8],
"View row not inserted correctly"
);

// Sender 2 insert
let sender2 = Identity::ZERO;
// Captured just before sender 2's first insert; used further down to derive
// the expiration duration passed to `clear_expired_views`.
let before_sender2 = Instant::now();
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?;

assert_eq!(
project_views(&stdb, table_id, sender2)[0],
product![84u8],
"Sender 2 view row not inserted correctly"
);

// Restart database (view rows should NOT persist)
let stdb = stdb.reopen()?;

assert!(
project_views(&stdb, table_id, sender1).is_empty(),
"Sender 1 rows should NOT persist after reopen"
);
assert!(
project_views(&stdb, table_id, sender2).is_empty(),
"Sender 2 rows should NOT persist after reopen"
);

let tx = begin_mut_tx(&stdb);
let st = tx.lookup_st_view_subs(view_id)?;
assert!(st.is_empty(), "st_view_subs should also be cleared after restart");
stdb.commit_tx(tx)?;

// Reinsert after restart
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?;
assert_eq!(
project_views(&stdb, table_id, sender2)[0],
product![91u8],
"Post-restart inserted rows must appear"
);

// Clean expired rows
// The expiration duration is the time elapsed since `before_sender2`, so the
// sweep's cutoff lands roughly at that instant. Presumably the post-restart
// insert for sender 2 is newer than the cutoff and therefore survives; the
// assertions below check exactly that.
let mut tx = begin_mut_tx(&stdb);
tx.clear_expired_views(
Instant::now().saturating_duration_since(before_sender2),
VIEW_CLEANUP_BUDGET,
)?;
stdb.commit_tx(tx)?;

// Only sender2 exists after reinsertion
assert!(
project_views(&stdb, table_id, sender1).is_empty(),
"Sender 1 should remain empty"
);
assert_eq!(
project_views(&stdb, table_id, sender2)[0],
product![91u8],
"Sender 2 row should remain"
);

// And st_view_subs must reflect only sender2
let tx = begin_mut_tx(&stdb);
let st_after = tx.lookup_st_view_subs(view_id)?;
assert_eq!(st_after.len(), 1);
assert_eq!(st_after[0].identity.0, sender2);

Ok(())
}
#[test]
fn test_table_name() -> ResultTest<()> {
let stdb = TestDB::durable()?;
Expand Down
14 changes: 8 additions & 6 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{Scheduler, UpdateDatabaseResult};
use crate::client::{ClientActorId, ClientName};
use crate::database_logger::DatabaseLogger;
use crate::db::persistence::PersistenceProvider;
use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata};
use crate::db::{self, spawn_tx_metrics_recorder};
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
use crate::host::module_host::ModuleRuntime as _;
Expand Down Expand Up @@ -746,6 +746,9 @@ struct Host {
/// Handle to the task responsible for recording metrics for each transaction.
/// The task is aborted when [`Host`] is dropped.
tx_metrics_recorder_task: AbortHandle,
/// Handle to the task responsible for cleaning up old views.
/// The task is aborted when [`Host`] is dropped.
view_cleanup_task: AbortHandle,
}

impl Host {
Expand Down Expand Up @@ -870,19 +873,17 @@ impl Host {

scheduler_starter.start(&module_host)?;
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone());

let module = watch::Sender::new(module_host);
//TODO(shub): Below code interfere with `exit_module` code,
// I suspect channel internally holds a reference to the module,
// even after we drop the sender.
//
// replica_ctx.subscriptions.init(module.subscribe());

Ok(Host {
module,
replica_ctx,
scheduler,
disk_metrics_recorder_task,
tx_metrics_recorder_task,
view_cleanup_task,
})
}

Expand Down Expand Up @@ -1059,6 +1060,7 @@ impl Drop for Host {
/// Aborts every background task whose abort handle is stored on this [`Host`],
/// so none of them keeps running after the host is dropped.
fn drop(&mut self) {
self.disk_metrics_recorder_task.abort();
self.tx_metrics_recorder_task.abort();
// Stops the periodic view cleanup task.
self.view_cleanup_task.abort();
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ impl CommittedState {
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
}

pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId) {
self.read_sets.remove_view(view_id)
/// Removes read-set entries for `view_id` by forwarding to `read_sets.remove_view`.
///
/// `sender` is passed through unchanged: `None` targets every entry of the view,
/// while `Some(identity)` narrows the removal to that sender's entries.
pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option<Identity>) {
self.read_sets.remove_view(view_id, sender)
}

pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData {
Expand Down
107 changes: 101 additions & 6 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ impl ViewReadSets {
}

/// Removes keys for `view_id` from the read set
pub fn remove_view(&mut self, view_id: ViewId) {
pub fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
self.tables.retain(|_, readset| {
readset.remove_view(view_id);
readset.remove_view(view_id, sender);
!readset.is_empty()
});
}
Expand Down Expand Up @@ -144,9 +144,14 @@ impl TableReadSet {
self.table_scans.is_empty()
}

/// Removes keys for `view_id` from the read set
fn remove_view(&mut self, view_id: ViewId) {
self.table_scans.retain(|info| info.view_id != view_id);
/// Drops table-scan entries belonging to `view_id` from this read set.
///
/// With `sender == None`, every entry for `view_id` is removed.
/// With `sender == Some(identity)`, only entries for `view_id` whose recorded
/// sender equals `identity` are removed; all other entries are kept.
fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
    self.table_scans.retain(|scan| {
        // Entries of other views are never touched.
        if scan.view_id != view_id {
            return true;
        }
        match &sender {
            // No sender filter: every entry of this view goes away.
            None => false,
            // Sender filter: keep entries recorded for a different sender (or none).
            Some(identity) => scan.sender.as_ref() != Some(identity),
        }
    });
}

/// Merge or union two read sets for this table
Expand Down Expand Up @@ -221,7 +226,13 @@ impl MutTxId {
/// Removes keys for `view_id` from the committed read set.
/// Used for dropping views in an auto-migration.
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {
self.committed_state_write_lock.drop_view_from_read_sets(view_id)
self.committed_state_write_lock.drop_view_from_read_sets(view_id, None)
}

/// Removes a specific view call from the committed read set.
///
/// Forwards to `drop_view_from_read_sets` on the committed state with `Some(sender)`,
/// so only entries for `view_id` recorded for `sender` are targeted.
pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) {
self.committed_state_write_lock
.drop_view_from_read_sets(view_id, Some(sender))
}
}

Expand Down Expand Up @@ -1961,6 +1972,90 @@ impl MutTxId {
Ok(())
}

/// Clean up views that have no subscribers and haven’t been called recently.
///
/// This function will scan for subscription entries in `st_view_sub` where:
/// - `has_subscribers == false`, `num_subscribers == 0`.
/// - `last_called` is older than `expiration_duration`.
///
/// For each such expired view:
/// 1. It clears the backing table,
/// 2. Removes the view from the committed read set, and
/// 3. Deletes the subscription row.
///
/// The cleanup is bounded by a total `max_duration`. The function stops when either:
/// - all expired views have been processed, or
/// - the `max_duration` budget is reached.
///
/// Returns a tuple `(cleaned, total_expired)`:
/// - `cleaned`: Number of views actually cleaned (deleted) in this run.
/// - `total_expired`: Total number of expired views found (even if not all were cleaned due to time budget).
pub fn clear_expired_views(
&mut self,
expiration_duration: Duration,
max_duration: Duration,
) -> Result<(usize, usize)> {
let start = std::time::Instant::now();
let now = Timestamp::now();
let expiration_threshold = now - expiration_duration;
let mut cleaned_count = 0;

// Collect all expired views from st_view_sub
let expired_items: Vec<(ViewId, Identity, RowPointer)> = self
.iter_by_col_eq(
ST_VIEW_SUB_ID,
StViewSubFields::HasSubscribers,
&AlgebraicValue::from(false),
)?
.filter_map(|row_ref| {
let row = StViewSubRow::try_from(row_ref).expect("Failed to deserialize st_view_sub row");

if !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold {
Some((row.view_id, row.identity.into(), row_ref.pointer()))
} else {
None
}
})
.collect();

let total_expired = expired_items.len();

// For each expired view subscription, clear the backing table and delete the subscription
for (view_id, sender, sub_row_ptr) in expired_items {
// Check if we've exceeded our time budget
if start.elapsed() >= max_duration {
break;
}

let StViewRow {
table_id, is_anonymous, ..
} = self.lookup_st_view(view_id)?;
let table_id = table_id.expect("views have backing table");

if is_anonymous {
self.clear_table(table_id)?;
self.drop_view_from_committed_read_set(view_id);
} else {
let rows_to_delete = self
.iter_by_col_eq(table_id, 0, &sender.into())?
.map(|res| res.pointer())
.collect::<Vec<_>>();

for row_ptr in rows_to_delete {
self.delete(table_id, row_ptr)?;
}

self.drop_view_with_sender_from_committed_read_set(view_id, sender);
}

// Finally, delete the subscription row
self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?;
cleaned_count += 1;
}

Ok((cleaned_count, total_expired))
}

/// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view.
pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> {
use StViewSubFields::*;
Expand Down
Loading