Skip to content

Commit

Permalink
more comments + cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank Lee committed Nov 26, 2024
1 parent c866288 commit d61a0ae
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 12 deletions.
39 changes: 39 additions & 0 deletions src/core/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ impl BluefinPacket {
self.payload.len() + 20
}

/// Converts an array of bytes into a vector of bluefin packets. The array of bytes must be
/// a valid stream of bluefin packet bytes. Otherwise, an error is returned.
#[inline]
pub fn from_bytes(bytes: &[u8]) -> BluefinResult<Vec<BluefinPacket>> {
if bytes.len() < 20 {
return Err(BluefinError::ReadError(
Expand Down Expand Up @@ -139,12 +142,48 @@ impl BluefinPacketBuilder {
#[cfg(test)]
mod tests {
use crate::core::{
error::BluefinError,
header::{BluefinHeader, BluefinSecurityFields, PacketType},
Serialisable,
};

use super::BluefinPacket;

#[test]
fn cannot_deserialise_invalid_bytes_into_bluefin_packets() {
let security_fields = BluefinSecurityFields::new(false, 0x0);

let mut header = BluefinHeader::new(0x0, 0x0, PacketType::Ack, 13, security_fields);
let payload: [u8; 32] = rand::random();
header.type_field = PacketType::UnencryptedData;
header.type_specific_payload = 32;
header.version = 13;
let mut packet = BluefinPacket::builder()
.header(header.clone())
.payload(payload.to_vec())
.build();
assert_eq!(packet.len(), 52);
assert!(BluefinPacket::from_bytes(&packet.serialise()).is_ok());

// Incorrectly specify the length to be 33 instead of 32
packet.header.type_specific_payload = (payload.len() + 1) as u16;
assert!(
BluefinPacket::from_bytes(&packet.serialise()).is_err_and(|e| e
== BluefinError::ReadError(
"Cannot read all bytes specified by header".to_string()
))
);

// Now test again but specify a payload length under the actual payload len
packet.header.type_specific_payload = (payload.len() - 1) as u16;
assert!(
BluefinPacket::from_bytes(&packet.serialise()).is_err_and(|e| e
== BluefinError::ReadError(
"Was not able to read all bytes into bluefin packets. Likely indicates corrupted UDP datagram.".to_string()
))
);
}

#[test]
fn able_to_deserialise_bytes_into_multiple_bluefin_packets_correctly() {
// Build 6 packets
Expand Down
1 change: 0 additions & 1 deletion src/net/ack_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ impl AckBuffer {
pub(crate) fn buffer_in_ack_packet(&mut self, packet: BluefinPacket) -> BluefinResult<()> {
let num_packets_to_ack = packet.header.type_specific_payload;
let base_packet_num = packet.header.packet_number;
// let base_packet_num = u64::from_le_bytes(packet.payload.try_into().unwrap());
for ix in 0..num_packets_to_ack {
self.received_acks
.insert_packet_number(base_packet_num + ix as u64)?;
Expand Down
16 changes: 9 additions & 7 deletions src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use super::{
AckBuffer, ConnectionManagedBuffers,
};

const NUM_TX_WORKERS_FOR_CLIENT: u16 = 5;
const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 5;

pub struct BluefinClient {
socket: Option<Arc<UdpSocket>>,
src_addr: SocketAddr,
dst_addr: Option<SocketAddr>,
conn_manager: Arc<RwLock<ConnectionManager>>,
num_reader_workers: u16,
}

impl BluefinClient {
Expand All @@ -36,17 +37,17 @@ impl BluefinClient {
dst_addr: None,
conn_manager: Arc::new(RwLock::new(ConnectionManager::new())),
src_addr,
num_reader_workers: NUM_TX_WORKERS_FOR_CLIENT_DEFAULT,
}
}

pub async fn connect(&mut self, dst_addr: SocketAddr) -> BluefinResult<BluefinConnection> {
let socket = Arc::new(UdpSocket::bind(self.src_addr).await?);
// socket.connect(dst_addr).await?;
self.socket = Some(Arc::clone(&socket));
self.dst_addr = Some(dst_addr);

build_and_start_tx(
NUM_TX_WORKERS_FOR_CLIENT,
self.num_reader_workers,
Arc::clone(self.socket.as_ref().unwrap()),
Arc::clone(&self.conn_manager),
Arc::new(Mutex::new(Vec::new())),
Expand Down Expand Up @@ -100,10 +101,11 @@ impl BluefinClient {
}

// delete the old hello entry and insert the new connection entry
let mut guard = self.conn_manager.write().await;
let _ = guard.remove(&hello_key);
let _ = guard.insert(&key, conn_mgrs_buffs);
drop(guard);
{
let mut guard = self.conn_manager.write().await;
let _ = guard.remove(&hello_key);
let _ = guard.insert(&key, conn_mgrs_buffs);
}

// send the client ack
let packet = build_empty_encrypted_packet(
Expand Down
16 changes: 12 additions & 4 deletions src/worker/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
utils::common::BluefinResult,
};

/// Each writer queue holds a queue of `WriterQueueData`
enum WriterQueueData {
Payload(Vec<u8>),
Ack {
Expand All @@ -41,6 +42,7 @@ impl WriterQueue {
}
}

#[inline]
pub(crate) fn consume_data(
&mut self,
next_packet_num: &mut u64,
Expand Down Expand Up @@ -210,7 +212,7 @@ impl WriterQueue {
}
}

/// Queues write requests to be sent
/// Queues write requests to be sent. Each connection can have one or more [WriterTxChannel].
#[derive(Clone)]
pub(crate) struct WriterTxChannel {
data_queue: Arc<Mutex<WriterQueue>>,
Expand Down Expand Up @@ -288,7 +290,11 @@ struct WriterRxChannelAckFuture {
ack_queue: Arc<Mutex<WriterQueue>>,
}

/// Consumes queued requests and sends them across the wire
/// Consumes queued requests and sends them across the wire. For now, each connection
/// has one and only one [WriterRxChannel]. This channel must run two separate jobs:
/// [WriterRxChannel::run_data], which reads out of the data queue and sends bluefin
/// packets w/ payloads across the wire AND [WriterRxChannel::run_ack], which reads
/// acks out of the ack queue and sends bluefin ack packets across the wire.
#[derive(Clone)]
pub(crate) struct WriterRxChannel {
data_future: WriterRxChannelDataFuture,
Expand Down Expand Up @@ -414,9 +420,12 @@ mod verification_tests {
#[kani::proof]
fn kani_writer_queue_consume_empty_data_behaves_as_expected() {
let mut writer_q = WriterQueue::new();
let mut next_packet_num = kani::any();
let prev = next_packet_num;
assert!(writer_q
.consume_data(&mut 0, kani::any(), kani::any())
.consume_data(&mut next_packet_num, kani::any(), kani::any())
.is_none());
assert_eq!(next_packet_num, prev);
}

#[kani::proof]
Expand All @@ -428,7 +437,6 @@ mod verification_tests {

#[cfg(test)]
mod tests {

use rstest::rstest;

use crate::{
Expand Down

0 comments on commit d61a0ae

Please sign in to comment.