diff --git a/.licensure.yml b/.licensure.yml index 45aaace5..e2d6c6b1 100644 --- a/.licensure.yml +++ b/.licensure.yml @@ -30,4 +30,4 @@ excludes: - README.* - LICENSE.* - Dockerfile - - .*\.(md|rst|txt|yml|png|jpg|gif|db|pem|lock|json|toml) + - .*\.(md|rst|txt|yml|png|jpg|gif|db|pem|lock|json|toml|in|out) diff --git a/Cargo.lock b/Cargo.lock index 4808960d..2dc29c1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1370,6 +1370,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "human_bytes" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91f255a4535024abf7640cb288260811fc14794f62b063652ed349f9a6c2348e" + [[package]] name = "hyper" version = "0.14.27" @@ -1879,8 +1885,10 @@ dependencies = [ "criterion", "crossbeam-channel", "hi_sparse_bitset", + "human_bytes", "im", "io-uring", + "lazy_static", "libc", "metrics 0.21.1", "metrics-macros", diff --git a/Cargo.toml b/Cargo.toml index c958ffea..30990237 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ criterion = { version = "0.5.1", features = ["async_tokio"] } crossbeam-channel = "0.5.10" decorum = "0.3.1" # For ordering & comparing our floats enum-primitive-derive = "0.3.0" +human_bytes = "0.4.3" inventory = "0.3.14" itertools = "0.12.0" lazy_static = "1.4.0" diff --git a/crates/daemon/src/connections_tb.rs b/crates/daemon/src/connections_tb.rs index d4e2ad64..8965fd1f 100644 --- a/crates/daemon/src/connections_tb.rs +++ b/crates/daemon/src/connections_tb.rs @@ -37,7 +37,6 @@ use rpc_common::RpcRequestError; use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION}; const CONNECTIONS_DB_MEM_SIZE: usize = 1 << 26; -const CONNECTIONS_DB_PAGE_SIZE: usize = 32768; pub struct ConnectionsTb { tb: Arc, } @@ -56,14 +55,7 @@ impl ConnectionsTb { .collect(); relations[ConnectionRelation::ClientConnection as usize].secondary_indexed = true; - let tb = TupleBox::new( - CONNECTIONS_DB_MEM_SIZE, - CONNECTIONS_DB_PAGE_SIZE, - path, - &relations, - 1, - ) - .await; + let tb = TupleBox::new(CONNECTIONS_DB_MEM_SIZE, path, &relations, 1).await; Self { tb } } } diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index fbfd4351..8a6a0ce0 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -24,6 +24,8 @@ moor-values = { path = "../values" } async-trait.workspace = true strum.workspace = true uuid.workspace = true +lazy_static.workspace = true +human_bytes.workspace = true ## Error declaration/ handling thiserror.workspace = true diff --git a/crates/db/benches/tb_single_thread.rs b/crates/db/benches/tb_single_thread.rs index 6d65b122..2989406b 100644 --- a/crates/db/benches/tb_single_thread.rs +++ b/crates/db/benches/tb_single_thread.rs @@ -39,7 +39,7 @@ async fn test_db() -> Arc { }) .collect::>(); - TupleBox::new(1 << 24, 4096, None, &relations, 0).await + TupleBox::new(1 << 24, None, &relations, 0).await } fn from_val(value: i64) -> SliceRef { diff --git a/crates/db/src/object_relations.rs b/crates/db/src/object_relations.rs index 35840940..4358c265 100644 --- a/crates/db/src/object_relations.rs +++ b/crates/db/src/object_relations.rs @@ -263,8 +263,7 @@ mod tests { relations[ObjectParent as usize].secondary_indexed = true; relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - - TupleBox::new(1 << 24, 32768, None, &relations, WorldStateSequences::COUNT).await + TupleBox::new(1 << 24, None, &relations, 
WorldStateSequences::COUNT).await } /// Test simple relations mapping oid->oid (with secondary index), independent of all other diff --git a/crates/db/src/tb_worldstate.rs b/crates/db/src/tb_worldstate.rs index 49cfbb2d..287c741c 100644 --- a/crates/db/src/tb_worldstate.rs +++ b/crates/db/src/tb_worldstate.rs @@ -47,10 +47,6 @@ use crate::tuplebox::tb::{RelationInfo, TupleBox}; use crate::tuplebox::{CommitError, Transaction}; use crate::{object_relations, tuplebox, Database}; -// TODO: Totally arbitrary and needs profiling. Needs to be big enough to hold entire props and -// verbs. -const PAGE_SIZE: usize = 65536; - /// An implementation of `WorldState` / `WorldStateSource` that uses the TupleBox as its backing pub struct TupleBoxWorldStateSource { db: Arc, @@ -73,14 +69,7 @@ impl TupleBoxWorldStateSource { relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true; // Same with "contents". relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - let db = TupleBox::new( - memory_size, - PAGE_SIZE, - path, - &relations, - WorldStateSequences::COUNT, - ) - .await; + let db = TupleBox::new(memory_size, path, &relations, WorldStateSequences::COUNT).await; // Check the db for sys (#0) object to see if this is a fresh DB or not. let fresh_db = { @@ -1140,7 +1129,7 @@ mod tests { relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true; relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - TupleBox::new(1 << 24, 4096, None, &relations, WorldStateSequences::COUNT).await + TupleBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await } #[tokio::test] diff --git a/crates/db/src/tuplebox/coldstorage.rs b/crates/db/src/tuplebox/coldstorage.rs index b943c113..a6235703 100644 --- a/crates/db/src/tuplebox/coldstorage.rs +++ b/crates/db/src/tuplebox/coldstorage.rs @@ -77,11 +77,11 @@ impl ColdStorage { }; // Grab page storage and wait for all the writes to complete. - let mut cs = page_storage.lock().unwrap(); - cs.wait_complete(); + let mut ps = page_storage.lock().unwrap(); + ps.wait_complete(); // Get the sequence page, and load the sequences from it, if any. - if let Ok(Some(sequence_page)) = cs.read_sequence_page() { + if let Ok(Some(sequence_page)) = ps.read_sequence_page() { let sequence_page = sequence_page::View::new(&sequence_page[..]); let num_sequences = sequence_page.num_sequences().read(); assert_eq!(num_sequences, sequences.len() as u64, @@ -98,13 +98,13 @@ impl ColdStorage { } // Recover all the pages from cold storage and re-index all the tuples in them. - let ids = cs.list_pages(); + let ids = ps.list_pages(); let mut restored_slots = HashMap::new(); let mut restored_bytes = 0; for (page_size, page_num, relation_id) in ids { - let sb_page = slot_box.page_for(page_num); + let sb_page = slot_box.restore(page_num).expect("Unable to get page"); let slot_ids = sb_page.load(|buf| { - cs.read_page_buf(page_num, relation_id, buf) + ps.read_page_buf(page_num, relation_id, buf) .expect("Unable to read page") }); // The allocator needs to know that this page is used. @@ -217,13 +217,11 @@ impl ColdStorage { // For syncing pages, we don't need to sync each individual tuple, we we just find the set of dirty pages // and sync them. // The pages that are modified will be need be read-locked while they are copied. 
-        let mut dirty_tuple_count = 0;
         let mut dirty_pages = HashSet::new();
         for r in &ws.relations {
             for t in r.tuples() {
                 match t {
                     TxTuple::Insert(_) | TxTuple::Update(_) | TxTuple::Tombstone { .. } => {
-                        dirty_tuple_count += 1;
                         let (page_id, _slot_id) = t.tuple_id();
                         dirty_pages.insert((page_id, r.id));
                     }
@@ -234,12 +232,12 @@
             }
         }
 
-        let mut total_synced_tuples = 0;
-
         for (page_id, r) in &dirty_pages {
-            // Get the page for this tuple.
-            let page = slot_box.page_for(*page_id);
-            total_synced_tuples += page.num_active_slots();
+            // Get the slotbox page for this tuple.
+            let Ok(page) = slot_box.page_for(*page_id) else {
+                // If the slot or page is already gone, c'est la vie, we don't need to sync it.
+                continue;
+            };
 
             // Copy the page into the WAL entry directly.
             let wal_entry_buffer = make_wal_entry(
@@ -254,21 +252,6 @@
             write_batch.push((*page_id, Some(wal_entry_buffer)));
         }
 
-        let mut total_tuples = 0;
-        for p in slot_box.used_pages() {
-            let page = slot_box.page_for(p);
-            total_tuples += page.num_active_slots();
-        }
-
-        debug!(
-            dirty_tuple_count,
-            dirt_pages = dirty_pages.len(),
-            num_relations = ws.relations.len(),
-            total_synced_tuples,
-            total_tuples,
-            "Syncing dirty pages to WAL"
-        );
-
         let mut sync_wal = wal.begin_entry().expect("Failed to begin WAL entry");
         for (_page_id, wal_entry_buf) in write_batch {
             if let Some(wal_entry_buf) = wal_entry_buf {
@@ -305,8 +288,9 @@ impl LogManager for WalManager {
         for chunk in chunks {
             Self::chunk_to_mutations(&chunk, &mut write_batch, &mut evicted);
         }
-        let mut cs = self.page_storage.lock().unwrap();
-        cs.write_batch(write_batch).expect("Unable to write batch");
+        let mut ps = self.page_storage.lock().unwrap();
+        ps.write_batch(write_batch).expect("Unable to write batch");
+
         Ok(())
     }
 
@@ -336,11 +320,11 @@ impl LogManager for WalManager {
             }
         }
 
-        let Ok(mut cs) = self.page_storage.lock() else {
+        let Ok(mut ps) = self.page_storage.lock() else {
            error!("Unable to lock cold storage");
            return Ok(());
        };
-        if let Err(e) = cs.write_batch(write_batch) {
+        if let Err(e) = ps.write_batch(write_batch) {
            error!("Unable to write batch: {:?}", e);
            return Ok(());
        };
diff --git a/crates/db/src/tuplebox/mod.rs b/crates/db/src/tuplebox/mod.rs
index 69528a62..8c7a4556 100644
--- a/crates/db/src/tuplebox/mod.rs
+++ b/crates/db/src/tuplebox/mod.rs
@@ -27,6 +27,7 @@
 mod base_relation;
 mod coldstorage;
 mod page_storage;
+mod pool;
 mod slots;
 pub mod tb;
 mod tuples;
diff --git a/crates/db/src/tuplebox/page_storage.rs b/crates/db/src/tuplebox/page_storage.rs
index 364af5e1..1b3c881c 100644
--- a/crates/db/src/tuplebox/page_storage.rs
+++ b/crates/db/src/tuplebox/page_storage.rs
@@ -12,6 +12,8 @@
 // this program. If not, see .
 //
+// TODO: there's no way this is "robust" enough to be used in production
+
 use crate::tuplebox::slots::PageId;
 use crate::tuplebox::RelationId;
 use im::{HashMap, HashSet};
diff --git a/crates/db/src/tuplebox/pool/buffer_pool.rs b/crates/db/src/tuplebox/pool/buffer_pool.rs
new file mode 100644
index 00000000..af4b1bb3
--- /dev/null
+++ b/crates/db/src/tuplebox/pool/buffer_pool.rs
@@ -0,0 +1,352 @@
+// Copyright (C) 2024 Ryan Daum
+//
+// This program is free software: you can redistribute it and/or modify it under
+// the terms of the GNU General Public License as published by the Free Software
+// Foundation, version 3.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License along with
+// this program. If not, see .
+//
+
+//! Vaguely `Umbra`-inspired buffer pool.
+//!
+//! Buffers are allocated via anonymous memory mapping, and paged out with `MADV_DONTNEED`, using
+//! a variant of VM overcommit to allow for more allocations than physical memory while keeping
+//! consistent memory addresses to avoid a giant page table.
+//!
+//! As in Umbra, the "same" physical memory is allocated in multiple memory mapped regions, to permit
+//! multiple page sizes. By this we mean that multiple size classes are allocated in multiple mmap pools,
+//! and as long as the sum of all *used* pages remains lower than physical memory, we can allocate
+//! freely without worrying about complicated page splitting strategies.
+//!
+//! For now each size class uses a simple bitmap index to manage allocation. This is not the
+//! most ideal structure (O(N) lookup, but O(1) free), but it's the simplest to implement for now.
+//! A simple optimization would be to add a per-size-class free list; later, more sophisticated
+//! allocation strategies can be added.
+
+use std::cmp::max;
+use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+
+use crate::tuplebox::pool::size_class::SizeClass;
+use crate::tuplebox::pool::{Bid, PagerError};
+
+// 32k -> 1MB page sizes supported.
+// TODO: If we end up with values bigger than 1MB, they should probably be handled by "external" pages,
+// that is, pages that are not part of the buffer pool, but are instead read directly from file
+// references as needed, because they are likely to just thrash the crap out of the buffer pool
+// and are probably just big binary blobs like images, etc.
+pub const LOWEST_SIZE_CLASS_POWER_OF: usize = 12;
+pub const HIGHEST_SIZE_CLASS_POWER_OF: usize = 20;
+
+pub struct BufferPool {
+    // Statistics.
+    pub capacity_bytes: AtomicUsize,
+    pub allocated_bytes: AtomicUsize,
+    pub available_bytes: AtomicUsize,
+    pub size_classes: [SizeClass; HIGHEST_SIZE_CLASS_POWER_OF - LOWEST_SIZE_CLASS_POWER_OF + 1],
+}
+
+// TODO: shrink/grow
+// TODO: free list
+
+impl BufferPool {
+    pub fn new(capacity: usize) -> Result {
+        let region_4k = SizeClass::new_anon(1 << 12, capacity)?;
+        let region_8k = SizeClass::new_anon(1 << 13, capacity)?;
+        let region_16k = SizeClass::new_anon(1 << 14, capacity)?;
+        let region_32k = SizeClass::new_anon(1 << 15, capacity)?;
+        let region_64k = SizeClass::new_anon(1 << 16, capacity)?;
+        let region_128k = SizeClass::new_anon(1 << 17, capacity)?;
+        let region_256k = SizeClass::new_anon(1 << 18, capacity)?;
+        let region_512k = SizeClass::new_anon(1 << 19, capacity)?;
+        let region_1024k = SizeClass::new_anon(1 << 20, capacity)?;
+
+        let size_classes = [
+            region_4k,
+            region_8k,
+            region_16k,
+            region_32k,
+            region_64k,
+            region_128k,
+            region_256k,
+            region_512k,
+            region_1024k,
+        ];
+        Ok(Self {
+            capacity_bytes: AtomicUsize::new(capacity),
+            allocated_bytes: AtomicUsize::new(0),
+            available_bytes: AtomicUsize::new(capacity),
+
+            size_classes,
+        })
+    }
+
+    pub fn newbid(offset: usize, size_class: u8) -> Bid {
+        // Verify that the offset is aligned such that we can encode the size class in the lower 4
+        // bits.
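+        // For example, with LOWEST_SIZE_CLASS_POWER_OF = 12, size class 3 is the 32KiB class
+        // (1 << (3 + 12) == 32768). The block at offset 3 * 32768 = 0x18000 is encoded as
+        // newbid(0x18000, 3) == Bid(0x18003); offset_of() masks the low 4 bits off again to
+        // recover 0x18000, and size_class_of() returns 3.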
+ assert_eq!(offset & 0b11, 0); + + // Verify that the size class fits in 4 bits. + assert!(size_class < 16); + + // Size class gets encoded into the lower 4 bits. + let bid = offset as u64 | u64::from(size_class); + + Bid(bid) + } + + pub fn offset_of(bid: Bid) -> usize { + // Offset is our value with the lower 4 bits masked out. + bid.0 as usize & !0b1111 + } + + pub fn size_class_of(bid: Bid) -> u8 { + // Size class is the lower 4 bits. + (bid.0 & 0b1111) as u8 + } + + // Legitimate potential future use + #[allow(dead_code)] + pub fn page_size_of(bid: Bid) -> usize { + 1 << ((Self::size_class_of(bid) as usize) + LOWEST_SIZE_CLASS_POWER_OF) + } + + #[allow(dead_code)] + pub fn page_size_for_size_class(size_class: u8) -> usize { + 1 << ((size_class as usize) + LOWEST_SIZE_CLASS_POWER_OF) + } +} + +impl BufferPool { + /// Allocate a buffer of the given size. + pub fn alloc(&self, size: usize) -> Result<(Bid, AtomicPtr, usize), PagerError> { + if size > self.available_bytes.load(Ordering::SeqCst) { + return Err(PagerError::InsufficientRoom { + desired: size, + available: self.allocated_bytes.load(Ordering::SeqCst), + }); + } + + let np2 = (64 - (size - 1).leading_zeros()) as isize; + let sc_idx = np2 - (LOWEST_SIZE_CLASS_POWER_OF as isize); + let sc_idx = max(sc_idx, 0) as usize; + if sc_idx >= self.size_classes.len() { + return Err(PagerError::UnsupportedSize(1 << np2)); + } + + let nearest_class = &self.size_classes[sc_idx]; + let block_size = nearest_class.block_size; + + // Ask the size class for its offset for allocation. + let offset = nearest_class.alloc()? * block_size; + + // Bookkeeping + self.allocated_bytes.fetch_add(block_size, Ordering::SeqCst); + self.available_bytes.fetch_sub(block_size, Ordering::SeqCst); + + // The bid is the offset into the buffer pool + the size class in the lower 4 bits. + let sc_idx = sc_idx as u8; + let bid = Self::newbid(offset, sc_idx); + + // Note that this is the actual address, that is, it does not have the size-class encoded + // in it (aka PagePointer) + let addr = self.resolve_ptr(bid).unwrap().0; + + Ok((bid, addr, block_size)) + } + + /// Free a buffer, completely deallocating it, by which we mean removing it from the index of + /// used pages. + pub fn free(&self, page: Bid) -> Result<(), PagerError> { + let sc = Self::size_class_of(page); + let sc = &self.size_classes[sc as usize]; + let block_size = sc.block_size; + let offset = Self::offset_of(page); + sc.free(offset / block_size)?; + + // Bookkeeping + self.allocated_bytes.fetch_sub(block_size, Ordering::SeqCst); + self.available_bytes.fetch_add(block_size, Ordering::SeqCst); + + Ok(()) + } + + /// Check if a given buffer handle is allocated. + pub fn is_allocated(&self, page: Bid) -> bool { + let sc_num = Self::size_class_of(page); + let sc = &self.size_classes[sc_num as usize]; + let block_size = sc.block_size; + let offset = Self::offset_of(page); + sc.is_allocated(offset / block_size) + } + + /// Mark a buffer as restored, by which we mean adding it to the index of used pages at this position. + /// This is used to restore a buffer from disk. 
+ pub fn restore(&self, page: Bid) -> Result<(AtomicPtr, usize), PagerError> { + let sc_num = Self::size_class_of(page); + let sc = &self.size_classes[sc_num as usize]; + let block_size = sc.block_size; + let offset = Self::offset_of(page); + + sc.restore(offset / block_size)?; + + // Bookkeeping + self.allocated_bytes.fetch_add(block_size, Ordering::SeqCst); + self.available_bytes.fetch_sub(block_size, Ordering::SeqCst); + + let addr = sc.base_addr.load(Ordering::SeqCst); + let addr = unsafe { addr.add(offset) }.cast::(); + + Ok((AtomicPtr::new(addr), block_size)) + } + + /// Returns the physical pointer and page size for a page. + pub fn resolve_ptr(&self, bid: Bid) -> Result<(AtomicPtr, usize), PagerError> { + if !Self::is_allocated(self, bid) { + return Err(PagerError::CouldNotAccess); + } + + let sc_num = Self::size_class_of(bid); + let sc = &self.size_classes[sc_num as usize]; + let offset = Self::offset_of(bid); + + assert!(offset < sc.virt_size, "Offset out of bound for size class"); + + let addr = sc.base_addr.load(Ordering::SeqCst); + let addr = unsafe { addr.add(offset) }.cast::(); + + Ok((AtomicPtr::new(addr), sc.block_size)) + } + + /// Get the total reserved capacity of the buffer pool. + #[allow(dead_code)] // Legitimate potential future use + pub fn capacity_bytes(&self) -> usize { + self.capacity_bytes.load(Ordering::Relaxed) + } + /// Get the total used space in the buffer pool. + #[allow(dead_code)] // Legitimate potential future use + pub fn available_bytes(&self) -> usize { + self.available_bytes.load(Ordering::Relaxed) + } + /// Get the total usable free space in the buffer pool. + #[allow(dead_code)] // Legitimate potential future use + pub fn allocated_bytes(&self) -> usize { + self.allocated_bytes.load(Ordering::Relaxed) + } +} + +#[cfg(test)] +mod tests { + use crate::tuplebox::pool::buffer_pool::{BufferPool, HIGHEST_SIZE_CLASS_POWER_OF}; + use crate::tuplebox::pool::PagerError; + + const MB_256: usize = 1 << 28; + + #[test] + fn test_empty_pool() { + let capacity = MB_256; + let bp = BufferPool::new(capacity).unwrap(); + + assert_eq!(bp.capacity_bytes(), capacity); + assert_eq!(bp.available_bytes(), capacity); + assert_eq!(bp.allocated_bytes(), 0); + } + + #[test] + fn test_buffer_allocation_perfect() { + let capacity = MB_256; + let bp = BufferPool::new(capacity).unwrap(); + + // Allocate buffers that fit just inside powers of 2, so no fragmentation will occur due to + // rounding up nearest size. + let buffer_sizes = [1 << 12, 1 << 14, 1 << 16, 1 << 18]; + let mut bids = Vec::new(); + for &size in &buffer_sizes { + let bid = bp.alloc(size).unwrap().0; + bids.push(bid); + } + + // In this scenario, allocation will always match requested. So should be 0 fragmentation + // so no lost bytes. + let expected_allocated_bytes: usize = buffer_sizes.iter().sum(); + assert_eq!(bp.allocated_bytes(), expected_allocated_bytes); + assert_eq!(bp.available_bytes(), capacity - expected_allocated_bytes); + + // Free the buffers and check that they are released + for pid in bids { + bp.free(pid).unwrap(); + } + assert_eq!(bp.available_bytes(), capacity); + assert_eq!(bp.allocated_bytes(), 0); + } + + #[test] + fn test_buffer_allocation_fragmented() { + let capacity = MB_256; + let bp = BufferPool::new(capacity).unwrap(); + + // Allocate buffers that fit 10 bytes under some powers of 2, so we accumulate some + // fragmentation. 
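+ // For instance, (1 << 12) - 10 = 4086 still rounds up to the 4KiB size class, so each of these
+ // allocations wastes exactly fb = 10 bytes to internal fragmentation.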
+ let fb = 10; + let buffer_sizes = [ + (1 << 12) - fb, + (1 << 14) - fb, + (1 << 16) - fb, + (1 << 18) - fb, + ]; + let mut pids = Vec::new(); + for &size in &buffer_sizes { + let pid = bp.alloc(size).unwrap().0; + pids.push(pid); + assert!(bp.is_allocated(pid)); + } + + let expected_lost_bytes: usize = fb * buffer_sizes.len(); + let expected_requested_bytes: usize = buffer_sizes.iter().sum(); + let expected_allocated_bytes: usize = expected_requested_bytes + expected_lost_bytes; + + assert_eq!(bp.allocated_bytes(), expected_allocated_bytes); + assert_eq!(bp.available_bytes(), capacity - expected_allocated_bytes); + + // Free the buffers and check that they are released + for pid in pids { + bp.free(pid).unwrap(); + assert!(!bp.is_allocated(pid)); + } + assert_eq!(bp.available_bytes(), capacity); + assert_eq!(bp.allocated_bytes(), 0); + } + + #[test] + fn test_error_conditions() { + let capacity = MB_256; + let bp = BufferPool::new(capacity).unwrap(); + + // Test capacity limit + let res = bp.alloc(capacity + 1); + assert!(matches!(res, Err(PagerError::InsufficientRoom { .. }))); + + // Test unsupported size class + let res = bp.alloc(1 << (HIGHEST_SIZE_CLASS_POWER_OF + 1)); + assert!(matches!(res, Err(PagerError::UnsupportedSize(_)))); + + // Test unable to allocate + let mut allocated = vec![]; + for _ in 0..bp.size_classes.len() { + let res = bp.alloc(capacity + 1); + match res { + Ok(bh) => { + allocated.push(bh); + } + Err(e) => { + assert!(matches!(e, PagerError::InsufficientRoom { .. })); + break; + } + } + } + } +} diff --git a/crates/db/src/tuplebox/pool/mod.rs b/crates/db/src/tuplebox/pool/mod.rs new file mode 100644 index 00000000..86f9a5b4 --- /dev/null +++ b/crates/db/src/tuplebox/pool/mod.rs @@ -0,0 +1,42 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +mod buffer_pool; +mod size_class; + +pub use buffer_pool::BufferPool; + +/// The unique identifier for currently extant buffers. Buffer ids can be ephemeral. +/// It is up to the pager to map these to non-ephemeral, long lived, page identifiers (Bid) through +/// whatever means makes sense. 
+#[derive(Clone, Copy, Eq, PartialEq, Debug)] +pub struct Bid(pub u64); + +#[derive(thiserror::Error, Debug)] +pub enum PagerError { + #[error("Error in setting up the page / buffer pool: {0}")] + InitializationError(String), + + #[error("Insufficient room in buffer pool (wanted {desired:?}, had {available:?})")] + InsufficientRoom { desired: usize, available: usize }, + + #[error("Unsupported size class (wanted {0:?})")] + UnsupportedSize(usize), + + #[error("Unable to allocate a buffer")] + CouldNotAllocate, + + #[error("Invalid page access")] + CouldNotAccess, +} diff --git a/crates/db/src/tuplebox/pool/size_class.rs b/crates/db/src/tuplebox/pool/size_class.rs new file mode 100644 index 00000000..31ccfc42 --- /dev/null +++ b/crates/db/src/tuplebox/pool/size_class.rs @@ -0,0 +1,247 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use std::io; +use std::ptr::null_mut; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::sync::RwLock; + +use hi_sparse_bitset::BitSetInterface; +use human_bytes::human_bytes; +use libc::{madvise, MADV_DONTNEED, MAP_ANONYMOUS, MAP_PRIVATE, PROT_READ, PROT_WRITE}; +use tracing::info; + +use crate::tuplebox::pool::PagerError; + +type BitSet = hi_sparse_bitset::BitSet; + +pub struct SizeClass { + pub block_size: usize, + pub base_addr: AtomicPtr, + pub virt_size: usize, + + inner: RwLock, + + // stats + num_blocks_used: AtomicUsize, +} + +struct SCInner { + free_list: Vec, + allocset: BitSet, +} + +fn find_first_empty(bs: &BitSet) -> usize { + let mut iter = bs.iter(); + + let mut pos = None; + // Scan forward until we find the first empty bit. + loop { + match iter.next() { + Some(bit) => { + if bit != 0 && !bs.contains(bit - 1) { + return bit - 1; + } + pos = Some(bit); + } + // Nothing in the set, or we've reached the end. + None => { + let Some(pos) = pos else { + return 0; + }; + + return pos + 1; + } + } + } +} + +impl SizeClass { + pub fn new_anon(block_size: usize, virt_size: usize) -> Result { + let base_addr = unsafe { + libc::mmap64( + null_mut(), + virt_size, + PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, + -1, + 0, + ) + }; + + if base_addr == libc::MAP_FAILED { + let err = io::Error::last_os_error(); + return Err(PagerError::InitializationError(format!( + "Mmap failed for size class block_size: {block_size}, virt_size {virt_size}: {err}" + ))); + } + + info!( + "Mapped {:?} bytes at {:?} for size class {}", + human_bytes(virt_size as f64), + base_addr, + human_bytes(block_size as f64), + ); + + let base_addr = base_addr.cast::(); + + // Build the bitmap index + Ok(Self { + block_size, + base_addr: AtomicPtr::new(base_addr), + virt_size, + inner: RwLock::new(SCInner { + free_list: vec![], + allocset: BitSet::new(), + }), + num_blocks_used: Default::default(), + }) + } + + pub fn alloc(&self) -> Result { + // Check the free list first. 
+ let mut inner = self.inner.write().unwrap(); + if let Some(blocknum) = inner.free_list.pop() { + inner.allocset.insert(blocknum); + self.num_blocks_used.fetch_add(1, Ordering::SeqCst); + return Ok(blocknum); + } + + let blocknum = find_first_empty(&inner.allocset); + + if blocknum >= self.virt_size / self.block_size { + return Err(PagerError::InsufficientRoom { + desired: self.block_size, + available: self.available(), + }); + } + + inner.allocset.insert(blocknum); + self.num_blocks_used.fetch_add(1, Ordering::SeqCst); + Ok(blocknum) + } + + pub fn restore(&self, blocknum: usize) -> Result<(), PagerError> { + // Assert + let mut inner = self.inner.write().unwrap(); + + // Assert that the block is not already allocated. + if inner.allocset.contains(blocknum) { + return Err(PagerError::CouldNotAllocate); + } + + inner.allocset.insert(blocknum); + self.num_blocks_used.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + pub fn free(&self, blocknum: usize) -> Result<(), PagerError> { + let mut inner = self.inner.write().unwrap(); + + unsafe { + let base_addr = self.base_addr.load(Ordering::SeqCst); + let addr = base_addr.offset(blocknum as isize * self.block_size as isize); + // Panic on fail here because this working is a fundamental invariant that we cannot + // recover from. + let madv_resp = madvise(addr.cast(), self.block_size, MADV_DONTNEED); + if madv_resp != 0 { + panic!( + "MADV_DONTNEED failed, errno: {}", + io::Error::last_os_error() + ); + } + } + inner.allocset.remove(blocknum); + inner.free_list.push(blocknum); + self.num_blocks_used.fetch_sub(1, Ordering::SeqCst); + Ok(()) + } + + #[allow(dead_code)] // Legitimate potential future use + pub fn page_out(&self, blocknum: usize) -> Result<(), PagerError> { + let mut inner = self.inner.write().unwrap(); + + unsafe { + let addr = self.base_addr.load(Ordering::SeqCst); + // Panic on fail here because this working is a fundamental invariant that we cannot + // recover from. 
+ let madv_result = madvise( + addr.offset(blocknum as isize * self.block_size as isize) + .cast(), + self.block_size, + MADV_DONTNEED, + ); + if madv_result != 0 { + panic!( + "MADV_DONTNEED failed, errno: {}", + io::Error::last_os_error() + ); + } + } + inner.allocset.remove(blocknum); + self.num_blocks_used.fetch_sub(1, Ordering::SeqCst); + Ok(()) + } + + pub fn is_allocated(&self, blocknum: usize) -> bool { + let inner = self.inner.read().unwrap(); + inner.allocset.contains(blocknum) + } + + pub fn bytes_used(&self) -> usize { + self.num_blocks_used.load(Ordering::Relaxed) * self.block_size + } + + pub fn available(&self) -> usize { + self.virt_size - self.bytes_used() + } +} + +impl Drop for SizeClass { + fn drop(&mut self) { + let result = unsafe { + let base_addr = self.base_addr.load(Ordering::SeqCst); + libc::munmap( + base_addr.cast::(), + self.virt_size as libc::size_t, + ) + }; + + if result != 0 { + let err = io::Error::last_os_error(); + panic!("Unable to munmap buffer pool: {err}"); + } + } +} + +#[cfg(test)] +mod tests { + use crate::tuplebox::pool::size_class::{find_first_empty, BitSet}; + + #[test] + fn test_bitset_seek() { + let mut bs = BitSet::new(); + assert_eq!(find_first_empty(&bs), 0); + bs.insert(0); + assert_eq!(find_first_empty(&bs), 1); + bs.insert(1); + assert_eq!(find_first_empty(&bs), 2); + bs.remove(0); + assert_eq!(find_first_empty(&bs), 0); + bs.insert(1); + bs.insert(2); + bs.remove(1); + assert_eq!(find_first_empty(&bs), 1); + } +} diff --git a/crates/db/src/tuplebox/slots/slotbox.rs b/crates/db/src/tuplebox/slots/slotbox.rs index 994f6afe..a524fd35 100644 --- a/crates/db/src/tuplebox/slots/slotbox.rs +++ b/crates/db/src/tuplebox/slots/slotbox.rs @@ -12,7 +12,8 @@ // this program. If not, see . // -// TODO: use a more general purpose pager (e.g. my own umbra-like buffer mgr) +// TODO: implement a more general purpose pager that handles LRU eviction +// and so can be used for larger-than-ram datasets (means adding a pagetable for Pid->Bid) // TODO: store indexes in here, too (custom paged datastructure impl) // TODO: add fixed-size slotted page impl for Sized items, providing more efficiency. // TODO: verify locking/concurrency safety of this thing -- loom test + stateright, or jepsen. @@ -22,41 +23,30 @@ // to be a sporadic failure where we end up with a "Page not found" error in the allocator on // free, meaning the page was not found in the used pages list. -use std::io; +use std::cmp::max; use std::pin::Pin; -use std::ptr::null_mut; use std::sync::atomic::AtomicPtr; use std::sync::atomic::Ordering::SeqCst; use std::sync::Mutex; -use hi_sparse_bitset::BitSetInterface; -use libc::{MAP_ANONYMOUS, MAP_PRIVATE, PROT_READ, PROT_WRITE}; use sized_chunks::SparseChunk; use thiserror::Error; -use tracing::warn; +use tracing::error; +use crate::tuplebox::pool::{Bid, BufferPool, PagerError}; pub use crate::tuplebox::slots::slotted_page::SlotId; use crate::tuplebox::slots::slotted_page::{ - slot_index_overhead, slot_page_empty_size, PageWriteGuard, SlottedPage, + slot_index_overhead, slot_page_empty_size, SlottedPage, }; use crate::tuplebox::RelationId; pub type PageId = usize; pub type TupleId = (PageId, SlotId); -/// A region of memory backed by SlottedPages. -/// Is: -/// A region of anonymously mmap'd memory, a multiple of page sizes, and logically divided up -/// into SlottedPages -/// An index/list of the pages that have free space. -/// Each slot accessing by a unique id which is a combination of its page index and slot index. 
-/// When a page is totally empty, madvise DONTNEED is called on it, so that the OS can free it
-/// from process RSS.
+/// A SlotBox is a collection of (variable-sized) pages, each of which is a collection of slots, each of which holds
+/// dynamically sized tuples.
 pub struct SlotBox {
-    /// The base address of the mmap'd region.
-    base_address: AtomicPtr,
-    page_size: usize,
-    allocator: Mutex,
+    inner: Mutex,
 }
 
 #[derive(Debug, Clone, Error)]
@@ -68,31 +58,10 @@ pub enum SlotBoxError {
 }
 
 impl SlotBox {
-    pub fn new(page_size: usize, virt_size: usize) -> Self {
-        assert!(virt_size % page_size == 0 && virt_size >= 64);
-
-        // Allocate (virtual) memory region using mmap.
-        let base_addr = unsafe {
-            libc::mmap64(
-                null_mut(),
-                virt_size,
-                PROT_READ | PROT_WRITE,
-                MAP_ANONYMOUS | MAP_PRIVATE,
-                -1,
-                0,
-            )
-        };
-
-        if base_addr == libc::MAP_FAILED {
-            let err = io::Error::last_os_error();
-            panic!("Mmap failed for size class virt_size {virt_size}: {err}");
-        }
-
-        Self {
-            base_address: AtomicPtr::new(base_addr as *mut u8),
-            page_size,
-            allocator: Mutex::new(Allocator::new(virt_size / page_size)),
-        }
+    pub fn new(virt_size: usize) -> Self {
+        let pool = BufferPool::new(virt_size).expect("Could not create buffer pool");
+        let inner = Mutex::new(Inner::new(pool));
+        Self { inner }
     }
 
     /// Allocates a new slot for a tuple, somewhere in one of the pages we managed.
@@ -103,21 +72,25 @@ impl SlotBox {
         relation_id: RelationId,
         initial_value: Option<&[u8]>,
     ) -> Result {
-        assert!(size <= (slot_page_empty_size(self.page_size)));
+        // Pick a buffer pool size. If the tuples are small, we use a reasonably sized page that could in theory hold
+        // a few tuples, but if the tuples are large, we use a page size that might hold only one or two.
+        // This way really large values can be slotted into the correct page size.
+        let tuple_size = size + slot_index_overhead();
+        let page_size = max(32768, tuple_size.next_power_of_two());
 
-        let mut allocator = self.allocator.lock().unwrap();
         let needed_space = size + slot_index_overhead();
-        let (pid, offset) = allocator.find_space(
-            relation_id,
-            needed_space,
-            slot_page_empty_size(self.page_size),
-        )?;
-        let mut page_handle = self.page_for(pid);
+
+        let mut inner = self.inner.lock().unwrap();
+        // Check if we have a free spot for this relation that can fit the tuple.
+        let (pid, offset) =
+            { inner.find_space(relation_id, needed_space, slot_page_empty_size(page_size))?
}; + + let mut page_handle = inner.page_for(pid)?; + let free_space = page_handle.available_content_bytes(); - // assert!(free_space >= size); let mut page_write_lock = page_handle.write_lock(); if let Ok((slot_id, page_remaining, _)) = page_write_lock.allocate(size, initial_value) { - allocator.finish_alloc(pid, relation_id, offset, page_remaining); + inner.finish_alloc(pid, relation_id, offset, page_remaining); return Ok((pid, slot_id)); } @@ -125,58 +98,59 @@ impl SlotBox { // data coherence issues between the pages last-reported free space and the actual free panic!( "Page {} failed to allocate, we wanted {} bytes, but it only has {},\ - but our records show it has {}, and its pid in that offset is {}", + but our records show it has {}, and its pid in that offset is {:?}", pid, size, free_space, - allocator.available_page_space[relation_id.0][offset].0, - allocator.available_page_space[relation_id.0][offset].1 + inner.available_page_space[relation_id.0][offset].available, + inner.available_page_space[relation_id.0][offset].bid ); } pub fn remove(&self, id: TupleId) -> Result<(), SlotBoxError> { - let mut page_handle = self.page_for(id.0); - let mut write_lock = page_handle.write_lock(); - self.do_remove(&mut write_lock, id) + let mut inner = self.inner.lock().unwrap(); + inner.do_remove(id) + } + + pub fn restore<'a>(&self, id: PageId) -> Result, SlotBoxError> { + let inner = self.inner.lock().unwrap(); + let (addr, page_size) = match inner.pool.restore(Bid(id as u64)) { + Ok(v) => v, + Err(PagerError::CouldNotAccess) => { + return Err(SlotBoxError::TupleNotFound(id)); + } + Err(e) => { + panic!("Unexpected buffer pool error: {:?}", e); + } + }; + + Ok(SlottedPage::for_page(addr, page_size)) + } + + pub fn page_for<'a>(&self, id: PageId) -> Result, SlotBoxError> { + let inner = self.inner.lock().unwrap(); + inner.page_for(id) } pub fn upcount(&self, id: TupleId) -> Result<(), SlotBoxError> { - let page_handle = self.page_for(id.0); + let inner = self.inner.lock().unwrap(); + let page_handle = inner.page_for(id.0)?; page_handle.upcount(id.1) } pub fn dncount(&self, id: TupleId) -> Result<(), SlotBoxError> { - let page_handle = self.page_for(id.0); + let mut inner = self.inner.lock().unwrap(); + let page_handle = inner.page_for(id.0)?; if page_handle.dncount(id.1)? { - self.remove(id)?; - } - Ok(()) - } - - fn do_remove(&self, page_lock: &mut PageWriteGuard, id: TupleId) -> Result<(), SlotBoxError> { - let (new_free, _, is_empty) = page_lock.remove_slot(id.1)?; - - // Update record in allocator. - let mut allocator = self.allocator.lock().unwrap(); - allocator.report_free(id.0, new_free, is_empty); - - // And if the page is completely free, then we can madvise DONTNEED it and let the OS free - // it from our RSS. - if is_empty { - unsafe { - let result = libc::madvise( - page_lock.page_ptr() as _, - self.page_size, - libc::MADV_DONTNEED, - ); - assert_eq!(result, 0, "madvise failed"); - } + inner.do_remove(id)?; } Ok(()) } pub fn get(&self, id: TupleId) -> Result, SlotBoxError> { - let page_handle = self.page_for(id.0); + let inner = self.inner.lock().unwrap(); + let page_handle = inner.page_for(id.0)?; + let lock = page_handle.read_lock(); let slc = lock.get_slot(id.1)?; @@ -189,9 +163,10 @@ impl SlotBox { id: TupleId, new_value: &[u8], ) -> Result { - // This lock scope has to be limited here, or we'll deadlock if we need to re-allocate. + // The lock scope has to be limited here, or we'll deadlock if we need to re-allocate. 
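+ // (allocate() below takes the same `inner` mutex again, so the guard acquired in this block must
+ // be dropped before the re-allocation path runs.)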
{ - let mut page_handle = self.page_for(id.0); + let mut inner = self.inner.lock().unwrap(); + let mut page_handle = inner.page_for(id.0)?; // If the value size is the same as the old value, we can just update in place, otherwise // it's a brand new allocation, and we have to remove the old one first. @@ -202,7 +177,7 @@ impl SlotBox { existing.copy_from_slice(new_value); return Ok(id); } - self.do_remove(&mut page_write, id)?; + inner.do_remove(id)?; } let new_id = self.allocate(new_value.len(), relation_id, Some(new_value))?; Ok(new_id) @@ -213,7 +188,8 @@ impl SlotBox { id: TupleId, mut f: F, ) -> Result<(), SlotBoxError> { - let mut page_handle = self.page_for(id.0); + let inner = self.inner.lock().unwrap(); + let mut page_handle = inner.page_for(id.0)?; let mut page_write = page_handle.write_lock(); let existing = page_write.get_slot_mut(id.1).expect("Invalid tuple id"); @@ -222,142 +198,176 @@ impl SlotBox { } pub fn num_pages(&self) -> usize { - let allocator = self.allocator.lock().unwrap(); - allocator.available_page_space.len() + let inner = self.inner.lock().unwrap(); + inner.available_page_space.len() } pub fn used_pages(&self) -> Vec { - let allocator = self.allocator.lock().unwrap(); + let allocator = self.inner.lock().unwrap(); allocator .available_page_space .iter() .flatten() - .map(|(_, pid)| *pid) + .map( + |PageSpace { + available: _, + bid: pid, + }| pid.0 as PageId, + ) .collect() } pub fn mark_page_used(&self, relation_id: RelationId, free_space: usize, pid: PageId) { - let mut allocator = self.allocator.lock().unwrap(); + let mut allocator = self.inner.lock().unwrap(); + let bid = Bid(pid as u64); let Some(available_page_space) = allocator.available_page_space.get_mut(relation_id.0) else { - allocator - .available_page_space - .insert(relation_id.0, vec![(free_space, pid)]); + allocator.available_page_space.insert( + relation_id.0, + vec![PageSpace { + available: free_space, + bid, + }], + ); return; }; // allocator.bitmap.insert(pid as usize); - available_page_space.push((free_space, pid)); - available_page_space.sort_by(|a, b| a.0.cmp(&b.0)); + available_page_space.push(PageSpace { + available: free_space, + bid, + }); + available_page_space.sort_by(|a, b| a.available.cmp(&b.available)); } } -impl SlotBox { - pub fn page_for<'a>(&self, page_num: usize) -> SlottedPage<'a> { - let base_address = self.base_address.load(SeqCst); - let page_address = unsafe { base_address.add(page_num * self.page_size) }; - let page_handle = SlottedPage::for_page(AtomicPtr::new(page_address), self.page_size); - page_handle - } +struct PageSpace { + available: usize, + bid: Bid, } -fn find_empty(bs: &B) -> usize { - let mut iter = bs.iter(); - - let mut pos: Option = None; - // Scan forward until we find the first empty bit. - loop { - match iter.next() { - Some(bit) => { - let p: usize = bit; - if bit != 0 && !bs.contains(p - 1) { - return p - 1; - } - pos = Some(p); - } - // Nothing in the set, or we've reached the end. - None => { - let Some(pos) = pos else { - return 0; - }; - - return pos + 1; - } - } - } -} - -struct Allocator { - max_pages: usize, +struct Inner { + pool: BufferPool, // TODO: could keep two separate vectors here -- one with the page sizes, separate for the page // ids, so that SIMD can be used to used to search and sort. // Will look into it once/if benchmarking justifies it. // The set of used pages, indexed by relation, in sorted order of the free space available in them. 
- available_page_space: SparseChunk, 64>, - bitmap: hi_sparse_bitset::BitSet, + available_page_space: SparseChunk, 64>, } -impl Allocator { - fn new(max_pages: usize) -> Self { +impl Inner { + fn new(pool: BufferPool) -> Self { Self { - max_pages, available_page_space: SparseChunk::new(), - bitmap: Default::default(), + pool, } } - /// Find room to allocate a new slot of the given size, does not do the actual allocation yet, + fn do_remove(&mut self, id: TupleId) -> Result<(), SlotBoxError> { + let mut page_handle = self.page_for(id.0)?; + let mut write_lock = page_handle.write_lock(); + + let (new_free, _, is_empty) = write_lock.remove_slot(id.1)?; + self.report_free(id.0, new_free, is_empty); + + Ok(()) + } + + fn page_for<'a>(&self, page_num: usize) -> Result, SlotBoxError> { + let (page_address, page_size) = match self.pool.resolve_ptr(Bid(page_num as u64)) { + Ok(v) => v, + Err(PagerError::CouldNotAccess) => { + return Err(SlotBoxError::TupleNotFound(page_num)); + } + Err(e) => { + panic!("Unexpected buffer pool error: {:?}", e); + } + }; + let page_address = page_address.load(SeqCst); + let page_handle = SlottedPage::for_page(AtomicPtr::new(page_address), page_size); + Ok(page_handle) + } + + fn alloc( + &mut self, + relation_id: RelationId, + page_size: usize, + ) -> Result<(PageId, usize), SlotBoxError> { + // Ask the buffer pool for a new page of the given size. + let (bid, _, actual_size) = match self.pool.alloc(page_size) { + Ok(v) => v, + Err(PagerError::InsufficientRoom { desired, available }) => { + return Err(SlotBoxError::BoxFull(desired, available)); + } + Err(e) => { + panic!("Unexpected buffer pool error: {:?}", e); + } + }; + match self.available_page_space.get_mut(relation_id.0) { + Some(available_page_space) => { + available_page_space.push(PageSpace { + available: slot_page_empty_size(actual_size), + bid, + }); + available_page_space.sort_by(|a, b| a.available.cmp(&b.available)); + Ok((bid.0 as PageId, available_page_space.len() - 1)) + } + None => { + self.available_page_space.insert( + relation_id.0, + vec![PageSpace { + available: slot_page_empty_size(actual_size), + bid, + }], + ); + Ok((bid.0 as PageId, 0)) + } + } + } + + /// Find room to allocate a new tuple of the given size, does not do the actual allocation yet, /// just finds the page to allocate it on. /// Returns the page id, and the offset into the `available_page_space` vector for that relation. fn find_space( &mut self, relation_id: RelationId, - bytes: usize, - empty_size: usize, + tuple_size: usize, + page_size: usize, ) -> Result<(PageId, usize), SlotBoxError> { // Do we have a used pages set for this relation? If not, we can start one, and allocate a // new full page to it, and return. When we actually do the allocation, we'll be able to // find the page in the used pages set. let Some(available_page_space) = self.available_page_space.get_mut(relation_id.0) else { - let pid = find_empty(&self.bitmap); - if pid >= self.max_pages { - return Err(SlotBoxError::BoxFull(bytes, 0)); - } - self.available_page_space - .insert(relation_id.0, vec![(empty_size, pid)]); - self.bitmap.insert(pid); - return Ok((pid, 0)); + // Ask the buffer pool for a new buffer. + return self.alloc(relation_id, page_size); }; // Look for the first page with enough space in our vector of used pages, which is kept // sorted by free space. 
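// Note: binary_search_by returns Err(pos) with the would-be insertion point when there is no
// exact match; pos == len() means even the page with the most free space cannot fit the tuple,
// so a new page is allocated below.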
- let found = available_page_space.binary_search_by(|(free_space, _)| free_space.cmp(&bytes)); + let found = available_page_space.binary_search_by( + |PageSpace { + available: free_space, + bid: _, + }| free_space.cmp(&tuple_size), + ); return match found { // Exact match, highly unlikely, but possible. - Ok(entry_num) => Ok((available_page_space[entry_num].1, entry_num)), + Ok(entry_num) => { + let exact_match = (available_page_space[entry_num].bid, entry_num); + let pid = exact_match.0 .0 as PageId; + Ok((pid, entry_num)) + } // Out of room, need to allocate a new page. Err(position) if position == available_page_space.len() => { // If we didn't find a page with enough space, then we need to allocate a new page. - // Find first empty position in the bitset. - let first_empty = find_empty(&self.bitmap); - assert!(!self.bitmap.contains(first_empty)); - assert!(!available_page_space - .iter().any(|(_, p)| *p == first_empty)); - if first_empty >= self.max_pages { - return Err(SlotBoxError::BoxFull(bytes, 0)); - } - - let pid = first_empty as PageId; - available_page_space.push((empty_size, pid)); - - Ok((pid, available_page_space.len() - 1)) + return self.alloc(relation_id, page_size); } // Found a page we add to. Err(entry_num) => { let entry = available_page_space.get_mut(entry_num).unwrap(); - Ok((entry.1, entry_num)) + Ok((entry.bid.0 as PageId, entry_num)) } }; } @@ -371,41 +381,52 @@ impl Allocator { ) { let available_page_space = &mut self.available_page_space[relation_id.0]; let entry = &mut available_page_space[offset]; - assert!(entry.0 >= page_remaining_bytes); - assert_eq!(entry.1, pid); + assert!(entry.available >= page_remaining_bytes); + assert_eq!(entry.bid.0, pid as u64); - entry.0 = page_remaining_bytes; + entry.available = page_remaining_bytes; // If we (unlikely) consumed all the bytes, then we can remove the page from the avail pages // set. - if entry.0 == 0 { + if entry.available == 0 { available_page_space.remove(offset); } - self.bitmap.insert(pid); - available_page_space.sort_by(|a, b| a.0.cmp(&b.0)); + available_page_space.sort_by(|a, b| a.available.cmp(&b.available)); } fn report_free(&mut self, pid: PageId, new_size: usize, is_empty: bool) { // Seek the page in the available_page_space vectors, and add the bytes back to its free space. // We don't know the relation id here, so we have to linear scan all of them. for available_page_space in self.available_page_space.iter_mut() { - let Some(found) = available_page_space.iter_mut().find(|(_, p)| *p == pid) else { + let Some(found) = available_page_space.iter_mut().find( + |PageSpace { + available: _, + bid: p, + }| p.0 == pid as u64, + ) else { continue; }; - found.0 = new_size; + found.available = new_size; // If the page is now totally empty, then we can remove it from the available_page_space vector. 
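// (The empty page is also handed back to the buffer pool below, which releases its memory with
// MADV_DONTNEED.)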
if is_empty { - available_page_space.retain(|(_, p)| *p != pid); - self.bitmap.remove(pid); + available_page_space.retain( + |PageSpace { + available: _, + bid: p, + }| p.0 != pid as u64, + ); + self.pool + .free(Bid(pid as u64)) + .expect("Could not free page"); } - available_page_space.sort_by(|a, b| a.0.cmp(&b.0)); + available_page_space.sort_by(|a, b| a.available.cmp(&b.available)); return; } - warn!( - "Page not found in used pages in allocator on free; pid {}", + error!( + "Page not found in used pages in allocator on free; pid {}; could be double-free, dangling weak reference?", pid ); } @@ -417,7 +438,7 @@ mod tests { use rand::{thread_rng, Rng}; use crate::tuplebox::slots::slotbox::{SlotBox, SlotBoxError, TupleId}; - use crate::tuplebox::slots::slotted_page::{slot_page_empty_size, slot_page_overhead}; + use crate::tuplebox::slots::slotted_page::slot_page_empty_size; use crate::tuplebox::RelationId; fn fill_until_full(sb: &mut SlotBox) -> Vec<(TupleId, Vec)> { @@ -426,7 +447,7 @@ mod tests { // fill until full... (SlotBoxError::BoxFull) loop { let mut rng = thread_rng(); - let tuple_len = rng.gen_range(1..(slot_page_empty_size(30000) - slot_page_overhead())); + let tuple_len = rng.gen_range(1..(slot_page_empty_size(52000))); let tuple: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); match sb.allocate(tuple.len(), RelationId(0), Some(&tuple)) { Ok(tuple_id) => { @@ -442,15 +463,86 @@ mod tests { } tuples } + + // Just allocate a single tuple, and verify that we can retrieve it. + #[test] + fn test_one_page_one_slot() { + let sb = SlotBox::new(32768 * 64); + let tuple = vec![1, 2, 3, 4, 5]; + let tuple_id = sb + .allocate(tuple.len(), RelationId(0), Some(&tuple)) + .unwrap(); + let retrieved = sb.get(tuple_id).unwrap(); + assert_eq!(tuple, *retrieved); + } + + // Fill just one page and verify that we can retrieve all the tuples. + #[test] + fn test_one_page_a_few_slots() { + let sb = SlotBox::new(32768 * 64); + let mut tuples = Vec::new(); + let mut last_page_id = None; + loop { + let mut rng = thread_rng(); + let tuple_len = rng.gen_range(1..128); + let tuple: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); + let tuple_id = sb + .allocate(tuple.len(), RelationId(0), Some(&tuple)) + .unwrap(); + if let Some(last_page_id) = last_page_id { + if last_page_id != tuple_id.0 { + break; + } + } + last_page_id = Some(tuple_id.0); + tuples.push((tuple_id, tuple)); + } + for (id, tuple) in tuples { + let retrieved = sb.get(id).unwrap(); + assert_eq!(tuple, *retrieved); + } + } + + // Fill one page, then overflow into another, and verify we can get the tuple that's on the next page. + #[test] + fn test_page_overflow() { + let sb = SlotBox::new(32768 * 64); + let mut tuples = Vec::new(); + let mut first_page_id = None; + let (next_page_tuple_id, next_page_tuple) = loop { + let mut rng = thread_rng(); + let tuple_len = rng.gen_range(1..128); + let tuple: Vec = rng.sample_iter(&Alphanumeric).take(tuple_len).collect(); + let tuple_id = sb + .allocate(tuple.len(), RelationId(0), Some(&tuple)) + .unwrap(); + if let Some(last_page_id) = first_page_id { + if last_page_id != tuple_id.0 { + break (tuple_id, tuple); + } + } + first_page_id = Some(tuple_id.0); + tuples.push((tuple_id, tuple)); + }; + for (id, tuple) in tuples { + let retrieved = sb.get(id).unwrap(); + assert_eq!(tuple, *retrieved); + } + // Now verify that the last tuple was on another, new page, and that we can retrieve it. 
+ assert_ne!(next_page_tuple_id.0, first_page_id.unwrap()); + let retrieved = sb.get(next_page_tuple_id).unwrap(); + assert_eq!(*retrieved, next_page_tuple); + } + // Generate a pile of random sized tuples (which accumulate to more than a single page size), // and then scan back and verify their presence/equality. #[test] fn test_basic_add_fill_etc() { - let mut sb = SlotBox::new(32768, 32768 * 64); + let mut sb = SlotBox::new(32768 * 32); let tuples = fill_until_full(&mut sb); - for (id, tuple) in &tuples { + for (i, (id, tuple)) in tuples.iter().enumerate() { let retrieved = sb.get(*id).unwrap(); - assert_eq!(*tuple, *retrieved); + assert_eq!(*tuple, *retrieved, "Mismatch at {}th tuple", i); } let used_pages = sb.used_pages(); assert_ne!(used_pages.len(), tuples.len()); @@ -464,7 +556,7 @@ mod tests { // everything mmap DONTNEED'd, and we should be able to re-fill it again, too. #[test] fn test_full_fill_and_empty() { - let mut sb = SlotBox::new(32768, 32768 * 64); + let mut sb = SlotBox::new(32768 * 64); let tuples = fill_until_full(&mut sb); for (id, _) in &tuples { sb.remove(*id).unwrap(); @@ -479,7 +571,7 @@ mod tests { // fill back up again and verify the new presence. #[test] fn test_fill_and_free_and_refill_etc() { - let mut sb = SlotBox::new(32768, 32768 * 64); + let mut sb = SlotBox::new(32768 * 64); let mut tuples = fill_until_full(&mut sb); let mut rng = thread_rng(); let mut freed_tuples = Vec::new(); diff --git a/crates/db/src/tuplebox/slots/slotted_page.rs b/crates/db/src/tuplebox/slots/slotted_page.rs index dc5606c9..498a62b4 100644 --- a/crates/db/src/tuplebox/slots/slotted_page.rs +++ b/crates/db/src/tuplebox/slots/slotted_page.rs @@ -216,7 +216,7 @@ impl<'a> SlottedPage<'a> { let content_length = header.content_length as usize; let index_length = header.index_length as usize; let header_size = std::mem::size_of::(); - + self.page_size - (index_length + content_length + header_size) } @@ -370,6 +370,11 @@ impl<'a> SlottedPage<'a> { let index_entry = self.get_index_entry_mut(slot_id); let new_count = index_entry.refcount.fetch_sub(1, SeqCst); // Return true to indicate that the slot is now unused, but the slotbox will do the actual removal. 
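// Note that fetch_sub returns the value *before* the decrement, so new_count == 1 means the
// refcount has just reached zero, and a previous value of 0 would indicate a double-free.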
+ assert_ne!( + new_count, 0, + "attempt to double-free slot {}; how did it come to this?", + slot_id + ); if new_count == 1 { return Ok(true); } @@ -514,14 +519,14 @@ impl<'a> SlottedPage<'a> { fn header(&self) -> Pin<&SlottedPageHeader> { // Cast the base address to a pointear to the header let header_ptr = self.base_address.load(SeqCst) as *const SlottedPageHeader; - + unsafe { Pin::new_unchecked(&*header_ptr) } } fn header_mut(&self) -> Pin<&mut SlottedPageHeader> { // Cast the base address to a pointer to the header let header_ptr = self.base_address.load(SeqCst) as *mut SlottedPageHeader; - + unsafe { Pin::new_unchecked(&mut *header_ptr) } } @@ -575,7 +580,7 @@ impl<'a> SlottedPage<'a> { + (slot_id * std::mem::size_of::()); let base_address = self.base_address.load(SeqCst); - + unsafe { let slot_address = base_address.add(index_offset); Pin::new_unchecked(&*(slot_address as *const SlotIndexEntry)) @@ -586,7 +591,7 @@ impl<'a> SlottedPage<'a> { let index_offset = std::mem::size_of::() + (slot_id * std::mem::size_of::()); let base_address = self.base_address.load(SeqCst); - + unsafe { let slot_address = base_address.add(index_offset); Pin::new_unchecked(&mut *(slot_address as *mut SlotIndexEntry)) @@ -638,7 +643,7 @@ impl<'a> PageWriteGuard<'a> { fn header_mut(&self) -> Pin<&mut SlottedPageHeader> { let header_ptr = self.base_address as *mut SlottedPageHeader; - + unsafe { Pin::new_unchecked(&mut *header_ptr) } } } @@ -660,7 +665,7 @@ pub struct PageReadGuard<'a> { impl<'a> PageReadGuard<'a> { fn header(&self) -> Pin<&SlottedPageHeader> { let header_ptr = self.base_address as *const SlottedPageHeader; - + unsafe { Pin::new_unchecked(&*header_ptr) } } diff --git a/crates/db/src/tuplebox/tb.rs b/crates/db/src/tuplebox/tb.rs index fac7940d..b37e8e91 100644 --- a/crates/db/src/tuplebox/tb.rs +++ b/crates/db/src/tuplebox/tb.rs @@ -72,12 +72,11 @@ pub struct TupleBox { impl TupleBox { pub async fn new( memory_size: usize, - page_size: usize, path: Option, relations: &[RelationInfo], num_sequences: usize, ) -> Arc { - let slotbox = Arc::new(SlotBox::new(page_size, memory_size)); + let slotbox = Arc::new(SlotBox::new(memory_size)); let mut base_relations = Vec::with_capacity(relations.len()); for (rid, r) in relations.iter().enumerate() { base_relations.push(BaseRelation::new(slotbox.clone(), RelationId(rid), 0)); diff --git a/crates/db/src/tuplebox/tx/transaction.rs b/crates/db/src/tuplebox/tx/transaction.rs index a4b1fa02..08c9ef89 100644 --- a/crates/db/src/tuplebox/tx/transaction.rs +++ b/crates/db/src/tuplebox/tx/transaction.rs @@ -493,7 +493,6 @@ mod tests { async fn test_db() -> Arc { TupleBox::new( 1 << 24, - 4096, None, &[RelationInfo { name: "test".to_string(), diff --git a/crates/db/tests/jepsen.rs b/crates/db/tests/jepsen.rs index a607a041..653901ac 100644 --- a/crates/db/tests/jepsen.rs +++ b/crates/db/tests/jepsen.rs @@ -29,7 +29,7 @@ pub mod support { }) .collect::>(); - TupleBox::new(1 << 24, 4096, Some(dir), &relations, 0).await + TupleBox::new(1 << 24, Some(dir), &relations, 0).await } } diff --git a/crates/db/tests/tb_restore.rs b/crates/db/tests/tb_restore.rs index 7e5c0de5..90f6e96b 100644 --- a/crates/db/tests/tb_restore.rs +++ b/crates/db/tests/tb_restore.rs @@ -96,7 +96,7 @@ mod test { }) .collect::>(); - TupleBox::new(1 << 24, 4096, Some(dir), &relations, 0).await + TupleBox::new(1 << 24, Some(dir), &relations, 0).await } // Open a db in a test dir, fill it with some goop, close it, reopen it, and check that the goop is still there. 
@@ -134,14 +134,19 @@ mod test { // Verify the WAL directory is not empty. assert!(std::fs::read_dir(format!("{}/wal", tmpdir_str)) .unwrap() - .next().is_some()); - { + .next() + .is_some()); + + // Now reopen the db and verify that the tuples are still there. We'll do this a few times, to make sure that + // the recovery is working. + for _ in 0..5 { let db = test_db(tmpdir.path().into()).await; // Verify the pages directory is not empty after recovery. assert!(std::fs::read_dir(format!("{}/pages", tmpdir_str)) .unwrap() - .next().is_some()); + .next() + .is_some()); let mut found = HashMap::new(); // Verify all the tuples in all the relations are there for relation in tuples.keys() { diff --git a/crates/db/tests/worldstate_restore.rs b/crates/db/tests/worldstate_restore.rs index b0f048c6..9e03a1e9 100644 --- a/crates/db/tests/worldstate_restore.rs +++ b/crates/db/tests/worldstate_restore.rs @@ -43,14 +43,7 @@ mod test { relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true; relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true; - TupleBox::new( - 1 << 24, - 4096, - Some(dir), - &relations, - WorldStateSequences::COUNT, - ) - .await + TupleBox::new(1 << 24, Some(dir), &relations, WorldStateSequences::COUNT).await } #[tokio::test] @@ -99,14 +92,16 @@ mod test { // Verify the WAL directory is not empty. assert!(std::fs::read_dir(format!("{}/wal", tmpdir_str)) .unwrap() - .next().is_some()); + .next() + .is_some()); { let db = test_db(tmpdir.path().into()).await; // Verify the pages directory is not empty after recovery. assert!(std::fs::read_dir(format!("{}/pages", tmpdir_str)) .unwrap() - .next().is_some()); + .next() + .is_some()); let tx = TupleBoxTransaction::new(db.clone());