diff --git a/crates/memory-usage/src/lib.rs b/crates/memory-usage/src/lib.rs index 1ea3755810c..a89762cb5cb 100644 --- a/crates/memory-usage/src/lib.rs +++ b/crates/memory-usage/src/lib.rs @@ -126,6 +126,13 @@ impl MemoryUsage for std::collections::BTreeMap< } } +impl MemoryUsage for std::collections::BTreeSet { + fn heap_usage(&self) -> usize { + // NB: this is best-effort, since we don't have a `capacity()` method on `BTreeMap`. + self.len() * mem::size_of::() + self.iter().map(|t| t.heap_usage()).sum::() + } +} + #[cfg(feature = "smallvec")] impl MemoryUsage for smallvec::SmallVec where diff --git a/crates/table/src/page.rs b/crates/table/src/page.rs index dc6acacf82f..cfb66d97f10 100644 --- a/crates/table/src/page.rs +++ b/crates/table/src/page.rs @@ -382,6 +382,13 @@ impl PageHeader { pub(super) fn present_rows_storage_ptr_for_test(&self) -> *const () { self.fixed.present_rows.storage().as_ptr().cast() } + + /// Returns the number of var-len granules available for allocation, + /// including those in the "gap" between the fixed-len and var-len part of the page. + fn available_var_len_granules(&self) -> usize { + self.var.freelist_len as usize + + VarLenGranule::space_to_granules(gap_remaining_size(self.var.first, self.fixed.last)) + } } /// Fixed-length row portions must be at least large enough to store a `FreeCellRef`. @@ -1195,6 +1202,12 @@ impl Page { self.header.var.num_granules as usize } + /// Returns the number of var-len granules free to store data, + /// including those in the "gap" between the fixed-len and var-len part of the page. + pub fn available_var_len_granules(&self) -> usize { + self.header.available_var_len_granules() + } + #[cfg(test)] /// # Safety /// @@ -1369,7 +1382,7 @@ impl Page { /// Returns whether the row is full with respect to storing a fixed row with `fixed_row_size` /// and no variable component. - pub fn is_full(&self, fixed_row_size: Size) -> bool { + pub(crate) fn is_full(&self, fixed_row_size: Size) -> bool { !self.has_space_for_row(fixed_row_size, 0) } diff --git a/crates/table/src/pages.rs b/crates/table/src/pages.rs index 14bc46345db..99ad96eadf3 100644 --- a/crates/table/src/pages.rs +++ b/crates/table/src/pages.rs @@ -9,6 +9,7 @@ use super::var_len::VarLenMembers; use core::ops::{ControlFlow, Deref, Index, IndexMut}; use spacetimedb_sats::layout::Size; use spacetimedb_sats::memory_usage::MemoryUsage; +use std::collections::BTreeSet; use std::ops::DerefMut; use thiserror::Error; @@ -39,8 +40,21 @@ impl IndexMut for Pages { pub struct Pages { /// The collection of pages under management. pages: Vec>, - /// The set of pages that aren't yet full. - non_full_pages: Vec, + /// The set of pages that aren't yet full, + /// sorted by the number of var-len granules available in each page. + /// + /// Used during insertion to locate a page with enough space to store a given row. + /// + /// The first value in the tuple is [`Page::available_var_len_granules`], and the second value is the page index. + /// + /// Pages for which [`Page::is_full`] is true are not stored. + /// + /// If multiple pages have the same number of granules available, they are then sorted by `PageIndex`. + /// This maintains a deterministic sort order, + /// so that replaying the same set of operations on multiple datastores + /// will always result in the same layout of rows in pages, + /// regardless of when those datastores were (re)started prior to or during the sequence of operations. + non_full_pages: BTreeSet<(usize, PageIndex)>, } impl MemoryUsage for Pages { @@ -51,6 +65,48 @@ impl MemoryUsage for Pages { } impl Pages { + #[cfg(test)] + pub(crate) fn assert_non_full_pages_consistent(&self, fixed_row_size: Size) { + let mut seen_page_indexes = BTreeSet::new(); + for &(_, page_index) in &self.non_full_pages { + assert!( + seen_page_indexes.insert(page_index), + "page {:?} appears multiple times in non_full_pages", + page_index + ); + } + + for (idx, page) in self.pages.iter().enumerate() { + let page_index = PageIndex(idx as u64); + let is_full = page.is_full(fixed_row_size); + let available_granules = page.available_var_len_granules(); + let entries_for_page: Vec<_> = self + .non_full_pages + .iter() + .copied() + .filter(|&(_, idx)| idx == page_index) + .collect(); + + if is_full { + assert!( + entries_for_page.is_empty(), + "page {:?} has 0 available var-len granules but appears in non_full_pages as {:?}", + page_index, + entries_for_page + ); + } else { + assert_eq!( + entries_for_page, + vec![(available_granules, page_index)], + "page {:?} has {} available var-len granules but non_full_pages has {:?}", + page_index, + available_granules, + entries_for_page + ); + } + } + } + /// Is there space to allocate another page? pub fn can_allocate_new_page(&self) -> Result { let new_idx = self.len(); @@ -78,7 +134,14 @@ impl Pages { page.clear(); } // Mark every page non-full. - self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect(); + self.non_full_pages = (0..self.pages.len()) + // We could probably compute the number of available granules once and use it for all pages, + // rather than calling the method on each page, + // but we'd have to do some amount of reasoning to demonstrate it was correct + // based on the definition of `Page::clear`, + // and why bother? + .map(|idx| (self.pages[idx].available_var_len_granules(), PageIndex(idx as u64))) + .collect(); } /// Get a reference to fixed-len row data. @@ -94,7 +157,7 @@ impl Pages { /// returning an error if the new number of pages would overflow `PageIndex::MAX`. /// /// The new page is initially empty, but is not added to the non-full set. - /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page. + /// Callers should call [`Pages::record_page_non_full`] after operating on the new page. fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result { let new_idx = self.can_allocate_new_page()?; @@ -107,23 +170,10 @@ impl Pages { /// Reserve a new, initially empty page. pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result { let idx = self.allocate_new_page(pool, fixed_row_size)?; - self.mark_page_non_full(idx); + self.record_page_non_full(idx, fixed_row_size); Ok(idx) } - /// Mark the page at `idx` as non-full. - pub fn mark_page_non_full(&mut self, idx: PageIndex) { - self.non_full_pages.push(idx); - } - - /// If the page at `page_index` is not full, - /// add it to the non-full set so that later insertions can access it. - pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) { - if !self[page_index].is_full(fixed_row_size) { - self.non_full_pages.push(page_index); - } - } - /// Call `f` with a reference to a page which satisfies /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`. pub fn with_page_to_insert_row( @@ -135,7 +185,7 @@ impl Pages { ) -> Result<(PageIndex, Res), Error> { let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?; let res = f(&mut self[page_index]); - self.maybe_mark_page_non_full(page_index, fixed_row_size); + self.record_page_non_full(page_index, fixed_row_size); Ok((page_index, res)) } @@ -143,7 +193,7 @@ impl Pages { /// containing `num_var_len_granules` granules of var-len data. /// /// Retrieving a page in this way will remove it from the non-full set. - /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`] + /// After performing an insertion, the caller should use [`Pages::record_page_available_granules`] /// to restore the page to the non-full set. fn find_page_with_space_for_row( &mut self, @@ -151,14 +201,13 @@ impl Pages { fixed_row_size: Size, num_var_len_granules: usize, ) -> Result { - if let Some((page_idx_idx, page_idx)) = self + if let Some((page_num_free_granules, page_idx)) = self .non_full_pages - .iter() + .range((num_var_len_granules, PageIndex(0))..) .copied() - .enumerate() .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules)) { - self.non_full_pages.swap_remove(page_idx_idx); + self.non_full_pages.remove(&(page_num_free_granules, page_idx)); return Ok(page_idx); } @@ -232,23 +281,86 @@ impl Pages { row_ptr: RowPointer, blob_store: &mut dyn BlobStore, ) -> BlobNumBytes { - let page = &mut self[row_ptr.page_index()]; + let page_index = row_ptr.page_index(); + + self.with_updating_non_full_pages(page_index, fixed_row_size, |this| { + let page = &mut this[page_index]; + + // SAFETY: + // - `row_ptr.page_offset()` does point to a valid row in this page + // as the caller promised that `row_ptr` points to a valid row in `self`. + // + // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row. + // The size is also conistent with `var_len_visitor`. + unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) } + }) + } + + /// Collect information about the page `self[page_index]` sufficient to update [`Self::non_full_pages`], + /// then run `body` to update the page, and finally update [`Self::non_full_pages`] for its new fullness and capacity. + /// + /// `body` should not update any pages other than the one identified by `page_index`. + fn with_updating_non_full_pages( + &mut self, + page_index: PageIndex, + fixed_row_size: Size, + body: impl FnOnce(&mut Self) -> Ret, + ) -> Ret { + let page = &self[page_index]; + let full_before = page.is_full(fixed_row_size); - // SAFETY: - // - `row_ptr.page_offset()` does point to a valid row in this page - // as the caller promised that `row_ptr` points to a valid row in `self`. - // - // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row. - // The size is also conistent with `var_len_visitor`. - let blob_store_deleted_bytes = - unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) }; - - // If the page was previously full, mark it as non-full now, - // since we just opened a space in it. + let available_granules_before = page.available_var_len_granules(); + + let ret = body(self); + + self.update_page_non_full(available_granules_before, full_before, page_index, fixed_row_size); + + ret + } + + /// Update [`Self::non_full_pages`] to change the number of var-len granules available in the page at `self[page_index]`, + /// first deleting any old entry and then re-inserting the new entry. + /// + /// The entry for `page` in `self.non_full_granules` should not have been deleted prior to calling this method. + /// If the entry has already been deleted or was never present, instead use [`Self::record_page_non_full`]. + /// + /// `available_granules_before` should be the previous count from [`Page::available_var_len_granules`], + /// prior to whatever operation made space available in the page. + /// This is necessary because `non_full_pages` is a `BTreeSet` sorted by `(available_granules, page_index)`, + /// so locating the `page_index` without the `available_granules` would be slow. + /// + /// `full_before` should be the result of [`Page::is_full`] prior to whatever operation made space available in the page. + /// This is necessary because `non_full_pages` does not store full pages (as the name implies), + /// so we should not attempt to delete the previous entry if the page was previously full. + fn update_page_non_full( + &mut self, + available_granules_before: usize, + full_before: bool, + page_index: PageIndex, + fixed_row_size: Size, + ) { if full_before { - self.mark_page_non_full(row_ptr.page_index()); + debug_assert!(!self.non_full_pages.remove(&(available_granules_before, page_index))); + } else { + let _prev = self.non_full_pages.remove(&(available_granules_before, page_index)); + debug_assert!(_prev); + } + + self.record_page_non_full(page_index, fixed_row_size); + } + + /// Record the number of available var-len granules in the page at `self[page_index]` into [`Self::non_full_pages`]. + /// + /// Prior to calling this function, there must not be an entry for `page_index` in [`Self::non_full_pages`]. + fn record_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) { + debug_assert!(!self.non_full_pages.iter().any(|(_, idx)| *idx == page_index)); + + let page = &self[page_index]; + let available_granules = page.available_var_len_granules(); + + if !page.is_full(fixed_row_size) { + self.non_full_pages.insert((available_granules, page_index)); } - blob_store_deleted_bytes } /// Materialize a view of rows in `self` for which the `filter` returns `true`. @@ -358,7 +470,9 @@ impl Pages { self.non_full_pages = pages .iter() .enumerate() - .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _))) + .filter_map(|(idx, page)| { + (!page.is_full(fixed_row_size)).then_some((page.available_var_len_granules(), PageIndex(idx as _))) + }) .collect(); self.pages = pages; } diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 1d406b42eaf..46a321000fc 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -2746,6 +2746,39 @@ pub(crate) mod test { // which is already what the actual implementation does. } + /// Test that the recording of non-full pages and counts of var-len granules available + /// by the page manager are correct after insertions and deletes. + /// + /// Tested here rather than in pages.rs because it's easier to test with typed rows than raw byte buffers. + #[test] + fn non_full_pages_consistent((ty, vals) in generate_typed_row_vec(0..SIZE, 128, 2048)) { + let pool = PagePool::new_for_test(); + let mut blob_store = HashMapBlobStore::default(); + let mut table = table(ty); + let mut inserted_row_ptrs = Vec::new(); + + table.inner.pages.assert_non_full_pages_consistent(table.inner.row_layout.size()); + + // Insert 3 rows at a time, then delete the last 1. + // This keeps the page usage growing towards fullness, but also includes some deletes. + for rows in vals.chunks(3) { + for row in rows { + let row_ptr = match table.insert(&pool, &mut blob_store, row) { + Ok((_, row_ref)) => row_ref.pointer(), + Err(InsertError::Duplicate(_)) => continue, + Err(e) => return Err(TestCaseError::fail(format!("unexpected insert error: {e:?}"))), + }; + inserted_row_ptrs.push(row_ptr); + table.inner.pages.assert_non_full_pages_consistent(table.inner.row_layout.size()); + } + + if let Some(row_ptr) = inserted_row_ptrs.pop() { + table.delete(&mut blob_store, row_ptr, |_| ()); + table.inner.pages.assert_non_full_pages_consistent(table.inner.row_layout.size()); + } + } + } + #[test] fn index_size_reporting_matches_slow_implementations_single_column( (ty, vals) in generate_typed_row_vec(1..SIZE, 128, 2048), @@ -2809,6 +2842,55 @@ pub(crate) mod test { assert!(complex.eq(simple)); } + /// Assert that we prefer inserting into lower-`PageIndex` pages when possible, + /// and that `non_full_pages` is correctly updated even for pages with no var-len granules available. + /// + /// A previous incorrect implementation of [`Pages`] + /// used only the available number of var-len granules to decide if a page was full, + /// not considering that a page with zero var-len granules available could still have space for one or more rows, + /// provided those rows were entirely fixed-len. + /// This test demonstrates that we will insert an entirely fixed-len row into a non-full page with zero granules available. + /// + /// This test also demonstrates that, when searching for a page to insert into, we'll prefer the lowest-indexed page, + /// assuming no other reason to prefer a different page. + #[test] + fn prefer_earlier_non_full_page() { + let pool = PagePool::new_for_test(); + let mut blob_store = HashMapBlobStore::default(); + let mut table = table(ProductType::from([AlgebraicType::I32])); + + let mut inserted_ptrs = Vec::new(); + let mut next_value = 0i32; + while table.num_pages() < 2 { + let (_, row_ref) = table.insert(&pool, &mut blob_store, &product![next_value]).unwrap(); + inserted_ptrs.push(row_ref.pointer()); + next_value += 1; + } + + let first_page = &table.inner.pages[PageIndex(0)]; + let second_page = &table.inner.pages[PageIndex(1)]; + assert!(first_page.is_full(table.row_size())); + assert_eq!(first_page.available_var_len_granules(), 0); + assert!(!second_page.is_full(table.row_size())); + assert!(second_page.available_var_len_granules() > 0); + + let first_ptr = inserted_ptrs[0]; + table.delete(&mut blob_store, first_ptr, |_| ()); + + let first_page = &table.inner.pages[PageIndex(0)]; + assert!(!first_page.is_full(table.row_size())); + assert_eq!(first_page.available_var_len_granules(), 0); + + let (_, row_ref) = table.insert(&pool, &mut blob_store, &product![next_value]).unwrap(); + let new_ptr = row_ref.pointer(); + assert_eq!(new_ptr.page_index(), first_ptr.page_index()); + assert_eq!(new_ptr.page_offset(), first_ptr.page_offset()); + + let first_page = &table.inner.pages[PageIndex(0)]; + assert!(first_page.is_full(table.row_size())); + assert_eq!(first_page.available_var_len_granules(), 0); + } + #[test] #[should_panic] fn read_row_unaligned_page_offset_soundness() {