Skip to content

Commit

Permalink
try_close() and Test wal recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
namse committed Nov 7, 2024
1 parent 5ca0e44 commit ea80807
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 64 deletions.
90 changes: 59 additions & 31 deletions luda-editor/new-server/bptree/src/id_set/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ pub struct Backend {
wal: Wal,
cache: PageCache,
request_rx: Receiver<Request>,
backend_close_tx: oneshot::Sender<()>,
}

impl Backend {
pub fn open(
path: impl AsRef<Path>,
request_rx: Receiver<Request>,
cache: PageCache,
backend_close_tx: oneshot::Sender<()>,
) -> Result<()> {
let path = path.as_ref();

Expand All @@ -35,48 +37,62 @@ impl Backend {
cache,
request_rx,
file_write_fd,
backend_close_tx,
};
this.run();
Ok(())
}

fn run(mut self) {
tokio::spawn(async move {
loop {
let Some(mut request) = self.request_rx.recv().await else {
break;
let mut close_requested = false;
'outer: while !close_requested {
const LIMIT: usize = 64;
let mut requests = Vec::with_capacity(LIMIT);

if self.request_rx.recv_many(&mut requests, LIMIT).await == 0 {
break 'outer;
};

let mut operator = Operator::new(self.cache.load(), self.file_read_fd.clone());

let mut txs = Vec::<Tx>::new();

let mut result;
let mut result = Ok(());

let start_time = tokio::time::Instant::now();

loop {
match request {
Request::Insert { id, tx } => {
txs.push(Tx::Insert { tx });
result = operator.insert(id);
}
Request::Delete { id, tx } => {
txs.push(Tx::Delete { tx });
result = operator.delete(id);
}
Request::Contains { id, tx } => {
let mut contains = false;
let contains_result = operator.contains(id);
if let Ok(true) = contains_result {
contains = true;
for request in requests.drain(..) {
match request {
Request::Insert { id, tx } => {
txs.push(Tx::Insert { tx });
result = operator.insert(id);
}
Request::Delete { id, tx } => {
txs.push(Tx::Delete { tx });
result = operator.delete(id);
}
Request::Contains { id, tx } => {
let mut contains = false;
let contains_result = operator.contains(id);
if let Ok(true) = contains_result {
contains = true;
}
let tx = Tx::Contains { tx, contains };
txs.push(tx);
result = contains_result.map(|_| ());
}
Request::Close => {
close_requested = true;
}
let tx = Tx::Contains { tx, contains };
txs.push(tx);
result = contains_result.map(|_| ());
}
}

if close_requested {
break;
}

if result.is_err() {
break;
}
Expand All @@ -85,11 +101,17 @@ impl Backend {
break;
}

match timeout(Duration::from_millis(1), self.request_rx.recv()).await {
Ok(Some(_request)) => {
request = _request;
match timeout(
Duration::from_millis(1),
self.request_rx.recv_many(&mut requests, LIMIT),
)
.await
{
Ok(recv) => {
if recv == 0 {
break;
}
}
Ok(None) => break,
Err(_) => break,
}
}
Expand All @@ -106,15 +128,18 @@ impl Backend {
if let Some(ExecuteError::ExecutorDown) = err.downcast_ref::<ExecuteError>()
{
eprintln!("Executor down!");
return;
break 'outer;
}
}

if result.is_ok() {
let mut new_pages = pages_read_from_file;
new_pages.append(&mut updated_pages);
let stale_tuples = self.cache.push(new_pages);
self.write_staled_pages(stale_tuples).await;
if let Err(err) = self.write_staled_pages(stale_tuples).await {
eprintln!("Error on writing staled pages: {:?}", err);
break 'outer;
}
}
}

Expand All @@ -139,13 +164,16 @@ impl Backend {
}
});
}

_ = self.wal.close().await;
_ = self.backend_close_tx.send(());
});
}

/// Don't fsync
async fn write_staled_pages(&mut self, stale_tuples: Vec<(PageOffset, Page)>) {
async fn write_staled_pages(&mut self, stale_tuples: Vec<(PageOffset, Page)>) -> Result<()> {
if stale_tuples.is_empty() {
return;
return Ok(());
}
let mut sleep_time = Duration::from_millis(100);
for _ in 0..=10 {
Expand All @@ -158,15 +186,15 @@ impl Backend {
})();

if result.is_ok() {
return;
return Ok(());
}

eprintln!("Error on writing staled pages: {:?}", result);
tokio::time::sleep(sleep_time).await;
sleep_time = (sleep_time * 2).max(Duration::from_secs(4));
}

unreachable!("Too many retrial on writing staled pages");
anyhow::bail!("Too many retrial on writing staled pages");
}
}

Expand Down
49 changes: 17 additions & 32 deletions luda-editor/new-server/bptree/src/id_set/frontend.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use super::*;
use std::{
collections::{hash_map::Entry, HashMap},
path::{Path, PathBuf},
sync::{Arc, Mutex, OnceLock, Weak},
sync::Arc,
};
use tokio::sync::{mpsc, oneshot};

/// Frontend for the IdSet data structure.
#[derive(Clone)]
pub struct IdSet {
path: PathBuf,
request_tx: Arc<mpsc::Sender<Request>>,
cache: PageCache,
backend_close_rx: oneshot::Receiver<()>,
}

type OpenedPaths = HashMap<PathBuf, Weak<IdSet>>;
static OPENED_PATHS: OnceLock<Arc<Mutex<OpenedPaths>>> = OnceLock::new();

impl IdSet {
/// - `path`
/// - The path to the file where the data is stored.
/// - **Make sure no one is using this file.**
/// - `cache_limit`
/// - 1 cache is 4KB. 100 `cache_limit` will be 400KB.
/// - Put enough `cache_limit`.
Expand All @@ -26,30 +25,16 @@ impl IdSet {
let path = path.as_ref();

let (request_tx, request_rx) = mpsc::channel(4096);
let (backend_close_tx, backend_close_rx) = oneshot::channel();

let this = Arc::new(Self {
path: path.to_path_buf(),
request_tx: Arc::new(request_tx),
cache: PageCache::new(cache_limit),
backend_close_rx,
});

{
match OPENED_PATHS
.get_or_init(Default::default)
.lock()
.unwrap()
.entry(this.path.clone())
{
Entry::Occupied(_) => {
return Err(anyhow::anyhow!("IdSet already opened at path: {:?}", path));
}
Entry::Vacant(entry) => {
entry.insert(Arc::downgrade(&this));
}
}
}

Backend::open(&this.path, request_rx, this.cache.clone())?;
Backend::open(&this.path, request_rx, this.cache.clone(), backend_close_tx)?;

Ok(this)
}
Expand Down Expand Up @@ -93,15 +78,15 @@ impl IdSet {
.map_err(|_| anyhow::anyhow!("Failed to received result from rx, id: {}", id))?
.map_err(|_| anyhow::anyhow!("Failed to check if id exists: {}", id))
}
}
pub async fn try_close(self: Arc<Self>) -> Result<(), Arc<Self>> {
let inner = Arc::try_unwrap(self)?;

if inner.request_tx.send(Request::Close).await.is_err() {
return Ok(());
}

_ = inner.backend_close_rx.await;

impl Drop for IdSet {
fn drop(&mut self) {
OPENED_PATHS
.get()
.unwrap()
.lock()
.unwrap()
.remove(&self.path);
Ok(())
}
}
39 changes: 39 additions & 0 deletions luda-editor/new-server/bptree/src/id_set/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum Request {
id: u128,
tx: oneshot::Sender<Result<bool, ()>>,
},
Close,
}

#[cfg(test)]
Expand Down Expand Up @@ -259,4 +260,42 @@ mod test {
}
join_set.join_all().await;
}

#[tokio::test]
async fn test_insert_turn_off_contains() {
    // Start from a clean slate: remove any data / WAL files left over
    // from a previous run of this test.
    let path = std::env::temp_dir().join("test_insert_turn_off_contains");
    let wal_path = path.with_extension("wal");
    for leftover in [&path, &wal_path] {
        if leftover.exists() {
            std::fs::remove_file(leftover).unwrap();
        }
    }
    std::fs::create_dir_all(path.parent().unwrap()).unwrap();

    // Concurrently insert ids 1..=10000.
    let set = IdSet::new(&path, 5000).unwrap();
    let mut tasks = JoinSet::new();
    for i in 1..=10000 {
        let set = set.clone();
        tasks.spawn(async move { set.insert(i as Id).await });
    }
    tasks.join_all().await;

    // Shut the set down so the backend finishes all pending work.
    assert!(set.try_close().await.is_ok());

    // Reopen from disk: every inserted id must still be present, and
    // ids that were never inserted must be absent.
    let set = IdSet::new(path, 5000).unwrap();
    let mut tasks = JoinSet::new();
    for i in 1..=20000 {
        let set = set.clone();
        tasks.spawn(async move {
            let contains = set.contains(i as Id).await.unwrap();
            if i <= 10000 {
                assert!(contains, "{i}");
            } else {
                assert!(!contains, "{i}");
            }
        });
    }
    tasks.join_all().await;
}
}
19 changes: 18 additions & 1 deletion luda-editor/new-server/bptree/src/id_set/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct Wal {
write_offset: usize,
written: usize,
tx: mpsc::UnboundedSender<ExecutorRequest>,
executer_close_rx: oneshot::Receiver<()>,
}

impl Wal {
Expand All @@ -64,6 +65,7 @@ impl Wal {
let (shadow_read_fd, mut shadow_write_fd) = split_file(shadow_file);

let (tx, rx) = mpsc::unbounded_channel();
let (executer_close_tx, executer_close_rx) = oneshot::channel();

let mut this = Self {
wal_write_fd,
Expand All @@ -72,6 +74,7 @@ impl Wal {
write_offset: 0,
written: 0,
tx,
executer_close_rx,
};

if file_write_fd.len()? == 0 {
Expand Down Expand Up @@ -109,6 +112,7 @@ impl Wal {
shadow_write_fd,
rx,
read_offset: this.read_offset.clone(),
close_tx: executer_close_tx,
}
.start();

Expand Down Expand Up @@ -191,6 +195,11 @@ impl Wal {

Ok(())
}

/// Request the background executor to stop, then wait for its
/// acknowledgement. Consumes the `Wal`.
pub(crate) async fn close(self) {
    // The executor may already have exited, so ignore a failed send.
    let _ = self.tx.send(ExecutorRequest::Close);
    // Likewise, a dropped sender here just means the executor is gone.
    let _ = self.executer_close_rx.await;
}
}

/// Use MSB as a flag to indicate the reset version
Expand Down Expand Up @@ -237,6 +246,7 @@ struct Executor {
shadow_write_fd: WriteFd,
rx: mpsc::UnboundedReceiver<ExecutorRequest>,
read_offset: ReadOffset,
close_tx: oneshot::Sender<()>,
}

impl Executor {
Expand All @@ -250,6 +260,10 @@ impl Executor {
ExecutorRequest::Reset => {
self.read_offset.reset();
}
ExecutorRequest::Close => {
let _ = self.close_tx.send(());
return;
}
}
}
});
Expand Down Expand Up @@ -373,9 +387,12 @@ fn execute_one(
#[derive(Debug)]
enum ExecutorRequest {
/// Push new wal record
Push { written: usize },
Push {
written: usize,
},
/// Reset wal file
Reset,
Close,
}

#[derive(Debug)]
Expand Down

0 comments on commit ea80807

Please sign in to comment.