@@ -18,15 +18,16 @@ use crate::master::Master;
1818use crate :: worker:: Worker ;
1919use curvine_client:: file:: CurvineFileSystem ;
2020use curvine_common:: conf:: ClusterConf ;
21- use curvine_common:: raft:: { NodeId , RaftPeer } ;
21+ use curvine_common:: error:: FsError ;
22+ use curvine_common:: raft:: { NodeId , RaftPeer , RoleState } ;
2223use curvine_common:: FsResult ;
2324use dashmap:: DashMap ;
2425use log:: info;
2526use orpc:: client:: RpcClient ;
2627use orpc:: common:: LocalTime ;
2728use orpc:: io:: net:: { InetAddr , NetUtils } ;
28- use orpc:: runtime:: { RpcRuntime , Runtime } ;
29- use orpc:: { err_box, CommonResult } ;
29+ use orpc:: runtime:: Runtime ;
30+ use orpc:: { err_box, err_msg , CommonResult } ;
3031use std:: sync:: Arc ;
3132use std:: thread;
3233use std:: time:: Duration ;
@@ -82,20 +83,10 @@ impl MiniCluster {
8283
8384 pub fn start_cluster ( & self ) {
8485 self . start_master ( ) ;
85-
86- // Wait for Master to be fully ready before starting Workers
87- self . client_rt . block_on ( self . wait_master_ready ( ) ) . unwrap ( ) ;
88-
89- // Add a small additional delay to ensure Master is fully stable
90- std:: thread:: sleep ( Duration :: from_millis ( 500 ) ) ;
91-
92- self . start_worker ( ) ;
93-
94- self . client_rt . block_on ( self . wait_ready ( ) ) . unwrap ( ) ;
9586 }
9687
9788 /// Wait for Master service to be fully ready (Raft Leader + RPC service started)
98- async fn wait_master_ready ( & self ) -> FsResult < ( ) > {
89+ pub async fn wait_master_ready ( & self ) -> FsResult < ( ) > {
9990 let wait_time = LocalTime :: mills ( ) + 60 * 1000 ; // 60 second timeout
10091 let mut retry_count = 0 ;
10192 let mut rpc_connection_attempted = false ;
@@ -123,7 +114,6 @@ impl MiniCluster {
123114 }
124115
125116 if raft_leader_ready {
126- // Additional check: try to create a simple RPC connection to verify the service is actually listening
127117 let conf = self . master_conf ( ) . client_rpc_conf ( ) ;
128118 let addr = self . master_conf ( ) . master_addr ( ) ;
129119
@@ -154,7 +144,7 @@ impl MiniCluster {
154144 }
155145
156146 /// Wait for the entire cluster to be ready (all Workers registered)
157- async fn wait_ready ( & self ) -> FsResult < ( ) > {
147+ pub async fn wait_ready ( & self ) -> FsResult < ( ) > {
158148 let fs = self . new_fs ( ) ;
159149 let wait_time = LocalTime :: mills ( ) + 20 * 1000 ; // 20 second timeout
160150 let mut retry_count = 0 ;
@@ -225,6 +215,146 @@ impl MiniCluster {
225215 . unwrap ( )
226216 }
227217
218+ /// Get the index of the currently active master
219+ pub fn get_active_master_index ( & self ) -> Option < usize > {
220+ self . master_entries
221+ . iter ( )
222+ . find ( |x| x. 0 . master_monitor . is_active ( ) )
223+ . map ( |x| * x. key ( ) )
224+ }
225+
226+ /// Wait for cluster to be fully ready (Master + Workers)
227+ pub async fn wait_cluster_ready ( & self ) -> FsResult < ( ) > {
228+ // Wait for master to be ready
229+ self . wait_master_ready ( ) . await ?;
230+
231+ // Wait for workers to register
232+ self . wait_ready ( ) . await ?;
233+
234+ Ok ( ( ) )
235+ }
236+
237+ /// Kill the currently active master
238+ pub fn kill_active_master ( & self ) -> FsResult < ( ) > {
239+ let active_idx = self
240+ . get_active_master_index ( )
241+ . ok_or_else ( || FsError :: common ( err_msg ! ( "No active master found" ) ) ) ?;
242+ self . kill_master ( active_idx)
243+ }
244+
245+ /// Kill a master at the specified index
246+ pub fn kill_master ( & self , index : usize ) -> FsResult < ( ) > {
247+ let entry = self
248+ . master_entries
249+ . get ( & index)
250+ . ok_or_else ( || FsError :: common ( err_msg ! ( "Master at index {} not found" , index) ) ) ?;
251+
252+ info ! ( "Killing master at index {}" , index) ;
253+
254+ // Set Raft role state to Exit to immediately mark as inactive
255+ entry
256+ . 0
257+ . master_monitor
258+ . journal_ctl
259+ . set_state ( RoleState :: Exit ) ;
260+
261+ info ! (
262+ "Master at index {} has been killed (Raft role set to Exit)" ,
263+ index
264+ ) ;
265+ Ok ( ( ) )
266+ }
267+
268+ /// Wait for a new master to be elected after killing the old one
269+ pub async fn wait_for_new_master (
270+ & self ,
271+ timeout_secs : u64 ,
272+ old_master_idx : Option < usize > ,
273+ ) -> FsResult < usize > {
274+ let wait_time = LocalTime :: mills ( ) + ( timeout_secs * 1000 ) ;
275+ let mut retry_count = 0 ;
276+
277+ info ! (
278+ "Waiting for new master election (timeout: {}s)..." ,
279+ timeout_secs
280+ ) ;
281+
282+ // Step 1: Wait for old master to become inactive (if specified)
283+ if let Some ( old_idx) = old_master_idx {
284+ info ! (
285+ "Step 1: Waiting for old master (index {}) to become inactive..." ,
286+ old_idx
287+ ) ;
288+ while LocalTime :: mills ( ) <= wait_time {
289+ retry_count += 1 ;
290+
291+ if let Some ( entry) = self . master_entries . get ( & old_idx) {
292+ if !entry. 0 . master_monitor . is_active ( ) {
293+ info ! ( "Old master (index {}) is now inactive" , old_idx) ;
294+ break ;
295+ }
296+ } else {
297+ // Master entry removed, consider it inactive
298+ info ! ( "Old master (index {}) entry removed" , old_idx) ;
299+ break ;
300+ }
301+
302+ if retry_count % 10 == 0 {
303+ info ! (
304+ "Still waiting for old master to become inactive... (attempt {})" ,
305+ retry_count
306+ ) ;
307+ }
308+ tokio:: time:: sleep ( Duration :: from_millis ( 500 ) ) . await ;
309+ }
310+ }
311+
312+ // Step 2: Wait for new master to be elected
313+ info ! ( "Step 2: Waiting for new master to be elected..." ) ;
314+ retry_count = 0 ;
315+ while LocalTime :: mills ( ) <= wait_time {
316+ retry_count += 1 ;
317+
318+ // Check all masters to find a new active one
319+ for entry in self . master_entries . iter ( ) {
320+ let idx = * entry. key ( ) ;
321+ let is_active = entry. 0 . master_monitor . is_active ( ) ;
322+
323+ // If we're looking for a different master, verify it's not the old one
324+ if is_active && ( old_master_idx != Some ( idx) ) {
325+ info ! (
326+ "New master elected at index {} (attempt {})" ,
327+ idx, retry_count
328+ ) ;
329+ return Ok ( idx) ;
330+ }
331+ }
332+
333+ if retry_count % 10 == 0 {
334+ info ! (
335+ "Still waiting for new master election... (attempt {})" ,
336+ retry_count
337+ ) ;
338+ }
339+ tokio:: time:: sleep ( Duration :: from_millis ( 500 ) ) . await ;
340+ }
341+
342+ err_box ! (
343+ "New master election timeout after {}s (attempt {})" ,
344+ timeout_secs,
345+ retry_count
346+ )
347+ }
348+
349+ /// Check if a master at the specified index is running
350+ pub fn is_master_running ( & self , index : usize ) -> bool {
351+ if let Some ( entry) = self . master_entries . get ( & index) {
352+ entry. 0 . master_monitor . is_active ( )
353+ } else {
354+ false
355+ }
356+ }
357+
228358 // Create the default master node configuration.
229359 // Multiple masters can be created
230360 fn default_master_conf ( conf : & ClusterConf , num : u16 ) -> Vec < ClusterConf > {
0 commit comments