Skip to content

Commit

Permalink
Read socket rework (#35)
Browse files Browse the repository at this point in the history
* 1gbps throughput

* warnings

* tokio mpsc

* migrate sender to mpsc

* reducing acks sent

* fix server accept logic

* cleanup + minor speedup

* codecov bring back bins

---------

Co-authored-by: Frank Lee <>
  • Loading branch information
franklee26 authored Dec 31, 2024
1 parent 3424b6f commit 5fc3314
Show file tree
Hide file tree
Showing 16 changed files with 772 additions and 650 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/bluefin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ ubuntu-latest, macos-latest ]
include:
- os: ubuntu-latest
target: Linux
Expand All @@ -23,11 +23,11 @@ jobs:
target: Macos

steps:
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

coverage:
runs-on: ubuntu-latest
Expand All @@ -40,7 +40,7 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs|*/bin/*.rs" --output-path lcov.info
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs" --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ repository = "https://github.com/franklee26/bluefin"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
etherparse = "0.15.0"
local-ip-address = "0.6.3"
rand = "0.8.5"
rstest = "0.23.0"
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["full", "tracing"] }
thiserror = "2.0.9"
tokio = { version = "1.42.0", features = ["full", "tracing"] }
console-subscriber = "0.4.1"
libc = "0.2.164"
sysctl = "0.6.0"
socket2 = "0.5.8"

[dev-dependencies]
local-ip-address = "0.6.3"
Expand All @@ -38,7 +37,7 @@ path = "src/bin/server.rs"
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(kani)'] }

[profile.release]
opt-level = 3
opt-level = 3
codegen-units = 1
lto = "fat"
debug = true
78 changes: 40 additions & 38 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,56 @@ async fn main() -> BluefinResult<()> {
let ports = [1320, 1322, 1323, 1324, 1325];
let mut tasks = vec![];
for ix in 0..2 {
// sleep(Duration::from_secs(3)).await;
let task = spawn(async move {
let mut total_bytes = 0;
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
ports[ix],
)));
if let Ok(mut conn) = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
ports[ix],
)));
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
1318,
)))
.await?;
1318,
)))
.await
{
let task = spawn(async move {
let mut total_bytes = 0;

let bytes = [1, 2, 3, 4, 5, 6, 7];
let mut size = conn.send(&bytes).await?;
total_bytes += size;
println!("Sent {} bytes", size);
let bytes = [1, 2, 3, 4, 5, 6, 7];
let mut size = conn.send(&bytes)?;
total_bytes += size;
println!("Sent {} bytes", size);

size = conn.send(&[12, 12, 12, 12, 12, 12]).await?;
total_bytes += size;
println!("Sent {} bytes", size);
size = conn.send(&[12, 12, 12, 12, 12, 12])?;
total_bytes += size;
println!("Sent {} bytes", size);

size = conn.send(&[13; 100]).await?;
total_bytes += size;
println!("Sent {} bytes", size);
size = conn.send(&[13; 100])?;
total_bytes += size;
println!("Sent {} bytes", size);

sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;

size = conn.send(&[14, 14, 14, 14, 14, 14]).await?;
total_bytes += size;
println!("Sent {} bytes", size);
size = conn.send(&[14, 14, 14, 14, 14, 14])?;
total_bytes += size;
println!("Sent {} bytes", size);

for ix in 0..5000000 {
// let my_array: [u8; 32] = rand::random();
let my_array = [0u8; 1500];
size = conn.send(&my_array).await?;
total_bytes += size;
if ix % 3000 == 0 {
sleep(Duration::from_millis(3)).await;
for ix in 0..10000000 {
// let my_array: [u8; 32] = rand::random();
size = conn.send(&my_array)?;
total_bytes += size;
if ix % 4000 == 0 {
sleep(Duration::from_millis(1)).await;
}
}
}
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(2)).await;
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(3)).await;

Ok::<(), BluefinError>(())
});
tasks.push(task);
Ok::<(), BluefinError>(())
});
tasks.push(task);
sleep(Duration::from_millis(1)).await;
}
}

for t in tasks {
Expand Down
78 changes: 51 additions & 27 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::Instant,
};

use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use tokio::{spawn, task::JoinSet};

#[cfg_attr(coverage_nightly, coverage(off))]
Expand All @@ -24,68 +23,93 @@ async fn run() -> BluefinResult<()> {
Ipv4Addr::new(127, 0, 0, 1),
1318,
)));
server.set_num_reader_workers(300)?;
server.set_num_reader_workers(3)?;
server.bind().await?;
let mut join_set = JoinSet::new();

const MAX_NUM_CONNECTIONS: usize = 2;
for conn_num in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
let _num = conn_num;
let mut _num = 0;
while let Ok(mut conn) = server.accept().await {
let _ = join_set.spawn(async move {
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
let mut total_bytes = 0;
let mut recv_bytes = [0u8; 80000];
let mut recv_bytes = [0u8; 10000];
let mut min_bytes = usize::MAX;
let mut max_bytes = 0;
let mut iteration = 1;
let mut iteration: i64 = 1;
let mut num_iterations_without_print = 0;
let mut max_throughput = 0.0;
let mut min_throughput = f64::MAX;
let now = Instant::now();
loop {
let size = conn.recv(&mut recv_bytes, 80000).await.unwrap();
let size = conn.recv(&mut recv_bytes, 10000).await.unwrap();
total_bytes += size;
min_bytes = min(size, min_bytes);
max_bytes = max(size, max_bytes);
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);

/*
println!(
"({:x}_{:x}) >>> Received: {:?} (total: {})",
"({:x}_{:x}) >>> Received: {} bytes",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
total_bytes
);
*/
num_iterations_without_print += 1;
if total_bytes >= 100000 && num_iterations_without_print == 200 {
if total_bytes >= 1000000 && num_iterations_without_print == 3500 {
let elapsed = now.elapsed().as_secs();
if elapsed == 0 {
eprintln!("(#{})Total bytes: {} (0s???)", _num, total_bytes);
num_iterations_without_print = 0;
continue;
}
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
let through_put_mb = through_put as f64 / 1e6;
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",

if through_put_mb > max_throughput {
max_throughput = through_put_mb;
}

if through_put_mb < min_throughput {
min_throughput = through_put_mb;
}

if through_put_mb < 1000.0 {
eprintln!(
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb) (max {:.1} mb/s, min {:.1} mb/s)",
_num,
through_put as f64 / 1e3,
through_put as f64 / 1e6,
through_put_mb,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3
max_bytes as f64 / 1e3,
max_throughput,
min_throughput
);
num_iterations_without_print = 0;
} else {
eprintln!(
"{} {:.2} gb/s (read {:.1} kb/iter, min: {:.1} kb, max: {:.1} kb) (max {:.2} gb/s, min {:.1} kb/s)",
_num,
through_put_mb / 1e3,
avg_recv_bytes / 1e3,
min_bytes as f64 / 1e3,
max_bytes as f64 / 1e3,
max_throughput / 1e3,
min_throughput
);
}
num_iterations_without_print = 0;
// break;
}
iteration += 1;
}
}
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
}
}
});
_num += 1;
if _num >= 2 {
break;
}
}

join_set.join_all().await;
Ok(())
}
11 changes: 11 additions & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ pub mod error;
pub mod header;
pub mod packet;

pub trait Extract: Default {
/// Replace self with default and returns the initial value.
fn extract(&mut self) -> Self;
}

impl<T: Default> Extract for T {
fn extract(&mut self) -> Self {
std::mem::replace(self, T::default())
}
}

pub trait Serialisable {
fn serialise(&self) -> Vec<u8>;
fn deserialise(bytes: &[u8]) -> Result<Self, BluefinError>
Expand Down
13 changes: 13 additions & 0 deletions src/core/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ impl Serialisable for BluefinPacket {
}
}

impl Default for BluefinPacket {
#[allow(invalid_value)]
#[inline]
fn default() -> Self {
// SAFETY
// Actually, this isn't safe and access to this kind of zero'd value would result
// in panics. There does not exist a 'default' bluefin packet. Therefore, the
// purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this
// default value should NEVER be read/used.
unsafe { std::mem::zeroed() }
}
}

impl BluefinPacket {
#[inline]
pub fn builder() -> BluefinPacketBuilder {
Expand Down
16 changes: 8 additions & 8 deletions src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use std::{
use rand::Rng;
use tokio::{net::UdpSocket, sync::RwLock};

use super::{
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
AckBuffer, ConnectionManagedBuffers,
};
use crate::utils::get_udp_socket;
use crate::{
core::{context::BluefinHost, error::BluefinError, header::PacketType, Serialisable},
net::{
Expand All @@ -15,12 +20,7 @@ use crate::{
utils::common::BluefinResult,
};

use super::{
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
AckBuffer, ConnectionManagedBuffers,
};

const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 5;
const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 1;

pub struct BluefinClient {
socket: Option<Arc<UdpSocket>>,
Expand Down Expand Up @@ -53,7 +53,7 @@ impl BluefinClient {
}

pub async fn connect(&mut self, dst_addr: SocketAddr) -> BluefinResult<BluefinConnection> {
let socket = Arc::new(UdpSocket::bind(self.src_addr).await?);
let socket = Arc::new(get_udp_socket(self.src_addr)?);
self.socket = Some(Arc::clone(&socket));
self.dst_addr = Some(dst_addr);

Expand Down Expand Up @@ -137,8 +137,8 @@ impl BluefinClient {
packet_number + 2,
Arc::clone(&conn_buffer),
Arc::clone(&ack_buff),
Arc::clone(self.socket.as_ref().unwrap()),
self.dst_addr.unwrap(),
self.src_addr,
))
}
}
Loading

0 comments on commit 5fc3314

Please sign in to comment.