diff --git a/crates/db/src/tuplebox/coldstorage.rs b/crates/db/src/tuplebox/coldstorage.rs
index 18c0fed71..5f6a4a731 100644
--- a/crates/db/src/tuplebox/coldstorage.rs
+++ b/crates/db/src/tuplebox/coldstorage.rs
@@ -313,7 +313,8 @@ impl LogManager for WalManager {
             Self::chunk_to_mutations(&chunk, &mut write_batch, &mut evicted);
         }
         let mut ps = self.page_storage.lock().unwrap();
-        ps.write_batch(write_batch).expect("Unable to write batch");
+        ps.enqueue_page_mutations(write_batch)
+            .expect("Unable to write batch");
 
         Ok(())
     }
@@ -348,7 +349,7 @@ impl LogManager for WalManager {
             error!("Unable to lock cold storage");
             return Ok(());
         };
-        if let Err(e) = ps.write_batch(write_batch) {
+        if let Err(e) = ps.enqueue_page_mutations(write_batch) {
             error!("Unable to write batch: {:?}", e);
             return Ok(());
         };
diff --git a/crates/db/src/tuplebox/page_storage.rs b/crates/db/src/tuplebox/page_storage.rs
index 2472a68c7..5f8043fb4 100644
--- a/crates/db/src/tuplebox/page_storage.rs
+++ b/crates/db/src/tuplebox/page_storage.rs
@@ -37,6 +37,20 @@ pub(crate) enum PageStoreMutation {
 /// Manages the directory of pages, one file per page.
 /// Each page is a fixed size.
 /// will attempt to use io_uring to do the writes async. reads are synchronous
+///
+/// TODO: deleted pages are not destroyed, they are just left on disk, which means if the same
+/// page id is re-used, the old data could be read.
+/// TODO: right now this is a page-per-file which is maybe not the most efficient.
+/// TODO: verify the fsync chained to writes via io_uring is actually working, and that
+/// the durability guarantees are, at least approximately, correct.
+/// TODO: we'll need async reads once eviction/paging is implemented.
+/// TODO: it's weird that the eventfd is handled outside of this struct, but the io_uring is
+/// handled inside. it got this way because of ownership and initialization patterns, but
+/// it's not ideal.
+/// TODO: probably end up needing similar functionality for the implementation of the
+/// write-ahead-log, so abstract up the notion of an io_uring+eventfd "io q" and use that
+/// for both.
+
 pub(crate) struct PageStore {
     dir: PathBuf,
     uring: IoUring,
@@ -45,6 +59,9 @@ pub(crate) struct PageStore {
 }
 
 impl PageStore {
+    /// Establish the page store, creating the directories if they don't exist,
+    /// setting up the io_uring, and tying it to the passed-in eventfd for
+    /// signaling when requests complete.
     pub(crate) fn new(dir: PathBuf, eventfd: &EventFd) -> Self {
         // Check for dir path, if not there, create.
         if !dir.exists() {
@@ -64,7 +81,7 @@ impl PageStore {
         }
     }
 
-    // Blocking call to wait for all outstanding requests to complete.
+    /// Blocking call to wait for all outstanding requests to complete.
     pub(crate) fn wait_complete(&mut self) {
         while !self.buffers.is_empty() {
             while let Some(completion) = self.uring.completion().next() {
@@ -75,6 +92,8 @@ impl PageStore {
         }
     }
 
+    /// Process any completions that have come in since the last time this was called, and
+    /// return true if there are no outstanding requests.
     pub(crate) fn process_completions(&mut self) -> bool {
         while let Some(completion) = self.uring.completion().next() {
             let request_id = completion.user_data();
@@ -83,6 +102,8 @@ impl PageStore {
         self.buffers.is_empty()
     }
 
+    /// Get a catalog of all the pages in the store; their sizes, their page numbers, and the
+    /// relation they belong to.
     pub(crate) fn list_pages(&self) -> HashSet<(usize, PageId, RelationId)> {
         let mut pages = HashSet::new();
         for entry in std::fs::read_dir(&self.dir).unwrap() {
@@ -102,6 +123,7 @@ impl PageStore {
         pages
     }
 
+    /// Read the special sequences page into a buffer.
     pub(crate) fn read_sequence_page(&self) -> std::io::Result<Option<Vec<u8>>> {
         let path = self.dir.join("sequences.page");
         let mut file = match File::open(path) {
@@ -143,12 +165,20 @@ impl PageStore {
         Ok(())
     }
 
-    pub(crate) fn write_batch(&mut self, batch: Vec<PageStoreMutation>) -> std::io::Result<()> {
-        // We can't submit a new batch until all the previous requests have completed.
-        while let Some(completion) = self.uring.completion().next() {
-            let request_id = completion.user_data();
-            self.buffers.remove(&request_id);
-        }
+    /// Enqueue a batch of mutations to be written to disk. Will return immediately after
+    /// submitting the batch to the kernel via io_uring.
+    pub(crate) fn enqueue_page_mutations(
+        &mut self,
+        batch: Vec<PageStoreMutation>,
+    ) -> std::io::Result<()> {
+        // We probably shouldn't submit a new batch until all the previous requests have completed.
+        // TODO: this isn't actually verifying that all the requests have completed, and there's
+        // some kind of bug with that because in reality they don't always seem to complete,
+        // even after the eventfd is triggered. which makes me think something is getting
+        // dropped on the floor in io_uring completions, even though the actual pages *do* get
+        // written to disk.
+        self.process_completions();
+
         for mutation in batch {
             let request_id = self.next_request_id;
             self.next_request_id += 1;
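
For review context, a minimal sketch of the calling pattern this rename is meant to make explicit. It uses only the methods visible in this diff (enqueue_page_mutations, process_completions, wait_complete); the two caller functions and the eventfd-readable hook are illustrative assumptions, not code from this change:

    // Hypothetical caller: submit a batch without blocking on the writes.
    fn flush(ps: &mut PageStore, batch: Vec<PageStoreMutation>) -> std::io::Result<()> {
        // Returns as soon as the writes are pushed into the kernel ring;
        // nothing is guaranteed durable when this call comes back.
        ps.enqueue_page_mutations(batch)
    }

    // Hypothetical hook, run when the eventfd passed to PageStore::new becomes
    // readable: drain whatever completions have arrived. Returns true once no
    // requests remain outstanding; a shutdown path would call the blocking
    // wait_complete() instead.
    fn on_eventfd_readable(ps: &mut PageStore) -> bool {
        ps.process_completions()
    }

The point of the rename: write_batch read as a synchronous write, but the method only submits; durability arrives later, when the completions are reaped.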