Skip to content

Commit

Permalink
Ack-handling part #2 (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
franklee26 authored Nov 27, 2024
1 parent ddd9331 commit c326637
Show file tree
Hide file tree
Showing 18 changed files with 1,475 additions and 335 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/bluefin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

coverage:
runs-on: ubuntu-latest
env:
Expand All @@ -39,10 +40,21 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs|*/bin/*.rs" --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
files: lcov.info
fail_ci_if_error: false

kani:
runs-on: ubuntu-latest
strategy:
fail-fast: false
steps:
- name: Checkout bluefin
uses: actions/checkout@v4
- name: Run Kani
uses: model-checking/[email protected]

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ etherparse = "0.15.0"
local-ip-address = "0.6.3"
rand = "0.8.5"
rstest = "0.23.0"
thiserror = "1.0.39"
tokio = { version = "1.41.1", features = ["full"] }
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["full", "tracing"] }
console-subscriber = "0.4.1"
libc = "0.2.164"
sysctl = "0.6.0"

[dev-dependencies]
local-ip-address = "0.6.3"
Expand All @@ -31,6 +34,9 @@ path = "src/bin/client.rs"
name = "server"
path = "src/bin/server.rs"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(kani)'] }

[profile.release]
opt-level = 3
codegen-units = 1
Expand Down
17 changes: 11 additions & 6 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use std::{
net::{Ipv4Addr, SocketAddrV4},
time::Duration,
Expand All @@ -8,9 +9,11 @@ use bluefin::{
};
use tokio::{spawn, time::sleep};

#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::main]
async fn main() -> BluefinResult<()> {
let ports = [1320, 1322];
// console_subscriber::init();
let ports = [1320, 1322, 1323, 1324, 1325];
let mut tasks = vec![];
for ix in 0..2 {
// sleep(Duration::from_secs(3)).await;
Expand Down Expand Up @@ -40,21 +43,23 @@ async fn main() -> BluefinResult<()> {
total_bytes += size;
println!("Sent {} bytes", size);

sleep(Duration::from_secs(2)).await;
sleep(Duration::from_secs(1)).await;

size = conn.send(&[14, 14, 14, 14, 14, 14]).await?;
total_bytes += size;
println!("Sent {} bytes", size);

for ix in 0..200000 {
let my_array: [u8; 32] = rand::random();
for ix in 0..5000000 {
// let my_array: [u8; 32] = rand::random();
let my_array = [0u8; 1500];
size = conn.send(&my_array).await?;
total_bytes += size;
if ix % 1250 == 0 {
sleep(Duration::from_millis(10)).await;
if ix % 3000 == 0 {
sleep(Duration::from_millis(3)).await;
}
}
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(2)).await;

Ok::<(), BluefinError>(())
});
Expand Down
45 changes: 36 additions & 9 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::Duration,
time::{Duration, Instant},
};

use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use tokio::{spawn, time::sleep};

#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::main]
async fn main() -> BluefinResult<()> {
// console_subscriber::init();
let mut server = BluefinServer::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
1318,
)));
server.set_num_reader_workers(50)?;
server.bind().await?;

const MAX_NUM_CONNECTIONS: usize = 5;
for _ in 0..MAX_NUM_CONNECTIONS {
const MAX_NUM_CONNECTIONS: usize = 3;
for conn_num in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
let _ = spawn(async move {
let mut total_bytes = 0;
let _num = conn_num;
loop {
println!();
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
spawn(async move {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 500000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let now = Instant::now();
loop {
let mut recv_bytes = [0u8; 1024];
let size = conn.recv(&mut recv_bytes, 1024).await.unwrap();
// eprintln!("Waiting...");
let size = conn.recv(&mut recv_bytes, 500000).await.unwrap();
total_bytes += size;

println!("total bytes: {}", total_bytes);
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);

/*
println!(
Expand All @@ -42,6 +53,22 @@ async fn main() -> BluefinResult<()> {
total_bytes
);
*/
if total_bytes >= 100000 {
let elapsed = now.elapsed().as_secs();
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
);
// break;
}
iteration += 1;
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error;

#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq)]
pub enum BluefinError {
#[error("Unable to serialise data")]
SerialiseError,
Expand All @@ -11,8 +11,8 @@ pub enum BluefinError {
#[error("Connection buffer does not exist")]
BufferDoesNotExist,

#[error("Current buffer is full.")]
BufferFullError,
#[error("Current buffer is full: `{0}`")]
BufferFullError(String),

#[error("Current buffer is empty.")]
BufferEmptyError,
Expand Down
Loading

0 comments on commit c326637

Please sign in to comment.