Skip to content

Commit

Permalink
reducing acks sent
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank Lee committed Dec 24, 2024
1 parent 97344a2 commit c46db10
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 22 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ etherparse = "0.16.0"
local-ip-address = "0.6.3"
rand = "0.8.5"
rstest = "0.23.0"
thiserror = "2.0.6"
tokio = { version = "1.41.1", features = ["full", "tracing"] }
thiserror = "2.0.8"
tokio = { version = "1.42.0", features = ["full", "tracing"] }
console-subscriber = "0.4.1"
libc = "0.2.164"
sysctl = "0.6.0"
Expand Down
2 changes: 1 addition & 1 deletion src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> BluefinResult<()> {
}
}
println!("Sent {} bytes", total_bytes);
sleep(Duration::from_secs(2)).await;
sleep(Duration::from_secs(3)).await;

Check warning on line 62 in src/bin/client.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/client.rs#L62

Added line #L62 was not covered by tests

Ok::<(), BluefinError>(())
});
Expand Down
82 changes: 64 additions & 18 deletions src/worker/conn_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use tokio::sync::mpsc::{self};
use crate::core::error::BluefinError;
use crate::core::header::PacketType;
use crate::core::packet::BluefinPacket;
use crate::net::ack_handler::AckBuffer;
use crate::net::connection::ConnectionBuffer;
use crate::net::{ConnectionManagedBuffers, MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM};
use crate::utils::common::BluefinResult;
use std::sync::Arc;
use std::sync::{Arc, MutexGuard};

pub(crate) struct ConnReaderHandler {
socket: Arc<UdpSocket>,
Expand All @@ -21,7 +23,7 @@ impl ConnReaderHandler {

pub(crate) fn start(&self) -> BluefinResult<()> {
let (tx, rx) = mpsc::channel::<Vec<BluefinPacket>>(1024);
for _ in 0..4 {
for _ in 0..2 {
let tx_cloned = tx.clone();
let socket_cloned = self.socket.clone();
spawn(async move {
Expand Down Expand Up @@ -61,31 +63,75 @@ impl ConnReaderHandler {
) {
loop {
if let Some(packets) = rx.recv().await {
for p in packets {
let _ = ConnReaderHandler::buffer_in_packet(conn_bufs, p);
}
let _ = Self::buffer_in_packets(packets, conn_bufs);
}
}
}

#[inline]
fn buffer_in_packet(
fn buffer_in_packets(
packets: Vec<BluefinPacket>,
conn_bufs: &ConnectionManagedBuffers,
packet: BluefinPacket,
) -> BluefinResult<()> {
if packet.header.type_field == PacketType::Ack {
let mut guard = conn_bufs.ack_buff.lock().unwrap();
guard.buffer_in_ack_packet(packet)?;
guard.wake()?;
} else {
let mut guard = conn_bufs.conn_buff.lock().unwrap();
guard.buffer_in_bytes(packet)?;
if let Some(w) = guard.get_waker() {
w.wake_by_ref();
} else {
return Err(BluefinError::NoSuchWakerError);
// Nothing to do if empty
if packets.is_empty() {
return Ok(());

Check warning on line 78 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L78

Added line #L78 was not covered by tests
}

// Peek at the first packet and acquire the buffer.
let p = packets.first().unwrap();
match p.header.type_field {
PacketType::Ack => {
let guard = conn_bufs.ack_buff.lock().unwrap();
Self::buffer_in_ack_packets(guard, packets)

Check warning on line 86 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L85-L86

Added lines #L85 - L86 were not covered by tests
}
_ => {
let guard = conn_bufs.conn_buff.lock().unwrap();
Self::buffer_in_data_packets(guard, packets)
}
}
}

#[inline]
fn buffer_in_ack_packets(
mut guard: MutexGuard<'_, AckBuffer>,
packets: Vec<BluefinPacket>,
) -> BluefinResult<()> {
let mut e: Option<BluefinError> = None;
for p in packets {
if let Err(err) = guard.buffer_in_ack_packet(p) {
e = Some(err);
}

Check warning on line 104 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L96-L104

Added lines #L96 - L104 were not covered by tests
}
guard.wake()?;

Check warning on line 106 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L106

Added line #L106 was not covered by tests

if e.is_some() {
return Err(e.unwrap());
}
Ok(())
}

Check warning on line 112 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L108-L112

Added lines #L108 - L112 were not covered by tests

#[inline]
fn buffer_in_data_packets(
mut guard: MutexGuard<'_, ConnectionBuffer>,
packets: Vec<BluefinPacket>,
) -> BluefinResult<()> {
let mut e: Option<BluefinError> = None;
for p in packets {
if let Err(err) = guard.buffer_in_bytes(p) {
e = Some(err);

Check warning on line 122 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L122

Added line #L122 was not covered by tests
}
}

if let Some(w) = guard.get_waker() {
w.wake_by_ref();
} else {
return Err(BluefinError::NoSuchWakerError);

Check warning on line 129 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L129

Added line #L129 was not covered by tests
}

if e.is_some() {
return Err(e.unwrap());

Check warning on line 133 in src/worker/conn_reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/conn_reader.rs#L133

Added line #L133 was not covered by tests
}
Ok(())
}
}
11 changes: 10 additions & 1 deletion src/worker/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub(crate) struct ReaderTxChannel {
pub(crate) struct ReaderRxChannel {
future: ReaderRxChannelFuture,
writer_handler: WriterHandler,
packets_consumed: usize,
packets_consumed_before_ack: usize,
}

#[derive(Clone)]
Expand Down Expand Up @@ -74,6 +76,8 @@ impl ReaderRxChannel {
Self {
future,
writer_handler,
packets_consumed: 0,
packets_consumed_before_ack: 100,
}
}

Expand All @@ -90,9 +94,13 @@ impl ReaderRxChannel {
};
let num_packets_consumed = consume_res.get_num_packets_consumed();
let base_packet_num = consume_res.get_base_packet_number();
self.packets_consumed += num_packets_consumed;

// We need to send an ack.
if num_packets_consumed > 0 && base_packet_num != 0 {
if num_packets_consumed > 0
&& base_packet_num != 0
&& self.packets_consumed >= self.packets_consumed_before_ack
{
if let Err(e) = self
.writer_handler

Check warning on line 105 in src/worker/reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/reader.rs#L105

Added line #L105 was not covered by tests
.send_ack(base_packet_num, num_packets_consumed)
Expand All @@ -102,6 +110,7 @@ impl ReaderRxChannel {
e
);
}
self.packets_consumed = 0;

Check warning on line 113 in src/worker/reader.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/reader.rs#L113

Added line #L113 was not covered by tests
}

Ok((consume_res.get_bytes_consumed(), addr))
Expand Down

0 comments on commit c46db10

Please sign in to comment.