From 5301a2a9b80e5235d64e4a0a9a7de181e2537a87 Mon Sep 17 00:00:00 2001 From: Akira Hayakawa Date: Sat, 10 Feb 2024 19:40:41 +0900 Subject: [PATCH] rework3: Implement Multi-Raft (#326) Multi-raft --- .github/workflows/ci.yml | 43 +++++++-- Dockerfile | 2 +- README.md | 9 +- docker-compose.ci.yml | 20 +++++ lol2/proto/lol2.proto | 49 ++++++---- lol2/src/client.rs | 4 +- lol2/src/{requester => communicator}/mod.rs | 30 +++++-- .../src/{requester => communicator}/stream.rs | 2 + lol2/src/lib.rs | 12 +-- lol2/src/node.rs | 45 ++++++---- lol2/src/process/app.rs | 2 +- lol2/src/process/command_log/consumer.rs | 2 +- lol2/src/process/command_log/mod.rs | 2 +- lol2/src/process/mod.rs | 62 +++++++++---- lol2/src/process/peer_svc/mod.rs | 6 +- lol2/src/process/peer_svc/replication.rs | 4 +- lol2/src/process/query_queue.rs | 8 +- lol2/src/process/raft_process/cluster.rs | 10 +-- lol2/src/process/raft_process/mod.rs | 12 +-- lol2/src/process/raft_process/responder.rs | 6 +- lol2/src/process/snapshot.rs | 48 +--------- lol2/src/process/voter/election.rs | 14 +-- lol2/src/process/voter/heartbeat.rs | 2 +- lol2/src/process/voter/stepdown.rs | 2 +- lol2/src/raft_service/mod.rs | 72 ++++++++++++--- lol2/src/raft_service/stream.rs | 14 +-- tests/env/src/lib.rs | 66 +------------- tests/env/tests/tests.rs | 89 ------------------- tests/lol-tests/src/lib.rs | 2 + tests/lol-tests/tests/one_node.rs | 1 + tests/testapp/Dockerfile | 14 ++- tests/testapp/src/app/mod.rs | 17 ++-- tests/testapp/src/app/snapshot_io.rs | 47 ++++++++++ tests/testapp/src/lib.rs | 8 ++ tests/testapp/src/main.rs | 6 +- 35 files changed, 396 insertions(+), 336 deletions(-) create mode 100644 docker-compose.ci.yml rename lol2/src/{requester => communicator}/mod.rs (76%) rename lol2/src/{requester => communicator}/stream.rs (97%) create mode 100644 tests/testapp/src/app/snapshot_io.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3847df16..125d2761 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,13 +3,46 @@ name: CI on: pull_request: branches: - - develop - master jobs: - run_test: + docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Compile - run: ./dev cargo build \ No newline at end of file + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build (dev) + uses: docker/build-push-action@v5 + with: + context: . + push: false + load: true + tags: lol-dev:latest + cache-from: type=gha,scope=dev + cache-to: type=gha,mode=max,scope=dev + + - name: Build (testapp) + uses: docker/build-push-action@v5 + with: + context: . + file: tests/testapp/Dockerfile + push: false + load: true + tags: lol-testapp:latest + cache-from: type=gha,scope=testapp + cache-to: type=gha,mode=max,scope=testapp + + - name: Run tests + run: | + docker compose -f docker-compose.ci.yml build + docker compose -f docker-compose.ci.yml run --rm dev cargo test \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 1f566893..517a2a1a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM 'rust:1.72-bookworm' +FROM 'rust:1.75-bookworm' WORKDIR '/work' diff --git a/README.md b/README.md index 0aac03e0..c8e918f4 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,13 @@ A Raft implementation in Rust language. To support this project please give it a ⭐ -![](https://user-images.githubusercontent.com/785824/146726060-63b12378-ecb7-49f9-8025-a65dbd37e9b2.jpeg) +![146726060-63b12378-ecb7-49f9-8025-a65dbd37e9b2](https://github.com/akiradeveloper/lol/assets/785824/12a016fe-35a0-4d12-8ffa-955ef61b25b9) + ## Features -![multi-raft](https://github.com/akiradeveloper/lol/assets/785824/2293cf2b-436a-45ed-a507-88e299e622bf) +![スクリーンショット 2024-02-03 13 29 55](https://github.com/akiradeveloper/lol/assets/785824/f6a6ceef-98f3-4fcf-9ba8-3655d52bd3f0) + - Implements all fundamental [Raft](https://raft.github.io/) features for production use. - Supports Multi-Raft. Mutliple Raft processes can coexist in a single OS process so they can share resources efficiently. @@ -24,7 +26,8 @@ A Raft implementation in Rust language. To support this project please give it a To implement Multi-Raft, the architecture is divided into two spaces. One in the lower side is called "Pure Raft" layer which is totally unaware of gRPC and Multi-Raft. Therefore, called pure. The other side translates gRPC requests into pure requests and vice versa. -![lol2 (1)](https://github.com/akiradeveloper/lol/assets/785824/dc605ed9-ca90-4e17-a370-ca1b939e43ce) +![スクリーンショット 2024-02-03 13 30 09](https://github.com/akiradeveloper/lol/assets/785824/fd064ba6-be20-4934-839a-db8cd07a8f13) + ## Development diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml new file mode 100644 index 00000000..617907ab --- /dev/null +++ b/docker-compose.ci.yml @@ -0,0 +1,20 @@ +version: '3' + +services: + dev: + image: lol-dev:latest + working_dir: /work + volumes: + - ./:/work + - /var/run/docker.sock:/var/run/docker.sock + networks: + - raft-network + + testapp: + image: lol-testapp:latest + entrypoint: + - /bin/sh + +networks: + raft-network: + driver: bridge \ No newline at end of file diff --git a/lol2/proto/lol2.proto b/lol2/proto/lol2.proto index fad781b8..87066fce 100644 --- a/lol2/proto/lol2.proto +++ b/lol2/proto/lol2.proto @@ -5,14 +5,16 @@ import "google/protobuf/empty.proto"; package lol2; message WriteRequest { - bytes message = 1; + uint32 lane_id = 1; + bytes message = 2; // unique identifier of this request // duplicated requests with the same unique identifier are only executed once. - string request_id = 2; + string request_id = 3; } message ReadRequest { - bytes message = 1; + uint32 lane_id = 1; + bytes message = 2; } message Response { @@ -25,12 +27,14 @@ message Clock { } message KernRequest { - bytes message = 1; + uint32 lane_id = 1; + bytes message = 2; } message LogStreamHeader { - string sender_id = 1; - Clock prev_clock = 2; + uint32 lane_id = 1; + string sender_id = 2; + Clock prev_clock = 3; } message LogStreamEntry { Clock clock = 1; @@ -48,35 +52,44 @@ message SendLogStreamResponse { } message GetSnapshotRequest { - uint64 index = 1; + uint32 lane_id = 1; + uint64 index = 2; } message SnapshotChunk { bytes data = 1; } message VoteRequest { - uint64 vote_term = 1; - string candidate_id = 2; - Clock candidate_clock = 3; - bool force_vote = 4; - bool pre_vote = 5; + uint32 lane_id = 1; + uint64 vote_term = 2; + string candidate_id = 3; + Clock candidate_clock = 4; + bool force_vote = 5; + bool pre_vote = 6; } message VoteResponse { bool vote_granted = 1; } message Heartbeat { - uint64 leader_term = 1; - string leader_id = 2; - uint64 leader_commit_index = 3; + uint32 lane_id = 1; + uint64 leader_term = 2; + string leader_id = 3; + uint64 leader_commit_index = 4; } message AddServerRequest { - string server_id = 1; + uint32 lane_id = 1; + string server_id = 2; } message RemoveServerRequest { - string server_id = 1; + uint32 lane_id = 1; + string server_id = 2; +} + +message TimeoutNowRequest { + uint32 lane_id = 1; } service Raft { @@ -89,5 +102,5 @@ service Raft { rpc SendLogStream (stream LogStreamChunk) returns (SendLogStreamResponse); rpc GetSnapshot (GetSnapshotRequest) returns (stream SnapshotChunk); rpc SendHeartbeat (Heartbeat) returns (google.protobuf.Empty); - rpc TimeoutNow (google.protobuf.Empty) returns (google.protobuf.Empty); + rpc TimeoutNow (TimeoutNowRequest) returns (google.protobuf.Empty); } \ No newline at end of file diff --git a/lol2/src/client.rs b/lol2/src/client.rs index dcc5a8cf..d21aee5c 100644 --- a/lol2/src/client.rs +++ b/lol2/src/client.rs @@ -1,4 +1,6 @@ use super::*; pub type RaftClient = raft::raft_client::RaftClient; -pub use raft::{AddServerRequest, ReadRequest, RemoveServerRequest, Response, WriteRequest}; +pub use raft::{ + AddServerRequest, ReadRequest, RemoveServerRequest, Response, TimeoutNowRequest, WriteRequest, +}; diff --git a/lol2/src/requester/mod.rs b/lol2/src/communicator/mod.rs similarity index 76% rename from lol2/src/requester/mod.rs rename to lol2/src/communicator/mod.rs index 61808e13..71536b9e 100644 --- a/lol2/src/requester/mod.rs +++ b/lol2/src/communicator/mod.rs @@ -5,18 +5,22 @@ use process::*; mod stream; -pub struct Connection { +pub struct Communicator { cli: raft::RaftClient, + lane_id: LaneId, } -impl Connection { - pub fn new(cli: raft::RaftClient) -> Self { - Self { cli } +impl Communicator { + pub fn new(cli: raft::RaftClient, lane_id: LaneId) -> Self { + Self { cli, lane_id } } } -impl Connection { - pub async fn get_snapshot(&self, index: Index) -> Result { - let req = raft::GetSnapshotRequest { index }; +impl Communicator { + pub async fn get_snapshot(&self, index: Index) -> Result { + let req = raft::GetSnapshotRequest { + lane_id: self.lane_id, + index, + }; let st = self.cli.clone().get_snapshot(req).await?.into_inner(); let st = Box::pin(stream::into_internal_snapshot_stream(st)); Ok(st) @@ -24,6 +28,7 @@ impl Connection { pub async fn send_heartbeat(&self, req: request::Heartbeat) -> Result<()> { let req = raft::Heartbeat { + lane_id: self.lane_id, leader_id: req.leader_id.to_string(), leader_term: req.leader_term, leader_commit_index: req.leader_commit_index, @@ -37,6 +42,7 @@ impl Connection { req: request::UserWriteRequest, ) -> Result { let req = raft::WriteRequest { + lane_id: self.lane_id, message: req.message, request_id: req.request_id, }; @@ -46,6 +52,7 @@ impl Connection { pub async fn process_user_read_request(&self, req: request::UserReadRequest) -> Result { let req = raft::ReadRequest { + lane_id: self.lane_id, message: req.message, }; let resp = self.cli.clone().read(req).await?.into_inner(); @@ -54,6 +61,7 @@ impl Connection { pub async fn process_kern_request(&self, req: request::KernRequest) -> Result<()> { let req = raft::KernRequest { + lane_id: self.lane_id, message: req.message, }; self.cli.clone().process_kern_request(req).await?; @@ -61,12 +69,15 @@ impl Connection { } pub async fn send_timeout_now(&self) -> Result<()> { - self.cli.clone().timeout_now(()).await?; + let req = raft::TimeoutNowRequest { + lane_id: self.lane_id, + }; + self.cli.clone().timeout_now(req).await?; Ok(()) } pub async fn send_log_stream(&self, st: LogStream) -> Result { - let st = stream::into_external_log_stream(st); + let st = stream::into_external_log_stream(self.lane_id, st); let resp = self.cli.clone().send_log_stream(st).await?.into_inner(); Ok(response::SendLogStream { success: resp.success, @@ -76,6 +87,7 @@ impl Connection { pub async fn request_vote(&self, req: request::RequestVote) -> Result { let req = raft::VoteRequest { + lane_id: self.lane_id, candidate_id: req.candidate_id.to_string(), candidate_clock: { let e = req.candidate_clock; diff --git a/lol2/src/requester/stream.rs b/lol2/src/communicator/stream.rs similarity index 97% rename from lol2/src/requester/stream.rs rename to lol2/src/communicator/stream.rs index b387faa5..4d0dc3c8 100644 --- a/lol2/src/requester/stream.rs +++ b/lol2/src/communicator/stream.rs @@ -1,10 +1,12 @@ use super::*; pub fn into_external_log_stream( + lane_id: LaneId, st: LogStream, ) -> impl futures::stream::Stream { use raft::log_stream_chunk::Elem as ChunkElem; let header_stream = vec![ChunkElem::Header(raft::LogStreamHeader { + lane_id, sender_id: st.sender_id.to_string(), prev_clock: Some(raft::Clock { term: st.prev_clock.term, diff --git a/lol2/src/lib.rs b/lol2/src/lib.rs index 23cee396..2a203883 100644 --- a/lol2/src/lib.rs +++ b/lol2/src/lib.rs @@ -3,9 +3,9 @@ pub mod process; pub mod client; +mod communicator; mod node; pub mod raft_service; -mod requester; use anyhow::Result; use bytes::Bytes; @@ -16,13 +16,14 @@ use process::RaftProcess; use std::future::Future; use std::sync::Arc; use std::time::Duration; -use tonic::transport::{Endpoint, Uri}; +use tonic::transport::Uri; mod raft { tonic::include_proto!("lol2"); pub type RaftClient = raft_client::RaftClient; } +/// Identifier of a `RaftNode`. #[derive( serde::Serialize, serde::Deserialize, @@ -35,12 +36,11 @@ mod raft { derive_more::FromStr, )] pub struct NodeId(#[serde(with = "http_serde::uri")] Uri); + impl NodeId { pub fn new(uri: Uri) -> Self { Self(uri) } - pub fn from_str(url: &str) -> Result { - let url = url.parse()?; - Ok(Self(url)) - } } + +pub type LaneId = u32; diff --git a/lol2/src/node.rs b/lol2/src/node.rs index b3583511..53e058f0 100644 --- a/lol2/src/node.rs +++ b/lol2/src/node.rs @@ -1,57 +1,72 @@ use super::*; +use std::collections::HashMap; + pub struct Inner { - selfid: NodeId, + self_node_id: NodeId, cache: moka::sync::Cache, - process: once_cell::sync::OnceCell, + process: spin::RwLock>, } +/// `RaftNode` contains a set of `RaftProcess`es. #[derive(shrinkwraprs::Shrinkwrap, Clone)] pub struct RaftNode(Arc); impl RaftNode { + /// Create a new Raft node with a given node ID. pub fn new(id: NodeId) -> Self { let builder = moka::sync::Cache::builder() .initial_capacity(1000) .time_to_live(Duration::from_secs(60)); let inner = Inner { - selfid: id, + self_node_id: id, cache: builder.build(), - process: once_cell::sync::OnceCell::new(), + process: HashMap::new().into(), }; Self(inner.into()) } - pub fn get_driver(&self) -> RaftDriver { + /// Get a Raft driver to drive a Raft process on a lane. + pub fn get_driver(&self, lane_id: LaneId) -> RaftDriver { RaftDriver { - selfid: self.selfid.clone(), + lane_id, + self_node_id: self.self_node_id.clone(), cache: self.cache.clone(), } } - pub fn attach_process(&self, p: RaftProcess) { - self.process.set(p).ok(); + /// Attach a Raft process to a lane. + pub fn attach_process(&self, lane_id: LaneId, p: RaftProcess) { + self.process.write().insert(lane_id, p); + } + + /// Detach a Raft process from a lane. + pub fn detach_process(&self, lane_id: LaneId) { + self.process.write().remove(&lane_id); } - pub(crate) fn get_process(&self) -> &RaftProcess { - self.process.get().unwrap() + pub(crate) fn get_process(&self, lane_id: LaneId) -> Option { + self.process.read().get(&lane_id).cloned() } } +/// `RaftDriver` is a context to drive a `RaftProcess`. #[derive(Clone)] pub struct RaftDriver { - selfid: NodeId, + lane_id: LaneId, + self_node_id: NodeId, cache: moka::sync::Cache, } impl RaftDriver { - pub(crate) fn selfid(&self) -> NodeId { - self.selfid.clone() + pub(crate) fn self_node_id(&self) -> NodeId { + self.self_node_id.clone() } - pub(crate) fn connect(&self, id: NodeId) -> requester::Connection { + + pub(crate) fn connect(&self, id: NodeId) -> communicator::Communicator { let conn = self.cache.get_with(id.clone(), || { let endpoint = tonic::transport::Endpoint::from(id.0); let chan = endpoint.connect_lazy(); raft::RaftClient::new(chan) }); - requester::Connection::new(conn) + communicator::Communicator::new(conn, self.lane_id) } } diff --git a/lol2/src/process/app.rs b/lol2/src/process/app.rs index ccbb3936..ddc9409f 100644 --- a/lol2/src/process/app.rs +++ b/lol2/src/process/app.rs @@ -13,7 +13,7 @@ impl App { owner: NodeId, driver: RaftDriver, ) -> Result<()> { - if owner == driver.selfid() { + if owner == driver.self_node_id() { return Ok(()); } if index < 2 { diff --git a/lol2/src/process/command_log/consumer.rs b/lol2/src/process/command_log/consumer.rs index 6b73f869..1701a7d8 100644 --- a/lol2/src/process/command_log/consumer.rs +++ b/lol2/src/process/command_log/consumer.rs @@ -14,7 +14,7 @@ impl CommandLog { pub async fn advance_snapshot_index(&self) -> Result<()> { let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst); - let proposed_snapshot_index = self.app.propose_new_snapshot().await?; + let proposed_snapshot_index = self.app.get_latest_snapshot().await?; if proposed_snapshot_index > cur_snapshot_index { info!("find a newer proposed snapshot@{proposed_snapshot_index}. will move the snapshot index."); diff --git a/lol2/src/process/command_log/mod.rs b/lol2/src/process/command_log/mod.rs index bdc8d94d..5c25ba9b 100644 --- a/lol2/src/process/command_log/mod.rs +++ b/lol2/src/process/command_log/mod.rs @@ -104,7 +104,7 @@ impl Inner { Ok(()) } - pub async fn open_snapshot(&self, index: Index) -> Result { + pub async fn open_snapshot(&self, index: Index) -> Result { let _g = self.snapshot_lock.read().await; let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst); diff --git a/lol2/src/process/mod.rs b/lol2/src/process/mod.rs index 79cbb127..04d71687 100644 --- a/lol2/src/process/mod.rs +++ b/lol2/src/process/mod.rs @@ -32,7 +32,6 @@ use completion::*; mod raft_process; pub use raft_process::RaftProcess; mod thread; -pub use snapshot::SnapshotStream; pub type Term = u64; pub type Index = u64; @@ -69,47 +68,80 @@ impl Ballot { } } -pub struct LogStream { +pub(crate) struct LogStream { pub sender_id: NodeId, pub prev_clock: Clock, pub entries: std::pin::Pin + Send>>, } -pub struct LogStreamElem { +pub(crate) struct LogStreamElem { pub this_clock: Clock, pub command: Bytes, } #[derive(shrinkwraprs::Shrinkwrap, Clone)] -pub struct Ref(T); +struct Ref(T); +/// `RaftApp` is the representation of state machine in Raft. +/// Beside the application state, it also contains the snapshot store +/// where snapshot data is stored with a snapshot index as a key. #[async_trait::async_trait] pub trait RaftApp: Sync + Send + 'static { + /// Apply read request to the application. + /// Calling of this function should not change the state of the application. async fn process_read(&self, request: &[u8]) -> Result; + + /// Apply write request to the application. + /// Calling of this function may change the state of the application. async fn process_write(&self, request: &[u8], entry_index: Index) -> Result; - async fn install_snapshot(&self, snapshot: Index) -> Result<()>; - async fn save_snapshot(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()>; - async fn open_snapshot(&self, x: Index) -> Result; - async fn delete_snapshots_before(&self, x: Index) -> Result<()>; - async fn propose_new_snapshot(&self) -> Result; + + /// Replace the state of the application with the snapshot. + /// The snapshot is guaranteed to exist in the snapshot store. + async fn install_snapshot(&self, snapshot_index: Index) -> Result<()>; + + /// Save snapshot with index `snapshot_index` to the snapshot store. + /// This function is called when the snapshot is fetched from the leader. + async fn save_snapshot(&self, st: snapshot::Stream, snapshot_index: Index) -> Result<()>; + + /// Read existing snapshot with index `snapshot_index` from the snapshot store. + /// This function is called when a follower requests a snapshot from the leader. + async fn open_snapshot(&self, snapshot_index: Index) -> Result; + + /// Delete all the snapshots in range [, i) from the snapshot store. + async fn delete_snapshots_before(&self, i: Index) -> Result<()>; + + /// Get the index of the latest snapshot in the snapshot store. + /// If the index is greater than the current snapshot entry index, + /// it will replace the snapshot entry with the new one. + async fn get_latest_snapshot(&self) -> Result; } +/// `RaftLogStore` is the representation of the log store in Raft. +/// Conceptually, it is like `RwLock>`. #[async_trait::async_trait] pub trait RaftLogStore: Sync + Send + 'static { + /// Insert the entry at index `i` into the log. async fn insert_entry(&self, i: Index, e: Entry) -> Result<()>; + + /// Delete all the entries in range [, i) from the log. async fn delete_entries_before(&self, i: Index) -> Result<()>; + + /// Get the entry at index `i` from the log. async fn get_entry(&self, i: Index) -> Result>; + + /// Get the index of the first entry in the log. async fn get_head_index(&self) -> Result; + + /// Get the index of the last entry in the log. async fn get_last_index(&self) -> Result; } +/// `RaftBallotStore` is the representation of the ballot store in Raft. +/// Conceptually, it is like `RwLock`. #[async_trait::async_trait] pub trait RaftBallotStore: Sync + Send + 'static { + /// Replace the current ballot with the new one. async fn save_ballot(&self, v: Ballot) -> Result<()>; - async fn load_ballot(&self) -> Result; -} -#[derive(Clone, Hash, PartialEq, Eq, serde::Serialize, serde::Deserialize, Debug)] -pub struct ExecutionKey { - pub client_id: String, - pub seq_num: u64, + /// Get the current ballot. + async fn load_ballot(&self) -> Result; } diff --git a/lol2/src/process/peer_svc/mod.rs b/lol2/src/process/peer_svc/mod.rs index ee5c2042..19f3074e 100644 --- a/lol2/src/process/peer_svc/mod.rs +++ b/lol2/src/process/peer_svc/mod.rs @@ -33,14 +33,14 @@ pub struct Inner { peer_contexts: spin::RwLock>, peer_threads: spin::Mutex>, - command_log: CommandLog, + command_log: Ref, driver: RaftDriver, } #[derive(shrinkwraprs::Shrinkwrap, Clone)] pub struct PeerSvc(pub Arc); impl PeerSvc { - pub fn new(command_log: CommandLog, driver: RaftDriver) -> Self { + pub fn new(command_log: Ref, driver: RaftDriver) -> Self { let inner = Inner { membership: HashSet::new().into(), peer_contexts: HashMap::new().into(), @@ -77,7 +77,7 @@ impl PeerSvc { impl PeerSvc { async fn add_peer(&self, id: NodeId, voter: Ref) -> Result<()> { - if id == self.driver.selfid() { + if id == self.driver.self_node_id() { return Ok(()); } diff --git a/lol2/src/process/peer_svc/replication.rs b/lol2/src/process/peer_svc/replication.rs index 1a4da73c..b2025c44 100644 --- a/lol2/src/process/peer_svc/replication.rs +++ b/lol2/src/process/peer_svc/replication.rs @@ -3,7 +3,7 @@ use super::*; impl PeerSvc { async fn prepare_replication_stream( selfid: NodeId, - command_log: CommandLog, + command_log: Ref, l: Index, r: Index, ) -> Result { @@ -61,7 +61,7 @@ impl PeerSvc { assert!(n >= 1); let out_stream = Self::prepare_replication_stream( - self.driver.selfid(), + self.driver.self_node_id(), self.command_log.clone(), old_progress.next_index, old_progress.next_index + n, diff --git a/lol2/src/process/query_queue.rs b/lol2/src/process/query_queue.rs index 8d2ad105..aa5dedd6 100644 --- a/lol2/src/process/query_queue.rs +++ b/lol2/src/process/query_queue.rs @@ -7,14 +7,14 @@ pub struct Query { } pub struct Inner { - app: App, + app: Ref, q: tokio::sync::RwLock, } #[derive(shrinkwraprs::Shrinkwrap, Clone)] pub struct QueryQueue(Arc); impl QueryQueue { - pub fn new(app: App) -> Self { + pub fn new(app: Ref) -> Self { let inner = Inner { app, q: Impl::new().into(), @@ -31,7 +31,7 @@ impl QueryQueue { pub async fn execute(&self, index: Index) -> bool { let mut q = self.q.write().await; - q.execute(index, self.app.clone()).await + q.execute(index, &self.app).await } } @@ -54,7 +54,7 @@ impl Impl { } /// execute all awating queries in range [, index] in parallel - async fn execute(&mut self, index: Index, app: App) -> bool { + async fn execute(&mut self, index: Index, app: &App) -> bool { let futs = { let mut out = vec![]; let ls: Vec = self.reserved.range(..=index).map(|(k, _)| *k).collect(); diff --git a/lol2/src/process/raft_process/cluster.rs b/lol2/src/process/raft_process/cluster.rs index 1ab0fd85..586fcf31 100644 --- a/lol2/src/process/raft_process/cluster.rs +++ b/lol2/src/process/raft_process/cluster.rs @@ -22,7 +22,7 @@ impl RaftProcess { async fn init_cluster(&self) -> Result<()> { let mut membership = HashSet::new(); - membership.insert(self.driver.selfid()); + membership.insert(self.driver.self_node_id()); let init_command = Command::serialize(Command::Snapshot { membership: membership.clone(), @@ -39,14 +39,14 @@ impl RaftProcess { // After this function is called // this server immediately becomes the leader by self-vote and advance commit index. // Consequently, when initial install_snapshot is called this server is already the leader. - let conn = self.driver.connect(self.driver.selfid()); + let conn = self.driver.connect(self.driver.self_node_id()); conn.send_timeout_now().await?; Ok(()) } pub(crate) async fn add_server(&self, req: request::AddServer) -> Result<()> { - if self.peers.read_membership().is_empty() && req.server_id == self.driver.selfid() { + if self.peers.read_membership().is_empty() && req.server_id == self.driver.self_node_id() { // This is called the "cluster bootstrapping". // To add a node to a cluster we have to know some node in the cluster. // But what about the first node? @@ -56,7 +56,7 @@ impl RaftProcess { let req = request::KernRequest { message: msg.serialize(), }; - let conn = self.driver.connect(self.driver.selfid()); + let conn = self.driver.connect(self.driver.self_node_id()); conn.process_kern_request(req).await?; } Ok(()) @@ -67,7 +67,7 @@ impl RaftProcess { let req = request::KernRequest { message: msg.serialize(), }; - let conn = self.driver.connect(self.driver.selfid()); + let conn = self.driver.connect(self.driver.self_node_id()); conn.process_kern_request(req).await?; Ok(()) } diff --git a/lol2/src/process/raft_process/mod.rs b/lol2/src/process/raft_process/mod.rs index a221ee53..c7168371 100644 --- a/lol2/src/process/raft_process/mod.rs +++ b/lol2/src/process/raft_process/mod.rs @@ -22,10 +22,10 @@ pub struct Inner { peers: PeerSvc, query_queue: QueryQueue, driver: RaftDriver, - thread_handles: ThreadHandles, + _thread_handles: ThreadHandles, } -#[derive(shrinkwraprs::Shrinkwrap)] +#[derive(shrinkwraprs::Shrinkwrap, Clone)] pub struct RaftProcess(Arc); impl RaftProcess { pub async fn new( @@ -36,12 +36,12 @@ impl RaftProcess { ) -> Result { let app = App::new(app); - let query_queue = QueryQueue::new(app.clone()); + let query_queue = QueryQueue::new(Ref(app.clone())); let command_log = CommandLog::new(log_store, app.clone()); command_log.restore_state().await?; - let peers = PeerSvc::new(command_log.clone(), driver.clone()); + let peers = PeerSvc::new(Ref(command_log.clone()), driver.clone()); let voter = Voter::new( ballot_store, @@ -52,7 +52,7 @@ impl RaftProcess { peers.restore_state(Ref(voter.clone())).await?; - let thread_handles = ThreadHandles { + let _thread_handles = ThreadHandles { advance_kern_handle: thread::advance_kern::new(command_log.clone(), voter.clone()), advance_user_handle: thread::advance_user::new(command_log.clone(), app.clone()), advance_snapshot_handle: thread::advance_snapshot::new(command_log.clone()), @@ -77,7 +77,7 @@ impl RaftProcess { peers, query_queue, driver, - thread_handles, + _thread_handles, }; Ok(RaftProcess(inner.into())) } diff --git a/lol2/src/process/raft_process/responder.rs b/lol2/src/process/raft_process/responder.rs index 66912d9a..e8c272fa 100644 --- a/lol2/src/process/raft_process/responder.rs +++ b/lol2/src/process/raft_process/responder.rs @@ -72,7 +72,7 @@ impl RaftProcess { rx.await? } else { // This check is to avoid looping. - ensure!(self.driver.selfid() != leader_id); + ensure!(self.driver.self_node_id() != leader_id); let conn = self.driver.connect(leader_id); conn.process_user_read_request(req).await? }; @@ -107,7 +107,7 @@ impl RaftProcess { rx.await? } else { // This check is to avoid looping. - ensure!(self.driver.selfid() != leader_id); + ensure!(self.driver.self_node_id() != leader_id); let conn = self.driver.connect(leader_id); conn.process_user_write_request(req).await? @@ -125,7 +125,7 @@ impl RaftProcess { Ok(()) } - pub async fn get_snapshot(&self, index: Index) -> Result { + pub async fn get_snapshot(&self, index: Index) -> Result { let st = self.command_log.open_snapshot(index).await?; Ok(st) } diff --git a/lol2/src/process/snapshot.rs b/lol2/src/process/snapshot.rs index aca90072..164828ad 100644 --- a/lol2/src/process/snapshot.rs +++ b/lol2/src/process/snapshot.rs @@ -1,51 +1,5 @@ use super::*; -use futures::stream::TryStreamExt; -use tokio::io; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec; - pub type Error = std::io::Error; pub type Result = std::io::Result; - -pub type SnapshotStream = - std::pin::Pin> + Send>>; - -mod reader { - use super::*; - - fn into_bytes_stream(r: R) -> impl Stream> - where - R: AsyncRead, - { - codec::FramedRead::new(r, codec::BytesCodec::new()).map_ok(|bytes| bytes.freeze()) - } - - pub fn read(reader: R) -> impl Stream> { - into_bytes_stream(reader) - } -} -pub use reader::read; - -mod writer { - use super::*; - - async fn read_bytes_stream( - w: W, - mut st: impl Stream> + Unpin, - ) -> io::Result<()> { - use futures::SinkExt; - let mut sink = codec::FramedWrite::new(w, codec::BytesCodec::new()); - sink.send_all(&mut st).await?; - Ok(()) - } - - pub async fn write( - writer: W, - st: impl Stream> + Unpin, - ) -> io::Result<()> { - let st = st.map(|res| res.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))); - read_bytes_stream(writer, st).await - } -} -pub use writer::write; +pub type Stream = std::pin::Pin> + Send>>; diff --git a/lol2/src/process/voter/election.rs b/lol2/src/process/voter/election.rs index 1f62a7d1..f780d925 100644 --- a/lol2/src/process/voter/election.rs +++ b/lol2/src/process/voter/election.rs @@ -84,7 +84,11 @@ impl Voter { pub fn get_election_timeout(&self) -> Option { // This is optimization to avoid unnecessary election. // If the membership doesn't contain this node, it can't be receiving any heartbeat. - if !self.peers.read_membership().contains(&self.driver.selfid()) { + if !self + .peers + .read_membership() + .contains(&self.driver.self_node_id()) + { return None; } self.leader_failure_detector.get_election_timeout() @@ -116,7 +120,7 @@ impl Voter { // Vote to itself new_ballot.cur_term = vote_term; - new_ballot.voted_for = Some(self.driver.selfid()); + new_ballot.voted_for = Some(self.driver.self_node_id()); self.write_ballot(new_ballot).await?; // Becoming Candidate avoids this node starts another election during this election. @@ -146,12 +150,12 @@ impl Voter { ) -> Result { let (others, remaining) = { let membership = self.peers.read_membership(); - ensure!(membership.contains(&self.driver.selfid())); + ensure!(membership.contains(&self.driver.self_node_id())); let n = membership.len(); let mut others = vec![]; for id in membership { - if id != self.driver.selfid() { + if id != self.driver.self_node_id() { others.push(id); } } @@ -168,7 +172,7 @@ impl Voter { // Let's get remaining votes out of others. let mut vote_requests = vec![]; for endpoint in others { - let selfid = self.driver.selfid(); + let selfid = self.driver.self_node_id(); let conn = self.driver.connect(endpoint); vote_requests.push(async move { let req = request::RequestVote { diff --git a/lol2/src/process/voter/heartbeat.rs b/lol2/src/process/voter/heartbeat.rs index 08091a64..c5dac7b2 100644 --- a/lol2/src/process/voter/heartbeat.rs +++ b/lol2/src/process/voter/heartbeat.rs @@ -45,7 +45,7 @@ impl Voter { let ballot = self.read_ballot().await?; let leader_commit_index = self.command_log.commit_pointer.load(Ordering::SeqCst); let req = request::Heartbeat { - leader_id: self.driver.selfid(), + leader_id: self.driver.self_node_id(), leader_term: ballot.cur_term, leader_commit_index, }; diff --git a/lol2/src/process/voter/stepdown.rs b/lol2/src/process/voter/stepdown.rs index 7321fb95..50abd664 100644 --- a/lol2/src/process/voter/stepdown.rs +++ b/lol2/src/process/voter/stepdown.rs @@ -19,7 +19,7 @@ impl Voter { .try_read_membership_change(last_membership_change_index) .await? .unwrap(); - ensure!(!config.contains(&self.driver.selfid())); + ensure!(!config.contains(&self.driver.self_node_id())); info!("step down"); self.write_election_state(voter::ElectionState::Follower); diff --git a/lol2/src/raft_service/mod.rs b/lol2/src/raft_service/mod.rs index e8a035a9..2a913ea4 100644 --- a/lol2/src/raft_service/mod.rs +++ b/lol2/src/raft_service/mod.rs @@ -4,6 +4,7 @@ use process::*; mod stream; +/// Create a Raft service backed by a `RaftNode`. pub fn new(node: RaftNode) -> raft::raft_server::RaftServer { let inner = ServiceImpl { node }; raft::raft_server::RaftServer::new(inner) @@ -13,6 +14,7 @@ pub fn new(node: RaftNode) -> raft::raft_server::RaftServer { pub struct ServiceImpl { node: RaftNode, } + #[tonic::async_trait] impl raft::raft_server::Raft for ServiceImpl { type GetSnapshotStream = stream::SnapshotStreamOut; @@ -22,13 +24,15 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::UserWriteRequest { message: req.message, request_id: req.request_id, }; let resp = self .node - .get_process() + .get_process(lane_id) + .unwrap() .process_user_write_request(req) .await .unwrap(); @@ -40,12 +44,14 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::UserReadRequest { message: req.message, }; let resp = self .node - .get_process() + .get_process(lane_id) + .unwrap() .process_user_read_request(req) .await .unwrap(); @@ -57,11 +63,13 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::KernRequest { message: req.message, }; self.node - .get_process() + .get_process(lane_id) + .unwrap() .process_kern_request(req) .await .unwrap(); @@ -73,6 +81,7 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::RequestVote { candidate_id: req.candidate_id.parse().unwrap(), candidate_clock: { @@ -86,7 +95,13 @@ impl raft::raft_server::Raft for ServiceImpl { force_vote: req.force_vote, pre_vote: req.pre_vote, }; - let resp = self.node.get_process().request_vote(req).await.unwrap(); + let resp = self + .node + .get_process(lane_id) + .unwrap() + .request_vote(req) + .await + .unwrap(); Ok(tonic::Response::new(raft::VoteResponse { vote_granted: resp, })) @@ -97,10 +112,16 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::AddServer { server_id: req.server_id.parse().unwrap(), }; - self.node.get_process().add_server(req).await.unwrap(); + self.node + .get_process(lane_id) + .unwrap() + .add_server(req) + .await + .unwrap(); Ok(tonic::Response::new(())) } @@ -109,10 +130,16 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::RemoveServer { server_id: req.server_id.parse().unwrap(), }; - self.node.get_process().remove_server(req).await.unwrap(); + self.node + .get_process(lane_id) + .unwrap() + .remove_server(req) + .await + .unwrap(); Ok(tonic::Response::new(())) } @@ -121,8 +148,14 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request>, ) -> std::result::Result, tonic::Status> { let st = request.into_inner(); - let st = stream::into_internal_log_stream(st).await; - let resp = self.node.get_process().send_log_stream(st).await.unwrap(); + let (lane_id, st) = stream::into_internal_log_stream(st).await; + let resp = self + .node + .get_process(lane_id) + .unwrap() + .send_log_stream(st) + .await + .unwrap(); Ok(tonic::Response::new(raft::SendLogStreamResponse { success: resp.success, log_last_index: resp.log_last_index, @@ -134,9 +167,11 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let resp = self .node - .get_process() + .get_process(lane_id) + .unwrap() .get_snapshot(req.index) .await .unwrap(); @@ -149,20 +184,33 @@ impl raft::raft_server::Raft for ServiceImpl { request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); + let lane_id = req.lane_id; let req = request::Heartbeat { leader_id: req.leader_id.parse().unwrap(), leader_term: req.leader_term, leader_commit_index: req.leader_commit_index, }; - self.node.get_process().send_heartbeat(req).await.unwrap(); + self.node + .get_process(lane_id) + .unwrap() + .send_heartbeat(req) + .await + .unwrap(); Ok(tonic::Response::new(())) } async fn timeout_now( &self, - _: tonic::Request<()>, + req: tonic::Request, ) -> std::result::Result, tonic::Status> { - self.node.get_process().send_timeout_now().await.unwrap(); + let req = req.into_inner(); + let lane_id = req.lane_id; + self.node + .get_process(lane_id) + .unwrap() + .send_timeout_now() + .await + .unwrap(); Ok(tonic::Response::new(())) } } diff --git a/lol2/src/raft_service/stream.rs b/lol2/src/raft_service/stream.rs index f71aefc5..2a7eddf0 100644 --- a/lol2/src/raft_service/stream.rs +++ b/lol2/src/raft_service/stream.rs @@ -2,17 +2,19 @@ use super::*; pub async fn into_internal_log_stream( mut out_stream: tonic::Streaming, -) -> LogStream { +) -> (LaneId, LogStream) { use raft::log_stream_chunk::Elem as ChunkElem; - let (sender_id, prev_clock) = if let Some(Ok(chunk)) = out_stream.next().await { + let (lane_id, sender_id, prev_clock) = if let Some(Ok(chunk)) = out_stream.next().await { let e = chunk.elem.unwrap(); if let ChunkElem::Header(raft::LogStreamHeader { + lane_id, sender_id, prev_clock: Some(prev_clock), }) = e { ( + lane_id, sender_id, Clock { term: prev_clock.term, @@ -42,18 +44,20 @@ pub async fn into_internal_log_stream( } }; - LogStream { + let st = LogStream { sender_id: sender_id.parse().unwrap(), prev_clock, entries: Box::pin(entries), - } + }; + + (lane_id, st) } pub type SnapshotStreamOut = std::pin::Pin< Box> + Send>, >; -pub fn into_external_snapshot_stream(in_stream: SnapshotStream) -> SnapshotStreamOut { +pub fn into_external_snapshot_stream(in_stream: snapshot::Stream) -> SnapshotStreamOut { let out_stream = in_stream.map(|res| { res.map(|data| raft::SnapshotChunk { data }) .map_err(|_| tonic::Status::unknown("streaming error")) diff --git a/tests/env/src/lib.rs b/tests/env/src/lib.rs index 1f5eb2b9..25d6e3e9 100644 --- a/tests/env/src/lib.rs +++ b/tests/env/src/lib.rs @@ -73,60 +73,6 @@ impl Env { Ok(()) } - pub async fn pause(&mut self, id: u8) -> Result<()> { - ensure!(self.containers.contains_key(&id)); - let container_id = self.containers.get(&id).unwrap().0.clone(); - self.docker.pause_container(&container_id).await?; - Ok(()) - } - - pub async fn resume(&mut self, id: u8) -> Result<()> { - ensure!(self.containers.contains_key(&id)); - let container_id = self.containers.get(&id).unwrap().0.clone(); - self.docker.unpause_container(&container_id).await?; - Ok(()) - } - - pub async fn pause_v2(&mut self, id: u8) -> Result<()> { - ensure!(self.containers.contains_key(&id)); - let container_id = self.containers.get(&id).unwrap().0.clone(); - - let inspect = self.docker.inspect_container(&container_id, None).await?; - let pid = inspect - .state - .and_then(|state| state.pid) - .ok_or(anyhow::anyhow!("no pid"))?; - - let mut child = tokio::process::Command::new("kill") - .arg("-s") - .arg("SIGSTOP") - .arg(format!("{pid}")) - .spawn()?; - child.wait().await?; - - Ok(()) - } - - pub async fn resume_v2(&mut self, id: u8) -> Result<()> { - ensure!(self.containers.contains_key(&id)); - let container_id = self.containers.get(&id).unwrap().0.clone(); - - let inspect = self.docker.inspect_container(&container_id, None).await?; - let pid = inspect - .state - .and_then(|state| state.pid) - .ok_or(anyhow::anyhow!("no pid"))?; - - let mut child = tokio::process::Command::new("kill") - .arg("-s") - .arg("SIGCONT") - .arg(format!("{pid}")) - .spawn()?; - child.wait().await?; - - Ok(()) - } - pub async fn connect_network(&mut self, id: u8) -> Result<()> { ensure!(self.containers.contains_key(&id)); let container_id = self.containers.get(&id).unwrap().0.clone(); @@ -149,17 +95,6 @@ impl Env { Ok(()) } - pub async fn disconnect_network(&mut self, id: u8) -> Result<()> { - ensure!(self.containers.contains_key(&id)); - let container_id = self.containers.get(&id).unwrap().0.clone(); - let config = network::DisconnectNetworkOptions { - container: container_id, - ..Default::default() - }; - self.docker.disconnect_network(NETWORK_NAME, config).await?; - Ok(()) - } - pub async fn ping(&self, id: u8) -> Result<()> { let chan = self.connect(0); let mut cli = testapp::PingClient::new(chan); @@ -176,6 +111,7 @@ impl Env { chan } } + impl Drop for Env { fn drop(&mut self) { for (id, container) in self.containers.drain() { diff --git a/tests/env/tests/tests.rs b/tests/env/tests/tests.rs index 03616ffb..e69e034f 100644 --- a/tests/env/tests/tests.rs +++ b/tests/env/tests/tests.rs @@ -30,94 +30,5 @@ async fn start_stop() -> Result<()> { env.stop(0).await?; assert!(env.ping(0).await.is_err()); - env.start(0).await?; - env.ping(0).await?; - - Ok(()) -} - -#[ignore] -#[serial] -#[test(tokio::test(flavor = "multi_thread"))] -async fn pause_resume() -> Result<()> { - let mut env = env::Env::new()?; - env.create(0).await?; - env.start(0).await?; - env.connect_network(0).await?; - env.ping(0).await?; - - env.pause(0).await?; - // hang - assert!(env.ping(0).await.is_err()); - - env.resume(0).await?; - env.ping(0).await?; - - Ok(()) -} - -#[ignore] -#[serial] -#[test(tokio::test(flavor = "multi_thread"))] -async fn pause_resume_v2() -> Result<()> { - let mut env = env::Env::new()?; - env.create(0).await?; - env.start(0).await?; - env.connect_network(0).await?; - env.ping(0).await?; - - env.pause_v2(0).await?; - assert!(env.ping(0).await.is_err()); - - env.resume_v2(0).await?; - env.ping(0).await?; - - Ok(()) -} - -#[serial] -#[test(tokio::test(flavor = "multi_thread"))] -async fn connect_disconnect_network() -> Result<()> { - let mut env = env::Env::new()?; - env.create(0).await?; - - env.connect_network(0).await?; - assert!(env.ping(0).await.is_err()); - - env.start(0).await?; - env.ping(0).await?; - - env.disconnect_network(0).await?; - // hang - // cli.ping(()).await?; - - Ok(()) -} - -#[serial] -#[test(tokio::test(flavor = "multi_thread"))] -async fn ping_connectivity() -> Result<()> { - let mut env = env::Env::new()?; - env.create(0).await?; - - let chan = env.connect(0); - assert!(env.ping(0).await.is_err()); - - env.start(0).await?; - assert!(env.ping(0).await.is_err()); - - env.connect_network(0).await?; - env.ping(0).await?; - - // hang - // env.disconnect_network(0).await?; - // assert!(cli.ping(()).await.is_err()); - - // env.connect_network(0).await?; - // assert!(cli.ping(()).await.is_ok()); - - env.stop(0).await?; - assert!(env.ping(0).await.is_err()); - Ok(()) } diff --git a/tests/lol-tests/src/lib.rs b/tests/lol-tests/src/lib.rs index 85a02f4a..a532ad53 100644 --- a/tests/lol-tests/src/lib.rs +++ b/tests/lol-tests/src/lib.rs @@ -33,6 +33,7 @@ impl Cluster { pub async fn add_server(&mut self, to: u8, id: u8) -> Result<()> { self.admin(to) .add_server(lol2::client::AddServerRequest { + lane_id: testapp::APP_LANE_ID, server_id: Env::address_from_id(id), }) .await?; @@ -44,6 +45,7 @@ impl Cluster { pub async fn remove_server(&mut self, to: u8, id: u8) -> Result<()> { self.admin(to) .remove_server(lol2::client::RemoveServerRequest { + lane_id: testapp::APP_LANE_ID, server_id: Env::address_from_id(id), }) .await?; diff --git a/tests/lol-tests/tests/one_node.rs b/tests/lol-tests/tests/one_node.rs index dbdb07f4..96ba7fd8 100644 --- a/tests/lol-tests/tests/one_node.rs +++ b/tests/lol-tests/tests/one_node.rs @@ -71,6 +71,7 @@ async fn n1_many_retry_exec_once() -> Result<()> { let cli = lol2::client::RaftClient::new(chan); let req = lol2::client::WriteRequest { + lane_id: testapp::APP_LANE_ID, message: testapp::AppWriteRequest::FetchAdd { bytes: vec![1u8; 1].into(), } diff --git a/tests/testapp/Dockerfile b/tests/testapp/Dockerfile index 22104c90..86e7ea23 100644 --- a/tests/testapp/Dockerfile +++ b/tests/testapp/Dockerfile @@ -1,12 +1,20 @@ -FROM rust:1.72-bookworm as builder -WORKDIR /work +FROM rust:1.75-bookworm AS chef RUN apt-get update RUN apt-get install -y protobuf-compiler +RUN cargo install cargo-chef +WORKDIR /work + +FROM chef AS planner +COPY . . +RUN cargo chef prepare --recipe-path recipe.json +FROM chef AS builder +COPY --from=planner /work/recipe.json recipe.json +RUN cargo chef cook --release --recipe-path recipe.json COPY . . RUN cargo build -FROM rust:1.72-slim-bookworm as runtime +FROM rust:1.75-slim-bookworm AS runtime EXPOSE 50000 COPY --from=builder /work/target/debug/testapp ./ ENTRYPOINT ["./testapp"] \ No newline at end of file diff --git a/tests/testapp/src/app/mod.rs b/tests/testapp/src/app/mod.rs index d235530b..7358ea57 100644 --- a/tests/testapp/src/app/mod.rs +++ b/tests/testapp/src/app/mod.rs @@ -8,6 +8,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::RwLock; use testapp::{AppReadRequest, AppState, AppWriteRequest}; +mod snapshot_io; + pub async fn new(driver: lol2::RaftDriver) -> Result { let app_main = AppMain::new(); let app_log = AppLog::new(); @@ -19,15 +21,16 @@ pub async fn new(driver: lol2::RaftDriver) -> Result { struct AppSnapshot(AppState); impl AppSnapshot { - pub fn into_stream(self) -> SnapshotStream { + pub fn into_stream(self) -> snapshot::Stream { let bytes = self.0.serialize(); let cursor = std::io::Cursor::new(bytes); - Box::pin(snapshot::read(cursor)) + Box::pin(snapshot_io::read(cursor)) } - pub async fn from_stream(st: SnapshotStream) -> Self { + + pub async fn from_stream(st: snapshot::Stream) -> Self { let mut v = vec![]; let cursor = std::io::Cursor::new(&mut v); - snapshot::write(cursor, st).await.unwrap(); + snapshot_io::write(cursor, st).await.unwrap(); let cur_state = AppState::deserialize(&v); AppSnapshot(cur_state) } @@ -104,7 +107,7 @@ impl RaftApp for AppMain { Ok(AppState(cur_state.counter).serialize()) } - async fn save_snapshot(&self, st: SnapshotStream, snapshot_index: Index) -> Result<()> { + async fn save_snapshot(&self, st: snapshot::Stream, snapshot_index: Index) -> Result<()> { let snap = AppSnapshot::from_stream(st).await; self.snapshots .write() @@ -113,7 +116,7 @@ impl RaftApp for AppMain { Ok(()) } - async fn open_snapshot(&self, x: Index) -> Result { + async fn open_snapshot(&self, x: Index) -> Result { ensure!(self.snapshots.read().unwrap().contains_key(&x)); let cur_state = *self.snapshots.read().unwrap().get(&x).unwrap(); let snap = AppSnapshot(cur_state); @@ -128,7 +131,7 @@ impl RaftApp for AppMain { Ok(()) } - async fn propose_new_snapshot(&self) -> Result { + async fn get_latest_snapshot(&self) -> Result { let k = { let mut out = vec![]; let snapshots = self.snapshots.read().unwrap(); diff --git a/tests/testapp/src/app/snapshot_io.rs b/tests/testapp/src/app/snapshot_io.rs new file mode 100644 index 00000000..fe495456 --- /dev/null +++ b/tests/testapp/src/app/snapshot_io.rs @@ -0,0 +1,47 @@ +use super::*; + +use futures::stream::TryStreamExt; +use futures::Stream; +use futures::StreamExt; +use tokio::io; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::codec; + +mod reader { + use super::*; + + fn into_bytes_stream(r: R) -> impl Stream> + where + R: AsyncRead, + { + codec::FramedRead::new(r, codec::BytesCodec::new()).map_ok(|bytes| bytes.freeze()) + } + + pub fn read(reader: R) -> impl Stream> { + into_bytes_stream(reader) + } +} +pub use reader::read; + +mod writer { + use super::*; + + async fn read_bytes_stream( + w: W, + mut st: impl Stream> + Unpin, + ) -> io::Result<()> { + use futures::SinkExt; + let mut sink = codec::FramedWrite::new(w, codec::BytesCodec::new()); + sink.send_all(&mut st).await?; + Ok(()) + } + + pub async fn write( + writer: W, + st: impl Stream> + Unpin, + ) -> io::Result<()> { + let st = st.map(|res| res.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))); + read_bytes_stream(writer, st).await + } +} +pub use writer::write; diff --git a/tests/testapp/src/lib.rs b/tests/testapp/src/lib.rs index 7b43ddd9..5d05f170 100644 --- a/tests/testapp/src/lib.rs +++ b/tests/testapp/src/lib.rs @@ -8,6 +8,8 @@ mod proto { } pub use proto::ping_client::PingClient; +pub const APP_LANE_ID: lol2::LaneId = 777; + #[derive(serde::Serialize, serde::Deserialize)] pub enum AppWriteRequest { FetchAdd { bytes: Vec }, @@ -16,6 +18,7 @@ impl AppWriteRequest { pub fn serialize(self) -> Bytes { bincode::serialize(&self).unwrap().into() } + pub fn deserialize(bytes: &[u8]) -> Self { bincode::deserialize(bytes).unwrap() } @@ -30,6 +33,7 @@ impl AppReadRequest { pub fn serialize(self) -> Bytes { bincode::serialize(&self).unwrap().into() } + pub fn deserialize(bytes: &[u8]) -> Self { bincode::deserialize(bytes).unwrap() } @@ -41,6 +45,7 @@ impl AppState { pub fn serialize(&self) -> Bytes { bincode::serialize(&self).unwrap().into() } + pub fn deserialize(bytes: &[u8]) -> Self { bincode::deserialize(bytes).unwrap() } @@ -58,6 +63,7 @@ impl Client { pub async fn fetch_add(&mut self, n: u64) -> Result { let request_id = uuid::Uuid::new_v4().to_string(); let req = WriteRequest { + lane_id: APP_LANE_ID, message: AppWriteRequest::FetchAdd { bytes: vec![1u8; n as usize].into(), } @@ -87,6 +93,7 @@ impl Client { pub async fn read(&self) -> Result { let req = ReadRequest { + lane_id: APP_LANE_ID, message: AppReadRequest::Read.serialize(), }; let resp = self.cli.clone().read(req).await?.into_inner(); @@ -96,6 +103,7 @@ impl Client { pub async fn make_snapshot(&self) -> Result { let req = ReadRequest { + lane_id: APP_LANE_ID, message: AppReadRequest::MakeSnapshot.serialize(), }; let resp = self.cli.clone().read(req).await?.into_inner(); diff --git a/tests/testapp/src/main.rs b/tests/testapp/src/main.rs index 21a1b1f3..fdcf527c 100644 --- a/tests/testapp/src/main.rs +++ b/tests/testapp/src/main.rs @@ -5,7 +5,9 @@ mod app; mod proto { tonic::include_proto!("testapp"); } + struct PingApp; + #[tonic::async_trait] impl proto::ping_server::Ping for PingApp { async fn ping( @@ -51,9 +53,9 @@ async fn main() -> Result<()> { let node_id = env_config.address.parse()?; let node = lol2::RaftNode::new(node_id); - let driver = node.get_driver(); + let driver = node.get_driver(testapp::APP_LANE_ID); let process = app::new(driver).await?; - node.attach_process(process); + node.attach_process(testapp::APP_LANE_ID, process); let raft_svc = lol2::raft_service::new(node); let ping_svc = proto::ping_server::PingServer::new(PingApp);