Skip to content

Commit

Permalink
Clean up error types
Browse files Browse the repository at this point in the history
  • Loading branch information
namse committed Nov 11, 2024
1 parent 2665f63 commit 16a0a4c
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 332 deletions.
31 changes: 21 additions & 10 deletions luda-editor/new-server/bptree/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion luda-editor/new-server/bptree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ edition = "2021"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
arc-swap = "1"
bytes = "1"
crc = "3"
futures = "0.3.31"
libc = "0.2"
thiserror = "2"
tokio = { version = "1", features = ["fs", "macros", "rt", "sync", "time"] }

[dev-dependencies]
Expand Down
76 changes: 44 additions & 32 deletions luda-editor/new-server/bptree/src/id_set/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ use super::*;
use std::{path::Path, time::Duration};
use tokio::{sync::mpsc::Receiver, time::timeout};

type Result<T> = std::result::Result<T, BackendError>;

pub struct Backend {
file_read_fd: ReadFd,
file_write_fd: WriteFd,
wal: Wal,
cache: PageCache,
request_rx: Receiver<Request>,
request_rx: Receiver<FeBeRequest>,
backend_close_tx: oneshot::Sender<()>,
}

impl Backend {
pub async fn open(
path: impl AsRef<Path>,
request_rx: Receiver<Request>,
request_rx: Receiver<FeBeRequest>,
cache: PageCache,
backend_close_tx: oneshot::Sender<()>,
) -> Result<()> {
) -> std::io::Result<()> {
let path = path.as_ref();

let file = tokio::fs::OpenOptions::new()
Expand Down Expand Up @@ -59,22 +61,22 @@ impl Backend {

let mut txs = Vec::<Tx>::new();

let mut result = Ok(());
let mut result: Result<()> = Ok(());

let start_time = tokio::time::Instant::now();

loop {
for request in requests.drain(..) {
match request {
Request::Insert { id, tx } => {
FeBeRequest::Insert { id, tx } => {
txs.push(Tx::Insert { tx });
result = operator.insert(id).await;
result = operator.insert(id).await.map_err(BackendError::from);
}
Request::Delete { id, tx } => {
FeBeRequest::Delete { id, tx } => {
txs.push(Tx::Delete { tx });
result = operator.delete(id).await;
result = operator.delete(id).await.map_err(BackendError::from);
}
Request::Contains { id, tx } => {
FeBeRequest::Contains { id, tx } => {
let contains_result = operator.contains(id).await;
let tx_result;
match contains_result {
Expand All @@ -84,15 +86,15 @@ impl Backend {
}
Err(err) => {
tx_result = Err(());
result = Err(err);
result = Err(err.into());
}
}
txs.push(Tx::Contains {
tx,
result: tx_result,
});
}
Request::Next {
FeBeRequest::Next {
exclusive_start_id,
tx,
} => {
Expand All @@ -105,7 +107,7 @@ impl Backend {
}
Err(err) => {
tx_result = Err(());
result = Err(err);
result = Err(err.into());
}
};

Expand All @@ -114,7 +116,7 @@ impl Backend {
result: tx_result,
});
}
Request::Close => {
FeBeRequest::Close => {
close_requested = true;
}
}
Expand Down Expand Up @@ -153,22 +155,21 @@ impl Backend {
pages_read_from_file,
} = operator.done();

result = self.wal.update_pages(&updated_pages);
result = self
.wal
.update_pages(&updated_pages)
.map_err(BackendError::from);

if let Err(err) = &result {
if let Some(ExecuteError::ExecutorDown) = err.downcast_ref::<ExecuteError>()
{
eprintln!("Executor down!");
break 'outer;
}
if let Err(BackendError::Wal(WalError::ExecutorDown)) = &result {
eprintln!("Executor down!");
break 'outer;
}

if result.is_ok() {
let mut new_pages = pages_read_from_file;
new_pages.append(&mut updated_pages);
let stale_tuples = self.cache.push(new_pages);
if let Err(err) = self.write_staled_pages(stale_tuples).await {
eprintln!("Error on writing staled pages: {:?}", err);
if self.write_staled_pages(stale_tuples).await.is_err() {
break 'outer;
}
}
Expand Down Expand Up @@ -208,14 +209,17 @@ impl Backend {
});
}

/// Don't fsync
async fn write_staled_pages(&mut self, stale_tuples: Vec<(PageOffset, Page)>) -> Result<()> {
/// NOTE: Don't fsync here.
async fn write_staled_pages(
&mut self,
stale_tuples: Vec<(PageOffset, Page)>,
) -> std::result::Result<(), ()> {
if stale_tuples.is_empty() {
return Ok(());
}
let mut sleep_time = Duration::from_millis(100);
for _ in 0..=10 {
let result: Result<()> = (|| {
let result: std::io::Result<()> = (|| {
for (offset, page) in &stale_tuples {
self.file_write_fd
.write_exact(page.as_slice(), offset.file_offset())?;
Expand All @@ -232,23 +236,31 @@ impl Backend {
sleep_time = (sleep_time * 2).max(Duration::from_secs(4));
}

anyhow::bail!("Too many retrial on writing staled pages");
Err(())
}
}

#[derive(Debug, Error)]
enum BackendError {
#[error("Error on wal: {0}")]
Wal(#[from] WalError),
#[error("io: {0}")]
Io(#[from] std::io::Error),
}

enum Tx {
Insert {
tx: oneshot::Sender<Result<(), ()>>,
tx: oneshot::Sender<std::result::Result<(), ()>>,
},
Delete {
tx: oneshot::Sender<Result<(), ()>>,
tx: oneshot::Sender<std::result::Result<(), ()>>,
},
Contains {
tx: oneshot::Sender<Result<bool, ()>>,
result: Result<bool, ()>,
tx: oneshot::Sender<std::result::Result<bool, ()>>,
result: std::result::Result<bool, ()>,
},
Next {
tx: oneshot::Sender<Result<Option<Vec<Id>>, ()>>,
result: Result<Option<Vec<Id>>, ()>,
tx: oneshot::Sender<std::result::Result<Option<Vec<Id>>, ()>>,
result: std::result::Result<Option<Vec<Id>>, ()>,
},
}
16 changes: 8 additions & 8 deletions luda-editor/new-server/bptree/src/id_set/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
os::fd::{IntoRawFd, RawFd},
};

type Result<T> = anyhow::Result<T>;
type Result<T> = std::io::Result<T>;

pub fn split_file(file: File) -> (ReadFd, WriteFd) {
let fd = file.into_raw_fd();
Expand Down Expand Up @@ -63,10 +63,10 @@ impl ReadFd {
)
};
if len < 0 {
return Err(Error::last_os_error().into());
return Err(Error::last_os_error());
}
if len == 0 {
return Err(Error::from(io::ErrorKind::UnexpectedEof).into());
return Err(Error::from(io::ErrorKind::UnexpectedEof));
}
buf_offset += len as usize;
}
Expand Down Expand Up @@ -97,7 +97,7 @@ impl WriteFd {
)
};
if len < 0 {
return Err(Error::last_os_error().into());
return Err(Error::last_os_error());
}
assert_ne!(len, 0);
buf_offset += len as usize;
Expand All @@ -108,15 +108,15 @@ impl WriteFd {

pub fn set_len(&mut self, len: usize) -> Result<()> {
if unsafe { ftruncate(self.fd, len as _) } < 0 {
Err(Error::last_os_error().into())
Err(Error::last_os_error())
} else {
Ok(())
}
}

pub fn fsync(&mut self) -> Result<()> {
if unsafe { fsync(self.fd) } < 0 {
Err(Error::last_os_error().into())
Err(Error::last_os_error())
} else {
Ok(())
}
Expand All @@ -131,7 +131,7 @@ impl WriteFd {
unsafe { sendfile(self.fd, source.fd(), &mut offset, count - offset as usize) };

if len < 0 {
return Err(Error::last_os_error().into());
return Err(Error::last_os_error());
}
assert!(offset > 0);
offset += len as i64;
Expand All @@ -150,7 +150,7 @@ pub(crate) trait BorrowFd {
unsafe {
let mut stat = std::mem::MaybeUninit::<libc::stat64>::uninit();
if fstat64(self.fd(), stat.as_mut_ptr()) < 0 {
Err(Error::last_os_error().into())
Err(Error::last_os_error())
} else {
Ok(stat.assume_init().st_size as usize)
}
Expand Down
Loading

0 comments on commit 16a0a4c

Please sign in to comment.