Skip to content

Commit

Permalink
rework3: Implement Multi-Raft (#326)
Browse files Browse the repository at this point in the history
Multi-raft
  • Loading branch information
akiradeveloper authored Feb 10, 2024
1 parent 31724e7 commit 5301a2a
Show file tree
Hide file tree
Showing 35 changed files with 396 additions and 336 deletions.
43 changes: 38 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,46 @@ name: CI
on:
pull_request:
branches:
- develop
- master

jobs:
run_test:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Compile
run: ./dev cargo build
- name: Checkout
uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Build (dev)
uses: docker/build-push-action@v5
with:
context: .
push: false
load: true
tags: lol-dev:latest
cache-from: type=gha,scope=dev
cache-to: type=gha,mode=max,scope=dev

- name: Build (testapp)
uses: docker/build-push-action@v5
with:
context: .
file: tests/testapp/Dockerfile
push: false
load: true
tags: lol-testapp:latest
cache-from: type=gha,scope=testapp
cache-to: type=gha,mode=max,scope=testapp

- name: Run tests
run: |
docker compose -f docker-compose.ci.yml build
docker compose -f docker-compose.ci.yml run --rm dev cargo test
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM 'rust:1.72-bookworm'
FROM 'rust:1.75-bookworm'

WORKDIR '/work'

Expand Down
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

A Raft implementation in Rust language. To support this project please give it a ⭐

![](https://user-images.githubusercontent.com/785824/146726060-63b12378-ecb7-49f9-8025-a65dbd37e9b2.jpeg)
![146726060-63b12378-ecb7-49f9-8025-a65dbd37e9b2](https://github.com/akiradeveloper/lol/assets/785824/12a016fe-35a0-4d12-8ffa-955ef61b25b9)


## Features

![multi-raft](https://github.com/akiradeveloper/lol/assets/785824/2293cf2b-436a-45ed-a507-88e299e622bf)
![スクリーンショット 2024-02-03 13 29 55](https://github.com/akiradeveloper/lol/assets/785824/f6a6ceef-98f3-4fcf-9ba8-3655d52bd3f0)


- Implements all fundamental [Raft](https://raft.github.io/) features for production use.
- Supports Multi-Raft. Mutliple Raft processes can coexist in a single OS process so they can share resources efficiently.
Expand All @@ -24,7 +26,8 @@ A Raft implementation in Rust language. To support this project please give it a
To implement Multi-Raft, the architecture is divided into two spaces. One in the lower side is called "Pure Raft" layer which is totally unaware of
gRPC and Multi-Raft. Therefore, called pure. The other side translates gRPC requests into pure requests and vice versa.

![lol2 (1)](https://github.com/akiradeveloper/lol/assets/785824/dc605ed9-ca90-4e17-a370-ca1b939e43ce)
![スクリーンショット 2024-02-03 13 30 09](https://github.com/akiradeveloper/lol/assets/785824/fd064ba6-be20-4934-839a-db8cd07a8f13)



## Development
Expand Down
20 changes: 20 additions & 0 deletions docker-compose.ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version: '3'

services:
dev:
image: lol-dev:latest
working_dir: /work
volumes:
- ./:/work
- /var/run/docker.sock:/var/run/docker.sock
networks:
- raft-network

testapp:
image: lol-testapp:latest
entrypoint:
- /bin/sh

networks:
raft-network:
driver: bridge
49 changes: 31 additions & 18 deletions lol2/proto/lol2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import "google/protobuf/empty.proto";
package lol2;

message WriteRequest {
bytes message = 1;
uint32 lane_id = 1;
bytes message = 2;
// unique identifier of this request
// duplicated requests with the same unique identifier are only executed once.
string request_id = 2;
string request_id = 3;
}

message ReadRequest {
bytes message = 1;
uint32 lane_id = 1;
bytes message = 2;
}

message Response {
Expand All @@ -25,12 +27,14 @@ message Clock {
}

message KernRequest {
bytes message = 1;
uint32 lane_id = 1;
bytes message = 2;
}

message LogStreamHeader {
string sender_id = 1;
Clock prev_clock = 2;
uint32 lane_id = 1;
string sender_id = 2;
Clock prev_clock = 3;
}
message LogStreamEntry {
Clock clock = 1;
Expand All @@ -48,35 +52,44 @@ message SendLogStreamResponse {
}

message GetSnapshotRequest {
uint64 index = 1;
uint32 lane_id = 1;
uint64 index = 2;
}
message SnapshotChunk {
bytes data = 1;
}

message VoteRequest {
uint64 vote_term = 1;
string candidate_id = 2;
Clock candidate_clock = 3;
bool force_vote = 4;
bool pre_vote = 5;
uint32 lane_id = 1;
uint64 vote_term = 2;
string candidate_id = 3;
Clock candidate_clock = 4;
bool force_vote = 5;
bool pre_vote = 6;
}
message VoteResponse {
bool vote_granted = 1;
}

message Heartbeat {
uint64 leader_term = 1;
string leader_id = 2;
uint64 leader_commit_index = 3;
uint32 lane_id = 1;
uint64 leader_term = 2;
string leader_id = 3;
uint64 leader_commit_index = 4;
}

message AddServerRequest {
string server_id = 1;
uint32 lane_id = 1;
string server_id = 2;
}

message RemoveServerRequest {
string server_id = 1;
uint32 lane_id = 1;
string server_id = 2;
}

message TimeoutNowRequest {
uint32 lane_id = 1;
}

service Raft {
Expand All @@ -89,5 +102,5 @@ service Raft {
rpc SendLogStream (stream LogStreamChunk) returns (SendLogStreamResponse);
rpc GetSnapshot (GetSnapshotRequest) returns (stream SnapshotChunk);
rpc SendHeartbeat (Heartbeat) returns (google.protobuf.Empty);
rpc TimeoutNow (google.protobuf.Empty) returns (google.protobuf.Empty);
rpc TimeoutNow (TimeoutNowRequest) returns (google.protobuf.Empty);
}
4 changes: 3 additions & 1 deletion lol2/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::*;

pub type RaftClient = raft::raft_client::RaftClient<tonic::transport::channel::Channel>;
pub use raft::{AddServerRequest, ReadRequest, RemoveServerRequest, Response, WriteRequest};
pub use raft::{
AddServerRequest, ReadRequest, RemoveServerRequest, Response, TimeoutNowRequest, WriteRequest,
};
30 changes: 21 additions & 9 deletions lol2/src/requester/mod.rs → lol2/src/communicator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,30 @@ use process::*;

mod stream;

pub struct Connection {
pub struct Communicator {
cli: raft::RaftClient,
lane_id: LaneId,
}
impl Connection {
pub fn new(cli: raft::RaftClient) -> Self {
Self { cli }
impl Communicator {
pub fn new(cli: raft::RaftClient, lane_id: LaneId) -> Self {
Self { cli, lane_id }
}
}

impl Connection {
pub async fn get_snapshot(&self, index: Index) -> Result<SnapshotStream> {
let req = raft::GetSnapshotRequest { index };
impl Communicator {
pub async fn get_snapshot(&self, index: Index) -> Result<snapshot::Stream> {
let req = raft::GetSnapshotRequest {
lane_id: self.lane_id,
index,
};
let st = self.cli.clone().get_snapshot(req).await?.into_inner();
let st = Box::pin(stream::into_internal_snapshot_stream(st));
Ok(st)
}

pub async fn send_heartbeat(&self, req: request::Heartbeat) -> Result<()> {
let req = raft::Heartbeat {
lane_id: self.lane_id,
leader_id: req.leader_id.to_string(),
leader_term: req.leader_term,
leader_commit_index: req.leader_commit_index,
Expand All @@ -37,6 +42,7 @@ impl Connection {
req: request::UserWriteRequest,
) -> Result<Bytes> {
let req = raft::WriteRequest {
lane_id: self.lane_id,
message: req.message,
request_id: req.request_id,
};
Expand All @@ -46,6 +52,7 @@ impl Connection {

pub async fn process_user_read_request(&self, req: request::UserReadRequest) -> Result<Bytes> {
let req = raft::ReadRequest {
lane_id: self.lane_id,
message: req.message,
};
let resp = self.cli.clone().read(req).await?.into_inner();
Expand All @@ -54,19 +61,23 @@ impl Connection {

pub async fn process_kern_request(&self, req: request::KernRequest) -> Result<()> {
let req = raft::KernRequest {
lane_id: self.lane_id,
message: req.message,
};
self.cli.clone().process_kern_request(req).await?;
Ok(())
}

pub async fn send_timeout_now(&self) -> Result<()> {
self.cli.clone().timeout_now(()).await?;
let req = raft::TimeoutNowRequest {
lane_id: self.lane_id,
};
self.cli.clone().timeout_now(req).await?;
Ok(())
}

pub async fn send_log_stream(&self, st: LogStream) -> Result<response::SendLogStream> {
let st = stream::into_external_log_stream(st);
let st = stream::into_external_log_stream(self.lane_id, st);
let resp = self.cli.clone().send_log_stream(st).await?.into_inner();
Ok(response::SendLogStream {
success: resp.success,
Expand All @@ -76,6 +87,7 @@ impl Connection {

pub async fn request_vote(&self, req: request::RequestVote) -> Result<bool> {
let req = raft::VoteRequest {
lane_id: self.lane_id,
candidate_id: req.candidate_id.to_string(),
candidate_clock: {
let e = req.candidate_clock;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::*;

pub fn into_external_log_stream(
lane_id: LaneId,
st: LogStream,
) -> impl futures::stream::Stream<Item = raft::LogStreamChunk> {
use raft::log_stream_chunk::Elem as ChunkElem;
let header_stream = vec![ChunkElem::Header(raft::LogStreamHeader {
lane_id,
sender_id: st.sender_id.to_string(),
prev_clock: Some(raft::Clock {
term: st.prev_clock.term,
Expand Down
12 changes: 6 additions & 6 deletions lol2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
pub mod process;

pub mod client;
mod communicator;
mod node;
pub mod raft_service;
mod requester;

use anyhow::Result;
use bytes::Bytes;
Expand All @@ -16,13 +16,14 @@ use process::RaftProcess;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::{Endpoint, Uri};
use tonic::transport::Uri;

mod raft {
tonic::include_proto!("lol2");
pub type RaftClient = raft_client::RaftClient<tonic::transport::channel::Channel>;
}

/// Identifier of a `RaftNode`.
#[derive(
serde::Serialize,
serde::Deserialize,
Expand All @@ -35,12 +36,11 @@ mod raft {
derive_more::FromStr,
)]
pub struct NodeId(#[serde(with = "http_serde::uri")] Uri);

impl NodeId {
pub fn new(uri: Uri) -> Self {
Self(uri)
}
pub fn from_str(url: &str) -> Result<Self> {
let url = url.parse()?;
Ok(Self(url))
}
}

pub type LaneId = u32;
Loading

0 comments on commit 5301a2a

Please sign in to comment.