Skip to content

Commit

Permalink
cleanup + minor speedup
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank Lee committed Dec 31, 2024
1 parent 2cd2362 commit a95de88
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 465 deletions.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ repository = "https://github.com/franklee26/bluefin"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
etherparse = "0.16.0"
local-ip-address = "0.6.3"
rand = "0.8.5"
rstest = "0.23.0"
thiserror = "2.0.8"
thiserror = "2.0.9"
tokio = { version = "1.42.0", features = ["full", "tracing"] }
console-subscriber = "0.4.1"
libc = "0.2.164"
sysctl = "0.6.0"
socket2 = "0.5.8"

[dev-dependencies]
Expand All @@ -39,7 +37,7 @@ path = "src/bin/server.rs"
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(kani)'] }

[profile.release]
opt-level = 3
opt-level = 3
codegen-units = 1
lto = "fat"
debug = true
2 changes: 1 addition & 1 deletion src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ async fn main() -> BluefinResult<()> {
total_bytes += size;
println!("Sent {} bytes", size);

Check warning on line 50 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L48-L50

Added lines #L48 - L50 were not covered by tests

let my_array = [0u8; 1500];
for ix in 0..10000000 {
// let my_array: [u8; 32] = rand::random();
let my_array = [0u8; 1500];
size = conn.send(&my_array)?;
total_bytes += size;
if ix % 4000 == 0 {
Expand Down
3 changes: 0 additions & 3 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
use std::time::Duration;
use std::{
cmp::{max, min},
net::{Ipv4Addr, SocketAddrV4},
time::Instant,
};
use tokio::task::yield_now;
use tokio::time::sleep;
use tokio::{spawn, task::JoinSet};

#[cfg_attr(coverage_nightly, coverage(off))]
Expand Down
11 changes: 11 additions & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ pub mod error;
pub mod header;
pub mod packet;

pub trait Extract: Default {
/// Replace self with default and returns the initial value.
fn extract(&mut self) -> Self;
}

impl<T: Default> Extract for T {
fn extract(&mut self) -> Self {
std::mem::replace(self, T::default())
}
}

pub trait Serialisable {
fn serialise(&self) -> Vec<u8>;
fn deserialise(bytes: &[u8]) -> Result<Self, BluefinError>
Expand Down
13 changes: 13 additions & 0 deletions src/core/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ impl Serialisable for BluefinPacket {
}
}

impl Default for BluefinPacket {
#[allow(invalid_value)]
#[inline]
fn default() -> Self {
// SAFETY
// Actually, this isn't safe and access to this kind of zero'd value would result
// in panics. There does not exist a 'default' bluefin packet. Therefore, the
// purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this
// default value should NEVER be read/used.
unsafe { std::mem::zeroed() }
}

Check warning on line 64 in src/core/packet.rs

View check run for this annotation

Codecov / codecov/patch

src/core/packet.rs#L57-L64

Added lines #L57 - L64 were not covered by tests
}

impl BluefinPacket {
#[inline]
pub fn builder() -> BluefinPacketBuilder {
Expand Down
3 changes: 1 addition & 2 deletions src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
utils::common::BluefinResult,
};

const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 2;
const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 1;

pub struct BluefinClient {
socket: Option<Arc<UdpSocket>>,
Expand Down Expand Up @@ -137,7 +137,6 @@ impl BluefinClient {
packet_number + 2,
Arc::clone(&conn_buffer),
Arc::clone(&ack_buff),
Arc::clone(self.socket.as_ref().unwrap()),
self.dst_addr.unwrap(),
self.src_addr,
))
Expand Down
10 changes: 3 additions & 7 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,19 @@ use std::{
time::Duration,
};

use tokio::{net::UdpSocket, time::timeout};
use tokio::time::timeout;

use crate::{
core::{context::BluefinHost, error::BluefinError, packet::BluefinPacket},
utils::common::BluefinResult,
worker::{
reader::ReaderRxChannel,
writer::{WriterHandler, WriterTxChannel},
},
worker::{reader::ReaderRxChannel, writer::WriterHandler},
};

use super::{
build_and_start_ack_consumer_workers, build_and_start_conn_reader_tx_channels,
get_connected_udp_socket,
ordered_bytes::{ConsumeResult, OrderedBytes},
AckBuffer, ConnectionManagedBuffers, WriterQueue,
AckBuffer, ConnectionManagedBuffers,
};

pub const MAX_BUFFER_SIZE: usize = 2000;
Expand Down Expand Up @@ -268,7 +265,6 @@ impl BluefinConnection {
next_send_packet_num: u64,
conn_buffer: Arc<Mutex<ConnectionBuffer>>,
ack_buffer: Arc<Mutex<AckBuffer>>,
socket: Arc<UdpSocket>,
dst_addr: SocketAddr,
src_addr: SocketAddr,
) -> Self {
Expand Down
11 changes: 2 additions & 9 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};

use ack_handler::{AckBuffer, AckConsumer};
use connection::{ConnectionBuffer, ConnectionManager};
Expand All @@ -14,11 +11,7 @@ use crate::{
packet::BluefinPacket,
},
utils::{common::BluefinResult, get_connected_udp_socket},
worker::{
conn_reader::ConnReaderHandler,
reader::ReaderTxChannel,
writer::{WriterQueue, WriterRxChannel},
},
worker::{conn_reader::ConnReaderHandler, reader::ReaderTxChannel},
};

pub mod ack_handler;
Expand Down
3 changes: 1 addition & 2 deletions src/net/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
AckBuffer, ConnectionManagedBuffers,
};
const NUM_TX_WORKERS_FOR_SERVER_DEFAULT: u16 = 2;
const NUM_TX_WORKERS_FOR_SERVER_DEFAULT: u16 = 1;

pub struct BluefinServer {
socket: Option<Arc<UdpSocket>>,
Expand Down Expand Up @@ -133,7 +133,6 @@ impl BluefinServer {
packet_number + 1,
Arc::clone(&conn_buffer),
Arc::clone(&ack_buffer),
Arc::clone(self.socket.as_ref().unwrap()),
addr,
self.src_addr,
))
Expand Down
29 changes: 26 additions & 3 deletions src/worker/conn_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use crate::net::connection::ConnectionBuffer;
use crate::net::{ConnectionManagedBuffers, MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM};
use crate::utils::common::BluefinResult;
use std::sync::{Arc, MutexGuard};
use std::thread::available_parallelism;

const DEFAULT_NUMBER_OF_TASKS_TO_SPAWN: usize = 3;

pub(crate) struct ConnReaderHandler {
socket: Arc<UdpSocket>,
Expand All @@ -24,8 +25,7 @@ impl ConnReaderHandler {

pub(crate) fn start(&self) -> BluefinResult<()> {
let (tx, rx) = mpsc::channel::<Vec<BluefinPacket>>(1024);
let num_cpu_cores = available_parallelism()?.get();
for _ in 0..1 {
for _ in 0..Self::get_num_cpu_cores() {
let tx_cloned = tx.clone();
let socket_cloned = self.socket.clone();
spawn(async move {
Expand All @@ -40,6 +40,29 @@ impl ConnReaderHandler {
Ok(())
}

#[allow(unreachable_code)]
#[inline]
fn get_num_cpu_cores() -> usize {
// For linux, let's use all the cpu cores available.
#[cfg(target_os = "linux")]
{
use std::thread::available_parallelism;
if let Ok(num_cpu_cores) = available_parallelism() {
return num_cpu_cores.get();
}
}

// For macos (at least silicon macs), we can't seem to use
// SO_REUSEPORT to our benefit. We will pretend we have one core.
#[cfg(target_os = "macos")]
{
return 1;
}

// For everything else, we assume the default.
DEFAULT_NUMBER_OF_TASKS_TO_SPAWN

Check warning on line 63 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L52-L63

Added lines #L52 - L63 were not covered by tests
}

#[inline]
async fn tx_impl(
socket: Arc<UdpSocket>,
Expand Down
4 changes: 2 additions & 2 deletions src/worker/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
utils::common::BluefinResult,
};

use super::writer::{WriterHandler, WriterTxChannel};
use super::writer::WriterHandler;

#[derive(Clone)]
/// [ReaderTxChannel] is the transmission channel for the receiving [ReaderRxChannel]. This channel will when
Expand Down Expand Up @@ -77,7 +77,7 @@ impl ReaderRxChannel {
future,
writer_handler,
packets_consumed: 0,
packets_consumed_before_ack: 100,
packets_consumed_before_ack: 200,
}
}

Expand Down
Loading

0 comments on commit a95de88

Please sign in to comment.