Skip to content

Commit

Permalink
remove extra loop (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
franklee26 authored Dec 6, 2024
1 parent c326637 commit fc9bc8d
Showing 1 changed file with 62 additions and 59 deletions.
121 changes: 62 additions & 59 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,87 +2,90 @@
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::{Duration, Instant},
time::Instant,
};

use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use tokio::{spawn, time::sleep};
use tokio::{spawn, task::JoinSet};

#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::main]
async fn main() -> BluefinResult<()> {
let _ = spawn(async move {
let _ = run().await;
})
.await;
Ok(())
}

async fn run() -> BluefinResult<()> {
// console_subscriber::init();
let mut server = BluefinServer::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
1318,
)));
server.set_num_reader_workers(50)?;
server.set_num_reader_workers(300)?;
server.bind().await?;
let mut join_set = JoinSet::new();

const MAX_NUM_CONNECTIONS: usize = 3;
const MAX_NUM_CONNECTIONS: usize = 2;
for conn_num in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
let _ = spawn(async move {
let _num = conn_num;
loop {
let _conn = s.accept().await;
let _num = conn_num;
let _ = join_set.spawn(async move {
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
spawn(async move {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 500000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let now = Instant::now();
loop {
// eprintln!("Waiting...");
let size = conn.recv(&mut recv_bytes, 500000).await.unwrap();
total_bytes += size;
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);
match _conn {
Ok(mut conn) => {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 80000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let mut num_iterations_without_print = 0;
let now = Instant::now();
loop {
let size = conn.recv(&mut recv_bytes, 80000).await.unwrap();
total_bytes += size;
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);

/*
println!(
"({:x}_{:x}) >>> Received: {:?} (total: {})",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
total_bytes
/*
println!(
"({:x}_{:x}) >>> Received: {:?} (total: {})",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
total_bytes
);
*/
num_iterations_without_print += 1;
if total_bytes >= 100000 && num_iterations_without_print == 200 {
let elapsed = now.elapsed().as_secs();
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
);
*/
if total_bytes >= 100000 {
let elapsed = now.elapsed().as_secs();
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
);
// break;
}
iteration += 1;
}
});
}
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
num_iterations_without_print = 0;
// break;
}
iteration += 1;
}
}

sleep(Duration::from_secs(1)).await;
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
}
}
});
}

// The spawned tasks are looping forever. This infinite loop will keep the
// process up forever.
loop {}
join_set.join_all().await;
Ok(())
}

0 comments on commit fc9bc8d

Please sign in to comment.