Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ack-handling part #2 #32

Merged
merged 13 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .github/workflows/bluefin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

coverage:
runs-on: ubuntu-latest
env:
Expand All @@ -39,10 +40,21 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs|*/bin/*.rs" --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
files: lcov.info
fail_ci_if_error: false

kani:
runs-on: ubuntu-latest
strategy:
fail-fast: false
steps:
- name: Checkout bluefin
uses: actions/checkout@v4
- name: Run Kani
uses: model-checking/[email protected]

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ etherparse = "0.15.0"
local-ip-address = "0.6.3"
rand = "0.8.5"
rstest = "0.23.0"
thiserror = "1.0.39"
tokio = { version = "1.41.1", features = ["full"] }
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["full", "tracing"] }
console-subscriber = "0.4.1"
libc = "0.2.164"
sysctl = "0.6.0"

[dev-dependencies]
local-ip-address = "0.6.3"
Expand All @@ -31,6 +34,9 @@ path = "src/bin/client.rs"
name = "server"
path = "src/bin/server.rs"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(kani)'] }

[profile.release]
opt-level = 3
codegen-units = 1
Expand Down
17 changes: 11 additions & 6 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use std::{
net::{Ipv4Addr, SocketAddrV4},
time::Duration,
Expand All @@ -8,9 +9,11 @@
};
use tokio::{spawn, time::sleep};

#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::main]
async fn main() -> BluefinResult<()> {
let ports = [1320, 1322];
// console_subscriber::init();
let ports = [1320, 1322, 1323, 1324, 1325];

Check warning on line 16 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L15-L16

Added lines #L15 - L16 were not covered by tests
let mut tasks = vec![];
for ix in 0..2 {
// sleep(Duration::from_secs(3)).await;
Expand Down Expand Up @@ -40,21 +43,23 @@
total_bytes += size;
println!("Sent {} bytes", size);

sleep(Duration::from_secs(2)).await;
sleep(Duration::from_secs(1)).await;

Check warning on line 46 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L46

Added line #L46 was not covered by tests

size = conn.send(&[14, 14, 14, 14, 14, 14]).await?;
total_bytes += size;
println!("Sent {} bytes", size);

for ix in 0..200000 {
let my_array: [u8; 32] = rand::random();
for ix in 0..5000000 {
// let my_array: [u8; 32] = rand::random();
let my_array = [0u8; 1500];

Check warning on line 54 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L52-L54

Added lines #L52 - L54 were not covered by tests
size = conn.send(&my_array).await?;
total_bytes += size;
if ix % 1250 == 0 {
sleep(Duration::from_millis(10)).await;
if ix % 3000 == 0 {
sleep(Duration::from_millis(3)).await;

Check warning on line 58 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L57-L58

Added lines #L57 - L58 were not covered by tests
}
}
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(2)).await;

Check warning on line 62 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L62

Added line #L62 was not covered by tests

Ok::<(), BluefinError>(())
});
Expand Down
45 changes: 36 additions & 9 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::Duration,
time::{Duration, Instant},
};

use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use tokio::{spawn, time::sleep};

#[cfg_attr(coverage_nightly, coverage(off))]
#[tokio::main]
async fn main() -> BluefinResult<()> {
// console_subscriber::init();

Check warning on line 14 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L14

Added line #L14 was not covered by tests
let mut server = BluefinServer::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
1318,
)));
server.set_num_reader_workers(50)?;

Check warning on line 19 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L19

Added line #L19 was not covered by tests
server.bind().await?;

const MAX_NUM_CONNECTIONS: usize = 5;
for _ in 0..MAX_NUM_CONNECTIONS {
const MAX_NUM_CONNECTIONS: usize = 3;
for conn_num in 0..MAX_NUM_CONNECTIONS {

Check warning on line 23 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L22-L23

Added lines #L22 - L23 were not covered by tests
let mut s = server.clone();
let _ = spawn(async move {
let mut total_bytes = 0;
let _num = conn_num;

Check warning on line 26 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L26

Added line #L26 was not covered by tests
loop {
println!();
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
spawn(async move {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 500000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let now = Instant::now();

Check warning on line 38 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L33-L38

Added lines #L33 - L38 were not covered by tests
loop {
let mut recv_bytes = [0u8; 1024];
let size = conn.recv(&mut recv_bytes, 1024).await.unwrap();
// eprintln!("Waiting...");
let size = conn.recv(&mut recv_bytes, 500000).await.unwrap();

Check warning on line 41 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L40-L41

Added lines #L40 - L41 were not covered by tests
total_bytes += size;

println!("total bytes: {}", total_bytes);
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);

Check warning on line 45 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L43-L45

Added lines #L43 - L45 were not covered by tests

/*
println!(
Expand All @@ -42,6 +53,22 @@
total_bytes
);
*/
if total_bytes >= 100000 {
let elapsed = now.elapsed().as_secs();
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
);
// break;
}
iteration += 1;

Check warning on line 71 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L56-L71

Added lines #L56 - L71 were not covered by tests
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error;

#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq)]
pub enum BluefinError {
#[error("Unable to serialise data")]
SerialiseError,
Expand All @@ -11,8 +11,8 @@ pub enum BluefinError {
#[error("Connection buffer does not exist")]
BufferDoesNotExist,

#[error("Current buffer is full.")]
BufferFullError,
#[error("Current buffer is full: `{0}`")]
BufferFullError(String),

#[error("Current buffer is empty.")]
BufferEmptyError,
Expand Down
Loading