Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc updates + version bump #36

Merged
merged 4 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bluefin"
version = "0.1.5"
version = "0.1.6"
edition = "2021"
description = "An experimental, secure, P2P, transport-layer protocol."
license = "MIT"
Expand Down
84 changes: 36 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Bluefin

`Bluefin` is an experimental, P2P, transport-layer protocol.
`Bluefin` is an experimental, P2P, transport-layer protocol. Unlike TCP, `Bluefin` runs in user-space and can allow for
faster development cycles, greater flexibility in new features and more performant resource management compared to
kernel-space implementations.
`Bluefin` is currently only supported on MacOs and Linux.

[![Latest Version]][crates.io]
[![Latest Version]][crates.io]
[![Documentation]][docs.rs]
![Github Workflow](https://github.com/franklee26/bluefin/actions/workflows/bluefin.yml/badge.svg)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![codecov](https://codecov.io/github/franklee26/bluefin/graph/badge.svg?token=U0XPUZVE0I)](https://codecov.io/github/franklee26/bluefin)

### Example

#### Pack-leader

```rust
#[tokio::main]
async fn main() -> BluefinResult<()> {
Expand All @@ -19,36 +24,18 @@ async fn main() -> BluefinResult<()> {
)));
server.bind().await?;

const MAX_NUM_CONNECTIONS: usize = 5;
for _ in 0..MAX_NUM_CONNECTIONS {
let mut s = server.clone();
while let Ok(conn) = s.accept().await {
let _ = spawn(async move {
let mut recv_bytes = [0u8; 1024];
loop {
let _conn = s.accept().await;

match _conn {
Ok(mut conn) => {
spawn(async move {
loop {
let mut recv_bytes = [0u8; 1024];
let size = conn.recv(&mut recv_bytes, 100).await.unwrap();

println!(
"({:x}_{:x}) >>> Received: {:?}",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
);
sleep(Duration::from_secs(1)).await;
}
});
}
Err(e) => {
eprintln!("Could not accept connection due to error: {:?}", e);
}
}

sleep(Duration::from_secs(1)).await;
let size = conn.recv(&mut recv_bytes, 1024).await.unwrap();

println!(
"({:x}_{:x}) >>> Received: {:?}",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
);
}
});
}
Expand All @@ -58,34 +45,35 @@ async fn main() -> BluefinResult<()> {
loop {}
}
```

#### Client

```rust
#[tokio::main]
async fn main() -> BluefinResult<()> {
let task = spawn(async move {
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(192, 168, 1, 38),
1234,
)));
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(192, 168, 1, 38),
1234,
)));
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(192, 168, 1, 38),
1235,
)))
.await?;

let bytes = [1, 2, 3, 4];
let mut size = conn.send(&bytes).await?;
println!("Sent {} bytes", size);

Ok::<(), BluefinError>(())
});
1235,
)))
.await?;

let bytes = [1, 2, 3, 4];
let size = conn.send(&bytes);
println!("Sent {} bytes", size);

Ok(())
}
```


[Latest Version]: https://img.shields.io/crates/v/bluefin.svg

[crates.io]: https://crates.io/crates/bluefin

[Documentation]: https://docs.rs/bluefin/badge.svg

[docs.rs]: https://docs.rs/bluefin
7 changes: 4 additions & 3 deletions src/core/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use crate::utils::common::BluefinResult;
use super::{error::BluefinError, Serialisable};

/// 4 bits reserved for PacketType => 16 possible packet types
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum PacketType {
UnencryptedClientHello = 0x00,
UnencryptedServerHello = 0x01,
ClientAck = 0x02,
#[default]
UnencryptedData = 0x03,
Ack = 0x04,
}
Expand All @@ -26,7 +27,7 @@ impl PacketType {
}

/// This struct contains the encryption flag and header-protection fields for a total of 8 bits
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct BluefinSecurityFields {
/// header_encrypted is one bit and signals whether the header contains encrypted fields
header_encrypted: bool,
Expand Down Expand Up @@ -87,7 +88,7 @@ impl Serialisable for BluefinSecurityFields {
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ```
///
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BluefinHeader {
// The version is 4 bits
pub version: u8,
Expand Down
10 changes: 4 additions & 6 deletions src/core/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ impl Default for BluefinPacket {
#[allow(invalid_value)]
#[inline]
fn default() -> Self {
// SAFETY
// Actually, this isn't safe and access to this kind of zero'd value would result
// in panics. There does not exist a 'default' bluefin packet. Therefore, the
// purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this
// default value should NEVER be read/used.
unsafe { std::mem::zeroed() }
Self {
header: Default::default(),
payload: vec![],
}
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,11 @@ impl BluefinConnection {
#[inline]
pub async fn recv(&mut self, buf: &mut [u8], len: usize) -> BluefinResult<usize> {
let (size, _) = self.reader_rx.read(len, buf).await?;
return Ok(size as usize);
Ok(size as usize)
}

#[inline]
pub fn send(&mut self, buf: &[u8]) -> BluefinResult<usize> {
// TODO! This returns the total bytes sent (including bluefin payload). This
// really should only return the total payload bytes
self.writer_handler.send_data(buf)?;
Ok(buf.len())
self.writer_handler.send_data(buf)
}
}
53 changes: 44 additions & 9 deletions src/worker/conn_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@ use tokio::sync::mpsc::{self};
use crate::core::error::BluefinError;
use crate::core::header::PacketType;
use crate::core::packet::BluefinPacket;
use crate::core::Extract;
use crate::net::ack_handler::AckBuffer;
use crate::net::connection::ConnectionBuffer;
use crate::net::{ConnectionManagedBuffers, MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM};
use crate::utils::common::BluefinResult;
use std::sync::{Arc, MutexGuard};

/// This is arbitrary number of worker tasks to use if we cannot decide how many worker tasks
/// to spawn.
const DEFAULT_NUMBER_OF_TASKS_TO_SPAWN: usize = 3;

/// [ConnReaderHandler] is a handle to network read-related functionalities. As the name suggests,
/// we this handler is specific for *connection* reads. That is, this handler can only be used
/// when a Bluefin connection has been established. This reader is fundamentally different from that
/// of the [crate::worker::reader::ReaderRxChannel] as this will only read packets from the wire
/// intended for the connection.
pub(crate) struct ConnReaderHandler {
socket: Arc<UdpSocket>,
conn_bufs: Arc<ConnectionManagedBuffers>,
Expand All @@ -23,26 +31,44 @@ impl ConnReaderHandler {
Self { socket, conn_bufs }
}

/// Starts the handler worker jobs. This starts the worker tasks, which busy-polls a connected
/// UDP socket for packets. Upon receiving bytes, these workers will send them to another
/// channel for processing. Then second kind of worker is the processing channel, which receives
/// bytes, attempts to deserialise them into bluefin packets and buffer them in the correct
/// buffer.
pub(crate) fn start(&self) -> BluefinResult<()> {
let (tx, rx) = mpsc::channel::<Vec<BluefinPacket>>(1024);
for _ in 0..Self::get_num_cpu_cores() {

// Spawn n-number of UDP-recv tasks.
for _ in 0..Self::get_number_of_tx_tasks() {
let tx_cloned = tx.clone();
let socket_cloned = self.socket.clone();
spawn(async move {
let _ = ConnReaderHandler::tx_impl(socket_cloned, tx_cloned).await;
});
}

// Spawn the corresponding rx channel which receives bytes from the tx channel and processes
// bytes and buffers them.
let conn_bufs = self.conn_bufs.clone();
spawn(async move {
let _ = ConnReaderHandler::rx_impl(rx, &*conn_bufs).await;
});
Ok(())
}

/// For linux, we return the expected number of CPU cores. This lets us take advantage of
/// parallelism. For (silicon) macos, we return one. Experiments on Apple Silicon have shown
/// that SO_REUSEPORT does not behave the same way as it does on Linux
/// (see: https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux)
/// and so we cannot take advantage of running the rx-tasks on multiple threads. For now, running
/// one instance of it is performant enough.
///
/// For all other operating systems (which is currently unsupported by Bluefine anyways), we
/// return an arbitrary default value.
#[allow(unreachable_code)]
#[inline]
fn get_num_cpu_cores() -> usize {
fn get_number_of_tx_tasks() -> usize {
// For linux, let's use all the cpu cores available.
#[cfg(target_os = "linux")]
{
Expand All @@ -63,6 +89,10 @@ impl ConnReaderHandler {
DEFAULT_NUMBER_OF_TASKS_TO_SPAWN
}

/// This represents one tx task or one of the multiple producers in the mpsc channel. This
/// function is a hot-loop; it continuously reads from a connected socket. When bytes are
/// received, we attempt to deserialise them into bluefin packets. If valid packets are
/// produced, them we send them to the consumer channel for processing.
#[inline]
async fn tx_impl(
socket: Arc<UdpSocket>,
Expand All @@ -73,14 +103,12 @@ impl ConnReaderHandler {
let size = socket.recv(&mut buf).await?;
let packets = BluefinPacket::from_bytes(&buf[..size])?;

if packets.len() == 0 {
continue;
}

let _ = tx.send(packets).await;
}
}

/// This is the single consumer in the mpsc channel. This receives bluefin packets from
/// n-producers. We place the packets into the relevant buffer.
#[inline]
async fn rx_impl(
mut rx: mpsc::Receiver<Vec<BluefinPacket>>,
Expand All @@ -103,7 +131,14 @@ impl ConnReaderHandler {
return Ok(());
}

// Peek at the first packet and acquire the buffer.
// Peek at the first packet and acquire the buffer. The assumptions here are:
// 1. An udp datagram contains one or more bluefin packets. However, all the packets
// in the datagram are for the same connection (no mix and matching different connection
// packets in the same datagram).
// 2. An udp datagram contains the same type of packets. This means a udp datagram either
// contains all data-type packets or ack-packets.
// Therefore, with these assumptions, we can just peek at the first packet in the datagram
// and then acquire the appropriate lock before processing.
let p = packets.first().unwrap();
match p.header.type_field {
PacketType::Ack => {
Expand Down Expand Up @@ -142,8 +177,8 @@ impl ConnReaderHandler {
packets: Vec<BluefinPacket>,
) -> BluefinResult<()> {
let mut e: Option<BluefinError> = None;
for p in packets {
if let Err(err) = guard.buffer_in_bytes(p) {
for mut p in packets {
if let Err(err) = guard.buffer_in_bytes(p.extract()) {
e = Some(err);
}
}
Expand Down
Loading
Loading