Skip to content

Commit

Permalink
A little cleanup.
Browse files — browse the repository at this point in the history
  • Loading branch information
rdaum committed Jan 6, 2024
1 parent 043e575 commit ed7de2d
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 57 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ binary-layout = "3.2.0"
bytes = "1.5.0"
chrono = "0.4.31"
criterion = { version = "0.5.1", features = ["async_tokio"] }
crossbeam-channel = "0.5.10"
decorum = "0.3.1" # For ordering & comparing our floats
enum-primitive-derive = "0.3.0"
fast-counter = "1.0.0"
Expand Down
1 change: 0 additions & 1 deletion crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ metrics.workspace = true
metrics-macros.workspace = true

# For the DB layer.
crossbeam-channel.workspace = true
bincode.workspace = true
im.workspace = true
sized-chunks.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/tuplebox/coldstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl ColdStorage {
Some(*r),
0,
ts,
page.page_size,
page.page_size as usize,
|buf| page.save_into(buf),
);
write_batch.push((*page_id, Some(wal_entry_buffer)));
Expand Down
11 changes: 0 additions & 11 deletions crates/db/src/tuplebox/tb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// TODO: support sorted indices, too.
// TODO: 'join' and transitive closure -> datalog-style variable unification

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
Expand Down Expand Up @@ -55,9 +54,6 @@ pub struct TupleBox {
// TODO: take a look at Adnan's thread-sharded approach described in section 3.1
// (https://www.vldb.org/pvldb/vol16/p1426-alhomssi.pdf) -- "Ordered Snapshot Instant Commit"
maximum_transaction: AtomicU64,
/// The set of currently active transactions, which will be used to prune old unreferenced
/// versions of tuples.
active_transactions: RwLock<HashSet<u64>>,
/// Monotonically incrementing sequence counters.
sequences: Vec<AtomicU64>,
/// The copy-on-write set of current canonical base relations.
Expand Down Expand Up @@ -109,7 +105,6 @@ impl TupleBox {
Arc::new(Self {
relation_info: relations.to_vec(),
maximum_transaction: AtomicU64::new(0),
active_transactions: RwLock::new(HashSet::new()),
canonical: RwLock::new(base_relations),
sequences,
backing_store,
Expand Down Expand Up @@ -285,16 +280,10 @@ impl TupleBox {
// And update the timestamp on the canonical relation.
canonical[idx].ts = commit_ts;
}
// Clear out the active transaction.
self.active_transactions.write().await.remove(&commit_ts);

Ok(())
}

pub(crate) async fn abort_transaction(&self, ts: u64) {
self.active_transactions.write().await.remove(&ts);
}

pub async fn sync(&self, ts: u64, world_state: WorkingSet) {
if let Some(bs) = &self.backing_store {
let seqs = self
Expand Down
6 changes: 3 additions & 3 deletions crates/db/src/tuplebox/tuples/slot_ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::tuplebox::tuples::{SlotBox, TupleId};
pub struct SlotPtr {
sb: Arc<SlotBox>,
id: TupleId,
buflen: usize,
buflen: u32,
bufaddr: *mut u8,

_pin: std::marker::PhantomPinned,
Expand All @@ -46,7 +46,7 @@ impl SlotPtr {
sb: sb.clone(),
id: tuple_id,
bufaddr,
buflen,
buflen: buflen as u32,
_pin: std::marker::PhantomPinned,
}
}
Expand Down Expand Up @@ -85,7 +85,7 @@ impl SlotPtr {
#[inline]
fn buffer(&self) -> &[u8] {
let buf_addr = self.as_ptr();
unsafe { std::slice::from_raw_parts(buf_addr, self.buflen) }
unsafe { std::slice::from_raw_parts(buf_addr, self.buflen as usize) }
}

#[inline]
Expand Down
65 changes: 33 additions & 32 deletions crates/db/src/tuplebox/tuples/slotted_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tracing::error;

use crate::tuplebox::tuples::slotbox::SlotBoxError;

pub type SlotId = usize;
pub type SlotId = u32;

// Note that if a page is empty, either because it's new, or because all its slots have been
// removed, then used_bytes is 0.
Expand Down Expand Up @@ -173,7 +173,7 @@ impl SlotIndexEntry {
/// is the same as the representation in-memory.
pub struct SlottedPage<'a> {
pub(crate) base_address: *mut u8,
pub(crate) page_size: usize,
pub(crate) page_size: u32,

_marker: std::marker::PhantomData<&'a u8>,
}
Expand All @@ -195,7 +195,7 @@ impl<'a> SlottedPage<'a> {
pub fn for_page(base_address: *mut u8, page_size: usize) -> Self {
Self {
base_address,
page_size,
page_size: page_size as u32,
_marker: Default::default(),
}
}
Expand All @@ -207,7 +207,7 @@ impl<'a> SlottedPage<'a> {
let used = (header.num_slots * std::mem::size_of::<SlotIndexEntry>() as u32) as usize
+ header.used_bytes as usize
+ std::mem::size_of::<SlottedPageHeader>();
self.page_size - used
self.page_size as usize - used
}

/// How many bytes are available for appending to this page (i.e. not counting the space
Expand All @@ -218,7 +218,7 @@ impl<'a> SlottedPage<'a> {
let index_length = header.index_length as usize;
let header_size = std::mem::size_of::<SlottedPageHeader>();

self.page_size - (index_length + content_length + header_size)
self.page_size as usize - (index_length + content_length + header_size)
}

/// Add the slot into the page, copying it into the memory region, and returning the slot id
Expand All @@ -235,8 +235,9 @@ impl<'a> SlottedPage<'a> {
}
if let Some(fit_slot) = fit_slot {
let content_position = self.offset_of(fit_slot).unwrap().0;
let memory_as_slice =
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
let memory_as_slice = unsafe {
std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize)
};

// If there's an initial value provided, copy it in.
if let Some(initial_value) = initial_value {
Expand Down Expand Up @@ -270,9 +271,9 @@ impl<'a> SlottedPage<'a> {
// Add the slot to the content region. The start offset is PAGE_SIZE - content_length -
// slot_length. So first thing, copy the bytes into the content region at that position.
let content_length = self.header_mut().content_length as usize;
let content_position = self.page_size - content_length - size;
let content_position = self.page_size as usize - content_length - size;
let memory_as_slice =
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) };

// If there's an initial value provided, copy it in.
if let Some(initial_value) = initial_value {
Expand All @@ -282,8 +283,8 @@ impl<'a> SlottedPage<'a> {
}

// Add the index entry and expand the index region.
let mut index_entry = self.get_index_entry_mut(self.header_mut().num_slots as usize);
index_entry.as_mut().alloc(content_position, size);
let mut index_entry = self.get_index_entry_mut(self.header_mut().num_slots as SlotId);
index_entry.as_mut().alloc(content_position as usize, size);

// Update the header
let header = self.header_mut();
Expand All @@ -307,7 +308,7 @@ impl<'a> SlottedPage<'a> {
let memory_as_slice = unsafe {
Pin::new_unchecked(std::slice::from_raw_parts_mut(
self.base_address,
self.page_size,
self.page_size as usize,
))
};
lf(memory_as_slice);
Expand All @@ -321,7 +322,7 @@ impl<'a> SlottedPage<'a> {
let mut slots = vec![];
let num_slots = header.num_slots;
for i in 0..num_slots {
let mut index_entry = self.get_index_entry_mut(i as usize);
let mut index_entry = self.get_index_entry_mut(i as SlotId);
if index_entry.used {
unsafe { index_entry.as_mut().get_unchecked_mut() }.refcount = 1;
let slot_id = i as SlotId;
Expand All @@ -336,7 +337,7 @@ impl<'a> SlottedPage<'a> {
pub fn save_into(&self, buf: &mut [u8]) {
let _ = self.read_lock();
let memory_as_slice =
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) };
buf.copy_from_slice(memory_as_slice);
}

Expand Down Expand Up @@ -380,52 +381,52 @@ impl<'a> SlottedPage<'a> {

fn get_slot(&self, slot_id: SlotId) -> Result<Pin<&'a [u8]>, SlotBoxError> {
// Check that the index is in bounds
let num_slots = self.header().num_slots as usize;
let num_slots = self.header().num_slots as SlotId;
if slot_id >= num_slots {
error!(
"slot_id {} is out of bounds for page with {} slots",
slot_id, num_slots
);
return Err(SlotBoxError::TupleNotFound(slot_id));
return Err(SlotBoxError::TupleNotFound(slot_id as usize));
}

// Read the index entry;
let index_entry = self.get_index_entry(slot_id);
if !index_entry.used {
error!("slot_id {} is not used, invalid tuple", slot_id);
return Err(SlotBoxError::TupleNotFound(slot_id));
return Err(SlotBoxError::TupleNotFound(slot_id as usize));
}
let offset = index_entry.offset as usize;
let length = index_entry.used_bytes as usize;

let memory_as_slice =
unsafe { std::slice::from_raw_parts(self.base_address, self.page_size) };
unsafe { std::slice::from_raw_parts(self.base_address, self.page_size as usize) };
Ok(unsafe { Pin::new_unchecked(&memory_as_slice[offset..offset + length]) })
}

fn get_slot_mut(&self, slot_id: SlotId) -> Result<Pin<&'a mut [u8]>, SlotBoxError> {
// Check that the index is in bounds
let num_slots = self.header().num_slots as usize;
let num_slots = self.header().num_slots as SlotId;
if slot_id >= num_slots {
return Err(SlotBoxError::TupleNotFound(slot_id));
return Err(SlotBoxError::TupleNotFound(slot_id as usize));
}

// Read the index entry;
let index_entry = self.get_index_entry(slot_id);
if !index_entry.used {
return Err(SlotBoxError::TupleNotFound(slot_id));
return Err(SlotBoxError::TupleNotFound(slot_id as usize));
}
let offset = index_entry.offset as usize;
let length = index_entry.used_bytes as usize;

assert!(
offset + length <= self.page_size,
offset + length <= self.page_size as usize,
"slot {} is out of bounds",
slot_id
);

let memory_as_slice =
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size) };
unsafe { std::slice::from_raw_parts_mut(self.base_address, self.page_size as usize) };
Ok(unsafe { Pin::new_unchecked(&mut memory_as_slice[offset..offset + length]) })
}
}
Expand Down Expand Up @@ -525,9 +526,9 @@ impl<'a> SlottedPage<'a> {
/// Return the offset, size of the slot at the given index.
fn offset_of(&self, tid: SlotId) -> Result<(usize, usize), SlotBoxError> {
// Check that the index is in bounds
let num_slots = self.header().num_slots as usize;
let num_slots = self.header().num_slots as SlotId;
if tid >= num_slots {
return Err(SlotBoxError::TupleNotFound(tid));
return Err(SlotBoxError::TupleNotFound(tid as usize));
}

// Read the index entry;
Expand All @@ -541,7 +542,7 @@ impl<'a> SlottedPage<'a> {
let header = self.header();
let num_slots = header.num_slots;
for i in 0..num_slots {
let index_entry = self.get_index_entry(i as usize);
let index_entry = self.get_index_entry(i as SlotId);
if index_entry.used {
continue;
}
Expand All @@ -554,7 +555,7 @@ impl<'a> SlottedPage<'a> {
// Sort
fits.sort_by(|a, b| a.1.cmp(&b.1));
if let Some((tid, _)) = fits.first() {
return (true, Some(*tid));
return (true, Some(*tid as SlotId));
}

let index_length = header.index_length as isize;
Expand All @@ -569,7 +570,7 @@ impl<'a> SlottedPage<'a> {

fn get_index_entry(&self, slot_id: SlotId) -> Pin<&SlotIndexEntry> {
let index_offset = std::mem::size_of::<SlottedPageHeader>()
+ (slot_id * std::mem::size_of::<SlotIndexEntry>());
+ ((slot_id as usize) * std::mem::size_of::<SlotIndexEntry>());

let base_address = self.base_address;

Expand All @@ -581,7 +582,7 @@ impl<'a> SlottedPage<'a> {

fn get_index_entry_mut(&self, slot_id: SlotId) -> Pin<&mut SlotIndexEntry> {
let index_offset = std::mem::size_of::<SlottedPageHeader>()
+ (slot_id * std::mem::size_of::<SlotIndexEntry>());
+ ((slot_id as usize) * std::mem::size_of::<SlotIndexEntry>());
let base_address = self.base_address;

unsafe {
Expand All @@ -593,7 +594,7 @@ impl<'a> SlottedPage<'a> {

pub struct PageWriteGuard<'a> {
base_address: *mut u8,
page_size: usize,
page_size: u32,

_marker: std::marker::PhantomData<&'a u8>,
}
Expand Down Expand Up @@ -657,7 +658,7 @@ impl<'a> Drop for PageWriteGuard<'a> {

pub struct PageReadGuard<'a> {
base_address: *const u8,
page_size: usize,
page_size: u32,

_marker: std::marker::PhantomData<&'a u8>,
}
Expand All @@ -673,7 +674,7 @@ impl<'a> PageReadGuard<'a> {
pub fn get_slot(&self, slot_id: SlotId) -> Result<Pin<&'a [u8]>, SlotBoxError> {
let sp = SlottedPage {
base_address: self.base_address as _,
page_size: self.page_size,
page_size: self.page_size as _,
_marker: Default::default(),
};
sp.get_slot(slot_id)
Expand Down
5 changes: 0 additions & 5 deletions crates/db/src/tuplebox/tx/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use crate::tuplebox::RelationId;

/// A versioned transaction, which is a fork of the current canonical base relations.
pub struct Transaction {
/// The timestamp of this transaction, as granted to us by the tuplebox.
pub(crate) ts: u64,
/// Where we came from, for referencing back to the base relations.
db: Arc<TupleBox>,
/// The "working set" is the set of retrieved and/or modified tuples from base relations, known
Expand Down Expand Up @@ -64,7 +62,6 @@ impl Transaction {
let next_transient_relation_id = RelationId::transient(db.relation_info().len());

Self {
ts,
db,
working_set: RwLock::new(Some(ws)),
transient_relations: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -116,8 +113,6 @@ impl Transaction {

pub async fn rollback(&self) -> Result<(), CommitError> {
self.working_set.write().await.as_mut().unwrap().clear();
// Clear out the active transaction.
self.db.abort_transaction(self.ts).await;
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion crates/kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,4 @@ metrics-util.workspace = true
metrics-macros.workspace = true

# For the DB layer.
crossbeam-channel.workspace = true
bincode.workspace = true

0 comments on commit ed7de2d

Please sign in to comment.