Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read socket rework #35

Merged
merged 8 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/bluefin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ ubuntu-latest, macos-latest ]
include:
- os: ubuntu-latest
target: Linux
Expand All @@ -23,11 +23,11 @@ jobs:
target: Macos

steps:
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

coverage:
runs-on: ubuntu-latest
Expand All @@ -40,7 +40,7 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs|*/bin/*.rs" --output-path lcov.info
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs" --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ repository = "https://github.com/franklee26/bluefin"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
etherparse = "0.15.0"
local-ip-address = "0.6.3"
rand = "0.8.5"
rstest = "0.23.0"
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["full", "tracing"] }
thiserror = "2.0.9"
tokio = { version = "1.42.0", features = ["full", "tracing"] }
console-subscriber = "0.4.1"
libc = "0.2.164"
sysctl = "0.6.0"
socket2 = "0.5.8"

[dev-dependencies]
local-ip-address = "0.6.3"
Expand All @@ -38,7 +37,7 @@ path = "src/bin/server.rs"
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(kani)'] }

[profile.release]
opt-level = 3
opt-level = 3
codegen-units = 1
lto = "fat"
debug = true
78 changes: 40 additions & 38 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,56 @@
let ports = [1320, 1322, 1323, 1324, 1325];
let mut tasks = vec![];
for ix in 0..2 {
// sleep(Duration::from_secs(3)).await;
let task = spawn(async move {
let mut total_bytes = 0;
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
ports[ix],
)));
if let Ok(mut conn) = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(

Check warning on line 24 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L19-L24

Added lines #L19 - L24 were not covered by tests
Ipv4Addr::new(127, 0, 0, 1),
ports[ix],
)));
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
1318,
)))
.await?;
1318,
)))
.await
{
let task = spawn(async move {
let mut total_bytes = 0;

Check warning on line 31 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L26-L31

Added lines #L26 - L31 were not covered by tests

let bytes = [1, 2, 3, 4, 5, 6, 7];
let mut size = conn.send(&bytes).await?;
total_bytes += size;
println!("Sent {} bytes", size);
let bytes = [1, 2, 3, 4, 5, 6, 7];
let mut size = conn.send(&bytes)?;
total_bytes += size;
println!("Sent {} bytes", size);

Check warning on line 36 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L33-L36

Added lines #L33 - L36 were not covered by tests

size = conn.send(&[12, 12, 12, 12, 12, 12]).await?;
total_bytes += size;
println!("Sent {} bytes", size);
size = conn.send(&[12, 12, 12, 12, 12, 12])?;
total_bytes += size;
println!("Sent {} bytes", size);

Check warning on line 40 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L38-L40

Added lines #L38 - L40 were not covered by tests

size = conn.send(&[13; 100]).await?;
total_bytes += size;
println!("Sent {} bytes", size);
size = conn.send(&[13; 100])?;
total_bytes += size;
println!("Sent {} bytes", size);

Check warning on line 44 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L42-L44

Added lines #L42 - L44 were not covered by tests

sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;

Check warning on line 46 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L46

Added line #L46 was not covered by tests

size = conn.send(&[14, 14, 14, 14, 14, 14]).await?;
total_bytes += size;
println!("Sent {} bytes", size);
size = conn.send(&[14, 14, 14, 14, 14, 14])?;
total_bytes += size;
println!("Sent {} bytes", size);

Check warning on line 50 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L48-L50

Added lines #L48 - L50 were not covered by tests

for ix in 0..5000000 {
// let my_array: [u8; 32] = rand::random();
let my_array = [0u8; 1500];
size = conn.send(&my_array).await?;
total_bytes += size;
if ix % 3000 == 0 {
sleep(Duration::from_millis(3)).await;
for ix in 0..10000000 {
// let my_array: [u8; 32] = rand::random();
size = conn.send(&my_array)?;
total_bytes += size;
if ix % 4000 == 0 {
sleep(Duration::from_millis(1)).await;
}

Check warning on line 59 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L53-L59

Added lines #L53 - L59 were not covered by tests
}
}
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(2)).await;
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(3)).await;

Check warning on line 62 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L61-L62

Added lines #L61 - L62 were not covered by tests

Ok::<(), BluefinError>(())
});
tasks.push(task);
Ok::<(), BluefinError>(())
});
tasks.push(task);
sleep(Duration::from_millis(1)).await;
}

Check warning on line 68 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L64-L68

Added lines #L64 - L68 were not covered by tests
}

for t in tasks {
Expand Down
78 changes: 51 additions & 27 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::Instant,
};

use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use tokio::{spawn, task::JoinSet};

#[cfg_attr(coverage_nightly, coverage(off))]
Expand All @@ -24,68 +23,93 @@
Ipv4Addr::new(127, 0, 0, 1),
1318,
)));
server.set_num_reader_workers(300)?;
server.set_num_reader_workers(3)?;

Check warning on line 26 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L26

Added line #L26 was not covered by tests
server.bind().await?;
let mut join_set = JoinSet::new();

const MAX_NUM_CONNECTIONS: usize = 2;
for conn_num in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
let _num = conn_num;
let mut _num = 0;
while let Ok(mut conn) = server.accept().await {

Check warning on line 31 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L30-L31

Added lines #L30 - L31 were not covered by tests
let _ = join_set.spawn(async move {
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 80000];
let mut recv_bytes = [0u8; 10000];

Check warning on line 34 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L34

Added line #L34 was not covered by tests
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let mut iteration: i64 = 1;

Check warning on line 37 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L37

Added line #L37 was not covered by tests
let mut num_iterations_without_print = 0;
let mut max_throughput = 0.0;
let mut min_throughput = f64::MAX;

Check warning on line 40 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L39-L40

Added lines #L39 - L40 were not covered by tests
let now = Instant::now();
loop {
let size = conn.recv(&mut recv_bytes, 80000).await.unwrap();
let size = conn.recv(&mut recv_bytes, 10000).await.unwrap();

Check warning on line 43 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L43

Added line #L43 was not covered by tests
total_bytes += size;
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);

/*
println!(
"({:x}_{:x}) >>> Received: {:?} (total: {})",
"({:x}_{:x}) >>> Received: {} bytes",

Check warning on line 51 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L51

Added line #L51 was not covered by tests
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
total_bytes
);
*/
num_iterations_without_print += 1;
if total_bytes >= 100000 && num_iterations_without_print == 200 {
if total_bytes >= 1000000 && num_iterations_without_print == 3500 {

Check warning on line 58 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L58

Added line #L58 was not covered by tests
let elapsed = now.elapsed().as_secs();
if elapsed == 0 {
eprintln!("(#{})Total bytes: {} (0s???)", _num, total_bytes);
num_iterations_without_print = 0;
continue;
}

Check warning on line 64 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L60-L64

Added lines #L60 - L64 were not covered by tests
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let through_put_mb = through_put as f64 / 1e6;

Check warning on line 66 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L66

Added line #L66 was not covered by tests
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",

if through_put_mb > max_throughput {
max_throughput = through_put_mb;
}

Check warning on line 71 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L68-L71

Added lines #L68 - L71 were not covered by tests

if through_put_mb < min_throughput {
min_throughput = through_put_mb;
}

Check warning on line 75 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L73-L75

Added lines #L73 - L75 were not covered by tests

if through_put_mb < 1000.0 {
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb) (max {:.1} mb/s, min {:.1} mb/s)",

Check warning on line 79 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L77-L79

Added lines #L77 - L79 were not covered by tests
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
through_put_mb,

Check warning on line 82 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L82

Added line #L82 was not covered by tests
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
max_bytes as f64 / 1e3,
max_throughput,
min_throughput

Check warning on line 87 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L85-L87

Added lines #L85 - L87 were not covered by tests
);
num_iterations_without_print = 0;
} else {
eprintln!(
"{} {:.2} gb/s (read {:.1} kb/iter, min: {:.1} kb, max: {:.1} kb) (max {:.2} gb/s, min {:.1} kb/s)",
_num,
through_put_mb / 1e3,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3,
max_throughput / 1e3,
min_throughput
);
}
num_iterations_without_print = 0;

Check warning on line 101 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L89-L101

Added lines #L89 - L101 were not covered by tests
// break;
}
iteration += 1;
}
}
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
}
}
});
_num += 1;
if _num >= 2 {
break;
}

Check warning on line 110 in src/bin/server.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/server.rs#L107-L110

Added lines #L107 - L110 were not covered by tests
}

join_set.join_all().await;
Ok(())
}
11 changes: 11 additions & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ pub mod error;
pub mod header;
pub mod packet;

pub trait Extract: Default {
/// Replace self with default and returns the initial value.
fn extract(&mut self) -> Self;
}

impl<T: Default> Extract for T {
fn extract(&mut self) -> Self {
std::mem::replace(self, T::default())
}
}

pub trait Serialisable {
fn serialise(&self) -> Vec<u8>;
fn deserialise(bytes: &[u8]) -> Result<Self, BluefinError>
Expand Down
13 changes: 13 additions & 0 deletions src/core/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@
}
}

impl Default for BluefinPacket {
#[allow(invalid_value)]
#[inline]
fn default() -> Self {
// SAFETY
// Actually, this isn't safe and access to this kind of zero'd value would result
// in panics. There does not exist a 'default' bluefin packet. Therefore, the
// purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this
// default value should NEVER be read/used.
unsafe { std::mem::zeroed() }
}

Check warning on line 64 in src/core/packet.rs

View check run for this annotation

Codecov / codecov/patch

src/core/packet.rs#L57-L64

Added lines #L57 - L64 were not covered by tests
}

impl BluefinPacket {
#[inline]
pub fn builder() -> BluefinPacketBuilder {
Expand Down
16 changes: 8 additions & 8 deletions src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use std::{
use rand::Rng;
use tokio::{net::UdpSocket, sync::RwLock};

use super::{
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
AckBuffer, ConnectionManagedBuffers,
};
use crate::utils::get_udp_socket;
use crate::{
core::{context::BluefinHost, error::BluefinError, header::PacketType, Serialisable},
net::{
Expand All @@ -15,12 +20,7 @@ use crate::{
utils::common::BluefinResult,
};

use super::{
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
AckBuffer, ConnectionManagedBuffers,
};

const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 5;
const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 1;

pub struct BluefinClient {
socket: Option<Arc<UdpSocket>>,
Expand Down Expand Up @@ -53,7 +53,7 @@ impl BluefinClient {
}

pub async fn connect(&mut self, dst_addr: SocketAddr) -> BluefinResult<BluefinConnection> {
let socket = Arc::new(UdpSocket::bind(self.src_addr).await?);
let socket = Arc::new(get_udp_socket(self.src_addr)?);
self.socket = Some(Arc::clone(&socket));
self.dst_addr = Some(dst_addr);

Expand Down Expand Up @@ -137,8 +137,8 @@ impl BluefinClient {
packet_number + 2,
Arc::clone(&conn_buffer),
Arc::clone(&ack_buff),
Arc::clone(self.socket.as_ref().unwrap()),
self.dst_addr.unwrap(),
self.src_addr,
))
}
}
Loading
Loading