Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove extra loop #33

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 62 additions & 59 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,87 +2,90 @@
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::{Duration, Instant},
time::Instant,
};

use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use tokio::{spawn, time::sleep};
use tokio::{spawn, task::JoinSet};

#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::main]
async fn main() -> BluefinResult<()> {
let _ = spawn(async move {
let _ = run().await;
})
.await;
Ok(())
}

async fn run() -> BluefinResult<()> {
// console_subscriber::init();
let mut server = BluefinServer::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
1318,
)));
server.set_num_reader_workers(50)?;
server.set_num_reader_workers(300)?;
server.bind().await?;
let mut join_set = JoinSet::new();

const MAX_NUM_CONNECTIONS: usize = 3;
const MAX_NUM_CONNECTIONS: usize = 2;
for conn_num in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
let _ = spawn(async move {
let _num = conn_num;
loop {
let _conn = s.accept().await;
let _num = conn_num;
let _ = join_set.spawn(async move {
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
spawn(async move {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 500000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let now = Instant::now();
loop {
// eprintln!("Waiting...");
let size = conn.recv(&mut recv_bytes, 500000).await.unwrap();
total_bytes += size;
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);
match _conn {
Ok(mut conn) => {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 80000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let mut num_iterations_without_print = 0;
let now = Instant::now();
loop {
let size = conn.recv(&mut recv_bytes, 80000).await.unwrap();
total_bytes += size;
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);

/*
println!(
"({:x}_{:x}) >>> Received: {:?} (total: {})",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
total_bytes
/*
println!(
"({:x}_{:x}) >>> Received: {:?} (total: {})",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
total_bytes
);
*/
num_iterations_without_print += 1;
if total_bytes >= 100000 && num_iterations_without_print == 200 {
let elapsed = now.elapsed().as_secs();
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
);
*/
if total_bytes >= 100000 {
let elapsed = now.elapsed().as_secs();
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
);
// break;
}
iteration += 1;
}
});
}
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
num_iterations_without_print = 0;
// break;
}
iteration += 1;
}
}

sleep(Duration::from_secs(1)).await;
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
}
}
});
}

// The spawned tasks are looping forever. This infinite loop will keep the
// process up forever.
loop {}
join_set.join_all().await;
Ok(())
}
Loading