Remove dependencies to spin crate #440

Closed · wants to merge 1 commit
1 change: 0 additions & 1 deletion sorock/Cargo.toml
@@ -29,7 +29,6 @@ redb.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
shrinkwraprs = "0.3"
spin = "0.9"
thiserror = "1"
tokio = { version = "1", features = ["rt"] }
tokio-util = "0.7"
8 changes: 4 additions & 4 deletions sorock/src/node.rs
@@ -6,7 +6,7 @@ use std::collections::HashMap;
pub struct Inner {
self_node_id: NodeId,
cache: moka::sync::Cache<NodeId, RaftConnection>,
process: spin::RwLock<HashMap<ShardId, Arc<RaftProcess>>>,
process: std::sync::RwLock<HashMap<ShardId, Arc<RaftProcess>>>,
}

/// `RaftNode` contains a set of `RaftProcess`es.
@@ -37,16 +37,16 @@ impl RaftNode {

/// Attach a Raft process to a shard.
pub fn attach_process(&self, shard_id: ShardId, p: RaftProcess) {
self.process.write().insert(shard_id, Arc::new(p));
self.process.write().unwrap().insert(shard_id, Arc::new(p));
}

/// Detach a Raft process from a shard.
pub fn detach_process(&self, shard_id: ShardId) {
self.process.write().remove(&shard_id);
self.process.write().unwrap().remove(&shard_id);
}

pub(crate) fn get_process(&self, shard_id: ShardId) -> Option<Arc<RaftProcess>> {
self.process.read().get(&shard_id).cloned()
self.process.read().unwrap().get(&shard_id).cloned()
}
}

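A note on the `.unwrap()` calls that appear throughout this change: `std::sync` locks return a `LockResult`, so callers see poisoning (a panic on another thread while the lock was held), which `spin` locks never report. Unwrapping treats a poisoned lock as a bug and panics at the call site. A minimal sketch of the `node.rs` pattern, with hypothetical stand-in types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for the ShardId / RaftProcess types in the diff.
type ShardId = u32;
struct Process;

struct Registry {
    // read()/write() on std::sync::RwLock return a Result that is Err only
    // if another thread panicked while holding the lock (poisoning).
    processes: RwLock<HashMap<ShardId, Arc<Process>>>,
}

impl Registry {
    fn attach(&self, shard: ShardId, p: Process) {
        // unwrap(): treat a poisoned lock as a bug and panic here as well.
        self.processes.write().unwrap().insert(shard, Arc::new(p));
    }

    fn get(&self, shard: ShardId) -> Option<Arc<Process>> {
        // Cloning the Arc keeps the read guard alive only for this statement.
        self.processes.read().unwrap().get(&shard).cloned()
    }
}
```

An alternative would have been `parking_lot`, which keeps a non-poisoning, non-`Result` API; choosing the standard library is what makes the explicit `.unwrap()` show up at every call site.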
20 changes: 15 additions & 5 deletions sorock/src/process/command_log/consumer.rs
@@ -4,10 +4,16 @@ impl CommandLog {
pub fn register_completion(&self, index: Index, completion: Completion) {
match completion {
Completion::User(completion) => {
self.user_completions.lock().insert(index, completion);
self.user_completions
.lock()
.unwrap()
.insert(index, completion);
}
Completion::Kern(completion) => {
self.kern_completions.lock().insert(index, completion);
self.kern_completions
.lock()
.unwrap()
.insert(index, completion);
}
}
}
@@ -63,7 +69,6 @@ impl CommandLog {
};

if do_process {
let mut response_cache = self.response_cache.lock();
debug!("process user@{process_index}");
match command {
Command::Snapshot { .. } => {
@@ -73,13 +78,15 @@
message,
request_id,
} => {
let mut response_cache = self.response_cache.lock().await;
if response_cache.should_execute(&request_id) {
let resp = app.process_write(message, process_index).await?;
response_cache.insert_response(request_id.clone(), resp);
}

// Leader may have the completion for the request.
let user_completion = self.user_completions.lock().remove(&process_index);
let user_completion =
self.user_completions.lock().unwrap().remove(&process_index);
if let Some(user_completion) = user_completion {
if let Some(resp) = response_cache.get_response(&request_id) {
// If client abort the request before retry,
@@ -100,6 +107,7 @@
}
}
Command::CompleteRequest { request_id } => {
let mut response_cache = self.response_cache.lock().await;
response_cache.complete_response(&request_id);
}
_ => {}
@@ -135,7 +143,9 @@ impl CommandLog {
Command::ClusterConfiguration { .. } => {}
_ => {}
}
if let Some(kern_completion) = self.kern_completions.lock().remove(&process_index) {
if let Some(kern_completion) =
self.kern_completions.lock().unwrap().remove(&process_index)
{
kern_completion.complete();
}
}
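The reason `response_cache` is locked with `.lock().await` here (it becomes a `tokio::sync::Mutex` in `mod.rs` below) while the completion maps stay on `std::sync::Mutex` appears to be that the cache guard is held across `app.process_write(...).await`. A `std::sync::MutexGuard` is not `Send`, so holding it across an `.await` makes the future non-`Send`, and any other task contending for it would block its worker thread; the completion-map guards, by contrast, are temporaries that drop within a single statement. A small illustrative sketch with hypothetical types:

```rust
use std::sync::Arc;

// Hypothetical cache type; the point is only where the guard may live.
struct Cache {
    hits: u64,
}

async fn do_io() {}

// A tokio::sync::Mutex guard may be held across an .await: it is released
// cooperatively and the resulting future remains Send.
async fn with_tokio(cache: Arc<tokio::sync::Mutex<Cache>>) {
    let mut guard = cache.lock().await;
    do_io().await; // fine: the guard is allowed to live across the await
    guard.hits += 1;
}

// The std::sync::Mutex equivalent is rejected once the future has to be Send
// (e.g. under tokio::spawn), because std's MutexGuard is not Send:
//
// async fn with_std(cache: Arc<std::sync::Mutex<Cache>>) {
//     let mut guard = cache.lock().unwrap();
//     do_io().await; // error when spawned: future cannot be sent between threads
//     guard.hits += 1;
// }
```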
12 changes: 6 additions & 6 deletions sorock/src/process/command_log/mod.rs
@@ -29,9 +29,9 @@ pub struct Inner {
pub membership_pointer: AtomicU64,

app: App,
response_cache: spin::Mutex<ResponseCache>,
user_completions: spin::Mutex<BTreeMap<Index, completion::UserCompletion>>,
kern_completions: spin::Mutex<BTreeMap<Index, completion::KernCompletion>>,
response_cache: tokio::sync::Mutex<ResponseCache>,
user_completions: std::sync::Mutex<BTreeMap<Index, completion::UserCompletion>>,
kern_completions: std::sync::Mutex<BTreeMap<Index, completion::KernCompletion>>,
}

#[derive(shrinkwraprs::Shrinkwrap, Clone)]
@@ -48,9 +48,9 @@ impl CommandLog {
storage: Box::new(storage),
app,
snapshot_lock: tokio::sync::RwLock::new(()),
user_completions: spin::Mutex::new(BTreeMap::new()),
kern_completions: spin::Mutex::new(BTreeMap::new()),
response_cache: spin::Mutex::new(ResponseCache::new()),
user_completions: std::sync::Mutex::new(BTreeMap::new()),
kern_completions: std::sync::Mutex::new(BTreeMap::new()),
response_cache: tokio::sync::Mutex::new(ResponseCache::new()),
};
Self(inner.into())
}
4 changes: 2 additions & 2 deletions sorock/src/process/command_log/producer.rs
@@ -128,8 +128,8 @@ impl CommandLog {
self.insert_entry(entry).await?;

// discard [this_index, )
self.user_completions.lock().split_off(&this_index);
self.kern_completions.lock().split_off(&this_index);
self.user_completions.lock().unwrap().split_off(&this_index);
self.kern_completions.lock().unwrap().split_off(&this_index);

Ok(TryInsertResult::Inserted)
}
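The `split_off(&this_index)` calls implement the `// discard [this_index, )` comment: `BTreeMap::split_off` keeps keys strictly below the given key in place and returns everything at or above it, which is then dropped. A small standalone example of that semantics:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut completions: BTreeMap<u64, &str> =
        [(3, "a"), (5, "b"), (7, "c")].into_iter().collect();

    // Discard [5, ..): keys >= 5 move into `dropped`; keys < 5 stay put.
    let dropped = completions.split_off(&5);

    assert_eq!(completions.keys().copied().collect::<Vec<_>>(), vec![3]);
    assert_eq!(dropped.keys().copied().collect::<Vec<_>>(), vec![5, 7]);
}
```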
26 changes: 13 additions & 13 deletions sorock/src/process/peer_svc/mod.rs
@@ -32,9 +32,9 @@ struct ThreadHandles {
}

pub struct Inner {
membership: spin::RwLock<HashSet<NodeId>>,
peer_contexts: spin::RwLock<HashMap<NodeId, PeerContexts>>,
peer_threads: spin::Mutex<HashMap<NodeId, ThreadHandles>>,
membership: std::sync::RwLock<HashSet<NodeId>>,
peer_contexts: std::sync::RwLock<HashMap<NodeId, PeerContexts>>,
peer_threads: std::sync::Mutex<HashMap<NodeId, ThreadHandles>>,

command_log: Ref<CommandLog>,
driver: RaftDriver,
@@ -95,7 +95,7 @@ impl PeerSvc {
return Ok(());
}

if self.peer_contexts.read().contains_key(&id) {
if self.peer_contexts.read().unwrap().contains_key(&id) {
return Ok(());
}

@@ -104,7 +104,7 @@
ReplicationProgress::new(last_log_index)
};

let mut peer_contexts = self.peer_contexts.write();
let mut peer_contexts = self.peer_contexts.write().unwrap();
peer_contexts.insert(
id.clone(),
PeerContexts {
@@ -122,14 +122,14 @@
),
heartbeater_handle: thread::heartbeat::new(id.clone(), voter),
};
self.peer_threads.lock().insert(id, thread_handles);
self.peer_threads.lock().unwrap().insert(id, thread_handles);

Ok(())
}

fn remove_peer(&self, id: NodeId) {
self.peer_threads.lock().remove(&id);
self.peer_contexts.write().remove(&id);
self.peer_threads.lock().unwrap().remove(&id);
self.peer_contexts.write().unwrap().remove(&id);
}

pub async fn set_membership(
@@ -171,7 +171,7 @@ impl PeerSvc {
}

info!("membership changed -> {:?}", config);
*self.membership.write() = config;
*self.membership.write().unwrap() = config;

self.command_log
.membership_pointer
@@ -181,7 +181,7 @@
}

pub fn read_membership(&self) -> HashSet<NodeId> {
self.membership.read().clone()
self.membership.read().unwrap().clone()
}

pub async fn find_new_commit_index(&self) -> Result<Index> {
Expand All @@ -190,7 +190,7 @@ impl PeerSvc {
let last_log_index = self.command_log.get_log_last_index().await?;
match_indices.push(last_log_index);

let peer_contexts = self.peer_contexts.read();
let peer_contexts = self.peer_contexts.read().unwrap();
for (_, peer) in peer_contexts.iter() {
match_indices.push(peer.progress.match_index);
}
@@ -204,7 +204,7 @@
}

pub fn reset_progress(&self, init_next_index: Index) {
let mut peer_contexts = self.peer_contexts.write();
let mut peer_contexts = self.peer_contexts.write().unwrap();
for (_, peer) in peer_contexts.iter_mut() {
peer.progress = ReplicationProgress::new(init_next_index);
}
@@ -213,7 +213,7 @@
/// Choose the most advanced follower and send it TimeoutNow.
pub async fn transfer_leadership(&self) -> Result<()> {
let mut xs = {
let peer_contexts = self.peer_contexts.read();
let peer_contexts = self.peer_contexts.read().unwrap();
let mut out = vec![];
for (id, peer) in peer_contexts.iter() {
let progress = peer.progress;
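The body of `find_new_commit_index` after the match indices are collected is folded away above. Purely as a hedged sketch (an assumption, not the code hidden by the fold), a typical way to finish is to sort the indices and take the one that a majority of replicas have reached:

```rust
// Hedged sketch: derive a commit index from match indices under a simple
// majority rule. The folded code above may compute this differently.
fn majority_commit_index(mut match_indices: Vec<u64>) -> u64 {
    // Sort descending: position k is replicated on at least k + 1 nodes.
    match_indices.sort_unstable_by(|a, b| b.cmp(a));
    let majority = match_indices.len() / 2; // e.g. 5 replicas -> position 2
    match_indices[majority]
}

fn main() {
    // Leader's own last index plus two followers' match indices.
    assert_eq!(majority_commit_index(vec![10, 7, 4]), 7);
}
```

With the leader's own last index included in the vector, this yields the highest index stored on a majority, which is the usual Raft commit rule for entries of the current term.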
3 changes: 3 additions & 0 deletions sorock/src/process/peer_svc/replication.rs
@@ -35,6 +35,7 @@ impl PeerSvc {
let peer_context = self
.peer_contexts
.read()
.unwrap()
.get(&follower_id)
.context(Error::PeerNotFound(follower_id.clone()))?
.clone();
@@ -56,6 +57,7 @@
let new_progress = ReplicationProgress::new(cur_snapshot_index);
self.peer_contexts
.write()
.unwrap()
.get_mut(&follower_id)
.context(Error::PeerNotFound(follower_id.clone()))?
.progress = new_progress;
@@ -100,6 +102,7 @@

self.peer_contexts
.write()
.unwrap()
.get_mut(&follower_id)
.context(Error::PeerNotFound(follower_id.clone()))?
.progress = new_progress;
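Since the peer map is now behind `std::sync::RwLock`, its guard must not live across the awaits that follow; the code reads the entry and `.clone()`s it so the guard is dropped at the end of the statement. A condensed sketch of that shape, with hypothetical trimmed-down types:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical shapes; only the locking pattern matters here.
#[derive(Clone)]
struct PeerContext {
    next_index: u64,
}

struct PeerSvc {
    peers: RwLock<HashMap<String, PeerContext>>,
}

impl PeerSvc {
    async fn replicate(&self, follower: &str) -> Option<()> {
        // Clone the small context out of the map: the read guard is a
        // temporary and is gone before any .await below runs.
        let ctx = self.peers.read().unwrap().get(follower)?.clone();

        send_entries(ctx.next_index).await;
        Some(())
    }
}

async fn send_entries(_from: u64) {}
```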
10 changes: 5 additions & 5 deletions sorock/src/process/query_queue.rs
@@ -40,26 +40,26 @@ pub struct Query {

#[derive(Clone)]
pub struct Producer {
inner: Arc<spin::Mutex<Queue<Query>>>,
inner: Arc<std::sync::Mutex<Queue<Query>>>,
}
impl Producer {
/// Register a query for execution when the readable index reaches `read_index`.
/// `read_index` should be the index of the commit pointer at the time of query.
pub fn register(&self, read_index: Index, q: Query) -> Result<()> {
self.inner.lock().push(read_index, q);
self.inner.lock().unwrap().push(read_index, q);
Ok(())
}
}

#[derive(Clone)]
pub struct Processor {
app: Ref<App>,
inner: Arc<spin::Mutex<Queue<Query>>>,
inner: Arc<std::sync::Mutex<Queue<Query>>>,
}
impl Processor {
/// Process the waiting queries up to `readable_index`.
pub async fn process(&self, readable_index: Index) -> usize {
let qs = self.inner.lock().pop(readable_index);
let qs = self.inner.lock().unwrap().pop(readable_index);

let mut futs = vec![];
for (_, q) in qs {
@@ -81,7 +81,7 @@ impl Processor {
}

pub fn new(app: Ref<App>) -> (Producer, Processor) {
let q = Arc::new(spin::Mutex::new(Queue::new()));
let q = Arc::new(std::sync::Mutex::new(Queue::new()));
let processor = Processor {
inner: q.clone(),
app,
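`Queue<Query>` itself is defined in the folded part of this file; judging from `push(read_index, q)` and `pop(readable_index)`, it behaves like an index-keyed multimap drained up to the readable index. The following is only an assumed shape of that type, not the folded code:

```rust
use std::collections::BTreeMap;

type Index = u64;

// Assumed shape of the folded Queue<T>: entries keyed by the read index they
// wait for, drained once the readable index has caught up.
struct Queue<T> {
    entries: BTreeMap<Index, Vec<T>>,
}

impl<T> Queue<T> {
    fn new() -> Self {
        Self { entries: BTreeMap::new() }
    }

    fn push(&mut self, read_index: Index, item: T) {
        self.entries.entry(read_index).or_default().push(item);
    }

    /// Remove and return every entry waiting on an index <= readable_index.
    fn pop(&mut self, readable_index: Index) -> Vec<(Index, T)> {
        // split_off keeps the not-yet-ready entries (> readable_index) aside.
        let rest = self.entries.split_off(&(readable_index + 1));
        let ready = std::mem::replace(&mut self.entries, rest);
        ready
            .into_iter()
            .flat_map(|(i, items)| items.into_iter().map(move |t| (i, t)))
            .collect()
    }
}
```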
6 changes: 3 additions & 3 deletions sorock/src/process/voter/failure_detector.rs
@@ -18,7 +18,7 @@ impl Inner {
}
}
pub struct FailureDetector {
inner: spin::RwLock<Inner>,
inner: std::sync::RwLock<Inner>,
}
impl FailureDetector {
pub fn new() -> Self {
@@ -30,7 +30,7 @@ impl FailureDetector {
}

pub fn receive_heartbeat(&self, leader_id: NodeId) {
let mut inner = self.inner.write();
let mut inner = self.inner.write().unwrap();
let cur_watch_id = inner.watch_id.clone();
if cur_watch_id != leader_id {
*inner = Inner::watch(leader_id);
@@ -45,7 +45,7 @@
/// Get a random wait time before becoming a candidate.
/// Returns None if the current leader is still considered alive.
pub fn get_election_timeout(&self) -> Option<Duration> {
let inner = self.inner.read();
let inner = self.inner.read().unwrap();

let fd = &inner.detector;
let normal_dist = fd.normal_dist();
8 changes: 4 additions & 4 deletions sorock/src/process/voter/mod.rs
@@ -14,7 +14,7 @@ pub enum ElectionState {
}

pub struct Inner {
state: spin::Mutex<ElectionState>,
state: std::sync::Mutex<ElectionState>,
ballot: Box<dyn RaftBallotStore>,

/// Serializing any events that may change ballot state simplifies the voter's logic.
@@ -39,7 +39,7 @@
driver: RaftDriver,
) -> Self {
let inner = Inner {
state: spin::Mutex::new(ElectionState::Follower),
state: std::sync::Mutex::new(ElectionState::Follower),
ballot: Box::new(ballot_store),
vote_lock: tokio::sync::Mutex::new(()),
safe_term: AtomicU64::new(0),
@@ -54,12 +54,12 @@

impl Voter {
pub fn read_election_state(&self) -> ElectionState {
*self.state.lock()
*self.state.lock().unwrap()
}

pub fn write_election_state(&self, e: ElectionState) {
info!("election state -> {e:?}");
*self.state.lock() = e;
*self.state.lock().unwrap() = e;
}

pub async fn read_ballot(&self) -> Result<Ballot> {
4 changes: 2 additions & 2 deletions sorock/src/service/raft/communicator/heartbeat_multiplex.rs
@@ -1,7 +1,7 @@
use super::*;

use spin::Mutex;
use std::collections::HashMap;
use std::sync::Mutex;

pub struct HeartbeatBuffer {
buf: HashMap<ShardId, request::Heartbeat>,
@@ -31,7 +31,7 @@ pub async fn run(
tokio::time::sleep(Duration::from_millis(300)).await;

let heartbeats = {
let mut buf = buf.lock();
let mut buf = buf.lock().unwrap();
let out = buf.drain();
out
};
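Worth noting in this hunk: the buffer is drained inside its own block, so the `std::sync::MutexGuard` is dropped before the heartbeats are sent with `.await` further down — the complementary technique to the `tokio::sync::Mutex` used for the response cache. A minimal sketch of that scoping pattern (names are hypothetical):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type ShardId = u64;

#[derive(Clone)]
struct Heartbeat;

async fn flush(buf: Arc<Mutex<HashMap<ShardId, Heartbeat>>>) {
    // Take the buffered heartbeats under the lock, but let the guard fall out
    // of scope before awaiting, so no std guard lives across an .await point.
    let pending: HashMap<ShardId, Heartbeat> = {
        let mut guard = buf.lock().unwrap();
        std::mem::take(&mut *guard)
    };

    for (shard, hb) in pending {
        send_heartbeat(shard, hb).await;
    }
}

async fn send_heartbeat(_shard: ShardId, _hb: Heartbeat) {}
```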
8 changes: 6 additions & 2 deletions sorock/src/service/raft/communicator/mod.rs
@@ -5,8 +5,8 @@ mod stream;

use heartbeat_multiplex::*;
use process::*;
use spin::Mutex;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::task::AbortHandle;

pub struct HandleDrop(AbortHandle);
@@ -77,7 +77,11 @@ impl Communicator {
}

pub fn queue_heartbeat(&self, req: request::Heartbeat) {
self.conn.heartbeat_buffer.lock().push(self.shard_id, req);
self.conn
.heartbeat_buffer
.lock()
.unwrap()
.push(self.shard_id, req);
}

pub async fn process_user_write_request(