feat : added btreemap for backend implementation (akiradeveloper#431)
Imesh7 committed Dec 17, 2024
1 parent c6fc712 commit 87e2c4f
Showing 4 changed files with 354 additions and 0 deletions.
32 changes: 32 additions & 0 deletions sorock/src/backend/btree/ballot.rs
@@ -0,0 +1,32 @@
use crate::process::{Ballot, RaftBallotStore};

use super::*;

pub struct BallotStore {
    ballot: Mutex<Ballot>,
}

impl BallotStore {
    pub fn new() -> Self {
        Self {
            ballot: Mutex::new(Ballot::new()),
        }
    }
}

#[async_trait]
impl RaftBallotStore for BallotStore {
    async fn save_ballot(&self, v: Ballot) -> Result<()> {
        let mut ballot = self.ballot.lock();
        *ballot = v;
        Ok(())
    }

    async fn load_ballot(&self) -> Result<Ballot> {
        let ballot = self.ballot.lock();
        Ok(ballot.clone())
    }
}
170 changes: 170 additions & 0 deletions sorock/src/backend/btree/log.rs
@@ -0,0 +1,170 @@
use std::time::Duration;

use super::*;

mod value {
    use super::*;

    #[derive(serde::Deserialize, serde::Serialize)]
    struct OnDiskStruct {
        prev_term: u64,
        cur_index: u64,
        cur_term: u64,
        command: bytes::Bytes,
    }

    pub fn ser(x: Entry) -> Vec<u8> {
        let x = OnDiskStruct {
            prev_term: x.prev_clock.term,
            cur_index: x.this_clock.index,
            cur_term: x.this_clock.term,
            command: x.command,
        };
        bincode::serialize(&x).unwrap()
    }

    pub fn desr(bin: &[u8]) -> Entry {
        let x: OnDiskStruct = bincode::deserialize(bin).unwrap();
        Entry {
            prev_clock: Clock {
                index: x.cur_index - 1,
                term: x.prev_term,
            },
            this_clock: Clock {
                index: x.cur_index,
                term: x.cur_term,
            },
            command: x.command,
        }
    }
}

/// A log entry whose insertion is deferred to the reaper thread.
struct LazyInsert {
    index: Index,
    inner: Entry,
    space: String,
    notifier: oneshot::Sender<()>,
}

#[derive(Clone)]
pub struct Sender {
    tx: crossbeam::channel::Sender<LazyInsert>,
}

/// Drains queued insertions and applies them to the per-space B-tree maps in batches.
#[derive(Clone)]
pub struct Reaper {
    log_store: Arc<Mutex<BTreeMap<String, LogStore>>>,
    rx: crossbeam::channel::Receiver<LazyInsert>,
}

impl Reaper {
    pub fn new(log_store: Arc<Mutex<BTreeMap<String, LogStore>>>) -> (Self, Sender) {
        let (tx, rx) = crossbeam::channel::unbounded();

        let tx = Sender { tx };
        let this = Self { log_store, rx };
        (this, tx)
    }

    pub fn reap(&self) -> Result<()> {
        let mut elems = vec![];

        // Block until the first element is received, or time out.
        let head = self.rx.recv_timeout(Duration::from_millis(100))?;
        elems.push(head);

        // Drain whatever else is already queued so it is applied in the same batch.
        let n = self.rx.len();
        for _ in 0..n {
            let e = self.rx.try_recv().unwrap();
            elems.push(e);
        }

        let mut notifiers = vec![];

        for e in elems {
            self.log_store
                .lock()
                .get(&e.space)
                .unwrap()
                .b_tree_map
                .lock()
                .insert(e.index, value::ser(e.inner));

            notifiers.push(e.notifier);
        }

        // Wake up the waiting insert_entry callers only after their entries are applied.
        for notifier in notifiers {
            notifier.send(()).ok();
        }
        Ok(())
    }
}

#[derive(Clone)]
pub struct LogStore {
    b_tree_map: Arc<Mutex<BTreeMap<u64, Vec<u8>>>>,
    reaper_queue: crossbeam::channel::Sender<LazyInsert>,
    space: String,
}

impl LogStore {
    pub fn new(shard_id: u32, q: Sender) -> Result<Self> {
        let space = format!("log-{shard_id}");

        Ok(Self {
            b_tree_map: Arc::new(Mutex::new(BTreeMap::new())),
            reaper_queue: q.tx,
            space,
        })
    }
}

#[async_trait]
impl RaftLogStore for LogStore {
    async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
        let (tx, rx) = oneshot::channel();

        let lazy_insert = LazyInsert {
            index: i,
            inner: e,
            space: self.space.clone(),
            notifier: tx,
        };

        self.reaper_queue
            .send(lazy_insert)
            .map_err(|e| anyhow::anyhow!("failed to queue an entry: {}", e))?;

        // Wait until the reaper thread has applied the entry.
        rx.await?;
        Ok(())
    }

    async fn delete_entries_before(&self, i: Index) -> Result<()> {
        self.b_tree_map.lock().retain(|k, _| k >= &i);
        Ok(())
    }

    async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
        match self.b_tree_map.lock().get(&i) {
            Some(entry) => Ok(Some(value::desr(entry))),
            None => Ok(None),
        }
    }

    async fn get_head_index(&self) -> Result<Index> {
        let map = self.b_tree_map.lock();
        let (index, _) = map.first_key_value().expect("log map is empty");
        Ok(*index)
    }

    async fn get_last_index(&self) -> Result<Index> {
        let map = self.b_tree_map.lock();
        let (index, _) = map.last_key_value().expect("log map is empty");
        Ok(*index)
    }
}
151 changes: 151 additions & 0 deletions sorock/src/backend/btree/mod.rs
@@ -0,0 +1,151 @@
use crate as sorock;

mod ballot;
mod log;

use std::{collections::BTreeMap, sync::Arc};

use anyhow::Result;
use async_trait::async_trait;
use ballot::BallotStore;
use crossbeam::channel::{Receiver, Sender, TryRecvError};
use log::LogStore;
use sorock::process::*;
use spin::Mutex;

#[derive(Clone)]
pub struct Backend {
    log_store_map: Arc<Mutex<BTreeMap<String, LogStore>>>,
    _ballot_store_map: Arc<BTreeMap<String, BallotStore>>,
    tx: log::Sender,
    _kill_tx: crossbeam::channel::Sender<()>,
}

impl Backend {
    pub fn new() -> Self {
        let log_store_map = Arc::new(Mutex::new(BTreeMap::new()));
        let _ballot_store_map = Arc::new(BTreeMap::new());
        let (reaper, tx) = log::Reaper::new(log_store_map.clone());
        let (_kill_tx, kill_rx): (Sender<()>, Receiver<()>) = crossbeam::channel::bounded(0);

        // The reaper thread keeps batching queued inserts until the kill channel
        // is disconnected, i.e. until the last Backend clone is dropped.
        std::thread::spawn(move || loop {
            if let Err(TryRecvError::Disconnected) = kill_rx.try_recv() {
                break;
            }
            reaper.reap().ok();
        });

        Self {
            log_store_map,
            _ballot_store_map,
            tx,
            _kill_tx,
        }
    }

    pub fn get(&self, index: u32) -> Result<(impl RaftLogStore, impl RaftBallotStore)> {
        let log_space = format!("log-{index}");

        let mut log = self.log_store_map.lock();
        let log_store = log
            .entry(log_space)
            .or_insert_with(|| LogStore::new(index, self.tx.clone()).unwrap())
            .clone();

        let ballot_store = ballot::BallotStore::new();

        Ok((log_store, ballot_store))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn basic_test() -> Result<()> {
        let b_tree_map = Backend::new();

        let entry1 = Entry {
            prev_clock: Clock { index: 0, term: 0 },
            this_clock: Clock { index: 1, term: 1 },
            command: bytes::Bytes::from("hello"),
        };
        let entry2 = Entry {
            prev_clock: Clock { index: 1, term: 1 },
            this_clock: Clock { index: 2, term: 1 },
            command: bytes::Bytes::from("world"),
        };

        let (log, _) = b_tree_map.get(0)?;

        assert!(log.get_entry(1).await?.is_none());
        assert!(log.get_entry(2).await?.is_none());

        log.insert_entry(1, entry1).await?;
        assert_eq!(log.get_head_index().await?, 1);
        assert_eq!(log.get_last_index().await?, 1);
        assert!(log.get_entry(1).await?.is_some());
        assert!(log.get_entry(2).await?.is_none());

        log.insert_entry(2, entry2).await?;
        assert_eq!(log.get_head_index().await?, 1);
        assert_eq!(log.get_last_index().await?, 2);
        assert!(log.get_entry(1).await?.is_some());
        assert!(log.get_entry(2).await?.is_some());

        log.delete_entries_before(2).await?;
        assert_eq!(log.get_head_index().await?, 2);
        assert_eq!(log.get_last_index().await?, 2);
        assert!(log.get_entry(1).await?.is_none());
        assert!(log.get_entry(2).await?.is_some());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn insert_test() -> Result<()> {
        use rand::Rng;

        let b_tree_map = Arc::new(Backend::new());

        let mut futs = vec![];
        for shard in 0..100 {
            let b_tree_map = b_tree_map.clone();
            let fut = async move {
                let mut rng = rand::thread_rng();
                let (log, _) = b_tree_map.get(shard).unwrap();
                for i in 0..300 {
                    let prev = i;
                    let cur = i + 1;
                    let b: Vec<u8> = (0..100).map(|_| rng.gen()).collect();
                    let e = Entry {
                        prev_clock: Clock { index: prev, term: 1 },
                        this_clock: Clock { index: cur, term: 1 },
                        command: b.into(),
                    };
                    log.insert_entry(cur, e).await.unwrap();
                }
            };
            futs.push(fut);
        }

        futures::future::join_all(futs).await;

        for shard_id in 0..100 {
            for i in 1..=100 {
                let (log, _) = b_tree_map.get(shard_id).unwrap();
                let e = log.get_entry(i).await.unwrap().unwrap();
                assert_eq!(e.this_clock.index, i);
            }
        }

        Ok(())
    }
}
1 change: 1 addition & 0 deletions sorock/src/backend/mod.rs
@@ -1 +1,2 @@
pub mod redb;
pub mod btree;
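For quick reference, below is a minimal usage sketch adapted from `basic_test` in this commit. The module path `sorock::backend::btree::Backend` and the `tokio`/`bytes`/`anyhow` dependencies are assumptions based on the file layout above, not part of the diff itself.

use anyhow::Result;
// Path assumed from the new sorock/src/backend/btree/mod.rs registered above.
use sorock::backend::btree::Backend;
use sorock::process::{Clock, Entry, RaftLogStore};

#[tokio::main]
async fn main() -> Result<()> {
    // One Backend owns the per-shard BTreeMap log stores and the background reaper thread.
    let backend = Backend::new();

    // Shard 0 lazily gets its own in-memory store keyed as "log-0".
    let (log, _ballot) = backend.get(0)?;

    let entry = Entry {
        prev_clock: Clock { index: 0, term: 0 },
        this_clock: Clock { index: 1, term: 1 },
        command: bytes::Bytes::from("hello"),
    };

    // insert_entry queues the write; the reaper thread applies it in a batch
    // and signals the oneshot notifier before this await returns.
    log.insert_entry(1, entry).await?;
    assert_eq!(log.get_head_index().await?, 1);
    assert_eq!(log.get_last_index().await?, 1);

    Ok(())
}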
