diff --git a/Cargo.lock b/Cargo.lock
index 8c0f1145f..6a9073eb4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1750,6 +1750,7 @@ dependencies = [
  "statistical",
  "tempfile",
  "tokio",
+ "uuid",
 ]
 
 [[package]]
@@ -1764,6 +1765,7 @@ dependencies = [
  "schemars",
  "serde",
  "serde_json",
+ "uuid",
 ]
 
 [[package]]
diff --git a/crutest/src/cli.rs b/crutest/src/cli.rs
index c4209f9ec..e3a34679c 100644
--- a/crutest/src/cli.rs
+++ b/crutest/src/cli.rs
@@ -25,8 +25,9 @@ pub struct CliAction {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Debug, Parser, PartialEq)]
 pub enum DscCommand {
-    /// Connect to the default DSC server (http://127.0.0.1:9998)
-    Connect,
+    /// Connect to a dsc server
+    /// Takes the IP:Port of the dsc server, e.g. 127.0.0.1:9998
+    Connect { server: SocketAddr },
     /// Disable random stopping of downstairs
     DisableRandomStop,
     /// Disable auto restart on the given downstairs client ID
@@ -380,7 +381,7 @@ async fn handle_dsc(
 ) {
     if let Some(dsc_client) = dsc_client {
         match dsc_cmd {
-            DscCommand::Connect => {
+            DscCommand::Connect { .. } => {
                 println!("Already connected");
             }
             DscCommand::DisableRandomStop => {
@@ -444,13 +445,18 @@ async fn handle_dsc(
                 println!("Got res: {:?}", res);
             }
         }
-    } else if dsc_cmd == DscCommand::Connect {
-        let url = "http://127.0.0.1:9998".to_string();
-        println!("Connect to {:?}", url);
-        let rs = Client::new(&url);
-        *dsc_client = Some(rs);
     } else {
-        println!("dsc: Need to be connected first");
+        match dsc_cmd {
+            DscCommand::Connect { server } => {
+                let url = format!("http://{}", server);
+                println!("Connecting to {:?}", url);
+                let rs = Client::new(&url);
+                *dsc_client = Some(rs);
+            }
+            _ => {
+                println!("dsc: Need to be connected first");
+            }
+        }
     }
 }
 
 /*
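With this change, connecting from the crutest cli is explicit: `dsc connect` takes the IP:Port of a dsc server rather than silently assuming one. A quick sketch of the new interaction (the prompt shown is illustrative; 127.0.0.1:9998 is simply dsc's usual default address):

    crutest> dsc connect 127.0.0.1:9998
    Connecting to "http://127.0.0.1:9998"
    crutest> dsc connect 127.0.0.1:9998
    Already connected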
diff --git a/crutest/src/main.rs b/crutest/src/main.rs
index ef41385ed..dfbe20078 100644
--- a/crutest/src/main.rs
+++ b/crutest/src/main.rs
@@ -18,6 +18,7 @@ use futures::stream::FuturesOrdered;
 use futures::StreamExt;
 use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressStyle};
+use oximeter::types::ProducerRegistry;
 use rand::prelude::*;
 use rand_chacha::rand_core::SeedableRng;
 use serde::{Deserialize, Serialize};
@@ -117,11 +118,8 @@ enum Workload {
     /// Test the downstairs replay path.
     /// Stop a downstairs, then run some IO, then start that downstairs back
     /// up.  Verify all IO to all downstairs finishes.
-    Replay {
-        /// URL location of the running dsc server
-        #[clap(long, default_value = "http://127.0.0.1:9998", action)]
-        dsc: String,
-    },
+    /// This test requires a dsc server to control the downstairs.
+    Replay,
     /// Test the downstairs replacement path.
     /// Run IO to the upstairs, then replace a downstairs, then run
     /// more IO and verify it all works as expected.
@@ -139,7 +137,7 @@ enum Workload {
     ReplaceBeforeActive {
         /// URL location of the running dsc server
         #[clap(long, default_value = "http://127.0.0.1:9998", action)]
-        dsc: String,
+        dsc_str: String,
 
         /// The address:port of a running downstairs for replacement
         #[clap(long, action)]
@@ -149,7 +147,7 @@ enum Workload {
     ReplaceReconcile {
         /// URL location of the running dsc server
         #[clap(long, default_value = "http://127.0.0.1:9998", action)]
-        dsc: String,
+        dsc_str: String,
 
         /// The address:port of a running downstairs for replacement
         #[clap(long, action)]
@@ -184,6 +182,12 @@ pub struct Opt {
     #[clap(short, long, global = true, action)]
     count: Option<usize>,
 
+    /// IP:Port for a dsc server.
+    /// Some tests require a dsc endpoint to control the downstairs.
+    /// A dsc endpoint can also be used to construct the initial Volume.
+    #[clap(long, global = true, action)]
+    dsc: Option<SocketAddr>,
+
     /// How long to wait before the auto flush check fires
     #[clap(long, global = true, action)]
     flush_timeout: Option<f32>,
@@ -255,13 +259,7 @@ pub struct Opt {
     stable: bool,
 
     /// The IP:Port where each downstairs is listening.
-    #[clap(
-        short,
-        long,
-        global = true,
-        default_value = "127.0.0.1:9000",
-        action
-    )]
+    #[clap(short, long, global = true, action)]
     target: Vec<SocketAddr>,
 
     /// A UUID to use for the upstairs.
@@ -744,9 +742,190 @@ async fn handle_signals(
     }
 }
 
+// Construct a volume for use by the tests.
+// Our choice of how to construct the volume depends on what options we
+// have been given.
+//
+// If we have been provided a vcr file, this will get first priority and all
+// other options will be ignored.
+//
+// Second choice is if we are provided the address for a dsc server.  We can
+// use the dsc server to determine part of what we need to create a Volume.
+// The rest of what we need we can gather from the CrucibleOpts, which are
+// built from options provided on the command line, or their defaults.
+//
+// For the final choice we have to construct a Volume by asking our downstairs
+// for information that we need up front, which we then combine with
+// CrucibleOpts.  This will work as long as one of the downstairs is up
+// already.  If we have a test that requires no downstairs to be running on
+// startup, then we need to provide a VCR file, or use the dsc server.
+async fn make_a_volume(
+    opt: &Opt,
+    block_io_logger: Logger,
+    test_log: &Logger,
+    pr: Option<ProducerRegistry>,
+) -> Result<Arc<Volume>> {
+    let up_uuid = opt.uuid.unwrap_or_else(Uuid::new_v4);
+    let mut crucible_opts = CrucibleOpts {
+        id: up_uuid,
+        target: opt.target.clone(),
+        lossy: opt.lossy,
+        flush_timeout: opt.flush_timeout,
+        key: opt.key.clone(),
+        cert_pem: opt.cert_pem.clone(),
+        key_pem: opt.key_pem.clone(),
+        root_cert_pem: opt.root_cert_pem.clone(),
+        control: opt.control,
+        read_only: opt.read_only,
+    };
+
+    if let Some(vcr_file) = &opt.vcr_file {
+        let vcr: VolumeConstructionRequest = match read_json(vcr_file) {
+            Ok(vcr) => vcr,
+            Err(e) => {
+                bail!("Error {:?} reading VCR from {:?}", e, vcr_file)
+            }
+        };
+        info!(test_log, "Using VCR: {:?}", vcr);
+
+        if opt.gen != 0 {
+            warn!(test_log, "gen option is ignored when VCR is provided");
+        }
+        if !opt.target.is_empty() {
+            warn!(test_log, "targets are ignored when VCR is provided");
+        }
+        let volume = Volume::construct(vcr, pr, block_io_logger).await.unwrap();
+        Ok(Arc::new(volume))
+    } else if opt.dsc.is_some() {
+        // We were given a dsc endpoint, use that to create a VCR that
+        // represents our Volume.
+        if !opt.target.is_empty() {
+            warn!(test_log, "targets are ignored when dsc option is provided");
+        }
+        let dsc = opt.dsc.unwrap();
+        let dsc_url = format!("http://{}", dsc);
+        let dsc_client = Client::new(&dsc_url);
+        let ri = match dsc_client.dsc_get_region_info().await {
+            Ok(res) => res.into_inner(),
+            Err(e) => {
+                bail!("Failed to get region info from {:?}: {}", dsc_url, e);
+            }
+        };
+        info!(test_log, "use region info: {:?}", ri);
+        let extent_info = RegionExtentInfo {
+            block_size: ri.block_size,
+            blocks_per_extent: ri.blocks_per_extent,
+            extent_count: ri.extent_count,
+        };
+
+        let res = dsc_client.dsc_get_region_count().await.unwrap();
+        let regions = res.into_inner();
+
+        let sv_count = regions / 3;
+        let region_remainder = regions % 3;
+        info!(
+            test_log,
+            "dsc has {} regions. This means {} sub_volumes",
+            regions,
+            sv_count
+        );
+        if region_remainder != 0 {
+            warn!(
+                test_log,
+                "{} regions from dsc will not be part of any sub_volume",
+                region_remainder,
+            );
+        }
+
+        // We start by creating the overall volume.
+        let mut volume = Volume::new(extent_info.block_size, block_io_logger);
+
+        // Now, loop over regions we found from dsc and make a
+        // sub_volume at every three.
+        let mut cid = 0;
+        for sv in 0..sv_count {
+            let mut targets = Vec::new();
+            for _ in 0..3 {
+                let port = dsc_client.dsc_get_port(cid).await.unwrap();
+                let tar: SocketAddr =
+                    format!("{}:{}", dsc.ip(), port.into_inner())
+                        .parse()
+                        .unwrap();
+                targets.push(tar);
+                cid += 1;
+            }
+            info!(test_log, "SV {:?} has targets: {:?}", sv, targets);
+            crucible_opts.target = targets;
+
+            volume
+                .add_subvolume_create_guest(
+                    crucible_opts.clone(),
+                    extent_info.clone(),
+                    opt.gen,
+                    pr.clone(),
+                )
+                .await
+                .unwrap();
+        }
+
+        Ok(Arc::new(volume))
+    } else {
+        // We were not provided a VCR, so, we have to make one by using
+        // the repair port on a downstairs to get region information that
+        // we require.  Once we have that information, we can build a VCR
+        // from it.
+
+        // For each sub-volume, we need to know:
+        // block_size, blocks_per_extent, and extent_size.  We can get any
+        // of the target downstairs to give us this info, if they are
+        // running.  We don't care which one responds.  Any mismatch will
+        // be detected later in the process and handled by the upstairs.
+        let mut extent_info_result = None;
+        for target in &crucible_opts.target {
+            let port = target.port() + crucible_common::REPAIR_PORT_OFFSET;
+            println!("look at: http://{}:{} ", target.ip(), port);
+            let repair_url = format!("http://{}:{}", target.ip(), port);
+            let repair_client = repair_client::new(&repair_url);
+            match repair_client.get_region_info().await {
+                Ok(ri) => {
+                    println!("RI is: {:?}", ri);
+                    extent_info_result = Some(RegionExtentInfo {
+                        block_size: ri.block_size(),
+                        blocks_per_extent: ri.extent_size().value,
+                        extent_count: ri.extent_count(),
+                    });
+                    break;
+                }
+                Err(e) => {
+                    println!(
+                        "Failed to get info from {:?} {:?}",
+                        repair_url, e
+                    );
+                }
+            }
+        }
+        let extent_info = match extent_info_result {
+            Some(ei) => ei,
+            None => {
+                bail!("Can't determine extent info to build a Volume");
+            }
+        };
+
+        let mut volume = Volume::new(extent_info.block_size, block_io_logger);
+        volume
+            .add_subvolume_create_guest(
+                crucible_opts.clone(),
+                extent_info,
+                opt.gen,
+                pr,
+            )
+            .await
+            .unwrap();
+
+        Ok(Arc::new(volume))
+    }
+}
+
 /**
- * This is an example Crucible client.
- * Here we make use of the interfaces that Crucible exposes.
+ * A test program that makes use of the interfaces that Crucible exposes.
  */
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -768,26 +947,9 @@ async fn main() -> Result<()> {
         bail!("Verify requires verify_in file");
     }
 
-    let up_uuid = opt.uuid.unwrap_or_else(Uuid::new_v4);
-
-    let crucible_opts = CrucibleOpts {
-        id: up_uuid,
-        target: opt.target.clone(),
-        lossy: opt.lossy,
-        flush_timeout: opt.flush_timeout,
-        key: opt.key,
-        cert_pem: opt.cert_pem,
-        key_pem: opt.key_pem,
-        root_cert_pem: opt.root_cert_pem,
-        control: opt.control,
-        read_only: opt.read_only,
-    };
-
-    /*
-     * If just want the cli, then start that after our runtime.  The cli
-     * does not need upstairs started, as that should happen in the
-     * cli-server code.
-     */
+    // If we just want the cli, then just run that function.  The cli itself
+    // does not need to start the upstairs, as that should happen in the
+    // cli-server code in another process.
     if let Workload::Cli { attach } = opt.workload {
         cli::start_cli_client(attach).await?;
         return Ok(());
@@ -844,80 +1006,9 @@ async fn main() -> Result<()> {
         pr = None;
     }
 
-    // We need to build a Volume for all the tests to use.
-    // If we have received a VCR as input, we can use that.  Otherwise we
-    // have to construct one by asking our downstairs for information that
-    // we need up front.  This will work as long as one of the downstairs
-    // is up already.  If we have a test that requires no downstairs to be
-    // running on startup, then we need to provide a VCR up front.
-    let block_io = {
-        if let Some(vcr_file) = opt.vcr_file {
-            let vcr: VolumeConstructionRequest = match read_json(&vcr_file) {
-                Ok(vcr) => vcr,
-                Err(e) => {
-                    bail!("Error {:?} reading VCR from {:?}", e, vcr_file)
-                }
-            };
-            let volume =
-                Volume::construct(vcr, pr, block_io_logger).await.unwrap();
-            Arc::new(volume)
-        } else {
-            // We were not provided a VCR, so, we have to make one by using
-            // the repair port on a downstairs to get region information that
-            // we require.  Once we have that information, we can build a VCR
-            // from it.
-
-            // For each sub-volume, we need to know:
-            // block_size, blocks_per_extent, and extent_size.  We can get any
-            // of the target downstairs to give us this info, if they are
-            // running.  We don't care which one responds.  Any mismatch will
-            // be detected later in the process and handled by the upstairs.
-            let mut extent_info_result = None;
-            for target in &crucible_opts.target {
-                let port = target.port() + crucible_common::REPAIR_PORT_OFFSET;
-                println!("look at: http://{}:{} ", target.ip(), port);
-                let repair_url = format!("http://{}:{}", target.ip(), port);
-                let repair_client = repair_client::new(&repair_url);
-                match repair_client.get_region_info().await {
-                    Ok(ri) => {
-                        println!("RI is: {:?}", ri);
-                        extent_info_result = Some(RegionExtentInfo {
-                            block_size: ri.block_size(),
-                            blocks_per_extent: ri.extent_size().value,
-                            extent_count: ri.extent_count(),
-                        });
-                        break;
-                    }
-                    Err(e) => {
-                        println!(
-                            "Failed to get info from {:?} {:?}",
-                            repair_url, e
-                        );
-                    }
-                }
-            }
-            let extent_info = match extent_info_result {
-                Some(ei) => ei,
-                None => {
-                    bail!("Can't determine extent info to build a Volume");
-                }
-            };
-
-            let mut volume =
-                Volume::new(extent_info.block_size, block_io_logger);
-            volume
-                .add_subvolume_create_guest(
-                    crucible_opts,
-                    extent_info,
-                    opt.gen,
-                    pr,
-                )
-                .await
-                .unwrap();
-
-            Arc::new(volume)
-        }
-    };
+    // Build a Volume for all the tests to use.
+    let block_io =
+        make_a_volume(&opt, block_io_logger.clone(), &test_log, pr).await?;
 
     if let Workload::CliServer { listen, port } = opt.workload {
         cli::start_cli_server(
@@ -1212,7 +1303,7 @@ async fn main() -> Result<()> {
             }
             return Ok(());
         }
-        Workload::Replay { dsc } => {
+        Workload::Replay => {
             // Either we have a count, or we run until we get a signal.
             let mut wtq = {
                 if opt.continuous {
@@ -1222,8 +1313,15 @@ async fn main() -> Result<()> {
                     WhenToQuit::Signal
                 } else {
                     let count = opt.count.unwrap_or(5);
                     WhenToQuit::Count { count }
                 }
             };
-
-            let dsc_client = Client::new(&dsc);
+            let dsc_client = match opt.dsc {
+                Some(dsc_addr) => {
+                    let dsc_url = format!("http://{}", dsc_addr);
+                    Client::new(&dsc_url)
+                }
+                None => {
+                    bail!("Replay workload requires a dsc endpoint");
+                }
+            };
             replay_workload(&block_io, &mut wtq, &mut region_info, dsc_client)
                 .await?;
         }
@@ -1255,8 +1353,11 @@ async fn main() -> Result<()> {
             )
             .await?;
         }
-        Workload::ReplaceBeforeActive { dsc, replacement } => {
-            let dsc_client = Client::new(&dsc);
+        Workload::ReplaceBeforeActive {
+            dsc_str,
+            replacement,
+        } => {
+            let dsc_client = Client::new(&dsc_str);
             // Either we have a count, or we run until we get a signal.
             let wtq = {
                 if opt.continuous {
@@ -1283,8 +1384,11 @@ async fn main() -> Result<()> {
             )
             .await?;
         }
-        Workload::ReplaceReconcile { dsc, replacement } => {
-            let dsc_client = Client::new(&dsc);
+        Workload::ReplaceReconcile {
+            dsc_str,
+            replacement,
+        } => {
+            let dsc_client = Client::new(&dsc_str);
             // Either we have a count, or we run until we get a signal.
             let wtq = {
                 if opt.continuous {
@@ -2145,8 +2249,8 @@ async fn replay_workload(
             "CLIENT: Up:{} ds:{} act:{}",
             wc.up_count, wc.ds_count, wc.active_count
         );
-        if wc.up_count + wc.ds_count == 0 && wc.active_count == 3 {
-            println!("Replay: All jobs finished, all DS active.");
+        if wc.up_count + wc.ds_count == 0 {
+            println!("Replay: All jobs finished");
             break;
         }
         tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
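The make_a_volume() added above picks between three sources for the Volume, in priority order: a VCR file, a dsc endpoint, then the -t target list. As a rough sketch, the three invocation styles look like this (the file path and addresses are illustrative, and `fill` stands in for any workload):

    # First priority: an explicit VolumeConstructionRequest from a file
    crutest fill --vcr-file /tmp/vcr.json
    # Second: ask a dsc server for the region layout
    crutest fill --dsc 127.0.0.1:9998
    # Last: query the repair port of a running downstairs from the target list
    crutest fill -t 127.0.0.1:8810 -t 127.0.0.1:8820 -t 127.0.0.1:8830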
diff --git a/dsc-client/Cargo.toml b/dsc-client/Cargo.toml
index 13d18f265..4fd6d0223 100644
--- a/dsc-client/Cargo.toml
+++ b/dsc-client/Cargo.toml
@@ -6,10 +6,11 @@ edition = "2021"
 
 [dependencies]
 anyhow.workspace = true
+crucible-workspace-hack.workspace = true
 percent-encoding.workspace = true
 progenitor.workspace = true
 reqwest.workspace = true
 schemars.workspace = true
 serde_json.workspace = true
 serde.workspace = true
-crucible-workspace-hack.workspace = true
+uuid.workspace = true
diff --git a/dsc/Cargo.toml b/dsc/Cargo.toml
index 3a758a023..341f33378 100644
--- a/dsc/Cargo.toml
+++ b/dsc/Cargo.toml
@@ -10,6 +10,7 @@ byte-unit.workspace = true
 clap.workspace = true
 crucible-client-types.workspace = true
 crucible-common.workspace = true
+crucible-workspace-hack.workspace = true
 csv.workspace = true
 dsc-client.workspace = true
 dropshot.workspace = true
@@ -19,7 +20,7 @@ schemars.workspace = true
 serde.workspace = true
 statistical.workspace = true
 tokio.workspace = true
-crucible-workspace-hack.workspace = true
+uuid.workspace = true
 
 [dev-dependencies]
 expectorate.workspace = true
diff --git a/dsc/src/client.rs b/dsc/src/client.rs
index 28ff8bcd5..be3c14e20 100644
--- a/dsc/src/client.rs
+++ b/dsc/src/client.rs
@@ -8,6 +8,8 @@ use anyhow::Result;
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Debug, Parser, PartialEq)]
 pub enum ClientCommand {
+    /// Returns true if all downstairs are running
+    AllRunning,
     /// Disable random stopping of downstairs
     DisableRandomStop,
     /// Disable auto restart on the given downstairs client ID
@@ -46,6 +48,8 @@ pub enum ClientCommand {
         #[clap(long, short, action)]
         cid: u32,
     },
+    /// Get count of regions.
+    RegionCount,
     /// Get region info.
     RegionInfo,
     /// Shutdown all downstairs, then shutdown dsc itself.
@@ -71,6 +75,11 @@ pub enum ClientCommand {
     StopAll,
     /// Stop a random downstairs
     StopRand,
+    /// Get the UUID of the given client ID
+    Uuid {
+        #[clap(long, short, action)]
+        cid: u32,
+    },
 }
 
 // Connect to the DSC and run a command.
@@ -78,6 +87,10 @@
 pub async fn client_main(server: String, cmd: ClientCommand) -> Result<()> {
     let dsc = Client::new(&server);
     match cmd {
+        ClientCommand::AllRunning => {
+            let res = dsc.dsc_all_running().await.unwrap();
+            println!("{:?}", res);
+        }
         ClientCommand::DisableRandomStop => {
             let _ = dsc.dsc_disable_random_stop().await.unwrap();
         }
@@ -110,6 +123,10 @@ pub async fn client_main(server: String, cmd: ClientCommand) -> Result<()> {
             let res = dsc.dsc_get_port(cid).await.unwrap();
             println!("{:?}", res);
         }
+        ClientCommand::RegionCount => {
+            let res = dsc.dsc_get_region_count().await.unwrap();
+            println!("{:?}", res);
+        }
         ClientCommand::RegionInfo => {
             let res = dsc.dsc_get_region_info().await.unwrap();
             println!("{:?}", res);
@@ -136,6 +153,10 @@ pub async fn client_main(server: String, cmd: ClientCommand) -> Result<()> {
         ClientCommand::StopRand => {
            let _ = dsc.dsc_stop_rand().await.unwrap();
         }
+        ClientCommand::Uuid { cid } => {
+            let res = dsc.dsc_get_uuid(cid).await.unwrap();
+            println!("{:?}", res);
+        }
     }
     Ok(())
 }
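The new client subcommands wrap the added dsc endpoints one for one. A sketch of driving them from a shell, in the same `dsc cmd` style the test scripts use (the client Debug-prints each response):

    dsc cmd all-running   # true only once every downstairs reports Running
    dsc cmd region-count  # how many regions this dsc is managing
    dsc cmd uuid -c 0     # UUID of the downstairs with client ID 0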
diff --git a/dsc/src/control.rs b/dsc/src/control.rs
index ac60acd8e..371b2a70e 100644
--- a/dsc/src/control.rs
+++ b/dsc/src/control.rs
@@ -12,17 +12,19 @@ use dropshot::RequestContext;
 use dropshot::{HttpResponseOk, HttpResponseUpdatedNoContent};
 use schemars::JsonSchema;
 use serde::Deserialize;
-//use serde::Serialize;
 use std::sync::Arc;
 
 use super::*;
 
 pub(crate) fn build_api() -> ApiDescription<Arc<DscInfo>> {
     let mut api = ApiDescription::new();
+    api.register(dsc_all_running).unwrap();
     api.register(dsc_get_ds_state).unwrap();
     api.register(dsc_get_pid).unwrap();
     api.register(dsc_get_port).unwrap();
+    api.register(dsc_get_region_count).unwrap();
     api.register(dsc_get_region_info).unwrap();
+    api.register(dsc_get_uuid).unwrap();
     api.register(dsc_stop).unwrap();
     api.register(dsc_stop_all).unwrap();
     api.register(dsc_stop_rand).unwrap();
@@ -187,6 +189,22 @@ async fn dsc_get_port(
     Ok(HttpResponseOk(ds_port))
 }
 
+/**
+ * Return true if all downstairs are running
+ */
+#[endpoint {
+    method = GET,
+    path = "/allrunning",
+}]
+async fn dsc_all_running(
+    rqctx: RequestContext<Arc<DscInfo>>,
+) -> Result<HttpResponseOk<bool>, HttpError> {
+    let api_context = rqctx.context();
+
+    let all_state = api_context.dsci.all_running().await;
+    Ok(HttpResponseOk(all_state))
+}
+
 /**
  * Fetch the current state for the requested client_id
  */
@@ -218,6 +236,37 @@ async fn dsc_get_ds_state(
     Ok(HttpResponseOk(ds_state))
 }
 
+/**
+ * Fetch the UUID for the requested client_id
+ */
+#[endpoint {
+    method = GET,
+    path = "/uuid/cid/{cid}",
+}]
+async fn dsc_get_uuid(
+    rqctx: RequestContext<Arc<DscInfo>>,
+    path: Path<Cid>,
+) -> Result<HttpResponseOk<Uuid>, HttpError> {
+    let path = path.into_inner();
+    let cid = path.cid;
+    let api_context = rqctx.context();
+
+    if cid_bad(&api_context.dsci, cid).await {
+        return Err(HttpError::for_bad_request(
+            Some(String::from("BadInput")),
+            format!("Invalid client id: {}", cid),
+        ));
+    }
+    let uuid = api_context.dsci.get_ds_uuid(cid).await.map_err(|e| {
+        HttpError::for_bad_request(
+            None,
+            format!("failed to get UUID for downstairs {}: {:#}", cid, e),
+        )
+    })?;
+
+    Ok(HttpResponseOk(uuid))
+}
+
 /**
  * Stop the downstairs at the given client_id
  */
@@ -406,6 +455,22 @@ async fn dsc_enable_restart_all(
     Ok(HttpResponseUpdatedNoContent())
 }
 
+/**
+ * Get the count of regions.
+ */
+#[endpoint {
+    method = GET,
+    path = "/regioncount",
+}]
+async fn dsc_get_region_count(
+    rqctx: RequestContext<Arc<DscInfo>>,
+) -> Result<HttpResponseOk<usize>, HttpError> {
+    let api_context = rqctx.context();
+
+    let region_count = api_context.dsci.get_region_count().await;
+    Ok(HttpResponseOk(region_count))
+}
+
 /**
  * Fetch the region info for our downstairs
  */
@@ -422,7 +487,7 @@ async fn dsc_get_region_info(
         api_context.dsci.get_region_info().await.map_err(|e| {
             HttpError::for_bad_request(
                 None,
-                format!("failed get to region info {:#}", e),
+                format!("failed to get region info {:#}", e),
             )
         })?;
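All three additions are plain GETs on the dsc control server, so they are easy to poke by hand. A sketch with curl, assuming the control interface is at dsc's usual 127.0.0.1:9998:

    curl http://127.0.0.1:9998/allrunning
    curl http://127.0.0.1:9998/regioncount
    curl http://127.0.0.1:9998/uuid/cid/0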
diff --git a/dsc/src/main.rs b/dsc/src/main.rs
index 847343b06..fabc0392a 100644
--- a/dsc/src/main.rs
+++ b/dsc/src/main.rs
@@ -20,6 +20,7 @@ use tokio::process::{Child, Command};
 use tokio::runtime::Builder;
 use tokio::sync::{mpsc, watch, Mutex};
 use tokio::time::sleep_until;
+use uuid::Uuid;
 
 pub mod client;
 pub mod control;
@@ -145,7 +146,7 @@ enum Action {
         )]
         region_dir: Vec<PathBuf>,
     },
-    /// Start a downstairs region set
+    /// Start the requested downstairs regions
     /// This requires the region is already created, unless you include
     /// the --create option.
     Start {
@@ -228,6 +229,7 @@ struct DownstairsInfo {
     ds_bin: String,
     region_dir: String,
     port: u32,
+    uuid: Uuid,
     _create_output: String,
     output_file: PathBuf,
     client_id: usize,
@@ -235,10 +237,12 @@ struct DownstairsInfo {
 }
 
 impl DownstairsInfo {
+    #[allow(clippy::too_many_arguments)]
     fn new(
         ds_bin: String,
         region_dir: String,
         port: u32,
+        uuid: Uuid,
         _create_output: String,
         output_file: PathBuf,
         client_id: usize,
@@ -248,6 +252,7 @@ impl DownstairsInfo {
             ds_bin,
             region_dir,
             port,
+            uuid,
             _create_output,
             output_file,
             client_id,
@@ -295,9 +300,9 @@ impl DownstairsInfo {
     }
 }
 
-// Describing the downstairs that together make a region.
+// Describing all the downstairs regions we know about.
 #[derive(Debug)]
-struct RegionSet {
+struct Regions {
     ds: Vec<Arc<DownstairsInfo>>,
     ds_bin: String,
     region_dir: Vec<String>,
@@ -313,8 +318,8 @@ pub struct DscInfo {
     /// The directory location where output files are
     output_dir: PathBuf,
-    /// The region set that make our downstairs
-    rs: Mutex<RegionSet>,
+    /// The regions this dsc knows about
+    rs: Mutex<Regions>,
     /// Work for the dsc to do, what downstairs to start/stop/etc
     work: Mutex<DscWork>,
     /// If the downstairs are started read only
@@ -427,7 +432,7 @@ impl DscInfo {
         };
         assert_eq!(rv.len(), region_count);
 
-        let rs = RegionSet {
+        let rs = Regions {
             ds: Vec::new(),
             ds_bin: downstairs_bin,
             region_dir: rv,
@@ -452,9 +457,9 @@ impl DscInfo {
     }
 
     /*
-     * Create a default region set.  Attach it to our dsc info struct
+     * Create the requested set of regions.  Attach it to our dsc info struct
      */
-    async fn create_region_set(
+    async fn create_regions(
         &self,
         extent_size: u64,
         extent_count: u32,
@@ -475,13 +480,12 @@ impl DscInfo {
             .await
             .unwrap();
         }
-        println!("Region set with {region_count} regions was created");
+        println!("Created {region_count} regions");
         Ok(())
     }
 
     /**
-     * Create a region as part of the region set at the given port with
-     * the provided extent size and extent_count.
+     * Create a region with the provided extent size and extent_count.
      */
     async fn create_ds_region(
         &self,
@@ -524,8 +528,7 @@ impl DscInfo {
                 extent_count,
             });
         }
-
-        // use port to do this, or make a client ID that is port base, etc
+        // The port is determined by ds_id and the port step value.
         let port = rs.port_base + (ds_id as u32 * rs.port_step);
         let rd = &rs.region_dir[ds_id];
         let new_region_dir = port_to_region(rd.clone(), port)?;
@@ -533,6 +536,7 @@ impl DscInfo {
         let extent_size = format!("{}", extent_size);
         let extent_count = format!("{}", extent_count);
         let block_size = format!("{}", block_size);
         let uuid = format!("12345678-0000-0000-0000-{:012}", port);
+        let ds_uuid = Uuid::parse_str(&uuid).unwrap();
         let start = std::time::Instant::now();
         let mut cmd_args = vec![
             "create",
@@ -589,6 +593,7 @@ impl DscInfo {
             rs.ds_bin.clone(),
             new_region_dir,
             port,
+            ds_uuid,
             String::from_utf8(output.stdout).unwrap(),
             output_path,
             ds_id,
@@ -615,8 +620,8 @@ impl DscInfo {
     }
 
     /*
-     * Generate a region set using the starting port and region
-     * directories.  Return error if any of them don't already exist.
+     * Generate regions using the starting port and region directories.
+     * Return error if any of them don't already exist.
      * TODO: This is assuming a fair amount of stuff.
     * Make fewer assumptions...
      */
@@ -624,8 +629,8 @@ impl DscInfo {
         let mut rs = self.rs.lock().await;
         let mut port = rs.port_base;
 
-        // If we are generating our region set, then we don't know any
-        // information yet about the region.
+        // Since we are generating our regions, we must create the required
+        // directories and files.
         let mut region_info: Option<RegionExtentInfo> = None;
         for ds_id in 0..region_count {
             let rd = rs.region_dir[ds_id].clone();
@@ -683,6 +688,7 @@ impl DscInfo {
                 rs.ds_bin.clone(),
                 new_region_dir,
                 port,
+                def.uuid(),
                 "/dev/null".to_string(),
                 output_path,
                 ds_id,
@@ -699,6 +705,16 @@ impl DscInfo {
         Ok(())
     }
 
+    async fn all_running(&self) -> bool {
+        let rs = self.rs.lock().await;
+        for state in rs.ds_state.iter() {
+            if *state != DownstairsState::Running {
+                return false;
+            }
+        }
+        true
+    }
+
     async fn get_ds_state(&self, client_id: usize) -> Result<DownstairsState> {
         let rs = self.rs.lock().await;
         if rs.ds_state.len() <= client_id {
@@ -723,6 +739,14 @@ impl DscInfo {
         Ok(rs.ds[client_id].port)
     }
 
+    async fn get_ds_uuid(&self, client_id: usize) -> Result<Uuid> {
+        let rs = self.rs.lock().await;
+        if rs.ds.len() <= client_id {
+            bail!("Invalid client ID: {}", client_id);
+        }
+        Ok(rs.ds[client_id].uuid)
+    }
+
     async fn get_region_info(&self) -> Result<RegionExtentInfo> {
         let rs = self.rs.lock().await;
         if let Some(ri) = &rs.region_info {
@@ -731,6 +755,11 @@ impl DscInfo {
             bail!("No region info found");
         }
     }
+
+    async fn get_region_count(&self) -> usize {
+        let rs = self.rs.lock().await;
+        rs.ds.len()
+    }
 }
 
 // This holds the work queue for the main task.  Work is added
@@ -917,6 +946,32 @@ async fn start_dsc(
     drop(tx);
     drop(rs);
 
+    // Wait here for all downstairs to start
+    let mut running = 0;
+    loop {
+        let res = rx.recv().await;
+        if let Some(mi) = res {
+            println!(
+                "[{}][{}] initial start wait reports {:?}",
+                mi.port, mi.client_id, mi.state
+            );
+            let mut rs = dsci.rs.lock().await;
+            if mi.state == DownstairsState::Running {
+                running += 1;
+            }
+
+            rs.ds_state[mi.client_id] = mi.state;
+            rs.ds_pid[mi.client_id] = mi.pid;
+
+            if running == rs.ds_state.len() {
+                println!("All downstairs are running");
+                break;
+            }
+        } else {
+            println!("rx.recv got None");
+        }
+    }
+
     let mut rng = rand_chacha::ChaCha8Rng::from_entropy();
     let mut timeout_deadline = deadline_secs(5);
     let mut shutdown_sent = false;
@@ -1055,7 +1110,7 @@ struct MonitorInfo {
 }
 
 /// State of a downstairs.
-#[derive(Debug, Copy, Clone, Deserialize, Serialize, JsonSchema)]
+#[derive(Debug, Copy, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
 #[serde(rename_all = "snake_case")]
 pub enum DownstairsState {
     Stopped,
@@ -1509,7 +1564,7 @@ fn main() -> Result<()> {
                 false,
             )?;
 
-            runtime.block_on(dsci.create_region_set(
+            runtime.block_on(dsci.create_regions(
                 extent_size,
                 extent_count,
                 block_size,
@@ -1569,7 +1624,7 @@ fn main() -> Result<()> {
             )?;
 
             if create {
-                runtime.block_on(dsci.create_region_set(
+                runtime.block_on(dsci.create_regions(
                     extent_size,
                     extent_count,
                     block_size,
diff --git a/openapi/dsc-control.json b/openapi/dsc-control.json
index 5d217def6..367ac701f 100644
--- a/openapi/dsc-control.json
+++ b/openapi/dsc-control.json
@@ -5,6 +5,31 @@
     "version": "0.0.0"
   },
   "paths": {
+    "/allrunning": {
+      "get": {
+        "summary": "Return true if all downstairs are running",
+        "operationId": "dsc_all_running",
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "title": "Boolean",
+                  "type": "boolean"
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
+    },
     "/disablerestart/all": {
       "post": {
         "summary": "Disable automatic restart on all downstairs",
@@ -268,6 +293,33 @@
         }
       }
     },
+    "/regioncount": {
+      "get": {
+        "summary": "Get the count of regions.",
+        "operationId": "dsc_get_region_count",
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "title": "uint",
+                  "type": "integer",
+                  "format": "uint",
+                  "minimum": 0
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
+    },
     "/regioninfo": {
       "get": {
         "summary": "Fetch the region info for our downstairs",
@@ -453,6 +505,44 @@
           }
         }
       }
+    },
+    "/uuid/cid/{cid}": {
+      "get": {
+        "summary": "Fetch the UUID for the requested client_id",
+        "operationId": "dsc_get_uuid",
+        "parameters": [
+          {
+            "in": "path",
+            "name": "cid",
+            "required": true,
+            "schema": {
+              "type": "integer",
+              "format": "uint",
+              "minimum": 0
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "title": "Uuid",
+                  "type": "string",
+                  "format": "uuid"
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
     }
   },
   "components": {
diff --git a/tools/test_fail_live_repair.sh b/tools/test_fail_live_repair.sh
index c90f8b4da..f8d6ac266 100755
--- a/tools/test_fail_live_repair.sh
+++ b/tools/test_fail_live_repair.sh
@@ -95,8 +95,7 @@ fi
 echo "starting $(date)" | tee ${loop_log}
 echo "Tail $test_log for test output"
 
-# Make enough extents that we can be sure to catch in while it
-# is repairing.
+# Make enough extents that we can be sure to catch it while it is repairing.
 if ! ${dsc} create --cleanup \
   --ds-bin "$cds" \
   --extent-count 400 \
diff --git a/tools/test_replay.sh b/tools/test_replay.sh
index 2948a3eef..e6bceb929 100755
--- a/tools/test_replay.sh
+++ b/tools/test_replay.sh
@@ -70,15 +70,11 @@ if ! ps -p $dsc_pid > /dev/null; then
     exit 1
 fi
 
-args=()
-args+=( -t "127.0.0.1:8810" )
-args+=( -t "127.0.0.1:8820" )
-args+=( -t "127.0.0.1:8830" )
-
 gen=1
 # Initial seed for verify file
 echo "Running initial fill" | tee -a "$test_log"
-if ! "$crucible_test" fill "${args[@]}" -q -g "$gen"\
"$crucible_test" fill -q -g "$gen"\ + --dsc 127.0.0.1:9998 \ --verify-out "$verify_log" --retry-activate >> "$test_log" 2>&1 ; then echo Failed on initial verify seed, check "$test_log" ${dsc} cmd shutdown @@ -88,9 +84,10 @@ fi SECONDS=0 echo "Replay loop starts now $(date)" | tee -a "$test_log" -"$crucible_test" replay "${args[@]}" -c "$loops" \ +"$crucible_test" replay -c "$loops" \ --stable -g "$gen" --verify-out "$verify_log" \ --verify-in "$verify_log" \ + --dsc 127.0.0.1:9998 \ --retry-activate >> "$test_log" 2>&1 result=$? duration=$SECONDS @@ -103,7 +100,8 @@ else "$loops" $((duration / 60)) $((duration % 60)) | tee -a "$test_log" echo "Do final verify" | tee -a "$test_log" - if ! "$crucible_test" verify "${args[@]}" -q -g "$gen"\ + if ! "$crucible_test" verify -q -g "$gen"\ + --dsc 127.0.0.1:9998 \ --verify-out "$verify_log" \ --verify-in "$verify_log" >> "$test_log" 2>&1 ; then echo Failed on final verify, check "$test_log" diff --git a/tools/test_restart_repair.sh b/tools/test_restart_repair.sh index 1812a1e77..7cd42bca7 100755 --- a/tools/test_restart_repair.sh +++ b/tools/test_restart_repair.sh @@ -25,17 +25,23 @@ function ctrl_c() { # Bring all downstairs online. function bring_all_downstairs_online() { - # dsc start all downstairs - if ! "$dsc" cmd start-all; then - echo "dsc: Failed to stop all downstairs" + # dsc turn on automatic restart + if ! "$dsc" cmd enable-restart-all; then + echo "dsc: Failed to enable automatic restart" exit 1 fi - # dsc turn on automatic restart - if ! "$dsc" cmd enable-restart-all; then - echo "dsc: Failed to disable automatic restart" + # dsc start all downstairs + if ! "$dsc" cmd start-all; then + echo "dsc: Failed to start all downstairs" exit 1 fi + ready=$("$dsc" cmd all-running) + while [[ "$ready" != "true" ]]; do + echo "Waiting for all downstairs to come online" >> "$test_log" + sleep 5 + ready=$("$dsc" cmd all-running) + done } # Stop all downstairs.