Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : added btreemap for backend implementation (#431) #458

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions sorock/src/backend/btree/ballot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::Ok;

use crate::process::{Ballot, RaftBallotStore};

use super::*;

pub struct BallotStore {
ballot: Mutex<Ballot>,
}

impl BallotStore {
pub fn new() -> BallotStore {
Self {
ballot: Mutex::new(Ballot::new()),
}
}
}

#[async_trait]
impl RaftBallotStore for BallotStore {
async fn save_ballot(&self, _v: Ballot) -> Result<()> {
// let mut balot = self.ballot;
let mut ballot = self.ballot.lock();
*ballot = _v;
Ok(())
}

async fn load_ballot(&self) -> Result<Ballot> {
let ballot = self.ballot.lock();
Ok(ballot.clone())
}
}
170 changes: 170 additions & 0 deletions sorock/src/backend/btree/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use std::time::Duration;

use super::*;

mod value {
use super::*;

#[derive(serde::Deserialize, serde::Serialize)]
struct OnDiskStruct {
prev_term: u64,
cur_index: u64,
cur_term: u64,
command: bytes::Bytes,
}

pub fn ser(x: Entry) -> Vec<u8> {
let x = OnDiskStruct {
prev_term: x.prev_clock.term,
cur_index: x.this_clock.index,
cur_term: x.this_clock.term,
command: x.command,
};
let bin = bincode::serialize(&x).unwrap();
bin
}

pub fn desr(bin: &[u8]) -> Entry {
let x: OnDiskStruct = bincode::deserialize(bin).unwrap();
Entry {
prev_clock: Clock {
index: x.cur_index - 1,
term: x.prev_term,
},
this_clock: Clock {
index: x.cur_index,
term: x.prev_term,
},
command: x.command,
}
}
}

struct LazyInsert {
index: Index,
inner: Entry,
space: String,
notifier: oneshot::Sender<()>,
}

#[derive(Clone)]
pub struct Sender {
tx: crossbeam::channel::Sender<LazyInsert>,
}

#[derive(Clone)]
pub struct Reaper {
log_store: Arc<Mutex<BTreeMap<String, LogStore>>>,
rx: crossbeam::channel::Receiver<LazyInsert>,
}

impl Reaper {
pub fn new(log_store: Arc<Mutex<BTreeMap<String, LogStore>>>) -> (Self, Sender) {
let (tx, rx) = crossbeam::channel::unbounded();

let tx = Sender { tx };
let this = Self { log_store, rx };
(this, tx)
}

pub fn reap(&self) -> Result<()> {
let mut elems = vec![];

// Blocked until the first element is received.
let head = self.rx.recv_timeout(Duration::from_millis(100))?;
elems.push(head);

let n = self.rx.len();
for _ in 0..n {
let e = self.rx.try_recv().unwrap();
elems.push(e);
}

let mut notifiers = vec![];

for e in elems {
self.log_store
.lock()
.get(&e.space)
.unwrap()
.b_tree_map
.lock()
.insert(e.index, value::ser(e.inner));

notifiers.push(e.notifier);
}

for notifier in notifiers {
notifier.send(()).ok();
}
Ok(())
}
}

#[derive(Clone)]
pub struct LogStore {
b_tree_map: Arc<Mutex<BTreeMap<u64, Vec<u8>>>>,
reaper_queue: crossbeam::channel::Sender<LazyInsert>,
space: String,
}

impl LogStore {
pub fn new(shard_id: u32, q: Sender) -> Result<Self> {
let space = format!("log-{shard_id}");

Ok(Self {
b_tree_map: Arc::new(Mutex::new(BTreeMap::new())),
reaper_queue: q.tx,
space: space,
})
}
}

#[async_trait]
impl RaftLogStore for LogStore {
async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
let (tx, rx) = oneshot::channel();

let lazy_insert = LazyInsert {
index: i,
inner: e,
space: self.space.clone(),
notifier: tx,
};

self.reaper_queue
.send(lazy_insert)
.map_err(|e| anyhow::anyhow!("failed to queue an entry {:?}", e.to_string()))?;

rx.await?;
Ok(())
}

async fn delete_entries_before(&self, i: Index) -> Result<()> {
let map = self.b_tree_map.clone();
map.lock().retain(|k, _| k >= &i);
Ok(())
}

async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
match self.b_tree_map.lock().get(&i) {
Some(entry) => Ok(Some(value::desr(entry))),
None => Ok(None),
}
}

async fn get_head_index(&self) -> Result<Index> {
let map = self.b_tree_map.lock();
let index = map.first_key_value().unwrap_or_else(|| {
panic!("Ref map is null");
});
Ok(index.0.clone())
}

async fn get_last_index(&self) -> Result<Index> {
let map = self.b_tree_map.lock();

let last_entry = map.last_key_value().unwrap();
Ok(last_entry.0.clone())
}
}
151 changes: 151 additions & 0 deletions sorock/src/backend/btree/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use crate as sorock;

mod ballot;
mod log;

use std::{collections::BTreeMap, sync::Arc};

use anyhow::Result;
use async_trait::async_trait;
use ballot::BallotStore;
use crossbeam::channel::{Receiver, Sender, TryRecvError};
use log::LogStore;
use sorock::process::*;
use spin::Mutex;

#[derive(Clone)]
pub struct Backend {
log_store_map: Arc<Mutex<BTreeMap<String, LogStore>>>,
_ballot_store_map: Arc<BTreeMap<String, BallotStore>>,
tx: log::Sender,
_kill_tx: crossbeam::channel::Sender<()>,
}

impl Backend {
pub fn new() -> Self {
let log_store_map = Arc::new(Mutex::new(BTreeMap::new()));
let _ballot_store_map = Arc::new(BTreeMap::new());
let (reaper, tx) = log::Reaper::new(log_store_map.clone());
let (_kill_tx, kill_rx): (Sender<()>, Receiver<()>) = crossbeam::channel::bounded(0);

std::thread::spawn(move || loop {
if let Err(TryRecvError::Disconnected) = kill_rx.try_recv() {
break;
}
reaper.reap().ok();
});

Self {
log_store_map,
_ballot_store_map,
tx,
_kill_tx,
}
}

pub fn get(&self, index: u32) -> Result<(impl RaftLogStore, impl RaftBallotStore)> {
let log_space = format!("log-{index}");

let mut log = self.log_store_map.lock();
let log_store = log
.entry(log_space.clone())
.or_insert_with(|| LogStore::new(index, self.tx.clone()).unwrap())
.clone();

let ballot_store = ballot::BallotStore::new();

Ok((log_store, ballot_store))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn basic_test() -> Result<()> {
let b_tree_map = Backend::new();

let entry1 = Entry {
prev_clock: Clock { index: 0, term: 0 },
this_clock: Clock { index: 1, term: 1 },
command: bytes::Bytes::from("hello"),
};
let entry2 = Entry {
prev_clock: Clock { index: 1, term: 1 },
this_clock: Clock { index: 2, term: 1 },
command: bytes::Bytes::from("world"),
};

let (log, _) = b_tree_map.get(0)?;

assert!(log.get_entry(1).await?.is_none());
assert!(log.get_entry(2).await?.is_none());

log.insert_entry(1, entry1).await?;
assert_eq!(log.get_head_index().await?, 1);
assert_eq!(log.get_last_index().await?, 1);
assert!(log.get_entry(1).await?.is_some());
assert!(log.get_entry(2).await?.is_none());

log.insert_entry(2, entry2).await?;
assert_eq!(log.get_head_index().await?, 1);
assert_eq!(log.get_last_index().await?, 2);
assert!(log.get_entry(1).await?.is_some());
assert!(log.get_entry(2).await?.is_some());

log.delete_entries_before(2).await?;
assert_eq!(log.get_head_index().await?, 2);
assert_eq!(log.get_last_index().await?, 2);
assert!(log.get_entry(1).await?.is_none());
assert!(log.get_entry(2).await?.is_some());

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn insert_test() -> Result<()> {
use rand::Rng;

let b_tree_map = Arc::new(Backend::new());

let mut futs = vec![];
for shard in 0..100 {
let b_tree_map = b_tree_map.clone();
let fut = async move {
let mut rng = rand::thread_rng();
let (log, _) = b_tree_map.get(shard).unwrap();
for i in 0..300 {
let prev = i;
let cur = i + 1;
let b: Vec<u8> = (0..100).map(|_| rng.gen()).collect();
let e = Entry {
prev_clock: Clock {
index: prev,
term: 1,
},
this_clock: Clock {
index: cur,
term: 1,
},
command: b.into(),
};
log.insert_entry(cur, e).await.unwrap();
}
};
futs.push(fut);
}

futures::future::join_all(futs).await;

for shard_id in 0..100 {
for i in 1..=100 {
let (log, _) = b_tree_map.get(shard_id).unwrap();
let e = log.get_entry(i).await.unwrap().unwrap();
assert_eq!(e.this_clock.index, i);
}
}

Ok(())
}
}
1 change: 1 addition & 0 deletions sorock/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod redb;
pub mod btree;