Skip to content

Commit

Permalink
optimize: Use lockfree queue in heartbeat queue
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Nov 2, 2024
1 parent 22f1fac commit 00a21fd
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 23 deletions.
3 changes: 2 additions & 1 deletion sorock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ derive_more = { version = "1", features = ["full"] }
flume = "0.11.0"
futures.workspace = true
http-serde = "2"
lockfree = "0.5.1"
moka = { version = "0.12", features = ["sync"] }
oneshot = "0.1.7"
phi-detector = "0.4"
Expand All @@ -43,4 +44,4 @@ tokio = { version = "1", features = ["full"] }
[build-dependencies]
prost-build.workspace = true
protox.workspace = true
tonic-build.workspace = true
tonic-build.workspace = true
29 changes: 12 additions & 17 deletions sorock/src/service/raft/communicator/heartbeat_multiplex.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,35 @@
use super::*;

use spin::Mutex;
use std::collections::HashMap;

pub struct HeartbeatBuffer {
buf: HashMap<ShardId, request::Heartbeat>,
buf: lockfree::queue::Queue<(ShardId, request::Heartbeat)>,
}
impl HeartbeatBuffer {
pub fn new() -> Self {
Self {
buf: HashMap::new(),
buf: lockfree::queue::Queue::new(),
}
}

pub fn push(&mut self, shard_id: ShardId, req: request::Heartbeat) {
self.buf.insert(shard_id, req);
pub fn push(&self, shard_id: ShardId, req: request::Heartbeat) {
self.buf.push((shard_id, req));
}

fn drain(&mut self) -> HashMap<ShardId, request::Heartbeat> {
self.buf.drain().collect()
fn drain(&self) -> HashMap<ShardId, request::Heartbeat> {
let mut out = HashMap::new();
for (k, v) in self.buf.pop_iter() {
out.insert(k, v);
}
out
}
}

pub async fn run(
buf: Arc<Mutex<HeartbeatBuffer>>,
mut cli: raft::RaftClient,
self_node_id: NodeId,
) {
pub async fn run(buf: Arc<HeartbeatBuffer>, mut cli: raft::RaftClient, self_node_id: NodeId) {
loop {
tokio::time::sleep(Duration::from_millis(300)).await;

let heartbeats = {
let mut buf = buf.lock();
let out = buf.drain();
out
};
let heartbeats = buf.drain();

let states = {
let mut out = HashMap::new();
Expand Down
8 changes: 3 additions & 5 deletions sorock/src/service/raft/communicator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ mod heartbeat_multiplex;
mod stream;

use heartbeat_multiplex::*;
use process::*;
use spin::Mutex;
use std::sync::Arc;
use tokio::task::AbortHandle;

Expand All @@ -19,7 +17,7 @@ impl Drop for HandleDrop {
#[derive(Clone)]
pub struct RaftConnection {
client: raft::RaftClient,
heartbeat_buffer: Arc<Mutex<HeartbeatBuffer>>,
heartbeat_buffer: Arc<HeartbeatBuffer>,
_abort_hdl: Arc<HandleDrop>,
}
impl RaftConnection {
Expand All @@ -35,7 +33,7 @@ impl RaftConnection {
raft::RaftClient::new(chan)
};

let heartbeat_buffer = Arc::new(Mutex::new(HeartbeatBuffer::new()));
let heartbeat_buffer = Arc::new(HeartbeatBuffer::new());

let fut = heartbeat_multiplex::run(heartbeat_buffer.clone(), client.clone(), self_node_id);
let fut = tokio::task::unconstrained(fut);
Expand Down Expand Up @@ -77,7 +75,7 @@ impl Communicator {
}

pub fn queue_heartbeat(&self, req: request::Heartbeat) {
self.conn.heartbeat_buffer.lock().push(self.shard_id, req);
self.conn.heartbeat_buffer.push(self.shard_id, req);
}

pub async fn process_user_write_request(
Expand Down

0 comments on commit 00a21fd

Please sign in to comment.