diff --git a/crates/daemon/src/connections_tb.rs b/crates/daemon/src/connections_tb.rs index 2f301923..ff742bd3 100644 --- a/crates/daemon/src/connections_tb.rs +++ b/crates/daemon/src/connections_tb.rs @@ -12,7 +12,7 @@ // this program. If not, see . // -//! An implementation of the connections db that uses tuplebox. +//! An implementation of the connections db that uses rdb. use std::collections::HashSet; use std::path::PathBuf; @@ -25,7 +25,7 @@ use strum::{Display, EnumCount, EnumIter, IntoEnumIterator}; use tracing::{debug, error, warn}; use uuid::Uuid; -use moor_db::tuplebox::{RelationId, RelationInfo, Transaction, TupleBox}; +use moor_db::rdb::{RelBox, RelationId, RelationInfo, Transaction}; use moor_kernel::tasks::sessions::SessionError; use moor_values::util::slice_ref::SliceRef; use moor_values::var::objid::Objid; @@ -36,7 +36,7 @@ use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION}; const CONNECTIONS_DB_MEM_SIZE: usize = 1 << 26; pub struct ConnectionsTb { - tb: Arc, + tb: Arc, } impl ConnectionsTb { @@ -53,7 +53,7 @@ impl ConnectionsTb { .collect(); relations[ConnectionRelation::ClientConnection as usize].secondary_indexed = true; - let tb = TupleBox::new(CONNECTIONS_DB_MEM_SIZE, path, &relations, 1).await; + let tb = RelBox::new(CONNECTIONS_DB_MEM_SIZE, path, &relations, 1).await; Self { tb } } } diff --git a/crates/db/benches/tb_single_thread.rs b/crates/db/benches/tb_single_thread.rs index 303afa0a..b02086c0 100644 --- a/crates/db/benches/tb_single_thread.rs +++ b/crates/db/benches/tb_single_thread.rs @@ -16,19 +16,19 @@ //! Does not measure single-item reads, deletes, or updates, or concurrent access. use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use moor_db::rdb::{RelBox, RelationInfo}; use moor_db::testing::jepsen::{History, Type, Value}; -use moor_db::tuplebox::{RelationInfo, TupleBox}; use moor_values::util::slice_ref::SliceRef; use std::sync::Arc; use std::time::{Duration, Instant}; // This is a struct that tells Criterion.rs to use the "futures" crate's current-thread executor -use moor_db::tuplebox::RelationId; +use moor_db::rdb::RelationId; use moor_values::util::{BitArray, Bitset64}; use tokio::runtime::Runtime; /// Build a test database with a bunch of relations -async fn test_db() -> Arc { +async fn test_db() -> Arc { // Generate 10 test relations that we'll use for testing. let relations = (0..63) .map(|i| RelationInfo { @@ -39,7 +39,7 @@ async fn test_db() -> Arc { }) .collect::>(); - TupleBox::new(1 << 24, None, &relations, 0).await + RelBox::new(1 << 24, None, &relations, 0).await } fn from_val(value: i64) -> SliceRef { @@ -68,7 +68,7 @@ async fn list_append_scan_workload(iters: u64, events: &Vec) -> Duratio let start = Instant::now(); - black_box(for e in events { + for e in events { match e.r#type { Type::invoke => { // Start a transaction. @@ -114,7 +114,8 @@ async fn list_append_scan_workload(iters: u64, events: &Vec) -> Duratio tx.rollback().await.unwrap(); } } - }); + } + black_box(()); cumulative += start.elapsed(); } cumulative @@ -131,7 +132,7 @@ async fn list_append_seek_workload(iters: u64, events: &Vec) -> Duratio let mut processes: BitArray<_, 256, Bitset64<8>> = BitArray::new(); let start = Instant::now(); - black_box(for e in events { + for e in events { match e.r#type { Type::invoke => { // Start a transaction. @@ -181,7 +182,8 @@ async fn list_append_seek_workload(iters: u64, events: &Vec) -> Duratio tx.rollback().await.unwrap(); } } - }); + } + black_box(()); cumulative += start.elapsed(); } cumulative diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 54c0e016..8e099c63 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -18,16 +18,15 @@ use moor_values::model::world_state::WorldStateSource; use moor_values::model::WorldStateError; use crate::loader::LoaderInterface; -use crate::tb_worldstate::TupleBoxWorldStateSource; +use crate::odb::RelBoxWorldState; mod db_loader_client; pub mod db_tx; mod db_worldstate; pub mod loader; -pub mod object_relations; -pub mod tb_worldstate; -pub mod tuplebox; +pub mod rdb; +pub mod odb; #[doc(hidden)] pub mod testing; @@ -63,8 +62,7 @@ impl DatabaseBuilder { /// database was newly created, and false if it was already present. pub async fn open_db(&self) -> Result<(Box, bool), String> { let (db, fresh) = - TupleBoxWorldStateSource::open(self.path.clone(), self.memory_size.unwrap_or(1 << 40)) - .await; + RelBoxWorldState::open(self.path.clone(), self.memory_size.unwrap_or(1 << 40)).await; Ok((Box::new(db), fresh)) } } diff --git a/crates/db/src/odb/mod.rs b/crates/db/src/odb/mod.rs new file mode 100644 index 00000000..5bf4c460 --- /dev/null +++ b/crates/db/src/odb/mod.rs @@ -0,0 +1,19 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +mod object_relations; +mod rb_worldstate; + +pub use object_relations::{WorldStateRelation, WorldStateSequences}; +pub use rb_worldstate::{RelBoxTransaction, RelBoxWorldState}; diff --git a/crates/db/src/object_relations.rs b/crates/db/src/odb/object_relations.rs similarity index 96% rename from crates/db/src/object_relations.rs rename to crates/db/src/odb/object_relations.rs index a2159bba..b1a686f9 100644 --- a/crates/db/src/object_relations.rs +++ b/crates/db/src/odb/object_relations.rs @@ -21,8 +21,8 @@ use moor_values::util::slice_ref::SliceRef; use moor_values::var::objid::Objid; use moor_values::AsByteBuffer; -use crate::tuplebox::TupleError; -use crate::tuplebox::{RelationId, Transaction}; +use crate::rdb::TupleError; +use crate::rdb::{RelationId, Transaction}; /// The set of binary relations that are used to represent the world state in the moor system. #[repr(usize)] @@ -204,7 +204,7 @@ async fn delete_if_exists( } } -pub async fn delete_composite_if_exists( +pub async fn delete_composite_if_exists( tx: &Transaction, rel: WorldStateRelation, oid: Objid, @@ -243,14 +243,14 @@ mod tests { use moor_values::model::objset::ObjSet; use moor_values::var::objid::Objid; - use crate::object_relations::WorldStateRelation::ObjectParent; - use crate::object_relations::{ + use crate::odb::object_relations::WorldStateRelation::ObjectParent; + use crate::odb::object_relations::{ get_object_by_codomain, get_object_value, insert_object_value, upsert_object_value, WorldStateRelation, WorldStateSequences, }; - use crate::tuplebox::{RelationInfo, TupleBox}; + use crate::rdb::{RelBox, RelationInfo}; - async fn test_db() -> Arc { + async fn test_db() -> Arc { let mut relations: Vec = WorldStateRelation::iter() .map(|wsr| { RelationInfo { @@ -264,7 +264,7 @@ mod tests { relations[ObjectParent as usize].secondary_indexed = true; relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - TupleBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await + RelBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await } /// Test simple relations mapping oid->oid (with secondary index), independent of all other diff --git a/crates/db/src/tb_worldstate.rs b/crates/db/src/odb/rb_worldstate.rs similarity index 97% rename from crates/db/src/tb_worldstate.rs rename to crates/db/src/odb/rb_worldstate.rs index 4b044515..ef66e013 100644 --- a/crates/db/src/tb_worldstate.rs +++ b/crates/db/src/odb/rb_worldstate.rs @@ -21,7 +21,8 @@ use strum::{EnumCount, IntoEnumIterator}; use tracing::warn; use uuid::Uuid; -use crate::tuplebox::TupleError; +use crate::rdb::TupleError; +use crate::Database; use moor_values::model::defset::{HasUuid, Named}; use moor_values::model::objects::{ObjAttrs, ObjFlag}; use moor_values::model::objset::ObjSet; @@ -40,19 +41,19 @@ use moor_values::{AsByteBuffer, NOTHING, SYSTEM_OBJECT}; use crate::db_tx::DbTransaction; use crate::db_worldstate::DbTxWorldState; use crate::loader::LoaderInterface; -use crate::object_relations::{ +use crate::odb::object_relations; +use crate::odb::object_relations::{ get_all_object_keys_matching, WorldStateRelation, WorldStateSequences, }; -use crate::tuplebox::{CommitError, Transaction}; -use crate::tuplebox::{RelationInfo, TupleBox}; -use crate::{object_relations, Database}; +use crate::rdb::{CommitError, Transaction}; +use crate::rdb::{RelBox, RelationInfo}; -/// An implementation of `WorldState` / `WorldStateSource` that uses the TupleBox as its backing -pub struct TupleBoxWorldStateSource { - db: Arc, +/// An implementation of `WorldState` / `WorldStateSource` that uses the rdb as its backing +pub struct RelBoxWorldState { + db: Arc, } -impl TupleBoxWorldStateSource { +impl RelBoxWorldState { pub async fn open(path: Option, memory_size: usize) -> (Self, bool) { let mut relations: Vec = WorldStateRelation::iter() .map(|wsr| { @@ -69,7 +70,7 @@ impl TupleBoxWorldStateSource { relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true; // Same with "contents". relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - let db = TupleBox::new(memory_size, path, &relations, WorldStateSequences::COUNT).await; + let db = RelBox::new(memory_size, path, &relations, WorldStateSequences::COUNT).await; // Check the db for sys (#0) object to see if this is a fresh DB or not. let fresh_db = { @@ -83,19 +84,19 @@ impl TupleBoxWorldStateSource { } #[async_trait] -impl WorldStateSource for TupleBoxWorldStateSource { +impl WorldStateSource for RelBoxWorldState { async fn new_world_state(&self) -> Result, WorldStateError> { - let tx = TupleBoxTransaction::new(self.db.clone()); + let tx = RelBoxTransaction::new(self.db.clone()); return Ok(Box::new(DbTxWorldState { tx: Box::new(tx) })); } } -pub struct TupleBoxTransaction { +pub struct RelBoxTransaction { tx: Transaction, } #[async_trait] -impl DbTransaction for TupleBoxTransaction { +impl DbTransaction for RelBoxTransaction { async fn get_players(&self) -> Result { // TODO: this is going to be not-at-all performant in the long run, and we'll need a way to // cache this or index it better @@ -304,7 +305,7 @@ impl DbTransaction for TupleBoxTransaction { for p in old_props.iter() { if old_ancestors.contains(&p.definer()) { delort_props.push(p.uuid()); - object_relations::delete_composite_if_exists::( + object_relations::delete_composite_if_exists( &self.tx, WorldStateRelation::ObjectPropertyValue, o, @@ -343,7 +344,7 @@ impl DbTransaction for TupleBoxTransaction { for p in old_props.iter() { if old_ancestors.contains(&p.definer()) { inherited_props.push(p.uuid()); - object_relations::delete_composite_if_exists::( + object_relations::delete_composite_if_exists( &self.tx, WorldStateRelation::ObjectPropertyValue, c, @@ -1003,8 +1004,8 @@ impl DbTransaction for TupleBoxTransaction { } } -impl TupleBoxTransaction { - pub fn new(db: Arc) -> Self { +impl RelBoxTransaction { + pub fn new(db: Arc) -> Self { let tx = db.start_tx(); Self { tx } } @@ -1082,9 +1083,9 @@ impl TupleBoxTransaction { } } -impl Database for TupleBoxWorldStateSource { +impl Database for RelBoxWorldState { fn loader_client(&mut self) -> Result, WorldStateError> { - let tx = TupleBoxTransaction::new(self.db.clone()); + let tx = RelBoxTransaction::new(self.db.clone()); Ok(Box::new(DbTxWorldState { tx: Box::new(tx) })) } @@ -1111,11 +1112,11 @@ mod tests { use moor_values::NOTHING; use crate::db_tx::DbTransaction; - use crate::object_relations::{WorldStateRelation, WorldStateSequences}; - use crate::tb_worldstate::TupleBoxTransaction; - use crate::tuplebox::{RelationInfo, TupleBox}; + use crate::odb::object_relations::{WorldStateRelation, WorldStateSequences}; + use crate::odb::rb_worldstate::RelBoxTransaction; + use crate::rdb::{RelBox, RelationInfo}; - async fn test_db() -> Arc { + async fn test_db() -> Arc { let mut relations: Vec = WorldStateRelation::iter() .map(|wsr| { RelationInfo { @@ -1129,13 +1130,13 @@ mod tests { relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true; relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - TupleBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await + RelBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await } #[tokio::test] async fn test_create_object() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let oid = tx .create_object( None, @@ -1158,7 +1159,7 @@ mod tests { assert_eq!(tx.commit().await, Ok(CommitResult::Success)); // Verify existence in a new transaction. - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); assert!(tx.object_valid(oid).await.unwrap()); assert_eq!(tx.get_object_owner(oid).await.unwrap(), NOTHING); } @@ -1166,7 +1167,7 @@ mod tests { #[tokio::test] async fn test_create_object_fixed_id() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); // Force at 1. let oid = tx .create_object(Some(Objid(1)), ObjAttrs::default()) @@ -1182,7 +1183,7 @@ mod tests { #[tokio::test] async fn test_parent_children() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); // Single parent/child relationship. let a = tx @@ -1281,7 +1282,7 @@ mod tests { #[tokio::test] async fn test_descendants() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let a = tx .create_object( @@ -1373,7 +1374,7 @@ mod tests { #[tokio::test] async fn test_location_contents() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let a = tx .create_object( @@ -1483,7 +1484,7 @@ mod tests { #[tokio::test] async fn test_object_move_commits() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let a = tx .create_object( @@ -1541,7 +1542,7 @@ mod tests { assert_eq!(tx.commit().await, Ok(CommitResult::Success)); - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); assert_eq!(tx.get_object_location(b).await.unwrap(), a); assert_eq!(tx.get_object_location(c).await.unwrap(), a); let contents = tx @@ -1565,7 +1566,7 @@ mod tests { assert_eq!(tx.get_object_contents(c).await.unwrap(), ObjSet::from(&[b])); assert_eq!(tx.commit().await, Ok(CommitResult::Success)); - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); assert_eq!(tx.get_object_location(b).await.unwrap(), c); assert_eq!(tx.get_object_location(c).await.unwrap(), a); assert_eq!(tx.get_object_contents(a).await.unwrap(), ObjSet::from(&[c])); @@ -1576,7 +1577,7 @@ mod tests { #[tokio::test] async fn test_simple_property() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let oid = tx .create_object( @@ -1612,7 +1613,7 @@ mod tests { #[tokio::test] async fn test_verb_add_update() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let oid = tx .create_object( None, @@ -1666,7 +1667,7 @@ mod tests { // Now commit, and try to resolve again. assert_eq!(tx.commit().await, Ok(CommitResult::Success)); - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let vh = tx.resolve_verb(oid, "test2".into(), None).await.unwrap(); assert_eq!(vh.names(), vec!["test2"]); assert_eq!(tx.commit().await, Ok(CommitResult::Success)); @@ -1675,7 +1676,7 @@ mod tests { #[tokio::test] async fn test_transitive_property_resolution() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let a = tx .create_object( @@ -1748,7 +1749,7 @@ mod tests { #[tokio::test] async fn test_transitive_property_resolution_clear_property() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let a = tx .create_object( @@ -1809,7 +1810,7 @@ mod tests { #[tokio::test] async fn test_verb_resolve() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let a = tx .create_object( @@ -1884,7 +1885,7 @@ mod tests { assert_eq!(tx.commit().await, Ok(CommitResult::Success)); // Verify existence in a new transaction. - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); assert_eq!( tx.resolve_verb(a, "test".into(), None) .await @@ -1905,7 +1906,7 @@ mod tests { #[tokio::test] async fn test_verb_resolve_inherited() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let a = tx .create_object( @@ -1975,7 +1976,7 @@ mod tests { #[tokio::test] async fn test_verb_resolve_wildcard() { let db = test_db().await; - let tx = TupleBoxTransaction::new(db); + let tx = RelBoxTransaction::new(db); let a = tx .create_object( None, diff --git a/crates/db/src/tuplebox/backing.rs b/crates/db/src/rdb/backing.rs similarity index 97% rename from crates/db/src/tuplebox/backing.rs rename to crates/db/src/rdb/backing.rs index dce14294..76dfb7d0 100644 --- a/crates/db/src/tuplebox/backing.rs +++ b/crates/db/src/rdb/backing.rs @@ -18,7 +18,7 @@ use tokio::sync::mpsc::UnboundedSender; -use crate::tuplebox::tx::WorkingSet; +use crate::rdb::tx::WorkingSet; pub struct BackingStoreClient { sender: UnboundedSender, diff --git a/crates/db/src/tuplebox/base_relation.rs b/crates/db/src/rdb/base_relation.rs similarity index 99% rename from crates/db/src/tuplebox/base_relation.rs rename to crates/db/src/rdb/base_relation.rs index 75d2635f..59796667 100644 --- a/crates/db/src/tuplebox/base_relation.rs +++ b/crates/db/src/rdb/base_relation.rs @@ -16,8 +16,8 @@ use std::collections::HashSet; use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::tuples::TupleRef; -use crate::tuplebox::RelationId; +use crate::rdb::tuples::TupleRef; +use crate::rdb::RelationId; /// Represents a 'canonical' base binary relation, which is a set of tuples of domain, codomain, /// with a default (hash) index on the domain and an optional (hash) index on the codomain. diff --git a/crates/db/src/tuplebox/coldstorage.rs b/crates/db/src/rdb/coldstorage.rs similarity index 95% rename from crates/db/src/tuplebox/coldstorage.rs rename to crates/db/src/rdb/coldstorage.rs index 89b92a5d..34c87bae 100644 --- a/crates/db/src/tuplebox/coldstorage.rs +++ b/crates/db/src/rdb/coldstorage.rs @@ -29,16 +29,17 @@ use tokio::sync::mpsc::UnboundedReceiver; use tokio_eventfd::EventFd; use tracing::{debug, error, info, warn}; -use crate::tuplebox::backing::{BackingStoreClient, WriterMessage}; -use crate::tuplebox::base_relation::BaseRelation; -use crate::tuplebox::page_storage::{PageStore, PageStoreMutation}; -use crate::tuplebox::tb::RelationInfo; -use crate::tuplebox::tuples::TxTuple; -use crate::tuplebox::tuples::{PageId, SlotBox, SlotId, TupleId}; -use crate::tuplebox::tx::WorkingSet; -use crate::tuplebox::RelationId; - -/// Uses WAL + custom page store as the persistent backing store & write-ahead-log for the tuplebox. +use crate::rdb::backing::{BackingStoreClient, WriterMessage}; +use crate::rdb::base_relation::BaseRelation; +use crate::rdb::page_storage::{PageStore, PageStoreMutation}; +use crate::rdb::paging::TupleBox; +use crate::rdb::paging::{PageId, SlotId}; +use crate::rdb::relbox::RelationInfo; +use crate::rdb::tuples::{TupleId, TxTuple}; +use crate::rdb::tx::WorkingSet; +use crate::rdb::RelationId; + +/// Uses WAL + custom page store as the persistent backing store & write-ahead-log for the rdb. pub struct ColdStorage {} define_layout!(sequence_page, LittleEndian, { @@ -63,7 +64,7 @@ impl ColdStorage { _schema: &[RelationInfo], relations: &mut [BaseRelation], sequences: &mut Vec, - slot_box: Arc, + tuple_box: Arc, ) -> BackingStoreClient { let eventfd = EventFd::new(0, false).unwrap(); @@ -90,7 +91,7 @@ impl ColdStorage { let sequence_page = sequence_page::View::new(&sequence_page[..]); let num_sequences = sequence_page.num_sequences().read(); assert_eq!(num_sequences, sequences.len() as u64, - "Number of sequences in the sequence page does not match the number of sequences in the tuplebox"); + "Number of sequences in the sequence page does not match the number of sequences in the rdb"); let sequences_bytes = sequence_page.sequences().to_vec(); let sequence_size = sequence::SIZE.unwrap() as u64; for i in 0..num_sequences { @@ -107,7 +108,7 @@ impl ColdStorage { let mut restored_slots = HashMap::new(); let mut restored_bytes = 0; for (page_size, page_num, relation_id) in ids { - let tuple_ids = slot_box + let tuple_ids = tuple_box .clone() .load_page(relation_id, page_num, |buf| { ps.read_page_buf(page_num, relation_id, buf) @@ -145,7 +146,7 @@ impl ColdStorage { tokio::spawn(Self::listen_loop( writer_receive, wal, - slot_box.clone(), + tuple_box.clone(), page_storage.clone(), eventfd, )); @@ -157,7 +158,7 @@ impl ColdStorage { async fn listen_loop( mut writer_receive: UnboundedReceiver, wal: WriteAheadLog, - slot_box: Arc, + tuple_box: Arc, ps: Arc>, mut event_fd: EventFd, ) { @@ -167,7 +168,7 @@ impl ColdStorage { writer_message = writer_receive.recv() => { match writer_message { Some(WriterMessage::Commit(ts, ws, sequences)) => { - Self::perform_writes(wal.clone(), slot_box.clone(), ts, ws, sequences).await; + Self::perform_writes(wal.clone(), tuple_box.clone(), ts, ws, sequences).await; } Some(WriterMessage::Shutdown) => { // Flush the WAL @@ -195,7 +196,7 @@ impl ColdStorage { /// the changes durable. async fn perform_writes( wal: WriteAheadLog, - slot_box: Arc, + tuple_box: Arc, ts: u64, ws: WorkingSet, sequences: Vec, @@ -260,7 +261,7 @@ impl ColdStorage { for (page_id, r) in &dirty_pages { // Get the slotboxy page for this tuple. - let Ok(page) = slot_box.page_for(*page_id) else { + let Ok(page) = tuple_box.page_for(*page_id) else { // If the slot or page is already gone, ce la vie, we don't need to sync it. continue; }; diff --git a/crates/db/src/tuplebox/mod.rs b/crates/db/src/rdb/mod.rs similarity index 96% rename from crates/db/src/tuplebox/mod.rs rename to crates/db/src/rdb/mod.rs index 340e1570..9bdb3b75 100644 --- a/crates/db/src/tuplebox/mod.rs +++ b/crates/db/src/rdb/mod.rs @@ -29,11 +29,12 @@ mod coldstorage; mod page_storage; mod pool; -mod tb; +mod paging; +mod relbox; mod tuples; mod tx; -pub use tb::{RelationInfo, TupleBox}; +pub use relbox::{RelBox, RelationInfo}; pub use tuples::TupleError; pub use tx::{CommitError, Transaction}; diff --git a/crates/db/src/tuplebox/page_storage.rs b/crates/db/src/rdb/page_storage.rs similarity index 99% rename from crates/db/src/tuplebox/page_storage.rs rename to crates/db/src/rdb/page_storage.rs index 02aadd69..ea16800c 100644 --- a/crates/db/src/tuplebox/page_storage.rs +++ b/crates/db/src/rdb/page_storage.rs @@ -14,8 +14,8 @@ // TODO: there's no way this is "robust" enough to be used in production -use crate::tuplebox::tuples::PageId; -use crate::tuplebox::RelationId; +use crate::rdb::paging::PageId; +use crate::rdb::RelationId; use io_uring::squeue::Flags; use io_uring::types::Fd; use io_uring::{opcode, IoUring}; diff --git a/crates/db/src/rdb/paging/mod.rs b/crates/db/src/rdb/paging/mod.rs new file mode 100644 index 00000000..b2219eb2 --- /dev/null +++ b/crates/db/src/rdb/paging/mod.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +mod slotted_page; +mod tuple_box; +mod tuple_ptr; + +pub use slotted_page::SlotId; +pub use tuple_box::{PageId, TupleBox}; +pub use tuple_ptr::TuplePtr; + +#[derive(Debug, Clone, Error)] +pub enum TupleBoxError { + #[error("Page is full, cannot insert slot of size {0} with {1} bytes remaining")] + BoxFull(usize, usize), + #[error("Tuple not found at index {0}")] + TupleNotFound(usize), +} diff --git a/crates/db/src/tuplebox/tuples/slotted_page.rs b/crates/db/src/rdb/paging/slotted_page.rs similarity index 86% rename from crates/db/src/tuplebox/tuples/slotted_page.rs rename to crates/db/src/rdb/paging/slotted_page.rs index 867eb05e..04a39c3b 100644 --- a/crates/db/src/tuplebox/tuples/slotted_page.rs +++ b/crates/db/src/rdb/paging/slotted_page.rs @@ -35,7 +35,7 @@ use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; use atomic_wait::{wait, wake_all, wake_one}; use tracing::error; -use crate::tuplebox::tuples::slotbox::SlotBoxError; +use crate::rdb::paging::TupleBoxError; pub type SlotId = u32; @@ -45,7 +45,7 @@ pub type SlotId = u32; // identical, and we can just start using them right away in a null-state without doing any // initialization. #[repr(C, align(8))] -struct SlottedPageHeader { +struct PageHeader { // The number of bytes used in the page used_bytes: u32, // The length of our slots index in bytes. Starts at initial zero. @@ -65,7 +65,7 @@ struct SlottedPageHeader { _pin: std::marker::PhantomPinned, } -impl SlottedPageHeader { +impl PageHeader { /// Explicit unlock. Used by both the guard fn unlock_for_writes(self: Pin<&mut Self>) { self.lock_state.store(0, Release); @@ -99,7 +99,7 @@ impl SlottedPageHeader { header.used_bytes += size as u32; header.num_slots += 1; header.content_length += padded_size as u32; - header.index_length += std::mem::size_of::() as u32; + header.index_length += std::mem::size_of::() as u32; new_slot } } @@ -116,7 +116,7 @@ impl SlottedPageHeader { } #[repr(C, align(8))] -struct SlotIndexEntry { +struct IndexEntry { used: bool, // The number of live references to this slot refcount: u16, @@ -131,7 +131,7 @@ struct SlotIndexEntry { _pin: std::marker::PhantomPinned, } -impl SlotIndexEntry { +impl IndexEntry { // Update accounting for the presence of a new entry. fn alloc( mut self: Pin<&mut Self>, @@ -203,15 +203,15 @@ pub fn slot_page_empty_size(page_size: usize) -> usize { } pub const fn slot_page_overhead() -> usize { - std::mem::size_of::() + std::mem::size_of::() } pub const fn slot_index_overhead() -> usize { - std::mem::size_of::() + std::mem::size_of::() } impl<'a> SlottedPage<'a> { - pub fn for_page(base_address: *mut u8, page_size: usize) -> Self { + pub(crate) fn for_page(base_address: *mut u8, page_size: usize) -> Self { Self { base_address, page_size: page_size as u32, @@ -221,24 +221,24 @@ impl<'a> SlottedPage<'a> { /// How much space is available in this page? #[allow(dead_code)] - pub fn free_space_bytes(&self) -> usize { + pub(crate) fn free_space_bytes(&self) -> usize { let header = self.header(); - let used = (header.num_slots * std::mem::size_of::() as u32) as usize + let used = (header.num_slots * std::mem::size_of::() as u32) as usize + header.used_bytes as usize - + std::mem::size_of::(); + + std::mem::size_of::(); (self.page_size as usize).saturating_sub(used) } /// How many bytes are available for appending to this page (i.e. not counting the space /// we could re-use, via e.g. used_bytes) - pub fn available_content_bytes(&self) -> usize { + pub(crate) fn available_content_bytes(&self) -> usize { let header = self.header(); let content_length = header.content_length as usize; let index_length = header.index_length as usize; - let header_size = std::mem::size_of::(); + let header_size = std::mem::size_of::(); - let avail = index_length + content_length + header_size; - (self.page_size as usize).saturating_sub(avail) + let consumed = index_length + content_length + header_size; + (self.page_size as usize).saturating_sub(consumed) } /// Add the slot into the page, copying it into the memory region, and returning the slot id @@ -247,11 +247,11 @@ impl<'a> SlottedPage<'a> { &self, size: usize, initial_value: Option<&[u8]>, - ) -> Result<(SlotId, usize, Pin<&'a mut [u8]>), SlotBoxError> { + ) -> Result<(SlotId, usize, Pin<&'a mut [u8]>), TupleBoxError> { // See if we can use an existing slot to put the slot in, or if there's any fit at all. let (can_fit, fit_slot) = self.find_fit(size); if !can_fit { - return Err(SlotBoxError::BoxFull(size, self.available_content_bytes())); + return Err(TupleBoxError::BoxFull(size, self.available_content_bytes())); } let header = self.header_mut(); if let Some(fit_slot) = fit_slot { @@ -290,9 +290,9 @@ impl<'a> SlottedPage<'a> { let content_start_position = (content_start_position + 7) & !7; // If the content start bleeds over into the index (+ our new entry), then we can't fit the slot. - let index_entry_size = std::mem::size_of::(); + let index_entry_size = std::mem::size_of::(); if content_start_position <= current_index_end + index_entry_size { - return Err(SlotBoxError::BoxFull( + return Err(TupleBoxError::BoxFull( size + index_entry_size, self.available_content_bytes(), )); @@ -362,14 +362,14 @@ impl<'a> SlottedPage<'a> { } /// Copy the contents of this page into a slice. - pub fn save_into(&self, buf: &mut [u8]) { + pub(crate) fn save_into(&self, buf: &mut [u8]) { let _ = self.read_lock(); let memory_as_slice = unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) }; buf.copy_from_slice(memory_as_slice); } - fn remove_slot(&self, slot_id: SlotId) -> Result<(usize, usize, bool), SlotBoxError> { + fn remove_slot(&self, slot_id: SlotId) -> Result<(usize, usize, bool), TupleBoxError> { // TODO: slots at start of content-length can be removed by shrinking the content-length // portion. @@ -392,21 +392,21 @@ impl<'a> SlottedPage<'a> { Ok((self.available_content_bytes(), slot_size, is_empty)) } - pub(crate) fn refcount(&self, slot_id: SlotId) -> Result { + pub(crate) fn refcount(&self, slot_id: SlotId) -> Result { let index_entry = self.get_index_entry(slot_id); if !index_entry.used { - return Err(SlotBoxError::TupleNotFound(slot_id as usize)); + return Err(TupleBoxError::TupleNotFound(slot_id as usize)); } Ok(index_entry.refcount) } - pub(crate) fn upcount(&self, slot_id: SlotId) -> Result<(), SlotBoxError> { + pub(crate) fn upcount(&self, slot_id: SlotId) -> Result<(), TupleBoxError> { let mut index_entry = self.get_index_entry_mut(slot_id); unsafe { index_entry.as_mut().get_unchecked_mut() }.refcount += 1; Ok(()) } - pub(crate) fn dncount(&self, slot_id: SlotId) -> Result { + pub(crate) fn dncount(&self, slot_id: SlotId) -> Result { let mut index_entry = self.get_index_entry_mut(slot_id); unsafe { index_entry.as_mut().get_unchecked_mut() }.refcount -= 1; if index_entry.refcount == 0 { @@ -415,7 +415,8 @@ impl<'a> SlottedPage<'a> { Ok(false) } - fn get_slot(&self, slot_id: SlotId) -> Result, SlotBoxError> { + #[allow(dead_code)] + pub(crate) fn get_slot(&self, slot_id: SlotId) -> Result, TupleBoxError> { // Check that the index is in bounds let num_slots = self.header().num_slots as SlotId; if slot_id >= num_slots { @@ -423,14 +424,14 @@ impl<'a> SlottedPage<'a> { "slot_id {} is out of bounds for page with {} slots", slot_id, num_slots ); - return Err(SlotBoxError::TupleNotFound(slot_id as usize)); + return Err(TupleBoxError::TupleNotFound(slot_id as usize)); } // Read the index entry; let index_entry = self.get_index_entry(slot_id); if !index_entry.used { error!("slot_id {} is not used, invalid tuple", slot_id); - return Err(SlotBoxError::TupleNotFound(slot_id as usize)); + return Err(TupleBoxError::TupleNotFound(slot_id as usize)); } let offset = index_entry.offset as usize; let length = index_entry.used_bytes as usize; @@ -443,17 +444,17 @@ impl<'a> SlottedPage<'a> { Ok(unsafe { Pin::new_unchecked(&memory_as_slice[offset..offset + length]) }) } - fn get_slot_mut(&self, slot_id: SlotId) -> Result, SlotBoxError> { + fn get_slot_mut(&self, slot_id: SlotId) -> Result, TupleBoxError> { // Check that the index is in bounds let num_slots = self.header().num_slots as SlotId; if slot_id >= num_slots { - return Err(SlotBoxError::TupleNotFound(slot_id as usize)); + return Err(TupleBoxError::TupleNotFound(slot_id as usize)); } // Read the index entry; let index_entry = self.get_index_entry(slot_id); if !index_entry.used { - return Err(SlotBoxError::TupleNotFound(slot_id as usize)); + return Err(TupleBoxError::TupleNotFound(slot_id as usize)); } let offset = index_entry.offset as usize; let length = index_entry.used_bytes as usize; @@ -550,27 +551,27 @@ impl<'a> SlottedPage<'a> { } #[inline] - fn header(&self) -> Pin<&SlottedPageHeader> { + fn header(&self) -> Pin<&PageHeader> { // Cast the base address to a pointear to the header - let header_ptr = self.base_address as *const SlottedPageHeader; + let header_ptr = self.base_address as *const PageHeader; unsafe { Pin::new_unchecked(&*header_ptr) } } #[inline] - fn header_mut(&self) -> Pin<&mut SlottedPageHeader> { + fn header_mut(&self) -> Pin<&mut PageHeader> { // Cast the base address to a pointer to the header - let header_ptr = self.base_address as *mut SlottedPageHeader; + let header_ptr = self.base_address as *mut PageHeader; unsafe { Pin::new_unchecked(&mut *header_ptr) } } /// Return the offset, size of the slot at the given index. - fn offset_of(&self, tid: SlotId) -> Result<(usize, usize), SlotBoxError> { + fn offset_of(&self, tid: SlotId) -> Result<(usize, usize), TupleBoxError> { // Check that the index is in bounds let num_slots = self.header().num_slots as SlotId; if tid >= num_slots { - return Err(SlotBoxError::TupleNotFound(tid as usize)); + return Err(TupleBoxError::TupleNotFound(tid as usize)); } // Read the index entry; @@ -602,7 +603,7 @@ impl<'a> SlottedPage<'a> { let index_length = header.index_length as isize; let content_length = header.content_length as isize; - let header_size = std::mem::size_of::() as isize; + let header_size = std::mem::size_of::() as isize; let total_needed = index_length + content_length + header_size; // Align to 8-byte boundary cuz that's what we'll actually need. @@ -615,9 +616,9 @@ impl<'a> SlottedPage<'a> { (avail >= size as isize, None) } - fn get_index_entry(&self, slot_id: SlotId) -> Pin<&SlotIndexEntry> { - let index_offset = std::mem::size_of::() - + ((slot_id as usize) * std::mem::size_of::()); + fn get_index_entry(&self, slot_id: SlotId) -> Pin<&IndexEntry> { + let index_offset = std::mem::size_of::() + + ((slot_id as usize) * std::mem::size_of::()); let base_address = self.base_address; @@ -630,13 +631,13 @@ impl<'a> SlottedPage<'a> { "slot {} is not 8-byte aligned", slot_id ); - Pin::new_unchecked(&*(slot_address as *const SlotIndexEntry)) + Pin::new_unchecked(&*(slot_address as *const IndexEntry)) } } - fn get_index_entry_mut(&self, slot_id: SlotId) -> Pin<&mut SlotIndexEntry> { - let index_offset = std::mem::size_of::() - + ((slot_id as usize) * std::mem::size_of::()); + fn get_index_entry_mut(&self, slot_id: SlotId) -> Pin<&mut IndexEntry> { + let index_offset = std::mem::size_of::() + + ((slot_id as usize) * std::mem::size_of::()); let base_address = self.base_address; unsafe { @@ -648,7 +649,7 @@ impl<'a> SlottedPage<'a> { "slot {} is not 8-byte aligned", slot_id ); - Pin::new_unchecked(&mut *(slot_address as *mut SlotIndexEntry)) + Pin::new_unchecked(&mut *(slot_address as *mut IndexEntry)) } } } @@ -661,12 +662,8 @@ pub struct PageWriteGuard<'a> { } impl<'a> PageWriteGuard<'a> { - pub fn get_slot_mut(&mut self, slot_id: SlotId) -> Result, SlotBoxError> { - let sp = SlottedPage { - base_address: self.base_address, - page_size: self.page_size, - _marker: Default::default(), - }; + pub fn get_slot_mut(&mut self, slot_id: SlotId) -> Result, TupleBoxError> { + let sp = SlottedPage::for_page(self.base_address, self.page_size as usize); sp.get_slot_mut(slot_id) } @@ -675,38 +672,25 @@ impl<'a> PageWriteGuard<'a> { &mut self, size: usize, initial_value: Option<&[u8]>, - ) -> Result<(SlotId, usize, Pin<&'a mut [u8]>), SlotBoxError> { - let sp = SlottedPage { - base_address: self.base_address, - page_size: self.page_size, - _marker: Default::default(), - }; + ) -> Result<(SlotId, usize, Pin<&'a mut [u8]>), TupleBoxError> { + let sp = SlottedPage::for_page(self.base_address, self.page_size as usize); sp.allocate(size, initial_value) } - pub fn remove_slot(&mut self, slot_id: SlotId) -> Result<(usize, usize, bool), SlotBoxError> { - let sp = SlottedPage { - base_address: self.base_address, - page_size: self.page_size, - _marker: Default::default(), - }; + pub fn remove_slot(&mut self, slot_id: SlotId) -> Result<(usize, usize, bool), TupleBoxError> { + let sp = SlottedPage::for_page(self.base_address, self.page_size as usize); sp.remove_slot(slot_id) } #[inline(always)] - pub fn upcount(&mut self, slot_id: SlotId) -> Result<(), SlotBoxError> { - let sp = SlottedPage { - base_address: self.base_address, - page_size: self.page_size, - _marker: Default::default(), - }; + pub fn upcount(&mut self, slot_id: SlotId) -> Result<(), TupleBoxError> { + let sp = SlottedPage::for_page(self.base_address, self.page_size as usize); sp.upcount(slot_id) } #[inline] - fn header_mut(&self) -> Pin<&mut SlottedPageHeader> { - let header_ptr = self.base_address as *mut SlottedPageHeader; - + fn header_mut(&self) -> Pin<&mut PageHeader> { + let header_ptr = self.base_address as *mut PageHeader; unsafe { Pin::new_unchecked(&mut *header_ptr) } } } @@ -728,19 +712,16 @@ pub struct PageReadGuard<'a> { impl<'a> PageReadGuard<'a> { #[inline(always)] - fn header(&self) -> Pin<&SlottedPageHeader> { - let header_ptr = self.base_address as *const SlottedPageHeader; + fn header(&self) -> Pin<&PageHeader> { + let header_ptr = self.base_address as *const PageHeader; unsafe { Pin::new_unchecked(&*header_ptr) } } #[inline(always)] - pub fn get_slot(&self, slot_id: SlotId) -> Result, SlotBoxError> { - let sp = SlottedPage { - base_address: self.base_address as _, - page_size: self.page_size as _, - _marker: Default::default(), - }; + #[allow(dead_code)] + pub(crate) fn get_slot(&self, slot_id: SlotId) -> Result, TupleBoxError> { + let sp = SlottedPage::for_page(self.base_address as *mut u8, self.page_size as usize); sp.get_slot(slot_id) } } @@ -762,10 +743,8 @@ impl<'a> Drop for PageReadGuard<'a> { #[cfg(test)] mod tests { - use crate::tuplebox::tuples::slotbox::SlotBoxError; - use crate::tuplebox::tuples::slotted_page::{ - slot_page_empty_size, SlotId, SlotIndexEntry, SlottedPage, - }; + use crate::rdb::paging::slotted_page::{slot_page_empty_size, IndexEntry, SlotId, SlottedPage}; + use crate::rdb::paging::TupleBoxError; fn random_fill(page: &SlottedPage) -> Vec<(SlotId, Vec)> { let mut collected_slots = vec![]; @@ -775,12 +754,12 @@ mod tests { let result = page.allocate(size, Some(&vec![123; size])); // If avail can't fit the size of the slot plus the index entry, then we should be // getting an error. - if avail < size + std::mem::size_of::() { - assert!(matches!(result, Err(SlotBoxError::BoxFull(_, _)))); + if avail < size + std::mem::size_of::() { + assert!(matches!(result, Err(TupleBoxError::BoxFull(_, _)))); break; } // Sometimes we can cease allocation because that's how the cookie crumbles with padding, - if matches!(result, Err(SlotBoxError::BoxFull(_, _))) { + if matches!(result, Err(TupleBoxError::BoxFull(_, _))) { break; } // Otherwise, we should be getting a slot id and a new avail that is smaller than the @@ -845,7 +824,7 @@ mod tests { page.remove_slot(*tid).unwrap(); assert!(matches!( page.get_slot(*tid), - Err(SlotBoxError::TupleNotFound(_)) + Err(TupleBoxError::TupleNotFound(_)) )); removed_slots.push(*tid); } diff --git a/crates/db/src/tuplebox/tuples/slotbox.rs b/crates/db/src/rdb/paging/tuple_box.rs similarity index 82% rename from crates/db/src/tuplebox/tuples/slotbox.rs rename to crates/db/src/rdb/paging/tuple_box.rs index fa27df7f..3ac66ead 100644 --- a/crates/db/src/tuplebox/tuples/slotbox.rs +++ b/crates/db/src/rdb/paging/tuple_box.rs @@ -16,8 +16,8 @@ // most common case of fixed-size tuples. // TODO: implement the ability to expire and page-out tuples based on LRU or random/second // chance eviction (ala leanstore). will require separate PageIds from Bids, and will -// involve rewriting SlotPtr on the fly to point to a new page when restored. -// SlotPtr will also get a new field for last-access-time, so that we can do our eviction +// involve rewriting TuplePtr on the fly to point to a new page when restored. +// TuplePtr will also get a new field for last-access-time, so that we can do our eviction // TODO: store indexes in here, too (custom paged datastructure impl) // TODO: verify locking/concurrency safety of this thing -- loom test, stateright, or jepsen, etc. // TODO: there is still some really gross stuff in here about the management of free space in @@ -26,8 +26,6 @@ // to be a sporadic failure where we end up with a "Page not found" error in the allocator on // free, meaning the page was not found in the used pages list. // whether any of this is worth futzing with after the fixed-size impl is done, I don't know. -// TODO: rename me, _I_ am the tuplebox. The "slots" are just where my tuples get stored. tho once -// indexes are in here, things will get confusing (everything here assumes pages hold tuples) use std::cmp::max; use std::collections::HashMap; @@ -36,35 +34,24 @@ use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex}; use moor_values::util::{BitArray, Bitset64}; -use thiserror::Error; -use tracing::{error, warn}; - -use crate::tuplebox::pool::{Bid, BufferPool, PagerError}; -pub use crate::tuplebox::tuples::slotted_page::SlotId; -use crate::tuplebox::tuples::slotted_page::{ - slot_index_overhead, slot_page_empty_size, SlottedPage, -}; -use crate::tuplebox::tuples::tuple_ptr::TuplePtr; -use crate::tuplebox::tuples::{TupleId, TupleRef}; -use crate::tuplebox::RelationId; +use tracing::warn; + +use crate::rdb::paging::slotted_page::{slot_index_overhead, slot_page_empty_size, SlottedPage}; +use crate::rdb::paging::tuple_ptr::TuplePtr; +use crate::rdb::paging::TupleBoxError; +use crate::rdb::pool::{Bid, BufferPool, PagerError}; +use crate::rdb::tuples::{TupleId, TupleRef}; +use crate::rdb::RelationId; pub type PageId = usize; -/// A SlotBox is a collection of (variable sized) pages, each of which is a collection of slots, each of which is holds +/// A TupleBox is a collection of (variable sized) pages, each of which is a collection of slots, each of which is holds /// dynamically sized tuples. -pub struct SlotBox { +pub struct TupleBox { inner: Mutex, } -#[derive(Debug, Clone, Error)] -pub enum SlotBoxError { - #[error("Page is full, cannot insert slot of size {0} with {1} bytes remaining")] - BoxFull(usize, usize), - #[error("Tuple not found at index {0}")] - TupleNotFound(usize), -} - -impl SlotBox { +impl TupleBox { pub fn new(virt_size: usize) -> Self { let pool = BufferPool::new(virt_size).expect("Could not create buffer pool"); let inner = Mutex::new(Inner::new(pool)); @@ -73,23 +60,32 @@ impl SlotBox { /// Allocates a new slot for a tuple, somewhere in one of the pages we managed. /// Does not allow tuples from different relations to mix on the same page. + #[inline(always)] pub fn allocate( self: Arc, size: usize, relation_id: RelationId, initial_value: Option<&[u8]>, - ) -> Result { + ) -> Result { let mut inner = self.inner.lock().unwrap(); inner.do_alloc(size, relation_id, initial_value, &self) } + #[inline(always)] + pub fn get(&self, id: TupleId) -> Result { + let mut inner = self.inner.lock().unwrap(); + inner.do_get(id) + } + + /// Restore a page for `relation_id` from secondary storage, returning references to all the + /// tuples discovered in it as a result. pub(crate) fn load_page)>( self: Arc, relation_id: RelationId, id: PageId, mut lf: LF, - ) -> Result, SlotBoxError> { + ) -> Result, TupleBoxError> { let mut inner = self.inner.lock().unwrap(); // Re-allocate the page. @@ -100,16 +96,16 @@ impl SlotBox { lf(buf); }); - // Now make sure we have swizrefs for all of them. + // Now make sure we have pointers for all of them. let mut refs = vec![]; for (slot, buflen, addr) in slot_ids.into_iter() { let tuple_id = TupleId { page: id, slot }; let swizref = Box::pin(TuplePtr::create(self.clone(), tuple_id, addr, buflen)); - inner.swizrefs.insert(tuple_id, swizref); - let swizref = inner.swizrefs.get_mut(&tuple_id).unwrap(); - let sp = unsafe { Pin::into_inner_unchecked(swizref.as_mut()) }; + inner.tuple_ptrs.insert(tuple_id, swizref); + let tuple_ptr = inner.tuple_ptrs.get_mut(&tuple_id).unwrap(); + let sp = unsafe { Pin::into_inner_unchecked(tuple_ptr.as_mut()) }; let ptr = sp as *mut TuplePtr; - let tuple_ref = TupleRef::at_ptr(ptr); + let tuple_ref = TupleRef::at_tptr(ptr); refs.push(tuple_ref); } // The allocator needs to know that this page is used. @@ -118,26 +114,26 @@ impl SlotBox { } #[inline(always)] - pub(crate) fn page_for<'a>(&self, id: PageId) -> Result, SlotBoxError> { + pub(crate) fn page_for<'a>(&self, id: PageId) -> Result, TupleBoxError> { let inner = self.inner.lock().unwrap(); inner.page_for(id) } - pub fn refcount(&self, id: TupleId) -> Result { + pub fn refcount(&self, id: TupleId) -> Result { let inner = self.inner.lock().unwrap(); let page_handle = inner.page_for(id.page)?; page_handle.refcount(id.slot) } #[inline(always)] - pub fn upcount(&self, id: TupleId) -> Result<(), SlotBoxError> { + pub fn upcount(&self, id: TupleId) -> Result<(), TupleBoxError> { let inner = self.inner.lock().unwrap(); let page_handle = inner.page_for(id.page)?; page_handle.upcount(id.slot) } #[inline(always)] - pub fn dncount(&self, id: TupleId) -> Result<(), SlotBoxError> { + pub fn dncount(&self, id: TupleId) -> Result<(), TupleBoxError> { let mut inner = self.inner.lock().unwrap(); let page_handle = inner.page_for(id.page)?; if page_handle.dncount(id.slot)? { @@ -146,47 +142,11 @@ impl SlotBox { Ok(()) } - #[inline(always)] - pub fn get(&self, id: TupleId) -> Result, SlotBoxError> { - let inner = self.inner.lock().unwrap(); - let page_handle = inner.page_for(id.page)?; - - let lock = page_handle.read_lock(); - - let slc = lock.get_slot(id.slot)?; - Ok(slc) - } - - pub fn update( - self: Arc, - relation_id: RelationId, - id: TupleId, - new_value: &[u8], - ) -> Result, SlotBoxError> { - let new_tup = { - let mut inner = self.inner.lock().unwrap(); - let mut page_handle = inner.page_for(id.page)?; - - // If the value size is the same as the old value, we can just update in place, otherwise - // it's a brand new allocation, and we have to remove the old one first. - let mut page_write = page_handle.write_lock(); - let mut existing = page_write.get_slot_mut(id.slot).expect("Invalid tuple id"); - if existing.len() == new_value.len() { - existing.copy_from_slice(new_value); - return Ok(None); - } - inner.do_remove(id)?; - - inner.do_alloc(new_value.len(), relation_id, Some(new_value), &self)? - }; - Ok(Some(new_tup)) - } - pub fn update_with)>( &self, id: TupleId, mut f: F, - ) -> Result<(), SlotBoxError> { + ) -> Result<(), TupleBoxError> { let inner = self.inner.lock().unwrap(); let mut page_handle = inner.page_for(id.page)?; let mut page_write = page_handle.write_lock(); @@ -224,7 +184,7 @@ struct Inner { /// pointers in the TupleRefs themselves. // TODO: This needs to be broken down by page id, too, so that we can manage swap-in/swap-out at // the page granularity. - swizrefs: HashMap>>, + tuple_ptrs: HashMap>>, } impl Inner { @@ -232,7 +192,7 @@ impl Inner { Self { available_page_space: BitArray::new(), pool, - swizrefs: HashMap::new(), + tuple_ptrs: HashMap::new(), } } @@ -241,8 +201,8 @@ impl Inner { size: usize, relation_id: RelationId, initial_value: Option<&[u8]>, - sb: &Arc, - ) -> Result { + sb: &Arc, + ) -> Result { let tuple_size = size + slot_index_overhead(); let page_size = max(32768, tuple_size.next_power_of_two()); @@ -268,14 +228,16 @@ impl Inner { // Heap allocate the swizref, and and pin it, take the address of it, then stick the swizref // in our set. - let mut swizref = Box::pin(TuplePtr::create(sb.clone(), tuple_id, bufaddr, buflen)); - let swizaddr = unsafe { swizref.as_mut().get_unchecked_mut() } as *mut TuplePtr; - self.swizrefs.insert(tuple_id, swizref); + let mut tuple_ptr = + Box::pin(TuplePtr::create(sb.clone(), tuple_id, bufaddr, buflen)); + let tuple_ptr_addr = + unsafe { tuple_ptr.as_mut().get_unchecked_mut() } as *mut TuplePtr; + self.tuple_ptrs.insert(tuple_id, tuple_ptr); // Establish initial refcount using this existing lock. page_write_lock.upcount(slot).unwrap(); - return Ok(TupleRef::at_ptr(swizaddr)); + return Ok(TupleRef::at_tptr(tuple_ptr_addr)); } tries += 1; if tries > 50 { @@ -284,11 +246,27 @@ impl Inner { } } - fn do_restore_page<'a>(&mut self, id: PageId) -> Result, SlotBoxError> { + fn do_get(&mut self, id: TupleId) -> Result { + // We should already have a tuple pointer for this, otherwise it's not in the box. + self.tuple_ptrs + .get_mut(&id) + .map(|tptr| { + let tptr_ref = unsafe { Pin::into_inner_unchecked(tptr.as_mut()) }; + let tptr_ptr = tptr_ref.as_mut_ptr(); + + TupleRef::at_tptr(tptr_ptr) + }) + .map_or_else( + || Err(TupleBoxError::TupleNotFound(id.slot as usize)), + |v| Ok(v), + ) + } + + fn do_restore_page<'a>(&mut self, id: PageId) -> Result, TupleBoxError> { let (addr, page_size) = match self.pool.restore(Bid(id as u64)) { Ok(v) => v, Err(PagerError::CouldNotAccess) => { - return Err(SlotBoxError::TupleNotFound(id)); + return Err(TupleBoxError::TupleNotFound(id)); } Err(e) => { panic!("Unexpected buffer pool error: {:?}", e); @@ -309,24 +287,26 @@ impl Inner { available_page_space.insert(free_space, bid); } - fn do_remove(&mut self, id: TupleId) -> Result<(), SlotBoxError> { - let mut page_handle = self.page_for(id.page)?; - let mut write_lock = page_handle.write_lock(); - - let (new_free, _, is_empty) = write_lock.remove_slot(id.slot)?; - self.report_free(id.page, new_free, is_empty); + fn do_remove(&mut self, id: TupleId) -> Result<(), TupleBoxError> { + // We have to release the lock before dropping the pointer, or we get a fault + // decrementing the page's lock count. Unclear to me why. + { + let mut page_handle = self.page_for(id.page)?; + let mut write_lock = page_handle.write_lock(); + let (new_free, _, is_empty) = write_lock.remove_slot(id.slot)?; - // TODO: The swizref stays just in case? - // self.swizrefs.remove(&id); + self.report_free(id.page, new_free, is_empty); + } + self.tuple_ptrs.remove(&id); Ok(()) } - fn page_for<'a>(&self, page_num: usize) -> Result, SlotBoxError> { + fn page_for<'a>(&self, page_num: usize) -> Result, TupleBoxError> { let (page_address, page_size) = match self.pool.resolve_ptr::(Bid(page_num as u64)) { Ok(v) => v, Err(PagerError::CouldNotAccess) => { - return Err(SlotBoxError::TupleNotFound(page_num)); + return Err(TupleBoxError::TupleNotFound(page_num)); } Err(e) => { panic!("Unexpected buffer pool error: {:?}", e); @@ -340,12 +320,12 @@ impl Inner { &mut self, relation_id: RelationId, page_size: usize, - ) -> Result<(PageId, usize), SlotBoxError> { + ) -> Result<(PageId, usize), TupleBoxError> { // Ask the buffer pool for a new page of the given size. let (bid, _, actual_size) = match self.pool.alloc(page_size) { Ok(v) => v, Err(PagerError::InsufficientRoom { desired, available }) => { - return Err(SlotBoxError::BoxFull(desired, available)); + return Err(TupleBoxError::BoxFull(desired, available)); } Err(e) => { panic!("Unexpected buffer pool error: {:?}", e); @@ -374,7 +354,7 @@ impl Inner { relation_id: RelationId, tuple_size: usize, page_size: usize, - ) -> Result<(PageId, usize), SlotBoxError> { + ) -> Result<(PageId, usize), TupleBoxError> { // Do we have a used pages set for this relation? If not, we can start one, and allocate a // new full page to it, and return. When we actually do the allocation, we'll be able to // find the page in the used pages set. @@ -425,8 +405,9 @@ impl Inner { } /// The amount of space available for each page known to the allocator for a relation. -/// Kept in two vectors, one for the available space, and one for the page ids, and kept sorted by -/// available space, with the page ids in the same order. +/// Page id & available space are encoded in a single u128, so that we can sort by available space +/// without having to do a lot of gymnastics. +/// We keep an allocation list like this per-relation. struct PageSpace { // Lower 64 bits of the page id, upper 64 bits are the size // In this way we can sort by available space, and keep the page ids in the same order @@ -549,15 +530,15 @@ mod tests { use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; - use crate::tuplebox::tuples::slotbox::{SlotBox, SlotBoxError}; - use crate::tuplebox::tuples::slotted_page::slot_page_empty_size; - use crate::tuplebox::tuples::TupleRef; - use crate::tuplebox::RelationId; + use crate::rdb::paging::slotted_page::slot_page_empty_size; + use crate::rdb::paging::tuple_box::{TupleBox, TupleBoxError}; + use crate::rdb::tuples::TupleRef; + use crate::rdb::RelationId; - fn fill_until_full(sb: &Arc) -> Vec<(TupleRef, Vec)> { + fn fill_until_full(sb: &Arc) -> Vec<(TupleRef, Vec)> { let mut tuples = Vec::new(); - // fill until full... (SlotBoxError::BoxFull) + // fill until full... (TupleBoxError::BoxFull) loop { let mut rng = thread_rng(); let tuple_len = rng.gen_range(1..(slot_page_empty_size(52000))); @@ -566,7 +547,7 @@ mod tests { Ok(tref) => { tuples.push((tref, value)); } - Err(SlotBoxError::BoxFull(_, _)) => { + Err(TupleBoxError::BoxFull(_, _)) => { break; } Err(e) => { @@ -580,7 +561,7 @@ mod tests { // Just allocate a single tuple, and verify that we can retrieve it. #[test] fn test_one_page_one_slot() { - let sb = Arc::new(SlotBox::new(32768 * 64)); + let sb = Arc::new(TupleBox::new(32768 * 64)); let expected_value = vec![1, 2, 3, 4, 5]; let _retrieved = sb .clone() @@ -591,7 +572,7 @@ mod tests { // Fill just one page and verify that we can retrieve them all. #[test] fn test_one_page_a_few_slots() { - let sb = Arc::new(SlotBox::new(32768 * 64)); + let sb = Arc::new(TupleBox::new(32768 * 64)); let mut tuples = Vec::new(); let mut last_page_id = None; loop { @@ -623,7 +604,7 @@ mod tests { // Fill one page, then overflow into another, and verify we can get the tuple that's on the next page. #[test] fn test_page_overflow() { - let sb = Arc::new(SlotBox::new(32768 * 64)); + let sb = Arc::new(TupleBox::new(32768 * 64)); let mut tuples = Vec::new(); let mut first_page_id = None; let (next_page_tuple_id, next_page_value) = loop { @@ -656,7 +637,7 @@ mod tests { // and then scan back and verify their presence/equality. #[test] fn test_basic_add_fill_etc() { - let sb = Arc::new(SlotBox::new(32768 * 32)); + let sb = Arc::new(TupleBox::new(32768 * 32)); let mut tuples = fill_until_full(&sb); for (i, (tuple, expected_value)) in tuples.iter().enumerate() { let retrieved_domain = tuple.domain(); @@ -685,7 +666,7 @@ mod tests { // everything mmap DONTNEED'd, and we should be able to re-fill it again, too. #[test] fn test_full_fill_and_empty() { - let sb = Arc::new(SlotBox::new(32768 * 64)); + let sb = Arc::new(TupleBox::new(32768 * 64)); let mut tuples = fill_until_full(&sb); // Collect the manual ids of the tuples we've allocated, so we can check them for refcount goodness. @@ -702,7 +683,7 @@ mod tests { // fill back up again and verify the new presence. #[test] fn test_fill_and_free_and_refill_etc() { - let sb = Arc::new(SlotBox::new(32768 * 64)); + let sb = Arc::new(TupleBox::new(32768 * 64)); let mut tuples = fill_until_full(&sb); let mut rng = thread_rng(); let mut freed_tuples = Vec::new(); diff --git a/crates/db/src/tuplebox/tuples/tuple_ptr.rs b/crates/db/src/rdb/paging/tuple_ptr.rs similarity index 87% rename from crates/db/src/tuplebox/tuples/tuple_ptr.rs rename to crates/db/src/rdb/paging/tuple_ptr.rs index fde7b487..d261e2d0 100644 --- a/crates/db/src/tuplebox/tuples/tuple_ptr.rs +++ b/crates/db/src/rdb/paging/tuple_ptr.rs @@ -15,18 +15,18 @@ use std::hash::Hash; use std::sync::Arc; +use crate::rdb::paging::TupleBox; +use crate::rdb::tuples::TupleId; use moor_values::util::slice_ref::ByteSource; -use crate::tuplebox::tuples::{SlotBox, TupleId}; - -/// A reference to a tuple in a SlotBox, owned by the SlotBox itself. TupleRefs are given a pointer to these, -/// which allows the SlotBox to manage the lifetime of the tuple, swizzling it in and out of memory as needed. -/// Adds a layer of indirection to each tuple access, but is better than passing around tuple ids + slotbox +/// A reference to a tuple in a TupleBox, managed by the TupleBox itself. TupleRefs are given a pointer to these, +/// which allows the TupleBox to manage the lifetime of the tuple, swizzling it in and out of memory as needed. +/// Adds a layer of indirection to each tuple access, but is better than passing around tuple ids + TupleBox /// references. // TODO: rather than decoding a tuple out of a buffer in the slot, the slot should just hold the tuple structure pub struct TuplePtr { - sb: Arc, + sb: Arc, id: TupleId, buflen: u32, bufaddr: *mut u8, @@ -39,7 +39,7 @@ unsafe impl Sync for TuplePtr {} impl TuplePtr { pub(crate) fn create( - sb: Arc, + sb: Arc, tuple_id: TupleId, bufaddr: *mut u8, buflen: usize, @@ -113,7 +113,7 @@ impl TuplePtr { } } -/// So we can build SliceRefs off of SlotPtrs +/// So we can build SliceRefs off of TuplePtrs pub struct SlotByteSource { ptr: *const TuplePtr, } diff --git a/crates/db/src/tuplebox/pool/buffer_pool.rs b/crates/db/src/rdb/pool/buffer_pool.rs similarity index 98% rename from crates/db/src/tuplebox/pool/buffer_pool.rs rename to crates/db/src/rdb/pool/buffer_pool.rs index e5c2b239..8a7bc110 100644 --- a/crates/db/src/tuplebox/pool/buffer_pool.rs +++ b/crates/db/src/rdb/pool/buffer_pool.rs @@ -29,8 +29,8 @@ use std::cmp::max; use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use crate::tuplebox::pool::size_class::SizeClass; -use crate::tuplebox::pool::{Bid, PagerError}; +use crate::rdb::pool::size_class::SizeClass; +use crate::rdb::pool::{Bid, PagerError}; // 32k -> 1MB page sizes supported. // TODO: If we end up with values bigger than 1MB, they should probably be handled by "external" pages, @@ -261,8 +261,8 @@ impl BufferPool { #[cfg(test)] mod tests { - use crate::tuplebox::pool::buffer_pool::{BufferPool, HIGHEST_SIZE_CLASS_POWER_OF}; - use crate::tuplebox::pool::PagerError; + use crate::rdb::pool::buffer_pool::{BufferPool, HIGHEST_SIZE_CLASS_POWER_OF}; + use crate::rdb::pool::PagerError; const MB_256: usize = 1 << 28; diff --git a/crates/db/src/tuplebox/pool/mod.rs b/crates/db/src/rdb/pool/mod.rs similarity index 100% rename from crates/db/src/tuplebox/pool/mod.rs rename to crates/db/src/rdb/pool/mod.rs diff --git a/crates/db/src/tuplebox/pool/size_class.rs b/crates/db/src/rdb/pool/size_class.rs similarity index 99% rename from crates/db/src/tuplebox/pool/size_class.rs rename to crates/db/src/rdb/pool/size_class.rs index bc927041..051b68a3 100644 --- a/crates/db/src/tuplebox/pool/size_class.rs +++ b/crates/db/src/rdb/pool/size_class.rs @@ -19,7 +19,7 @@ use human_bytes::human_bytes; use libc::{madvise, MADV_DONTNEED, MAP_ANONYMOUS, MAP_PRIVATE, PROT_READ, PROT_WRITE}; use tracing::info; -use crate::tuplebox::pool::PagerError; +use crate::rdb::pool::PagerError; type BitSet = hi_sparse_bitset::BitSet; diff --git a/crates/db/src/tuplebox/tb.rs b/crates/db/src/rdb/relbox.rs similarity index 95% rename from crates/db/src/tuplebox/tb.rs rename to crates/db/src/rdb/relbox.rs index 3d353f6d..4ab55062 100644 --- a/crates/db/src/tuplebox/tb.rs +++ b/crates/db/src/rdb/relbox.rs @@ -22,13 +22,13 @@ use std::sync::Arc; use tokio::sync::RwLock; use tracing::info; -use crate::tuplebox::backing::BackingStoreClient; -use crate::tuplebox::base_relation::BaseRelation; -use crate::tuplebox::tuples::SlotBox; -use crate::tuplebox::tuples::TxTuple; -use crate::tuplebox::tx::WorkingSet; -use crate::tuplebox::tx::{CommitError, CommitSet, Transaction}; -use crate::tuplebox::RelationId; +use crate::rdb::backing::BackingStoreClient; +use crate::rdb::base_relation::BaseRelation; +use crate::rdb::paging::TupleBox; +use crate::rdb::tuples::TxTuple; +use crate::rdb::tx::WorkingSet; +use crate::rdb::tx::{CommitError, CommitSet, Transaction}; +use crate::rdb::RelationId; /// Meta-data about a relation #[derive(Clone, Debug)] @@ -43,12 +43,12 @@ pub struct RelationInfo { pub secondary_indexed: bool, } -/// The tuplebox is the set of relations, referenced by their unique (usize) relation ID. +/// The rdb is the set of relations, referenced by their unique (usize) relation ID. /// It exposes interfaces for starting & managing transactions on those relations. /// It is, essentially, a micro database. // TODO: locking in general here is (probably) safe, but not optimal. optimistic locking would be // better for various portions here. -pub struct TupleBox { +pub struct RelBox { relation_info: Vec, /// The monotonically increasing transaction ID "timestamp" counter. // TODO: take a look at Adnan's thread-sharded approach described in section 3.1 @@ -60,19 +60,19 @@ pub struct TupleBox { // TODO: this is a candidate for an optimistic lock. pub(crate) canonical: RwLock>, - slotbox: Arc, + slotbox: Arc, backing_store: Option, } -impl TupleBox { +impl RelBox { pub async fn new( memory_size: usize, path: Option, relations: &[RelationInfo], num_sequences: usize, ) -> Arc { - let slotbox = Arc::new(SlotBox::new(memory_size)); + let slotbox = Arc::new(TupleBox::new(memory_size)); let mut base_relations = Vec::with_capacity(relations.len()); for (rid, r) in relations.iter().enumerate() { base_relations.push(BaseRelation::new(RelationId(rid), 0)); @@ -84,7 +84,7 @@ impl TupleBox { let backing_store = match path { None => None, Some(path) => { - let bs = crate::tuplebox::coldstorage::ColdStorage::start( + let bs = crate::rdb::coldstorage::ColdStorage::start( path, relations, &mut base_relations, diff --git a/crates/db/src/tuplebox/tuples/mod.rs b/crates/db/src/rdb/tuples/mod.rs similarity index 90% rename from crates/db/src/tuplebox/tuples/mod.rs rename to crates/db/src/rdb/tuples/mod.rs index 71814912..c8dddd81 100644 --- a/crates/db/src/tuplebox/tuples/mod.rs +++ b/crates/db/src/rdb/tuples/mod.rs @@ -14,13 +14,10 @@ use thiserror::Error; -pub use slotbox::{PageId, SlotBox, SlotBoxError, SlotId}; +use crate::rdb::paging::{PageId, SlotId}; pub use tuple_ref::TupleRef; pub use tx_tuple::TxTuple; -mod slotbox; -mod slotted_page; -mod tuple_ptr; mod tuple_ref; mod tx_tuple; diff --git a/crates/db/src/tuplebox/tuples/tuple_ref.rs b/crates/db/src/rdb/tuples/tuple_ref.rs similarity index 94% rename from crates/db/src/tuplebox/tuples/tuple_ref.rs rename to crates/db/src/rdb/tuples/tuple_ref.rs index afe13334..b5b26974 100644 --- a/crates/db/src/tuplebox/tuples/tuple_ref.rs +++ b/crates/db/src/rdb/tuples/tuple_ref.rs @@ -18,9 +18,10 @@ use std::sync::Arc; use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::tuples::tuple_ptr::TuplePtr; -use crate::tuplebox::tuples::{SlotBox, SlotBoxError, TupleId}; -use crate::tuplebox::RelationId; +use crate::rdb::paging::TuplePtr; +use crate::rdb::paging::{TupleBox, TupleBoxError}; +use crate::rdb::tuples::TupleId; +use crate::rdb::RelationId; pub struct TupleRef { // Yo dawg I heard you like pointers, so I put a pointer in your pointer. @@ -37,20 +38,20 @@ struct Header { unsafe impl Send for TupleRef {} unsafe impl Sync for TupleRef {} impl TupleRef { - // Wrap an existing SlotPtr. + // Wrap an existing TuplePtr. // Note: to avoid deadlocking at construction, assumes that the tuple is already upcounted by the // caller. - pub(crate) fn at_ptr(sp: *mut TuplePtr) -> Self { + pub(crate) fn at_tptr(sp: *mut TuplePtr) -> Self { Self { sp } } /// Allocate the given tuple in a slotbox. pub fn allocate( relation_id: RelationId, - sb: Arc, + sb: Arc, ts: u64, domain: &[u8], codomain: &[u8], - ) -> Result { + ) -> Result { let total_size = std::mem::size_of::
() + domain.len() + codomain.len(); let tuple_ref = sb.clone().allocate(total_size, relation_id, None)?; sb.update_with(tuple_ref.id(), |mut buffer| { diff --git a/crates/db/src/tuplebox/tuples/tx_tuple.rs b/crates/db/src/rdb/tuples/tx_tuple.rs similarity index 96% rename from crates/db/src/tuplebox/tuples/tx_tuple.rs rename to crates/db/src/rdb/tuples/tx_tuple.rs index 7bbde439..df35987d 100644 --- a/crates/db/src/tuplebox/tuples/tx_tuple.rs +++ b/crates/db/src/rdb/tuples/tx_tuple.rs @@ -14,8 +14,8 @@ use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::tuples::TupleId; -use crate::tuplebox::tuples::TupleRef; +use crate::rdb::tuples::TupleId; +use crate::rdb::tuples::TupleRef; /// Possible operations on tuples, in the context local to a transaction. #[derive(Clone)] diff --git a/crates/db/src/tuplebox/tx/mod.rs b/crates/db/src/rdb/tx/mod.rs similarity index 100% rename from crates/db/src/tuplebox/tx/mod.rs rename to crates/db/src/rdb/tx/mod.rs diff --git a/crates/db/src/tuplebox/tx/relvar.rs b/crates/db/src/rdb/tx/relvar.rs similarity index 95% rename from crates/db/src/tuplebox/tx/relvar.rs rename to crates/db/src/rdb/tx/relvar.rs index e58b9153..f18f61c6 100644 --- a/crates/db/src/tuplebox/tx/relvar.rs +++ b/crates/db/src/rdb/tx/relvar.rs @@ -14,9 +14,9 @@ use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::tuples::TupleError; -use crate::tuplebox::tx::transaction::Transaction; -use crate::tuplebox::RelationId; +use crate::rdb::tuples::TupleError; +use crate::rdb::tx::transaction::Transaction; +use crate::rdb::RelationId; /// A reference / handle / pointer to a relation, the actual operations are managed through the /// transaction. diff --git a/crates/db/src/tuplebox/tx/transaction.rs b/crates/db/src/rdb/tx/transaction.rs similarity index 98% rename from crates/db/src/tuplebox/tx/transaction.rs rename to crates/db/src/rdb/tx/transaction.rs index 6c2f4ee7..f619659a 100644 --- a/crates/db/src/tuplebox/tx/transaction.rs +++ b/crates/db/src/rdb/tx/transaction.rs @@ -21,18 +21,18 @@ use tokio::sync::RwLock; use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::base_relation::BaseRelation; -use crate::tuplebox::tb::TupleBox; -use crate::tuplebox::tuples::SlotBox; -use crate::tuplebox::tuples::TupleError; -use crate::tuplebox::tx::relvar::RelVar; -use crate::tuplebox::tx::working_set::WorkingSet; -use crate::tuplebox::RelationId; +use crate::rdb::base_relation::BaseRelation; +use crate::rdb::paging::TupleBox; +use crate::rdb::relbox::RelBox; +use crate::rdb::tuples::TupleError; +use crate::rdb::tx::relvar::RelVar; +use crate::rdb::tx::working_set::WorkingSet; +use crate::rdb::RelationId; /// A versioned transaction, which is a fork of the current canonical base relations. pub struct Transaction { /// Where we came from, for referencing back to the base relations. - db: Arc, + db: Arc, /// The "working set" is the set of retrieved and/or modified tuples from base relations, known /// to the transaction, and represents the set of values that will be committed to the base /// relations at commit time. @@ -57,7 +57,7 @@ pub enum CommitError { } impl Transaction { - pub fn new(ts: u64, slotbox: Arc, db: Arc) -> Self { + pub fn new(ts: u64, slotbox: Arc, db: Arc) -> Self { let ws = WorkingSet::new(slotbox.clone(), &db.relation_info(), ts); let next_transient_relation_id = RelationId::transient(db.relation_info().len()); @@ -490,17 +490,17 @@ mod tests { use moor_values::util::slice_ref::SliceRef; - use crate::tuplebox::tb::{RelationInfo, TupleBox}; - use crate::tuplebox::tuples::TupleError; - use crate::tuplebox::tx::transaction::CommitError; - use crate::tuplebox::{RelationId, Transaction}; + use crate::rdb::relbox::{RelBox, RelationInfo}; + use crate::rdb::tuples::TupleError; + use crate::rdb::tx::transaction::CommitError; + use crate::rdb::{RelationId, Transaction}; fn attr(slice: &[u8]) -> SliceRef { SliceRef::from_bytes(slice) } - async fn test_db() -> Arc { - TupleBox::new( + async fn test_db() -> Arc { + RelBox::new( 1 << 24, None, &[RelationInfo { diff --git a/crates/db/src/tuplebox/tx/working_set.rs b/crates/db/src/rdb/tx/working_set.rs similarity index 97% rename from crates/db/src/tuplebox/tx/working_set.rs rename to crates/db/src/rdb/tx/working_set.rs index ed46f3b8..64d24ae4 100644 --- a/crates/db/src/tuplebox/tx/working_set.rs +++ b/crates/db/src/rdb/tx/working_set.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::tb::{RelationInfo, TupleBox}; -use crate::tuplebox::tuples::{SlotBox, TupleError}; -use crate::tuplebox::tuples::{TupleRef, TxTuple}; -use crate::tuplebox::RelationId; +use crate::rdb::paging::TupleBox; +use crate::rdb::relbox::{RelBox, RelationInfo}; +use crate::rdb::tuples::{TupleRef, TxTuple}; +use crate::rdb::{RelationId, TupleError}; /// The local tx "working set" of mutations to base relations, and consists of the set of operations /// we will attempt to make permanent when the transaction commits. @@ -31,12 +31,12 @@ use crate::tuplebox::RelationId; pub struct WorkingSet { pub(crate) ts: u64, pub(crate) schema: Vec, - pub(crate) slotbox: Arc, + pub(crate) slotbox: Arc, pub(crate) relations: BitArray>, } impl WorkingSet { - pub(crate) fn new(slotbox: Arc, schema: &[RelationInfo], ts: u64) -> Self { + pub(crate) fn new(slotbox: Arc, schema: &[RelationInfo], ts: u64) -> Self { let relations = BitArray::new(); Self { ts, @@ -79,7 +79,7 @@ impl WorkingSet { pub(crate) async fn seek_by_domain( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, domain: SliceRef, ) -> Result<(SliceRef, SliceRef), TupleError> { @@ -119,7 +119,7 @@ impl WorkingSet { pub(crate) async fn seek_by_codomain( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, codomain: SliceRef, ) -> Result, TupleError> { @@ -169,7 +169,7 @@ impl WorkingSet { pub(crate) async fn insert_tuple( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, domain: SliceRef, codomain: SliceRef, @@ -207,7 +207,7 @@ impl WorkingSet { pub(crate) async fn predicate_scan bool>( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, f: F, ) -> Result, TupleError> { @@ -252,7 +252,7 @@ impl WorkingSet { pub(crate) async fn update_tuple( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, domain: SliceRef, codomain: SliceRef, @@ -323,7 +323,7 @@ impl WorkingSet { /// committing it to the canonical base relations. pub(crate) async fn upsert_tuple( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, domain: SliceRef, codomain: SliceRef, @@ -423,7 +423,7 @@ impl WorkingSet { /// committing the delete to the canonical base relations. pub(crate) async fn remove_by_domain( &mut self, - db: &Arc, + db: &Arc, relation_id: RelationId, domain: SliceRef, ) -> Result<(), TupleError> { diff --git a/crates/db/tests/jepsen.rs b/crates/db/tests/jepsen.rs index 085efacb..f9705ee3 100644 --- a/crates/db/tests/jepsen.rs +++ b/crates/db/tests/jepsen.rs @@ -13,12 +13,12 @@ // pub mod support { - use moor_db::tuplebox::{RelationInfo, TupleBox}; + use moor_db::rdb::{RelBox, RelationInfo}; use std::path::PathBuf; use std::sync::Arc; /// Build a test database with a bunch of relations - pub async fn test_db(dir: PathBuf) -> Arc { + pub async fn test_db(dir: PathBuf) -> Arc { // Generate 10 test relations that we'll use for testing. let relations = (0..100) .map(|i| RelationInfo { @@ -29,7 +29,7 @@ pub mod support { }) .collect::>(); - TupleBox::new(1 << 24, Some(dir), &relations, 0).await + RelBox::new(1 << 24, Some(dir), &relations, 0).await } } @@ -39,9 +39,9 @@ mod tests { use std::sync::Arc; use tracing_test::traced_test; + use moor_db::rdb::RelBox; + use moor_db::rdb::{RelationId, Transaction}; use moor_db::testing::jepsen::{History, Type, Value}; - use moor_db::tuplebox::TupleBox; - use moor_db::tuplebox::{RelationId, Transaction}; use moor_values::util::slice_ref::SliceRef; @@ -58,7 +58,7 @@ mod tests { async fn check_expected( process: i64, - _db: Arc, + _db: Arc, tx: &Transaction, relation: RelationId, expected_values: &Option>, @@ -94,7 +94,7 @@ mod tests { async fn check_completion( process: i64, - db: Arc, + db: Arc, tx: &Transaction, values: Vec, action_type: Type, diff --git a/crates/db/tests/tb_restore.rs b/crates/db/tests/rdb_restore.rs similarity index 96% rename from crates/db/tests/tb_restore.rs rename to crates/db/tests/rdb_restore.rs index 8e316784..9501c8ef 100644 --- a/crates/db/tests/tb_restore.rs +++ b/crates/db/tests/rdb_restore.rs @@ -18,9 +18,9 @@ mod test { use std::path::PathBuf; use std::sync::Arc; + use moor_db::rdb::{RelBox, RelationInfo}; + use moor_db::rdb::{RelationId, Transaction}; use moor_db::testing::jepsen::{History, Type, Value}; - use moor_db::tuplebox::{RelationId, Transaction}; - use moor_db::tuplebox::{RelationInfo, TupleBox}; use moor_values::util::slice_ref::SliceRef; fn from_val(value: i64) -> SliceRef { @@ -33,7 +33,7 @@ mod test { } async fn fill_db( - db: Arc, + db: Arc, events: &Vec, processes: &mut HashMap>, ) { @@ -85,7 +85,7 @@ mod test { } } } - pub async fn test_db(dir: PathBuf) -> Arc { + pub async fn test_db(dir: PathBuf) -> Arc { // Generate 10 test relations that we'll use for testing. let relations = (0..100) .map(|i| RelationInfo { @@ -96,7 +96,7 @@ mod test { }) .collect::>(); - TupleBox::new(1 << 24, Some(dir), &relations, 0).await + RelBox::new(1 << 24, Some(dir), &relations, 0).await } // Open a db in a test dir, fill it with some goop, close it, reopen it, and check that the goop is still there. diff --git a/crates/db/tests/worldstate_restore.rs b/crates/db/tests/worldstate_restore.rs index 58b94035..38cd3a78 100644 --- a/crates/db/tests/worldstate_restore.rs +++ b/crates/db/tests/worldstate_restore.rs @@ -15,9 +15,9 @@ #[cfg(test)] mod test { use moor_db::db_tx::DbTransaction; - use moor_db::object_relations::{WorldStateRelation, WorldStateSequences}; - use moor_db::tb_worldstate::TupleBoxTransaction; - use moor_db::tuplebox::{RelationInfo, TupleBox}; + use moor_db::odb::RelBoxTransaction; + use moor_db::odb::{WorldStateRelation, WorldStateSequences}; + use moor_db::rdb::{RelBox, RelationInfo}; use moor_values::model::defset::HasUuid; use moor_values::model::objects::ObjAttrs; use moor_values::model::r#match::VerbArgsSpec; @@ -29,7 +29,7 @@ mod test { use std::sync::Arc; use strum::{EnumCount, IntoEnumIterator}; - pub async fn test_db(dir: PathBuf) -> Arc { + pub async fn test_db(dir: PathBuf) -> Arc { let mut relations: Vec = WorldStateRelation::iter() .map(|wsr| { RelationInfo { @@ -43,7 +43,7 @@ mod test { relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true; relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - TupleBox::new(1 << 24, Some(dir), &relations, WorldStateSequences::COUNT).await + RelBox::new(1 << 24, Some(dir), &relations, WorldStateSequences::COUNT).await } #[tokio::test] @@ -54,7 +54,7 @@ mod test { let a = { let db = test_db(tmpdir.path().into()).await; - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let a = tx .create_object( @@ -103,7 +103,7 @@ mod test { .next() .is_some()); - let tx = TupleBoxTransaction::new(db.clone()); + let tx = RelBoxTransaction::new(db.clone()); let v_uuid = tx .resolve_verb(a, "test".into(), None) diff --git a/crates/kernel/benches/vm_benches.rs b/crates/kernel/benches/vm_benches.rs index 6faf88bb..c370ae29 100644 --- a/crates/kernel/benches/vm_benches.rs +++ b/crates/kernel/benches/vm_benches.rs @@ -1,3 +1,17 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + //! Benchmarks of various virtual machine executions //! In general attempting to keep isolated from the object/world-state and simply execute //! program code that doesn't interact with the DB, to measure opcode execution efficiency. @@ -9,7 +23,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use tokio::runtime::Runtime; use moor_compiler::codegen::compile; -use moor_db::tb_worldstate::TupleBoxWorldStateSource; +use moor_db::odb::RelBoxWorldState; use moor_kernel::tasks::scheduler::AbortLimitReason; use moor_kernel::tasks::sessions::{NoopClientSession, Session}; use moor_kernel::tasks::vm_host::{VMHostResponse, VmHost}; @@ -22,8 +36,8 @@ use moor_values::util::bitenum::BitEnum; use moor_values::var::Var; use moor_values::{AsByteBuffer, NOTHING, SYSTEM_OBJECT}; -async fn create_worldstate() -> TupleBoxWorldStateSource { - let (ws_source, _) = TupleBoxWorldStateSource::open(None, 1 << 24).await; +async fn create_worldstate() -> RelBoxWorldState { + let (ws_source, _) = RelBoxWorldState::open(None, 1 << 24).await; let mut tx = ws_source.new_world_state().await.unwrap(); let _sysobj = tx .create_object(SYSTEM_OBJECT, NOTHING, SYSTEM_OBJECT, BitEnum::all()) @@ -73,7 +87,7 @@ pub async fn prepare_call_verb( } async fn prepare_vm_execution( - ws_source: &mut TupleBoxWorldStateSource, + ws_source: &mut RelBoxWorldState, program: &str, max_ticks: usize, ) -> VmHost { diff --git a/crates/kernel/src/vm/vm_test.rs b/crates/kernel/src/vm/vm_test.rs index f6aa5551..2df22a56 100644 --- a/crates/kernel/src/vm/vm_test.rs +++ b/crates/kernel/src/vm/vm_test.rs @@ -36,7 +36,7 @@ mod tests { use moor_compiler::labels::Names; use moor_compiler::opcode::Op::*; use moor_compiler::opcode::{Op, Program}; - use moor_db::tb_worldstate::TupleBoxWorldStateSource; + use moor_db::odb::RelBoxWorldState; use moor_values::model::{Event, NarrativeEvent}; use test_case::test_case; @@ -52,8 +52,8 @@ mod tests { } // Create an in memory db with a single object (#0) containing a single provided verb. - async fn test_db_with_verbs(verbs: &[(&str, &Program)]) -> TupleBoxWorldStateSource { - let (state, _) = TupleBoxWorldStateSource::open(None, 1 << 30).await; + async fn test_db_with_verbs(verbs: &[(&str, &Program)]) -> RelBoxWorldState { + let (state, _) = RelBoxWorldState::open(None, 1 << 30).await; let mut tx = state.new_world_state().await.unwrap(); let sysobj = tx .create_object(SYSTEM_OBJECT, NOTHING, SYSTEM_OBJECT, BitEnum::all()) @@ -101,7 +101,7 @@ mod tests { state } - async fn test_db_with_verb(verb_name: &str, program: &Program) -> TupleBoxWorldStateSource { + async fn test_db_with_verb(verb_name: &str, program: &Program) -> RelBoxWorldState { test_db_with_verbs(&[(verb_name, program)]).await } diff --git a/crates/kernel/testsuite/basic/basic_suite.rs b/crates/kernel/testsuite/basic/basic_suite.rs index 55f7d10c..abb933f7 100644 --- a/crates/kernel/testsuite/basic/basic_suite.rs +++ b/crates/kernel/testsuite/basic/basic_suite.rs @@ -14,7 +14,7 @@ use moor_compiler::codegen::compile; use moor_compiler::opcode::Program; -use moor_db::tb_worldstate::TupleBoxWorldStateSource; +use moor_db::odb::RelBoxWorldState; use moor_db::Database; use moor_kernel::tasks::sessions::NoopClientSession; use moor_kernel::tasks::vm_test_utils::call_verb; @@ -36,7 +36,7 @@ fn testsuite_dir() -> PathBuf { } /// Create a minimal Db to support the test harness. -async fn load_textdump(db: &mut TupleBoxWorldStateSource) { +async fn load_textdump(db: &mut RelBoxWorldState) { let mut tx = db.loader_client().unwrap(); textdump_load( tx.as_mut(), @@ -47,7 +47,7 @@ async fn load_textdump(db: &mut TupleBoxWorldStateSource) { assert_eq!(tx.commit().await.unwrap(), CommitResult::Success); } -async fn compile_verbs(db: &mut TupleBoxWorldStateSource, verbs: &[(&str, &Program)]) { +async fn compile_verbs(db: &mut RelBoxWorldState, verbs: &[(&str, &Program)]) { let mut tx = db.new_world_state().await.unwrap(); for (verb_name, program) in verbs { let binary = program.make_copy_as_vec(); @@ -85,7 +85,7 @@ async fn compile_verbs(db: &mut TupleBoxWorldStateSource, verbs: &[(&str, &Progr assert_eq!(tx.commit().await.unwrap(), CommitResult::Success); } -async fn eval(db: &mut TupleBoxWorldStateSource, expression: &str) -> Var { +async fn eval(db: &mut RelBoxWorldState, expression: &str) -> Var { let binary = compile(format!("return {expression};").as_str()).unwrap(); compile_verbs(db, &[("test", &binary)]).await; let mut state = db.new_world_state().await.unwrap(); @@ -123,7 +123,7 @@ async fn run_basic_test(test_dir: &str) { // Frustratingly the individual test lines are not independent, so we need to run them in a // single database. - let (mut db, _) = TupleBoxWorldStateSource::open(None, 1 << 30).await; + let (mut db, _) = RelBoxWorldState::open(None, 1 << 30).await; load_textdump(&mut db).await; for (line_num, (input, expected_output)) in zipped.enumerate() { let evaluated = eval(&mut db, input).await;