Skip to content

Commit

Permalink
Rework v2
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Oct 29, 2023
1 parent b5dd5ec commit 99a68a8
Show file tree
Hide file tree
Showing 34 changed files with 588 additions and 387 deletions.
11 changes: 7 additions & 4 deletions lol2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ http-serde = "1"
log = "0.4"
moka = { version = "0.12", features = ["sync"] }
once_cell = "1.18"
phi-detector = "0.3"
prost = "0.12"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
shrinkwraprs = "0.3"
spin = "0.9"
tokio = { version = "1", features = ["rt"] }
tokio-retry = "0.3"
tokio-util = "0.7"
tonic = "0.10"
phi-detector = "0.3"
prost = "0.12"
rand = "0.8"
tower = "0.4.13"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }

[build-dependencies]
tonic-build = "0.10"
prost-build = "0.12"
prost-build = "0.12"
3 changes: 2 additions & 1 deletion lol2/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut config = prost_build::Config::new();
config.bytes(&[
".lol2.Request.message",
".lol2.WriteRequest.message",
".lol2.ReadRequest.message",
".lol2.Response.message",
".lol2.KernRequest.message",
".lol2.LogStreamEntry.command",
Expand Down
21 changes: 11 additions & 10 deletions lol2/proto/lol2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ import "google/protobuf/empty.proto";

package lol2;

message Request {
message WriteRequest {
bytes message = 1;
bool mutation = 2;
// unique identifier of this request
// duplicated requests with the same unique identifier are only executed once.
string request_id = 2;
}

message ReadRequest {
bytes message = 1;
}

message Response {
bytes message = 1;
}
Expand Down Expand Up @@ -72,21 +79,15 @@ message RemoveServerRequest {
string server_id = 1;
}

message ClusterInfo {
optional string known_leader_id = 1;
repeated string known_members = 2;
}

service Raft {
rpc Process(Request) returns (Response);
rpc Write(WriteRequest) returns (Response);
rpc Read(ReadRequest) returns (Response);
rpc ProcessKernRequest (KernRequest) returns (google.protobuf.Empty);
rpc GetClusterInfo (google.protobuf.Empty) returns (ClusterInfo);
rpc RequestVote (VoteRequest) returns (VoteResponse);
rpc AddServer (AddServerRequest) returns (google.protobuf.Empty);
rpc RemoveServer (RemoveServerRequest) returns (google.protobuf.Empty);
rpc SendLogStream (stream LogStreamChunk) returns (SendLogStreamResponse);
rpc GetSnapshot (GetSnapshotRequest) returns (stream SnapshotChunk);
rpc SendHeartbeat (Heartbeat) returns (google.protobuf.Empty);
rpc TimeoutNow (google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Noop (google.protobuf.Empty) returns (google.protobuf.Empty);
}
2 changes: 1 addition & 1 deletion lol2/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::*;

pub type RaftClient = raft::raft_client::RaftClient<tonic::transport::channel::Channel>;
pub use raft::{AddServerRequest, ClusterInfo, RemoveServerRequest, Request, Response};
pub use raft::{AddServerRequest, ReadRequest, RemoveServerRequest, Response, WriteRequest};
10 changes: 7 additions & 3 deletions lol2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use process::RaftProcess;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::Uri;
use tonic::transport::{Endpoint, Uri};

mod raft {
tonic::include_proto!("lol2");
Expand All @@ -36,7 +36,11 @@ mod raft {
)]
pub struct NodeId(#[serde(with = "http_serde::uri")] Uri);
impl NodeId {
pub fn new(uri: Uri) -> NodeId {
NodeId(uri)
pub fn new(uri: Uri) -> Self {
Self(uri)
}
pub fn from_str(url: &str) -> Result<Self> {
let url = url.parse()?;
Ok(Self(url))
}
}
11 changes: 5 additions & 6 deletions lol2/src/process/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use super::*;

pub mod request {
use super::*;
pub struct UserRequest {
pub struct UserWriteRequest {
pub message: Bytes,
pub request_id: String,
}
pub struct UserReadRequest {
pub message: Bytes,
pub mutation: bool,
}
pub struct KernRequest {
pub message: Bytes,
Expand Down Expand Up @@ -42,8 +45,4 @@ pub mod response {
pub success: bool,
pub log_last_index: Index,
}
pub struct ClusterInfo {
pub known_leader: Option<NodeId>,
pub known_members: HashSet<NodeId>,
}
}
12 changes: 8 additions & 4 deletions lol2/src/process/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ use super::*;

#[derive(serde::Serialize, serde::Deserialize)]
pub enum Command<'a> {
Noop,
Snapshot {
Barrier(Term),
ClusterConfiguration {
membership: HashSet<NodeId>,
},
ClusterConfiguration {
Snapshot {
membership: HashSet<NodeId>,
},
Req {
ExecuteRequest {
#[serde(with = "serde_bytes")]
message: &'a [u8],
request_id: String,
},
CompleteRequest {
request_id: String,
},
}

Expand Down
102 changes: 78 additions & 24 deletions lol2/src/process/command_log/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use super::*;

impl CommandLog {
pub fn register_completion(&self, index: Index, completion: Completion) {
match completion {
Completion::User(completion) => {
self.user_completions.lock().insert(index, completion);
}
Completion::Kern(completion) => {
self.kern_completions.lock().insert(index, completion);
}
}
}

pub async fn advance_snapshot_index(&self) -> Result<()> {
let cur_snapshot_index = self.snapshot_index.load(Ordering::SeqCst);
let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
let proposed_snapshot_index = self.app.propose_new_snapshot().await?;
if proposed_snapshot_index > cur_snapshot_index {
info!("find a newer proposed snapshot@{proposed_snapshot_index}. will move the snapshot index.");
Expand All @@ -27,6 +38,7 @@ impl CommandLog {
..old_entry
}
};
// TODO wait for follower catch up
self.insert_snapshot(new_snapshot_entry).await?;
}
Ok(())
Expand All @@ -40,49 +52,91 @@ impl CommandLog {

let process_index = cur_user_index + 1;
let e = self.get_entry(process_index).await?;
let command = Command::deserialize(&e.command);

debug!("process user@{process_index}");
match Command::deserialize(&e.command) {
Command::Snapshot { .. } => {
app.install_snapshot(process_index).await?;
}
Command::Req { message } => {
let resp = app.process_write(message, process_index).await?;
if let Some(user_completion) =
self.user_completions.lock().unwrap().remove(&process_index)
{
user_completion.complete_with(resp);
let do_process = match command {
Command::ExecuteRequest { .. } => true,
Command::CompleteRequest { .. } => true,
Command::Snapshot { .. } => true,
_ => false,
};

if do_process {
debug!("process user@{process_index}");
match command {
Command::Snapshot { .. } => {
app.install_snapshot(process_index).await?;
}
Command::ExecuteRequest {
message,
request_id,
} => {
// If the request has never been executed, we should execute it.
if self.response_cache.should_execute(&request_id) {
let resp = app.process_write(message, process_index).await?;
self.response_cache
.insert_response(request_id.clone(), resp);
}

// Leader may have the completion for the request.
if let Some(user_completion) =
self.user_completions.lock().remove(&process_index)
{
if let Some(resp) = self.response_cache.get_response(&request_id) {
user_completion.complete_with(resp);
// After the request is completed, we queue a `CompleteRequest` command for terminating the context.
// This should be queued and replicated to the followers otherwise followers
// will never know the request is completed and the context will never be terminated.
let command = Command::CompleteRequest { request_id };
self.append_new_entry(Command::serialize(command), None)
.await
.ok();
}
}
}
Command::CompleteRequest { request_id } => {
self.response_cache.complete_response(&request_id);
}
_ => {}
}
_ => {}
}

self.user_pointer.fetch_max(process_index, Ordering::SeqCst);
self.user_pointer.store(process_index, Ordering::SeqCst);

Ok(true)
}

pub(crate) async fn advance_kern_process(&self, voter: Voter) -> Result<bool> {
let cur_kern_index = self.kern_pointer.load(Ordering::SeqCst);
if cur_kern_index >= self.commit_index.load(Ordering::SeqCst) {
if cur_kern_index >= self.commit_pointer.load(Ordering::SeqCst) {
return Ok(false);
}

let process_index = cur_kern_index + 1;
let e = self.get_entry(process_index).await?;
let command = Command::deserialize(&e.command);

debug!("process kern@{process_index}");
if std::matches!(Command::deserialize(&e.command), Command::Noop) {
let term = e.this_clock.term;
voter.commit_safe_term(term);
}
let do_process = match command {
Command::Barrier { .. } => true,
Command::ClusterConfiguration { .. } => true,
_ => false,
};

if let Some(kern_completion) = self.kern_completions.lock().unwrap().remove(&process_index)
{
kern_completion.complete();
if do_process {
debug!("process kern@{process_index}");
match command {
Command::Barrier(term) => {
voter.commit_safe_term(term);
}
Command::ClusterConfiguration { .. } => {}
_ => {}
}
if let Some(kern_completion) = self.kern_completions.lock().remove(&process_index) {
kern_completion.complete();
}
}

self.kern_pointer.fetch_max(process_index, Ordering::SeqCst);
self.kern_pointer.store(process_index, Ordering::SeqCst);

Ok(true)
}
Expand Down
Loading

0 comments on commit 99a68a8

Please sign in to comment.