
Optimize tuple access and make room for page-outs
Remove the pointless duplication of TupleRef & Tuple, and make
TupleRef hold a pointer to a backing "SlotPtr" entity which points
directly to the actual underlying tuple data in the buffer pool.

This acts both as an optimization -- avoiding a call through to the
"slotbox" on each retrieval of a tuple's value, which yields a 20%
throughput improvement in the db benchmark -- and as a potential
future site for handling the swizzling of paged-out tuples. When
paging out, it should be possible to rewrite the SlotPtr at will to
handle the movement of pages in and out of the buffer pool.
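
For illustration, here is a minimal sketch of the kind of indirection the
message describes. This is not the actual crate API; `SlotPtr`'s fields and
the `swizzle` method are assumptions for exposition only:

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical stand-in for the per-tuple indirection described above.
struct SlotPtr {
    /// Raw pointer to the tuple's bytes inside a buffer-pool page. The pager
    /// rewrites ("swizzles") this when the page moves, so references stay valid.
    data: AtomicPtr<u8>,
    len: usize,
}

impl SlotPtr {
    /// Read the tuple bytes through the current pointer.
    /// Safety: caller must ensure the page is resident and `data`/`len` are live.
    unsafe fn as_slice(&self) -> &[u8] {
        std::slice::from_raw_parts(self.data.load(Ordering::Acquire), self.len)
    }

    /// Called by the pager when the backing page is relocated.
    fn swizzle(&self, new_addr: *mut u8) {
        self.data.store(new_addr, Ordering::Release);
    }
}

fn main() {
    let mut page_a = vec![1u8, 2, 3, 4];
    let slot = SlotPtr {
        data: AtomicPtr::new(page_a.as_mut_ptr()),
        len: page_a.len(),
    };

    // A TupleRef-like handle reads directly, with no slotbox lookup per access.
    assert_eq!(unsafe { slot.as_slice() }, &[1, 2, 3, 4]);

    // Simulate a page-out/page-in cycle that lands the tuple at a new address:
    // one pointer rewrite, and every outstanding reference sees the new home.
    let mut page_b = page_a.clone();
    slot.swizzle(page_b.as_mut_ptr());
    assert_eq!(unsafe { slot.as_slice() }, &[1, 2, 3, 4]);
}
```

The design point: relocating a page costs one pointer store per affected
SlotPtr, while every outstanding TupleRef keeps reading through it unchanged.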
rdaum committed Jan 5, 2024
1 parent 734a5ca commit 53cf13f
Showing 16 changed files with 636 additions and 455 deletions.
43 changes: 17 additions & 26 deletions crates/db/src/tuplebox/base_relation.rs
@@ -17,8 +17,8 @@ use std::sync::Arc;

use moor_values::util::slice_ref::SliceRef;

use crate::tuplebox::tuples::SlotBox;
use crate::tuplebox::tuples::TupleRef;
use crate::tuplebox::tuples::{SlotBox, TupleId};
use crate::tuplebox::RelationId;

/// Represents a 'canonical' base binary relation, which is a set of tuples of domain, codomain,
@@ -71,38 +71,35 @@ impl BaseRelation {
panic!("Relation already has a secondary index");
}
self.index_codomain = Some(im::HashMap::new());
for tuple_ref in self.tuples.iter() {
let tuple = tuple_ref.get();
for tuple in self.tuples.iter() {
// ... update the secondary index.
self.index_codomain
.as_mut()
.unwrap()
.entry(tuple.codomain().as_slice().to_vec())
.or_default()
.insert(tuple_ref.clone());
.insert(tuple.clone());
}
}

/// Establish indexes on a tuple initial-loaded from secondary storage. Basically a, "trust us,
/// Establish indexes for a tuple initial-loaded from secondary storage. Basically a, "trust us,
/// this exists" move.
pub fn index_tuple(&mut self, tuple_id: TupleId) {
let tuple_ref = TupleRef::new(self.slotbox.clone(), tuple_id);
self.tuples.insert(tuple_ref.clone());
let tuple = tuple_ref.get();
pub fn index_tuple(&mut self, tuple: TupleRef) {
self.tuples.insert(tuple.clone());

// Reset timestamp to 0, since this is a tuple initial-loaded from secondary storage.
tuple.update_timestamp(self.id, self.slotbox.clone(), 0);

// Update the domain index to point to the tuple...
self.index_domain
.insert(tuple.domain().as_slice().to_vec(), tuple_ref.clone());
.insert(tuple.domain().as_slice().to_vec(), tuple.clone());

// ... and update the secondary index if there is one.
if let Some(index) = &mut self.index_codomain {
index
.entry(tuple.codomain().as_slice().to_vec())
.or_insert_with(HashSet::new)
.insert(tuple_ref);
.insert(tuple);
}
}

@@ -113,10 +110,7 @@ impl BaseRelation {
pub fn predicate_scan<F: Fn(&(SliceRef, SliceRef)) -> bool>(&self, f: &F) -> HashSet<TupleRef> {
self.tuples
.iter()
.filter(|t| {
let t = t.get();
f(&(t.domain(), t.codomain()))
})
.filter(|t| f(&(t.domain(), t.codomain())))
.cloned()
.collect()
}
@@ -148,41 +142,38 @@ impl BaseRelation {
}

/// Update or insert a tuple into the relation.
pub fn upsert_tuple(&mut self, new_tuple_ref: TupleRef) {
let tuple = new_tuple_ref.get();
pub fn upsert_tuple(&mut self, tuple: TupleRef) {
// First check the domain->tuple id index to see if we're inserting or updating.
let existing_tuple_ref = self.index_domain.get(tuple.domain().as_slice()).cloned();
match existing_tuple_ref {
None => {
// Insert into the tuple list and the index.
self.index_domain
.insert(tuple.domain().as_slice().to_vec(), new_tuple_ref.clone());
self.tuples.insert(new_tuple_ref.clone());
.insert(tuple.domain().as_slice().to_vec(), tuple.clone());
self.tuples.insert(tuple.clone());
if let Some(codomain_index) = &mut self.index_codomain {
codomain_index
.entry(tuple.codomain().as_slice().to_vec())
.or_insert_with(HashSet::new)
.insert(new_tuple_ref);
.insert(tuple);
}
}
Some(existing_tuple) => {
// We need the old value so we can update the codomain index.
let old_value = existing_tuple.get();

if let Some(codomain_index) = &mut self.index_codomain {
codomain_index
.entry(old_value.codomain().as_slice().to_vec())
.entry(existing_tuple.codomain().as_slice().to_vec())
.or_insert_with(HashSet::new)
.remove(&existing_tuple);
codomain_index
.entry(tuple.codomain().as_slice().to_vec())
.or_insert_with(HashSet::new)
.insert(new_tuple_ref.clone());
.insert(tuple.clone());
}
self.index_domain
.insert(tuple.domain().as_slice().to_vec(), new_tuple_ref.clone());
.insert(tuple.domain().as_slice().to_vec(), tuple.clone());
self.tuples.remove(&existing_tuple);
self.tuples.insert(new_tuple_ref);
self.tuples.insert(tuple);
}
}
}
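A usage note on the upsert path above: the domain index maps each domain key
to a single tuple, while the optional codomain index maps a value to the set
of tuples carrying it, so an update must first remove the stale entry from its
old codomain bucket. A toy sketch of that shape, with simplified stand-in
types rather than the crate's own:

```rust
use std::collections::{HashMap, HashSet};

/// Toy upsert mirroring the index maintenance in BaseRelation::upsert_tuple.
/// Domain/codomain are plain byte vectors and tuples are just u64 ids here;
/// these are simplified stand-ins, not the crate's actual types.
fn upsert(
    index_domain: &mut HashMap<Vec<u8>, (u64, Vec<u8>)>,
    index_codomain: &mut HashMap<Vec<u8>, HashSet<u64>>,
    domain: Vec<u8>,
    codomain: Vec<u8>,
    id: u64,
) {
    // On update, evict the stale entry from its *old* codomain bucket first.
    if let Some((old_id, old_codomain)) = index_domain.get(&domain).cloned() {
        index_codomain.entry(old_codomain).or_default().remove(&old_id);
    }
    index_codomain.entry(codomain.clone()).or_default().insert(id);
    index_domain.insert(domain, (id, codomain));
}

fn main() {
    let (mut dom, mut codom) = (HashMap::new(), HashMap::new());
    upsert(&mut dom, &mut codom, b"k".to_vec(), b"v1".to_vec(), 1);
    upsert(&mut dom, &mut codom, b"k".to_vec(), b"v2".to_vec(), 2);
    assert!(codom[b"v1".as_slice()].is_empty()); // old bucket cleaned up
    assert!(codom[b"v2".as_slice()].contains(&2));
}
```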
47 changes: 23 additions & 24 deletions crates/db/src/tuplebox/coldstorage.rs
@@ -69,7 +69,6 @@ impl ColdStorage {
let page_storage = Arc::new(Mutex::new(PageStore::new(path.join("pages"), &eventfd)));
let wal_manager = WalManager {
page_storage: page_storage.clone(),
slot_box: slot_box.clone(),
};
// Do initial recovery of anything left in the WAL before starting up, which should
// flush everything to page storage, from which we can then go and load it.
@@ -107,27 +106,27 @@ impl ColdStorage {
let mut restored_slots = HashMap::new();
let mut restored_bytes = 0;
for (page_size, page_num, relation_id) in ids {
let sb_page = slot_box.restore(page_num).expect("Unable to get page");
let slot_ids = sb_page.load(|buf| {
ps.read_page_buf(page_num, relation_id, buf)
.expect("Unable to read page")
});
// The allocator needs to know that this page is used.
slot_box.mark_page_used(relation_id, sb_page.available_content_bytes(), page_num);
let tuple_ids = slot_box
.clone()
.load_page(relation_id, page_num, |buf| {
ps.read_page_buf(page_num, relation_id, buf)
.expect("Unable to read page")
})
.expect("Unable to get page");

restored_slots
.entry(relation_id)
.or_insert_with(HashSet::new)
.insert((page_num, slot_ids));
.insert(tuple_ids);
restored_bytes += page_size;
}

// Now iterate all the slots we restored, and re-establish their indexes in the relations they belong to.
let mut restored_count = 0;
for (relation_id, pages) in restored_slots {
for (page_num, slot_ids) in pages {
let relation = &mut relations[relation_id.0];
for slot_id in slot_ids {
let tuple_id: TupleId = (page_num, slot_id);
for (relation_id, relation_tuple_ids) in restored_slots {
for page_tuple_ids in relation_tuple_ids {
for tuple_id in page_tuple_ids {
let relation = &mut relations[relation_id.0];
relation.index_tuple(tuple_id);
restored_count += 1;
}
@@ -182,7 +181,8 @@ impl ColdStorage {
}
}
},
// When the eventfd is triggered by the page store, we need to ask it to process completions.
// When the eventfd is triggered by the page store, we need to ask it to process
// completions.
_ = event_fd.read(&mut buf) => {
let _ = ps.lock().unwrap().process_completions();
}
@@ -244,7 +244,10 @@ impl ColdStorage {
for t in r.tuples() {
match t {
TxTuple::Insert(_) | TxTuple::Update(_) | TxTuple::Tombstone { .. } => {
let (page_id, _slot_id) = t.tuple_id();
let TupleId {
page: page_id,
slot: _slot_id,
} = t.tuple_id();
dirty_pages.insert((page_id, r.id));
}
TxTuple::Value(_) => {
@@ -290,7 +293,6 @@ struct WalManager {
// TODO: having a lock on the cold storage should not be necessary, but it is not !Sync, despite
// it supposedly being thread safe.
page_storage: Arc<Mutex<PageStore>>,
slot_box: Arc<SlotBox>,
}

impl Debug for WalManager {
@@ -351,12 +353,6 @@ impl LogManager for WalManager {
return Ok(());
};

for tuple_id in evicted {
if let Err(e) = self.slot_box.remove(tuple_id) {
warn!(?tuple_id, e = ?e, "Failed to evict page from slot box");
}
}

Ok(())
}
}
@@ -491,7 +487,10 @@ impl WalManager {
pid as PageId,
relation_id,
));
to_evict.push((pid as PageId, slot_id as SlotId));
to_evict.push(TupleId {
page: pid as PageId,
slot: slot_id as SlotId,
});
}
}
}
23 changes: 23 additions & 0 deletions crates/db/src/tuplebox/pool/buffer_pool.rs
@@ -222,6 +222,29 @@ impl BufferPool {
Ok((AtomicPtr::new(addr), sc.block_size))
}

/// Find the buffer id (bid) for a given pointer. Can be used to identify the page
/// that a pointer belongs to, in case of page fault.
#[allow(dead_code)] // Legitimate potential future use
pub fn identify_page<T>(&self, ptr: AtomicPtr<T>) -> Result<Bid, PagerError> {
// Look at the address ranges for each size class to find the one that contains the pointer.
for (sc_idx, sc) in self.size_classes.iter().enumerate() {
let base = sc.base_addr.load(Ordering::SeqCst);
let base = base as usize;
let end = base + sc.virt_size;
let ptr = ptr.load(Ordering::SeqCst) as usize;
if ptr >= base && ptr < end {
// Found the size class that contains the pointer. Now we need to find the offset
// within the size class.
let offset = ptr - base;
let offset = offset / sc.block_size;
let offset = offset * sc.block_size;
let bid = Self::newbid(offset, sc_idx as u8);
return Ok(bid);
}
}
Err(PagerError::InvalidTuplePointer)
}

/// Get the total reserved capacity of the buffer pool.
#[allow(dead_code)] // Legitimate potential future use
pub fn capacity_bytes(&self) -> usize {
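A worked example of the pointer arithmetic in `identify_page` above, as a
standalone sketch; the base address and block size are made-up values:

```rust
fn main() {
    // Hypothetical size class: 4 KiB blocks mapped at an arbitrary base.
    let base: usize = 0x6000_0000;
    let block_size: usize = 4096;

    // A tuple pointer 100 bytes into the third block of the region.
    let ptr = base + 2 * block_size + 100;

    // identify_page rounds the offset down to the containing block start...
    let offset = (ptr - base) / block_size * block_size;
    assert_eq!(offset, 2 * block_size);

    // ...and packs (offset, size-class index) into a buffer id (bid).
    println!("block-aligned offset: {offset:#x}");
}
```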
3 changes: 3 additions & 0 deletions crates/db/src/tuplebox/pool/mod.rs
@@ -39,4 +39,7 @@ pub enum PagerError {

#[error("Invalid page access")]
CouldNotAccess,

#[error("Invalid tuple pointer")]
InvalidTuplePointer,
}
11 changes: 4 additions & 7 deletions crates/db/src/tuplebox/tb.rs
@@ -203,11 +203,10 @@ impl TupleBox {
// here.
let Some(cv) = canon_tuple else {
match &tuple {
TxTuple::Insert(tref) => {
let t = tref.get();
TxTuple::Insert(t) => {
t.update_timestamp(relation_id, self.slotbox.clone(), tx_ts);
let forked_relation = commitset.fork(relation_id, canonical);
forked_relation.upsert_tuple(tref.clone());
forked_relation.upsert_tuple(t.clone());
continue;
}
TxTuple::Tombstone { .. } => {
@@ -231,7 +230,6 @@
// Check the timestamp on the value, if it's newer than the read-timestamp,
// we have for this tuple then that's a conflict, because it means someone else has
// already committed a change to this tuple.
let cv = cv.get();
if cv.ts() > tuple.ts() {
return Err(CommitError::TupleVersionConflict);
}
@@ -240,11 +238,10 @@
// branching of the old one.
let forked_relation = commitset.fork(relation_id, canonical);
match &tuple {
TxTuple::Insert(tref) | TxTuple::Update(tref) => {
let t = tref.get();
TxTuple::Insert(t) | TxTuple::Update(t) => {
t.update_timestamp(relation_id, self.slotbox.clone(), tx_ts);
let forked_relation = commitset.fork(relation_id, canonical);
forked_relation.upsert_tuple(tref.clone());
forked_relation.upsert_tuple(t.clone());
}
TxTuple::Value(..) => {}
TxTuple::Tombstone {
13 changes: 9 additions & 4 deletions crates/db/src/tuplebox/tuples/mod.rs
@@ -12,18 +12,23 @@
// this program. If not, see <https://www.gnu.org/licenses/>.
//

pub use slotbox::{PageId, SlotBox, SlotBoxError, SlotId, TupleId};
pub use slotbox::{PageId, SlotBox, SlotBoxError, SlotId};
use thiserror::Error;
pub use tuple::Tuple;
pub use tuple_ref::TupleRef;
pub use tuple::TupleRef;
pub use tx_tuple::TxTuple;

mod slot_ptr;
mod slotbox;
mod slotted_page;
mod tuple;
mod tuple_ref;
mod tx_tuple;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TupleId {
pub page: PageId,
pub slot: SlotId,
}

#[derive(Debug, Clone, Eq, PartialEq, Error)]
pub enum TupleError {
#[error("Tuple not found")]