Skip to content

Commit

Permalink
Ordered bytes (#28)
Browse files Browse the repository at this point in the history
* basic byte ordering

* recv max num of bytes

* fix peeking carry over bytes + opt

* send ack packets after consume

* doc updates
  • Loading branch information
franklee26 authored Sep 4, 2024
1 parent b42bc71 commit 4575031
Show file tree
Hide file tree
Showing 11 changed files with 504 additions and 91 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ repository = "https://github.com/franklee26/bluefin"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bincode = "1.3.3"
etherparse = "0.13.0"
pin-utils = "0.1.0"
rand = "0.8.5"
serde = { version = "1.0.147", features = ["derive"]}
thiserror = "1.0.39"
tokio = { version = "1", features = ["full"] }

Expand All @@ -29,6 +26,6 @@ name = "server"
path = "src/bin/server.rs"

[profile.release]
opt-level = 2
opt-level = 3
codegen-units = 1
lto = "fat"
49 changes: 46 additions & 3 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> BluefinResult<()> {
Ipv4Addr::new(10, 0, 0, 31),
ports[ix],
)));
let conn = client
let mut conn = client
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(10, 0, 0, 31),
1318,
Expand All @@ -40,7 +40,44 @@ async fn main() -> BluefinResult<()> {
size = conn.send(&[3; 8]).await?;
println!("Sent {} bytes", size);

sleep(Duration::from_secs(2)).await;
size = conn.send(&[100; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[101; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[102; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[103; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[104; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[105; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[106; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[107; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[108; 10]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[109; 50]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[110; 50]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[111; 50]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[112; 50]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[12, 12, 12, 12, 12, 12]).await?;
println!("Sent {} bytes", size);
Expand All @@ -54,6 +91,9 @@ async fn main() -> BluefinResult<()> {
let mut size = conn.send(&bytes).await?;
println!("Sent {} bytes", size);

size = conn.send(&[5; 20]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[6; 10]).await?;
println!("Sent {} bytes", size);

Expand All @@ -63,9 +103,12 @@ async fn main() -> BluefinResult<()> {
size = conn.send(&[9; 8]).await?;
println!("Sent {} bytes", size);

size = conn.send(&[13, 13, 13, 13, 13, 13]).await?;
println!("Sent {} bytes", size);

sleep(Duration::from_secs(3)).await;

size = conn.send(&[13, 13, 13, 13, 13, 13]).await?;
size = conn.send(&[15, 15, 15]).await?;
println!("Sent {} bytes", size);
}

Expand Down
5 changes: 3 additions & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ async fn main() -> BluefinResult<()> {
let mut s = server.clone();
let _ = spawn(async move {
loop {
println!();
let _conn = s.accept().await;

if let Ok(mut conn) = _conn {
spawn(async move {
loop {
let mut recv_bytes = [0u8; 1024];
let size = conn.recv(&mut recv_bytes).await.unwrap();
let size = conn.recv(&mut recv_bytes, 100).await.unwrap();

println!(
"({}_{}) >>> Received: {:?}",
"({:x}_{:x}) >>> Received: {:?}",
conn.src_conn_id,
conn.dst_conn_id,
&recv_bytes[..size],
Expand Down
4 changes: 2 additions & 2 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub enum BluefinError {
#[error("Socket is not valid.")]
InvalidSocketError,

#[error("Encountered segment with unexpected segment number.")]
UnexpectedSegmentError,
#[error("Encountered packet with unexpected packet number.")]
UnexpectedPacketNumberError,

#[error("Could not buffer data: `{0}`")]
CannotBufferError(String),
Expand Down
7 changes: 6 additions & 1 deletion src/core/header.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::utils::common::BluefinResult;

use super::{error::BluefinError, Serialisable};

/// 4 bits reserved for PacketType => 16 possible packet types
Expand Down Expand Up @@ -120,12 +122,14 @@ impl BluefinHeader {
}
}

#[inline]
pub fn with_packet_number(&mut self, packet_number: u64) {
self.packet_number = packet_number;
}
}

impl Serialisable for BluefinHeader {
#[inline]
fn serialise(&self) -> Vec<u8> {
let first_byte = (self.version << 4) | self.type_field as u8;
[
Expand All @@ -139,7 +143,8 @@ impl Serialisable for BluefinHeader {
.concat()
}

fn deserialise(bytes: &[u8]) -> Result<Self, BluefinError> {
#[inline]
fn deserialise(bytes: &[u8]) -> BluefinResult<Self> {
if bytes.len() != 20 {
return Err(BluefinError::DeserialiseError(
"Bluefin header must be exactly 20 bytes".to_string(),
Expand Down
33 changes: 26 additions & 7 deletions src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ impl BluefinClient {
self.dst_addr = Some(dst_addr);

let src_conn_id: u32 = rand::thread_rng().gen();
eprintln!("client src id: {}", src_conn_id);
let conn_buffer = Arc::new(Mutex::new(ConnectionBuffer::new()));
eprintln!("client src id: 0x{:x}", src_conn_id);
let conn_buffer = Arc::new(Mutex::new(ConnectionBuffer::new(
src_conn_id,
BluefinHost::Client,
)));
let handshake_buf = HandshakeConnectionBuffer::new(Arc::clone(&conn_buffer));

// Register the connection
Expand All @@ -64,8 +67,13 @@ impl BluefinClient {
.insert(&hello_key, Arc::clone(&conn_buffer))?;

// send the client hello
let packet =
build_empty_encrypted_packet(src_conn_id, 0x0, PacketType::UnencryptedClientHello);
let packet_number: u64 = rand::thread_rng().gen();
let packet = build_empty_encrypted_packet(
src_conn_id,
0x0,
packet_number,
PacketType::UnencryptedClientHello,
);
self.socket
.as_ref()
.unwrap()
Expand All @@ -80,9 +88,14 @@ impl BluefinClient {
"Did not receive server hello in time".to_string(),
));
}
let (packet, _) = res.unwrap();
let dst_conn_id = packet.header.source_connection_id;
let (server_hello, _) = res.unwrap();
let dst_conn_id = server_hello.header.source_connection_id;
let key = format!("{}_{}", src_conn_id, dst_conn_id);
let server_packet_number = server_hello.header.packet_number;
// Bluefin handshake asserts that the initial packet numbers cannot be zero
if server_packet_number == 0x0 {
return Err(BluefinError::UnexpectedPacketNumberError);
}

// delete the old hello entry and insert the new connection entry
let mut guard = self.conn_manager.write().await;
Expand All @@ -91,7 +104,12 @@ impl BluefinClient {
drop(guard);

// send the client ack
let packet = build_empty_encrypted_packet(src_conn_id, dst_conn_id, PacketType::Ack);
let packet = build_empty_encrypted_packet(
src_conn_id,
dst_conn_id,
packet_number + 1,
PacketType::Ack,
);
self.socket
.as_ref()
.unwrap()
Expand All @@ -101,6 +119,7 @@ impl BluefinClient {
Ok(BluefinConnection::new(
src_conn_id,
dst_conn_id,
packet_number + 2,
Arc::clone(&conn_buffer),
Arc::clone(self.socket.as_ref().unwrap()),
))
Expand Down
Loading

0 comments on commit 4575031

Please sign in to comment.