diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a2b46d1..3320b1b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,7 +11,7 @@ concurrency: cancel-in-progress: true jobs: - lint: + lint-and-test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -22,6 +22,9 @@ jobs: - name: Run clippy run: cargo clippy + - name: Run tests + run: cargo test + build: runs-on: ubuntu-latest steps: diff --git a/Cargo.lock b/Cargo.lock index 765152a..cb2d779 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,7 +30,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.25", ] [[package]] @@ -70,6 +70,7 @@ dependencies = [ "nix", "rand", "rand_distr", + "serde", "syscalls", "usdt", ] @@ -277,7 +278,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.25", ] [[package]] @@ -540,7 +541,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.25", ] [[package]] @@ -574,18 +575,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.56" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.26" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" dependencies = [ "proc-macro2", ] @@ -685,22 +686,22 @@ checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" [[package]] name = "serde" -version = "1.0.160" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" +checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.160" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" +checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.25", ] [[package]] @@ -722,7 +723,7 @@ checksum = "bcec881020c684085e55a25f7fd888954d56609ef363479dc5a1305eb0d40cab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.25", ] [[package]] @@ -775,9 +776,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.15" +version = "2.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" dependencies = [ "proc-macro2", "quote", @@ -833,7 +834,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.15", + "syn 2.0.25", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c0e41c1..ad08e57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ log = "0.4.6" env_logger = "0.9.0" config = "0.13.3" syscalls = "0.6.13" +serde = { version = "1.0.171", features = ["derive"] } diff --git a/src/lib.rs b/src/lib.rs index 209cfdb..551a739 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,28 +1,182 @@ +use serde::Deserialize; + pub mod worker; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Deserialize)] +#[serde(tag = "distribution")] pub enum Distribution { - Zipfian, - Uniform, + #[serde(alias = "zipf")] + Zipfian { n_ports: u64, exponent: f64 }, + #[serde(alias = "uniform")] + Uniform { lower: u64, upper: u64 }, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Deserialize)] +#[serde(rename_all = "lowercase", tag = "type")] pub enum Workload { - Endpoints, - Processes, - Syscalls, + Endpoints { + #[serde(flatten)] + distribution: Distribution, + }, + Processes { + arrival_rate: f64, + departure_rate: f64, + random_process: bool, + }, + Syscalls { + arrival_rate: f64, + }, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Deserialize)] pub struct WorkloadConfig { pub restart_interval: u64, - pub endpoints_dist: Distribution, pub workload: Workload, - pub zipf_exponent: f64, - pub n_ports: u64, - pub uniform_lower: u64, - pub uniform_upper: u64, - pub arrival_rate: f64, - pub departure_rate: f64, - pub random_process: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + use config::{Config, File, FileFormat}; + + #[test] + fn test_processes() { + let input = r#" + restart_interval = 10 + + [workload] + type = "processes" + arrival_rate = 10.0 + departure_rate = 200.0 + random_process = true + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + } = config; + assert_eq!(restart_interval, 10); + if let Workload::Processes { + arrival_rate, + departure_rate, + random_process, + } = workload + { + assert_eq!(arrival_rate, 10.0); + assert_eq!(departure_rate, 200.0); + assert!(random_process); + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_endpoints_zipf() { + let input = r#" + restart_interval = 10 + + [workload] + type = "endpoints" + distribution = "zipf" + n_ports = 200 + exponent = 1.4 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + } = config; + assert_eq!(restart_interval, 10); + + if let Workload::Endpoints { distribution, .. } = workload { + if let Distribution::Zipfian { n_ports, exponent } = distribution { + assert_eq!(n_ports, 200); + assert_eq!(exponent, 1.4); + } else { + panic!("wrong distribution type found"); + } + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_endpoints_uniform() { + let input = r#" + restart_interval = 10 + + [workload] + type = "endpoints" + distribution = "uniform" + upper = 100 + lower = 1 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + } = config; + assert_eq!(restart_interval, 10); + + if let Workload::Endpoints { distribution } = workload { + if let Distribution::Uniform { lower, upper } = distribution { + assert_eq!(lower, 1); + assert_eq!(upper, 100); + } else { + panic!("wrong distribution type found"); + } + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_syscalls() { + let input = r#" + restart_interval = 10 + + [workload] + type = "syscalls" + arrival_rate = 10.0 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + } = config; + assert_eq!(restart_interval, 10); + if let Workload::Syscalls { arrival_rate } = workload { + assert_eq!(arrival_rate, 10.0); + } else { + panic!("wrong workload type found"); + } + } } diff --git a/src/main.rs b/src/main.rs index 817ad7a..2adb93b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,24 +2,18 @@ extern crate log; extern crate core_affinity; -use std::collections::HashMap; - -use berserker::WorkloadConfig; use config::Config; use fork::{fork, Fork}; use itertools::iproduct; use nix::sys::wait::waitpid; use nix::unistd::Pid; -use rand::prelude::*; -use rand_distr::Uniform; -use rand_distr::Zipf; -use berserker::{worker::WorkerConfig, Distribution, Workload}; +use berserker::{worker::new_worker, WorkloadConfig}; fn main() { // Retrieve the IDs of all active CPU cores. let core_ids = core_affinity::get_core_ids().unwrap(); - let settings = Config::builder() + let config = Config::builder() // Add in `./Settings.toml` .add_source(config::File::with_name("/etc/berserker/workload.toml").required(false)) .add_source(config::File::with_name("workload.toml").required(false)) @@ -28,7 +22,7 @@ fn main() { .add_source(config::Environment::with_prefix("WORKLOAD")) .build() .unwrap() - .try_deserialize::>() + .try_deserialize::() .unwrap(); let mut lower = 1024; @@ -36,51 +30,10 @@ fn main() { env_logger::init(); - let workload = match settings["workload"].as_str() { - "endpoints" => Workload::Endpoints, - "processes" => Workload::Processes, - "syscalls" => Workload::Syscalls, - _ => Workload::Endpoints, - }; - - let endpoints_dist = match settings["endpoints_distribution"].as_str() { - "zipf" => Distribution::Zipfian, - "uniform" => Distribution::Uniform, - _ => Distribution::Zipfian, - }; - - let config = WorkloadConfig { - restart_interval: settings["restart_interval"].parse::().unwrap(), - endpoints_dist, - workload, - zipf_exponent: settings["zipf_exponent"].parse::().unwrap(), - n_ports: settings["n_ports"].parse::().unwrap(), - arrival_rate: settings["arrival_rate"].parse::().unwrap(), - departure_rate: settings["departure_rate"].parse::().unwrap(), - uniform_lower: settings["uniform_lower"].parse::().unwrap(), - uniform_upper: settings["uniform_upper"].parse::().unwrap(), - random_process: settings["random_process"].parse::().unwrap(), - }; - // Create processes for each active CPU core. let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..9) .map(|(cpu, process)| { - match config.endpoints_dist { - Distribution::Zipfian => { - let n_ports: f64 = thread_rng() - .sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap()); - - lower = upper; - upper += n_ports as usize; - } - Distribution::Uniform => { - let n_ports = thread_rng() - .sample(Uniform::new(config.uniform_lower, config.uniform_upper)); - - lower = upper; - upper += n_ports as usize; - } - } + let worker = new_worker(config, cpu, process, &mut lower, &mut upper); match fork() { Ok(Fork::Parent(child)) => { @@ -89,21 +42,15 @@ fn main() { } Ok(Fork::Child) => { if core_affinity::set_for_current(cpu) { - let worker_config = WorkerConfig::new(config, cpu, process, lower, upper); - loop { - let _res = match config.workload { - Workload::Endpoints => worker_config.listen_payload(), - Workload::Processes => worker_config.process_payload(), - Workload::Syscalls => worker_config.syscalls_payload(), - }; + worker.run_payload().unwrap(); } } None } - Err(_) => { - warn!("Failed"); + Err(e) => { + warn!("Failed: {e:?}"); None } } diff --git a/src/worker.rs b/src/worker.rs deleted file mode 100644 index 7b8a723..0000000 --- a/src/worker.rs +++ /dev/null @@ -1,168 +0,0 @@ -use std::{io::Result, net::TcpListener, process::Command, thread, time}; - -use core_affinity::CoreId; -use fork::{fork, Fork}; -use log::{info, warn}; -use nix::{sys::wait::waitpid, unistd::Pid}; -use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use rand_distr::Exp; -use syscalls::{syscall, Sysno}; - -use crate::WorkloadConfig; - -#[derive(Debug, Copy, Clone)] -pub struct WorkerConfig { - workload: WorkloadConfig, - cpu: CoreId, - process: usize, - lower: usize, - upper: usize, -} - -impl WorkerConfig { - pub fn new( - workload: WorkloadConfig, - cpu: CoreId, - process: usize, - lower: usize, - upper: usize, - ) -> Self { - WorkerConfig { - workload, - cpu, - process, - lower, - upper, - } - } - - fn spawn_process(&self, lifetime: u64) -> Result<()> { - if self.workload.random_process { - let uniq_arg: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - let _res = Command::new("stub").arg(uniq_arg).output().unwrap(); - Ok(()) - } else { - match fork() { - Ok(Fork::Parent(child)) => { - info!("Parent: child {}", child); - waitpid(Pid::from_raw(child), None).unwrap(); - Ok(()) - } - Ok(Fork::Child) => { - info!( - "{}-{}: Child start, {}", - self.cpu.id, self.process, lifetime - ); - thread::sleep(time::Duration::from_millis(lifetime)); - info!("{}-{}: Child stop", self.cpu.id, self.process); - Ok(()) - } - Err(_) => { - warn!("Failed"); - Ok(()) - } - } - } - } - - // Spawn processes with a specified rate - pub fn process_payload(&self) -> std::io::Result<()> { - info!( - "Process {} from {}: {}-{}", - self.process, self.cpu.id, self.lower, self.upper - ); - - loop { - let lifetime: f64 = - thread_rng().sample(Exp::new(self.workload.departure_rate).unwrap()); - - let worker = *self; - thread::spawn(move || worker.spawn_process((lifetime * 1000.0).round() as u64)); - - let interval: f64 = thread_rng().sample(Exp::new(self.workload.arrival_rate).unwrap()); - info!( - "{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}", - self.cpu.id, - self.process, - interval, - (interval * 1000.0).round() as u64, - lifetime, - (lifetime * 1000.0).round() as u64 - ); - thread::sleep(time::Duration::from_millis( - (interval * 1000.0).round() as u64 - )); - info!("{}-{}: Continue", self.cpu.id, self.process); - } - } - - pub fn listen_payload(&self) -> std::io::Result<()> { - info!( - "Process {} from {}: {}-{}", - self.process, self.cpu.id, self.lower, self.upper - ); - - let restart_interval = self.workload.restart_interval; - - let listeners: Vec<_> = (self.lower..self.upper) - .map(|port| thread::spawn(move || listen(port, restart_interval))) - .collect(); - - for listener in listeners { - let _res = listener.join().unwrap(); - } - - Ok(()) - } - - pub fn syscalls_payload(&self) -> Result<()> { - info!( - "Process {} from {}: {}-{}", - self.process, self.cpu.id, self.lower, self.upper - ); - - loop { - let worker = *self; - thread::spawn(move || { - worker.do_syscall().unwrap(); - }); - - let interval: f64 = thread_rng().sample(Exp::new(self.workload.arrival_rate).unwrap()); - info!( - "{}-{}: Interval {}, rounded {}", - self.cpu.id, - self.process, - interval, - (interval * 1000.0).round() as u64 - ); - thread::sleep(time::Duration::from_millis( - (interval * 1000.0).round() as u64 - )); - info!("{}-{}: Continue", self.cpu.id, self.process); - } - } - - fn do_syscall(&self) -> std::io::Result<()> { - match unsafe { syscall!(Sysno::getpid) } { - Ok(_) => Ok(()), - Err(err) => { - warn!("Syscall failed: {}", err); - Ok(()) - } - } - } -} - -fn listen(port: usize, sleep: u64) -> std::io::Result<()> { - let addr = format!("127.0.0.1:{port}"); - let listener = TcpListener::bind(addr)?; - - let _res = listener.incoming(); - - thread::sleep(time::Duration::from_secs(sleep)); - Ok(()) -} diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs new file mode 100644 index 0000000..4953e08 --- /dev/null +++ b/src/worker/endpoints.rs @@ -0,0 +1,81 @@ +use std::{fmt::Display, net::TcpListener, thread, time}; + +use core_affinity::CoreId; +use log::info; + +use crate::WorkloadConfig; + +use super::{BaseConfig, Worker, WorkerError}; + +struct EndpointWorkload { + restart_interval: u64, + lower: usize, + upper: usize, +} + +pub struct EndpointWorker { + config: BaseConfig, + workload: EndpointWorkload, +} + +impl EndpointWorker { + pub fn new( + workload: WorkloadConfig, + cpu: CoreId, + process: usize, + lower: usize, + upper: usize, + ) -> Self { + let WorkloadConfig { + restart_interval, + workload: _, + } = workload; + + EndpointWorker { + config: BaseConfig { cpu, process }, + workload: EndpointWorkload { + restart_interval, + lower, + upper, + }, + } + } +} + +impl Worker for EndpointWorker { + fn run_payload(&self) -> Result<(), WorkerError> { + info!("{self}"); + + let EndpointWorkload { + restart_interval, + lower, + upper, + } = self.workload; + + let listeners: Vec<_> = (lower..upper) + .map(|port| thread::spawn(move || listen(port, restart_interval))) + .collect(); + + for listener in listeners { + let _res = listener.join().unwrap(); + } + + Ok(()) + } +} + +impl Display for EndpointWorker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.config) + } +} + +fn listen(port: usize, sleep: u64) -> std::io::Result<()> { + let addr = format!("127.0.0.1:{port}"); + let listener = TcpListener::bind(addr)?; + + let _res = listener.incoming(); + + thread::sleep(time::Duration::from_secs(sleep)); + Ok(()) +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs new file mode 100644 index 0000000..9804626 --- /dev/null +++ b/src/worker/mod.rs @@ -0,0 +1,75 @@ +use std::fmt::Display; + +use core_affinity::CoreId; +use rand::{thread_rng, Rng}; +use rand_distr::{Uniform, Zipf}; + +use crate::{Distribution, Workload, WorkloadConfig}; + +use self::{endpoints::EndpointWorker, processes::ProcessesWorker, syscalls::SyscallsWorker}; + +pub mod endpoints; +pub mod processes; +pub mod syscalls; + +#[derive(Debug)] +pub enum WorkerError {} + +impl Display for WorkerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "worker error found") + } +} + +pub trait Worker { + fn run_payload(&self) -> Result<(), WorkerError>; +} + +#[derive(Debug, Copy, Clone)] +struct BaseConfig { + cpu: CoreId, + process: usize, +} + +impl Display for BaseConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Process {} from {}", self.process, self.cpu.id) + } +} + +pub fn new_worker( + workload: WorkloadConfig, + cpu: CoreId, + process: usize, + lower_bound: &mut usize, + upper_bound: &mut usize, +) -> Box { + match workload.workload { + Workload::Processes { .. } => Box::new(ProcessesWorker::new(workload, cpu, process)), + Workload::Endpoints { distribution } => { + match distribution { + Distribution::Zipfian { n_ports, exponent } => { + let n_ports: f64 = thread_rng().sample(Zipf::new(n_ports, exponent).unwrap()); + + *lower_bound = *upper_bound; + *upper_bound += n_ports as usize; + } + Distribution::Uniform { lower, upper } => { + // TODO: Double check this branch + let n_ports = thread_rng().sample(Uniform::new(lower, upper)); + + *lower_bound = *upper_bound; + *upper_bound += n_ports as usize; + } + } + Box::new(EndpointWorker::new( + workload, + cpu, + process, + *lower_bound, + *upper_bound, + )) + } + Workload::Syscalls { .. } => Box::new(SyscallsWorker::new(workload, cpu, process)), + } +} diff --git a/src/worker/processes.rs b/src/worker/processes.rs new file mode 100644 index 0000000..ee4f0a1 --- /dev/null +++ b/src/worker/processes.rs @@ -0,0 +1,104 @@ +use std::{fmt::Display, process::Command, thread, time}; + +use core_affinity::CoreId; +use fork::{fork, Fork}; +use log::{info, warn}; +use nix::{sys::wait::waitpid, unistd::Pid}; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use rand_distr::Exp; + +use crate::{Workload, WorkloadConfig}; + +use super::{BaseConfig, Worker, WorkerError}; + +#[derive(Debug, Clone, Copy)] +pub struct ProcessesWorker { + config: BaseConfig, + workload: WorkloadConfig, +} + +impl ProcessesWorker { + pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + ProcessesWorker { + config: BaseConfig { cpu, process }, + workload, + } + } + + fn spawn_process(&self, lifetime: u64) -> Result<(), WorkerError> { + let Workload::Processes { + arrival_rate: _, + departure_rate: _, + random_process, + } = self.workload.workload else { unreachable!() }; + let BaseConfig { cpu, process } = self.config; + + if random_process { + let uniq_arg: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + let _res = Command::new("stub").arg(uniq_arg).output().unwrap(); + Ok(()) + } else { + match fork() { + Ok(Fork::Parent(child)) => { + info!("Parent: child {}", child); + waitpid(Pid::from_raw(child), None).unwrap(); + Ok(()) + } + Ok(Fork::Child) => { + info!("{}-{}: Child start, {}", cpu.id, process, lifetime); + thread::sleep(time::Duration::from_millis(lifetime)); + info!("{}-{}: Child stop", cpu.id, process); + Ok(()) + } + Err(_) => { + warn!("Failed"); + Ok(()) + } + } + } + } +} + +impl Worker for ProcessesWorker { + fn run_payload(&self) -> Result<(), super::WorkerError> { + info!("{self}"); + + let Workload::Processes { + arrival_rate, + departure_rate, + random_process: _, + } = self.workload.workload else {unreachable!()}; + + loop { + let lifetime: f64 = thread_rng().sample(Exp::new(departure_rate).unwrap()); + + let worker = *self; + thread::spawn(move || worker.spawn_process((lifetime * 1000.0).round() as u64)); + + let interval: f64 = thread_rng().sample(Exp::new(arrival_rate).unwrap()); + info!( + "{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}", + self.config.cpu.id, + self.config.process, + interval, + (interval * 1000.0).round() as u64, + lifetime, + (lifetime * 1000.0).round() as u64 + ); + thread::sleep(time::Duration::from_millis( + (interval * 1000.0).round() as u64 + )); + info!("{}-{}: Continue", self.config.cpu.id, self.config.process); + } + } +} + +impl Display for ProcessesWorker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.config) + } +} diff --git a/src/worker/syscalls.rs b/src/worker/syscalls.rs new file mode 100644 index 0000000..f97564a --- /dev/null +++ b/src/worker/syscalls.rs @@ -0,0 +1,70 @@ +use std::{fmt::Display, thread, time}; + +use core_affinity::CoreId; +use log::{info, warn}; +use rand::{thread_rng, Rng}; +use rand_distr::Exp; +use syscalls::{syscall, Sysno}; + +use crate::{Workload, WorkloadConfig}; + +use super::{BaseConfig, Worker}; + +#[derive(Debug, Copy, Clone)] +pub struct SyscallsWorker { + config: BaseConfig, + workload: WorkloadConfig, +} + +impl SyscallsWorker { + pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + SyscallsWorker { + config: BaseConfig { cpu, process }, + workload, + } + } + + fn do_syscall(&self) -> std::io::Result<()> { + match unsafe { syscall!(Sysno::getpid) } { + Ok(_) => Ok(()), + Err(err) => { + warn!("Syscall failed: {}", err); + Ok(()) + } + } + } +} + +impl Worker for SyscallsWorker { + fn run_payload(&self) -> Result<(), super::WorkerError> { + info!("{self}"); + + let Workload::Syscalls { arrival_rate } = self.workload.workload else {unreachable!()}; + + loop { + let worker = *self; + thread::spawn(move || { + worker.do_syscall().unwrap(); + }); + + let interval: f64 = thread_rng().sample(Exp::new(arrival_rate).unwrap()); + info!( + "{}-{}: Interval {}, rounded {}", + self.config.cpu.id, + self.config.process, + interval, + (interval * 1000.0).round() as u64 + ); + thread::sleep(time::Duration::from_millis( + (interval * 1000.0).round() as u64 + )); + info!("{}-{}: Continue", self.config.cpu.id, self.config.process); + } + } +} + +impl Display for SyscallsWorker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.config) + } +} diff --git a/workload.toml b/workload.toml index 67d2cbe..f161df0 100644 --- a/workload.toml +++ b/workload.toml @@ -1,18 +1,7 @@ restart_interval = 10 -workload = "processes" -# endpoints workload -endpoints_distribution = "zipf" - -# endpoints zipf parameters -n_ports = 200 -zipf_exponent = 1.4 - -# endppoints uniform parameters -uniform_upper = 100 -uniform_lower = 1 - -# processes workload +[workload] +type = "processes" arrival_rate = 10.0 departure_rate = 200.0 random_process = true diff --git a/workloads/endpoints-uniform.toml b/workloads/endpoints-uniform.toml new file mode 100644 index 0000000..197ba6e --- /dev/null +++ b/workloads/endpoints-uniform.toml @@ -0,0 +1,7 @@ +restart_interval = 10 + +[workload] +type = "endpoints" +distribution = "uniform" +upper = 100 +lower = 1 diff --git a/workloads/endpoints-zipf.toml b/workloads/endpoints-zipf.toml new file mode 100644 index 0000000..8680572 --- /dev/null +++ b/workloads/endpoints-zipf.toml @@ -0,0 +1,7 @@ +restart_interval = 10 + +[workload] +type = "endpoints" +distribution = "zipf" +n_ports = 200 +exponent = 1.4 diff --git a/workloads/processes.toml b/workloads/processes.toml new file mode 100644 index 0000000..f161df0 --- /dev/null +++ b/workloads/processes.toml @@ -0,0 +1,7 @@ +restart_interval = 10 + +[workload] +type = "processes" +arrival_rate = 10.0 +departure_rate = 200.0 +random_process = true diff --git a/workloads/syscalls.toml b/workloads/syscalls.toml new file mode 100644 index 0000000..0cbdcfc --- /dev/null +++ b/workloads/syscalls.toml @@ -0,0 +1,5 @@ +restart_interval = 10 + +[workload] +type = "syscalls" +arrival_rate = 10.0