Skip to content

Commit

Permalink
handle errors from calls to get_next_packet and resulting shutdowns (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwample authored Jun 7, 2023
1 parent 3a7c637 commit 3068615
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions util/captool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::fs::{self, File};
use std::io::stdin;
use std::io::Write;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{self, Duration};
Expand Down Expand Up @@ -278,7 +278,10 @@ fn read_interfaces<W>(
) where
W: Write + std::marker::Send + 'static,
{
let pool = ThreadPool::new(interfaces.matches(',').count() + 2);
let n_interfaces = interfaces.matches(',').count();
let pool = ThreadPool::new(n_interfaces + 2);

let interfaces_complete = Arc::new(AtomicU32::new(0_u32));

for sig in TERM_SIGNALS {
register(*sig, Arc::clone(&term)).unwrap();
Expand All @@ -294,13 +297,16 @@ fn read_interfaces<W>(
let h = Arc::clone(&handler);
let w = Arc::clone(&arc_writer);
let t = Arc::clone(&term);
let ic = Arc::clone(&interfaces_complete);
pool.execute(move || {
let cap = Capture::from_device(dev)
.unwrap()
.immediate_mode(true) // enable immediate mode
.open()
.unwrap();
read_packets(n as u32, cap, h, w, t);
ic.fetch_add(1, Ordering::Relaxed);

});
}
None => println!("Couldn't find interface '{iface}'"),
Expand All @@ -314,6 +320,7 @@ fn read_interfaces<W>(
pool.execute(move || {
let beginning_park = time::Instant::now();
let mut timeout_remaining = duration_limit;
let ic = Arc::clone(&interfaces_complete);
loop {
thread::park_timeout(timeout_remaining);
let elapsed = beginning_park.elapsed();
Expand All @@ -325,6 +332,9 @@ fn read_interfaces<W>(
term_timeout.store(true, Ordering::Relaxed);
break;
}
if ic.load(Ordering::Relaxed) >= n_interfaces as u32 {
break;
}

timeout_remaining = duration_limit - elapsed;
}
Expand All @@ -344,9 +354,12 @@ fn read_pcap_dir<W>(
W: Write + std::marker::Send + 'static,
{
let mut paths = fs::read_dir(pcap_dir.clone()).unwrap();
let pool = ThreadPool::new(paths.count() + 2);
let total_files = paths.count();
let pool = ThreadPool::new(total_files + 2);
signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&term)).unwrap();

let files_complete = Arc::new(AtomicU32::new(0_u32));

// refresh the path list and launch jobs
paths = fs::read_dir(pcap_dir).unwrap();
for (n, path) in paths.enumerate() {
Expand All @@ -356,9 +369,11 @@ fn read_pcap_dir<W>(
let h = Arc::clone(&handler);
let w = Arc::clone(&arc_writer);
let t = Arc::clone(&term);
let fc = Arc::clone(&files_complete);
pool.execute(move || {
let cap = Capture::from_file(p.path()).unwrap();
read_packets(n as u32, cap, h, w, t);
fc.fetch_add(1, Ordering::Relaxed);
});
}
Err(e) => println!("path error: {e}"),
Expand All @@ -369,6 +384,7 @@ fn read_pcap_dir<W>(
if let Some(duration_limit) = timeout {
debug!("duration: {duration_limit:?}");
let term_timeout = Arc::clone(&term);
let fc = Arc::clone(&files_complete);
pool.execute(move || {
let beginning_park = time::Instant::now();
let mut timeout_remaining = duration_limit;
Expand All @@ -383,6 +399,9 @@ fn read_pcap_dir<W>(
term_timeout.store(true, Ordering::Relaxed);
break;
}
if fc.load(Ordering::Relaxed) >= total_files as u32 {
break;
}

timeout_remaining = duration_limit - elapsed;
}
Expand All @@ -395,7 +414,7 @@ fn read_pcap_dir<W>(
// abstracts over live captures (Capture<Active>) and file captures
// (Capture<Offline>) using generics and the Activated trait,
fn read_packets<T, W>(
_id: u32,
id: u32,
mut capture: Capture<T>,
handler: Arc<Mutex<PacketHandler>>,
writer: Arc<Mutex<PcapNgWriter<W>>>,
Expand Down Expand Up @@ -428,7 +447,16 @@ fn read_packets<T, W>(
while !terminate.load(Ordering::Relaxed) {
let packet = match capture.next_packet() {
Ok(p) => p,
Err(_e) => continue,
Err(e) => {
match e {
pcap::Error::NoMorePackets => {}
pcap::Error::TimeoutExpired => {}
_ => {
println!("thread {id} err: {e}");
}
}
break;
}
};

if packet.is_empty() {
Expand Down Expand Up @@ -486,10 +514,10 @@ fn read_packets<T, W>(

match { writer.lock().unwrap().write_pcapng_block(out) } {
Ok(_) => continue,
Err(e) => println!("thread {_id} failed to write packet: {e}"),
Err(e) => println!("thread {id} failed to write packet: {e}"),
}
}
debug!("thread {_id} shutting down")
debug!("thread {id} shutting down")
}

fn parse_targets(input: String) -> Vec<IpNet> {
Expand Down

0 comments on commit 3068615

Please sign in to comment.