Skip to content

Commit

Permalink
doc updates + version bump (#36)
Browse files Browse the repository at this point in the history
* first comments + version bump

* more updates + cleanups

* fix send return

* small docs for writer

---------

Co-authored-by: Frank Lee <>
  • Loading branch information
franklee26 authored Dec 31, 2024
1 parent 5fc3314 commit 0056d6d
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 114 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bluefin"
version = "0.1.5"
version = "0.1.6"
edition = "2021"
description = "An experimental, secure, P2P, transport-layer protocol."
license = "MIT"
Expand Down
84 changes: 36 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Bluefin

`Bluefin` is an experimental, P2P, transport-layer protocol.
`Bluefin` is an experimental, P2P, transport-layer protocol. Unlike TCP, `Bluefin` runs in user-space and can allow for
faster development cycles, greater flexibility in new features and more performant resource management compared to
kernel-space implementations.
`Bluefin` is currently only supported on MacOs and Linux.

[![Latest Version]][crates.io]
[![Latest Version]][crates.io]
[![Documentation]][docs.rs]
![Github Workflow](https://github.com/franklee26/bluefin/actions/workflows/bluefin.yml/badge.svg)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![codecov](https://codecov.io/github/franklee26/bluefin/graph/badge.svg?token=U0XPUZVE0I)](https://codecov.io/github/franklee26/bluefin)

### Example

#### Pack-leader

```rust
#[tokio::main]
async fn main() -> BluefinResult<()> {
Expand All @@ -19,36 +24,18 @@ async fn main() -> BluefinResult<()> {
)));
server.bind().await?;

const MAX_NUM_CONNECTIONS: usize = 5;
for _ in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
while let Ok(conn) = s.accept().await {
let _ = spawn(async move {
let mut recv_bytes = [0u8; 1024];
loop {
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
spawn(async move {
loop {
let mut recv_bytes = [0u8; 1024];
let size = conn.recv(&mut recv_bytes, 100).await.unwrap();

println!(
"({:x}_{:x}) >>> Received: {:?}",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
);
sleep(Duration::from_secs(1)).await;
}
});
}
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
}
}

sleep(Duration::from_secs(1)).await;
let size = conn.recv(&mut recv_bytes, 1024).await.unwrap();

println!(
"({:x}_{:x}) >>> Received: {:?}",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
);
}
});
}
Expand All @@ -58,34 +45,35 @@ async fn main() -> BluefinResult<()> {
loop {}
}
```

#### Client

```rust
#[tokio::main]
async fn main() -> BluefinResult<()> {
let task = spawn(async move {
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(192, 168, 1, 38),
1234,
)));
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(192, 168, 1, 38),
1234,
)));
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(192, 168, 1, 38),
1235,
)))
.await?;

let bytes = [1, 2, 3, 4];
let mut size = conn.send(&bytes).await?;
println!("Sent {} bytes", size);

Ok::<(), BluefinError>(())
});
1235,
)))
.await?;

let bytes = [1, 2, 3, 4];
let size = conn.send(&bytes);
println!("Sent {} bytes", size);

Ok(())
}
```


[Latest Version]: https://img.shields.io/crates/v/bluefin.svg

[crates.io]: https://crates.io/crates/bluefin

[Documentation]: https://docs.rs/bluefin/badge.svg

[docs.rs]: https://docs.rs/bluefin
7 changes: 4 additions & 3 deletions src/core/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use crate::utils::common::BluefinResult;
use super::{error::BluefinError, Serialisable};

/// 4 bits reserved for PacketType => 16 possible packet types
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum PacketType {
UnencryptedClientHello = 0x00,
UnencryptedServerHello = 0x01,
ClientAck = 0x02,
#[default]
UnencryptedData = 0x03,
Ack = 0x04,
}
Expand All @@ -26,7 +27,7 @@ impl PacketType {
}

/// This struct contains the encryption flag and header-protection fields for a total of 8 bits
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct BluefinSecurityFields {
/// header_encrypted is one bit and signals whether the header contains encrypted fields
header_encrypted: bool,
Expand Down Expand Up @@ -87,7 +88,7 @@ impl Serialisable for BluefinSecurityFields {
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ```
///
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BluefinHeader {
// The version is 4 bits
pub version: u8,
Expand Down
10 changes: 4 additions & 6 deletions src/core/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ impl Default for BluefinPacket {
#[allow(invalid_value)]
#[inline]
fn default() -> Self {
// SAFETY
// Actually, this isn't safe and access to this kind of zero'd value would result
// in panics. There does not exist a 'default' bluefin packet. Therefore, the
// purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this
// default value should NEVER be read/used.
unsafe { std::mem::zeroed() }
Self {
header: Default::default(),
payload: vec![],
}
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,11 @@ impl BluefinConnection {
#[inline]
pub async fn recv(&mut self, buf: &mut [u8], len: usize) -> BluefinResult<usize> {
let (size, _) = self.reader_rx.read(len, buf).await?;
return Ok(size as usize);
Ok(size as usize)
}

#[inline]
pub fn send(&mut self, buf: &[u8]) -> BluefinResult<usize> {
// TODO! This returns the total bytes sent (including bluefin payload). This
// really should only return the total payload bytes
self.writer_handler.send_data(buf)?;
Ok(buf.len())
self.writer_handler.send_data(buf)
}
}
53 changes: 44 additions & 9 deletions src/worker/conn_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@ use tokio::sync::mpsc::{self};
use crate::core::error::BluefinError;
use crate::core::header::PacketType;
use crate::core::packet::BluefinPacket;
use crate::core::Extract;
use crate::net::ack_handler::AckBuffer;
use crate::net::connection::ConnectionBuffer;
use crate::net::{ConnectionManagedBuffers, MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM};
use crate::utils::common::BluefinResult;
use std::sync::{Arc, MutexGuard};

/// This is arbitrary number of worker tasks to use if we cannot decide how many worker tasks
/// to spawn.
const DEFAULT_NUMBER_OF_TASKS_TO_SPAWN: usize = 3;

/// [ConnReaderHandler] is a handle to network read-related functionalities. As the name suggests,
/// we this handler is specific for *connection* reads. That is, this handler can only be used
/// when a Bluefin connection has been established. This reader is fundamentally different from that
/// of the [crate::worker::reader::ReaderRxChannel] as this will only read packets from the wire
/// intended for the connection.
pub(crate) struct ConnReaderHandler {
socket: Arc<UdpSocket>,
conn_bufs: Arc<ConnectionManagedBuffers>,
Expand All @@ -23,26 +31,44 @@ impl ConnReaderHandler {
Self { socket, conn_bufs }
}

/// Starts the handler worker jobs. This starts the worker tasks, which busy-polls a connected
/// UDP socket for packets. Upon receiving bytes, these workers will send them to another
/// channel for processing. Then second kind of worker is the processing channel, which receives
/// bytes, attempts to deserialise them into bluefin packets and buffer them in the correct
/// buffer.
pub(crate) fn start(&self) -> BluefinResult<()> {
let (tx, rx) = mpsc::channel::<Vec<BluefinPacket>>(1024);
for _ in 0..Self::get_num_cpu_cores() {

// Spawn n-number of UDP-recv tasks.
for _ in 0..Self::get_number_of_tx_tasks() {
let tx_cloned = tx.clone();
let socket_cloned = self.socket.clone();
spawn(async move {
let _ = ConnReaderHandler::tx_impl(socket_cloned, tx_cloned).await;
});
}

// Spawn the corresponding rx channel which receives bytes from the tx channel and processes
// bytes and buffers them.
let conn_bufs = self.conn_bufs.clone();
spawn(async move {
let _ = ConnReaderHandler::rx_impl(rx, &*conn_bufs).await;
});
Ok(())
}

/// For linux, we return the expected number of CPU cores. This lets us take advantage of
/// parallelism. For (silicon) macos, we return one. Experiments on Apple Silicon have shown
/// that SO_REUSEPORT does not behave the same way as it does on Linux
/// (see: https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux)
/// and so we cannot take advantage of running the rx-tasks on multiple threads. For now, running
/// one instance of it is performant enough.
///
/// For all other operating systems (which is currently unsupported by Bluefine anyways), we
/// return an arbitrary default value.
#[allow(unreachable_code)]
#[inline]
fn get_num_cpu_cores() -> usize {
fn get_number_of_tx_tasks() -> usize {
// For linux, let's use all the cpu cores available.
#[cfg(target_os = "linux")]
{
Expand All @@ -63,6 +89,10 @@ impl ConnReaderHandler {
DEFAULT_NUMBER_OF_TASKS_TO_SPAWN
}

/// This represents one tx task or one of the multiple producers in the mpsc channel. This
/// function is a hot-loop; it continuously reads from a connected socket. When bytes are
/// received, we attempt to deserialise them into bluefin packets. If valid packets are
/// produced, them we send them to the consumer channel for processing.
#[inline]
async fn tx_impl(
socket: Arc<UdpSocket>,
Expand All @@ -73,14 +103,12 @@ impl ConnReaderHandler {
let size = socket.recv(&mut buf).await?;
let packets = BluefinPacket::from_bytes(&buf[..size])?;

if packets.len() == 0 {
continue;
}

let _ = tx.send(packets).await;
}
}

/// This is the single consumer in the mpsc channel. This receives bluefin packets from
/// n-producers. We place the packets into the relevant buffer.
#[inline]
async fn rx_impl(
mut rx: mpsc::Receiver<Vec<BluefinPacket>>,
Expand All @@ -103,7 +131,14 @@ impl ConnReaderHandler {
return Ok(());
}

// Peek at the first packet and acquire the buffer.
// Peek at the first packet and acquire the buffer. The assumptions here are:
// 1. An udp datagram contains one or more bluefin packets. However, all the packets
// in the datagram are for the same connection (no mix and matching different connection
// packets in the same datagram).
// 2. An udp datagram contains the same type of packets. This means a udp datagram either
// contains all data-type packets or ack-packets.
// Therefore, with these assumptions, we can just peek at the first packet in the datagram
// and then acquire the appropriate lock before processing.
let p = packets.first().unwrap();
match p.header.type_field {
PacketType::Ack => {
Expand Down Expand Up @@ -142,8 +177,8 @@ impl ConnReaderHandler {
packets: Vec<BluefinPacket>,
) -> BluefinResult<()> {
let mut e: Option<BluefinError> = None;
for p in packets {
if let Err(err) = guard.buffer_in_bytes(p) {
for mut p in packets {
if let Err(err) = guard.buffer_in_bytes(p.extract()) {
e = Some(err);
}
}
Expand Down
Loading

0 comments on commit 0056d6d

Please sign in to comment.