Skip to content

Commit

Permalink
Remove dependencies to spin crate
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Nov 1, 2024
1 parent 22f1fac commit 63371cc
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 47 deletions.
1 change: 0 additions & 1 deletion sorock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ redb.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
shrinkwraprs = "0.3"
spin = "0.9"
thiserror = "1"
tokio = { version = "1", features = ["rt"] }
tokio-util = "0.7"
Expand Down
8 changes: 4 additions & 4 deletions sorock/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::collections::HashMap;
pub struct Inner {
self_node_id: NodeId,
cache: moka::sync::Cache<NodeId, RaftConnection>,
process: spin::RwLock<HashMap<ShardId, Arc<RaftProcess>>>,
process: std::sync::RwLock<HashMap<ShardId, Arc<RaftProcess>>>,
}

/// `RaftNode` contains a set of `RaftProcess`es.
Expand Down Expand Up @@ -37,16 +37,16 @@ impl RaftNode {

/// Attach a Raft process to a shard.
pub fn attach_process(&self, shard_id: ShardId, p: RaftProcess) {
self.process.write().insert(shard_id, Arc::new(p));
self.process.write().unwrap().insert(shard_id, Arc::new(p));
}

/// Detach a Raft process from a shard.
pub fn detach_process(&self, shard_id: ShardId) {
self.process.write().remove(&shard_id);
self.process.write().unwrap().remove(&shard_id);
}

pub(crate) fn get_process(&self, shard_id: ShardId) -> Option<Arc<RaftProcess>> {
self.process.read().get(&shard_id).cloned()
self.process.read().unwrap().get(&shard_id).cloned()
}
}

Expand Down
20 changes: 15 additions & 5 deletions sorock/src/process/command_log/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ impl CommandLog {
pub fn register_completion(&self, index: Index, completion: Completion) {
match completion {
Completion::User(completion) => {
self.user_completions.lock().insert(index, completion);
self.user_completions
.lock()
.unwrap()
.insert(index, completion);
}
Completion::Kern(completion) => {
self.kern_completions.lock().insert(index, completion);
self.kern_completions
.lock()
.unwrap()
.insert(index, completion);
}
}
}
Expand Down Expand Up @@ -63,7 +69,6 @@ impl CommandLog {
};

if do_process {
let mut response_cache = self.response_cache.lock();
debug!("process user@{process_index}");
match command {
Command::Snapshot { .. } => {
Expand All @@ -73,13 +78,15 @@ impl CommandLog {
message,
request_id,
} => {
let mut response_cache = self.response_cache.lock().await;
if response_cache.should_execute(&request_id) {
let resp = app.process_write(message, process_index).await?;
response_cache.insert_response(request_id.clone(), resp);
}

// Leader may have the completion for the request.
let user_completion = self.user_completions.lock().remove(&process_index);
let user_completion =
self.user_completions.lock().unwrap().remove(&process_index);
if let Some(user_completion) = user_completion {
if let Some(resp) = response_cache.get_response(&request_id) {
// If client abort the request before retry,
Expand All @@ -100,6 +107,7 @@ impl CommandLog {
}
}
Command::CompleteRequest { request_id } => {
let mut response_cache = self.response_cache.lock().await;
response_cache.complete_response(&request_id);
}
_ => {}
Expand Down Expand Up @@ -135,7 +143,9 @@ impl CommandLog {
Command::ClusterConfiguration { .. } => {}
_ => {}
}
if let Some(kern_completion) = self.kern_completions.lock().remove(&process_index) {
if let Some(kern_completion) =
self.kern_completions.lock().unwrap().remove(&process_index)
{
kern_completion.complete();
}
}
Expand Down
12 changes: 6 additions & 6 deletions sorock/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub struct Inner {
pub membership_pointer: AtomicU64,

app: App,
response_cache: spin::Mutex<ResponseCache>,
user_completions: spin::Mutex<BTreeMap<Index, completion::UserCompletion>>,
kern_completions: spin::Mutex<BTreeMap<Index, completion::KernCompletion>>,
response_cache: tokio::sync::Mutex<ResponseCache>,
user_completions: std::sync::Mutex<BTreeMap<Index, completion::UserCompletion>>,
kern_completions: std::sync::Mutex<BTreeMap<Index, completion::KernCompletion>>,
}

#[derive(shrinkwraprs::Shrinkwrap, Clone)]
Expand All @@ -48,9 +48,9 @@ impl CommandLog {
storage: Box::new(storage),
app,
snapshot_lock: tokio::sync::RwLock::new(()),
user_completions: spin::Mutex::new(BTreeMap::new()),
kern_completions: spin::Mutex::new(BTreeMap::new()),
response_cache: spin::Mutex::new(ResponseCache::new()),
user_completions: std::sync::Mutex::new(BTreeMap::new()),
kern_completions: std::sync::Mutex::new(BTreeMap::new()),
response_cache: tokio::sync::Mutex::new(ResponseCache::new()),
};
Self(inner.into())
}
Expand Down
4 changes: 2 additions & 2 deletions sorock/src/process/command_log/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ impl CommandLog {
self.insert_entry(entry).await?;

// discard [this_index, )
self.user_completions.lock().split_off(&this_index);
self.kern_completions.lock().split_off(&this_index);
self.user_completions.lock().unwrap().split_off(&this_index);
self.kern_completions.lock().unwrap().split_off(&this_index);

Ok(TryInsertResult::Inserted)
}
Expand Down
26 changes: 13 additions & 13 deletions sorock/src/process/peer_svc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ struct ThreadHandles {
}

pub struct Inner {
membership: spin::RwLock<HashSet<NodeId>>,
peer_contexts: spin::RwLock<HashMap<NodeId, PeerContexts>>,
peer_threads: spin::Mutex<HashMap<NodeId, ThreadHandles>>,
membership: std::sync::RwLock<HashSet<NodeId>>,
peer_contexts: std::sync::RwLock<HashMap<NodeId, PeerContexts>>,
peer_threads: std::sync::Mutex<HashMap<NodeId, ThreadHandles>>,

command_log: Ref<CommandLog>,
driver: RaftDriver,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl PeerSvc {
return Ok(());
}

if self.peer_contexts.read().contains_key(&id) {
if self.peer_contexts.read().unwrap().contains_key(&id) {
return Ok(());
}

Expand All @@ -104,7 +104,7 @@ impl PeerSvc {
ReplicationProgress::new(last_log_index)
};

let mut peer_contexts = self.peer_contexts.write();
let mut peer_contexts = self.peer_contexts.write().unwrap();
peer_contexts.insert(
id.clone(),
PeerContexts {
Expand All @@ -122,14 +122,14 @@ impl PeerSvc {
),
heartbeater_handle: thread::heartbeat::new(id.clone(), voter),
};
self.peer_threads.lock().insert(id, thread_handles);
self.peer_threads.lock().unwrap().insert(id, thread_handles);

Ok(())
}

fn remove_peer(&self, id: NodeId) {
self.peer_threads.lock().remove(&id);
self.peer_contexts.write().remove(&id);
self.peer_threads.lock().unwrap().remove(&id);
self.peer_contexts.write().unwrap().remove(&id);
}

pub async fn set_membership(
Expand Down Expand Up @@ -171,7 +171,7 @@ impl PeerSvc {
}

info!("membership changed -> {:?}", config);
*self.membership.write() = config;
*self.membership.write().unwrap() = config;

self.command_log
.membership_pointer
Expand All @@ -181,7 +181,7 @@ impl PeerSvc {
}

pub fn read_membership(&self) -> HashSet<NodeId> {
self.membership.read().clone()
self.membership.read().unwrap().clone()
}

pub async fn find_new_commit_index(&self) -> Result<Index> {
Expand All @@ -190,7 +190,7 @@ impl PeerSvc {
let last_log_index = self.command_log.get_log_last_index().await?;
match_indices.push(last_log_index);

let peer_contexts = self.peer_contexts.read();
let peer_contexts = self.peer_contexts.read().unwrap();
for (_, peer) in peer_contexts.iter() {
match_indices.push(peer.progress.match_index);
}
Expand All @@ -204,7 +204,7 @@ impl PeerSvc {
}

pub fn reset_progress(&self, init_next_index: Index) {
let mut peer_contexts = self.peer_contexts.write();
let mut peer_contexts = self.peer_contexts.write().unwrap();
for (_, peer) in peer_contexts.iter_mut() {
peer.progress = ReplicationProgress::new(init_next_index);
}
Expand All @@ -213,7 +213,7 @@ impl PeerSvc {
/// Choose the most advanced follower and send it TimeoutNow.
pub async fn transfer_leadership(&self) -> Result<()> {
let mut xs = {
let peer_contexts = self.peer_contexts.read();
let peer_contexts = self.peer_contexts.read().unwrap();
let mut out = vec![];
for (id, peer) in peer_contexts.iter() {
let progress = peer.progress;
Expand Down
3 changes: 3 additions & 0 deletions sorock/src/process/peer_svc/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl PeerSvc {
let peer_context = self
.peer_contexts
.read()
.unwrap()
.get(&follower_id)
.context(Error::PeerNotFound(follower_id.clone()))?
.clone();
Expand All @@ -56,6 +57,7 @@ impl PeerSvc {
let new_progress = ReplicationProgress::new(cur_snapshot_index);
self.peer_contexts
.write()
.unwrap()
.get_mut(&follower_id)
.context(Error::PeerNotFound(follower_id.clone()))?
.progress = new_progress;
Expand Down Expand Up @@ -100,6 +102,7 @@ impl PeerSvc {

self.peer_contexts
.write()
.unwrap()
.get_mut(&follower_id)
.context(Error::PeerNotFound(follower_id.clone()))?
.progress = new_progress;
Expand Down
10 changes: 5 additions & 5 deletions sorock/src/process/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,26 @@ pub struct Query {

#[derive(Clone)]
pub struct Producer {
inner: Arc<spin::Mutex<Queue<Query>>>,
inner: Arc<std::sync::Mutex<Queue<Query>>>,
}
impl Producer {
/// Register a query for execution when the readable index reaches `read_index`.
/// `read_index` should be the index of the commit pointer at the time of query.
pub fn register(&self, read_index: Index, q: Query) -> Result<()> {
self.inner.lock().push(read_index, q);
self.inner.lock().unwrap().push(read_index, q);
Ok(())
}
}

#[derive(Clone)]
pub struct Processor {
app: Ref<App>,
inner: Arc<spin::Mutex<Queue<Query>>>,
inner: Arc<std::sync::Mutex<Queue<Query>>>,
}
impl Processor {
/// Process the waiting queries up to `readable_index`.
pub async fn process(&self, readable_index: Index) -> usize {
let qs = self.inner.lock().pop(readable_index);
let qs = self.inner.lock().unwrap().pop(readable_index);

let mut futs = vec![];
for (_, q) in qs {
Expand All @@ -81,7 +81,7 @@ impl Processor {
}

pub fn new(app: Ref<App>) -> (Producer, Processor) {
let q = Arc::new(spin::Mutex::new(Queue::new()));
let q = Arc::new(std::sync::Mutex::new(Queue::new()));
let processor = Processor {
inner: q.clone(),
app,
Expand Down
6 changes: 3 additions & 3 deletions sorock/src/process/voter/failure_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Inner {
}
}
pub struct FailureDetector {
inner: spin::RwLock<Inner>,
inner: std::sync::RwLock<Inner>,
}
impl FailureDetector {
pub fn new() -> Self {
Expand All @@ -30,7 +30,7 @@ impl FailureDetector {
}

pub fn receive_heartbeat(&self, leader_id: NodeId) {
let mut inner = self.inner.write();
let mut inner = self.inner.write().unwrap();
let cur_watch_id = inner.watch_id.clone();
if cur_watch_id != leader_id {
*inner = Inner::watch(leader_id);
Expand All @@ -45,7 +45,7 @@ impl FailureDetector {
/// Get a random wait time before becoming a candidate.
/// Returns None if the current leader is still considered alive.
pub fn get_election_timeout(&self) -> Option<Duration> {
let inner = self.inner.read();
let inner = self.inner.read().unwrap();

let fd = &inner.detector;
let normal_dist = fd.normal_dist();
Expand Down
8 changes: 4 additions & 4 deletions sorock/src/process/voter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum ElectionState {
}

pub struct Inner {
state: spin::Mutex<ElectionState>,
state: std::sync::Mutex<ElectionState>,
ballot: Box<dyn RaftBallotStore>,

/// Serializing any events that may change ballot state simplifies the voter's logic.
Expand All @@ -39,7 +39,7 @@ impl Voter {
driver: RaftDriver,
) -> Self {
let inner = Inner {
state: spin::Mutex::new(ElectionState::Follower),
state: std::sync::Mutex::new(ElectionState::Follower),
ballot: Box::new(ballot_store),
vote_lock: tokio::sync::Mutex::new(()),
safe_term: AtomicU64::new(0),
Expand All @@ -54,12 +54,12 @@ impl Voter {

impl Voter {
pub fn read_election_state(&self) -> ElectionState {
*self.state.lock()
*self.state.lock().unwrap()
}

pub fn write_election_state(&self, e: ElectionState) {
info!("election state -> {e:?}");
*self.state.lock() = e;
*self.state.lock().unwrap() = e;
}

pub async fn read_ballot(&self) -> Result<Ballot> {
Expand Down
4 changes: 2 additions & 2 deletions sorock/src/service/raft/communicator/heartbeat_multiplex.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;

use spin::Mutex;
use std::collections::HashMap;
use std::sync::Mutex;

pub struct HeartbeatBuffer {
buf: HashMap<ShardId, request::Heartbeat>,
Expand Down Expand Up @@ -31,7 +31,7 @@ pub async fn run(
tokio::time::sleep(Duration::from_millis(300)).await;

let heartbeats = {
let mut buf = buf.lock();
let mut buf = buf.lock().unwrap();
let out = buf.drain();
out
};
Expand Down
8 changes: 6 additions & 2 deletions sorock/src/service/raft/communicator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ mod stream;

use heartbeat_multiplex::*;
use process::*;
use spin::Mutex;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::task::AbortHandle;

pub struct HandleDrop(AbortHandle);
Expand Down Expand Up @@ -77,7 +77,11 @@ impl Communicator {
}

pub fn queue_heartbeat(&self, req: request::Heartbeat) {
self.conn.heartbeat_buffer.lock().push(self.shard_id, req);
self.conn
.heartbeat_buffer
.lock()
.unwrap()
.push(self.shard_id, req);
}

pub async fn process_user_write_request(
Expand Down

0 comments on commit 63371cc

Please sign in to comment.