More robustness for the page store

Issue a (linked) fsync into io_uring along with every page write.

Use an eventfd to get notified when each page write completes, and process completions off that notification instead of polling the completion queue.
rdaum committed Jan 2, 2024
1 parent f5a9bec commit 94b6e11
Showing 5 changed files with 150 additions and 27 deletions.
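
Before the per-file diffs, a condensed sketch of the pattern this commit adds: register an eventfd with the io_uring instance so the kernel signals completions, and queue every page write with an fsync linked behind it. It uses the same io-uring and tokio-eventfd crates as the diff below; the free functions new_ring and submit_durable_write are illustrative names for this sketch only, not items from the repository.

use std::os::fd::{AsRawFd, RawFd};

use io_uring::{opcode, squeue::Flags, types::Fd, IoUring};
use tokio_eventfd::EventFd;

// Create a ring whose completions are signalled through the given eventfd, so a
// consumer can await the eventfd instead of polling the completion queue.
fn new_ring(eventfd: &EventFd) -> std::io::Result<IoUring> {
    let uring = IoUring::new(8)?;
    uring.submitter().register_eventfd(eventfd.as_raw_fd())?;
    Ok(uring)
}

// Queue one page write followed by a linked fsync. IO_LINK on the write tells the
// kernel not to start the fsync until the write has finished, so seeing the pair's
// completion means the page is durable. The caller must keep `data` alive until the
// completion arrives; the page store does this by holding the boxed buffer in a map
// keyed by request_id.
fn submit_durable_write(
    uring: &mut IoUring,
    fd: RawFd,
    data: &[u8],
    request_id: u64,
) -> std::io::Result<()> {
    let write_e = opcode::Write::new(Fd(fd), data.as_ptr(), data.len() as _)
        .build()
        .user_data(request_id)
        .flags(Flags::IO_LINK); // chain the fsync that follows to this write
    let fsync_e = opcode::Fsync::new(Fd(fd)).build().user_data(request_id);

    unsafe {
        let mut sq = uring.submission();
        sq.push(&write_e).expect("submission queue full");
        sq.push(&fsync_e).expect("submission queue full");
    }
    uring.submit()?;
    Ok(())
}

The completion's user_data carries the request id, which is how the page store below knows which buffered page it can retire once the linked pair has finished.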
59 changes: 58 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -38,6 +38,7 @@ async-trait = "0.1.76"
tokio = { version = "1.35.1", features = ["full"] }
tokio-test = "0.4.3"
tokio-util = { version = "0.7.10", features = ["full"] }
tokio-eventfd = "0.2.1"

# Used for RPC daemon/client
tmq = "0.4.0"
1 change: 1 addition & 0 deletions crates/db/Cargo.toml
@@ -51,6 +51,7 @@ atomic-wait.workspace = true
okaywal.workspace = true
io-uring.workspace = true
hi_sparse_bitset.workspace = true
tokio-eventfd.workspace = true

# For testing & benching common bits
serde_json.workspace = true
54 changes: 38 additions & 16 deletions crates/db/src/tuplebox/coldstorage.rs
@@ -22,7 +22,10 @@ use binary_layout::{define_layout, Field, LayoutAs};
use im::{HashMap, HashSet};
use okaywal::{Entry, EntryId, LogManager, SegmentReader, WriteAheadLog};
use strum::FromRepr;
use tokio::io::AsyncReadExt;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_eventfd::EventFd;
use tracing::{debug, error, info, warn};

use crate::tuplebox::backing::{BackingStoreClient, WriterMessage};
@@ -61,7 +64,9 @@ impl ColdStorage {
sequences: &mut Vec<u64>,
slot_box: Arc<SlotBox>,
) -> BackingStoreClient {
let page_storage = Arc::new(Mutex::new(PageStore::new(path.join("pages"))));
let eventfd = EventFd::new(0, false).unwrap();

let page_storage = Arc::new(Mutex::new(PageStore::new(path.join("pages"), &eventfd)));
let wal_manager = WalManager {
page_storage: page_storage.clone(),
slot_box: slot_box.clone(),
@@ -137,7 +142,13 @@ impl ColdStorage {

// Start the listen loop
let (writer_send, writer_receive) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(Self::listen_loop(writer_receive, wal, slot_box.clone()));
tokio::spawn(Self::listen_loop(
writer_receive,
wal,
slot_box.clone(),
page_storage.clone(),
eventfd,
));

// And return the client to it.
BackingStoreClient::new(writer_send)
@@ -147,22 +158,33 @@
mut writer_receive: UnboundedReceiver<WriterMessage>,
wal: WriteAheadLog,
slot_box: Arc<SlotBox>,
ps: Arc<Mutex<PageStore>>,
mut event_fd: EventFd,
) {
let mut buf = [0; 8];
loop {
match writer_receive.recv().await {
Some(WriterMessage::Commit(ts, ws, sequences)) => {
Self::perform_writes(wal.clone(), slot_box.clone(), ts, ws, sequences).await;
}
Some(WriterMessage::Shutdown) => {
// Flush the WAL
wal.shutdown().expect("Unable to flush WAL");

info!("Shutting down WAL writer thread");
return;
}
None => {
error!("Writer thread channel closed, shutting down");
return;
select! {
writer_message = writer_receive.recv() => {
match writer_message {
Some(WriterMessage::Commit(ts, ws, sequences)) => {
Self::perform_writes(wal.clone(), slot_box.clone(), ts, ws, sequences).await;
}
Some(WriterMessage::Shutdown) => {
// Flush the WAL
wal.shutdown().expect("Unable to flush WAL");

info!("Shutting down WAL writer thread");
return;
}
None => {
error!("Writer thread channel closed, shutting down");
return;
}
}
},
// When the eventfd is triggered by the page store, we need to ask it to process completions.
_ = event_fd.read(&mut buf) => {
let _ = ps.lock().unwrap().process_completions();
}
}
}
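
In isolation, the eventfd arm of the loop above looks like this: each kernel-signalled completion bumps the eventfd, the task reads the 8-byte counter to clear it, then asks the page store to drain its completion queue. This is a minimal standalone sketch; DrainsCompletions and completion_pump are stand-ins invented here so the snippet compiles without the real PageStore.

use std::sync::{Arc, Mutex};

use tokio::io::AsyncReadExt;
use tokio_eventfd::EventFd;

// Stand-in for the page store's process_completions(); the real method lives in
// page_storage.rs below.
trait DrainsCompletions {
    fn process_completions(&mut self) -> bool;
}

// Each time the kernel signals a completion, the eventfd counter goes non-zero; read
// the 8-byte counter to clear it, then drain the completion queue under the lock.
async fn completion_pump<S: DrainsCompletions>(mut event_fd: EventFd, ps: Arc<Mutex<S>>) {
    let mut buf = [0u8; 8];
    loop {
        if event_fd.read(&mut buf).await.is_err() {
            return;
        }
        let _ = ps.lock().unwrap().process_completions();
    }
}
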
62 changes: 52 additions & 10 deletions crates/db/src/tuplebox/page_storage.rs
@@ -17,14 +17,16 @@
use crate::tuplebox::slots::PageId;
use crate::tuplebox::RelationId;
use im::{HashMap, HashSet};
use io_uring::squeue::Flags;
use io_uring::types::Fd;
use io_uring::{opcode, IoUring};
use std::fs::{File, OpenOptions};
use std::io::Read;
use std::os::fd::{IntoRawFd, RawFd};
use std::os::fd::{AsRawFd, IntoRawFd, RawFd};
use std::path::PathBuf;
use std::pin::Pin;
use tracing::info;
use std::thread::yield_now;
use tokio_eventfd::EventFd;

pub(crate) enum PageStoreMutation {
SyncRelationPage(RelationId, PageId, Box<[u8]>),
@@ -43,12 +45,17 @@ pub(crate) struct PageStore {
}

impl PageStore {
pub(crate) fn new(dir: PathBuf) -> Self {
pub(crate) fn new(dir: PathBuf, eventfd: &EventFd) -> Self {
// Check for dir path, if not there, create.
if !dir.exists() {
std::fs::create_dir_all(&dir).unwrap();
}
let uring = IoUring::new(8).unwrap();

// Set up the eventfd...
let eventfd_fd = eventfd.as_raw_fd();
uring.submitter().register_eventfd(eventfd_fd).unwrap();

Self {
dir,
uring,
@@ -57,15 +64,23 @@ impl PageStore {
}
}

// Blocking call to wait for all outstanding requests to complete.
pub(crate) fn wait_complete(&mut self) {
info!("Waiting for {} completions", self.buffers.len());
while !self.buffers.is_empty() {
while let Some(completion) = self.uring.completion().next() {
let request_id = completion.user_data();
self.buffers.remove(&request_id);
}
yield_now();
}
info!("All completions done");
}

pub(crate) fn process_completions(&mut self) -> bool {
while let Some(completion) = self.uring.completion().next() {
let request_id = completion.user_data();
self.buffers.remove(&request_id);
}
self.buffers.is_empty()
}

pub(crate) fn list_pages(&self) -> HashSet<(usize, PageId, RelationId)> {
@@ -128,9 +143,8 @@ impl PageStore {
Ok(())
}

// TODO: batch submit + fsync
pub(crate) fn write_batch(&mut self, batch: Vec<PageStoreMutation>) -> std::io::Result<()> {
// go through previous completions and remove the buffers
// We can't submit a new batch until all the previous requests have completed.
while let Some(completion) = self.uring.completion().next() {
let request_id = completion.user_data();
self.buffers.remove(&request_id);
@@ -151,13 +165,27 @@

let write_e = opcode::Write::new(Fd(fd), data_ptr as _, len as _)
.build()
.user_data(request_id);
.user_data(request_id)
.flags(Flags::IO_LINK);
unsafe {
self.uring
.submission()
.push(&write_e)
.expect("Unable to push write to submission queue");
}

// Tell the kernel to flush the file to disk after writing it, and this should be
// linked to the write above.
let fsync_e = opcode::Fsync::new(Fd(fd))
.build()
.user_data(request_id)
.flags(Flags::IO_LINK);
unsafe {
self.uring
.submission()
.push(&fsync_e)
.expect("Unable to push fsync to submission queue");
}
}
PageStoreMutation::SyncSequencePage(data) => {
let path = self.dir.join("sequences.page");
@@ -171,20 +199,34 @@

let write_e = opcode::Write::new(Fd(fd), data_ptr as _, len as _)
.build()
.user_data(request_id);
.user_data(request_id)
.flags(Flags::IO_LINK);

unsafe {
self.uring
.submission()
.push(&write_e)
.expect("Unable to push write to submission queue");
}

let fsync_e = opcode::Fsync::new(Fd(fd))
.build()
.user_data(request_id)
.flags(Flags::IO_LINK);
unsafe {
self.uring
.submission()
.push(&fsync_e)
.expect("Unable to push fsync to submission queue");
}
}
PageStoreMutation::DeleteRelationPage(_, _) => {
// TODO
}
}
self.uring.submit()?;
self.uring.submit().expect("Unable to submit to io_uring");
}

Ok(())
}
}
