From 16a0a4c07a66cbdc02f2904108327636a494fb04 Mon Sep 17 00:00:00 2001 From: namse Date: Mon, 11 Nov 2024 05:07:56 +0000 Subject: [PATCH] Clean up error types --- luda-editor/new-server/bptree/Cargo.lock | 31 ++- luda-editor/new-server/bptree/Cargo.toml | 2 +- .../new-server/bptree/src/id_set/backend.rs | 76 ++--- .../new-server/bptree/src/id_set/fd.rs | 16 +- .../new-server/bptree/src/id_set/frontend.rs | 118 ++++---- .../new-server/bptree/src/id_set/mod.rs | 23 +- .../new-server/bptree/src/id_set/operator.rs | 7 +- .../bptree/src/id_set/wal/executor.rs | 180 ++++++++++++ .../bptree/src/id_set/{wal.rs => wal/mod.rs} | 259 ++++-------------- 9 files changed, 380 insertions(+), 332 deletions(-) create mode 100644 luda-editor/new-server/bptree/src/id_set/wal/executor.rs rename luda-editor/new-server/bptree/src/id_set/{wal.rs => wal/mod.rs} (54%) diff --git a/luda-editor/new-server/bptree/Cargo.lock b/luda-editor/new-server/bptree/Cargo.lock index 890ffaaf5..192d6c046 100644 --- a/luda-editor/new-server/bptree/Cargo.lock +++ b/luda-editor/new-server/bptree/Cargo.lock @@ -17,15 +17,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "anyhow" -version = "1.0.92" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" -dependencies = [ - "backtrace", -] - [[package]] name = "arc-swap" version = "1.7.1" @@ -57,12 +48,12 @@ dependencies = [ name = "bptree" version = "0.1.0" dependencies = [ - "anyhow", "arc-swap", "bytes", "crc", "futures", "libc", + "thiserror", "tokio", ] @@ -274,6 +265,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio" version = "1.41.0" diff --git a/luda-editor/new-server/bptree/Cargo.toml b/luda-editor/new-server/bptree/Cargo.toml index 6270b6e9c..5484cf35f 100644 --- a/luda-editor/new-server/bptree/Cargo.toml +++ b/luda-editor/new-server/bptree/Cargo.toml @@ -7,12 +7,12 @@ edition = "2021" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -anyhow = { version = "1", features = ["backtrace"] } arc-swap = "1" bytes = "1" crc = "3" futures = "0.3.31" libc = "0.2" +thiserror = "2" tokio = { version = "1", features = ["fs", "macros", "rt", "sync", "time"] } [dev-dependencies] diff --git a/luda-editor/new-server/bptree/src/id_set/backend.rs b/luda-editor/new-server/bptree/src/id_set/backend.rs index 470aaa119..4e7779ae4 100644 --- a/luda-editor/new-server/bptree/src/id_set/backend.rs +++ b/luda-editor/new-server/bptree/src/id_set/backend.rs @@ -2,22 +2,24 @@ use super::*; use std::{path::Path, time::Duration}; use tokio::{sync::mpsc::Receiver, time::timeout}; +type Result = std::result::Result; + pub struct Backend { file_read_fd: ReadFd, file_write_fd: WriteFd, wal: Wal, cache: PageCache, - request_rx: Receiver, + request_rx: Receiver, backend_close_tx: oneshot::Sender<()>, } impl Backend { pub async fn open( path: impl AsRef, - request_rx: Receiver, + request_rx: Receiver, cache: PageCache, backend_close_tx: oneshot::Sender<()>, - ) -> Result<()> { + ) -> std::io::Result<()> { let path = path.as_ref(); let file = tokio::fs::OpenOptions::new() @@ -59,22 +61,22 @@ impl Backend { let mut txs = Vec::::new(); - let mut result = Ok(()); + let mut result: Result<()> = Ok(()); let start_time = tokio::time::Instant::now(); loop { for request in requests.drain(..) { match request { - Request::Insert { id, tx } => { + FeBeRequest::Insert { id, tx } => { txs.push(Tx::Insert { tx }); - result = operator.insert(id).await; + result = operator.insert(id).await.map_err(BackendError::from); } - Request::Delete { id, tx } => { + FeBeRequest::Delete { id, tx } => { txs.push(Tx::Delete { tx }); - result = operator.delete(id).await; + result = operator.delete(id).await.map_err(BackendError::from); } - Request::Contains { id, tx } => { + FeBeRequest::Contains { id, tx } => { let contains_result = operator.contains(id).await; let tx_result; match contains_result { @@ -84,7 +86,7 @@ impl Backend { } Err(err) => { tx_result = Err(()); - result = Err(err); + result = Err(err.into()); } } txs.push(Tx::Contains { @@ -92,7 +94,7 @@ impl Backend { result: tx_result, }); } - Request::Next { + FeBeRequest::Next { exclusive_start_id, tx, } => { @@ -105,7 +107,7 @@ impl Backend { } Err(err) => { tx_result = Err(()); - result = Err(err); + result = Err(err.into()); } }; @@ -114,7 +116,7 @@ impl Backend { result: tx_result, }); } - Request::Close => { + FeBeRequest::Close => { close_requested = true; } } @@ -153,22 +155,21 @@ impl Backend { pages_read_from_file, } = operator.done(); - result = self.wal.update_pages(&updated_pages); + result = self + .wal + .update_pages(&updated_pages) + .map_err(BackendError::from); - if let Err(err) = &result { - if let Some(ExecuteError::ExecutorDown) = err.downcast_ref::() - { - eprintln!("Executor down!"); - break 'outer; - } + if let Err(BackendError::Wal(WalError::ExecutorDown)) = &result { + eprintln!("Executor down!"); + break 'outer; } if result.is_ok() { let mut new_pages = pages_read_from_file; new_pages.append(&mut updated_pages); let stale_tuples = self.cache.push(new_pages); - if let Err(err) = self.write_staled_pages(stale_tuples).await { - eprintln!("Error on writing staled pages: {:?}", err); + if self.write_staled_pages(stale_tuples).await.is_err() { break 'outer; } } @@ -208,14 +209,17 @@ impl Backend { }); } - /// Don't fsync - async fn write_staled_pages(&mut self, stale_tuples: Vec<(PageOffset, Page)>) -> Result<()> { + /// NOTE: Don't fsync here. + async fn write_staled_pages( + &mut self, + stale_tuples: Vec<(PageOffset, Page)>, + ) -> std::result::Result<(), ()> { if stale_tuples.is_empty() { return Ok(()); } let mut sleep_time = Duration::from_millis(100); for _ in 0..=10 { - let result: Result<()> = (|| { + let result: std::io::Result<()> = (|| { for (offset, page) in &stale_tuples { self.file_write_fd .write_exact(page.as_slice(), offset.file_offset())?; @@ -232,23 +236,31 @@ impl Backend { sleep_time = (sleep_time * 2).max(Duration::from_secs(4)); } - anyhow::bail!("Too many retrial on writing staled pages"); + Err(()) } } +#[derive(Debug, Error)] +enum BackendError { + #[error("Error on wal: {0}")] + Wal(#[from] WalError), + #[error("io: {0}")] + Io(#[from] std::io::Error), +} + enum Tx { Insert { - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, Delete { - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, Contains { - tx: oneshot::Sender>, - result: Result, + tx: oneshot::Sender>, + result: std::result::Result, }, Next { - tx: oneshot::Sender>, ()>>, - result: Result>, ()>, + tx: oneshot::Sender>, ()>>, + result: std::result::Result>, ()>, }, } diff --git a/luda-editor/new-server/bptree/src/id_set/fd.rs b/luda-editor/new-server/bptree/src/id_set/fd.rs index c1b05879f..cbf780f7c 100644 --- a/luda-editor/new-server/bptree/src/id_set/fd.rs +++ b/luda-editor/new-server/bptree/src/id_set/fd.rs @@ -9,7 +9,7 @@ use std::{ os::fd::{IntoRawFd, RawFd}, }; -type Result = anyhow::Result; +type Result = std::io::Result; pub fn split_file(file: File) -> (ReadFd, WriteFd) { let fd = file.into_raw_fd(); @@ -63,10 +63,10 @@ impl ReadFd { ) }; if len < 0 { - return Err(Error::last_os_error().into()); + return Err(Error::last_os_error()); } if len == 0 { - return Err(Error::from(io::ErrorKind::UnexpectedEof).into()); + return Err(Error::from(io::ErrorKind::UnexpectedEof)); } buf_offset += len as usize; } @@ -97,7 +97,7 @@ impl WriteFd { ) }; if len < 0 { - return Err(Error::last_os_error().into()); + return Err(Error::last_os_error()); } assert_ne!(len, 0); buf_offset += len as usize; @@ -108,7 +108,7 @@ impl WriteFd { pub fn set_len(&mut self, len: usize) -> Result<()> { if unsafe { ftruncate(self.fd, len as _) } < 0 { - Err(Error::last_os_error().into()) + Err(Error::last_os_error()) } else { Ok(()) } @@ -116,7 +116,7 @@ impl WriteFd { pub fn fsync(&mut self) -> Result<()> { if unsafe { fsync(self.fd) } < 0 { - Err(Error::last_os_error().into()) + Err(Error::last_os_error()) } else { Ok(()) } @@ -131,7 +131,7 @@ impl WriteFd { unsafe { sendfile(self.fd, source.fd(), &mut offset, count - offset as usize) }; if len < 0 { - return Err(Error::last_os_error().into()); + return Err(Error::last_os_error()); } assert!(offset > 0); offset += len as i64; @@ -150,7 +150,7 @@ pub(crate) trait BorrowFd { unsafe { let mut stat = std::mem::MaybeUninit::::uninit(); if fstat64(self.fd(), stat.as_mut_ptr()) < 0 { - Err(Error::last_os_error().into()) + Err(Error::last_os_error()) } else { Ok(stat.assume_init().st_size as usize) } diff --git a/luda-editor/new-server/bptree/src/id_set/frontend.rs b/luda-editor/new-server/bptree/src/id_set/frontend.rs index 0856941fe..ad4497917 100644 --- a/luda-editor/new-server/bptree/src/id_set/frontend.rs +++ b/luda-editor/new-server/bptree/src/id_set/frontend.rs @@ -1,17 +1,25 @@ use super::*; use std::{ collections::VecDeque, + fmt::Debug, path::{Path, PathBuf}, sync::Arc, }; use tokio::sync::{mpsc, oneshot}; /// Frontend for the IdSet data structure. +#[derive(Clone)] pub struct IdSet { path: PathBuf, - request_tx: Arc>, cache: PageCache, - backend_close_rx: oneshot::Receiver<()>, + request_tx: Arc>, + backend_close_rx: Arc>, +} + +impl Debug for IdSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IdSet").field("path", &self.path).finish() + } } impl IdSet { @@ -22,18 +30,18 @@ impl IdSet { /// - 1 cache is 4KB. 100 `cache_limit` will be 400KB. /// - Put enough `cache_limit`. /// - If `IdSet` cannot find data from cache, it will read from disk, which is very slow. - pub async fn new(path: impl AsRef, cache_limit: usize) -> Result> { + pub async fn new(path: impl AsRef, cache_limit: usize) -> Result { let path = path.as_ref(); let (request_tx, request_rx) = mpsc::channel(4096); let (backend_close_tx, backend_close_rx) = oneshot::channel(); - let this = Arc::new(Self { + let this = Self { path: path.to_path_buf(), request_tx: Arc::new(request_tx), cache: PageCache::new(cache_limit), - backend_close_rx, - }); + backend_close_rx: Arc::new(backend_close_rx), + }; Backend::open(&this.path, request_rx, this.cache.clone(), backend_close_tx).await?; @@ -41,27 +49,11 @@ impl IdSet { } pub async fn insert(&self, id: u128) -> Result<()> { let (tx, rx) = oneshot::channel(); - - self.request_tx - .send(Request::Insert { id, tx }) - .await - .map_err(|_| anyhow::anyhow!("IdSet backend is down"))?; - - rx.await - .map_err(|_| anyhow::anyhow!("Failed to received result from rx, id: {}", id))? - .map_err(|_| anyhow::anyhow!("Failed to insert id: {}", id)) + self.send_request(FeBeRequest::Insert { id, tx }, rx).await } pub async fn delete(&self, id: u128) -> Result<()> { let (tx, rx) = oneshot::channel(); - - self.request_tx - .send(Request::Delete { id, tx }) - .await - .map_err(|_| anyhow::anyhow!("IdSet backend is down"))?; - - rx.await - .map_err(|_| anyhow::anyhow!("Failed to received result from rx, id: {}", id))? - .map_err(|_| anyhow::anyhow!("Failed to delete id: {}", id)) + self.send_request(FeBeRequest::Delete { id, tx }, rx).await } pub async fn contains(&self, id: u128) -> Result { if let Some(cached) = self.cache.contains_id(id) { @@ -69,15 +61,8 @@ impl IdSet { } let (tx, rx) = oneshot::channel(); - - self.request_tx - .send(Request::Contains { id, tx }) + self.send_request(FeBeRequest::Contains { id, tx }, rx) .await - .map_err(|_| anyhow::anyhow!("IdSet backend is down"))?; - - rx.await - .map_err(|_| anyhow::anyhow!("Failed to received result from rx, id: {}", id))? - .map_err(|_| anyhow::anyhow!("Failed to check if id exists: {}", id)) } /// # Return /// - `None` if there is no more data. @@ -87,30 +72,16 @@ impl IdSet { } let (tx, rx) = oneshot::channel(); - - self.request_tx - .send(Request::Next { + self.send_request( + FeBeRequest::Next { exclusive_start_id, tx, - }) - .await - .map_err(|_| anyhow::anyhow!("IdSet backend is down"))?; - - rx.await - .map_err(|_| { - anyhow::anyhow!( - "Failed to received result from rx, exclusive_start_id: {:?}", - exclusive_start_id - ) - })? - .map_err(|_| { - anyhow::anyhow!( - "Failed to get next of exclusive_start_id exists: {:?}", - exclusive_start_id - ) - }) + }, + rx, + ) + .await } - pub fn stream(self: &Arc) -> impl futures::Stream> + 'static + Unpin { + pub fn stream(&self) -> impl futures::Stream> + 'static + Unpin { struct State { exclusive_start_id: Option, ids: VecDeque, @@ -145,15 +116,42 @@ impl IdSet { }, )) } - pub async fn try_close(self: Arc) -> Result<(), Arc> { - let inner = Arc::try_unwrap(self)?; + pub async fn try_close(self) -> std::result::Result<(), Self> { + let Self { + path, + cache, + request_tx, + backend_close_rx, + } = self; + match Arc::try_unwrap(backend_close_rx) { + Ok(backend_close_rx) => { + if request_tx.send(FeBeRequest::Close).await.is_err() { + return Ok(()); + } - if inner.request_tx.send(Request::Close).await.is_err() { - return Ok(()); + _ = backend_close_rx.await; + Ok(()) + } + Err(backend_close_rx) => Err(Self { + path, + cache, + request_tx, + backend_close_rx, + }), } + } + async fn send_request( + &self, + request: FeBeRequest, + rx: oneshot::Receiver>, + ) -> Result { + self.request_tx + .send(request) + .await + .map_err(|_| Error::Broken)?; - _ = inner.backend_close_rx.await; - - Ok(()) + rx.await + .map_err(|_| Error::Broken)? + .map_err(|_| Error::Temporary) } } diff --git a/luda-editor/new-server/bptree/src/id_set/mod.rs b/luda-editor/new-server/bptree/src/id_set/mod.rs index 74339a223..cad581e51 100644 --- a/luda-editor/new-server/bptree/src/id_set/mod.rs +++ b/luda-editor/new-server/bptree/src/id_set/mod.rs @@ -20,34 +20,45 @@ mod operator; mod pages; mod wal; -use anyhow::Result; use backend::*; use cache::*; use fd::*; pub use frontend::*; use operator::*; use pages::*; +use thiserror::Error; use tokio::sync::oneshot; use wal::*; pub type Id = u128; -enum Request { +#[derive(Error, Debug)] +pub enum Error { + #[error("Something broken, please close and reopen the IdSet")] + Broken, + #[error("Temporary error")] + Temporary, + #[error("IO error: {0}")] + Io(#[from] std::io::Error), +} +type Result = std::result::Result; + +enum FeBeRequest { Insert { id: Id, - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, Delete { id: Id, - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, Contains { id: Id, - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, Next { exclusive_start_id: Option, - tx: oneshot::Sender>, ()>>, + tx: oneshot::Sender>, ()>>, }, Close, } diff --git a/luda-editor/new-server/bptree/src/id_set/operator.rs b/luda-editor/new-server/bptree/src/id_set/operator.rs index 668e897e6..7a1e895da 100644 --- a/luda-editor/new-server/bptree/src/id_set/operator.rs +++ b/luda-editor/new-server/bptree/src/id_set/operator.rs @@ -1,6 +1,8 @@ use super::*; use std::collections::{btree_map::Entry, BTreeMap}; +type Result = std::io::Result; + pub struct Operator { cache: CachedPages, pages_updated: BTreeMap, @@ -100,10 +102,7 @@ impl Operator { let contains = leaf_node.contains(id); Ok(contains) } - pub async fn next( - &mut self, - exclusive_start_id: Option, - ) -> std::result::Result>, anyhow::Error> { + pub async fn next(&mut self, exclusive_start_id: Option) -> Result>> { let mut leaf_node_offset = self .find_leaf_node_for(exclusive_start_id.unwrap_or_default()) .await?; diff --git a/luda-editor/new-server/bptree/src/id_set/wal/executor.rs b/luda-editor/new-server/bptree/src/id_set/wal/executor.rs new file mode 100644 index 000000000..ca35370c9 --- /dev/null +++ b/luda-editor/new-server/bptree/src/id_set/wal/executor.rs @@ -0,0 +1,180 @@ +use super::*; +use std::time::Duration; + +pub(crate) struct Executor { + wal_read_fd: ReadFd, + shadow_write_fd: WriteFd, + rx: mpsc::UnboundedReceiver, + read_offset: ReadOffset, + close_tx: oneshot::Sender<()>, +} + +impl Executor { + pub(crate) fn new( + wal_read_fd: ReadFd, + shadow_write_fd: WriteFd, + rx: mpsc::UnboundedReceiver, + read_offset: ReadOffset, + close_tx: oneshot::Sender<()>, + ) -> Self { + Self { + wal_read_fd, + shadow_write_fd, + rx, + read_offset, + close_tx, + } + } + pub(crate) fn start(mut self) { + tokio::spawn(async move { + while let Some(request) = self.rx.recv().await { + match request { + ExecutorRequest::Push { written } => { + self.handle_push(written).await; + } + ExecutorRequest::Reset => { + self.read_offset.reset(); + } + ExecutorRequest::Close => { + let _ = self.close_tx.send(()); + return; + } + } + } + }); + } + pub(crate) async fn handle_push(&mut self, written: usize) { + let mut sleep_time = Duration::from_millis(100); + let mut read_count = 0; + + while read_count < written { + let mut success = false; + + for _ in 0..=10 { + match execute_one( + &self.wal_read_fd, + self.read_offset.get(), + &mut self.shadow_write_fd, + ) + .await + { + Ok(new_read_offset) => { + read_count += new_read_offset - self.read_offset.get(); + self.read_offset.set(new_read_offset); + success = true; + break; + } + Err(err) => { + if err.is_corrupted() { + unreachable!("wal file is corrupted: {:?}", err); + } + + eprintln!( + "Error on execute wal record. error: {:?} Retry after {:?}", + err, sleep_time + ); + tokio::time::sleep(sleep_time).await; + sleep_time = (sleep_time * 2).max(Duration::from_secs(4)); + } + } + } + + if !success { + unreachable!("Too many retrial on writing staled pages"); + } + } + + assert_eq!(written, read_count); + } +} + +/// # Return +/// The next read offset. +/// +/// This function returns next read offset on successful execution +/// because it would be failed in the middle of the execution. +pub(crate) async fn execute_one( + wal_read_fd: &ReadFd, + mut wal_read_offset: usize, + file_write_fd: &mut WriteFd, +) -> Result { + let header = { + let size = size_of::(); + let header = wal_read_fd.read_init::(wal_read_offset).await?; + wal_read_offset += size; + header + }; + + match header.body_types { + // Init + 0 => { + let root_node_offset = PageOffset::new(1); + + let header = Header::new(PageOffset::NULL, root_node_offset, PageOffset::new(2)); + + let root_node = LeafNode::new(PageOffset::NULL, PageOffset::NULL); + + let mut bytes = Vec::with_capacity(size_of::
() + size_of::()); + bytes.put_slice(header.as_slice()); + bytes.put_slice(root_node.as_slice()); + + file_write_fd.set_len(0)?; + file_write_fd.write_exact(&bytes, 0)?; + } + // PutPage + 1 => { + let body = { + let body_length = header.body_length as usize; + if body_length != size_of::() { + return Err(ExecuteError::WrongBodySize { + expected: size_of::(), + actual: body_length, + } + .into()); + } + let body = wal_read_fd.read_init::(wal_read_offset).await?; + wal_read_offset += body_length; + body + }; + + let body_checksum = checksum(body.as_slice()); + let bad_checksum = body_checksum != header.checksum; + if bad_checksum { + return Err(ExecuteError::Checksum { + expected: header.checksum, + actual: body_checksum, + } + .into()); + } + + file_write_fd.write_exact(body.page.as_slice(), body.page_offset.file_offset())?; + } + body_type => { + return Err(ExecuteError::WrongBodyType { body_type }.into()); + } + } + file_write_fd.fsync()?; + + Ok(wal_read_offset) +} + +#[derive(Debug)] +pub(crate) enum ExecutorRequest { + /// Push new wal record + Push { + written: usize, + }, + /// Reset wal file + Reset, + Close, +} + +#[derive(Debug, Error)] +pub(crate) enum ExecuteError { + #[error("checksum error: expected={expected}, actual={actual}")] + Checksum { expected: u64, actual: u64 }, + #[error("wrong body type: {body_type}")] + WrongBodyType { body_type: u8 }, + #[error("wrong body size: expected={expected}, actual={actual}")] + WrongBodySize { expected: usize, actual: usize }, +} diff --git a/luda-editor/new-server/bptree/src/id_set/wal.rs b/luda-editor/new-server/bptree/src/id_set/wal/mod.rs similarity index 54% rename from luda-editor/new-server/bptree/src/id_set/wal.rs rename to luda-editor/new-server/bptree/src/id_set/wal/mod.rs index e13446427..3e2547a40 100644 --- a/luda-editor/new-server/bptree/src/id_set/wal.rs +++ b/luda-editor/new-server/bptree/src/id_set/wal/mod.rs @@ -14,19 +14,20 @@ //! But for the strong consistency and durability, WAL file will be used every start time. //! +mod executor; + use super::*; use crate::checksum; use bytes::BufMut; +use executor::*; use std::{ collections::BTreeMap, - fmt::Display, io::ErrorKind, sync::{atomic::AtomicU64, Arc}, - time::Duration, }; use tokio::{fs::OpenOptions, sync::mpsc}; -type Result = anyhow::Result; +type Result = std::result::Result; pub struct Wal { wal_write_fd: WriteFd, @@ -47,7 +48,7 @@ impl Wal { pub(crate) async fn open( path: std::path::PathBuf, file_write_fd: &mut WriteFd, - ) -> Result { + ) -> std::io::Result { let wal_file = OpenOptions::new() .create(true) .read(true) @@ -97,7 +98,17 @@ impl Wal { if err.is_corrupted() { break; } - return Err(err); + match err { + WalError::Executor(execute_error) => match execute_error { + ExecuteError::Checksum { .. } => unreachable!(), + ExecuteError::WrongBodyType { .. } => unreachable!(), + ExecuteError::WrongBodySize { .. } => unreachable!(), + }, + WalError::Io(error) => { + return Err(error); + } + WalError::ExecutorDown => unreachable!(), + } } }; } @@ -110,13 +121,13 @@ impl Wal { this.written = 0; } - Executor { + Executor::new( wal_read_fd, shadow_write_fd, rx, - read_offset: this.read_offset.clone(), - close_tx: executer_close_tx, - } + this.read_offset.clone(), + executer_close_tx, + ) .start(); Ok(this) @@ -143,7 +154,7 @@ impl Wal { Ok(()) } - fn write_wal(&mut self, body: Body) -> Result<()> { + fn write_wal(&mut self, body: Body) -> std::io::Result<()> { let body_bytes = body.as_slice(); let header = WalHeader { checksum: checksum(body_bytes), @@ -160,7 +171,7 @@ impl Wal { Ok(()) } - fn write(&mut self, buf: &[u8]) -> Result<()> { + fn write(&mut self, buf: &[u8]) -> std::io::Result<()> { self.wal_write_fd .write_exact(buf, self.write_offset + self.written)?; self.written += buf.len(); @@ -174,7 +185,7 @@ impl Wal { .send(ExecutorRequest::Push { written: self.written, }) - .map_err(|_| ExecuteError::ExecutorDown)?; + .map_err(|_| WalError::ExecutorDown)?; self.write_offset += self.written; self.written = 0; @@ -189,7 +200,7 @@ impl Wal { if reader_cached_writer { self.tx .send(ExecutorRequest::Reset) - .map_err(|_| ExecuteError::ExecutorDown)?; + .map_err(|_| WalError::ExecutorDown)?; self.wal_write_fd.set_len(0)?; self.written = 0; self.write_offset = 0; @@ -205,6 +216,30 @@ impl Wal { } } +#[derive(Debug, thiserror::Error)] +pub(crate) enum WalError { + #[error("Error on executor: {0}")] + Executor(#[from] ExecuteError), + #[error("io: {0}")] + Io(#[from] std::io::Error), + #[error("Executor is down")] + ExecutorDown, +} + +impl WalError { + fn is_corrupted(&self) -> bool { + match self { + WalError::Executor(execute_error) => match execute_error { + ExecuteError::Checksum { .. } => true, + ExecuteError::WrongBodyType { .. } => true, + ExecuteError::WrongBodySize { .. } => true, + }, + WalError::Io(error) => error.kind() == ErrorKind::UnexpectedEof, + WalError::ExecutorDown => false, + } + } +} + /// Use MSB as a flag to indicate the reset version #[derive(Debug, Clone)] struct ReadOffset { @@ -244,204 +279,6 @@ impl ReadOffset { } } -struct Executor { - wal_read_fd: ReadFd, - shadow_write_fd: WriteFd, - rx: mpsc::UnboundedReceiver, - read_offset: ReadOffset, - close_tx: oneshot::Sender<()>, -} - -impl Executor { - fn start(mut self) { - tokio::spawn(async move { - while let Some(request) = self.rx.recv().await { - match request { - ExecutorRequest::Push { written } => { - self.handle_push(written).await; - } - ExecutorRequest::Reset => { - self.read_offset.reset(); - } - ExecutorRequest::Close => { - let _ = self.close_tx.send(()); - return; - } - } - } - }); - } - async fn handle_push(&mut self, written: usize) { - let mut sleep_time = Duration::from_millis(100); - let mut read_count = 0; - - while read_count < written { - let mut success = false; - - for _ in 0..=10 { - match execute_one( - &self.wal_read_fd, - self.read_offset.get(), - &mut self.shadow_write_fd, - ) - .await - { - Ok(new_read_offset) => { - read_count += new_read_offset - self.read_offset.get(); - self.read_offset.set(new_read_offset); - success = true; - break; - } - Err(err) => { - if err.is_corrupted() { - unreachable!("wal file is corrupted: {:?}", err); - } - - eprintln!( - "Error on execute wal record. error: {:?} Retry after {:?}", - err, sleep_time - ); - tokio::time::sleep(sleep_time).await; - sleep_time = (sleep_time * 2).max(Duration::from_secs(4)); - } - } - } - - if !success { - unreachable!("Too many retrial on writing staled pages"); - } - } - - assert_eq!(written, read_count); - } -} - -/// # Return -/// The next read offset. -/// -/// This function returns next read offset on successful execution -/// because it would be failed in the middle of the execution. -async fn execute_one( - wal_read_fd: &ReadFd, - mut wal_read_offset: usize, - file_write_fd: &mut WriteFd, -) -> Result { - let header = { - let size = size_of::(); - let header = wal_read_fd.read_init::(wal_read_offset).await?; - wal_read_offset += size; - header - }; - - match header.body_types { - // Init - 0 => { - let root_node_offset = PageOffset::new(1); - - let header = Header::new(PageOffset::NULL, root_node_offset, PageOffset::new(2)); - - let root_node = LeafNode::new(PageOffset::NULL, PageOffset::NULL); - - let mut bytes = Vec::with_capacity(size_of::
() + size_of::()); - bytes.put_slice(header.as_slice()); - bytes.put_slice(root_node.as_slice()); - - file_write_fd.set_len(0)?; - file_write_fd.write_exact(&bytes, 0)?; - } - // PutPage - 1 => { - let body = { - let body_length = header.body_length as usize; - if body_length != size_of::() { - return Err(ExecuteError::WrongBodySize { - expected: size_of::(), - actual: body_length, - } - .into()); - } - let body = wal_read_fd.read_init::(wal_read_offset).await?; - wal_read_offset += body_length; - body - }; - - let body_checksum = checksum(body.as_slice()); - let bad_checksum = body_checksum != header.checksum; - if bad_checksum { - return Err(ExecuteError::Checksum { - expected: header.checksum, - actual: body_checksum, - } - .into()); - } - - file_write_fd.write_exact(body.page.as_slice(), body.page_offset.file_offset())?; - } - body_type => { - return Err(ExecuteError::WrongBodyType { body_type }.into()); - } - } - file_write_fd.fsync()?; - - Ok(wal_read_offset) -} - -#[derive(Debug)] -enum ExecutorRequest { - /// Push new wal record - Push { - written: usize, - }, - /// Reset wal file - Reset, - Close, -} - -#[derive(Debug)] -pub(crate) enum ExecuteError { - #[allow(dead_code)] - Checksum { - expected: u64, - actual: u64, - }, - #[allow(dead_code)] - WrongBodyType { - body_type: u8, - }, - ExecutorDown, - #[allow(dead_code)] - WrongBodySize { - expected: usize, - actual: usize, - }, -} - -trait CorruptionCheck { - fn is_corrupted(&self) -> bool; -} -impl CorruptionCheck for anyhow::Error { - fn is_corrupted(&self) -> bool { - if let Some(err) = self.downcast_ref::() { - match err { - ExecuteError::Checksum { .. } => true, - ExecuteError::WrongBodyType { .. } => true, - ExecuteError::ExecutorDown => false, - ExecuteError::WrongBodySize { .. } => true, - } - } else if let Some(err) = self.downcast_ref::() { - err.kind() == ErrorKind::UnexpectedEof - } else { - false - } - } -} -impl Display for ExecuteError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} -impl std::error::Error for ExecuteError {} - #[repr(C)] #[derive(Debug)] struct WalHeader {