diff --git a/crates/db/src/tuplebox/base_relation.rs b/crates/db/src/tuplebox/base_relation.rs index f4e253a2..aaa29fa0 100644 --- a/crates/db/src/tuplebox/base_relation.rs +++ b/crates/db/src/tuplebox/base_relation.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use moor_values::util::slice_ref::SliceRef; +use crate::tuplebox::tuples::SlotBox; use crate::tuplebox::tuples::TupleRef; -use crate::tuplebox::tuples::{SlotBox, TupleId}; use crate::tuplebox::RelationId; /// Represents a 'canonical' base binary relation, which is a set of tuples of domain, codomain, @@ -71,38 +71,35 @@ impl BaseRelation { panic!("Relation already has a secondary index"); } self.index_codomain = Some(im::HashMap::new()); - for tuple_ref in self.tuples.iter() { - let tuple = tuple_ref.get(); + for tuple in self.tuples.iter() { // ... update the secondary index. self.index_codomain .as_mut() .unwrap() .entry(tuple.codomain().as_slice().to_vec()) .or_default() - .insert(tuple_ref.clone()); + .insert(tuple.clone()); } } - /// Establish indexes on a tuple initial-loaded from secondary storage. Basically a, "trust us, + /// Establish indexes for a tuple initial-loaded from secondary storage. Basically a, "trust us, /// this exists" move. - pub fn index_tuple(&mut self, tuple_id: TupleId) { - let tuple_ref = TupleRef::new(self.slotbox.clone(), tuple_id); - self.tuples.insert(tuple_ref.clone()); - let tuple = tuple_ref.get(); + pub fn index_tuple(&mut self, tuple: TupleRef) { + self.tuples.insert(tuple.clone()); // Reset timestamp to 0, since this is a tuple initial-loaded from secondary storage. tuple.update_timestamp(self.id, self.slotbox.clone(), 0); // Update the domain index to point to the tuple... self.index_domain - .insert(tuple.domain().as_slice().to_vec(), tuple_ref.clone()); + .insert(tuple.domain().as_slice().to_vec(), tuple.clone()); // ... and update the secondary index if there is one. if let Some(index) = &mut self.index_codomain { index .entry(tuple.codomain().as_slice().to_vec()) .or_insert_with(HashSet::new) - .insert(tuple_ref); + .insert(tuple); } } @@ -113,10 +110,7 @@ impl BaseRelation { pub fn predicate_scan bool>(&self, f: &F) -> HashSet { self.tuples .iter() - .filter(|t| { - let t = t.get(); - f(&(t.domain(), t.codomain())) - }) + .filter(|t| f(&(t.domain(), t.codomain()))) .cloned() .collect() } @@ -148,41 +142,38 @@ impl BaseRelation { } /// Update or insert a tuple into the relation. - pub fn upsert_tuple(&mut self, new_tuple_ref: TupleRef) { - let tuple = new_tuple_ref.get(); + pub fn upsert_tuple(&mut self, tuple: TupleRef) { // First check the domain->tuple id index to see if we're inserting or updating. let existing_tuple_ref = self.index_domain.get(tuple.domain().as_slice()).cloned(); match existing_tuple_ref { None => { // Insert into the tuple list and the index. self.index_domain - .insert(tuple.domain().as_slice().to_vec(), new_tuple_ref.clone()); - self.tuples.insert(new_tuple_ref.clone()); + .insert(tuple.domain().as_slice().to_vec(), tuple.clone()); + self.tuples.insert(tuple.clone()); if let Some(codomain_index) = &mut self.index_codomain { codomain_index .entry(tuple.codomain().as_slice().to_vec()) .or_insert_with(HashSet::new) - .insert(new_tuple_ref); + .insert(tuple); } } Some(existing_tuple) => { // We need the old value so we can update the codomain index. 
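// Sketch of the secondary-index maintenance the next lines perform: when a domain key is
// re-bound to a new codomain, the tuple must leave the old codomain bucket before joining
// the new one, or stale entries linger in the index. Types are simplified here (std
// HashMap, u64 in place of TupleRef); the real index is an
// im::HashMap<Vec<u8>, HashSet<TupleRef>>.
use std::collections::{HashMap, HashSet};
fn reindex_codomain(
    index: &mut HashMap<Vec<u8>, HashSet<u64>>,
    old_codomain: &[u8],
    new_codomain: &[u8],
    tuple: u64,
) {
    index.entry(old_codomain.to_vec()).or_default().remove(&tuple);
    index.entry(new_codomain.to_vec()).or_default().insert(tuple);
}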
- let old_value = existing_tuple.get(); - if let Some(codomain_index) = &mut self.index_codomain { codomain_index - .entry(old_value.codomain().as_slice().to_vec()) + .entry(existing_tuple.codomain().as_slice().to_vec()) .or_insert_with(HashSet::new) .remove(&existing_tuple); codomain_index .entry(tuple.codomain().as_slice().to_vec()) .or_insert_with(HashSet::new) - .insert(new_tuple_ref.clone()); + .insert(tuple.clone()); } self.index_domain - .insert(tuple.domain().as_slice().to_vec(), new_tuple_ref.clone()); + .insert(tuple.domain().as_slice().to_vec(), tuple.clone()); self.tuples.remove(&existing_tuple); - self.tuples.insert(new_tuple_ref); + self.tuples.insert(tuple); } } } diff --git a/crates/db/src/tuplebox/coldstorage.rs b/crates/db/src/tuplebox/coldstorage.rs index 8ec7dd62..18c0fed7 100644 --- a/crates/db/src/tuplebox/coldstorage.rs +++ b/crates/db/src/tuplebox/coldstorage.rs @@ -69,7 +69,6 @@ impl ColdStorage { let page_storage = Arc::new(Mutex::new(PageStore::new(path.join("pages"), &eventfd))); let wal_manager = WalManager { page_storage: page_storage.clone(), - slot_box: slot_box.clone(), }; // Do initial recovery of anything left in the WAL before starting up, which should // flush everything to page storage, from which we can then go and load it. @@ -107,27 +106,27 @@ impl ColdStorage { let mut restored_slots = HashMap::new(); let mut restored_bytes = 0; for (page_size, page_num, relation_id) in ids { - let sb_page = slot_box.restore(page_num).expect("Unable to get page"); - let slot_ids = sb_page.load(|buf| { - ps.read_page_buf(page_num, relation_id, buf) - .expect("Unable to read page") - }); - // The allocator needs to know that this page is used. - slot_box.mark_page_used(relation_id, sb_page.available_content_bytes(), page_num); + let tuple_ids = slot_box + .clone() + .load_page(relation_id, page_num, |buf| { + ps.read_page_buf(page_num, relation_id, buf) + .expect("Unable to read page") + }) + .expect("Unable to get page"); + restored_slots .entry(relation_id) .or_insert_with(HashSet::new) - .insert((page_num, slot_ids)); + .insert(tuple_ids); restored_bytes += page_size; } // Now iterate all the slots we restored, and re-establish their indexes in the relations they belong to. let mut restored_count = 0; - for (relation_id, pages) in restored_slots { - for (page_num, slot_ids) in pages { - let relation = &mut relations[relation_id.0]; - for slot_id in slot_ids { - let tuple_id: TupleId = (page_num, slot_id); + for (relation_id, relation_tuple_ids) in restored_slots { + for page_tuple_ids in relation_tuple_ids { + for tuple_id in page_tuple_ids { + let relation = &mut relations[relation_id.0]; relation.index_tuple(tuple_id); restored_count += 1; } @@ -182,7 +181,8 @@ impl ColdStorage { } } }, - // When the eventfd is triggered by the page store, we need to ask it to process completions. + // When the eventfd is triggered by the page store, we need to ask it to process + // completions. _ = event_fd.read(&mut buf) => { let _ = ps.lock().unwrap().process_completions(); } @@ -244,7 +244,10 @@ impl ColdStorage { for t in r.tuples() { match t { TxTuple::Insert(_) | TxTuple::Update(_) | TxTuple::Tombstone { .. } => { - let (page_id, _slot_id) = t.tuple_id(); + let TupleId { + page: page_id, + slot: _slot_id, + } = t.tuple_id(); dirty_pages.insert((page_id, r.id)); } TxTuple::Value(_) => { @@ -290,7 +293,6 @@ struct WalManager { // TODO: having a lock on the cold storage should not be necessary, but it is not !Sync, despite // it supposedly being thread safe. 
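// Note: WalManager no longer holds a SlotBox handle. The WAL write path above collects
// dirty pages by destructuring the new `TupleId { page, slot }` struct (defined in
// tuples/mod.rs), and the explicit eviction loop that called `slot_box.remove(tuple_id)`
// is deleted below, presumably because tuple lifetimes are now driven by TupleRef
// refcounts rather than by the WAL.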
page_storage: Arc>, - slot_box: Arc, } impl Debug for WalManager { @@ -351,12 +353,6 @@ impl LogManager for WalManager { return Ok(()); }; - for tuple_id in evicted { - if let Err(e) = self.slot_box.remove(tuple_id) { - warn!(?tuple_id, e = ?e, "Failed to evict page from slot box"); - } - } - Ok(()) } } @@ -491,7 +487,10 @@ impl WalManager { pid as PageId, relation_id, )); - to_evict.push((pid as PageId, slot_id as SlotId)); + to_evict.push(TupleId { + page: pid as PageId, + slot: slot_id as SlotId, + }); } } } diff --git a/crates/db/src/tuplebox/pool/buffer_pool.rs b/crates/db/src/tuplebox/pool/buffer_pool.rs index af4b1bb3..81af50f1 100644 --- a/crates/db/src/tuplebox/pool/buffer_pool.rs +++ b/crates/db/src/tuplebox/pool/buffer_pool.rs @@ -222,6 +222,29 @@ impl BufferPool { Ok((AtomicPtr::new(addr), sc.block_size)) } + /// Find the buffer id (bid) for a given pointer. Can be used to identify the page + /// that a pointer belongs to, in case of page fault. + #[allow(dead_code)] // Legitimate potential future use + pub fn identify_page(&self, ptr: AtomicPtr) -> Result { + // Look at the address ranges for each size class to find the one that contains the pointer. + for (sc_idx, sc) in self.size_classes.iter().enumerate() { + let base = sc.base_addr.load(Ordering::SeqCst); + let base = base as usize; + let end = base + sc.virt_size; + let ptr = ptr.load(Ordering::SeqCst) as usize; + if ptr >= base && ptr < end { + // Found the size class that contains the pointer. Now we need to find the offset + // within the size class. + let offset = ptr - base; + let offset = offset / sc.block_size; + let offset = offset * sc.block_size; + let bid = Self::newbid(offset, sc_idx as u8); + return Ok(bid); + } + } + Err(PagerError::InvalidTuplePointer) + } + /// Get the total reserved capacity of the buffer pool. #[allow(dead_code)] // Legitimate potential future use pub fn capacity_bytes(&self) -> usize { diff --git a/crates/db/src/tuplebox/pool/mod.rs b/crates/db/src/tuplebox/pool/mod.rs index 86f9a5b4..fc4a5262 100644 --- a/crates/db/src/tuplebox/pool/mod.rs +++ b/crates/db/src/tuplebox/pool/mod.rs @@ -39,4 +39,7 @@ pub enum PagerError { #[error("Invalid page access")] CouldNotAccess, + + #[error("Invalid tuple pointer")] + InvalidTuplePointer, } diff --git a/crates/db/src/tuplebox/tb.rs b/crates/db/src/tuplebox/tb.rs index 6fadc668..572105a3 100644 --- a/crates/db/src/tuplebox/tb.rs +++ b/crates/db/src/tuplebox/tb.rs @@ -203,11 +203,10 @@ impl TupleBox { // here. let Some(cv) = canon_tuple else { match &tuple { - TxTuple::Insert(tref) => { - let t = tref.get(); + TxTuple::Insert(t) => { t.update_timestamp(relation_id, self.slotbox.clone(), tx_ts); let forked_relation = commitset.fork(relation_id, canonical); - forked_relation.upsert_tuple(tref.clone()); + forked_relation.upsert_tuple(t.clone()); continue; } TxTuple::Tombstone { .. } => { @@ -231,7 +230,6 @@ impl TupleBox { // Check the timestamp on the value, if it's newer than the read-timestamp, // we have for this tuple then that's a conflict, because it means someone else has // already committed a change to this tuple. - let cv = cv.get(); if cv.ts() > tuple.ts() { return Err(CommitError::TupleVersionConflict); } @@ -240,11 +238,10 @@ impl TupleBox { // branching of the old one. 
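// A minimal sketch of the optimistic check performed just above: the commit fails with
// CommitError::TupleVersionConflict when the canonical tuple's timestamp is newer than
// the timestamp this transaction originally read for it.
fn has_conflict(canonical_ts: u64, tx_read_ts: u64) -> bool {
    canonical_ts > tx_read_ts
}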
let forked_relation = commitset.fork(relation_id, canonical); match &tuple { - TxTuple::Insert(tref) | TxTuple::Update(tref) => { - let t = tref.get(); + TxTuple::Insert(t) | TxTuple::Update(t) => { t.update_timestamp(relation_id, self.slotbox.clone(), tx_ts); let forked_relation = commitset.fork(relation_id, canonical); - forked_relation.upsert_tuple(tref.clone()); + forked_relation.upsert_tuple(t.clone()); } TxTuple::Value(..) => {} TxTuple::Tombstone { diff --git a/crates/db/src/tuplebox/tuples/mod.rs b/crates/db/src/tuplebox/tuples/mod.rs index 58355581..4a0bb85b 100644 --- a/crates/db/src/tuplebox/tuples/mod.rs +++ b/crates/db/src/tuplebox/tuples/mod.rs @@ -12,18 +12,23 @@ // this program. If not, see . // -pub use slotbox::{PageId, SlotBox, SlotBoxError, SlotId, TupleId}; +pub use slotbox::{PageId, SlotBox, SlotBoxError, SlotId}; use thiserror::Error; -pub use tuple::Tuple; -pub use tuple_ref::TupleRef; +pub use tuple::TupleRef; pub use tx_tuple::TxTuple; +mod slot_ptr; mod slotbox; mod slotted_page; mod tuple; -mod tuple_ref; mod tx_tuple; +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +pub struct TupleId { + pub page: PageId, + pub slot: SlotId, +} + #[derive(Debug, Clone, Eq, PartialEq, Error)] pub enum TupleError { #[error("Tuple not found")] diff --git a/crates/db/src/tuplebox/tuples/slot_ptr.rs b/crates/db/src/tuplebox/tuples/slot_ptr.rs new file mode 100644 index 00000000..4d4beaac --- /dev/null +++ b/crates/db/src/tuplebox/tuples/slot_ptr.rs @@ -0,0 +1,96 @@ +use crate::tuplebox::tuples::{SlotBox, TupleId}; +use moor_values::util::slice_ref::ByteSource; +use std::hash::Hash; +use std::sync::atomic::AtomicPtr; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; + +/// A reference to a tuple in a SlotBox, owned by the SlotBox itself. TupleRefs are given a pointer to these, +/// which allows the SlotBox to manage the lifetime of the tuple, swizzling it in and out of memory as needed. +/// Adds a layer of indirection to each tuple access, but is better than passing around tuple ids + slotbox +/// references. 
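// A simplified sketch of the indirection described in the comment above, with stand-in
// types: a TupleRef holds a raw pointer to a SlotBox-owned, pinned SlotPtr, and the
// SlotPtr holds the (swizzlable) buffer address and length. The SlotBox can repoint
// `bufaddr` when a tuple moves without invalidating any outstanding TupleRefs.
use std::sync::atomic::{AtomicPtr, Ordering::SeqCst};
struct MiniSlotPtr {
    bufaddr: AtomicPtr<u8>,
    buflen: usize,
}
struct MiniTupleRef {
    sp: *const MiniSlotPtr,
}
impl MiniTupleRef {
    fn buffer(&self) -> &[u8] {
        // Follow the double indirection: TupleRef -> SlotPtr -> current buffer address.
        let sp = unsafe { &*self.sp };
        unsafe { std::slice::from_raw_parts(sp.bufaddr.load(SeqCst), sp.buflen) }
    }
}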
+pub struct SlotPtr { + sb: Arc, + id: TupleId, + buflen: usize, + bufaddr: AtomicPtr, + + _pin: std::marker::PhantomPinned, +} + +impl SlotPtr { + pub(crate) fn create( + sb: Arc, + tuple_id: TupleId, + bufaddr: AtomicPtr, + buflen: usize, + ) -> Self { + SlotPtr { + sb: sb.clone(), + id: tuple_id, + bufaddr, + buflen, + _pin: std::marker::PhantomPinned, + } + } +} + +impl PartialEq for SlotPtr { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for SlotPtr {} + +impl Hash for SlotPtr { + fn hash(&self, state: &mut H) { + self.id.hash(state) + } +} + +impl SlotPtr { + pub fn id(&self) -> TupleId { + self.id + } + + fn buffer(&self) -> &[u8] { + let buf_addr = self.bufaddr.load(SeqCst); + unsafe { std::slice::from_raw_parts(buf_addr, self.buflen) } + } + + pub fn byte_source(&self) -> Box { + Box::new(SlotByteSource { + ptr: AtomicPtr::new((self as *const SlotPtr) as *mut SlotPtr), + }) + } + + pub fn upcount(&self) { + self.sb.upcount(self.id).unwrap(); + } + + pub fn dncount(&self) { + self.sb.dncount(self.id).unwrap(); + } +} + +/// So we can build SliceRefs off of SlotPtrs +pub struct SlotByteSource { + ptr: AtomicPtr, +} + +impl ByteSource for SlotByteSource { + fn as_slice(&self) -> &[u8] { + let ptr = self.ptr.load(SeqCst); + let buffer = (unsafe { &(*ptr) }).buffer(); + buffer + } + + fn len(&self) -> usize { + let ptr = self.ptr.load(SeqCst); + let buffer = (unsafe { &(*ptr) }).buffer(); + buffer.len() + } + + fn touch(&self) {} +} diff --git a/crates/db/src/tuplebox/tuples/slotbox.rs b/crates/db/src/tuplebox/tuples/slotbox.rs index cc0823ae..f81f9dbd 100644 --- a/crates/db/src/tuplebox/tuples/slotbox.rs +++ b/crates/db/src/tuplebox/tuples/slotbox.rs @@ -29,24 +29,26 @@ // TODO: rename me, _I_ am the tuplebox. The "slots" are just where my tuples get stored. use std::cmp::max; +use std::collections::HashMap; use std::pin::Pin; use std::sync::atomic::AtomicPtr; use std::sync::atomic::Ordering::SeqCst; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use sized_chunks::SparseChunk; use thiserror::Error; use tracing::error; use crate::tuplebox::pool::{Bid, BufferPool, PagerError}; +use crate::tuplebox::tuples::slot_ptr::SlotPtr; pub use crate::tuplebox::tuples::slotted_page::SlotId; use crate::tuplebox::tuples::slotted_page::{ slot_index_overhead, slot_page_empty_size, SlottedPage, }; +use crate::tuplebox::tuples::{TupleId, TupleRef}; use crate::tuplebox::RelationId; pub type PageId = usize; -pub type TupleId = (PageId, SlotId); /// A SlotBox is a collection of (variable sized) pages, each of which is a collection of slots, each of which is holds /// dynamically sized tuples. @@ -72,81 +74,65 @@ impl SlotBox { /// Allocates a new slot for a tuple, somewhere in one of the pages we managed. /// Does not allow tuples from different relations to mix on the same page. pub fn allocate( - &self, + self: Arc, size: usize, relation_id: RelationId, initial_value: Option<&[u8]>, - ) -> Result { - // Pick a buffer pool size. If the tuples are small, we use a reasonable sized page that could in theory hold - // a few tuples, but if the tuples are large, we use a page size that might hold only one or two. - // This way really large values can be slotted into the correct page size. 
- let tuple_size = size + slot_index_overhead(); - let page_size = max(32768, tuple_size.next_power_of_two()); - - let needed_space = size + slot_index_overhead(); - + ) -> Result { let mut inner = self.inner.lock().unwrap(); - // Check if we have a free spot for this relation that can fit the tuple. - let (pid, offset) = - { inner.find_space(relation_id, needed_space, slot_page_empty_size(page_size))? }; - - let mut page_handle = inner.page_for(pid)?; - - let free_space = page_handle.available_content_bytes(); - let mut page_write_lock = page_handle.write_lock(); - if let Ok((slot_id, page_remaining, _)) = page_write_lock.allocate(size, initial_value) { - inner.finish_alloc(pid, relation_id, offset, page_remaining); - return Ok((pid, slot_id)); - } - // If we get here, then we failed to allocate on the page we wanted to, which means there's - // data coherence issues between the pages last-reported free space and the actual free - panic!( - "Page {} failed to allocate, we wanted {} bytes, but it only has {},\ - but our records show it has {}, and its pid in that offset is {:?}", - pid, - size, - free_space, - inner.available_page_space[relation_id.0][offset].available, - inner.available_page_space[relation_id.0][offset].bid - ); + inner.do_alloc(size, relation_id, initial_value, &self) } - pub fn remove(&self, id: TupleId) -> Result<(), SlotBoxError> { + pub(crate) fn load_page)>( + self: Arc, + relation_id: RelationId, + id: PageId, + mut lf: LF, + ) -> Result, SlotBoxError> { let mut inner = self.inner.lock().unwrap(); - inner.do_remove(id) - } - pub fn restore<'a>(&self, id: PageId) -> Result, SlotBoxError> { - let inner = self.inner.lock().unwrap(); - let (addr, page_size) = match inner.pool.restore(Bid(id as u64)) { - Ok(v) => v, - Err(PagerError::CouldNotAccess) => { - return Err(SlotBoxError::TupleNotFound(id)); - } - Err(e) => { - panic!("Unexpected buffer pool error: {:?}", e); - } - }; + // Re-allocate the page. + let page = inner.do_restore_page(id).unwrap(); - Ok(SlottedPage::for_page(addr, page_size)) + // Find all the slots referenced in this page. + let slot_ids = page.load(|buf| { + lf(buf); + }); + + // Now make sure we have swizrefs for all of them. + let mut refs = vec![]; + for (slot, buflen, addr) in slot_ids.into_iter() { + let tuple_id = TupleId { page: id, slot }; + let swizref = Box::pin(SlotPtr::create(self.clone(), tuple_id, addr, buflen)); + inner.swizrefs.insert(tuple_id, swizref); + let swizref = inner.swizrefs.get_mut(&tuple_id).unwrap(); + let sp = unsafe { Pin::into_inner_unchecked(swizref.as_mut()) }; + let ptr = sp as *mut SlotPtr; + let ptr = AtomicPtr::new(ptr); + let tuple_ref = TupleRef::at_ptr(ptr); + refs.push(tuple_ref); + } + // The allocator needs to know that this page is used. + inner.do_mark_page_used(relation_id, page.available_content_bytes(), id); + Ok(refs) } - pub fn page_for<'a>(&self, id: PageId) -> Result, SlotBoxError> { + pub(crate) fn page_for<'a>(&self, id: PageId) -> Result, SlotBoxError> { let inner = self.inner.lock().unwrap(); inner.page_for(id) } pub fn upcount(&self, id: TupleId) -> Result<(), SlotBoxError> { let inner = self.inner.lock().unwrap(); - let page_handle = inner.page_for(id.0)?; - page_handle.upcount(id.1) + let page_handle = inner.page_for(id.page)?; + page_handle.upcount(id.slot) } pub fn dncount(&self, id: TupleId) -> Result<(), SlotBoxError> { let mut inner = self.inner.lock().unwrap(); - let page_handle = inner.page_for(id.0)?; - if page_handle.dncount(id.1)? 
{ + let page_handle = inner.page_for(id.page)?; + if page_handle.dncount(id.slot)? { inner.do_remove(id)?; } Ok(()) @@ -154,38 +140,37 @@ impl SlotBox { pub fn get(&self, id: TupleId) -> Result, SlotBoxError> { let inner = self.inner.lock().unwrap(); - let page_handle = inner.page_for(id.0)?; + let page_handle = inner.page_for(id.page)?; let lock = page_handle.read_lock(); - let slc = lock.get_slot(id.1)?; + let slc = lock.get_slot(id.slot)?; Ok(slc) } pub fn update( - &self, + self: Arc, relation_id: RelationId, id: TupleId, new_value: &[u8], - ) -> Result { + ) -> Result, SlotBoxError> { // The lock scope has to be limited here, or we'll deadlock if we need to re-allocate. { let mut inner = self.inner.lock().unwrap(); - let mut page_handle = inner.page_for(id.0)?; + let mut page_handle = inner.page_for(id.page)?; // If the value size is the same as the old value, we can just update in place, otherwise // it's a brand new allocation, and we have to remove the old one first. let mut page_write = page_handle.write_lock(); - let mut existing = page_write.get_slot_mut(id.1).expect("Invalid tuple id"); - // let mut existing = page_handle.get_mut(id.1).expect("Invalid tuple id"); + let mut existing = page_write.get_slot_mut(id.slot).expect("Invalid tuple id"); if existing.len() == new_value.len() { existing.copy_from_slice(new_value); - return Ok(id); + return Ok(None); } inner.do_remove(id)?; } - let new_id = self.allocate(new_value.len(), relation_id, Some(new_value))?; - Ok(new_id) + let new_tup = self.allocate(new_value.len(), relation_id, Some(new_value))?; + Ok(Some(new_tup)) } pub fn update_with)>( @@ -194,10 +179,11 @@ impl SlotBox { mut f: F, ) -> Result<(), SlotBoxError> { let inner = self.inner.lock().unwrap(); - let mut page_handle = inner.page_for(id.0)?; - + let mut page_handle = inner.page_for(id.page)?; let mut page_write = page_handle.write_lock(); - let existing = page_write.get_slot_mut(id.1).expect("Invalid tuple id"); + + let existing = page_write.get_slot_mut(id.slot).expect("Invalid tuple id"); + f(existing); Ok(()) } @@ -221,30 +207,6 @@ impl SlotBox { ) .collect() } - - pub fn mark_page_used(&self, relation_id: RelationId, free_space: usize, pid: PageId) { - let mut allocator = self.inner.lock().unwrap(); - - let bid = Bid(pid as u64); - let Some(available_page_space) = allocator.available_page_space.get_mut(relation_id.0) - else { - allocator.available_page_space.insert( - relation_id.0, - vec![PageSpace { - available: free_space, - bid, - }], - ); - return; - }; - - // allocator.bitmap.insert(pid as usize); - available_page_space.push(PageSpace { - available: free_space, - bid, - }); - available_page_space.sort_by(|a, b| a.available.cmp(&b.available)); - } } struct PageSpace { @@ -259,6 +221,7 @@ struct Inner { // Will look into it once/if benchmarking justifies it. // The set of used pages, indexed by relation, in sorted order of the free space available in them. available_page_space: SparseChunk, 64>, + swizrefs: HashMap>>, } impl Inner { @@ -266,15 +229,105 @@ impl Inner { Self { available_page_space: SparseChunk::new(), pool, + swizrefs: HashMap::new(), } } + fn do_alloc( + &mut self, + size: usize, + relation_id: RelationId, + initial_value: Option<&[u8]>, + sb: &Arc, + ) -> Result { + let tuple_size = size + slot_index_overhead(); + let page_size = max(32768, tuple_size.next_power_of_two()); + + // Check if we have a free spot for this relation that can fit the tuple. 
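// Worked example of the page-size rule used just above: a page is at least 32 KiB, and
// grows to the next power of two when a single tuple (plus its slot-index overhead)
// would not otherwise fit.
fn page_size_for(tuple_size: usize) -> usize {
    std::cmp::max(32768, tuple_size.next_power_of_two())
}
// e.g. a 100-byte tuple lands on a 32 KiB page, while a ~100 KiB tuple gets a 128 KiB page:
// page_size_for(100) == 32768, page_size_for(100_000) == 131072.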
+ let (page, offset) = + { self.find_space(relation_id, tuple_size, slot_page_empty_size(page_size))? }; + + let mut page_handle = self.page_for(page)?; + + let free_space = page_handle.available_content_bytes(); + let mut page_write_lock = page_handle.write_lock(); + if let Ok((slot, page_remaining, mut buf)) = page_write_lock.allocate(size, initial_value) { + self.finish_alloc(page, relation_id, offset, page_remaining); + + // Make a swizzlable ptr reference and shove it in our set, and then return a tuple ref + // which has a ptr to it. + let buflen = buf.as_ref().len(); + let bufaddr = AtomicPtr::new(buf.as_mut_ptr()); + let tuple_id = TupleId { page, slot }; + + // Heap allocate the swizref, and and pin it, take the address of it, then stick the swizref + // in our set. + let mut swizref = Box::pin(SlotPtr::create(sb.clone(), tuple_id, bufaddr, buflen)); + let swizaddr = unsafe { swizref.as_mut().get_unchecked_mut() } as *mut SlotPtr; + self.swizrefs.insert(tuple_id, swizref); + + // Establish initial refcount using this existing lock. + page_write_lock.upcount(slot).unwrap(); + + return Ok(TupleRef::at_ptr(AtomicPtr::new(swizaddr))); + } + + // If we get here, then we failed to allocate on the page we wanted to, which means there's + // data coherence issues between the pages last-reported free space and the actual free + panic!( + "Page {} failed to allocate, we wanted {} bytes, but it only has {},\ + but our records show it has {}, and its pid in that offset is {:?}", + page, + size, + free_space, + self.available_page_space[relation_id.0][offset].available, + self.available_page_space[relation_id.0][offset].bid + ); + } + + fn do_restore_page<'a>(&mut self, id: PageId) -> Result, SlotBoxError> { + let (addr, page_size) = match self.pool.restore(Bid(id as u64)) { + Ok(v) => v, + Err(PagerError::CouldNotAccess) => { + return Err(SlotBoxError::TupleNotFound(id)); + } + Err(e) => { + panic!("Unexpected buffer pool error: {:?}", e); + } + }; + + Ok(SlottedPage::for_page(addr, page_size)) + } + + fn do_mark_page_used(&mut self, relation_id: RelationId, free_space: usize, pid: PageId) { + let bid = Bid(pid as u64); + let Some(available_page_space) = self.available_page_space.get_mut(relation_id.0) else { + self.available_page_space.insert( + relation_id.0, + vec![PageSpace { + available: free_space, + bid, + }], + ); + return; + }; + + // allocator.bitmap.insert(pid as usize); + available_page_space.push(PageSpace { + available: free_space, + bid, + }); + available_page_space.sort_by(|a, b| a.available.cmp(&b.available)); + } fn do_remove(&mut self, id: TupleId) -> Result<(), SlotBoxError> { - let mut page_handle = self.page_for(id.0)?; + let mut page_handle = self.page_for(id.page)?; let mut write_lock = page_handle.write_lock(); - let (new_free, _, is_empty) = write_lock.remove_slot(id.1)?; - self.report_free(id.0, new_free, is_empty); + let (new_free, _, is_empty) = write_lock.remove_slot(id.slot)?; + self.report_free(id.page, new_free, is_empty); + + // TODO: The swizref stays just in case? 
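// Sketch of why the pinned, boxed SlotPtrs in `swizrefs` give out stable raw pointers:
// when the HashMap that owns them rehashes, only the Box values move; the heap payload a
// Box owns stays at the same address, so the `*mut SlotPtr` handed to TupleRefs remains
// valid. Simplified types (u64 payload instead of SlotPtr).
use std::collections::HashMap;
use std::pin::Pin;
fn pinned_box_address_is_stable() {
    let mut map: HashMap<u32, Pin<Box<u64>>> = HashMap::new();
    map.insert(1, Box::pin(42u64));
    let before = &**map.get(&1).unwrap() as *const u64;
    // Force the map to grow and rehash; the Boxes move, their heap payloads do not.
    for k in 2..1024 {
        map.insert(k, Box::pin(0u64));
    }
    let after = &**map.get(&1).unwrap() as *const u64;
    assert_eq!(before, after);
}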
+ // self.swizrefs.remove(&id); Ok(()) } @@ -379,15 +432,13 @@ impl Inner { fn finish_alloc( &mut self, - pid: PageId, + _pid: PageId, relation_id: RelationId, offset: usize, page_remaining_bytes: usize, ) { let available_page_space = &mut self.available_page_space[relation_id.0]; let entry = &mut available_page_space[offset]; - assert!(entry.available >= page_remaining_bytes); - assert_eq!(entry.bid.0, pid as u64); entry.available = page_remaining_bytes; // If we (unlikely) consumed all the bytes, then we can remove the page from the avail pages @@ -441,22 +492,24 @@ impl Inner { mod tests { use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; + use std::sync::Arc; - use crate::tuplebox::tuples::slotbox::{SlotBox, SlotBoxError, TupleId}; + use crate::tuplebox::tuples::slotbox::{SlotBox, SlotBoxError}; use crate::tuplebox::tuples::slotted_page::slot_page_empty_size; + use crate::tuplebox::tuples::TupleRef; use crate::tuplebox::RelationId; - fn fill_until_full(sb: &mut SlotBox) -> Vec<(TupleId, Vec)> { + fn fill_until_full(sb: &Arc) -> Vec<(TupleRef, Vec)> { let mut tuples = Vec::new(); // fill until full... (SlotBoxError::BoxFull) loop { let mut rng = thread_rng(); let tuple_len = rng.gen_range(1..(slot_page_empty_size(52000))); - let tuple: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); - match sb.allocate(tuple.len(), RelationId(0), Some(&tuple)) { - Ok(tuple_id) => { - tuples.push((tuple_id, tuple)); + let value: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); + match TupleRef::allocate(RelationId(0), sb.clone(), 0, &value, &value) { + Ok(tref) => { + tuples.push((tref, value)); } Err(SlotBoxError::BoxFull(_, _)) => { break; @@ -472,19 +525,18 @@ mod tests { // Just allocate a single tuple, and verify that we can retrieve it. #[test] fn test_one_page_one_slot() { - let sb = SlotBox::new(32768 * 64); - let tuple = vec![1, 2, 3, 4, 5]; - let tuple_id = sb - .allocate(tuple.len(), RelationId(0), Some(&tuple)) + let sb = Arc::new(SlotBox::new(32768 * 64)); + let expected_value = vec![1, 2, 3, 4, 5]; + let _retrieved = sb + .clone() + .allocate(expected_value.len(), RelationId(0), Some(&expected_value)) .unwrap(); - let retrieved = sb.get(tuple_id).unwrap(); - assert_eq!(tuple, *retrieved); } - // Fill just one page and verify that we can retrieve all the tuples. + // Fill just one page and verify that we can retrieve them all. #[test] fn test_one_page_a_few_slots() { - let sb = SlotBox::new(32768 * 64); + let sb = Arc::new(SlotBox::new(32768 * 64)); let mut tuples = Vec::new(); let mut last_page_id = None; loop { @@ -492,82 +544,90 @@ mod tests { let tuple_len = rng.gen_range(1..128); let tuple: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); let tuple_id = sb + .clone() .allocate(tuple.len(), RelationId(0), Some(&tuple)) .unwrap(); if let Some(last_page_id) = last_page_id { - if last_page_id != tuple_id.0 { + if last_page_id != tuple_id.id() { break; } } - last_page_id = Some(tuple_id.0); + last_page_id = Some(tuple_id.id()); tuples.push((tuple_id, tuple)); } - for (id, tuple) in tuples { - let retrieved = sb.get(id).unwrap(); - assert_eq!(tuple, *retrieved); + for (tuple, expected_value) in tuples { + let retrieved = tuple.slot_buffer(); + assert_eq!(expected_value, retrieved.as_slice()); } } // Fill one page, then overflow into another, and verify we can get the tuple that's on the next page. 
#[test] fn test_page_overflow() { - let sb = SlotBox::new(32768 * 64); + let sb = Arc::new(SlotBox::new(32768 * 64)); let mut tuples = Vec::new(); let mut first_page_id = None; - let (next_page_tuple_id, next_page_tuple) = loop { + let (next_page_tuple_id, next_page_value) = loop { let mut rng = thread_rng(); let tuple_len = rng.gen_range(1..128); let tuple: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); let tuple_id = sb + .clone() .allocate(tuple.len(), RelationId(0), Some(&tuple)) .unwrap(); if let Some(last_page_id) = first_page_id { - if last_page_id != tuple_id.0 { + if last_page_id != tuple_id.id() { break (tuple_id, tuple); } } - first_page_id = Some(tuple_id.0); + first_page_id = Some(tuple_id.id()); tuples.push((tuple_id, tuple)); }; - for (id, tuple) in tuples { - let retrieved = sb.get(id).unwrap(); - assert_eq!(tuple, *retrieved); + for (tuple, expected_value) in tuples { + let retrieved = tuple.slot_buffer(); + assert_eq!(expected_value, retrieved.as_slice()); } // Now verify that the last tuple was on another, new page, and that we can retrieve it. - assert_ne!(next_page_tuple_id.0, first_page_id.unwrap()); - let retrieved = sb.get(next_page_tuple_id).unwrap(); - assert_eq!(*retrieved, next_page_tuple); + assert_ne!(next_page_tuple_id.id(), first_page_id.unwrap()); + let retrieved = next_page_tuple_id.slot_buffer(); + assert_eq!(retrieved.as_slice(), next_page_value); } // Generate a pile of random sized tuples (which accumulate to more than a single page size), // and then scan back and verify their presence/equality. #[test] fn test_basic_add_fill_etc() { - let mut sb = SlotBox::new(32768 * 32); - let tuples = fill_until_full(&mut sb); - for (i, (id, tuple)) in tuples.iter().enumerate() { - let retrieved = sb.get(*id).unwrap(); - assert_eq!(*tuple, *retrieved, "Mismatch at {}th tuple", i); + let mut sb = Arc::new(SlotBox::new(32768 * 32)); + let mut tuples = fill_until_full(&mut sb); + for (i, (tuple, expected_value)) in tuples.iter().enumerate() { + let retrieved = tuple.domain(); + assert_eq!( + *expected_value, + retrieved.as_slice(), + "Mismatch at {}th tuple", + i + ); } let used_pages = sb.used_pages(); assert_ne!(used_pages.len(), tuples.len()); - // Now free them all the tuples. - for (id, _tuple) in tuples { - sb.remove(id).unwrap(); - } + + // Now free all the tuples. This will destroy their refcounts. + tuples.clear(); } // Verify that filling our box up and then emptying it out again works. Should end up with // everything mmap DONTNEED'd, and we should be able to re-fill it again, too. #[test] fn test_full_fill_and_empty() { - let mut sb = SlotBox::new(32768 * 64); - let tuples = fill_until_full(&mut sb); - for (id, _) in &tuples { - sb.remove(*id).unwrap(); - } + let mut sb = Arc::new(SlotBox::new(32768 * 64)); + let mut tuples = fill_until_full(&mut sb); + + // Collect the manual ids of the tuples we've allocated, so we can check them for refcount goodness. + let ids = tuples.iter().map(|(t, _)| t.id()).collect::>(); + tuples.clear(); + // Verify that everything is gone. - for (id, _) in tuples { + for id in ids { assert!(sb.get(id).is_err()); } } @@ -576,20 +636,25 @@ mod tests { // fill back up again and verify the new presence. 
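// Sketch of the refcount lifecycle these reworked tests now rely on instead of explicit
// `sb.remove(id)` calls: cloning a TupleRef upcounts its slot, dropping it dncounts, and
// when the count reaches zero the slot is removed from the page. Rc stands in here for
// the SlotBox-managed count.
use std::rc::Rc;
fn refcount_lifecycle() {
    let tuple = Rc::new(vec![1u8, 2, 3]); // allocate: count == 1
    let alias = tuple.clone();            // clone / upcount: count == 2
    drop(alias);                          // drop / dncount: count == 1
    assert_eq!(Rc::strong_count(&tuple), 1);
    drop(tuple);                          // count hits 0: storage reclaimed
}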
#[test] fn test_fill_and_free_and_refill_etc() { - let mut sb = SlotBox::new(32768 * 64); + let mut sb = Arc::new(SlotBox::new(32768 * 64)); let mut tuples = fill_until_full(&mut sb); let mut rng = thread_rng(); let mut freed_tuples = Vec::new(); - for _ in 0..tuples.len() / 2 { + + // Pick a bunch of tuples at random to free, and remove them from the tuples set, which should dncount + // them to 0, freeing them. + let to_remove = tuples.len() / 2; + for _ in 0..to_remove { let idx = rng.gen_range(0..tuples.len()); - let (id, tuple) = tuples.remove(idx); - sb.remove(id).unwrap(); - freed_tuples.push((id, tuple)); + let (tuple, value) = tuples.remove(idx); + let id = tuple.id(); + freed_tuples.push((id, value)); } + // What we expected to still be there is there. - for (id, tuple) in &tuples { - let retrieved = sb.get(*id).unwrap(); - assert_eq!(*tuple, *retrieved); + for (tuple, expected_value) in &tuples { + let retrieved = tuple.domain(); + assert_eq!(*expected_value, retrieved.as_slice()); } // What we expected to not be there is not there. for (id, _) in freed_tuples { @@ -598,13 +663,13 @@ mod tests { // Now fill back up again. let new_tuples = fill_until_full(&mut sb); // Verify both the new tuples and the old tuples are there. - for (id, tuple) in new_tuples { - let retrieved = sb.get(id).unwrap(); - assert_eq!(tuple, *retrieved); + for (tuple, expected) in new_tuples { + let retrieved = tuple.domain(); + assert_eq!(expected, retrieved.as_slice()); } - for (id, tuple) in tuples { - let retrieved = sb.get(id).unwrap(); - assert_eq!(tuple, *retrieved); + for (tuple, expected) in tuples { + let retrieved = tuple.domain(); + assert_eq!(expected, retrieved.as_slice()); } } } diff --git a/crates/db/src/tuplebox/tuples/slotted_page.rs b/crates/db/src/tuplebox/tuples/slotted_page.rs index 2f5f3b76..f68a8296 100644 --- a/crates/db/src/tuplebox/tuples/slotted_page.rs +++ b/crates/db/src/tuplebox/tuples/slotted_page.rs @@ -32,11 +32,10 @@ use std::pin::Pin; use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; use std::sync::atomic::{AtomicPtr, AtomicU16, AtomicU32}; +use crate::tuplebox::tuples::slotbox::SlotBoxError; use atomic_wait::{wait, wake_all, wake_one}; use tracing::error; -use crate::tuplebox::tuples::slotbox::SlotBoxError; - pub type SlotId = usize; // Note that if a page is empty, either because it's new, or because all its slots have been @@ -120,12 +119,12 @@ struct SlotIndexEntry { // The number of live references to this slot refcount: AtomicU16, // The offset of the slot in the content region - offset: u16, + offset: u32, // The allocated length of the slot in the content region. This remains constant for the life // of the slot, even if it is freed and re-used. - allocated: u16, + allocated: u32, // The actual in-use length of the data. When a slot is freed, this is set to 0. - used_bytes: u16, + used_bytes: u32, _pin: std::marker::PhantomPinned, } @@ -135,11 +134,11 @@ impl SlotIndexEntry { fn alloc(mut self: Pin<&mut Self>, content_position: usize, size: usize) { unsafe { let index_entry = self.as_mut().get_unchecked_mut(); - index_entry.offset = content_position as u16; + index_entry.offset = content_position as u32; // Net-new slots always have their full size used in their new index entry. 
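// Sketch of the likely reason the index-entry fields widen from u16 to u32 just below:
// pages are sized as max(32768, tuple_size.next_power_of_two()), so a large tuple can sit
// on a page bigger than 64 KiB, where a 16-bit offset or length would wrap. (Rationale
// inferred from do_alloc in slotbox.rs, not stated in the patch.)
fn offsets_can_exceed_u16() {
    let tuple_size = 100_000usize;
    let page_size = std::cmp::max(32768, tuple_size.next_power_of_two());
    assert_eq!(page_size, 131_072);
    assert!(page_size > u16::MAX as usize); // offsets near the end of such a page need u32
}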
- index_entry.used_bytes = size as u16; - index_entry.allocated = size as u16; + index_entry.used_bytes = size as u32; + index_entry.allocated = size as u32; index_entry.refcount = AtomicU16::new(0); index_entry.used = true; } @@ -150,7 +149,7 @@ impl SlotIndexEntry { unsafe { let entry = self.as_mut().get_unchecked_mut(); entry.used = true; - entry.used_bytes = size as u16; + entry.used_bytes = size as u32; } } @@ -201,6 +200,7 @@ impl<'a> SlottedPage<'a> { } /// How much space is available in this page? + #[allow(dead_code)] pub fn free_space_bytes(&self) -> usize { let header = self.header(); let used = (header.num_slots * std::mem::size_of::() as u32) as usize @@ -300,7 +300,10 @@ impl<'a> SlottedPage<'a> { /// Load into this page from an external byte source, which is assumed to be in our page /// format, and then reset all refcounts to 0, clear lock state, and return the set of all valid /// slot IDs. - pub fn load)>(&self, mut lf: LF) -> Vec { + pub(crate) fn load)>( + &self, + mut lf: LF, + ) -> Vec<(SlotId, usize, AtomicPtr)> { // First copy in the physical bytes into our address. let memory_as_slice = unsafe { Pin::new_unchecked(std::slice::from_raw_parts_mut( @@ -315,17 +318,24 @@ impl<'a> SlottedPage<'a> { header.lock_state.store(0, SeqCst); header.writer_wake_counter.store(0, SeqCst); - // Now reset all the refcounts to 0, and collect the list of all active slots., - let mut slot_ids = vec![]; + // Now reset all the refcounts to 1, and collect the list of all active slots., + let mut slots = vec![]; let num_slots = header.num_slots; for i in 0..num_slots { let index_entry = self.get_index_entry_mut(i as usize); if index_entry.used { - index_entry.refcount.store(0, SeqCst); - slot_ids.push(i as SlotId); + index_entry.refcount.store(1, SeqCst); + let slot_id = i as SlotId; + let ptr = unsafe { + self.base_address + .load(SeqCst) + .offset(index_entry.offset as isize) + }; + let ptr = AtomicPtr::new(ptr); + slots.push((slot_id, index_entry.used_bytes as usize, ptr)); } } - slot_ids + slots } /// Copy the contents of this page into a slice. 
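// Note on the reworked load() above: each restored slot now comes back as
// (slot_id, used_bytes, raw pointer into the page), where the pointer is the page base
// address plus the index entry's offset, and refcounts are reset to 1 rather than 0,
// presumably because SlotBox::load_page wraps every restored slot in exactly one TupleRef
// whose Drop will later dncount it. A minimal sketch of the address arithmetic:
fn slot_data_ptr(page_base: *mut u8, slot_offset: u32) -> *mut u8 {
    // Same arithmetic as `base_address.offset(index_entry.offset as isize)` above,
    // written with the safe wrapping form for the sketch.
    page_base.wrapping_add(slot_offset as usize)
}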
@@ -421,23 +431,17 @@ impl<'a> SlottedPage<'a> { let offset = index_entry.offset as usize; let length = index_entry.used_bytes as usize; + assert!( + offset + length <= self.page_size, + "slot {} is out of bounds", + slot_id + ); + let memory_as_slice = unsafe { std::slice::from_raw_parts_mut(self.base_address.load(SeqCst), self.page_size) }; Ok(unsafe { Pin::new_unchecked(&mut memory_as_slice[offset..offset + length]) }) } - - pub fn num_active_slots(&self) -> usize { - let ns = self.header().num_slots as usize; - let mut total = 0; - for i in 0..ns { - let index_entry = self.get_index_entry(i); - if index_entry.used { - total += 1; - } - } - total - } } impl<'a> SlottedPage<'a> { @@ -607,10 +611,6 @@ pub struct PageWriteGuard<'a> { } impl<'a> PageWriteGuard<'a> { - pub fn page_ptr(&self) -> *mut u8 { - self.base_address - } - pub fn get_slot_mut(&mut self, slot_id: SlotId) -> Result, SlotBoxError> { let sp = SlottedPage { base_address: AtomicPtr::new(self.base_address), @@ -641,6 +641,15 @@ impl<'a> PageWriteGuard<'a> { sp.remove_slot(slot_id) } + pub fn upcount(&mut self, slot_id: SlotId) -> Result<(), SlotBoxError> { + let sp = SlottedPage { + base_address: AtomicPtr::new(self.base_address), + page_size: self.page_size, + _marker: Default::default(), + }; + sp.upcount(slot_id) + } + fn header_mut(&self) -> Pin<&mut SlottedPageHeader> { let header_ptr = self.base_address as *mut SlottedPageHeader; diff --git a/crates/db/src/tuplebox/tuples/tuple.rs b/crates/db/src/tuplebox/tuples/tuple.rs index 419ffca1..37145584 100644 --- a/crates/db/src/tuplebox/tuples/tuple.rs +++ b/crates/db/src/tuplebox/tuples/tuple.rs @@ -13,24 +13,39 @@ // use binary_layout::define_layout; +use std::hash::{Hash, Hasher}; +use std::pin::Pin; +use std::sync::atomic::AtomicPtr; +use std::sync::atomic::Ordering::SeqCst; use std::sync::Arc; use crate::tuplebox::RelationId; use moor_values::util::slice_ref::SliceRef; -use crate::tuplebox::tuples::TupleRef; -use crate::tuplebox::tuples::{SlotBox, TupleId}; +use crate::tuplebox::tuples::slot_ptr::SlotPtr; +use crate::tuplebox::tuples::{SlotBox, SlotBoxError, TupleId}; + +const MAGIC_MARKER: u32 = 0xcafebabe; define_layout!(tuple_header, LittleEndian, { + magic_marker: u32, ts: u64, domain_size: u32, codomain_size: u32, }); -#[derive(Clone)] -pub struct Tuple(TupleRef); +pub struct TupleRef { + sp: AtomicPtr, +} -impl Tuple { +impl TupleRef { + // Wrap an existing SlotPtr. + // Note: to avoid deadlocking at construction, assumes that the tuple is already upcounted by the + // caller. + pub(crate) fn at_ptr(sp: AtomicPtr) -> Self { + + Self { sp } + } /// Allocate the given tuple in a slotbox. 
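// Sketch of the buffer layout allocate() writes below: a fixed header (magic marker,
// timestamp, domain and codomain lengths) followed by the domain bytes and then the
// codomain bytes. The header size follows from the tuple_header layout defined above
// (u32 + u64 + u32 + u32 = 20 bytes).
fn tuple_buffer_len(domain: &[u8], codomain: &[u8]) -> usize {
    const HEADER_SIZE: usize = 4 + 8 + 4 + 4; // magic_marker + ts + domain_size + codomain_size
    HEADER_SIZE + domain.len() + codomain.len()
}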
pub fn allocate( relation_id: RelationId, @@ -38,71 +53,126 @@ impl Tuple { ts: u64, domain: &[u8], codomain: &[u8], - ) -> TupleRef { + ) -> Result { let total_size = tuple_header::SIZE.unwrap() + domain.len() + codomain.len(); - let tuple_id = sb - .allocate(total_size, relation_id, None) - .expect("Failed to allocate tuple"); - sb.update_with(tuple_id, |mut buffer| { + let tuple_ref = sb.clone().allocate(total_size, relation_id, None)?; + sb.update_with(tuple_ref.id(), |mut buffer| { + let domain_len = domain.len(); + let codomain_len = codomain.len(); { let mut header = tuple_header::View::new(buffer.as_mut().get_mut()); + header.magic_marker_mut().write(MAGIC_MARKER); header.ts_mut().write(ts); - header.domain_size_mut().write(domain.len() as u32); - header.codomain_size_mut().write(codomain.len() as u32); + header.domain_size_mut().write(domain_len as u32); + header.codomain_size_mut().write(codomain_len as u32); } let start_pos = tuple_header::SIZE.unwrap(); - buffer[start_pos..start_pos + domain.len()].copy_from_slice(domain); - buffer[start_pos + domain.len()..].copy_from_slice(codomain); - }) - .unwrap(); - TupleRef::new(sb.clone(), tuple_id) + buffer[start_pos..start_pos + domain_len].copy_from_slice(domain); + buffer[start_pos + domain_len..].copy_from_slice(codomain); + })?; + Ok(tuple_ref) } - fn buffer(&self) -> SliceRef { - SliceRef::from_byte_source(Box::new(self.0.clone())) + pub fn id(&self) -> TupleId { + self.resolve_slot_ptr().as_ref().id() } + /// Update the timestamp of the tuple. pub fn update_timestamp(&self, relation_id: RelationId, sb: Arc, ts: u64) { - let mut buffer = self.buffer().as_slice().to_vec(); + let mut buffer = self.slot_buffer().as_slice().to_vec(); let mut header = tuple_header::View::new(&mut buffer); header.ts_mut().write(ts); - let id = self.0.id; - let new_id = sb - .update(relation_id, self.0.id, buffer.as_slice()) - .unwrap(); - assert_eq!(id, new_id); - } - - pub fn from_tuple_id(sb: Arc, tuple_id: TupleId) -> Self { - Self(TupleRef::new(sb, tuple_id)) + let id = self.resolve_slot_ptr().as_ref().id(); + // The update method will return a new tuple ID if the tuple is moved, and it should *not* + // for timestamp updates. + assert!(sb + .update(relation_id, id, buffer.as_slice()) + .unwrap() + .is_none()); } + /// The timestamp of the tuple. pub fn ts(&self) -> u64 { - let buffer = self.buffer(); - tuple_header::View::new(buffer.as_slice()).ts().read() + let buffer = self.slot_buffer(); + let header = tuple_header::View::new(buffer.as_slice()); + assert_eq!(header.magic_marker().read(), MAGIC_MARKER); + header.ts().read() } + /// The domain of the tuple. pub fn domain(&self) -> SliceRef { - let buffer = self.buffer(); - let domain_size = tuple_header::View::new(buffer.as_slice()) - .domain_size() - .read(); + let buffer = self.slot_buffer(); + let header = tuple_header::View::new(buffer.as_slice()); + assert_eq!(header.magic_marker().read(), MAGIC_MARKER); + let domain_size = header.domain_size().read(); buffer .slice(tuple_header::SIZE.unwrap()..tuple_header::SIZE.unwrap() + domain_size as usize) } + /// The codomain of the tuple. 
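// Sketch of the slice arithmetic codomain() performs below: skip the fixed header and the
// domain, then take codomain_size bytes. HEADER_SIZE here is the 20-byte tuple_header
// described above.
fn codomain_range(domain_size: usize, codomain_size: usize) -> std::ops::Range<usize> {
    const HEADER_SIZE: usize = 20;
    HEADER_SIZE + domain_size..HEADER_SIZE + domain_size + codomain_size
}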
pub fn codomain(&self) -> SliceRef { - let buffer = self.buffer(); - - let domain_size = tuple_header::View::new(buffer.as_slice()) - .domain_size() - .read() as usize; - let codomain_size = tuple_header::View::new(buffer.as_slice()) - .codomain_size() - .read() as usize; + let buffer = self.slot_buffer(); + let header = tuple_header::View::new(buffer.as_slice()); + assert_eq!(header.magic_marker().read(), MAGIC_MARKER); + let domain_size = header.domain_size().read() as usize; + let codomain_size = header.codomain_size().read() as usize; buffer.slice( tuple_header::SIZE.unwrap() + domain_size ..tuple_header::SIZE.unwrap() + domain_size + codomain_size, ) } + + /// The raw buffer of the tuple, including the header, not dividing up the domain and codomain. + pub fn slot_buffer(&self) -> SliceRef { + let slot_ptr = self.resolve_slot_ptr(); + SliceRef::from_byte_source(slot_ptr.byte_source()) + } +} + +impl TupleRef { + fn resolve_slot_ptr(&self) -> Pin<&mut SlotPtr> { + unsafe { Pin::new_unchecked(&mut *self.sp.load(SeqCst)) } + } + + fn upcount(&self) { + let slot_ptr = self.resolve_slot_ptr(); + slot_ptr.upcount(); + } + + fn dncount(&self) { + let slot_ptr = self.resolve_slot_ptr(); + slot_ptr.dncount(); + } +} + +impl Hash for TupleRef { + fn hash(&self, state: &mut H) { + let id = self.id(); + id.hash(state); + } +} + +impl PartialEq for TupleRef { + fn eq(&self, other: &Self) -> bool { + let (id, other_id) = (self.id(), other.id()); + id == other_id + } +} + +impl Eq for TupleRef {} + +impl Drop for TupleRef { + fn drop(&mut self) { + self.dncount() + } +} + +impl Clone for TupleRef { + fn clone(&self) -> Self { + self.upcount(); + let addr = self.sp.load(SeqCst); + Self { + sp: AtomicPtr::new(addr), + } + } } diff --git a/crates/db/src/tuplebox/tuples/tuple_ref.rs b/crates/db/src/tuplebox/tuples/tuple_ref.rs deleted file mode 100644 index 06f7871a..00000000 --- a/crates/db/src/tuplebox/tuples/tuple_ref.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::tuplebox::tuples::{SlotBox, Tuple, TupleId}; -use moor_values::util::slice_ref::ByteSource; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; - -pub struct TupleRef { - sb: Arc, - pub id: TupleId, -} - -impl PartialEq for TupleRef { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for TupleRef {} - -impl Hash for TupleRef { - fn hash(&self, state: &mut H) { - let slc = self.sb.get(self.id).expect("Unable to get tuple"); - slc.hash(state) - } -} -impl TupleRef { - pub fn new(sb: Arc, id: TupleId) -> Self { - sb.upcount(id).expect("Unable to add tuple"); - Self { sb, id } - } - - pub fn get(&self) -> Tuple { - Tuple::from_tuple_id(self.sb.clone(), self.id) - } -} - -impl Drop for TupleRef { - fn drop(&mut self) { - self.sb.dncount(self.id).expect("Unable to dncount tuple"); - } -} - -impl Clone for TupleRef { - fn clone(&self) -> Self { - self.sb.upcount(self.id).expect("Unable to upcount tuple"); - Self { - sb: self.sb.clone(), - id: self.id, - } - } -} - -impl ByteSource for TupleRef { - fn as_slice(&self) -> &[u8] { - self.sb.get(self.id).expect("Unable to get tuple").get_ref() - } - - fn len(&self) -> usize { - self.as_slice().len() - } - - fn touch(&self) { - // noop - } -} diff --git a/crates/db/src/tuplebox/tuples/tx_tuple.rs b/crates/db/src/tuplebox/tuples/tx_tuple.rs index 8a0b9498..360a3bb1 100644 --- a/crates/db/src/tuplebox/tuples/tx_tuple.rs +++ b/crates/db/src/tuplebox/tuples/tx_tuple.rs @@ -36,9 +36,7 @@ pub enum TxTuple { impl TxTuple { pub fn domain(&self) -> SliceRef { match self { - 
TxTuple::Insert(tref) | TxTuple::Update(tref) | TxTuple::Value(tref) => { - tref.get().domain() - } + TxTuple::Insert(tref) | TxTuple::Update(tref) | TxTuple::Value(tref) => tref.domain(), TxTuple::Tombstone { ts: _, tuple_id: _, @@ -48,7 +46,7 @@ impl TxTuple { } pub fn tuple_id(&self) -> TupleId { match self { - TxTuple::Insert(tref) | TxTuple::Update(tref) | TxTuple::Value(tref) => tref.id, + TxTuple::Insert(tref) | TxTuple::Update(tref) | TxTuple::Value(tref) => tref.id(), TxTuple::Tombstone { ts: _, tuple_id: id, @@ -59,7 +57,7 @@ impl TxTuple { pub fn ts(&self) -> u64 { match self { - TxTuple::Insert(tref) | TxTuple::Update(tref) | TxTuple::Value(tref) => tref.get().ts(), + TxTuple::Insert(tref) | TxTuple::Update(tref) | TxTuple::Value(tref) => tref.ts(), TxTuple::Tombstone { ts, tuple_id: _, diff --git a/crates/db/src/tuplebox/tx/transaction.rs b/crates/db/src/tuplebox/tx/transaction.rs index 4e5cbd98..597b583a 100644 --- a/crates/db/src/tuplebox/tx/transaction.rs +++ b/crates/db/src/tuplebox/tx/transaction.rs @@ -341,12 +341,12 @@ impl TransientRelation { &self, domain: SliceRef, ) -> Result<(SliceRef, SliceRef), TupleError> { - let tuple_id = self + let tuple_idx = self .domain_tuples .get(domain.as_slice()) .copied() .ok_or(TupleError::NotFound); - tuple_id.map(|id| self.tuples[id].clone()) + tuple_idx.map(|id| self.tuples[id].clone()) } /// Seek for tuples by their indexed codomain value, if there's an index. Panics if there is no @@ -359,11 +359,11 @@ impl TransientRelation { // We could do full-scan, but in this case we're going to assume that the caller knows // what they're doing. let codomain_domain = self.codomain_domain.as_ref().expect("No codomain index"); - let tuple_ids = codomain_domain + let tuple_indexes = codomain_domain .get(codomain.as_slice()) .cloned() .ok_or(TupleError::NotFound)?; - Ok(tuple_ids + Ok(tuple_indexes .iter() .map(|tid| self.tuples[*tid].clone()) .collect()) @@ -385,10 +385,10 @@ impl TransientRelation { if self.domain_tuples.contains_key(domain.as_slice()) { return Err(TupleError::Duplicate); } - let tuple_id = self.tuples.len(); + let tuple_idx = self.tuples.len(); self.tuples.push((domain.clone(), codomain.clone())); self.domain_tuples - .insert(domain.as_slice().to_vec(), tuple_id) + .insert(domain.as_slice().to_vec(), tuple_idx) .map(|_| ()) .ok_or(TupleError::Duplicate) } @@ -399,15 +399,15 @@ impl TransientRelation { domain: SliceRef, codomain: SliceRef, ) -> Result<(), TupleError> { - let tuple_id = self + let tuple_idx = self .domain_tuples .get(domain.as_slice()) .copied() .ok_or(TupleError::NotFound)?; if self.codomain_domain.is_some() { - self.update_secondary(tuple_id, None, Some(codomain.clone())); + self.update_secondary(tuple_idx, None, Some(codomain.clone())); } - self.tuples[tuple_id] = (domain, codomain); + self.tuples[tuple_idx] = (domain, codomain); Ok(()) } @@ -417,41 +417,41 @@ impl TransientRelation { domain: SliceRef, codomain: SliceRef, ) -> Result<(), TupleError> { - let tuple_id = match self.domain_tuples.get(domain.as_slice()) { - Some(tuple_id) => { - self.tuples[*tuple_id] = (domain, codomain.clone()); - *tuple_id + let tuple_idx = match self.domain_tuples.get(domain.as_slice()) { + Some(tuple_idx) => { + self.tuples[*tuple_idx] = (domain, codomain.clone()); + *tuple_idx } None => { - let tuple_id = self.tuples.len(); + let tuple_idx = self.tuples.len(); self.tuples.push((domain.clone(), codomain.clone())); self.domain_tuples - .insert(domain.as_slice().to_vec(), tuple_id); - tuple_id + 
.insert(domain.as_slice().to_vec(), tuple_idx); + tuple_idx } }; - self.update_secondary(tuple_id, None, Some(codomain.clone())); + self.update_secondary(tuple_idx, None, Some(codomain.clone())); Ok(()) } /// Remove a tuple from the relation. pub async fn remove_by_domain(&mut self, domain: SliceRef) -> Result<(), TupleError> { - let tuple_id = self + let tuple_idx = self .domain_tuples .remove(domain.as_slice()) .ok_or(TupleError::NotFound)?; if self.codomain_domain.is_some() { - self.update_secondary(tuple_id, None, None); + self.update_secondary(tuple_idx, None, None); } - self.tuples.remove(tuple_id); + self.tuples.remove(tuple_idx); Ok(()) } pub(crate) fn update_secondary( &mut self, - tuple_id: usize, + tuple_idx: usize, old_codomain: Option, new_codomain: Option, ) { @@ -464,13 +464,13 @@ impl TransientRelation { index .entry(old_codomain.as_slice().to_vec()) .or_insert_with(HashSet::new) - .remove(&tuple_id); + .remove(&tuple_idx); } if let Some(new_codomain) = new_codomain { index .entry(new_codomain.as_slice().to_vec()) .or_insert_with(HashSet::new) - .insert(tuple_id); + .insert(tuple_idx); } } } @@ -536,15 +536,14 @@ mod tests { // Verify canonical state. { let relation = &db.canonical.read().await[0]; - let tref = relation + let tuple = relation .seek_by_domain(attr(b"abc")) .expect("Expected tuple to exist"); - let tuple = tref.get(); assert_eq!(tuple.codomain().as_slice(), b"123"); let tuples = relation.seek_by_codomain(attr(b"123")); assert_eq!(tuples.len(), 1); - let tuple = tuples.iter().next().unwrap().get(); + let tuple = tuples.iter().next().unwrap(); assert_eq!(tuple.domain().as_slice(), b"abc"); assert_eq!(tuple.codomain().as_slice(), b"123"); } diff --git a/crates/db/src/tuplebox/tx/working_set.rs b/crates/db/src/tuplebox/tx/working_set.rs index 795e89d3..7e1d3881 100644 --- a/crates/db/src/tuplebox/tx/working_set.rs +++ b/crates/db/src/tuplebox/tx/working_set.rs @@ -19,7 +19,7 @@ use moor_values::util::slice_ref::SliceRef; use crate::tuplebox::tb::{RelationInfo, TupleBox}; use crate::tuplebox::tuples::{SlotBox, TupleError}; -use crate::tuplebox::tuples::{Tuple, TxTuple}; +use crate::tuplebox::tuples::{TupleRef, TxTuple}; use crate::tuplebox::RelationId; /// The local tx "working set" of mutations to base relations, and consists of the set of operations @@ -69,11 +69,10 @@ impl WorkingSet { let relation = &mut self.relations[relation_id.0]; // Check local first. - if let Some(tuple_id) = relation.domain_index.get(domain.as_slice()) { - let local_version = relation.tuples.get(*tuple_id).unwrap(); + if let Some(tuple_idx) = relation.domain_index.get(domain.as_slice()) { + let local_version = relation.tuples.get(*tuple_idx).unwrap(); return match &local_version { TxTuple::Insert(t) | TxTuple::Update(t) | TxTuple::Value(t) => { - let t = t.get(); Ok((t.domain().clone(), t.codomain().clone())) } TxTuple::Tombstone { .. 
} => Err(TupleError::NotFound), @@ -89,19 +88,18 @@ impl WorkingSet { } }) .await?; - let tuple_id = relation.tuples.len(); + let tuple_idx = relation.tuples.len(); relation.tuples.push(TxTuple::Value(canon_t.clone())); relation .domain_index - .insert(domain.as_slice().to_vec(), tuple_id); - let t = canon_t.get(); + .insert(domain.as_slice().to_vec(), tuple_idx); if let Some(ref mut codomain_index) = relation.codomain_index { codomain_index - .entry(t.codomain().as_slice().to_vec()) + .entry(canon_t.codomain().as_slice().to_vec()) .or_insert_with(HashSet::new) - .insert(tuple_id); + .insert(tuple_idx); } - Ok((t.domain(), t.codomain())) + Ok((canon_t.domain(), canon_t.codomain())) } pub(crate) async fn seek_by_codomain( @@ -131,24 +129,23 @@ impl WorkingSet { }; // By performing the seek, we'll materialize the tuples into our local working set, which // will in turn update the codomain index for those tuples. - for tuples in tuples_for_codomain { + for tuple in tuples_for_codomain { let _ = self - .seek_by_domain(db.clone(), relation_id, tuples.get().domain()) + .seek_by_domain(db.clone(), relation_id, tuple.domain()) .await; } let relation = &mut self.relations[relation_id.0]; let codomain_index = relation.codomain_index.as_ref().expect("No codomain index"); - let tuple_ids = codomain_index + let tuple_indexes = codomain_index .get(codomain.as_slice()) .cloned() .unwrap_or_else(HashSet::new) .into_iter(); - let tuples = tuple_ids.filter_map(|tid| { + let tuples = tuple_indexes.filter_map(|tid| { let t = relation.tuples.get(tid).expect("Tuple not found"); match &t { TxTuple::Insert(t) | TxTuple::Update(t) | TxTuple::Value(t) => { - let t = t.get(); Some((t.domain(), t.codomain())) } TxTuple::Tombstone { .. } => None, @@ -180,19 +177,19 @@ impl WorkingSet { }) .await?; - let tuple_id = relation.tuples.len(); - let new_t = Tuple::allocate( + let tuple_idx = relation.tuples.len(); + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), self.ts, domain.as_slice(), codomain.as_slice(), ); - relation.tuples.push(TxTuple::Insert(new_t)); + relation.tuples.push(TxTuple::Insert(new_t.unwrap())); relation .domain_index - .insert(domain.as_slice().to_vec(), tuple_id); - relation.update_secondary(tuple_id, None, Some(codomain.clone())); + .insert(domain.as_slice().to_vec(), tuple_idx); + relation.update_secondary(tuple_idx, None, Some(codomain.clone())); Ok(()) } @@ -210,7 +207,6 @@ impl WorkingSet { let mut by_domain = HashMap::new(); for t in tuples { - let t = t.get(); by_domain.insert(t.domain().as_slice().to_vec(), t); } @@ -225,9 +221,8 @@ impl WorkingSet { } match t { TxTuple::Insert(t) | TxTuple::Update(t) | TxTuple::Value(t) => { - let t = t.get(); if f(&(t.domain(), t.codomain())) { - by_domain.insert(t.domain().as_slice().to_vec(), t); + by_domain.insert(t.domain().as_slice().to_vec(), t.clone()); } else { by_domain.remove(t.domain().as_slice()); } @@ -256,35 +251,33 @@ impl WorkingSet { // If we have an existing copy, we will update it, but keep its existing derivation // timestamp and operation type. - if let Some(tuple_id) = relation.domain_index.get_mut(domain.as_slice()).cloned() { - let existing = relation.tuples.get_mut(tuple_id).expect("Tuple not found"); + if let Some(tuple_idx) = relation.domain_index.get_mut(domain.as_slice()).cloned() { + let existing = relation.tuples.get_mut(tuple_idx).expect("Tuple not found"); let (replacement, old_value) = match &existing { TxTuple::Tombstone { .. 
} => return Err(TupleError::NotFound), TxTuple::Insert(t) => { - let t = t.get(); - let new_t = Tuple::allocate( + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), t.ts(), domain.as_slice(), codomain.as_slice(), ); - (TxTuple::Insert(new_t), (t.domain(), t.codomain())) + (TxTuple::Insert(new_t.unwrap()), (t.domain(), t.codomain())) } TxTuple::Update(t) | TxTuple::Value(t) => { - let t = t.get(); - let new_t = Tuple::allocate( + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), t.ts(), domain.as_slice(), codomain.as_slice(), ); - (TxTuple::Update(new_t), (t.domain(), t.codomain())) + (TxTuple::Update(new_t.unwrap()), (t.domain(), t.codomain())) } }; *existing = replacement; - relation.update_secondary(tuple_id, Some(old_value.1), Some(codomain.clone())); + relation.update_secondary(tuple_idx, Some(old_value.1), Some(codomain.clone())); return Ok(()); } @@ -294,7 +287,6 @@ impl WorkingSet { let (old_codomain, ts) = db .with_relation(relation_id, |relation| { if let Some(tuple) = relation.seek_by_domain(domain.clone()) { - let tuple = tuple.get(); Ok((tuple.codomain().clone(), tuple.ts())) } else { Err(TupleError::NotFound) @@ -303,19 +295,19 @@ impl WorkingSet { .await?; // Write into the local copy an update operation. - let tuple_id = relation.tuples.len(); - let new_t = Tuple::allocate( + let tuple_idx = relation.tuples.len(); + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), ts, domain.as_slice(), codomain.as_slice(), ); - relation.tuples.push(TxTuple::Update(new_t)); + relation.tuples.push(TxTuple::Update(new_t.unwrap())); relation .domain_index - .insert(domain.as_slice().to_vec(), tuple_id); - relation.update_secondary(tuple_id, Some(old_codomain), Some(codomain.clone())); + .insert(domain.as_slice().to_vec(), tuple_idx); + relation.update_secondary(tuple_idx, Some(old_codomain), Some(codomain.clone())); Ok(()) } @@ -334,34 +326,35 @@ impl WorkingSet { // timestamp. // If it's an insert, we have to keep it an insert, same for update, but if it's a delete, // we have to turn it into an update. - if let Some(tuple_id) = relation.domain_index.get_mut(domain.as_slice()).cloned() { - let existing = relation.tuples.get_mut(tuple_id).expect("Tuple not found"); + if let Some(tuple_idx) = relation.domain_index.get_mut(domain.as_slice()).cloned() { + let existing = relation.tuples.get_mut(tuple_idx).expect("Tuple not found"); let (replacement, old) = match &existing { TxTuple::Insert(t) => { - let t = t.get(); - let new_t = Tuple::allocate( + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), t.ts(), domain.as_slice(), codomain.as_slice(), ); - (TxTuple::Insert(new_t), Some((t.domain(), t.codomain()))) + ( + TxTuple::Insert(new_t.unwrap()), + Some((t.domain(), t.codomain())), + ) } TxTuple::Tombstone { ts, .. } => { // We need to allocate a new tuple... 
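// A tombstoned local entry can't be revived in place (its backing tuple is gone), so a
// fresh TupleRef is allocated below and the working-set entry flips from Tombstone to
// Update, reusing the tombstone's timestamp.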
- let new_t = Tuple::allocate( + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), *ts, domain.as_slice(), codomain.as_slice(), ); - (TxTuple::Update(new_t), None) + (TxTuple::Update(new_t.unwrap()), None) } - TxTuple::Update(t) | TxTuple::Value(t) => { - let tuple = t.get(); - let new_t = Tuple::allocate( + TxTuple::Update(tuple) | TxTuple::Value(tuple) => { + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), tuple.ts(), @@ -369,13 +362,13 @@ impl WorkingSet { codomain.as_slice(), ); ( - TxTuple::Update(new_t), + TxTuple::Update(new_t.unwrap()), Some((tuple.domain(), tuple.codomain())), ) } }; *existing = replacement; - relation.update_secondary(tuple_id, old.map(|o| o.1), Some(codomain.clone())); + relation.update_secondary(tuple_idx, old.map(|o| o.1), Some(codomain.clone())); return Ok(()); } @@ -386,8 +379,7 @@ impl WorkingSet { let (operation, old) = db .with_relation(relation_id, |relation| { if let Some(tuple) = relation.seek_by_domain(domain.clone()) { - let tuple = tuple.get(); - let new_t = Tuple::allocate( + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), tuple.ts(), @@ -395,29 +387,29 @@ impl WorkingSet { codomain.as_slice(), ); ( - TxTuple::Update(new_t), + TxTuple::Update(new_t.unwrap()), Some((tuple.domain(), tuple.codomain())), ) } else { - let new_t = Tuple::allocate( + let new_t = TupleRef::allocate( relation_id, self.slotbox.clone(), self.ts, domain.as_slice(), codomain.as_slice(), ); - (TxTuple::Insert(new_t), None) + (TxTuple::Insert(new_t.unwrap()), None) } }) .await; - let tuple_id = relation.tuples.len(); + let tuple_idx = relation.tuples.len(); relation.tuples.push(operation); relation .domain_index - .insert(domain.as_slice().to_vec(), tuple_id); + .insert(domain.as_slice().to_vec(), tuple_idx); // Remove the old codomain->domain index entry if it exists, and then add the new one. - relation.update_secondary(tuple_id, old.map(|o| o.0), Some(codomain.clone())); + relation.update_secondary(tuple_idx, old.map(|o| o.0), Some(codomain.clone())); Ok(()) } @@ -440,7 +432,6 @@ impl WorkingSet { let old_v = match &tuple_v { TxTuple::Insert(t) | TxTuple::Update(t) | TxTuple::Value(t) => { - let t = t.get(); (t.domain(), t.codomain()) } TxTuple::Tombstone { .. 
} => { @@ -456,28 +447,26 @@ impl WorkingSet { return Ok(()); } - let (ts, old, tuple_id) = db + let (ts, old_codomain, tuple) = db .with_relation(relation_id, |relation| { if let Some(tuple) = relation.seek_by_domain(domain.clone()) { - let id = tuple.id; - let tuple = tuple.get(); - Ok((tuple.ts(), tuple.codomain().clone(), id)) + Ok((tuple.ts(), tuple.codomain().clone(), tuple)) } else { Err(TupleError::NotFound) } }) .await?; - let local_tuple_id = relation.tuples.len(); + let local_tuple_idx = relation.tuples.len(); relation.tuples.push(TxTuple::Tombstone { ts, domain: domain.clone(), - tuple_id, + tuple_id: tuple.id(), }); relation .domain_index - .insert(domain.as_slice().to_vec(), local_tuple_id); - relation.update_secondary(local_tuple_id, Some(old), None); + .insert(domain.as_slice().to_vec(), local_tuple_idx); + relation.update_secondary(local_tuple_idx, Some(old_codomain), None); Ok(()) } } diff --git a/crates/db/tests/tb_restore.rs b/crates/db/tests/tb_restore.rs index 90f6e96b..26647f4a 100644 --- a/crates/db/tests/tb_restore.rs +++ b/crates/db/tests/tb_restore.rs @@ -123,7 +123,7 @@ mod test { let r_tups: Vec<_> = db .with_relation(relation, |r| { let tuples = r.predicate_scan(&|_| true); - tuples.iter().map(|t| to_val(t.get().domain())).collect() + tuples.iter().map(|t| to_val(t.domain())).collect() }) .await; expected.insert(relation, r_tups); @@ -159,7 +159,7 @@ mod test { for t in expected_tuples { let t = from_val(*t); let v = r.seek_by_domain(t).unwrap(); - tups.push(to_val(v.get().domain())); + tups.push(to_val(v.domain())); } tups }) diff --git a/crates/values/src/util/slice_ref.rs b/crates/values/src/util/slice_ref.rs index affb7cd9..18deb88f 100644 --- a/crates/values/src/util/slice_ref.rs +++ b/crates/values/src/util/slice_ref.rs @@ -96,6 +96,7 @@ impl SliceRef { b.as_slice() })) } + #[must_use] pub fn from_bytes(buf: &[u8]) -> Self { Self(Yoke::attach_to_cart(