Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

acknowledge timestamp frame manual testing #2

Draft
wants to merge 2 commits into
base: sdeng/receiver_timestamps
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions quinn/examples/ack_timestamp_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! This example intends to use the smallest amount of code to make a simple QUIC connection.
//!
//! Checkout the `README.md` for guidance.

use std::error::Error;
use std::time::{Duration, Instant};

mod common;
use common::{make_client_endpoint, make_server_endpoint};

use bytes::BufMut;

use tracing::{self, info, trace, trace_span};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let test_length = Duration::from_secs(15);
// This should match approximately what the interpacket delay is.
let send_interval = Duration::from_millis(250);

tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();

let server_addr = "127.0.0.1:20001".parse().unwrap();
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server_addr and client_endpoint have ports that are impacted by the traffic shaping script.

let (endpoint, server_cert) = make_server_endpoint(server_addr)?;
let endpoint2 = endpoint.clone();
let handle = tokio::spawn(async move {
let span = trace_span!("SERVER");
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could grep for SERVER or CLIENT to filter the logs

let _guard = span.enter();
let incoming_conn = endpoint2.accept().await.unwrap();
let conn = incoming_conn.await.unwrap();
trace!("connection accepted: addr={}", conn.remote_address());
loop {
match conn.read_datagram().await {
Ok(v) => {
let _ = String::from_utf8(v.to_vec()).unwrap();
}
Err(e) => match e {
proto::ConnectionError::ConnectionClosed(_)
| proto::ConnectionError::ApplicationClosed(_) => {
return;
}
_ => {
println!("connection error:{}", e);
return;
}
},
}
}
});

let span = trace_span!("CLIENT");
let _guard = span.enter();
let client_endpoint = make_client_endpoint("0.0.0.0:20002".parse().unwrap(), &[&server_cert])?;
// connect to server
let connection = client_endpoint
.connect(server_addr, "localhost")
.unwrap()
.await
.unwrap();
trace!("connected: addr={}", connection.remote_address());

let end = Instant::now().checked_add(test_length).unwrap();

let mut buf = bytes::BytesMut::new();
buf.put(&b"foobarbaz"[..]);
let buf = buf.freeze();
while Instant::now() < end {
connection.send_datagram(buf.clone()).unwrap();
tokio::time::sleep(send_interval).await;
}

drop(connection);
drop(_guard);

handle.await.unwrap();

info!("test exiting..");
Ok(())
}
105 changes: 105 additions & 0 deletions quinn/examples/common/custom_congestion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use proto::congestion::{Controller, ControllerFactory, CubicConfig};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tracing::{info, info_span};

pub struct TestCubicWrapperFactory {}

impl ControllerFactory for TestCubicWrapperFactory {
fn build(self: Arc<Self>, now: Instant, current_mtu: u16) -> Box<dyn Controller> {
let cc = Arc::new(CubicConfig::default());
let controller = TestCubicWrapper {
last_packet: None,
controller: cc.build(now, current_mtu),
dur_basis: Instant::now(),
};
Box::new(controller)
}
}

pub struct TestCubicWrapper {
last_packet: Option<LastPacket>,
controller: Box<dyn Controller>,
dur_basis: Instant,
}

#[derive(Debug, Clone)]
struct LastPacket {
_number: u64,
_sent: Instant,
received: Option<Duration>,
}

impl Clone for TestCubicWrapper {
fn clone(&self) -> Self {
let cloned_controller = self.controller.clone_box();
Self {
last_packet: self.last_packet.clone(),
controller: cloned_controller,
dur_basis: self.dur_basis,
}
}
}

impl Controller for TestCubicWrapper {
fn on_congestion_event(
&mut self,
now: Instant,
sent: Instant,
is_persistent_congestion: bool,
lost_bytes: u64,
) {
self.controller
.on_congestion_event(now, sent, is_persistent_congestion, lost_bytes)
}

fn on_mtu_update(&mut self, new_mtu: u16) {
self.controller.on_mtu_update(new_mtu);
}
fn window(&self) -> u64 {
self.controller.window()
}
fn clone_box(&self) -> Box<dyn Controller> {
Box::new(self.clone())
}
fn initial_window(&self) -> u64 {
self.controller.initial_window()
}
fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
Box::new(self)
}

// Provided methods

fn on_ack_timestamped(
&mut self,
pn: u64,
_now: Instant,
sent: Instant,
received: Option<Duration>,
_bytes: u64,
_app_limited: bool,
_rtt: &proto::RttEstimator,
) {
let span = info_span!("[cc] on_ack_packet", "pn" = pn);
let _guard = span.enter();
if let Some(recv) = received {
info!("~0.5RTT={}", recv.duration_since(sent).as_millis());

if let Some(lp) = self.last_packet.as_ref() {
if let Some(last_recv) = lp.received {
info!(
"receiver interpacket delay = {}",
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interpacket delay is the time delay between the current packet and the previous packet from the perspective of the peer. This should roughly be the send_interval on the client side.
Sometimes the value will be 0 if the QUIC packet was sent in the same UDP packet. Or at least thats what I think. I verified this by adding logs on the fn on_sent method on the congestion controller trait.

(recv - last_recv).as_millis()
)
}
}
}
self.last_packet = Some(LastPacket {
_number: pn,
_sent: sent,
received,
})
}
}
13 changes: 12 additions & 1 deletion quinn/examples/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};

use std::{error::Error, net::SocketAddr, sync::Arc};

mod custom_congestion;

/// Constructs a QUIC endpoint configured for use a client only.
///
/// ## Args
Expand Down Expand Up @@ -51,7 +53,16 @@ fn configure_client(
certs.add(CertificateDer::from(*cert))?;
}

Ok(ClientConfig::with_root_certificates(Arc::new(certs))?)
let mut cfg = ClientConfig::with_root_certificates(Arc::new(certs))?;
let mut transport_config = proto::TransportConfig::default();
transport_config.max_ack_timestamps(10u32.into());

transport_config
.congestion_controller_factory(Arc::new(custom_congestion::TestCubicWrapperFactory {}));

cfg.transport_config(Arc::new(transport_config));

Ok(cfg)
}

/// Returns default server configuration along with its certificate.
Expand Down
41 changes: 41 additions & 0 deletions trafficshape.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/bin/bash
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires sudo


reset() {
dnctl -q flush
pfctl -f /etc/pf.conf
pfctl -d
}

status() {
echo
dnctl list
}

# this causes a 500ms delay for udp packets from port 20000:20100
delay() {
pfctl -e
dnctl pipe 1 config delay 500

(cat /etc/pf.conf && cat) <<__PF__ | pfctl -f -
dummynet-anchor "mop"
anchor "mop"
__PF__
cat <<__MOP__ | pfctl -a mop -f -
dummynet in proto udp from port 20000:20100 to any pipe 1
dummynet in proto udp from any to port 20000:20100 pipe 1
__MOP__

}

case $1 in
delay)
delay
;;
reset)
reset
status
;;
*)
echo $0 "delay | reset"
;;
esac