Skip to content

Commit

Permalink
rust: multiplex commands over iowatcherng executable
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillermo Cifuentes committed Dec 12, 2022
1 parent 7ab0acb commit 8d7cc97
Showing 1 changed file with 70 additions and 31 deletions.
101 changes: 70 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use clap::{command, Parser};
use metrics::{increment_counter};
use itertools::{izip, Itertools};
use metrics::increment_counter;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::io::prelude::*;
use std::ops::Deref;
use std::process::{Command, Stdio};
use std::{
error::Error,
fs::File,
Expand All @@ -26,45 +30,40 @@ static STDIN_PATH: &str = "-";
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Input file path, - for stdin
#[arg(short, long, default_value_t = STDIN_PATH.to_string(),)]
input: String,
/// command
#[command(subcommand)]
command: Subcommand,
}

impl Args {
pub fn is_stdin(&self) -> bool {
self.input.eq(&STDIN_PATH)
}
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
Fork {
/// list of devices to blktrace
#[arg(short, long)]
device: Vec<String>,
},
Ingest {
/// Input file path, - for stdin
#[arg(short, long, default_value_t = STDIN_PATH.to_string(),)]
input: String,
},
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
const FRAGMENT_SIZE: usize = size_of::<blktrace::blk_io_trace>();
let args = Args::parse();

PrometheusBuilder::new()
.install()
.expect("failed to install Prometheus recoder/exporter");
const FRAGMENT_SIZE: usize = size_of::<blktrace::blk_io_trace>();

let mut input: File = if args.is_stdin() {
println!("STDIN!");
unsafe { File::from_raw_fd(io::stdin().as_raw_fd()) }
} else {
File::open(args.input)?
};
async fn process_input(input: &mut dyn Read) -> Result<(), Box<dyn Error>> {
let mut buffer: [u8; FRAGMENT_SIZE] = [0; FRAGMENT_SIZE];
while let Ok(()) = input.read_exact(&mut buffer) {
let trace: blktrace::blk_io_trace = unsafe { std::mem::transmute(buffer) };
let mut str_vec = Vec::<u8>::new();
io::copy(&mut input.by_ref().take(trace.pdu_len.into()), &mut str_vec)?;
//let str: String = String::from(str::from_utf8(&str_vec)?);
let mut str_vec = Vec::<u8>::with_capacity(trace.pdu_len.into());
io::copy(&mut input.take(trace.pdu_len.into()), &mut str_vec)?;
//let str: String = String::from(str::f+rom_utf8(&str_vec)?);
//println!("str: {}", str);
increment_counter!("iowatcherng-exporter.packets_read");
if (trace.magic & 0xffffff00) != blktrace::BLK_IO_TRACE_MAGIC {
println!("Bad pkt magic");
} else {
if (trace.action & !blktrace::blktrace_notify___BLK_TN_CGROUP)
== blktrace::blktrace_notify___BLK_TN_MESSAGE
if (trace.action & !blktrace::blktrace_notify___BLK_TN_CGROUP) == blktrace::blktrace_notify___BLK_TN_MESSAGE
{
println!("NOTIFY");
match trace.action & !blktrace::blktrace_notify___BLK_TN_CGROUP {
Expand All @@ -75,8 +74,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
} else if (trace.action & blktrace::blk_tc_act(blktrace::blktrace_cat_BLK_TC_PC)) == 0 {
println!("PC");
let _w =
(trace.action & blktrace::blk_tc_act(blktrace::blktrace_cat_BLK_TC_WRITE)) != 0;
let _w = (trace.action & blktrace::blk_tc_act(blktrace::blktrace_cat_BLK_TC_WRITE)) != 0;
let act = (trace.action & 0xffff) & !blktrace::blktrace_act___BLK_TA_CGROUP;
match act {
blktrace::blktrace_act___BLK_TA_QUEUE => println!("TQ"),
Expand All @@ -90,8 +88,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
} else {
println!("CGROUP");
let _w =
(trace.action & blktrace::blk_tc_act(blktrace::blktrace_cat_BLK_TC_WRITE)) != 0;
let _w = (trace.action & blktrace::blk_tc_act(blktrace::blktrace_cat_BLK_TC_WRITE)) != 0;
let act = (trace.action & 0xffff) & !blktrace::blktrace_act___BLK_TA_CGROUP;
match act {
blktrace::blktrace_act___BLK_TA_QUEUE => println!("TQ"),
Expand All @@ -109,3 +106,45 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();

PrometheusBuilder::new()
.install()
.expect("failed to install Prometheus recoder/exporter");

match &args.command {
Subcommand::Fork { device } => {
let mut arg_stack = Vec::new();
for dev in device {
arg_stack.push("-d".to_string());
arg_stack.push(dev.to_string());
}
arg_stack.push("-o".to_string());
arg_stack.push("-".to_string());
match Command::new("blktrace")
.args(arg_stack)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
{
Err(why) => panic!("couldn't spawn blktrace: {}", why),
Ok(mut child) => {
let mut stdout = child.stdout.expect("stdout is opened at this time");
process_input(&mut stdout).await?;
},
};
},
Subcommand::Ingest { input } => {
let mut input = if input.eq(&STDIN_PATH) {
unsafe { File::from_raw_fd(io::stdin().as_raw_fd()) }
} else {
File::open(input)?
};
process_input(&mut input).await?;
},
}
Ok(())
}

0 comments on commit 8d7cc97

Please sign in to comment.