From 87e2c4fac56ec7f0baae80aa8fef0b8b76c92de9 Mon Sep 17 00:00:00 2001
From: 2019imesh <2019imesh@gmail.com>
Date: Tue, 17 Dec 2024 19:31:02 +0530
Subject: [PATCH] feat : added btreemap for backend implementation (#431)

---
Review notes: restored the generic parameters lost in extraction, fixed
`value::desr` to read `cur_term` for `this_clock.term`, added a regression
assertion and doc comments. Blob ids on the `index` lines are from the original
commit and no longer match the revised contents.

 sorock/src/backend/btree/ballot.rs |  35 ++++++
 sorock/src/backend/btree/log.rs    | 194 +++++++++++++++++++++++++++++
 sorock/src/backend/btree/mod.rs    | 168 +++++++++++++++++++++++++
 sorock/src/backend/mod.rs          |   1 +
 4 files changed, 398 insertions(+)
 create mode 100644 sorock/src/backend/btree/ballot.rs
 create mode 100644 sorock/src/backend/btree/log.rs
 create mode 100644 sorock/src/backend/btree/mod.rs

diff --git a/sorock/src/backend/btree/ballot.rs b/sorock/src/backend/btree/ballot.rs
new file mode 100644
index 00000000..1aa7fd68
--- /dev/null
+++ b/sorock/src/backend/btree/ballot.rs
@@ -0,0 +1,35 @@
+use anyhow::Ok;
+
+use crate::process::{Ballot, RaftBallotStore};
+
+use super::*;
+
+/// In-memory [`RaftBallotStore`] that keeps the current ballot behind a mutex.
+pub struct BallotStore {
+    ballot: Mutex<Ballot>,
+}
+
+impl BallotStore {
+    /// Creates a store holding the ballot returned by `Ballot::new()`.
+    pub fn new() -> BallotStore {
+        Self {
+            ballot: Mutex::new(Ballot::new()),
+        }
+    }
+}
+
+#[async_trait]
+impl RaftBallotStore for BallotStore {
+    /// Replaces the stored ballot with `_v`.
+    async fn save_ballot(&self, _v: Ballot) -> Result<()> {
+        let mut ballot = self.ballot.lock();
+        *ballot = _v;
+        Ok(())
+    }
+
+    /// Returns a clone of the stored ballot.
+    async fn load_ballot(&self) -> Result<Ballot> {
+        let ballot = self.ballot.lock();
+        Ok(ballot.clone())
+    }
+}
diff --git a/sorock/src/backend/btree/log.rs b/sorock/src/backend/btree/log.rs
new file mode 100644
index 00000000..aa40e4d7
--- /dev/null
+++ b/sorock/src/backend/btree/log.rs
@@ -0,0 +1,194 @@
+use std::time::Duration;
+
+use super::*;
+
+/// Encoding of log entries into the byte form kept in the map.
+mod value {
+    use super::*;
+
+    /// Serialized form of an [`Entry`]; the previous index is not stored.
+    #[derive(serde::Deserialize, serde::Serialize)]
+    struct OnDiskStruct {
+        prev_term: u64,
+        cur_index: u64,
+        cur_term: u64,
+        command: bytes::Bytes,
+    }
+
+    /// Serializes an entry with bincode.
+    pub fn ser(x: Entry) -> Vec<u8> {
+        let x = OnDiskStruct {
+            prev_term: x.prev_clock.term,
+            cur_index: x.this_clock.index,
+            cur_term: x.this_clock.term,
+            command: x.command,
+        };
+        bincode::serialize(&x).unwrap()
+    }
+
+    /// Deserializes an entry written by [`ser`], rebuilding the previous index as
+    /// `cur_index - 1`.
+    pub fn desr(bin: &[u8]) -> Entry {
+        let x: OnDiskStruct = bincode::deserialize(bin).unwrap();
+        Entry {
+            prev_clock: Clock {
+                index: x.cur_index - 1,
+                term: x.prev_term,
+            },
+            this_clock: Clock {
+                index: x.cur_index,
+                term: x.cur_term,
+            },
+            command: x.command,
+        }
+    }
+}
+
+/// An insertion queued for the reaper, with a channel to signal completion.
+struct LazyInsert {
+    index: Index,
+    inner: Entry,
+    space: String,
+    notifier: oneshot::Sender<()>,
+}
+
+/// Handle used by log stores to queue insertions for the [`Reaper`].
+#[derive(Clone)]
+pub struct Sender {
+    tx: crossbeam::channel::Sender<LazyInsert>,
+}
+
+/// Drains queued insertions and applies them to the per-shard log stores.
+#[derive(Clone)]
+pub struct Reaper {
+    log_store: Arc<Mutex<BTreeMap<String, LogStore>>>,
+    rx: crossbeam::channel::Receiver<LazyInsert>,
+}
+
+impl Reaper {
+    /// Creates a reaper over `log_store` together with the sender that feeds it.
+    pub fn new(log_store: Arc<Mutex<BTreeMap<String, LogStore>>>) -> (Self, Sender) {
+        let (tx, rx) = crossbeam::channel::unbounded();
+
+        let tx = Sender { tx };
+        let this = Self { log_store, rx };
+        (this, tx)
+    }
+
+    /// Applies one batch of queued insertions.
+    ///
+    /// Waits up to 100 ms for the first insertion, then also takes everything
+    /// already queued, inserts the batch and notifies each waiting caller.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if no insertion arrives within the timeout or the
+    /// queue is disconnected.
+    pub fn reap(&self) -> Result<()> {
+        let mut elems = vec![];
+
+        // Block until the first element arrives or the timeout elapses.
+        let head = self.rx.recv_timeout(Duration::from_millis(100))?;
+        elems.push(head);
+
+        let n = self.rx.len();
+        for _ in 0..n {
+            let e = self.rx.try_recv().unwrap();
+            elems.push(e);
+        }
+
+        let mut notifiers = vec![];
+
+        for e in elems {
+            self.log_store
+                .lock()
+                .get(&e.space)
+                .unwrap()
+                .b_tree_map
+                .lock()
+                .insert(e.index, value::ser(e.inner));
+
+            notifiers.push(e.notifier);
+        }
+
+        for notifier in notifiers {
+            notifier.send(()).ok();
+        }
+        Ok(())
+    }
+}
+
+/// In-memory [`RaftLogStore`] for one shard, backed by a `BTreeMap`.
+#[derive(Clone)]
+pub struct LogStore {
+    b_tree_map: Arc<Mutex<BTreeMap<Index, Vec<u8>>>>,
+    reaper_queue: crossbeam::channel::Sender<LazyInsert>,
+    space: String,
+}
+
+impl LogStore {
+    /// Creates an empty log store for `shard_id` that queues insertions on `q`.
+    pub fn new(shard_id: u32, q: Sender) -> Result<Self> {
+        let space = format!("log-{shard_id}");
+
+        Ok(Self {
+            b_tree_map: Arc::new(Mutex::new(BTreeMap::new())),
+            reaper_queue: q.tx,
+            space,
+        })
+    }
+}
+
+#[async_trait]
+impl RaftLogStore for LogStore {
+    /// Queues the entry for the reaper and waits until it has been inserted.
+    async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
+        let (tx, rx) = oneshot::channel();
+
+        let lazy_insert = LazyInsert {
+            index: i,
+            inner: e,
+            space: self.space.clone(),
+            notifier: tx,
+        };
+
+        self.reaper_queue
+            .send(lazy_insert)
+            .map_err(|e| anyhow::anyhow!("failed to queue an entry {:?}", e.to_string()))?;
+
+        rx.await?;
+        Ok(())
+    }
+
+    /// Removes every entry whose index is lower than `i`.
+    async fn delete_entries_before(&self, i: Index) -> Result<()> {
+        let map = self.b_tree_map.clone();
+        map.lock().retain(|k, _| k >= &i);
+        Ok(())
+    }
+
+    /// Returns the entry stored at index `i`, if any.
+    async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
+        match self.b_tree_map.lock().get(&i) {
+            Some(entry) => Ok(Some(value::desr(entry))),
+            None => Ok(None),
+        }
+    }
+
+    /// Returns the lowest stored index; panics if the log is empty.
+    async fn get_head_index(&self) -> Result<Index> {
+        let map = self.b_tree_map.lock();
+        let index = map.first_key_value().unwrap_or_else(|| {
+            panic!("Ref map is null");
+        });
+        Ok(index.0.clone())
+    }
+
+    /// Returns the highest stored index; panics if the log is empty.
+    async fn get_last_index(&self) -> Result<Index> {
+        let map = self.b_tree_map.lock();
+
+        let last_entry = map.last_key_value().unwrap();
+        Ok(last_entry.0.clone())
+    }
+}
diff --git a/sorock/src/backend/btree/mod.rs b/sorock/src/backend/btree/mod.rs
new file mode 100644
index 00000000..83b67fe8
--- /dev/null
+++ b/sorock/src/backend/btree/mod.rs
@@ -0,0 +1,168 @@
+//! In-memory storage backend that keeps Raft logs in `BTreeMap`s.
+
+use crate as sorock;
+
+mod ballot;
+mod log;
+
+use std::{collections::BTreeMap, sync::Arc};
+
+use anyhow::Result;
+use async_trait::async_trait;
+use ballot::BallotStore;
+use crossbeam::channel::{Receiver, Sender, TryRecvError};
+use log::LogStore;
+use sorock::process::*;
+use spin::Mutex;
+
+/// Entry point of the B-tree backend: owns the per-shard log stores and the
+/// sender used to queue insertions for the background reaper thread.
+#[derive(Clone)]
+pub struct Backend {
+    log_store_map: Arc<Mutex<BTreeMap<String, LogStore>>>,
+    _ballot_store_map: Arc<BTreeMap<String, BallotStore>>,
+    tx: log::Sender,
+    _kill_tx: crossbeam::channel::Sender<()>,
+}
+
+impl Backend {
+    /// Creates the backend and spawns the reaper thread, which exits once every
+    /// clone of `_kill_tx` (one per `Backend` clone) has been dropped.
+    pub fn new() -> Self {
+        let log_store_map = Arc::new(Mutex::new(BTreeMap::new()));
+        let _ballot_store_map = Arc::new(BTreeMap::new());
+        let (reaper, tx) = log::Reaper::new(log_store_map.clone());
+        let (_kill_tx, kill_rx): (Sender<()>, Receiver<()>) = crossbeam::channel::bounded(0);
+
+        std::thread::spawn(move || loop {
+            if let Err(TryRecvError::Disconnected) = kill_rx.try_recv() {
+                break;
+            }
+            reaper.reap().ok();
+        });
+
+        Self {
+            log_store_map,
+            _ballot_store_map,
+            tx,
+            _kill_tx,
+        }
+    }
+
+    /// Returns the log store for shard `index`, creating it on first use, together
+    /// with a ballot store.
+    ///
+    /// NOTE(review): a fresh `BallotStore` is built on every call and
+    /// `_ballot_store_map` is never filled, so a ballot saved through one call is
+    /// not visible through a later `get` for the same shard.
+    pub fn get(&self, index: u32) -> Result<(impl RaftLogStore, impl RaftBallotStore)> {
+        let log_space = format!("log-{index}");
+
+        let mut log = self.log_store_map.lock();
+        let log_store = log
+            .entry(log_space.clone())
+            .or_insert_with(|| LogStore::new(index, self.tx.clone()).unwrap())
+            .clone();
+
+        let ballot_store = ballot::BallotStore::new();
+
+        Ok((log_store, ballot_store))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn basic_test() -> Result<()> {
+        let b_tree_map = Backend::new();
+
+        let entry1 = Entry {
+            prev_clock: Clock { index: 0, term: 0 },
+            this_clock: Clock { index: 1, term: 1 },
+            command: bytes::Bytes::from("hello"),
+        };
+        let entry2 = Entry {
+            prev_clock: Clock { index: 1, term: 1 },
+            this_clock: Clock { index: 2, term: 1 },
+            command: bytes::Bytes::from("world"),
+        };
+
+        let (log, _) = b_tree_map.get(0)?;
+
+        assert!(log.get_entry(1).await?.is_none());
+        assert!(log.get_entry(2).await?.is_none());
+
+        log.insert_entry(1, entry1).await?;
+        assert_eq!(log.get_head_index().await?, 1);
+        assert_eq!(log.get_last_index().await?, 1);
+        assert!(log.get_entry(1).await?.is_some());
+        assert!(log.get_entry(2).await?.is_none());
+
+        // The stored entry must keep its own term, distinct from the previous term.
+        let stored = log.get_entry(1).await?.unwrap();
+        assert_eq!(stored.prev_clock.term, 0);
+        assert_eq!(stored.this_clock.term, 1);
+
+        log.insert_entry(2, entry2).await?;
+        assert_eq!(log.get_head_index().await?, 1);
+        assert_eq!(log.get_last_index().await?, 2);
+        assert!(log.get_entry(1).await?.is_some());
+        assert!(log.get_entry(2).await?.is_some());
+
+        log.delete_entries_before(2).await?;
+        assert_eq!(log.get_head_index().await?, 2);
+        assert_eq!(log.get_last_index().await?, 2);
+        assert!(log.get_entry(1).await?.is_none());
+        assert!(log.get_entry(2).await?.is_some());
+
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn insert_test() -> Result<()> {
+        use rand::Rng;
+
+        let b_tree_map = Arc::new(Backend::new());
+
+        let mut futs = vec![];
+        for shard in 0..100 {
+            let b_tree_map = b_tree_map.clone();
+            let fut = async move {
+                let mut rng = rand::thread_rng();
+                let (log, _) = b_tree_map.get(shard).unwrap();
+                for i in 0..300 {
+                    let prev = i;
+                    let cur = i + 1;
+                    let b: Vec<u8> = (0..100).map(|_| rng.gen()).collect();
+                    let e = Entry {
+                        prev_clock: Clock {
+                            index: prev,
+                            term: 1,
+                        },
+                        this_clock: Clock {
+                            index: cur,
+                            term: 1,
+                        },
+                        command: b.into(),
+                    };
+                    log.insert_entry(cur, e).await.unwrap();
+                }
+            };
+            futs.push(fut);
+        }
+
+        futures::future::join_all(futs).await;
+
+        for shard_id in 0..100 {
+            for i in 1..=100 {
+                let (log, _) = b_tree_map.get(shard_id).unwrap();
+                let e = log.get_entry(i).await.unwrap().unwrap();
+                assert_eq!(e.this_clock.index, i);
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/sorock/src/backend/mod.rs b/sorock/src/backend/mod.rs
index fe037a0a..64618c72 100644
--- a/sorock/src/backend/mod.rs
+++ b/sorock/src/backend/mod.rs
@@ -1 +1,2 @@
 pub mod redb;
+pub mod btree;