Skip to content

Commit eb084fe

Browse files
authored
Merge pull request #1 from ssrlive/mio
Mio
2 parents 508d4a7 + 4cbf40b commit eb084fe

File tree

14 files changed

+279
-197
lines changed

14 files changed

+279
-197
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
host_os:
1414
- ubuntu-latest
1515
- macos-latest
16-
# - windows-latest
16+
- windows-latest
1717

1818
runs-on: ${{ matrix.host_os }}
1919

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@ categories = ["network-utilities"]
1111
readme = "README.md"
1212

1313
[dependencies]
14+
cfg-if = "1.0"
1415
chrono = "0.4"
1516
clap = { version = "4.4", features = ["derive", "wrap_help"] }
1617
core_affinity = "0.8"
17-
ctrlc2 = "3.5"
18+
ctrlc2 = { version = "3.5", features = ["termination"] }
1819
env_logger = "0.10"
1920
log = { version = "0.4", features = ["std"] }
20-
mio = "0.6"
21-
nix = "0.20"
21+
mio = { version = "0.8", features = ["log", "os-poll", "net"] }
22+
nix = { version = "0.27", features = ["net"] }
2223
serde = { version = "1.0", features = ["derive"] }
2324
serde_json = "1.0"
2425
simple-error = "0.3"
26+
socket2 = { version = "0.5", features = ["all"] }
2527
uuid = { version = "1.6", features = ["v4"] }
2628

2729
#configuration for cargo-deb

src/args.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,49 +33,57 @@ pub struct Args {
3333

3434
/// enable IPv6 on the server (on most hosts, this will allow both IPv4 and IPv6,
3535
/// but it might limit to just IPv6 on some)
36-
#[arg(short = '6', long)]
36+
#[arg(short = '6', long, conflicts_with = "client")]
3737
pub version6: bool,
3838

3939
/// limit the number of concurrent clients that can be processed by a server;
4040
/// any over this count will be immediately disconnected
41-
#[arg(long, value_name = "number", default_value = "0")]
41+
#[arg(long, value_name = "number", default_value = "0", conflicts_with = "client")]
4242
pub client_limit: usize,
4343

4444
/// run in client mode; value is the server's address
4545
#[arg(short, long, value_name = "host", conflicts_with = "server")]
4646
pub client: Option<std::net::IpAddr>,
4747

4848
/// run in reverse-mode (server sends, client receives)
49-
#[arg(short = 'R', long)]
49+
#[arg(short = 'R', long, conflicts_with = "server")]
5050
pub reverse: bool,
5151

5252
/// the format in which to deplay information (json, megabit/sec, megabyte/sec)
53-
#[arg(short, long, value_enum, value_name = "format", default_value = "megabit")]
53+
#[arg(
54+
short,
55+
long,
56+
value_enum,
57+
value_name = "format",
58+
default_value = "megabit",
59+
conflicts_with = "server"
60+
)]
5461
pub format: Format,
5562

5663
/// use UDP rather than TCP
57-
#[arg(short, long)]
64+
#[arg(short, long, conflicts_with = "server")]
5865
pub udp: bool,
5966

6067
/// target bandwidth in bytes/sec; this value is applied to each stream,
6168
/// with a default target of 1 megabit/second for all protocols (note: megabit, not mebibit);
6269
/// the suffixes kKmMgG can also be used for xbit and xbyte, respectively
63-
#[arg(short, long, default_value = "125000", value_name = "bytes/sec")]
70+
#[arg(short, long, default_value = "125000", value_name = "bytes/sec", conflicts_with = "server")]
6471
pub bandwidth: String,
6572

6673
/// the time in seconds for which to transmit
67-
#[arg(short, long, default_value = "10.0", value_name = "seconds")]
74+
#[arg(short, long, default_value = "10.0", value_name = "seconds", conflicts_with = "server")]
6875
pub time: f64,
6976

7077
/// the interval at which to send batches of data, in seconds, between [0.0 and 1.0);
7178
/// this is used to evenly spread packets out over time
72-
#[arg(long, default_value = "0.05", value_name = "seconds")]
79+
#[arg(long, default_value = "0.05", value_name = "seconds", conflicts_with = "server")]
7380
pub send_interval: f64,
7481

7582
/// length of the buffer to exchange; for TCP, this defaults to 32 kibibytes; for UDP, it's 1024 bytes
7683
#[arg(
7784
short,
7885
long,
86+
conflicts_with = "server",
7987
default_value = "32768",
8088
default_value_if("udp", "true", Some("1024")),
8189
value_name = "bytes"
@@ -85,27 +93,27 @@ pub struct Args {
8593
/// send buffer, in bytes (only supported on some platforms;
8694
/// if set too small, a 'resource unavailable' error may occur;
8795
/// affects TCP window-size)
88-
#[arg(long, default_value = "0", value_name = "bytes")]
96+
#[arg(long, default_value = "0", value_name = "bytes", conflicts_with = "server")]
8997
pub send_buffer: usize,
9098

9199
/// receive buffer, in bytes (only supported on some platforms;
92100
/// if set too small, a 'resource unavailable' error may occur; affects TCP window-size)
93-
#[arg(long, default_value = "0", value_name = "bytes")]
101+
#[arg(long, default_value = "0", value_name = "bytes", conflicts_with = "server")]
94102
pub receive_buffer: usize,
95103

96104
/// the number of parallel data-streams to use
97-
#[arg(short = 'P', long, value_name = "number", default_value = "1")]
105+
#[arg(short = 'P', long, value_name = "number", default_value = "1", conflicts_with = "server")]
98106
pub parallel: usize,
99107

100108
/// omit a number of seconds from the start of calculations,
101109
/// primarily to avoid including TCP ramp-up in averages;
102110
/// using this option may result in disagreement between bytes sent and received,
103111
/// since data can be in-flight across time-boundaries
104-
#[arg(short, long, default_value = "0", value_name = "seconds")]
112+
#[arg(short, long, default_value = "0", value_name = "seconds", conflicts_with = "server")]
105113
pub omit: usize,
106114

107115
/// use no-delay mode for TCP tests, disabling Nagle's Algorithm
108-
#[arg(short = 'N', long)]
116+
#[arg(short = 'N', long, conflicts_with = "server")]
109117
pub no_delay: bool,
110118

111119
/// an optional pool of IPv4 TCP ports over which data will be accepted;
@@ -127,6 +135,10 @@ pub struct Args {
127135
/// if omitted, any OS-assignable port is used; format: 1-10,19,21
128136
#[arg(long, value_name = "ports", default_value = "")]
129137
pub udp6_port_pool: String,
138+
139+
/// Verbosity level
140+
#[arg(short, long, value_name = "level", value_enum, default_value = "info")]
141+
pub verbosity: ArgVerbosity,
130142
}
131143

132144
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum, Default)]
@@ -146,3 +158,27 @@ impl std::fmt::Display for Format {
146158
}
147159
}
148160
}
161+
162+
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
163+
pub enum ArgVerbosity {
164+
Off,
165+
Error,
166+
Warn,
167+
#[default]
168+
Info,
169+
Debug,
170+
Trace,
171+
}
172+
173+
impl std::fmt::Display for ArgVerbosity {
174+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175+
match self {
176+
ArgVerbosity::Off => write!(f, "off"),
177+
ArgVerbosity::Error => write!(f, "error"),
178+
ArgVerbosity::Warn => write!(f, "warn"),
179+
ArgVerbosity::Info => write!(f, "info"),
180+
ArgVerbosity::Debug => write!(f, "debug"),
181+
ArgVerbosity::Trace => write!(f, "trace"),
182+
}
183+
}
184+
}

src/client.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ use crate::{
2727
results::{IntervalResultBox, IntervalResultKind, TcpTestResults, TestResults, UdpTestResults},
2828
},
2929
stream::{tcp, udp, TestStream},
30+
BoxResult,
3031
};
3132
use mio::net::TcpStream;
3233
use std::{
33-
error::Error,
3434
net::{IpAddr, Shutdown, ToSocketAddrs},
3535
sync::{
3636
atomic::{AtomicBool, Ordering},
@@ -41,8 +41,6 @@ use std::{
4141
time::{Duration, SystemTime, UNIX_EPOCH},
4242
};
4343

44-
type BoxResult<T> = Result<T, Box<dyn Error>>;
45-
4644
/// when false, the system is shutting down
4745
static ALIVE: AtomicBool = AtomicBool::new(true);
4846

@@ -60,23 +58,30 @@ fn connect_to_server(address: &str, port: &u16) -> BoxResult<TcpStream> {
6058
if server_addr.is_none() {
6159
return Err(Box::new(simple_error::simple_error!("unable to resolve {}", address)));
6260
}
63-
let raw_stream = match std::net::TcpStream::connect_timeout(&server_addr.unwrap(), CONNECT_TIMEOUT) {
61+
let stream = match std::net::TcpStream::connect_timeout(&server_addr.unwrap(), CONNECT_TIMEOUT) {
6462
Ok(s) => s,
6563
Err(e) => return Err(Box::new(simple_error::simple_error!("unable to connect: {}", e))),
6664
};
67-
let stream = match TcpStream::from_stream(raw_stream) {
68-
Ok(s) => s,
69-
Err(e) => {
70-
return Err(Box::new(simple_error::simple_error!(
71-
"unable to prepare TCP control-channel: {}",
72-
e
73-
)))
65+
66+
let stream = {
67+
let socket: socket2::Socket = socket2::Socket::from(stream);
68+
let keepalive = socket2::TcpKeepalive::new()
69+
.with_time(KEEPALIVE_DURATION)
70+
.with_interval(KEEPALIVE_DURATION);
71+
cfg_if::cfg_if! {
72+
if #[cfg(unix)] {
73+
let keepalive = keepalive.with_retries(4);
74+
}
7475
}
76+
socket.set_tcp_keepalive(&keepalive)?;
77+
socket.set_nodelay(true)?;
78+
79+
let stream: std::net::TcpStream = socket.into();
80+
81+
TcpStream::from_std(stream)
7582
};
76-
log::info!("connected to server");
7783

78-
stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");
79-
stream.set_keepalive(Some(KEEPALIVE_DURATION)).expect("unable to set TCP keepalive");
84+
log::info!("connected to server");
8085

8186
Ok(stream)
8287
}

src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
pub mod args;
2+
pub mod client;
3+
pub(crate) mod protocol;
4+
pub mod server;
5+
pub(crate) mod stream;
6+
pub(crate) mod utils;
7+
8+
pub type BoxResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
9+
10+
/// a global token generator
11+
pub(crate) fn get_global_token() -> mio::Token {
12+
mio::Token(TOKEN_SEED.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1)
13+
}
14+
static TOKEN_SEED: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

src/main.rs

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,34 @@
1818
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
1919
*/
2020

21-
mod args;
22-
mod client;
23-
mod protocol;
24-
mod server;
25-
mod stream;
26-
mod utils;
21+
use rperf::{args, client, server, BoxResult};
2722

28-
fn main() {
23+
fn main() -> BoxResult<()> {
2924
use clap::Parser;
3025
let args = args::Args::parse();
3126

32-
let mut env = env_logger::Env::default().filter_or("RUST_LOG", "info");
27+
let default = args.verbosity.to_string();
28+
let mut env = env_logger::Env::default().filter_or("RUST_LOG", &default);
3329
if args.debug {
3430
env = env.filter_or("RUST_LOG", "debug");
3531
}
3632
env_logger::init_from_env(env);
3733

3834
if args.server {
3935
log::debug!("registering SIGINT handler...");
40-
ctrlc2::set_handler(move || {
36+
let exiting = ctrlc2::set_handler(move || {
4137
if server::kill() {
4238
log::warn!("shutdown requested; please allow a moment for any in-progress tests to stop");
4339
} else {
4440
log::warn!("forcing shutdown immediately");
4541
std::process::exit(3);
4642
}
4743
true
48-
})
49-
.expect("unable to set SIGINT handler");
44+
})?;
5045

5146
log::debug!("beginning normal operation...");
52-
let service = server::serve(&args);
53-
if service.is_err() {
54-
log::error!("unable to run server: {}", service.unwrap_err());
55-
std::process::exit(4);
56-
}
47+
server::serve(&args)?;
48+
exiting.join().expect("unable to join SIGINT handler thread");
5749
} else if args.client.is_some() {
5850
log::debug!("registering SIGINT handler...");
5951
ctrlc2::set_handler(move || {
@@ -64,19 +56,14 @@ fn main() {
6456
std::process::exit(3);
6557
}
6658
true
67-
})
68-
.expect("unable to set SIGINT handler");
59+
})?;
6960

7061
log::debug!("connecting to server...");
71-
let execution = client::execute(&args);
72-
if execution.is_err() {
73-
log::error!("unable to run client: {}", execution.unwrap_err());
74-
std::process::exit(4);
75-
}
62+
client::execute(&args)?;
7663
} else {
7764
use clap::CommandFactory;
7865
let mut cmd = args::Args::command();
7966
cmd.print_help().unwrap();
80-
std::process::exit(2);
8167
}
68+
Ok(())
8269
}

0 commit comments

Comments
 (0)