4 changes: 4 additions & 0 deletions curvine-common/src/error/fs_error.rs
@@ -422,6 +422,10 @@ impl ErrorExt for FsError {
fn should_retry(&self) -> bool {
self.retry_master()
}

fn should_continue(&self) -> bool {
matches!(self, FsError::NotLeaderMaster(_))
}
}

#[cfg(test)]
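The new `should_continue` hook complements `should_retry`: a `NotLeaderMaster` response means the node is alive but currently a follower, so the client should move on to the next configured master rather than give up or keep hammering the same node. The sketch below shows how a caller might combine the two hooks; the `call_master` helper, the loop shape, and the exact `ErrorExt` import are assumptions for illustration, not the real client code.

```rust
use curvine_common::error::FsError;
use curvine_common::FsResult;
use orpc::err_msg;
// NOTE: the ErrorExt trait that provides should_retry/should_continue must also
// be in scope; its exact path is assumed and not shown in this diff.

// Hypothetical single RPC attempt against the master at `idx`.
async fn call_master(idx: usize) -> FsResult<Vec<u8>> {
    let _ = idx;
    unimplemented!("placeholder for one RPC attempt")
}

async fn call_with_failover(masters: usize, max_attempts: usize) -> FsResult<Vec<u8>> {
    let mut idx = 0;
    for _ in 0..max_attempts {
        match call_master(idx).await {
            Ok(resp) => return Ok(resp),
            // Follower answered: switch to the next master and keep going.
            Err(e) if e.should_continue() => idx = (idx + 1) % masters,
            // Transient error: retry the same master on the next iteration.
            Err(e) if e.should_retry() => {}
            // Anything else is fatal.
            Err(e) => return Err(e),
        }
    }
    Err(FsError::common(err_msg!("gave up after {} attempts", max_attempts)))
}
```

With only `should_retry`, a request that landed on a follower would burn the whole retry budget on one node; `should_continue` lets the retry loop rotate through masters instead.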
13 changes: 13 additions & 0 deletions curvine-docker/deploy/example/conf/curvine-cluster.toml
@@ -36,6 +36,19 @@ master_addrs = [
{ hostname = "cv-master-2.svc.cluster.local", port = 8995 }
]

# RPC timeout optimization for fast master failover
# RPC timeout: 120s → 10s (Raft election takes 10-15s)
rpc_timeout_ms = 10000

# Total retry duration: 300s → 40s (10s × 3 nodes + retry intervals)
rpc_retry_max_duration_ms = 40000

# Minimum retry sleep: 300ms → 100ms (faster retry)
rpc_retry_min_sleep_ms = 100

# Maximum retry sleep: 30s → 2s (avoid long wait)
rpc_retry_max_sleep_ms = 2000

[fuse]
debug = false

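The 40 s retry budget is sized for one full pass over the three masters in the worst case: three calls that each hit the 10 s RPC timeout, plus the short backoff sleeps in between (100 ms, capped at 2 s). A quick sanity check of that arithmetic is below; the doubling backoff is an assumption about the client's retry policy, not something stated in this config.

```rust
fn main() {
    // Values from curvine-cluster.toml above.
    let rpc_timeout_ms = 10_000u64;
    let (min_sleep_ms, max_sleep_ms) = (100u64, 2_000u64);
    let masters = 3u64;

    // Assumed policy: exponential backoff between attempts, doubling from
    // min_sleep up to max_sleep.
    let mut total = 0u64;
    let mut sleep = min_sleep_ms;
    for attempt in 0..masters {
        total += rpc_timeout_ms; // each master gets the full RPC timeout
        if attempt + 1 < masters {
            total += sleep; // back off before trying the next master
            sleep = (sleep * 2).min(max_sleep_ms);
        }
    }
    // 3 * 10s + (100ms + 200ms) = 30.3s, which fits inside the 40s budget
    // with room left for a second look at one or two nodes.
    println!("worst-case single pass: {} ms (budget: 40000 ms)", total);
}
```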
5 changes: 5 additions & 0 deletions curvine-server/src/master/master_server.rs
@@ -245,6 +245,11 @@ impl Master {
self.replication_manager.clone()
}

// For tests: returns a state controller that can be used to simulate a server shutdown
pub fn new_state_ctl(&self) -> orpc::sync::StateCtl {
self.rpc_server.new_state_ctl()
}

pub fn service(&self) -> &MasterService {
self.rpc_server.service()
}
162 changes: 146 additions & 16 deletions curvine-server/src/test/mini_cluster.rs
@@ -18,15 +18,16 @@ use crate::master::Master;
use crate::worker::Worker;
use curvine_client::file::CurvineFileSystem;
use curvine_common::conf::ClusterConf;
use curvine_common::raft::{NodeId, RaftPeer};
use curvine_common::error::FsError;
use curvine_common::raft::{NodeId, RaftPeer, RoleState};
use curvine_common::FsResult;
use dashmap::DashMap;
use log::info;
use orpc::client::RpcClient;
use orpc::common::LocalTime;
use orpc::io::net::{InetAddr, NetUtils};
use orpc::runtime::{RpcRuntime, Runtime};
use orpc::{err_box, CommonResult};
use orpc::runtime::Runtime;
use orpc::{err_box, err_msg, CommonResult};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
@@ -82,20 +83,10 @@ impl MiniCluster {

pub fn start_cluster(&self) {
self.start_master();

// Wait for Master to be fully ready before starting Workers
self.client_rt.block_on(self.wait_master_ready()).unwrap();

// Add a small additional delay to ensure Master is fully stable
std::thread::sleep(Duration::from_millis(500));

self.start_worker();

self.client_rt.block_on(self.wait_ready()).unwrap();
}

/// Wait for Master service to be fully ready (Raft Leader + RPC service started)
async fn wait_master_ready(&self) -> FsResult<()> {
pub async fn wait_master_ready(&self) -> FsResult<()> {
let wait_time = LocalTime::mills() + 60 * 1000; // 60 second timeout
let mut retry_count = 0;
let mut rpc_connection_attempted = false;
@@ -123,7 +114,6 @@ impl MiniCluster {
}

if raft_leader_ready {
// Additional check: try to create a simple RPC connection to verify the service is actually listening
let conf = self.master_conf().client_rpc_conf();
let addr = self.master_conf().master_addr();

@@ -154,7 +144,7 @@
}

/// Wait for the entire cluster to be ready (all Workers registered)
async fn wait_ready(&self) -> FsResult<()> {
pub async fn wait_ready(&self) -> FsResult<()> {
let fs = self.new_fs();
let wait_time = LocalTime::mills() + 20 * 1000; // 20 second timeout
let mut retry_count = 0;
@@ -225,6 +215,146 @@ impl MiniCluster {
.unwrap()
}

/// Get the index of the currently active master
pub fn get_active_master_index(&self) -> Option<usize> {
self.master_entries
.iter()
.find(|x| x.0.master_monitor.is_active())
.map(|x| *x.key())
}

/// Wait for cluster to be fully ready (Master + Workers)
pub async fn wait_cluster_ready(&self) -> FsResult<()> {
// Wait for master to be ready
self.wait_master_ready().await?;

// Wait for workers to register
self.wait_ready().await?;

Ok(())
}

/// Kill the currently active master
pub fn kill_active_master(&self) -> FsResult<()> {
let active_idx = self
.get_active_master_index()
.ok_or_else(|| FsError::common(err_msg!("No active master found")))?;
self.kill_master(active_idx)
}

/// Kill a master at the specified index
pub fn kill_master(&self, index: usize) -> FsResult<()> {
let entry = self
.master_entries
.get(&index)
.ok_or_else(|| FsError::common(err_msg!("Master at index {} not found", index)))?;

info!("Killing master at index {}", index);

// Set Raft role state to Exit to immediately mark as inactive
entry
.0
.master_monitor
.journal_ctl
.set_state(RoleState::Exit);

info!(
"Master at index {} has been killed (Raft role set to Exit)",
index
);
Ok(())
}

/// Wait for a new master to be elected after killing the old one
pub async fn wait_for_new_master(
&self,
timeout_secs: u64,
old_master_idx: Option<usize>,
) -> FsResult<usize> {
let wait_time = LocalTime::mills() + (timeout_secs * 1000);
let mut retry_count = 0;

info!(
"Waiting for new master election (timeout: {}s)...",
timeout_secs
);

// Step 1: Wait for old master to become inactive (if specified)
if let Some(old_idx) = old_master_idx {
info!(
"Step 1: Waiting for old master (index {}) to become inactive...",
old_idx
);
while LocalTime::mills() <= wait_time {
retry_count += 1;

if let Some(entry) = self.master_entries.get(&old_idx) {
if !entry.0.master_monitor.is_active() {
info!("Old master (index {}) is now inactive", old_idx);
break;
}
} else {
// Master entry removed, consider it inactive
info!("Old master (index {}) entry removed", old_idx);
break;
}

if retry_count % 10 == 0 {
info!(
"Still waiting for old master to become inactive... (attempt {})",
retry_count
);
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

// Step 2: Wait for new master to be elected
info!("Step 2: Waiting for new master to be elected...");
retry_count = 0;
while LocalTime::mills() <= wait_time {
retry_count += 1;

// Check all masters to find a new active one
for entry in self.master_entries.iter() {
let idx = *entry.key();
let is_active = entry.0.master_monitor.is_active();

// A newly elected master must be active and must not be the old master
if is_active && (old_master_idx != Some(idx)) {
info!(
"New master elected at index {} (attempt {})",
idx, retry_count
);
return Ok(idx);
}
}

if retry_count % 10 == 0 {
info!(
"Still waiting for new master election... (attempt {})",
retry_count
);
}
tokio::time::sleep(Duration::from_millis(500)).await;
}

err_box!(
"New master election timeout after {}s (attempt {})",
timeout_secs,
retry_count
)
}

/// Check if a master at the specified index is running
pub fn is_master_running(&self, index: usize) -> bool {
if let Some(entry) = self.master_entries.get(&index) {
entry.0.master_monitor.is_active()
} else {
false
}
}

// Create the default master node configuration.
// Multiple masters can be created
fn default_master_conf(conf: &ClusterConf, num: u16) -> Vec<ClusterConf> {
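Taken together, the new `MiniCluster` helpers make a master-failover test easy to express: bring the cluster up, kill the active master, and wait for a different node to win the election. Below is a sketch of such a test body; construction of the three-master cluster, the `MiniCluster` import path, and the tokio test harness are left to the surrounding test suite.

```rust
use curvine_common::FsResult;
// use ...::MiniCluster;  // exact module path depends on the test crate layout

async fn check_master_failover(cluster: &MiniCluster) -> FsResult<()> {
    // 1. Leader elected and all workers registered.
    cluster.wait_cluster_ready().await?;

    // 2. Find the current leader and kill it (Raft role forced to Exit).
    let old_idx = cluster
        .get_active_master_index()
        .expect("an active master must exist after wait_cluster_ready");
    cluster.kill_active_master()?;
    assert!(!cluster.is_master_running(old_idx));

    // 3. Wait up to 60s for a different master to take over.
    let new_idx = cluster.wait_for_new_master(60, Some(old_idx)).await?;
    assert_ne!(new_idx, old_idx);

    // 4. A real test would now issue filesystem operations through a fresh
    //    client to prove the new leader is serving requests.
    let _fs = cluster.new_fs();
    Ok(())
}
```

With the reduced RPC timeouts from curvine-cluster.toml, the whole sequence should complete well inside the 60 s election timeout used here.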
5 changes: 5 additions & 0 deletions curvine-server/src/worker/worker_server.rs
@@ -213,6 +213,11 @@ impl Worker {
self.rpc_server.service()
}

// For tests: returns a state controller that can be used to simulate a server shutdown
pub fn new_state_ctl(&self) -> orpc::sync::StateCtl {
self.rpc_server.new_state_ctl()
}

#[cfg(target_os = "linux")]
async fn start_s3_gateway(mut conf: ClusterConf, worker_rt: Arc<Runtime>) {
let listen_addr = conf.s3_gateway.listen.clone();