diff --git a/Cargo.toml b/Cargo.toml index 79c69a8..1362258 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bluefin" -version = "0.1.5" +version = "0.1.6" edition = "2021" description = "An experimental, secure, P2P, transport-layer protocol." license = "MIT" diff --git a/README.md b/README.md index a218f66..0e6e744 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,20 @@ # Bluefin -`Bluefin` is an experimental, P2P, transport-layer protocol. +`Bluefin` is an experimental, P2P, transport-layer protocol. Unlike TCP, `Bluefin` runs in user-space and can allow for +faster development cycles, greater flexibility in new features and more performant resource management compared to +kernel-space implementations. +`Bluefin` is currently only supported on MacOs and Linux. -[![Latest Version]][crates.io] +[![Latest Version]][crates.io] [![Documentation]][docs.rs] ![Github Workflow](https://github.com/franklee26/bluefin/actions/workflows/bluefin.yml/badge.svg) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![codecov](https://codecov.io/github/franklee26/bluefin/graph/badge.svg?token=U0XPUZVE0I)](https://codecov.io/github/franklee26/bluefin) ### Example + #### Pack-leader + ```rust #[tokio::main] async fn main() -> BluefinResult<()> { @@ -19,36 +24,18 @@ async fn main() -> BluefinResult<()> { ))); server.bind().await?; - const MAX_NUM_CONNECTIONS: usize = 5; - for _ in 0..MAX_NUM_CONNECTIONS { - let mut s = server.clone(); + while let Ok(conn) = s.accept().await { let _ = spawn(async move { + let mut recv_bytes = [0u8; 1024]; loop { - let _conn = s.accept().await; - - match _conn { - Ok(mut conn) => { - spawn(async move { - loop { - let mut recv_bytes = [0u8; 1024]; - let size = conn.recv(&mut recv_bytes, 100).await.unwrap(); - - println!( - "({:x}_{:x}) >>> Received: {:?}", - conn.src_conn_id, - conn.dst_conn_id, - &recv_bytes[..size], - ); - sleep(Duration::from_secs(1)).await; - } - }); 
- } - Err(e) => { - eprintln!("Could not accept connection due to error: {:?}", e); - } - } - - sleep(Duration::from_secs(1)).await; + let size = conn.recv(&mut recv_bytes, 1024).await.unwrap(); + + println!( + "({:x}_{:x}) >>> Received: {:?}", + conn.src_conn_id, + conn.dst_conn_id, + &recv_bytes[..size], + ); } }); } @@ -58,34 +45,35 @@ async fn main() -> BluefinResult<()> { loop {} } ``` + #### Client + ```rust #[tokio::main] async fn main() -> BluefinResult<()> { - let task = spawn(async move { - let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new( + let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(192, 168, 1, 38), + 1234, + ))); + let mut conn = client + .connect(std::net::SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::new(192, 168, 1, 38), - 1234, - ))); - let mut conn = client - .connect(std::net::SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(192, 168, 1, 38), - 1235, - ))) - .await?; - - let bytes = [1, 2, 3, 4]; - let mut size = conn.send(&bytes).await?; - println!("Sent {} bytes", size); - - Ok::<(), BluefinError>(()) - }); + 1235, + ))) + .await?; + + let bytes = [1, 2, 3, 4]; + let size = conn.send(&bytes); + println!("Sent {} bytes", size); + Ok(()) } ``` - [Latest Version]: https://img.shields.io/crates/v/bluefin.svg + [crates.io]: https://crates.io/crates/bluefin + [Documentation]: https://docs.rs/bluefin/badge.svg + [docs.rs]: https://docs.rs/bluefin \ No newline at end of file diff --git a/src/core/header.rs b/src/core/header.rs index 19cf71e..7023fb0 100644 --- a/src/core/header.rs +++ b/src/core/header.rs @@ -3,11 +3,12 @@ use crate::utils::common::BluefinResult; use super::{error::BluefinError, Serialisable}; /// 4 bits reserved for PacketType => 16 possible packet types -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub enum PacketType { UnencryptedClientHello = 0x00, UnencryptedServerHello = 0x01, ClientAck = 0x02, 
+ #[default] UnencryptedData = 0x03, Ack = 0x04, } @@ -26,7 +27,7 @@ impl PacketType { } /// This struct contains the encryption flag and header-protection fields for a total of 8 bits -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub struct BluefinSecurityFields { /// header_encrypted is one bit and signals whether the header contains encrypted fields header_encrypted: bool, @@ -87,7 +88,7 @@ impl Serialisable for BluefinSecurityFields { /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ /// ``` /// -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] pub struct BluefinHeader { // The version is 4 bits pub version: u8, diff --git a/src/core/packet.rs b/src/core/packet.rs index 0e1cded..6a28b21 100644 --- a/src/core/packet.rs +++ b/src/core/packet.rs @@ -55,12 +55,10 @@ impl Default for BluefinPacket { #[allow(invalid_value)] #[inline] fn default() -> Self { - // SAFETY - // Actually, this isn't safe and access to this kind of zero'd value would result - // in panics. There does not exist a 'default' bluefin packet. Therefore, the - // purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this - // default value should NEVER be read/used. - unsafe { std::mem::zeroed() } + Self { + header: Default::default(), + payload: vec![], + } } } diff --git a/src/net/connection.rs b/src/net/connection.rs index 20c0576..a09170e 100644 --- a/src/net/connection.rs +++ b/src/net/connection.rs @@ -305,14 +305,11 @@ impl BluefinConnection { #[inline] pub async fn recv(&mut self, buf: &mut [u8], len: usize) -> BluefinResult { let (size, _) = self.reader_rx.read(len, buf).await?; - return Ok(size as usize); + Ok(size as usize) } #[inline] pub fn send(&mut self, buf: &[u8]) -> BluefinResult { - // TODO! This returns the total bytes sent (including bluefin payload). 
This - // really should only return the total payload bytes - self.writer_handler.send_data(buf)?; - Ok(buf.len()) + self.writer_handler.send_data(buf) } } diff --git a/src/worker/conn_reader.rs b/src/worker/conn_reader.rs index e63cb38..01d2123 100644 --- a/src/worker/conn_reader.rs +++ b/src/worker/conn_reader.rs @@ -5,14 +5,22 @@ use tokio::sync::mpsc::{self}; use crate::core::error::BluefinError; use crate::core::header::PacketType; use crate::core::packet::BluefinPacket; +use crate::core::Extract; use crate::net::ack_handler::AckBuffer; use crate::net::connection::ConnectionBuffer; use crate::net::{ConnectionManagedBuffers, MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM}; use crate::utils::common::BluefinResult; use std::sync::{Arc, MutexGuard}; +/// This is an arbitrary number of worker tasks to use if we cannot decide how many worker tasks +/// to spawn. const DEFAULT_NUMBER_OF_TASKS_TO_SPAWN: usize = 3; +/// [ConnReaderHandler] is a handle to network read-related functionalities. As the name suggests, +/// this handler is specific to *connection* reads. That is, this handler can only be used +/// when a Bluefin connection has been established. This reader is fundamentally different from that +/// of the [crate::worker::reader::ReaderRxChannel] as this will only read packets from the wire +/// intended for the connection. pub(crate) struct ConnReaderHandler { socket: Arc, conn_bufs: Arc, @@ -23,9 +31,16 @@ impl ConnReaderHandler { Self { socket, conn_bufs } } + /// Starts the handler worker jobs. This starts the worker tasks, which busy-poll a connected + /// UDP socket for packets. Upon receiving bytes, these workers deserialise them into bluefin + /// packets and send them over a channel for processing. The second kind of worker is the + /// processing task, which receives the packets from the channel and buffers them in the + /// correct buffer. 
pub(crate) fn start(&self) -> BluefinResult<()> { let (tx, rx) = mpsc::channel::>(1024); - for _ in 0..Self::get_num_cpu_cores() { + + // Spawn n-number of UDP-recv tasks. + for _ in 0..Self::get_number_of_tx_tasks() { let tx_cloned = tx.clone(); let socket_cloned = self.socket.clone(); spawn(async move { @@ -33,6 +48,8 @@ impl ConnReaderHandler { }); } + // Spawn the corresponding rx task, which receives packets from the tx tasks and buffers + // them in the connection buffers. let conn_bufs = self.conn_bufs.clone(); spawn(async move { let _ = ConnReaderHandler::rx_impl(rx, &*conn_bufs).await; @@ -40,9 +57,18 @@ impl ConnReaderHandler { Ok(()) } + /// For linux, we return the expected number of CPU cores. This lets us take advantage of + /// parallelism. For (silicon) macos, we return one. Experiments on Apple Silicon have shown + /// that SO_REUSEPORT does not behave the same way as it does on Linux + /// (see: https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux) + /// and so we cannot take advantage of running the tx-tasks on multiple threads. For now, running + /// one instance of it is performant enough. + /// + /// For all other operating systems (which are currently unsupported by Bluefin anyway), we + /// return an arbitrary default value. #[allow(unreachable_code)] #[inline] - fn get_num_cpu_cores() -> usize { + fn get_number_of_tx_tasks() -> usize { // For linux, let's use all the cpu cores available. #[cfg(target_os = "linux")] { @@ -63,6 +89,10 @@ impl ConnReaderHandler { DEFAULT_NUMBER_OF_TASKS_TO_SPAWN } + /// This represents one tx task or one of the multiple producers in the mpsc channel. This + /// function is a hot-loop; it continuously reads from a connected socket. When bytes are + /// received, we attempt to deserialise them into bluefin packets. If valid packets are + /// produced, then we send them to the consumer channel for processing. 
#[inline] async fn tx_impl( socket: Arc, @@ -73,14 +103,12 @@ impl ConnReaderHandler { let size = socket.recv(&mut buf).await?; let packets = BluefinPacket::from_bytes(&buf[..size])?; - if packets.len() == 0 { - continue; - } - let _ = tx.send(packets).await; } } + /// This is the single consumer in the mpsc channel. This receives bluefin packets from + /// n-producers. We place the packets into the relevant buffer. #[inline] async fn rx_impl( mut rx: mpsc::Receiver>, @@ -103,7 +131,14 @@ impl ConnReaderHandler { return Ok(()); } - // Peek at the first packet and acquire the buffer. + // Peek at the first packet and acquire the buffer. The assumptions here are: + // 1. A UDP datagram contains one or more bluefin packets. However, all the packets + // in the datagram are for the same connection (no mixing and matching different connection + // packets in the same datagram). + // 2. A UDP datagram contains the same type of packets. This means a UDP datagram either + // contains only data-type packets or only ack packets. + // Therefore, with these assumptions, we can just peek at the first packet in the datagram + // and then acquire the appropriate lock before processing. 
let p = packets.first().unwrap(); match p.header.type_field { PacketType::Ack => { @@ -142,8 +177,8 @@ impl ConnReaderHandler { packets: Vec, ) -> BluefinResult<()> { let mut e: Option = None; - for p in packets { - if let Err(err) = guard.buffer_in_bytes(p) { + for mut p in packets { + if let Err(err) = guard.buffer_in_bytes(p.extract()) { e = Some(err); } } diff --git a/src/worker/reader.rs b/src/worker/reader.rs index 9aced05..31ae0ea 100644 --- a/src/worker/reader.rs +++ b/src/worker/reader.rs @@ -3,10 +3,9 @@ use std::{ net::SocketAddr, sync::{Arc, Mutex}, task::Poll, - time::Duration, }; -use tokio::{net::UdpSocket, sync::RwLock, time::sleep}; +use tokio::{net::UdpSocket, sync::RwLock}; use crate::{ core::{context::BluefinHost, error::BluefinError, header::PacketType, packet::BluefinPacket}, @@ -133,16 +132,6 @@ impl ReaderTxChannel { } } - #[inline] - async fn run_sleep(encountered_err: &mut bool) { - if !*encountered_err { - sleep(Duration::from_micros(1)).await; - return; - } - sleep(Duration::from_millis(5)).await; - *encountered_err = false; - } - #[inline] fn handle_for_handshake( &self, @@ -181,13 +170,11 @@ impl ReaderTxChannel { #[inline] fn build_conn_buff_key(is_hello: bool, src_conn_id: u32, dst_conn_id: u32) -> String { - return { - if !is_hello { - format!("{}_{}", src_conn_id, dst_conn_id) - } else { - format!("{}_0", src_conn_id) - } - }; + if !is_hello { + format!("{}_{}", src_conn_id, dst_conn_id) + } else { + format!("{}_0", src_conn_id) + } } fn buffer_to_conn_buffer( @@ -245,18 +232,14 @@ impl ReaderTxChannel { /// The [TxChannel]'s engine runner. This method will run forever and is responsible for reading bytes /// from the udp socket into a connection buffer. This method should be run its own asynchronous task. 
pub(crate) async fn run(&mut self) -> BluefinResult<()> { - let mut encountered_err = false; let mut buf = [0u8; MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM]; loop { - ReaderTxChannel::run_sleep(&mut encountered_err).await; - let (size, addr) = self.socket.recv_from(&mut buf).await?; let packets_res = BluefinPacket::from_bytes(&buf[..size]); // Not bluefin packet(s) or it's invalid. if let Err(e) = packets_res { - encountered_err = true; eprintln!("Encountered err: {:?}", e); continue; } @@ -264,7 +247,6 @@ impl ReaderTxChannel { // Acquire lock and buffer in data let packets = packets_res.unwrap(); if packets.len() == 0 { - encountered_err = true; continue; } @@ -281,7 +263,6 @@ impl ReaderTxChannel { self.handle_for_handshake(&packets[0], &mut is_hello, &mut src_conn_id) { eprintln!("{}", e); - encountered_err = true; continue; } } @@ -298,7 +279,6 @@ impl ReaderTxChannel { if _conn_buf.is_none() { eprintln!("Could not find connection {}", &key); - encountered_err = true; continue; } @@ -308,7 +288,6 @@ impl ReaderTxChannel { ReaderTxChannel::buffer_in_data(is_hello, self.host_type, p, addr, &buffers) { eprintln!("Failed to buffer in data: {}", e); - encountered_err = true; } } } diff --git a/src/worker/writer.rs b/src/worker/writer.rs index f89d212..b3e582d 100644 --- a/src/worker/writer.rs +++ b/src/worker/writer.rs @@ -18,12 +18,18 @@ use crate::{ utils::common::BluefinResult, }; +/// Internal representation of an ack. These fields will be used to build a Bluefin ack packet. #[derive(Clone, Copy)] struct AckData { base_packet_num: u64, num_packets_consumed: usize, } +/// [WriterHandler] is a handle for all write operations to the network. The writer handler is +/// responsible for accepting bytes, dividing them into Bluefin packets and then eventually sending +/// them on to the network. 
The handler ensures that the order in which the sends arrive is +/// preserved and that we fit at most [MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM] bytes of bluefin packets +/// into a single UDP datagram. #[derive(Clone)] pub(crate) struct WriterHandler { socket: Arc, @@ -74,20 +80,21 @@ impl WriterHandler { } #[inline] - pub(crate) fn send_data(&self, payload: &[u8]) -> BluefinResult<()> { - if self.data_sender.is_none() { - return Err(BluefinError::WriteError( + pub(crate) fn send_data(&self, payload: &[u8]) -> BluefinResult { + match self.data_sender { + Some(ref sender) => { + if let Err(e) = sender.send(payload.to_vec()) { + return Err(BluefinError::WriteError(format!( + "Failed to send data due to error: {:?}", + e + ))); + } + Ok(payload.len()) + } + None => Err(BluefinError::WriteError( "Sender is not available. Cannot send.".to_string(), - )); + )), } - - if let Err(e) = self.data_sender.as_ref().unwrap().send(payload.to_vec()) { - return Err(BluefinError::WriteError(format!( - "Failed to send data due to error: {:?}", - e - ))); - } - Ok(()) } #[inline] @@ -165,9 +172,8 @@ impl WriterHandler { b.clear(); let size = rx.recv_many(&mut b, limit).await; for i in 0..size { - // Extract is a small optimization quicker. We avoid a (potentially) - // costly clone by moving the bytes out of the vec and replacing it - // via a zeroed default value. + // Extract is a small optimization. We avoid a (potentially) costly clone by + // moving the bytes out of the vec and replacing them with a zeroed default value. data_queue.push_back(b[i].extract()); }