diff --git a/sorock/Cargo.toml b/sorock/Cargo.toml
index 4357f9e1..d03ac56b 100644
--- a/sorock/Cargo.toml
+++ b/sorock/Cargo.toml
@@ -29,7 +29,6 @@ redb.workspace = true
 serde = { version = "1.0", features = ["derive"] }
 serde_bytes = "0.11"
 shrinkwraprs = "0.3"
-spin = "0.9"
 thiserror = "1"
 tokio = { version = "1", features = ["rt"] }
 tokio-util = "0.7"
diff --git a/sorock/src/node.rs b/sorock/src/node.rs
index 0909c814..ddc0cd5c 100644
--- a/sorock/src/node.rs
+++ b/sorock/src/node.rs
@@ -6,7 +6,7 @@ use std::collections::HashMap;
 pub struct Inner {
     self_node_id: NodeId,
     cache: moka::sync::Cache,
-    process: spin::RwLock<HashMap<ShardId, Arc<RaftProcess>>>,
+    process: std::sync::RwLock<HashMap<ShardId, Arc<RaftProcess>>>,
 }

 /// `RaftNode` contains a set of `RaftProcess`es.
@@ -37,16 +37,16 @@ impl RaftNode {

     /// Attach a Raft process to a shard.
     pub fn attach_process(&self, shard_id: ShardId, p: RaftProcess) {
-        self.process.write().insert(shard_id, Arc::new(p));
+        self.process.write().unwrap().insert(shard_id, Arc::new(p));
     }

     /// Detach a Raft process from a shard.
     pub fn detach_process(&self, shard_id: ShardId) {
-        self.process.write().remove(&shard_id);
+        self.process.write().unwrap().remove(&shard_id);
     }

     pub(crate) fn get_process(&self, shard_id: ShardId) -> Option<Arc<RaftProcess>> {
-        self.process.read().get(&shard_id).cloned()
+        self.process.read().unwrap().get(&shard_id).cloned()
     }
 }
diff --git a/sorock/src/process/command_log/consumer.rs b/sorock/src/process/command_log/consumer.rs
index 4ed8600c..af62c383 100644
--- a/sorock/src/process/command_log/consumer.rs
+++ b/sorock/src/process/command_log/consumer.rs
@@ -4,10 +4,16 @@ impl CommandLog {
     pub fn register_completion(&self, index: Index, completion: Completion) {
         match completion {
             Completion::User(completion) => {
-                self.user_completions.lock().insert(index, completion);
+                self.user_completions
+                    .lock()
+                    .unwrap()
+                    .insert(index, completion);
             }
             Completion::Kern(completion) => {
-                self.kern_completions.lock().insert(index, completion);
+                self.kern_completions
+                    .lock()
+                    .unwrap()
+                    .insert(index, completion);
             }
         }
     }
@@ -63,7 +69,6 @@ impl CommandLog {
         };

         if do_process {
-            let mut response_cache = self.response_cache.lock();
             debug!("process user@{process_index}");
             match command {
                 Command::Snapshot { .. } => {
@@ -73,13 +78,15 @@ impl CommandLog {
                     message,
                     request_id,
                 } => {
+                    let mut response_cache = self.response_cache.lock().await;
                     if response_cache.should_execute(&request_id) {
                         let resp = app.process_write(message, process_index).await?;
                         response_cache.insert_response(request_id.clone(), resp);
                     }

                     // Leader may have the completion for the request.
-                    let user_completion = self.user_completions.lock().remove(&process_index);
+                    let user_completion =
+                        self.user_completions.lock().unwrap().remove(&process_index);
                     if let Some(user_completion) = user_completion {
                         if let Some(resp) = response_cache.get_response(&request_id) {
                             // If client abort the request before retry,
@@ -100,6 +107,7 @@ impl CommandLog {
                     }
                 }
                 Command::CompleteRequest { request_id } => {
+                    let mut response_cache = self.response_cache.lock().await;
                     response_cache.complete_response(&request_id);
                 }
                 _ => {}
@@ -135,7 +143,9 @@ impl CommandLog {
             Command::ClusterConfiguration { .. } => {}
             _ => {}
         }
-        if let Some(kern_completion) = self.kern_completions.lock().remove(&process_index) {
+        if let Some(kern_completion) =
+            self.kern_completions.lock().unwrap().remove(&process_index)
+        {
             kern_completion.complete();
         }
     }
diff --git a/sorock/src/process/command_log/mod.rs b/sorock/src/process/command_log/mod.rs
index fed5c246..3204e9c0 100644
--- a/sorock/src/process/command_log/mod.rs
+++ b/sorock/src/process/command_log/mod.rs
@@ -29,9 +29,9 @@ pub struct Inner {
     pub membership_pointer: AtomicU64,

     app: App,
-    response_cache: spin::Mutex<ResponseCache>,
-    user_completions: spin::Mutex>,
-    kern_completions: spin::Mutex>,
+    response_cache: tokio::sync::Mutex<ResponseCache>,
+    user_completions: std::sync::Mutex>,
+    kern_completions: std::sync::Mutex>,
 }

 #[derive(shrinkwraprs::Shrinkwrap, Clone)]
@@ -48,9 +48,9 @@ impl CommandLog {
             storage: Box::new(storage),
             app,
             snapshot_lock: tokio::sync::RwLock::new(()),
-            user_completions: spin::Mutex::new(BTreeMap::new()),
-            kern_completions: spin::Mutex::new(BTreeMap::new()),
-            response_cache: spin::Mutex::new(ResponseCache::new()),
+            user_completions: std::sync::Mutex::new(BTreeMap::new()),
+            kern_completions: std::sync::Mutex::new(BTreeMap::new()),
+            response_cache: tokio::sync::Mutex::new(ResponseCache::new()),
         };
         Self(inner.into())
     }
diff --git a/sorock/src/process/command_log/producer.rs b/sorock/src/process/command_log/producer.rs
index f8b5d9e5..18682f32 100644
--- a/sorock/src/process/command_log/producer.rs
+++ b/sorock/src/process/command_log/producer.rs
@@ -128,8 +128,8 @@ impl CommandLog {
         self.insert_entry(entry).await?;

         // discard [this_index, )
-        self.user_completions.lock().split_off(&this_index);
-        self.kern_completions.lock().split_off(&this_index);
+        self.user_completions.lock().unwrap().split_off(&this_index);
+        self.kern_completions.lock().unwrap().split_off(&this_index);

         Ok(TryInsertResult::Inserted)
     }
diff --git a/sorock/src/process/peer_svc/mod.rs b/sorock/src/process/peer_svc/mod.rs
index dd20fd6b..ef0a468e 100644
--- a/sorock/src/process/peer_svc/mod.rs
+++ b/sorock/src/process/peer_svc/mod.rs
@@ -32,9 +32,9 @@ struct ThreadHandles {
 }

 pub struct Inner {
-    membership: spin::RwLock<HashSet<NodeId>>,
-    peer_contexts: spin::RwLock<HashMap<NodeId, PeerContexts>>,
-    peer_threads: spin::Mutex<HashMap<NodeId, ThreadHandles>>,
+    membership: std::sync::RwLock<HashSet<NodeId>>,
+    peer_contexts: std::sync::RwLock<HashMap<NodeId, PeerContexts>>,
+    peer_threads: std::sync::Mutex<HashMap<NodeId, ThreadHandles>>,

     command_log: Ref,
     driver: RaftDriver,
@@ -95,7 +95,7 @@ impl PeerSvc {
             return Ok(());
         }

-        if self.peer_contexts.read().contains_key(&id) {
+        if self.peer_contexts.read().unwrap().contains_key(&id) {
             return Ok(());
         }

@@ -104,7 +104,7 @@ impl PeerSvc {
             ReplicationProgress::new(last_log_index)
         };

-        let mut peer_contexts = self.peer_contexts.write();
+        let mut peer_contexts = self.peer_contexts.write().unwrap();
         peer_contexts.insert(
             id.clone(),
             PeerContexts {
@@ -122,14 +122,14 @@ impl PeerSvc {
             ),
             heartbeater_handle: thread::heartbeat::new(id.clone(), voter),
         };
-        self.peer_threads.lock().insert(id, thread_handles);
+        self.peer_threads.lock().unwrap().insert(id, thread_handles);

         Ok(())
     }

     fn remove_peer(&self, id: NodeId) {
-        self.peer_threads.lock().remove(&id);
-        self.peer_contexts.write().remove(&id);
+        self.peer_threads.lock().unwrap().remove(&id);
+        self.peer_contexts.write().unwrap().remove(&id);
     }

     pub async fn set_membership(
@@ -171,7 +171,7 @@ impl PeerSvc {
         }

         info!("membership changed -> {:?}", config);
-        *self.membership.write() = config;
+        *self.membership.write().unwrap() = config;

         self.command_log
             .membership_pointer
@@ -181,7 +181,7 @@ impl PeerSvc {
     }

     pub fn read_membership(&self) -> HashSet<NodeId> {
-        self.membership.read().clone()
+        self.membership.read().unwrap().clone()
     }

     pub async fn find_new_commit_index(&self) -> Result {
@@ -190,7 +190,7 @@ impl PeerSvc {
         let last_log_index = self.command_log.get_log_last_index().await?;
         match_indices.push(last_log_index);

-        let peer_contexts = self.peer_contexts.read();
+        let peer_contexts = self.peer_contexts.read().unwrap();
         for (_, peer) in peer_contexts.iter() {
             match_indices.push(peer.progress.match_index);
         }
@@ -204,7 +204,7 @@ impl PeerSvc {
     }

     pub fn reset_progress(&self, init_next_index: Index) {
-        let mut peer_contexts = self.peer_contexts.write();
+        let mut peer_contexts = self.peer_contexts.write().unwrap();
         for (_, peer) in peer_contexts.iter_mut() {
             peer.progress = ReplicationProgress::new(init_next_index);
         }
@@ -213,7 +213,7 @@ impl PeerSvc {
     /// Choose the most advanced follower and send it TimeoutNow.
     pub async fn transfer_leadership(&self) -> Result<()> {
         let mut xs = {
-            let peer_contexts = self.peer_contexts.read();
+            let peer_contexts = self.peer_contexts.read().unwrap();
             let mut out = vec![];
             for (id, peer) in peer_contexts.iter() {
                 let progress = peer.progress;
diff --git a/sorock/src/process/peer_svc/replication.rs b/sorock/src/process/peer_svc/replication.rs
index 38a24bfe..45c48d28 100644
--- a/sorock/src/process/peer_svc/replication.rs
+++ b/sorock/src/process/peer_svc/replication.rs
@@ -35,6 +35,7 @@ impl PeerSvc {
         let peer_context = self
             .peer_contexts
             .read()
+            .unwrap()
             .get(&follower_id)
             .context(Error::PeerNotFound(follower_id.clone()))?
             .clone();
@@ -56,6 +57,7 @@ impl PeerSvc {
             let new_progress = ReplicationProgress::new(cur_snapshot_index);
             self.peer_contexts
                 .write()
+                .unwrap()
                 .get_mut(&follower_id)
                 .context(Error::PeerNotFound(follower_id.clone()))?
                 .progress = new_progress;
@@ -100,6 +102,7 @@ impl PeerSvc {

         self.peer_contexts
             .write()
+            .unwrap()
             .get_mut(&follower_id)
             .context(Error::PeerNotFound(follower_id.clone()))?
             .progress = new_progress;
diff --git a/sorock/src/process/query_queue.rs b/sorock/src/process/query_queue.rs
index ab09d9cb..a0c04a49 100644
--- a/sorock/src/process/query_queue.rs
+++ b/sorock/src/process/query_queue.rs
@@ -40,13 +40,13 @@ pub struct Query {

 #[derive(Clone)]
 pub struct Producer {
-    inner: Arc>>,
+    inner: Arc>>,
 }

 impl Producer {
     /// Register a query for execution when the readable index reaches `read_index`.
     /// `read_index` should be the index of the commit pointer at the time of query.
     pub fn register(&self, read_index: Index, q: Query) -> Result<()> {
-        self.inner.lock().push(read_index, q);
+        self.inner.lock().unwrap().push(read_index, q);
         Ok(())
     }
 }
@@ -54,12 +54,12 @@ impl Producer {
 #[derive(Clone)]
 pub struct Processor {
     app: Ref,
-    inner: Arc>>,
+    inner: Arc>>,
 }

 impl Processor {
     /// Process the waiting queries up to `readable_index`.
     pub async fn process(&self, readable_index: Index) -> usize {
-        let qs = self.inner.lock().pop(readable_index);
+        let qs = self.inner.lock().unwrap().pop(readable_index);
         let mut futs = vec![];
         for (_, q) in qs {
@@ -81,7 +81,7 @@ impl Processor {
 }

 pub fn new(app: Ref) -> (Producer, Processor) {
-    let q = Arc::new(spin::Mutex::new(Queue::new()));
+    let q = Arc::new(std::sync::Mutex::new(Queue::new()));
     let processor = Processor {
         inner: q.clone(),
         app,
diff --git a/sorock/src/process/voter/failure_detector.rs b/sorock/src/process/voter/failure_detector.rs
index 01bb26eb..edbbdf94 100644
--- a/sorock/src/process/voter/failure_detector.rs
+++ b/sorock/src/process/voter/failure_detector.rs
@@ -18,7 +18,7 @@ impl Inner {
     }
 }
 pub struct FailureDetector {
-    inner: spin::RwLock<Inner>,
+    inner: std::sync::RwLock<Inner>,
 }
 impl FailureDetector {
     pub fn new() -> Self {
@@ -30,7 +30,7 @@
     }

     pub fn receive_heartbeat(&self, leader_id: NodeId) {
-        let mut inner = self.inner.write();
+        let mut inner = self.inner.write().unwrap();
         let cur_watch_id = inner.watch_id.clone();
         if cur_watch_id != leader_id {
             *inner = Inner::watch(leader_id);
@@ -45,7 +45,7 @@

     /// Get a random wait time before becoming a candidate.
     /// Returns None if the current leader is still considered alive.
     pub fn get_election_timeout(&self) -> Option {
-        let inner = self.inner.read();
+        let inner = self.inner.read().unwrap();
         let fd = &inner.detector;
         let normal_dist = fd.normal_dist();
diff --git a/sorock/src/process/voter/mod.rs b/sorock/src/process/voter/mod.rs
index 34c4a219..0cbfc4ce 100644
--- a/sorock/src/process/voter/mod.rs
+++ b/sorock/src/process/voter/mod.rs
@@ -14,7 +14,7 @@ pub enum ElectionState {
 }

 pub struct Inner {
-    state: spin::Mutex<ElectionState>,
+    state: std::sync::Mutex<ElectionState>,
     ballot: Box,

     /// Serializing any events that may change ballot state simplifies the voter's logic.
@@ -39,7 +39,7 @@ impl Voter {
         driver: RaftDriver,
     ) -> Self {
         let inner = Inner {
-            state: spin::Mutex::new(ElectionState::Follower),
+            state: std::sync::Mutex::new(ElectionState::Follower),
             ballot: Box::new(ballot_store),
             vote_lock: tokio::sync::Mutex::new(()),
             safe_term: AtomicU64::new(0),
@@ -54,12 +54,12 @@ impl Voter {

 impl Voter {
     pub fn read_election_state(&self) -> ElectionState {
-        *self.state.lock()
+        *self.state.lock().unwrap()
     }

     pub fn write_election_state(&self, e: ElectionState) {
         info!("election state -> {e:?}");
-        *self.state.lock() = e;
+        *self.state.lock().unwrap() = e;
     }

     pub async fn read_ballot(&self) -> Result {
diff --git a/sorock/src/service/raft/communicator/heartbeat_multiplex.rs b/sorock/src/service/raft/communicator/heartbeat_multiplex.rs
index b33f8b37..388ee30c 100644
--- a/sorock/src/service/raft/communicator/heartbeat_multiplex.rs
+++ b/sorock/src/service/raft/communicator/heartbeat_multiplex.rs
@@ -1,7 +1,7 @@
 use super::*;

-use spin::Mutex;
 use std::collections::HashMap;
+use std::sync::Mutex;

 pub struct HeartbeatBuffer {
     buf: HashMap,
@@ -31,7 +31,7 @@ pub async fn run(
         tokio::time::sleep(Duration::from_millis(300)).await;

         let heartbeats = {
-            let mut buf = buf.lock();
+            let mut buf = buf.lock().unwrap();
             let out = buf.drain();
             out
         };
diff --git a/sorock/src/service/raft/communicator/mod.rs b/sorock/src/service/raft/communicator/mod.rs
index 33a56f2e..86cd9722 100644
--- a/sorock/src/service/raft/communicator/mod.rs
+++ b/sorock/src/service/raft/communicator/mod.rs
@@ -5,8 +5,8 @@ mod stream;
 use heartbeat_multiplex::*;
 use process::*;

-use spin::Mutex;
 use std::sync::Arc;
+use std::sync::Mutex;
 use tokio::task::AbortHandle;

 pub struct HandleDrop(AbortHandle);
@@ -77,7 +77,11 @@ impl Communicator {
     }

     pub fn queue_heartbeat(&self, req: request::Heartbeat) {
-        self.conn.heartbeat_buffer.lock().push(self.shard_id, req);
+        self.conn
+            .heartbeat_buffer
+            .lock()
+            .unwrap()
+            .push(self.shard_id, req);
     }

     pub async fn process_user_write_request(
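A note on the lock choice above (an observation on the change, not part of the diff itself): the completion maps, peer tables, election state, and heartbeat buffer guard only short synchronous critical sections, so they move from `spin` to `std::sync` locks with `.unwrap()` surfacing lock poisoning. `response_cache`, by contrast, is locked around `app.process_write(...).await`, so its guard must be held across an await point and it moves to `tokio::sync::Mutex` with `.lock().await`. Below is a minimal, self-contained sketch of that rule; the types and values are hypothetical rather than sorock code, and it assumes a `tokio` dependency with the `rt` and `sync` features enabled.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for a response cache; not sorock's real type.
#[derive(Default)]
struct ResponseCache {
    responses: BTreeMap<u64, Vec<u8>>,
}

fn main() {
    // Current-thread runtime: avoids needing tokio's "rt-multi-thread" feature.
    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();

    rt.block_on(async {
        // State whose guard is held across an await point: async mutex.
        let cache = Arc::new(tokio::sync::Mutex::new(ResponseCache::default()));
        {
            let mut cache = cache.lock().await;
            // Stands in for an `app.process_write(..).await` call running
            // while the cache is locked.
            tokio::task::yield_now().await;
            cache.responses.insert(1, b"resp".to_vec());
        } // async guard dropped here

        // State touched only in short synchronous sections: std mutex,
        // with `.unwrap()` propagating poisoning if a holder panicked.
        let completions: Arc<Mutex<BTreeMap<u64, &'static str>>> =
            Arc::new(Mutex::new(BTreeMap::new()));
        completions.lock().unwrap().insert(1, "user completion");
        let taken = completions.lock().unwrap().remove(&1);

        println!(
            "taken={:?} cached={:?}",
            taken,
            cache.lock().await.responses.get(&1)
        );
    });
}
```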