Commit: Improve commentary in coldstorage/page_storage

rdaum committed Jan 5, 2024
1 parent 6112c4e commit 93ef45a
Showing 2 changed files with 40 additions and 9 deletions.
5 changes: 3 additions & 2 deletions crates/db/src/tuplebox/coldstorage.rs
@@ -313,7 +313,8 @@ impl LogManager for WalManager {
Self::chunk_to_mutations(&chunk, &mut write_batch, &mut evicted);
}
let mut ps = self.page_storage.lock().unwrap();
- ps.write_batch(write_batch).expect("Unable to write batch");
+ ps.enqueue_page_mutations(write_batch)
+     .expect("Unable to write batch");

Ok(())
}
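The rename from `write_batch` to `enqueue_page_mutations` reflects that the call now only submits work to the kernel and returns; durability arrives later, when the io_uring completions are reaped. Below is a minimal sketch of the caller-side contract this implies, using the types and methods visible in this diff (`flush_batch` itself is a hypothetical wrapper, not code from the repository):

```rust
use std::sync::Mutex;

// Hypothetical wrapper illustrating the enqueue/wait split. PageStore,
// PageStoreMutation, enqueue_page_mutations, and wait_complete are the
// names that appear in this diff.
fn flush_batch(
    ps: &Mutex<PageStore>,
    write_batch: Vec<PageStoreMutation>,
) -> std::io::Result<()> {
    let mut ps = ps.lock().unwrap();
    // Submits the batch to the kernel via io_uring and returns immediately;
    // nothing is guaranteed durable yet.
    ps.enqueue_page_mutations(write_batch)?;
    // Only a caller that needs durability right now (e.g. at shutdown)
    // should also block on the outstanding requests:
    ps.wait_complete();
    Ok(())
}
```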
@@ -348,7 +349,7 @@ impl LogManager for WalManager {
error!("Unable to lock cold storage");
return Ok(());
};
- if let Err(e) = ps.write_batch(write_batch) {
+ if let Err(e) = ps.enqueue_page_mutations(write_batch) {
error!("Unable to write batch: {:?}", e);
return Ok(());
};
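Since `enqueue_page_mutations` never blocks, some loop on the other side has to notice the eventfd firing and drain completions. The diff only shows that the eventfd lives outside `PageStore` (see the TODO below in page_storage.rs), so the wiring here is a guess: a sketch of a reaper loop, assuming the eventfd can be read as an ordinary 8-byte counter (`reap_completions` and its surroundings are hypothetical):

```rust
use std::fs::File;
use std::io::Read;
use std::os::unix::io::{FromRawFd, RawFd};
use std::sync::Mutex;

// Hypothetical reaper: block until the io_uring bumps the eventfd, then
// drain completions via the process_completions() shown in this diff.
fn reap_completions(efd: RawFd, ps: &Mutex<PageStore>) {
    // SAFETY: the caller must hand over a valid eventfd it owns; File
    // takes ownership and closes it on drop.
    let mut efd_file = unsafe { File::from_raw_fd(efd) };
    let mut counter = [0u8; 8]; // eventfd reads are always 8 bytes
    loop {
        efd_file.read_exact(&mut counter).expect("eventfd read failed");
        // process_completions() returns true once nothing is outstanding.
        if ps.lock().unwrap().process_completions() {
            break;
        }
    }
}
```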
44 changes: 37 additions & 7 deletions crates/db/src/tuplebox/page_storage.rs
@@ -37,6 +37,20 @@ pub(crate) enum PageStoreMutation {
/// Manages the directory of pages, one file per page.
/// Each page is a fixed size.
/// Will attempt to use io_uring to do the writes asynchronously; reads are synchronous.
+ ///
+ /// TODO: deleted pages are not destroyed; they are just left on disk, which means that if
+ ///     the same page id is re-used, the old data could be read.
+ /// TODO: the current page-per-file layout is maybe not the most efficient.
+ /// TODO: verify that the fsync chained to writes via io_uring is actually working, and that
+ ///     the durability guarantees are, at least approximately, correct.
+ /// TODO: we'll need async reads once eviction/paging is implemented.
+ /// TODO: it's awkward that the eventfd is handled outside of this struct while the io_uring
+ ///     is handled inside. It got this way because of ownership and initialization patterns,
+ ///     but it's not ideal.
+ /// TODO: the write-ahead-log implementation will probably need similar functionality, so
+ ///     abstract the notion of an io_uring+eventfd "io q" and use it for both.
pub(crate) struct PageStore {
dir: PathBuf,
uring: IoUring,
@@ -45,6 +59,9 @@ pub(crate) struct PageStore {
}

impl PageStore {
+ /// Establish the page store, creating the directories if they don't exist,
+ /// setting up the io_uring, and tying it to the passed-in eventfd for
+ /// signaling when requests complete.
pub(crate) fn new(dir: PathBuf, eventfd: &EventFd) -> Self {
// Check for dir path, if not there, create.
if !dir.exists() {
@@ -64,7 +81,7 @@ impl PageStore {
}
}

- // Blocking call to wait for all outstanding requests to complete.
+ /// Blocking call to wait for all outstanding requests to complete.
pub(crate) fn wait_complete(&mut self) {
while !self.buffers.is_empty() {
while let Some(completion) = self.uring.completion().next() {
@@ -75,6 +92,8 @@
}
}

+ /// Process any completions that have come in since the last time this was called, and
+ /// return true if there are no outstanding requests.
pub(crate) fn process_completions(&mut self) -> bool {
while let Some(completion) = self.uring.completion().next() {
let request_id = completion.user_data();
@@ -83,6 +102,8 @@
self.buffers.is_empty()
}

+ /// Get a catalog of all the pages in the store: their sizes, their page numbers, and the
+ /// relation each belongs to.
pub(crate) fn list_pages(&self) -> HashSet<(usize, PageId, RelationId)> {
let mut pages = HashSet::new();
for entry in std::fs::read_dir(&self.dir).unwrap() {
@@ -102,6 +123,7 @@ impl PageStore {
pages
}

+ /// Read the special sequences page into a buffer.
pub(crate) fn read_sequence_page(&self) -> std::io::Result<Option<Vec<u8>>> {
let path = self.dir.join("sequences.page");
let mut file = match File::open(path) {
@@ -143,12 +165,20 @@ impl PageStore {
Ok(())
}

- pub(crate) fn write_batch(&mut self, batch: Vec<PageStoreMutation>) -> std::io::Result<()> {
-     // We can't submit a new batch until all the previous requests have completed.
-     while let Some(completion) = self.uring.completion().next() {
-         let request_id = completion.user_data();
-         self.buffers.remove(&request_id);
-     }
+ /// Enqueue a batch of mutations to be written to disk. Returns immediately after
+ /// submitting the batch to the kernel via io_uring.
+ pub(crate) fn enqueue_page_mutations(
+     &mut self,
+     batch: Vec<PageStoreMutation>,
+ ) -> std::io::Result<()> {
+     // We probably shouldn't submit a new batch until all the previous requests have
+     // completed.
+     // TODO: this doesn't actually verify that all the requests have completed, and
+     //     there's some kind of bug there, because in reality they don't always seem to
+     //     complete, even after the eventfd is triggered. That suggests something is
+     //     getting dropped on the floor in io_uring completions, even though the actual
+     //     pages *do* get written to disk.
+     self.process_completions();

for mutation in batch {
let request_id = self.next_request_id;
self.next_request_id += 1;
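The last TODO in the struct comment suggests factoring the io_uring+eventfd pairing into a reusable "io q" shared with the write-ahead log. Below is a sketch of what that abstraction might look like, built on the tokio-rs `io_uring` crate (`opcode::Write` and `opcode::Fsync` linked with `IO_LINK` so the fsync runs only after the write completes). The `IoQ` type and its method names are hypothetical; only the general shape of enqueue/complete follows this diff:

```rust
use std::collections::HashMap;
use std::fs::File;
use std::os::unix::io::{AsRawFd, RawFd};

use io_uring::{opcode, squeue, types, IoUring};

/// Hypothetical "io q": an io_uring whose completions are signalled through
/// a caller-owned eventfd, usable by both the page store and a WAL.
struct IoQ {
    uring: IoUring,
    next_request_id: u64,
    // Write buffers must outlive their writes; keyed by the write's user_data.
    buffers: HashMap<u64, Box<[u8]>>,
}

impl IoQ {
    fn new(efd: RawFd) -> std::io::Result<Self> {
        let uring = IoUring::new(32)?;
        // Completions bump the eventfd, so a poll loop elsewhere learns
        // when requests finish.
        uring.submitter().register_eventfd(efd)?;
        Ok(Self {
            uring,
            next_request_id: 0,
            buffers: HashMap::new(),
        })
    }

    /// Enqueue a write with an fsync linked behind it; the kernel won't
    /// start the fsync until the write completes. Returns right after
    /// submission, mirroring enqueue_page_mutations above.
    fn enqueue_write_fsync(&mut self, file: &File, data: Box<[u8]>) -> std::io::Result<()> {
        let fd = types::Fd(file.as_raw_fd());
        let write_id = self.next_request_id;
        let fsync_id = write_id + 1;
        self.next_request_id += 2;

        let write = opcode::Write::new(fd, data.as_ptr(), data.len() as u32)
            .build()
            .user_data(write_id)
            .flags(squeue::Flags::IO_LINK); // chain the next SQE behind this one
        let fsync = opcode::Fsync::new(fd).build().user_data(fsync_id);

        self.buffers.insert(write_id, data);
        unsafe {
            let mut sq = self.uring.submission();
            sq.push(&write).expect("submission queue full");
            sq.push(&fsync).expect("submission queue full");
        }
        self.uring.submit()?;
        Ok(())
    }

    /// Drain whatever completions have arrived; true when none outstanding.
    fn process_completions(&mut self) -> bool {
        while let Some(cqe) = self.uring.completion().next() {
            // fsync completions miss the map, which is a harmless no-op.
            self.buffers.remove(&cqe.user_data());
        }
        self.buffers.is_empty()
    }
}
```

Giving the write and the fsync distinct user_data values, and keying the buffer map only by the write's id, sidesteps one ambiguity the completion-tracking TODO hints at: two CQEs arriving for what the caller thinks of as a single request.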
