Skip to content

Commit bc6dbb7

Browse files
committed
refine code
1 parent e988f5d commit bc6dbb7

File tree

7 files changed

+100
-100
lines changed

7 files changed

+100
-100
lines changed

src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
100100
let mut complete = false;
101101

102102
//config-parsing and pre-connection setup
103-
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(args.tcp_port_pool.to_string(), args.tcp6_port_pool.to_string());
104-
let mut udp_port_pool = udp::receiver::UdpPortPool::new(args.udp_port_pool.to_string(), args.udp6_port_pool.to_string());
103+
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(&args.tcp_port_pool, &args.tcp6_port_pool);
104+
let mut udp_port_pool = udp::receiver::UdpPortPool::new(&args.udp_port_pool, &args.udp6_port_pool);
105105

106106
let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
107107

src/protocol/communication.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
4242
let serialised_message = serde_json::to_vec(message)?;
4343

4444
log::debug!(
45-
"sending message of length {}, {:?}, to {}...",
45+
"sending message to {}, length {}, {:?}...",
46+
stream.peer_addr()?,
4647
serialised_message.len(),
4748
message,
48-
stream.peer_addr()?
4949
);
5050
let mut output_buffer = vec![0_u8; serialised_message.len() + 2];
5151
output_buffer[..2].copy_from_slice(&(serialised_message.len() as u16).to_be_bytes());
@@ -71,21 +71,19 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
7171
}
7272
}
7373
}
74-
Err(Box::new(simple_error::simple_error!(
75-
"timed out while attempting to send status-message to {}",
76-
stream.peer_addr()?
77-
)))
74+
let err = simple_error::simple_error!("timed out while attempting to send status-message to {}", stream.peer_addr()?);
75+
Err(Box::new(err))
7876
}
7977

8078
/// receives the length-count of a pending message over a client-server communications stream
81-
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
79+
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
8280
stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout");
8381

8482
let mut length_bytes_read = 0;
8583
let mut length_spec: [u8; 2] = [0; 2];
8684
while alive_check() {
8785
//waiting to find out how long the next message is
88-
results_handler()?; //send any outstanding results between cycles
86+
handler()?; //send any outstanding results between cycles
8987

9088
let size = match stream.read(&mut length_spec[length_bytes_read..]) {
9189
Ok(size) => size,
@@ -156,7 +154,7 @@ fn receive_payload(
156154
if bytes_read == length as usize {
157155
match serde_json::from_slice(&buffer) {
158156
Ok(v) => {
159-
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
157+
log::debug!("received message from {}: {:?}", stream.peer_addr()?, v);
160158
return Ok(v);
161159
}
162160
Err(e) => {

src/protocol/results.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub trait IntervalResult {
5858
fn to_string(&self, bit: bool) -> String;
5959
}
6060

61-
pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send>;
61+
pub type IntervalResultBox = Box<dyn IntervalResult + Sync + Send + 'static>;
6262

6363
pub struct ClientDoneResult {
6464
pub stream_idx: u8,
@@ -477,7 +477,7 @@ impl IntervalResult for UdpSendResult {
477477
}
478478
}
479479

480-
pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<Box<dyn IntervalResult>> {
480+
pub fn interval_result_from_json(value: serde_json::Value) -> BoxResult<IntervalResultBox> {
481481
match value.get("family") {
482482
Some(f) => match f.as_str() {
483483
Some(family) => match family {

src/server.rs

Lines changed: 49 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
use std::io;
2222
use std::net::{Shutdown, SocketAddr};
2323
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
24-
use std::sync::mpsc::channel;
25-
use std::sync::{Arc, Mutex};
24+
use std::sync::{mpsc, Arc, Mutex};
2625
use std::thread;
2726
use std::time::Duration;
2827

@@ -31,7 +30,7 @@ use std::net::{TcpListener, TcpStream};
3130
use crate::args::Args;
3231
use crate::protocol::communication::{receive, send};
3332
use crate::protocol::messaging::{prepare_connect, prepare_connect_ready};
34-
use crate::protocol::results::ServerDoneResult;
33+
use crate::protocol::results::{IntervalResultBox, ServerDoneResult};
3534
use crate::stream::{tcp, udp, TestStream};
3635
use crate::BoxResult;
3736

@@ -56,10 +55,7 @@ fn handle_client(
5655
let mut parallel_streams: Vec<Arc<Mutex<(dyn TestStream + Sync + Send)>>> = Vec::new();
5756
let mut parallel_streams_joinhandles = Vec::new();
5857

59-
let (results_tx, results_rx): (
60-
std::sync::mpsc::Sender<crate::protocol::results::IntervalResultBox>,
61-
std::sync::mpsc::Receiver<crate::protocol::results::IntervalResultBox>,
62-
) = channel();
58+
let (results_tx, results_rx) = mpsc::channel::<IntervalResultBox>();
6359

6460
//a closure used to pass results from stream-handlers to the client-communication stream
6561
let mut forwarding_send_stream = stream.try_clone()?;
@@ -295,12 +291,12 @@ impl Drop for ClientThreadMonitor {
295291
pub fn serve(args: &Args) -> BoxResult<()> {
296292
//config-parsing and pre-connection setup
297293
let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new(
298-
args.tcp_port_pool.to_string(),
299-
args.tcp6_port_pool.to_string(),
294+
&args.tcp_port_pool,
295+
&args.tcp6_port_pool,
300296
)));
301297
let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new(
302-
args.udp_port_pool.to_string(),
303-
args.udp6_port_pool.to_string(),
298+
&args.udp_port_pool,
299+
&args.udp6_port_pool,
304300
)));
305301

306302
let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
@@ -317,53 +313,54 @@ pub fn serve(args: &Args) -> BoxResult<()> {
317313
log::info!("server listening on {}", listener.local_addr()?);
318314

319315
while is_alive() {
320-
match listener.accept() {
321-
Ok((mut stream, address)) => {
322-
log::info!("connection from {}", address);
323-
324-
stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");
325-
326-
#[cfg(unix)]
327-
{
328-
use crate::protocol::communication::KEEPALIVE_DURATION;
329-
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
330-
let raw_socket = socket2::SockRef::from(&stream);
331-
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
332-
}
333-
334-
let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
335-
if client_limit > 0 && client_count > client_limit {
336-
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
337-
stream.shutdown(Shutdown::Both).unwrap_or_default();
338-
CLIENTS.fetch_sub(1, Ordering::Relaxed);
339-
} else {
340-
let c_cam = cpu_affinity_manager.clone();
341-
let c_tcp_port_pool = tcp_port_pool.clone();
342-
let c_udp_port_pool = udp_port_pool.clone();
343-
let thread_builder = thread::Builder::new().name(address.to_string());
344-
thread_builder.spawn(move || {
345-
//ensure the client is accounted-for even if the handler panics
346-
let _client_thread_monitor = ClientThreadMonitor {
347-
client_address: address.to_string(),
348-
};
349-
350-
match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
351-
Ok(_) => (),
352-
Err(e) => log::error!("error in client-handler: {}", e),
353-
}
354-
355-
//in the event of panic, this will happen when the stream is dropped
356-
stream.shutdown(Shutdown::Both).unwrap_or_default();
357-
})?;
358-
}
359-
}
316+
let (mut stream, address) = match listener.accept() {
317+
Ok((stream, address)) => (stream, address),
360318
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
361-
//no pending clients
319+
// no pending clients
362320
thread::sleep(POLL_TIMEOUT);
321+
continue;
363322
}
364323
Err(e) => {
365324
return Err(Box::new(e));
366325
}
326+
};
327+
328+
log::info!("connection from {}", address);
329+
330+
stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");
331+
332+
#[cfg(unix)]
333+
{
334+
use crate::protocol::communication::KEEPALIVE_DURATION;
335+
let keepalive_parameters = socket2::TcpKeepalive::new().with_time(KEEPALIVE_DURATION);
336+
let raw_socket = socket2::SockRef::from(&stream);
337+
raw_socket.set_tcp_keepalive(&keepalive_parameters)?;
338+
}
339+
340+
let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1;
341+
if client_limit > 0 && client_count > client_limit {
342+
log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string());
343+
stream.shutdown(Shutdown::Both).unwrap_or_default();
344+
CLIENTS.fetch_sub(1, Ordering::Relaxed);
345+
} else {
346+
let c_cam = cpu_affinity_manager.clone();
347+
let c_tcp_port_pool = tcp_port_pool.clone();
348+
let c_udp_port_pool = udp_port_pool.clone();
349+
let thread_builder = thread::Builder::new().name(address.to_string());
350+
thread_builder.spawn(move || {
351+
// ensure the client is accounted-for even if the handler panics
352+
let _client_thread_monitor = ClientThreadMonitor {
353+
client_address: address.to_string(),
354+
};
355+
356+
match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
357+
Ok(_) => (),
358+
Err(e) => log::error!("error in client-handler: {}", e),
359+
}
360+
361+
//in the event of panic, this will happen when the stream is dropped
362+
stream.shutdown(Shutdown::Both).unwrap_or_default();
363+
})?;
367364
}
368365
}
369366

src/stream/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
pub mod tcp;
2222
pub mod udp;
2323

24-
use crate::BoxResult;
24+
use crate::{protocol::results::IntervalResultBox, BoxResult};
2525

2626
pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
2727

@@ -30,7 +30,7 @@ pub const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
3030
/// INTERVAL while gathering data.
3131
pub trait TestStream {
3232
/// gather data; returns None when the test is over
33-
fn run_interval(&mut self) -> Option<BoxResult<crate::protocol::results::IntervalResultBox>>;
33+
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>>;
3434
/// return the port associated with the test-stream; this may vary over the test's lifetime
3535
fn get_port(&self) -> BoxResult<u16>;
3636
/// returns the index of the test, used to match client and server data
@@ -39,7 +39,7 @@ pub trait TestStream {
3939
fn stop(&mut self);
4040
}
4141

42-
fn parse_port_spec(port_spec: String) -> Vec<u16> {
42+
fn parse_port_spec(port_spec: &str) -> Vec<u16> {
4343
let mut ports = Vec::<u16>::new();
4444
if !port_spec.is_empty() {
4545
for range in port_spec.split(',') {

src/stream/tcp.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
1919
*/
2020

21-
use crate::protocol::results::{get_unix_timestamp, IntervalResult, TcpReceiveResult, TcpSendResult};
21+
use crate::protocol::results::{get_unix_timestamp, TcpReceiveResult, TcpSendResult};
22+
use crate::stream::{parse_port_spec, TestStream, INTERVAL};
2223
use crate::BoxResult;
2324

24-
use super::{parse_port_spec, TestStream, INTERVAL};
25-
2625
pub const TEST_HEADER_SIZE: usize = 16;
2726

2827
#[cfg(unix)]
@@ -38,7 +37,7 @@ pub struct TcpTestDefinition {
3837
pub length: usize,
3938
}
4039
impl TcpTestDefinition {
41-
pub fn new(details: &serde_json::Value) -> super::BoxResult<TcpTestDefinition> {
40+
pub fn new(details: &serde_json::Value) -> BoxResult<TcpTestDefinition> {
4241
let mut test_id_bytes = [0_u8; 16];
4342
for (i, v) in details
4443
.get("test_id")
@@ -76,13 +75,14 @@ impl TcpTestDefinition {
7675
}
7776

7877
pub mod receiver {
78+
use mio::net::{TcpListener, TcpStream};
7979
use std::io::Read;
8080
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
8181
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
8282
use std::sync::Mutex;
8383
use std::time::{Duration, Instant};
8484

85-
use mio::net::{TcpListener, TcpStream};
85+
use crate::{protocol::results::IntervalResultBox, BoxResult};
8686

8787
const POLL_TIMEOUT: Duration = Duration::from_millis(250);
8888
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
@@ -97,7 +97,7 @@ pub mod receiver {
9797
lock_ip6: Mutex<u8>,
9898
}
9999
impl TcpPortPool {
100-
pub fn new(port_spec: String, port_spec6: String) -> TcpPortPool {
100+
pub fn new(port_spec: &str, port_spec6: &str) -> TcpPortPool {
101101
let ports = super::parse_port_spec(port_spec);
102102
if !ports.is_empty() {
103103
log::debug!("configured IPv4 TCP port pool: {:?}", ports);
@@ -123,7 +123,7 @@ pub mod receiver {
123123
}
124124
}
125125

126-
pub fn bind(&mut self, peer_ip: &IpAddr) -> super::BoxResult<TcpListener> {
126+
pub fn bind(&mut self, peer_ip: &IpAddr) -> BoxResult<TcpListener> {
127127
match peer_ip {
128128
IpAddr::V6(_) => {
129129
if self.ports_ip6.is_empty() {
@@ -193,7 +193,6 @@ pub mod receiver {
193193
}
194194
}
195195

196-
#[allow(dead_code)]
197196
pub struct TcpReceiver {
198197
active: AtomicBool,
199198
test_definition: super::TcpTestDefinition,
@@ -213,7 +212,7 @@ pub mod receiver {
213212
stream_idx: &u8,
214213
port_pool: &mut TcpPortPool,
215214
peer_ip: &IpAddr,
216-
) -> super::BoxResult<TcpReceiver> {
215+
) -> BoxResult<TcpReceiver> {
217216
log::debug!("binding TCP listener for stream {}...", stream_idx);
218217
let mut listener: TcpListener = port_pool.bind(peer_ip).expect("failed to bind TCP socket");
219218
log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?);
@@ -238,7 +237,7 @@ pub mod receiver {
238237
})
239238
}
240239

241-
fn process_connection(&mut self) -> super::BoxResult<(TcpStream, u64, f32)> {
240+
fn process_connection(&mut self) -> BoxResult<(TcpStream, u64, f32)> {
242241
log::debug!("preparing to receive TCP stream {} connection...", self.stream_idx);
243242

244243
let listener = self.listener.as_mut().unwrap();
@@ -357,7 +356,7 @@ pub mod receiver {
357356
}
358357

359358
impl super::TestStream for TcpReceiver {
360-
fn run_interval(&mut self) -> Option<super::BoxResult<Box<dyn super::IntervalResult + Sync + Send>>> {
359+
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>> {
361360
let mut bytes_received: u64 = 0;
362361

363362
let mut additional_time_elapsed: f32 = 0.0;
@@ -450,7 +449,7 @@ pub mod receiver {
450449
}
451450
}
452451

453-
fn get_port(&self) -> super::BoxResult<u16> {
452+
fn get_port(&self) -> BoxResult<u16> {
454453
match &self.listener {
455454
Some(listener) => Ok(listener.local_addr()?.port()),
456455
None => match &self.stream {
@@ -476,6 +475,8 @@ pub mod sender {
476475
use std::thread::sleep;
477476
use std::time::{Duration, Instant};
478477

478+
use crate::{protocol::results::IntervalResultBox, BoxResult};
479+
479480
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
480481
const WRITE_TIMEOUT: Duration = Duration::from_millis(50);
481482
const BUFFER_FULL_TIMEOUT: Duration = Duration::from_millis(1);
@@ -509,7 +510,7 @@ pub mod sender {
509510
send_interval: &f32,
510511
send_buffer: &usize,
511512
no_delay: &bool,
512-
) -> super::BoxResult<TcpSender> {
513+
) -> BoxResult<TcpSender> {
513514
let mut staged_buffer = vec![0_u8; test_definition.length];
514515
for (i, staged_buffer_i) in staged_buffer.iter_mut().enumerate().skip(super::TEST_HEADER_SIZE) {
515516
//fill the packet with a fixed sequence
@@ -536,8 +537,8 @@ pub mod sender {
536537
})
537538
}
538539

539-
fn process_connection(&mut self) -> super::BoxResult<TcpStream> {
540-
log::debug!("preparing to connect TCP stream {}...", self.stream_idx);
540+
fn process_connection(&mut self) -> BoxResult<TcpStream> {
541+
log::debug!("preparing to connect TCP stream {} to {} ...", self.stream_idx, self.socket_addr);
541542

542543
let stream = match TcpStream::connect_timeout(&self.socket_addr, CONNECT_TIMEOUT) {
543544
Ok(s) => s,
@@ -575,7 +576,7 @@ pub mod sender {
575576
}
576577
}
577578
impl super::TestStream for TcpSender {
578-
fn run_interval(&mut self) -> Option<super::BoxResult<Box<dyn super::IntervalResult + Sync + Send>>> {
579+
fn run_interval(&mut self) -> Option<BoxResult<IntervalResultBox>> {
579580
if self.stream.is_none() {
580581
//if still in the setup phase, connect to the receiver
581582
match self.process_connection() {
@@ -704,7 +705,7 @@ pub mod sender {
704705
}
705706
}
706707

707-
fn get_port(&self) -> super::BoxResult<u16> {
708+
fn get_port(&self) -> BoxResult<u16> {
708709
match &self.stream {
709710
Some(stream) => Ok(stream.local_addr()?.port()),
710711
None => Err(Box::new(simple_error::simple_error!("no stream currently exists"))),

0 commit comments

Comments
 (0)