diff --git a/Cargo.lock b/Cargo.lock index 2dc29c1b..79ae091c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1012,6 +1012,15 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "281e452d3bad4005426416cdba5ccfd4f5c1280e10099e21db27f7c1c28347fc" +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -1135,6 +1144,21 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.30" @@ -1516,6 +1540,15 @@ dependencies = [ "hashbrown 0.14.2", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "inventory" version = "0.3.14" @@ -1902,6 +1935,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-eventfd", "tokio-test", "tracing", "tracing-test", @@ -2230,6 +2264,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -3184,7 +3224,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.1", "redox_syscall", "rustix", "windows-sys 0.52.0", @@ -3347,6 +3387,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-eventfd" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8574d08892a39f0d7b9d19e1884bf334811dd46f96a4334926858c51a3944f0" +dependencies = [ + "futures-lite", + "libc", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -3753,6 +3804,12 @@ version = "0.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dcc60c0624df774c82a0ef104151231d37da4962957d691c011c852b2473314" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "walkdir" version = "2.4.0" diff --git a/Cargo.toml b/Cargo.toml index 30990237..0ff9208f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ async-trait = "0.1.76" tokio = { version = "1.35.1", features = ["full"] } tokio-test = "0.4.3" tokio-util = { version = "0.7.10", features = ["full"] } +tokio-eventfd = "0.2.1" # Used for RPC daemon/client tmq = "0.4.0" diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index 8a6a0ce0..d0c1ab0f 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -51,6 +51,7 @@ atomic-wait.workspace = true okaywal.workspace = true io-uring.workspace = true hi_sparse_bitset.workspace = true +tokio-eventfd.workspace = true # For testing & benching common bits serde_json.workspace = true diff --git a/crates/db/src/tuplebox/coldstorage.rs b/crates/db/src/tuplebox/coldstorage.rs index e74dfc38..3cfe6d32 100644 --- a/crates/db/src/tuplebox/coldstorage.rs +++ b/crates/db/src/tuplebox/coldstorage.rs @@ -22,7 +22,10 @@ use binary_layout::{define_layout, Field, LayoutAs}; use im::{HashMap, HashSet}; use okaywal::{Entry, EntryId, LogManager, SegmentReader, WriteAheadLog}; use strum::FromRepr; +use tokio::io::AsyncReadExt; +use tokio::select; use tokio::sync::mpsc::UnboundedReceiver; +use tokio_eventfd::EventFd; use tracing::{debug, error, info, warn}; use crate::tuplebox::backing::{BackingStoreClient, WriterMessage}; @@ -61,7 +64,9 @@ impl ColdStorage { sequences: &mut Vec, slot_box: Arc, ) -> BackingStoreClient { - let page_storage = Arc::new(Mutex::new(PageStore::new(path.join("pages")))); + let eventfd = EventFd::new(0, false).unwrap(); + + let page_storage = Arc::new(Mutex::new(PageStore::new(path.join("pages"), &eventfd))); let wal_manager = WalManager { page_storage: page_storage.clone(), slot_box: slot_box.clone(), @@ -137,7 +142,13 @@ impl ColdStorage { // Start the listen loop let (writer_send, writer_receive) = tokio::sync::mpsc::unbounded_channel(); - tokio::spawn(Self::listen_loop(writer_receive, wal, slot_box.clone())); + tokio::spawn(Self::listen_loop( + writer_receive, + wal, + slot_box.clone(), + page_storage.clone(), + eventfd, + )); // And return the client to it. BackingStoreClient::new(writer_send) @@ -147,22 +158,33 @@ impl ColdStorage { mut writer_receive: UnboundedReceiver, wal: WriteAheadLog, slot_box: Arc, + ps: Arc>, + mut event_fd: EventFd, ) { + let mut buf = [0; 8]; loop { - match writer_receive.recv().await { - Some(WriterMessage::Commit(ts, ws, sequences)) => { - Self::perform_writes(wal.clone(), slot_box.clone(), ts, ws, sequences).await; - } - Some(WriterMessage::Shutdown) => { - // Flush the WAL - wal.shutdown().expect("Unable to flush WAL"); - - info!("Shutting down WAL writer thread"); - return; - } - None => { - error!("Writer thread channel closed, shutting down"); - return; + select! { + writer_message = writer_receive.recv() => { + match writer_message { + Some(WriterMessage::Commit(ts, ws, sequences)) => { + Self::perform_writes(wal.clone(), slot_box.clone(), ts, ws, sequences).await; + } + Some(WriterMessage::Shutdown) => { + // Flush the WAL + wal.shutdown().expect("Unable to flush WAL"); + + info!("Shutting down WAL writer thread"); + return; + } + None => { + error!("Writer thread channel closed, shutting down"); + return; + } + } + }, + // When the eventfd is triggered by the page store, we need to ask it to process completions. + _ = event_fd.read(&mut buf) => { + let _ = ps.lock().unwrap().process_completions(); } } } diff --git a/crates/db/src/tuplebox/page_storage.rs b/crates/db/src/tuplebox/page_storage.rs index 1b3c881c..8eeb873d 100644 --- a/crates/db/src/tuplebox/page_storage.rs +++ b/crates/db/src/tuplebox/page_storage.rs @@ -17,14 +17,16 @@ use crate::tuplebox::slots::PageId; use crate::tuplebox::RelationId; use im::{HashMap, HashSet}; +use io_uring::squeue::Flags; use io_uring::types::Fd; use io_uring::{opcode, IoUring}; use std::fs::{File, OpenOptions}; use std::io::Read; -use std::os::fd::{IntoRawFd, RawFd}; +use std::os::fd::{AsRawFd, IntoRawFd, RawFd}; use std::path::PathBuf; use std::pin::Pin; -use tracing::info; +use std::thread::yield_now; +use tokio_eventfd::EventFd; pub(crate) enum PageStoreMutation { SyncRelationPage(RelationId, PageId, Box<[u8]>), @@ -43,12 +45,17 @@ pub(crate) struct PageStore { } impl PageStore { - pub(crate) fn new(dir: PathBuf) -> Self { + pub(crate) fn new(dir: PathBuf, eventfd: &EventFd) -> Self { // Check for dir path, if not there, create. if !dir.exists() { std::fs::create_dir_all(&dir).unwrap(); } let uring = IoUring::new(8).unwrap(); + + // Set up the eventfd... + let eventfd_fd = eventfd.as_raw_fd(); + uring.submitter().register_eventfd(eventfd_fd).unwrap(); + Self { dir, uring, @@ -57,15 +64,23 @@ impl PageStore { } } + // Blocking call to wait for all outstanding requests to complete. pub(crate) fn wait_complete(&mut self) { - info!("Waiting for {} completions", self.buffers.len()); while !self.buffers.is_empty() { while let Some(completion) = self.uring.completion().next() { let request_id = completion.user_data(); self.buffers.remove(&request_id); } + yield_now(); } - info!("All completions done"); + } + + pub(crate) fn process_completions(&mut self) -> bool { + while let Some(completion) = self.uring.completion().next() { + let request_id = completion.user_data(); + self.buffers.remove(&request_id); + } + self.buffers.is_empty() } pub(crate) fn list_pages(&self) -> HashSet<(usize, PageId, RelationId)> { @@ -128,9 +143,8 @@ impl PageStore { Ok(()) } - // TODO: batch submit + fsync pub(crate) fn write_batch(&mut self, batch: Vec) -> std::io::Result<()> { - // go through previous completions and remove the buffers + // We can't submit a new batch until all the previous requests have completed. while let Some(completion) = self.uring.completion().next() { let request_id = completion.user_data(); self.buffers.remove(&request_id); @@ -151,13 +165,27 @@ impl PageStore { let write_e = opcode::Write::new(Fd(fd), data_ptr as _, len as _) .build() - .user_data(request_id); + .user_data(request_id) + .flags(Flags::IO_LINK); unsafe { self.uring .submission() .push(&write_e) .expect("Unable to push write to submission queue"); } + + // Tell the kernel to flush the file to disk after writing it, and this should be + // linked to the write above. + let fsync_e = opcode::Fsync::new(Fd(fd)) + .build() + .user_data(request_id) + .flags(Flags::IO_LINK); + unsafe { + self.uring + .submission() + .push(&fsync_e) + .expect("Unable to push fsync to submission queue"); + } } PageStoreMutation::SyncSequencePage(data) => { let path = self.dir.join("sequences.page"); @@ -171,20 +199,34 @@ impl PageStore { let write_e = opcode::Write::new(Fd(fd), data_ptr as _, len as _) .build() - .user_data(request_id); + .user_data(request_id) + .flags(Flags::IO_LINK); + unsafe { self.uring .submission() .push(&write_e) .expect("Unable to push write to submission queue"); } + + let fsync_e = opcode::Fsync::new(Fd(fd)) + .build() + .user_data(request_id) + .flags(Flags::IO_LINK); + unsafe { + self.uring + .submission() + .push(&fsync_e) + .expect("Unable to push fsync to submission queue"); + } } PageStoreMutation::DeleteRelationPage(_, _) => { // TODO } } - self.uring.submit()?; + self.uring.submit().expect("Unable to submit to io_uring"); } + Ok(()) } }