diff --git a/crates/cli/tests/admin_decommission.rs b/crates/cli/tests/admin_decommission.rs new file mode 100644 index 0000000..dca79b4 --- /dev/null +++ b/crates/cli/tests/admin_decommission.rs @@ -0,0 +1,225 @@ +#![cfg(not(windows))] + +use std::io::{ErrorKind, Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::PathBuf; +use std::process::Command; +use std::sync::mpsc::{self, Receiver}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +struct CapturedAdminRequest { + method: String, + target: String, +} + +fn rc_binary() -> PathBuf { + if let Ok(path) = std::env::var("CARGO_BIN_EXE_rc") { + return PathBuf::from(path); + } + + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .expect("cli crate has parent directory") + .parent() + .expect("workspace root exists") + .to_path_buf(); + + let debug_binary = workspace_root.join("target/debug/rc"); + if debug_binary.exists() { + return debug_binary; + } + + workspace_root.join("target/release/rc") +} + +fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest { + stream + .set_read_timeout(Some(Duration::from_secs(5))) + .expect("set read timeout"); + + let mut request = Vec::new(); + let mut buffer = [0_u8; 8192]; + loop { + let bytes_read = stream.read(&mut buffer).expect("read admin request"); + if bytes_read == 0 { + break; + } + request.extend_from_slice(&buffer[..bytes_read]); + if request.windows(4).any(|window| window == b"\r\n\r\n") { + break; + } + } + + let request = String::from_utf8(request).expect("admin request should be UTF-8"); + let request_line = request.lines().next().expect("request line"); + let mut parts = request_line.split_whitespace(); + let method = parts.next().expect("request method").to_string(); + let target = parts.next().expect("request target").to_string(); + + CapturedAdminRequest { method, target } +} + +fn start_admin_test_server( + response_body: &'static str, +) -> (String, Receiver, JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind admin test server"); + listener + .set_nonblocking(true) + .expect("set admin test server nonblocking"); + let endpoint = format!("http://{}", listener.local_addr().expect("server address")); + let (sender, receiver) = mpsc::channel(); + + let handle = thread::spawn(move || { + let deadline = Instant::now() + Duration::from_secs(10); + let (mut stream, _) = loop { + match listener.accept() { + Ok(accepted) => break accepted, + Err(e) if e.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => { + thread::sleep(Duration::from_millis(10)); + } + Err(e) => panic!("accept admin request: {e}"), + } + }; + stream + .set_nonblocking(false) + .expect("set admin request stream blocking"); + let request = read_admin_request(&mut stream); + sender.send(request).expect("send captured request"); + + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + response_body.len(), + response_body + ); + stream + .write_all(response.as_bytes()) + .expect("write admin response"); + }); + + (endpoint, receiver, handle) +} + +fn rc_host_alias(endpoint: &str) -> String { + let (_, endpoint_authority) = endpoint.split_once("://").expect("endpoint has scheme"); + format!("http://ACCESS_KEY:SECRET_KEY@{endpoint_authority}") +} + +#[test] +fn decommission_start_dispatches_by_id_pool_json() { + let config_dir = tempfile::tempdir().expect("create config dir"); + let (endpoint, receiver, handle) = start_admin_test_server(""); + + let output = Command::new(rc_binary()) + .args([ + "--json", + "admin", + "decommission", + "start", + "myalias", + "1,2", + "--by-id", + ]) + .env("RC_CONFIG_DIR", config_dir.path()) + .env("RC_HOST_myalias", rc_host_alias(&endpoint)) + .output() + .expect("run rc command"); + + assert!( + output.status.success(), + "stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8"); + let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output"); + assert_eq!(payload["success"], true); + assert_eq!(payload["message"], "Decommission started successfully"); + assert_eq!(payload["pool"], "1,2"); + + let request = receiver + .recv_timeout(Duration::from_secs(5)) + .expect("captured admin request"); + assert_eq!(request.method, "POST"); + assert_eq!( + request.target, + "/rustfs/admin/v3/pools/decommission?pool=1%2C2&by-id=true" + ); + + handle.join().expect("admin test server finished"); +} + +#[test] +fn decommission_status_without_pool_dispatches_to_pool_list_json() { + let config_dir = tempfile::tempdir().expect("create config dir"); + let (endpoint, receiver, handle) = start_admin_test_server( + r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#, + ); + + let output = Command::new(rc_binary()) + .args(["--json", "admin", "decommission", "status", "myalias"]) + .env("RC_CONFIG_DIR", config_dir.path()) + .env("RC_HOST_myalias", rc_host_alias(&endpoint)) + .output() + .expect("run rc command"); + + assert!( + output.status.success(), + "stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8"); + let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output"); + let pools = payload["pools"].as_array().expect("pools array"); + assert_eq!(pools.len(), 1); + assert_eq!(pools[0]["id"], 0); + assert_eq!(pools[0]["cmdline"], "/data/pool0/disk{1...4}"); + + let request = receiver + .recv_timeout(Duration::from_secs(5)) + .expect("captured admin request"); + assert_eq!(request.method, "GET"); + assert_eq!(request.target, "/rustfs/admin/v3/pools/list"); + + handle.join().expect("admin test server finished"); +} + +#[test] +fn decom_cancel_dispatches_by_id_pool_json() { + let config_dir = tempfile::tempdir().expect("create config dir"); + let (endpoint, receiver, handle) = start_admin_test_server(""); + + let output = Command::new(rc_binary()) + .args([ + "--json", "admin", "decom", "cancel", "myalias", "1", "--by-id", + ]) + .env("RC_CONFIG_DIR", config_dir.path()) + .env("RC_HOST_myalias", rc_host_alias(&endpoint)) + .output() + .expect("run rc command"); + + assert!( + output.status.success(), + "stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8"); + let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output"); + assert_eq!(payload["success"], true); + assert_eq!(payload["message"], "Decommission canceled successfully"); + assert_eq!(payload["pool"], "1"); + + let request = receiver + .recv_timeout(Duration::from_secs(5)) + .expect("captured admin request"); + assert_eq!(request.method, "POST"); + assert_eq!( + request.target, + "/rustfs/admin/v3/pools/cancel?pool=1&by-id=true" + ); + + handle.join().expect("admin test server finished"); +} diff --git a/crates/cli/tests/admin_rebalance.rs b/crates/cli/tests/admin_rebalance.rs index 1657231..2f05865 100644 --- a/crates/cli/tests/admin_rebalance.rs +++ b/crates/cli/tests/admin_rebalance.rs @@ -82,6 +82,9 @@ fn start_admin_test_server( Err(e) => panic!("accept admin request: {e}"), } }; + stream + .set_nonblocking(false) + .expect("set admin request stream blocking"); let request = read_admin_request(&mut stream); sender.send(request).expect("send captured request");