From 99a68a8534ce2ef39d51cde016545d14c7abc30d Mon Sep 17 00:00:00 2001 From: Akira Hayakawa Date: Wed, 18 Oct 2023 22:39:30 +0000 Subject: [PATCH] Rework v2 --- lol2/Cargo.toml | 11 +- lol2/build.rs | 3 +- lol2/proto/lol2.proto | 21 ++-- lol2/src/client.rs | 2 +- lol2/src/lib.rs | 10 +- lol2/src/process/api.rs | 11 +- lol2/src/process/command.rs | 12 +- lol2/src/process/command_log/consumer.rs | 102 +++++++++++++---- lol2/src/process/command_log/mod.rs | 47 ++++---- lol2/src/process/command_log/producer.rs | 107 +++++++----------- .../src/process/command_log/response_cache.rs | 63 +++++++++++ lol2/src/process/kern_message.rs | 1 - lol2/src/process/mod.rs | 6 + lol2/src/process/peer_svc/mod.rs | 30 ++--- lol2/src/process/peer_svc/replication.rs | 12 +- lol2/src/process/raft_process/cluster.rs | 55 ++++----- lol2/src/process/raft_process/mod.rs | 7 -- lol2/src/process/raft_process/queue.rs | 37 ++---- lol2/src/process/raft_process/responder.rs | 70 ++++++++---- lol2/src/process/thread/advance_commit.rs | 6 +- lol2/src/process/voter/election.rs | 9 +- lol2/src/process/voter/heartbeat.rs | 6 +- lol2/src/process/voter/mod.rs | 7 +- lol2/src/process/voter/stepdown.rs | 5 +- lol2/src/raft_service/mod.rs | 55 ++++----- lol2/src/requester/mod.rs | 19 +++- tests/env/src/lib.rs | 19 +++- tests/lol-tests/Cargo.toml | 2 + tests/lol-tests/src/lib.rs | 54 ++++----- tests/lol-tests/tests/one_node.rs | 45 ++++++-- tests/lol-tests/tests/three_nodes.rs | 46 ++++---- tests/testapp/Cargo.toml | 2 + tests/testapp/src/app/mod.rs | 57 +++++++--- tests/testapp/src/lib.rs | 36 ++++-- 34 files changed, 588 insertions(+), 387 deletions(-) create mode 100644 lol2/src/process/command_log/response_cache.rs diff --git a/lol2/Cargo.toml b/lol2/Cargo.toml index f38ced84..e8301f44 100644 --- a/lol2/Cargo.toml +++ b/lol2/Cargo.toml @@ -17,19 +17,22 @@ http-serde = "1" log = "0.4" moka = { version = "0.12", features = ["sync"] } once_cell = "1.18" +phi-detector = "0.3" +prost = "0.12" +rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" shrinkwraprs = "0.3" +spin = "0.9" tokio = { version = "1", features = ["rt"] } +tokio-retry = "0.3" tokio-util = "0.7" tonic = "0.10" -phi-detector = "0.3" -prost = "0.12" -rand = "0.8" +tower = "0.4.13" [dev-dependencies] tokio = { version = "1", features = ["full"] } [build-dependencies] tonic-build = "0.10" -prost-build = "0.12" \ No newline at end of file +prost-build = "0.12" diff --git a/lol2/build.rs b/lol2/build.rs index d0813fe5..8e05a1d6 100644 --- a/lol2/build.rs +++ b/lol2/build.rs @@ -5,7 +5,8 @@ fn main() -> Result<(), Box> { let mut config = prost_build::Config::new(); config.bytes(&[ - ".lol2.Request.message", + ".lol2.WriteRequest.message", + ".lol2.ReadRequest.message", ".lol2.Response.message", ".lol2.KernRequest.message", ".lol2.LogStreamEntry.command", diff --git a/lol2/proto/lol2.proto b/lol2/proto/lol2.proto index 0f2b8a83..fad781b8 100644 --- a/lol2/proto/lol2.proto +++ b/lol2/proto/lol2.proto @@ -4,10 +4,17 @@ import "google/protobuf/empty.proto"; package lol2; -message Request { +message WriteRequest { bytes message = 1; - bool mutation = 2; + // unique identifier of this request + // duplicated requests with the same unique identifier are only executed once. 
+ string request_id = 2; } + +message ReadRequest { + bytes message = 1; +} + message Response { bytes message = 1; } @@ -72,15 +79,10 @@ message RemoveServerRequest { string server_id = 1; } -message ClusterInfo { - optional string known_leader_id = 1; - repeated string known_members = 2; -} - service Raft { - rpc Process(Request) returns (Response); + rpc Write(WriteRequest) returns (Response); + rpc Read(ReadRequest) returns (Response); rpc ProcessKernRequest (KernRequest) returns (google.protobuf.Empty); - rpc GetClusterInfo (google.protobuf.Empty) returns (ClusterInfo); rpc RequestVote (VoteRequest) returns (VoteResponse); rpc AddServer (AddServerRequest) returns (google.protobuf.Empty); rpc RemoveServer (RemoveServerRequest) returns (google.protobuf.Empty); @@ -88,5 +90,4 @@ service Raft { rpc GetSnapshot (GetSnapshotRequest) returns (stream SnapshotChunk); rpc SendHeartbeat (Heartbeat) returns (google.protobuf.Empty); rpc TimeoutNow (google.protobuf.Empty) returns (google.protobuf.Empty); - rpc Noop (google.protobuf.Empty) returns (google.protobuf.Empty); } \ No newline at end of file diff --git a/lol2/src/client.rs b/lol2/src/client.rs index aa1d2fd1..dcc5a8cf 100644 --- a/lol2/src/client.rs +++ b/lol2/src/client.rs @@ -1,4 +1,4 @@ use super::*; pub type RaftClient = raft::raft_client::RaftClient; -pub use raft::{AddServerRequest, ClusterInfo, RemoveServerRequest, Request, Response}; +pub use raft::{AddServerRequest, ReadRequest, RemoveServerRequest, Response, WriteRequest}; diff --git a/lol2/src/lib.rs b/lol2/src/lib.rs index a82757e2..23cee396 100644 --- a/lol2/src/lib.rs +++ b/lol2/src/lib.rs @@ -16,7 +16,7 @@ use process::RaftProcess; use std::future::Future; use std::sync::Arc; use std::time::Duration; -use tonic::transport::Uri; +use tonic::transport::{Endpoint, Uri}; mod raft { tonic::include_proto!("lol2"); @@ -36,7 +36,11 @@ mod raft { )] pub struct NodeId(#[serde(with = "http_serde::uri")] Uri); impl NodeId { - pub fn new(uri: Uri) -> NodeId { - NodeId(uri) + pub fn new(uri: Uri) -> Self { + Self(uri) + } + pub fn from_str(url: &str) -> Result { + let url = url.parse()?; + Ok(Self(url)) } } diff --git a/lol2/src/process/api.rs b/lol2/src/process/api.rs index 1a576cf6..f49571b7 100644 --- a/lol2/src/process/api.rs +++ b/lol2/src/process/api.rs @@ -2,9 +2,12 @@ use super::*; pub mod request { use super::*; - pub struct UserRequest { + pub struct UserWriteRequest { + pub message: Bytes, + pub request_id: String, + } + pub struct UserReadRequest { pub message: Bytes, - pub mutation: bool, } pub struct KernRequest { pub message: Bytes, @@ -42,8 +45,4 @@ pub mod response { pub success: bool, pub log_last_index: Index, } - pub struct ClusterInfo { - pub known_leader: Option, - pub known_members: HashSet, - } } diff --git a/lol2/src/process/command.rs b/lol2/src/process/command.rs index 75f36411..d07b98a2 100644 --- a/lol2/src/process/command.rs +++ b/lol2/src/process/command.rs @@ -2,16 +2,20 @@ use super::*; #[derive(serde::Serialize, serde::Deserialize)] pub enum Command<'a> { - Noop, - Snapshot { + Barrier(Term), + ClusterConfiguration { membership: HashSet, }, - ClusterConfiguration { + Snapshot { membership: HashSet, }, - Req { + ExecuteRequest { #[serde(with = "serde_bytes")] message: &'a [u8], + request_id: String, + }, + CompleteRequest { + request_id: String, }, } diff --git a/lol2/src/process/command_log/consumer.rs b/lol2/src/process/command_log/consumer.rs index 8824e904..6b73f869 100644 --- a/lol2/src/process/command_log/consumer.rs +++ 
b/lol2/src/process/command_log/consumer.rs
@@ -1,8 +1,19 @@
 use super::*;
 
 impl CommandLog {
+    pub fn register_completion(&self, index: Index, completion: Completion) {
+        match completion {
+            Completion::User(completion) => {
+                self.user_completions.lock().insert(index, completion);
+            }
+            Completion::Kern(completion) => {
+                self.kern_completions.lock().insert(index, completion);
+            }
+        }
+    }
+
     pub async fn advance_snapshot_index(&self) -> Result<()> {
-        let cur_snapshot_index = self.snapshot_index.load(Ordering::SeqCst);
+        let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
         let proposed_snapshot_index = self.app.propose_new_snapshot().await?;
         if proposed_snapshot_index > cur_snapshot_index {
             info!("find a newer proposed snapshot@{proposed_snapshot_index}. will move the snapshot index.");
@@ -27,6 +38,7 @@ impl CommandLog {
                     ..old_entry
                 }
             };
+            // TODO: wait for the followers to catch up
             self.insert_snapshot(new_snapshot_entry).await?;
         }
         Ok(())
@@ -40,49 +52,91 @@ impl CommandLog {
         let process_index = cur_user_index + 1;
         let e = self.get_entry(process_index).await?;
+        let command = Command::deserialize(&e.command);
 
-        debug!("process user@{process_index}");
-        match Command::deserialize(&e.command) {
-            Command::Snapshot { .. } => {
-                app.install_snapshot(process_index).await?;
-            }
-            Command::Req { message } => {
-                let resp = app.process_write(message, process_index).await?;
-                if let Some(user_completion) =
-                    self.user_completions.lock().unwrap().remove(&process_index)
-                {
-                    user_completion.complete_with(resp);
+        let do_process = match command {
+            Command::ExecuteRequest { .. } => true,
+            Command::CompleteRequest { .. } => true,
+            Command::Snapshot { .. } => true,
+            _ => false,
+        };
+
+        if do_process {
+            debug!("process user@{process_index}");
+            match command {
+                Command::Snapshot { .. } => {
+                    app.install_snapshot(process_index).await?;
                 }
+                Command::ExecuteRequest {
+                    message,
+                    request_id,
+                } => {
+                    // If the request has never been executed, we should execute it.
+                    if self.response_cache.should_execute(&request_id) {
+                        let resp = app.process_write(message, process_index).await?;
+                        self.response_cache
+                            .insert_response(request_id.clone(), resp);
+                    }
+
+                    // The leader may have a completion registered for this request.
+                    if let Some(user_completion) =
+                        self.user_completions.lock().remove(&process_index)
+                    {
+                        if let Some(resp) = self.response_cache.get_response(&request_id) {
+                            user_completion.complete_with(resp);
+                            // After the request is completed, we queue a `CompleteRequest` command to terminate the context.
+                            // This must be queued and replicated to the followers; otherwise, the followers
+                            // will never know the request is completed and the context will never be terminated.
+ let command = Command::CompleteRequest { request_id }; + self.append_new_entry(Command::serialize(command), None) + .await + .ok(); + } + } + } + Command::CompleteRequest { request_id } => { + self.response_cache.complete_response(&request_id); + } + _ => {} } - _ => {} } - self.user_pointer.fetch_max(process_index, Ordering::SeqCst); + self.user_pointer.store(process_index, Ordering::SeqCst); Ok(true) } pub(crate) async fn advance_kern_process(&self, voter: Voter) -> Result { let cur_kern_index = self.kern_pointer.load(Ordering::SeqCst); - if cur_kern_index >= self.commit_index.load(Ordering::SeqCst) { + if cur_kern_index >= self.commit_pointer.load(Ordering::SeqCst) { return Ok(false); } let process_index = cur_kern_index + 1; let e = self.get_entry(process_index).await?; + let command = Command::deserialize(&e.command); - debug!("process kern@{process_index}"); - if std::matches!(Command::deserialize(&e.command), Command::Noop) { - let term = e.this_clock.term; - voter.commit_safe_term(term); - } + let do_process = match command { + Command::Barrier { .. } => true, + Command::ClusterConfiguration { .. } => true, + _ => false, + }; - if let Some(kern_completion) = self.kern_completions.lock().unwrap().remove(&process_index) - { - kern_completion.complete(); + if do_process { + debug!("process kern@{process_index}"); + match command { + Command::Barrier(term) => { + voter.commit_safe_term(term); + } + Command::ClusterConfiguration { .. } => {} + _ => {} + } + if let Some(kern_completion) = self.kern_completions.lock().remove(&process_index) { + kern_completion.complete(); + } } - self.kern_pointer.fetch_max(process_index, Ordering::SeqCst); + self.kern_pointer.store(process_index, Ordering::SeqCst); Ok(true) } diff --git a/lol2/src/process/command_log/mod.rs b/lol2/src/process/command_log/mod.rs index ac9d2a48..bdc8d94d 100644 --- a/lol2/src/process/command_log/mod.rs +++ b/lol2/src/process/command_log/mod.rs @@ -3,26 +3,31 @@ use super::*; mod consumer; mod membership; mod producer; -pub use producer::TryInsertResult; +mod response_cache; +use response_cache::ResponseCache; +pub use producer::TryInsertResult; pub struct Inner { - pub commit_index: AtomicU64, + append_lock: tokio::sync::Mutex<()>, + storage: Box, + + pub commit_pointer: AtomicU64, kern_pointer: AtomicU64, pub user_pointer: AtomicU64, - pub snapshot_index: AtomicU64, + pub snapshot_pointer: AtomicU64, /// Lock entries in range [snapshot_index, user_application_index] - pub snapshot_lock: tokio::sync::RwLock<()>, + snapshot_lock: tokio::sync::RwLock<()>, /// The index of the last membership. /// Unless `commit_index` >= membership_index`, /// new membership changes are not allowed to be queued. 
- pub membership_index: AtomicU64, + pub membership_pointer: AtomicU64, - storage: Box, app: App, - user_completions: std::sync::Mutex>, - kern_completions: std::sync::Mutex>, + response_cache: ResponseCache, + user_completions: spin::Mutex>, + kern_completions: spin::Mutex>, } #[derive(shrinkwraprs::Shrinkwrap, Clone)] @@ -30,16 +35,18 @@ pub struct CommandLog(pub Arc); impl CommandLog { pub fn new(storage: impl RaftLogStore, app: App) -> Self { let inner = Inner { - snapshot_index: AtomicU64::new(0), - commit_index: AtomicU64::new(0), + append_lock: tokio::sync::Mutex::new(()), + snapshot_pointer: AtomicU64::new(0), + commit_pointer: AtomicU64::new(0), kern_pointer: AtomicU64::new(0), user_pointer: AtomicU64::new(0), - membership_index: AtomicU64::new(0), + membership_pointer: AtomicU64::new(0), storage: Box::new(storage), app, snapshot_lock: tokio::sync::RwLock::new(()), - user_completions: std::sync::Mutex::new(BTreeMap::new()), - kern_completions: std::sync::Mutex::new(BTreeMap::new()), + user_completions: spin::Mutex::new(BTreeMap::new()), + kern_completions: spin::Mutex::new(BTreeMap::new()), + response_cache: ResponseCache::new(), }; Self(inner.into()) } @@ -58,7 +65,7 @@ impl CommandLog { // so we can restart from installing the snapshot. snapshot_index - 1 }; - self.commit_index.store(start_index, Ordering::SeqCst); + self.commit_pointer.store(start_index, Ordering::SeqCst); self.kern_pointer.store(start_index, Ordering::SeqCst); self.user_pointer.store(start_index, Ordering::SeqCst); @@ -68,13 +75,13 @@ impl CommandLog { impl Inner { pub async fn delete_old_snapshots(&self) -> Result<()> { - let cur_snapshot_index = self.snapshot_index.load(Ordering::Relaxed); + let cur_snapshot_index = self.snapshot_pointer.load(Ordering::Relaxed); self.app.delete_snapshots_before(cur_snapshot_index).await?; Ok(()) } pub async fn delete_old_entries(&self) -> Result<()> { - let cur_snapshot_index = self.snapshot_index.load(Ordering::Relaxed); + let cur_snapshot_index = self.snapshot_pointer.load(Ordering::Relaxed); self.storage .delete_entries_before(cur_snapshot_index) .await?; @@ -84,13 +91,13 @@ impl Inner { pub async fn insert_snapshot(&self, e: Entry) -> Result<()> { let _g = self.snapshot_lock.write().await; - let cur_snapshot_index = self.snapshot_index.load(Ordering::SeqCst); + let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst); let new_snapshot_index = e.this_clock.index; ensure!(new_snapshot_index > cur_snapshot_index); self.storage.insert_entry(new_snapshot_index, e).await?; - self.snapshot_index + self.snapshot_pointer .store(new_snapshot_index, Ordering::SeqCst); info!("inserted a new snapshot@{new_snapshot_index}"); @@ -100,7 +107,7 @@ impl Inner { pub async fn open_snapshot(&self, index: Index) -> Result { let _g = self.snapshot_lock.read().await; - let cur_snapshot_index = self.snapshot_index.load(Ordering::SeqCst); + let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst); ensure!(index == cur_snapshot_index); let st = self.app.open_snapshot(index).await?; @@ -126,6 +133,6 @@ impl Inner { } pub fn allow_queue_new_membership(&self) -> bool { - self.commit_index.load(Ordering::SeqCst) >= self.membership_index.load(Ordering::SeqCst) + self.commit_pointer.load(Ordering::SeqCst) >= self.membership_pointer.load(Ordering::SeqCst) } } diff --git a/lol2/src/process/command_log/producer.rs b/lol2/src/process/command_log/producer.rs index fd3a3319..6876136d 100644 --- a/lol2/src/process/command_log/producer.rs +++ b/lol2/src/process/command_log/producer.rs 
@@ -7,12 +7,9 @@ pub enum TryInsertResult { } impl CommandLog { - pub async fn append_new_entry( - &self, - command: Bytes, - completion: Option, - term: Term, - ) -> Result { + pub async fn append_new_entry(&self, command: Bytes, term: Option) -> Result { + let _g = self.append_lock.lock().await; + let cur_last_log_index = self.get_log_last_index().await?; let prev_clock = self .storage @@ -21,8 +18,12 @@ impl CommandLog { .unwrap() .this_clock; let append_index = cur_last_log_index + 1; + let this_term = match term { + Some(t) => t, + None => prev_clock.term, + }; let this_clock = Clock { - term, + term: this_term, index: append_index, }; let e = Entry { @@ -31,30 +32,8 @@ impl CommandLog { command, }; self.insert_entry(e).await?; - if let Some(completion) = completion { - match completion { - Completion::User(completion) => { - self.user_completions - .lock() - .unwrap() - .insert(append_index, completion); - } - Completion::Kern(completion) => { - self.kern_completions - .lock() - .unwrap() - .insert(append_index, completion); - } - } - } - Ok(append_index) - } - pub async fn append_noop_barrier(&self, term: Term) -> Result { - let index = self - .append_new_entry(Command::serialize(Command::Noop), None, term) - .await?; - Ok(index) + Ok(append_index) } pub async fn try_insert_entry( @@ -65,42 +44,42 @@ impl CommandLog { ) -> Result { // If the entry is snapshot then we should insert this entry without consistency checks. // Old entries before the new snapshot will be garbage collected. - if std::matches!( - Command::deserialize(&entry.command), - Command::Snapshot { .. } - ) { - let Clock { - term: _, - index: snapshot_index, - } = entry.this_clock; - warn!( - "log is too old. replicated a snapshot (idx={}) from leader", - snapshot_index - ); - - // Invariant: snapshot entry exists => snapshot exists - if let Err(e) = self - .app - .fetch_snapshot(snapshot_index, sender_id.clone(), driver) - .await - { - error!( - "could not fetch app snapshot (idx={}) from sender {}", - snapshot_index, sender_id, + match Command::deserialize(&entry.command) { + Command::Snapshot { membership } => { + let Clock { + term: _, + index: snapshot_index, + } = entry.this_clock; + warn!( + "log is too old. 
replicated a snapshot (idx={}) from leader", + snapshot_index ); - return Err(e); - } - self.insert_snapshot(entry).await?; + // Invariant: snapshot entry exists => snapshot exists + if let Err(e) = self + .app + .fetch_snapshot(snapshot_index, sender_id.clone(), driver) + .await + { + error!( + "could not fetch app snapshot (idx={}) from sender {}", + snapshot_index, sender_id, + ); + return Err(e); + } - self.commit_index - .store(snapshot_index - 1, Ordering::SeqCst); - self.kern_pointer - .store(snapshot_index - 1, Ordering::SeqCst); - self.user_pointer - .store(snapshot_index - 1, Ordering::SeqCst); + self.insert_snapshot(entry).await?; - return Ok(TryInsertResult::Inserted); + self.commit_pointer + .store(snapshot_index - 1, Ordering::SeqCst); + self.kern_pointer + .store(snapshot_index - 1, Ordering::SeqCst); + self.user_pointer + .store(snapshot_index - 1, Ordering::SeqCst); + + return Ok(TryInsertResult::Inserted); + } + _ => {} } let Clock { @@ -139,8 +118,8 @@ impl CommandLog { self.insert_entry(entry).await?; // discard [this_index, ) - self.user_completions.lock().unwrap().split_off(&this_index); - self.kern_completions.lock().unwrap().split_off(&this_index); + self.user_completions.lock().split_off(&this_index); + self.kern_completions.lock().split_off(&this_index); Ok(TryInsertResult::Inserted) } diff --git a/lol2/src/process/command_log/response_cache.rs b/lol2/src/process/command_log/response_cache.rs new file mode 100644 index 00000000..4c1a4701 --- /dev/null +++ b/lol2/src/process/command_log/response_cache.rs @@ -0,0 +1,63 @@ +use super::*; + +struct TTLSet { + cache: moka::sync::Cache, +} +impl TTLSet { + pub fn new(ttl: Duration) -> Self { + let builder = moka::sync::Cache::builder() + .initial_capacity(1000) + .time_to_live(ttl); + Self { + cache: builder.build(), + } + } + + pub fn exists(&self, k: &str) -> bool { + self.cache.contains_key(k) + } + + pub fn insert(&self, k: String) { + self.cache.insert(k, ()); + } +} + +pub struct ResponseCache { + responses: spin::Mutex>, + completes: TTLSet, +} +impl ResponseCache { + pub fn new() -> Self { + Self { + responses: spin::Mutex::new(HashMap::new()), + // Client may retry the request even after the response + // but with a sane client this could happen in quite a short time. + // Completed request_id is kept for 5 seconds to prevent executing + // the same request twice. 
+ completes: TTLSet::new(Duration::from_secs(5)), + } + } + + pub fn should_execute(&self, k: &str) -> bool { + if self.completes.exists(k) { + return false; + } + match self.responses.lock().get(k) { + Some(_) => false, + None => true, + } + } + + pub fn insert_response(&self, k: String, v: Bytes) { + self.responses.lock().insert(k, v); + } + + pub fn get_response(&self, k: &str) -> Option { + self.responses.lock().get(k).cloned() + } + + pub fn complete_response(&self, k: &str) { + self.responses.lock().remove(k); + self.completes.insert(k.to_owned()); + } +} diff --git a/lol2/src/process/kern_message.rs b/lol2/src/process/kern_message.rs index d8f8f2f8..fb02b6aa 100644 --- a/lol2/src/process/kern_message.rs +++ b/lol2/src/process/kern_message.rs @@ -4,7 +4,6 @@ use super::*; pub enum KernRequest { AddServer(NodeId), RemoveServer(NodeId), - Noop, } impl KernRequest { diff --git a/lol2/src/process/mod.rs b/lol2/src/process/mod.rs index 16cc48d7..79cbb127 100644 --- a/lol2/src/process/mod.rs +++ b/lol2/src/process/mod.rs @@ -107,3 +107,9 @@ pub trait RaftBallotStore: Sync + Send + 'static { async fn save_ballot(&self, v: Ballot) -> Result<()>; async fn load_ballot(&self) -> Result; } + +#[derive(Clone, Hash, PartialEq, Eq, serde::Serialize, serde::Deserialize, Debug)] +pub struct ExecutionKey { + pub client_id: String, + pub seq_num: u64, +} diff --git a/lol2/src/process/peer_svc/mod.rs b/lol2/src/process/peer_svc/mod.rs index ea08f05d..ee5c2042 100644 --- a/lol2/src/process/peer_svc/mod.rs +++ b/lol2/src/process/peer_svc/mod.rs @@ -29,9 +29,9 @@ struct ThreadHandles { } pub struct Inner { - membership: std::sync::RwLock>, - peer_contexts: std::sync::RwLock>, - peer_threads: std::sync::Mutex>, + membership: spin::RwLock>, + peer_contexts: spin::RwLock>, + peer_threads: spin::Mutex>, command_log: CommandLog, driver: RaftDriver, @@ -81,7 +81,7 @@ impl PeerSvc { return Ok(()); } - if self.peer_contexts.read().unwrap().contains_key(&id) { + if self.peer_contexts.read().contains_key(&id) { return Ok(()); } @@ -90,7 +90,7 @@ impl PeerSvc { ReplicationProgress::new(last_log_index) }; - let mut peer_contexts = self.peer_contexts.write().unwrap(); + let mut peer_contexts = self.peer_contexts.write(); peer_contexts.insert( id.clone(), PeerContexts { @@ -102,14 +102,14 @@ impl PeerSvc { replicator_handle: thread::replication::new(id.clone(), self.clone(), voter.clone()), heartbeater_handle: thread::heartbeat::new(id.clone(), voter), }; - self.peer_threads.lock().unwrap().insert(id, thread_handles); + self.peer_threads.lock().insert(id, thread_handles); Ok(()) } fn remove_peer(&self, id: NodeId) { - self.peer_threads.lock().unwrap().remove(&id); - self.peer_contexts.write().unwrap().remove(&id); + self.peer_threads.lock().remove(&id); + self.peer_contexts.write().remove(&id); } pub async fn set_membership( @@ -151,17 +151,17 @@ impl PeerSvc { } info!("membership changed -> {:?}", config); - *self.membership.write().unwrap() = config; + *self.membership.write() = config; self.command_log - .membership_index - .fetch_max(index, Ordering::SeqCst); + .membership_pointer + .store(index, Ordering::SeqCst); Ok(()) } pub fn read_membership(&self) -> HashSet { - self.membership.read().unwrap().clone() + self.membership.read().clone() } pub async fn find_new_commit_index(&self) -> Result { @@ -170,7 +170,7 @@ impl PeerSvc { let last_log_index = self.command_log.get_log_last_index().await?; match_indices.push(last_log_index); - let peer_contexts = self.peer_contexts.read().unwrap(); + let peer_contexts = 
self.peer_contexts.read();
         for (_, peer) in peer_contexts.iter() {
             match_indices.push(peer.progress.match_index);
         }
@@ -184,7 +184,7 @@
     }
 
     pub fn reset_progress(&self, init_next_index: Index) {
-        let mut peer_contexts = self.peer_contexts.write().unwrap();
+        let mut peer_contexts = self.peer_contexts.write();
         for (_, peer) in peer_contexts.iter_mut() {
             peer.progress = ReplicationProgress::new(init_next_index);
         }
@@ -193,7 +193,7 @@
     /// Choose the most advanced follower and send it TimeoutNow.
     pub async fn transfer_leadership(&self) -> Result<()> {
         let mut xs = {
-            let peer_contexts = self.peer_contexts.read().unwrap();
+            let peer_contexts = self.peer_contexts.read();
             let mut out = vec![];
             for (id, peer) in peer_contexts.iter() {
                 let progress = peer.progress;
diff --git a/lol2/src/process/peer_svc/replication.rs b/lol2/src/process/peer_svc/replication.rs
index e9b9b87b..1a4da73c 100644
--- a/lol2/src/process/peer_svc/replication.rs
+++ b/lol2/src/process/peer_svc/replication.rs
@@ -28,13 +28,7 @@ impl PeerSvc {
     }
 
     pub async fn advance_replication(&self, follower_id: NodeId) -> Result<bool> {
-        let peer = self
-            .peer_contexts
-            .read()
-            .unwrap()
-            .get(&follower_id)
-            .unwrap()
-            .clone();
+        let peer = self.peer_contexts.read().get(&follower_id).unwrap().clone();
         let old_progress = peer.progress;
 
         let cur_last_log_index = self.command_log.get_log_last_index().await?;
@@ -47,7 +41,7 @@
 
         // The entries to send may be deleted due to previous compactions.
         // In this case, replication will reset from the current snapshot index.
-        let cur_snapshot_index = self.command_log.snapshot_index.load(Ordering::SeqCst);
+        let cur_snapshot_index = self.command_log.snapshot_pointer.load(Ordering::SeqCst);
         if old_progress.next_index < cur_snapshot_index {
             warn!(
                 "entry not found at next_index (idx={}) for {}",
@@ -56,7 +50,6 @@
             let new_progress = ReplicationProgress::new(cur_snapshot_index);
             self.peer_contexts
                 .write()
-                .unwrap()
                 .get_mut(&follower_id)
                 .unwrap()
                 .progress = new_progress;
@@ -100,7 +93,6 @@
 
         self.peer_contexts
             .write()
-            .unwrap()
             .get_mut(&follower_id)
             .unwrap()
             .progress = new_progress;
diff --git a/lol2/src/process/raft_process/cluster.rs b/lol2/src/process/raft_process/cluster.rs
index b23dcd8a..1ab0fd85 100644
--- a/lol2/src/process/raft_process/cluster.rs
+++ b/lol2/src/process/raft_process/cluster.rs
@@ -1,29 +1,40 @@
 use super::*;
 
 impl RaftProcess {
+    /// Process a configuration change.
+    /// A configuration should be applied as soon as it is inserted into the log, because doing so
+    /// guarantees that a majority of the servers have moved to the new configuration by the time the entry is committed.
+    /// Without this property, some servers may still be in an old configuration, which could cause a split brain:
+    /// two leaders elected in a single term, which Raft does not allow.
+ pub async fn process_configuration_command(&self, command: &[u8], index: Index) -> Result<()> { + let config0 = match Command::deserialize(command) { + Command::Snapshot { membership } => Some(membership), + Command::ClusterConfiguration { membership } => Some(membership), + _ => None, + }; + if let Some(config) = config0 { + self.peers + .set_membership(config, index, Ref(self.voter.clone())) + .await?; + } + Ok(()) + } + async fn init_cluster(&self) -> Result<()> { + let mut membership = HashSet::new(); + membership.insert(self.driver.selfid()); + + let init_command = Command::serialize(Command::Snapshot { + membership: membership.clone(), + }); let snapshot = Entry { prev_clock: Clock { term: 0, index: 0 }, this_clock: Clock { term: 0, index: 1 }, - command: Command::serialize(Command::Snapshot { - membership: HashSet::new(), - }), + command: init_command.clone(), }; - self.command_log.insert_snapshot(snapshot).await?; - let mut membership = HashSet::new(); - membership.insert(self.driver.selfid()); - let add_server = Entry { - prev_clock: Clock { term: 0, index: 1 }, - this_clock: Clock { term: 0, index: 2 }, - command: Command::serialize(Command::ClusterConfiguration { - membership: membership.clone(), - }), - }; - self.command_log.insert_entry(add_server).await?; - self.peers - .set_membership(membership, 2, Ref(self.voter.clone())) - .await?; + self.command_log.insert_snapshot(snapshot).await?; + self.process_configuration_command(&init_command, 1).await?; // After this function is called // this server immediately becomes the leader by self-vote and advance commit index. @@ -60,14 +71,4 @@ impl RaftProcess { conn.process_kern_request(req).await?; Ok(()) } - - pub(crate) async fn request_cluster_info(&self) -> Result { - let ballot = self.voter.read_ballot().await?; - let membership = self.peers.read_membership(); - let info = response::ClusterInfo { - known_leader: ballot.voted_for, - known_members: membership.clone(), - }; - Ok(info) - } } diff --git a/lol2/src/process/raft_process/mod.rs b/lol2/src/process/raft_process/mod.rs index 00b5333b..a221ee53 100644 --- a/lol2/src/process/raft_process/mod.rs +++ b/lol2/src/process/raft_process/mod.rs @@ -81,11 +81,4 @@ impl RaftProcess { }; Ok(RaftProcess(inner.into())) } - - pub async fn noop(&self) -> Result<()> { - let req = request::KernRequest { - message: kern_message::KernRequest::Noop.serialize(), - }; - self.process_kern_request(req).await - } } diff --git a/lol2/src/process/raft_process/queue.rs b/lol2/src/process/raft_process/queue.rs index 4724de0c..2195d2ea 100644 --- a/lol2/src/process/raft_process/queue.rs +++ b/lol2/src/process/raft_process/queue.rs @@ -1,37 +1,20 @@ use super::*; impl RaftProcess { - async fn process_membership_change(&self, command: Bytes, index: Index) -> Result<()> { - match Command::deserialize(&command) { - Command::Snapshot { membership } => { - self.peers - .set_membership(membership, index, Ref(self.voter.clone())) - .await?; - } - Command::ClusterConfiguration { membership } => { - self.peers - .set_membership(membership, index, Ref(self.voter.clone())) - .await?; - } - _ => {} - } - Ok(()) - } - - pub async fn queue_new_entry( - &self, - command: Bytes, - completion: Option, - ) -> Result { - let cur_term = self.voter.allow_queue_entry().await?; + pub async fn queue_new_entry(&self, command: Bytes, completion: Completion) -> Result { + ensure!(self.voter.allow_queue_entry().await?); let append_index = self .command_log - .append_new_entry(command.clone(), completion, cur_term) + 
.append_new_entry(command.clone(), None) .await?; - self.process_membership_change(command, append_index) + self.command_log + .register_completion(append_index, completion); + + self.process_configuration_command(&command, append_index) .await?; + Ok(append_index) } @@ -51,8 +34,8 @@ impl RaftProcess { .await? { command_log::TryInsertResult::Inserted => { - self.process_membership_change(command, insert_index) - .await?; + self.process_configuration_command(&command, insert_index) + .await? } command_log::TryInsertResult::Skipped => {} command_log::TryInsertResult::Rejected => { diff --git a/lol2/src/process/raft_process/responder.rs b/lol2/src/process/raft_process/responder.rs index 2c27064b..66912d9a 100644 --- a/lol2/src/process/raft_process/responder.rs +++ b/lol2/src/process/raft_process/responder.rs @@ -36,11 +36,10 @@ impl RaftProcess { membership.remove(&id); Command::ClusterConfiguration { membership } } - kern_message::KernRequest::Noop => Command::Noop, }; self.queue_new_entry( Command::serialize(command), - Some(Completion::Kern(kern_completion)), + Completion::Kern(kern_completion), ) .await?; rx.await?; @@ -51,7 +50,7 @@ impl RaftProcess { Ok(()) } - pub async fn process_user_request(&self, req: request::UserRequest) -> Result { + pub async fn process_user_read_request(&self, req: request::UserReadRequest) -> Result { let ballot = self.voter.read_ballot().await.unwrap(); ensure!(ballot.voted_for.is_some()); @@ -62,27 +61,56 @@ impl RaftProcess { voter::ElectionState::Leader ) { let (user_completion, rx) = completion::prepare_user_completion(); - if req.mutation { - let command = Command::Req { - message: &req.message, - }; - self.queue_new_entry( - Command::serialize(command), - Some(Completion::User(user_completion)), - ) - .await?; - } else { - let commit_index = self.command_log.commit_index.load(Ordering::SeqCst); - let query = query_queue::Query { - message: req.message, - user_completion, - }; - self.query_queue.register(commit_index, query).await; - } + + let read_index = self.command_log.commit_pointer.load(Ordering::SeqCst); + let query = query_queue::Query { + message: req.message, + user_completion, + }; + self.query_queue.register(read_index, query).await; + + rx.await? + } else { + // This check is to avoid looping. + ensure!(self.driver.selfid() != leader_id); + let conn = self.driver.connect(leader_id); + conn.process_user_read_request(req).await? + }; + Ok(resp) + } + + pub async fn process_user_write_request( + &self, + req: request::UserWriteRequest, + ) -> Result { + let ballot = self.voter.read_ballot().await.unwrap(); + + ensure!(ballot.voted_for.is_some()); + let leader_id = ballot.voted_for.unwrap(); + + let resp = if std::matches!( + self.voter.read_election_state(), + voter::ElectionState::Leader + ) { + let (user_completion, rx) = completion::prepare_user_completion(); + + let command = Command::ExecuteRequest { + message: &req.message, + request_id: req.request_id, + }; + self.queue_new_entry( + Command::serialize(command), + Completion::User(user_completion), + ) + .await?; + rx.await? } else { + // This check is to avoid looping. + ensure!(self.driver.selfid() != leader_id); + let conn = self.driver.connect(leader_id); - conn.process_user_request(req).await? + conn.process_user_write_request(req).await? 
}; Ok(resp) } diff --git a/lol2/src/process/thread/advance_commit.rs b/lol2/src/process/thread/advance_commit.rs index c19de0b7..84db801b 100644 --- a/lol2/src/process/thread/advance_commit.rs +++ b/lol2/src/process/thread/advance_commit.rs @@ -11,14 +11,14 @@ impl Thread { let election_state = self.voter.read_election_state(); ensure!(std::matches!(election_state, voter::ElectionState::Leader)); - let cur_commit_index = self.command_log.commit_index.load(Ordering::SeqCst); + let cur_commit_index = self.command_log.commit_pointer.load(Ordering::SeqCst); let new_commit_index = self.peers.find_new_commit_index().await?; if new_commit_index > cur_commit_index { debug!("commit -> {new_commit_index}"); self.command_log - .commit_index - .fetch_max(new_commit_index, Ordering::SeqCst); + .commit_pointer + .store(new_commit_index, Ordering::SeqCst); } Ok(()) diff --git a/lol2/src/process/voter/election.rs b/lol2/src/process/voter/election.rs index e0ba2436..1f62a7d1 100644 --- a/lol2/src/process/voter/election.rs +++ b/lol2/src/process/voter/election.rs @@ -61,6 +61,7 @@ impl Voter { let grant = match &ballot.voted_for { None => { + info!("learn candidate as the new leader"); ballot.voted_for = Some(candidate_id.clone()); true } @@ -199,7 +200,13 @@ impl Voter { info!("got enough votes from the cluster. promoted to leader"); // As soon as the node becomes the leader, replicate noop entries with term. - let index = self.command_log.append_noop_barrier(vote_term).await?; + let index = self + .command_log + .append_new_entry( + Command::serialize(Command::Barrier(vote_term)), + Some(vote_term), + ) + .await?; info!("noop barrier is queued@{index} (term={vote_term})"); // Initialize replication progress diff --git a/lol2/src/process/voter/heartbeat.rs b/lol2/src/process/voter/heartbeat.rs index fb127ede..08091a64 100644 --- a/lol2/src/process/voter/heartbeat.rs +++ b/lol2/src/process/voter/heartbeat.rs @@ -35,15 +35,15 @@ impl Voter { let new_commit_index = std::cmp::min(leader_commit, self.command_log.get_log_last_index().await?); self.command_log - .commit_index - .fetch_max(new_commit_index, Ordering::SeqCst); + .commit_pointer + .store(new_commit_index, Ordering::SeqCst); Ok(()) } pub async fn send_heartbeat(&self, follower_id: NodeId) -> Result<()> { let ballot = self.read_ballot().await?; - let leader_commit_index = self.command_log.commit_index.load(Ordering::SeqCst); + let leader_commit_index = self.command_log.commit_pointer.load(Ordering::SeqCst); let req = request::Heartbeat { leader_id: self.driver.selfid(), leader_term: ballot.cur_term, diff --git a/lol2/src/process/voter/mod.rs b/lol2/src/process/voter/mod.rs index 5c1b34c8..98133d47 100644 --- a/lol2/src/process/voter/mod.rs +++ b/lol2/src/process/voter/mod.rs @@ -73,13 +73,12 @@ impl Voter { pub fn commit_safe_term(&self, term: Term) { info!("commit safe term={term}"); - self.safe_term.fetch_max(term, Ordering::SeqCst); + self.safe_term.store(term, Ordering::SeqCst); } - pub async fn allow_queue_entry(&self) -> Result { + pub async fn allow_queue_entry(&self) -> Result { let cur_term = self.ballot.load_ballot().await?.cur_term; let cur_safe_term = self.safe_term.load(Ordering::SeqCst); - ensure!(cur_safe_term >= cur_term); - Ok(cur_term) + Ok(cur_safe_term == cur_term) } } diff --git a/lol2/src/process/voter/stepdown.rs b/lol2/src/process/voter/stepdown.rs index b9c3ccb8..7321fb95 100644 --- a/lol2/src/process/voter/stepdown.rs +++ b/lol2/src/process/voter/stepdown.rs @@ -7,10 +7,11 @@ impl Voter { voter::ElectionState::Leader )); - 
let last_membership_change_index = self.command_log.membership_index.load(Ordering::SeqCst); + let last_membership_change_index = + self.command_log.membership_pointer.load(Ordering::SeqCst); // Ensure the membership entry is committed otherwise add-server request may be lost. ensure!( - last_membership_change_index <= self.command_log.commit_index.load(Ordering::SeqCst) + last_membership_change_index <= self.command_log.commit_pointer.load(Ordering::SeqCst) ); let config = self diff --git a/lol2/src/raft_service/mod.rs b/lol2/src/raft_service/mod.rs index 719dca73..e8a035a9 100644 --- a/lol2/src/raft_service/mod.rs +++ b/lol2/src/raft_service/mod.rs @@ -17,66 +17,55 @@ pub struct ServiceImpl { impl raft::raft_server::Raft for ServiceImpl { type GetSnapshotStream = stream::SnapshotStreamOut; - async fn process( + async fn write( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); - let req = request::UserRequest { + let req = request::UserWriteRequest { message: req.message, - mutation: req.mutation, + request_id: req.request_id, }; let resp = self .node .get_process() - .process_user_request(req) + .process_user_write_request(req) .await .unwrap(); Ok(tonic::Response::new(raft::Response { message: resp })) } - async fn process_kern_request( + async fn read( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); - let req = request::KernRequest { + let req = request::UserReadRequest { message: req.message, }; - self.node + let resp = self + .node .get_process() - .process_kern_request(req) + .process_user_read_request(req) .await .unwrap(); - Ok(tonic::Response::new(())) + Ok(tonic::Response::new(raft::Response { message: resp })) } - async fn noop( + async fn process_kern_request( &self, - _: tonic::Request<()>, + request: tonic::Request, ) -> std::result::Result, tonic::Status> { - self.node.get_process().noop().await.unwrap(); - Ok(tonic::Response::new(())) - } - - async fn get_cluster_info( - &self, - _: tonic::Request<()>, - ) -> std::result::Result, tonic::Status> { - let resp = self - .node + let req = request.into_inner(); + let req = request::KernRequest { + message: req.message, + }; + self.node .get_process() - .request_cluster_info() + .process_kern_request(req) .await .unwrap(); - Ok(tonic::Response::new(raft::ClusterInfo { - known_leader_id: resp.known_leader.map(|id| id.to_string()), - known_members: resp - .known_members - .into_iter() - .map(|id| id.to_string()) - .collect(), - })) + Ok(tonic::Response::new(())) } async fn request_vote( diff --git a/lol2/src/requester/mod.rs b/lol2/src/requester/mod.rs index 798a5a64..61808e13 100644 --- a/lol2/src/requester/mod.rs +++ b/lol2/src/requester/mod.rs @@ -32,12 +32,23 @@ impl Connection { Ok(()) } - pub async fn process_user_request(&self, req: request::UserRequest) -> Result { - let req = raft::Request { + pub async fn process_user_write_request( + &self, + req: request::UserWriteRequest, + ) -> Result { + let req = raft::WriteRequest { message: req.message, - mutation: req.mutation, + request_id: req.request_id, }; - let resp = self.cli.clone().process(req).await?.into_inner(); + let resp = self.cli.clone().write(req).await?.into_inner(); + Ok(resp.message) + } + + pub async fn process_user_read_request(&self, req: request::UserReadRequest) -> Result { + let req = raft::ReadRequest { + message: req.message, + }; + let 
resp = self.cli.clone().read(req).await?.into_inner(); Ok(resp.message) } diff --git a/tests/env/src/lib.rs b/tests/env/src/lib.rs index 3ae689dd..1f5eb2b9 100644 --- a/tests/env/src/lib.rs +++ b/tests/env/src/lib.rs @@ -23,6 +23,7 @@ impl Env { containers: HashMap::new(), }) } + pub fn id_from_address(address: &str) -> u8 { let id = address .strip_prefix("http://lol-testapp-") @@ -31,9 +32,11 @@ impl Env { .unwrap(); id.parse().unwrap() } + pub fn address_from_id(id: u8) -> String { format!("http://lol-testapp-{id}:50000") } + pub async fn create(&mut self, id: u8) -> Result<()> { ensure!(!self.containers.contains_key(&id)); let options = container::CreateContainerOptions { @@ -44,7 +47,7 @@ impl Env { image: Some("lol-testapp:latest".to_string()), env: Some(vec![ format!("address=http://lol-testapp-{id}:50000"), - "RUST_LOG=debug".to_string(), + "RUST_LOG=info".to_string(), ]), ..Default::default() }; @@ -53,6 +56,7 @@ impl Env { self.containers.insert(id, Container(container_id)); Ok(()) } + pub async fn start(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = &self.containers.get(&id).unwrap().0.clone(); @@ -61,24 +65,28 @@ impl Env { .await?; Ok(()) } + pub async fn stop(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); self.docker.stop_container(&container_id, None).await?; Ok(()) } + pub async fn pause(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); self.docker.pause_container(&container_id).await?; Ok(()) } + pub async fn resume(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); self.docker.unpause_container(&container_id).await?; Ok(()) } + pub async fn pause_v2(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); @@ -98,6 +106,7 @@ impl Env { Ok(()) } + pub async fn resume_v2(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); @@ -117,6 +126,7 @@ impl Env { Ok(()) } + pub async fn connect_network(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); @@ -138,6 +148,7 @@ impl Env { Ok(()) } + pub async fn disconnect_network(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); @@ -148,15 +159,19 @@ impl Env { self.docker.disconnect_network(NETWORK_NAME, config).await?; Ok(()) } + pub async fn ping(&self, id: u8) -> Result<()> { let chan = self.connect(0); let mut cli = testapp::PingClient::new(chan); cli.ping(()).await?; Ok(()) } + pub fn connect(&self, id: u8) -> Channel { let uri: Uri = Self::address_from_id(id).parse().unwrap(); - let endpoint = Endpoint::from(uri).connect_timeout(std::time::Duration::from_secs(3)); + let endpoint = Endpoint::from(uri) + .timeout(std::time::Duration::from_secs(1)) + .connect_timeout(std::time::Duration::from_secs(1)); let chan = endpoint.connect_lazy(); chan } diff --git a/tests/lol-tests/Cargo.toml b/tests/lol-tests/Cargo.toml index da7a8930..1e6f298a 100644 --- a/tests/lol-tests/Cargo.toml +++ b/tests/lol-tests/Cargo.toml @@ -16,6 +16,8 @@ testapp = { path = "../testapp" 
}
 
 [dev-dependencies]
 env_logger = "*"
+futures = "0.3"
 serial_test = "2"
 test-log = "0.2.12"
 tokio = { version = "1", features = ["full"] }
+uuid = "1.5"
diff --git a/tests/lol-tests/src/lib.rs b/tests/lol-tests/src/lib.rs
index 749eaf41..85a02f4a 100644
--- a/tests/lol-tests/src/lib.rs
+++ b/tests/lol-tests/src/lib.rs
@@ -15,56 +15,40 @@ impl Cluster {
         }
         Ok(Self { env })
     }
+
     pub fn raw_env(&mut self) -> &mut Env {
         &mut self.env
     }
+
     pub fn user(&self, id: u8) -> testapp::Client {
         let conn = self.env.connect(id);
         testapp::Client::new(conn)
     }
+
     fn admin(&self, id: u8) -> RaftClient {
         let conn = self.env.connect(id);
         lol2::client::RaftClient::new(conn)
     }
+
     pub async fn add_server(&mut self, to: u8, id: u8) -> Result<()> {
-        let mut cli = self.admin(to);
-        cli.add_server(lol2::client::AddServerRequest {
-            server_id: Env::address_from_id(id),
-        })
-        .await?;
+        self.admin(to)
+            .add_server(lol2::client::AddServerRequest {
+                server_id: Env::address_from_id(id),
+            })
+            .await?;
+        // make sure the new server is acquainted with the current leader.
+        self.user(id).fetch_add(0).await?;
         Ok(())
     }
+
     pub async fn remove_server(&mut self, to: u8, id: u8) -> Result<()> {
-        let mut cli = self.admin(to);
-        cli.remove_server(lol2::client::RemoveServerRequest {
-            server_id: Env::address_from_id(id),
-        })
-        .await?;
-        Ok(())
-    }
-    pub async fn try_commit(&mut self, to: u8) -> Result<()> {
-        let mut cli = self.admin(to);
-        wait_for_noop_commit(&mut cli).await?;
+        self.admin(to)
+            .remove_server(lol2::client::RemoveServerRequest {
+                server_id: Env::address_from_id(id),
+            })
+            .await?;
+        eprintln!("removed");
+        self.user(to).fetch_add(0).await?;
         Ok(())
     }
 }
-
-async fn wait_for_noop_commit(cli: &mut RaftClient) -> Result<(), tonic::Status> {
-    use tokio_retry::strategy::{jitter, ExponentialBackoff};
-    use tokio_retry::Retry;
-
-    // 200ms, 400, 800, 1600, 3200
-    let strategy = ExponentialBackoff::from_millis(2)
-        .factor(100)
-        .map(jitter)
-        .take(5);
-
-    let fut = Retry::spawn(strategy, || {
-        let mut cli = cli.clone();
-        async move {
-            let req = tonic::Request::new(());
-            cli.noop(req).await
-        }
-    });
-    fut.await.map(|_| ())
-}
diff --git a/tests/lol-tests/tests/one_node.rs b/tests/lol-tests/tests/one_node.rs
index cf5ef3fc..dbdb07f4 100644
--- a/tests/lol-tests/tests/one_node.rs
+++ b/tests/lol-tests/tests/one_node.rs
@@ -5,19 +5,19 @@ use test_log::test;
 
 #[serial]
 #[test(tokio::test(flavor = "multi_thread"))]
-async fn cluster_1() -> Result<()> {
+async fn n1_cluster() -> Result<()> {
     let mut cluster = Cluster::new(1).await?;
     cluster.add_server(0, 0).await?;
+
     Ok(())
 }
 
 #[serial]
 #[test(tokio::test(flavor = "multi_thread"))]
-async fn write_1() -> Result<()> {
+async fn n1_write() -> Result<()> {
     let mut cluster = Cluster::new(1).await?;
     cluster.add_server(0, 0).await?;
-    cluster.try_commit(0).await?;
 
     let mut cli = cluster.user(0);
     assert_eq!(cli.fetch_add(1).await?, 0);
     assert_eq!(cli.fetch_add(2).await?, 1);
@@ -28,11 +28,10 @@
 
 #[serial]
 #[test(tokio::test(flavor = "multi_thread"))]
-async fn read_1() -> Result<()> {
+async fn n1_read() -> Result<()> {
     let mut cluster = Cluster::new(1).await?;
     cluster.add_server(0, 0).await?;
-    cluster.try_commit(0).await?;
 
     let mut cli = cluster.user(0);
     assert_eq!(cli.read().await?, 0);
     assert_eq!(cli.fetch_add(1).await?, 0);
@@ -45,11 +44,10 @@
 
 #[serial]
 #[test(tokio::test(flavor = "multi_thread"))]
-async fn snapshot_1() -> Result<()> {
+async fn n1_snapshot() -> Result<()> {
     let mut cluster =
Cluster::new(1).await?; cluster.add_server(0, 0).await?; - cluster.try_commit(0).await?; for n in 1..10 { cluster.user(0).fetch_add(n).await?; } @@ -62,3 +60,36 @@ async fn snapshot_1() -> Result<()> { Ok(()) } + +#[serial] +#[test(tokio::test(flavor = "multi_thread"))] +async fn n1_many_retry_exec_once() -> Result<()> { + let mut cluster = Cluster::new(1).await?; + cluster.add_server(0, 0).await?; + + let chan = cluster.raw_env().connect(0); + let cli = lol2::client::RaftClient::new(chan); + + let req = lol2::client::WriteRequest { + message: testapp::AppWriteRequest::FetchAdd { + bytes: vec![1u8; 1].into(), + } + .serialize(), + request_id: uuid::Uuid::new_v4().to_string(), + }; + + let mut futs = vec![]; + for _ in 0..100 { + let mut cli = cli.clone(); + let req = req.clone(); + let fut = async move { cli.write(req).await }; + futs.push(fut); + } + futures::future::join_all(futs).await; + + let cur_state = cluster.user(0).read().await?; + // executed only once. + assert_eq!(cur_state, 1); + + Ok(()) +} diff --git a/tests/lol-tests/tests/three_nodes.rs b/tests/lol-tests/tests/three_nodes.rs index 4387e0ad..22d7bf15 100644 --- a/tests/lol-tests/tests/three_nodes.rs +++ b/tests/lol-tests/tests/three_nodes.rs @@ -5,31 +5,22 @@ use test_log::test; #[serial] #[test(tokio::test(flavor = "multi_thread"))] -async fn cluster_3() -> Result<()> { +async fn n3_cluster() -> Result<()> { let mut cluster = Cluster::new(3).await?; cluster.add_server(0, 0).await?; - - cluster.try_commit(0).await?; cluster.add_server(0, 1).await?; - - cluster.try_commit(1).await?; cluster.add_server(1, 2).await?; Ok(()) } #[serial] #[test(tokio::test(flavor = "multi_thread"))] -async fn write_3() -> Result<()> { +async fn n3_write() -> Result<()> { let mut cluster = Cluster::new(3).await?; cluster.add_server(0, 0).await?; - - cluster.try_commit(0).await?; cluster.add_server(0, 1).await?; - - cluster.try_commit(1).await?; cluster.add_server(1, 2).await?; - cluster.try_commit(2).await?; assert_eq!(cluster.user(2).fetch_add(1).await?, 0); assert_eq!(cluster.user(1).fetch_add(10).await?, 1); assert_eq!(cluster.user(0).fetch_add(100).await?, 11); @@ -40,11 +31,10 @@ async fn write_3() -> Result<()> { #[serial] #[test(tokio::test(flavor = "multi_thread"))] -async fn snapshot_3() -> Result<()> { +async fn n3_snapshot() -> Result<()> { let mut cluster = Cluster::new(3).await?; cluster.add_server(0, 0).await?; - cluster.try_commit(0).await?; cluster.user(0).fetch_add(1).await?; cluster.user(0).fetch_add(10).await?; cluster.user(0).fetch_add(100).await?; @@ -61,11 +51,10 @@ async fn snapshot_3() -> Result<()> { #[serial] #[test(tokio::test(flavor = "multi_thread"))] -async fn leader_stop_3() -> Result<()> { +async fn n3_leader_stop() -> Result<()> { let mut cluster = Cluster::new(3).await?; cluster.add_server(0, 0).await?; - cluster.try_commit(0).await?; for i in 0..10 { cluster.user(0).fetch_add(i).await?; } @@ -76,7 +65,6 @@ async fn leader_stop_3() -> Result<()> { cluster.raw_env().stop(0).await?; tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - cluster.try_commit(1).await?; for i in 0..10 { cluster.user(1).fetch_add(i).await?; } @@ -86,11 +74,10 @@ async fn leader_stop_3() -> Result<()> { #[serial] #[test(tokio::test(flavor = "multi_thread"))] -async fn leader_stepdown_3() -> Result<()> { +async fn n3_leader_stepdown() -> Result<()> { let mut cluster = Cluster::new(3).await?; cluster.add_server(0, 0).await?; - cluster.try_commit(0).await?; for i in 0..10 { cluster.user(0).fetch_add(i).await?; } @@ -98,17 +85,32 
@@ async fn leader_stepdown_3() -> Result<()> { cluster.add_server(0, 1).await?; cluster.add_server(0, 2).await?; - cluster.try_commit(1).await?; cluster.remove_server(1, 0).await?; eprintln!("removed nd0 -> ok"); tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - cluster.try_commit(1).await?; - eprintln!("try_commit to nd1 -> ok"); - for i in 0..10 { cluster.user(1).fetch_add(i).await?; } Ok(()) } + +#[serial] +#[test(tokio::test(flavor = "multi_thread"))] +async fn n3_down2() -> Result<()> { + let mut cluster = Cluster::new(3).await?; + cluster.add_server(0, 0).await?; + cluster.add_server(0, 1).await?; + cluster.add_server(0, 2).await?; + + cluster.user(0).fetch_add(1).await?; + + cluster.raw_env().stop(1).await?; + cluster.user(0).fetch_add(2).await?; + + cluster.raw_env().stop(2).await?; + assert!(cluster.user(0).fetch_add(4).await.is_err()); + + Ok(()) +} diff --git a/tests/testapp/Cargo.toml b/tests/testapp/Cargo.toml index 4d780a4c..335542a1 100644 --- a/tests/testapp/Cargo.toml +++ b/tests/testapp/Cargo.toml @@ -18,8 +18,10 @@ shrinkwraprs = "0.3" signal-hook = "0.3" signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } tokio = { version = "1", features = ["full"] } +tokio-retry = "0.3" tokio-util = "0.7" tonic = "0.10" +uuid = "1.5" lol2 = { workspace = true } diff --git a/tests/testapp/src/app/mod.rs b/tests/testapp/src/app/mod.rs index ff6171f1..d235530b 100644 --- a/tests/testapp/src/app/mod.rs +++ b/tests/testapp/src/app/mod.rs @@ -32,17 +32,24 @@ impl AppSnapshot { AppSnapshot(cur_state) } } + +struct InnerState { + index: u64, + counter: u64, +} struct AppMain { - state_index: AtomicU64, - counter: AtomicU64, + state: RwLock, snapshots: RwLock>, } impl AppMain { pub fn new() -> Self { - let mut snapshots = BTreeMap::new(); + let init_state = InnerState { + index: 0, + counter: 0, + }; + let snapshots = BTreeMap::new(); Self { - state_index: AtomicU64::new(0), - counter: AtomicU64::new(0), + state: RwLock::new(init_state), snapshots: RwLock::new(snapshots), } } @@ -50,17 +57,22 @@ impl AppMain { #[async_trait::async_trait] impl RaftApp for AppMain { async fn process_write(&self, bytes: &[u8], entry_index: Index) -> Result { + let mut cur_state = self.state.write().unwrap(); + let req = testapp::AppWriteRequest::deserialize(bytes); let old_state = match req { AppWriteRequest::FetchAdd { bytes } => { let add_val = bytes.len() as u64; - let n = self.counter.fetch_add(add_val, Ordering::SeqCst); + let n = cur_state.counter; + cur_state.counter += add_val; n } }; - self.state_index.store(entry_index, Ordering::SeqCst); + cur_state.index = entry_index; + Ok(AppState(old_state).serialize()) } + async fn install_snapshot(&self, snapshot_index: Index) -> Result<()> { let snapshot = if snapshot_index == 1 { AppState(0) @@ -68,27 +80,30 @@ impl RaftApp for AppMain { ensure!(self.snapshots.read().unwrap().contains_key(&snapshot_index)); *self.snapshots.read().unwrap().get(&snapshot_index).unwrap() }; - self.state_index.store(snapshot_index, Ordering::SeqCst); - self.counter.store(snapshot.0, Ordering::SeqCst); + + let mut cur_state = self.state.write().unwrap(); + cur_state.index = snapshot_index; + cur_state.counter = snapshot.0; + Ok(()) } + async fn process_read(&self, bytes: &[u8]) -> Result { - let cur_state = self.counter.load(Ordering::SeqCst); + let cur_state = self.state.read().unwrap(); let req = testapp::AppReadRequest::deserialize(bytes); - let cur_state = match req { + match req { AppReadRequest::MakeSnapshot => { - let n = 
self.counter.load(Ordering::SeqCst); - let idx = self.state_index.load(Ordering::SeqCst); + let idx = cur_state.index; let mut snapshots = self.snapshots.write().unwrap(); - snapshots.insert(idx, AppState(n)); - n + snapshots.insert(idx, AppState(cur_state.counter)); } - AppReadRequest::Read => cur_state, + AppReadRequest::Read => {} }; - Ok(AppState(cur_state).serialize()) + Ok(AppState(cur_state.counter).serialize()) } + async fn save_snapshot(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()> { let snap = AppSnapshot::from_stream(st).await; self.snapshots @@ -97,6 +112,7 @@ impl RaftApp for AppMain { .insert(snapshot_index, snap.0); Ok(()) } + async fn open_snapshot(&self, x: Index) -> Result { ensure!(self.snapshots.read().unwrap().contains_key(&x)); let cur_state = *self.snapshots.read().unwrap().get(&x).unwrap(); @@ -104,12 +120,14 @@ impl RaftApp for AppMain { let st = snap.into_stream(); Ok(st) } + async fn delete_snapshots_before(&self, x: Index) -> Result<()> { let mut snapshots = self.snapshots.write().unwrap(); let latter = snapshots.split_off(&x); *snapshots = latter; Ok(()) } + async fn propose_new_snapshot(&self) -> Result { let k = { let mut out = vec![]; @@ -140,6 +158,7 @@ impl RaftBallotStore for AppBallot { *self.inner.write().unwrap() = v; Ok(()) } + async fn load_ballot(&self) -> Result { let v = self.inner.read().unwrap().clone(); Ok(v) @@ -162,16 +181,19 @@ impl RaftLogStore for AppLog { self.inner.write().unwrap().insert(i, e); Ok(()) } + async fn delete_entries_before(&self, i: Index) -> Result<()> { let mut inner = self.inner.write().unwrap(); let latter = inner.split_off(&i); *inner = latter; Ok(()) } + async fn get_entry(&self, i: Index) -> Result> { let e: Option = self.inner.read().unwrap().get(&i).cloned(); Ok(e) } + async fn get_head_index(&self) -> Result { let reader = self.inner.read().unwrap(); let n = match reader.first_key_value() { @@ -180,6 +202,7 @@ impl RaftLogStore for AppLog { }; Ok(n) } + async fn get_last_index(&self) -> Result { let reader = self.inner.read().unwrap(); let n = match reader.last_key_value() { diff --git a/tests/testapp/src/lib.rs b/tests/testapp/src/lib.rs index 51dcd910..7b43ddd9 100644 --- a/tests/testapp/src/lib.rs +++ b/tests/testapp/src/lib.rs @@ -54,33 +54,51 @@ impl Client { let cli = RaftClient::new(conn); Self { cli } } + pub async fn fetch_add(&mut self, n: u64) -> Result { - let req = Request { + let request_id = uuid::Uuid::new_v4().to_string(); + let req = WriteRequest { message: AppWriteRequest::FetchAdd { bytes: vec![1u8; n as usize].into(), } .serialize(), - mutation: true, + request_id, }; - let resp = self.cli.process(req).await?.into_inner(); + + use tokio_retry::strategy::{jitter, ExponentialBackoff}; + use tokio_retry::Retry; + + // 200ms, 400, 800 + let strategy = ExponentialBackoff::from_millis(2) + .factor(100) + .map(jitter) + .take(3); + + let fut = Retry::spawn(strategy, || { + let mut cli = self.cli.clone(); + let req = req.clone(); + async move { cli.write(req).await } + }); + + let resp = fut.await?.into_inner(); let resp = AppState::deserialize(&resp.message); Ok(resp.0) } + pub async fn read(&self) -> Result { - let req = Request { + let req = ReadRequest { message: AppReadRequest::Read.serialize(), - mutation: false, }; - let resp = self.cli.clone().process(req).await?.into_inner(); + let resp = self.cli.clone().read(req).await?.into_inner(); let resp = AppState::deserialize(&resp.message); Ok(resp.0) } + pub async fn make_snapshot(&self) -> Result { - let req = Request { + let req 
= ReadRequest {
             message: AppReadRequest::MakeSnapshot.serialize(),
-            mutation: false,
         };
-        let resp = self.cli.clone().process(req).await?.into_inner();
+        let resp = self.cli.clone().read(req).await?.into_inner();
         let resp = AppState::deserialize(&resp.message);
         Ok(resp.0)
     }
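
Below is a minimal usage sketch of the new exactly-once write path introduced by this patch: the client generates one request_id per logical request and reuses it across retries, so the leader's response cache executes the command at most once. This sketch is illustrative and not part of the patch itself; `exactly_once_write` is a hypothetical helper, it assumes the `uuid` crate with the v4 feature and an anyhow-style Result, and the retry policy mirrors testapp::Client::fetch_add above.

use lol2::client::{RaftClient, WriteRequest};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use tonic::transport::Endpoint;

// Issue one logical write at most once, even across retries:
// the request_id is generated once and reused for every attempt,
// so the server-side response cache deduplicates re-executions.
async fn exactly_once_write(addr: String, payload: Vec<u8>) -> anyhow::Result<Vec<u8>> {
    let chan = Endpoint::from_shared(addr)?.connect_lazy();
    let cli = RaftClient::new(chan);

    let req = WriteRequest {
        message: payload.into(),
        request_id: uuid::Uuid::new_v4().to_string(),
    };

    // 200ms, 400, 800 (with jitter), as in testapp::Client::fetch_add.
    let strategy = ExponentialBackoff::from_millis(2)
        .factor(100)
        .map(jitter)
        .take(3);

    let resp = Retry::spawn(strategy, || {
        let mut cli = cli.clone();
        let req = req.clone();
        async move { cli.write(req).await }
    })
    .await?
    .into_inner();

    Ok(resp.message.to_vec())
}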