Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the Prague congestion control #1708

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
15 changes: 15 additions & 0 deletions apps/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct CommonArgs {
pub qpack_max_table_capacity: Option<u64>,
pub qpack_blocked_streams: Option<u64>,
pub initial_cwnd_packets: u64,
pub enable_ecn: bool,
pub use_ect1: bool,
}

/// Creates a new `CommonArgs` structure using the provided [`Docopt`].
Expand All @@ -80,6 +82,8 @@ pub struct CommonArgs {
/// --qpack-max-table-capacity BYTES Max capacity of dynamic QPACK decoding.
/// --qpack-blocked-streams STREAMS Limit of blocked streams while decoding.
/// --initial-cwnd-packets Size of initial congestion window, in packets.
/// --enable-ecn Enable ECN support.
/// --use-ect1 Use ECT(1) instead of ECT(0).
///
/// [`Docopt`]: https://docs.rs/docopt/1.1.0/docopt/
impl Args for CommonArgs {
Expand Down Expand Up @@ -191,6 +195,9 @@ impl Args for CommonArgs {
.parse::<u64>()
.unwrap();

let enable_ecn = args.get_bool("--enable-ecn");
let use_ect1 = args.get_bool("--use-ect1");

CommonArgs {
alpns,
max_data,
Expand All @@ -214,6 +221,8 @@ impl Args for CommonArgs {
qpack_max_table_capacity,
qpack_blocked_streams,
initial_cwnd_packets,
enable_ecn,
use_ect1,
}
}
}
Expand Down Expand Up @@ -243,6 +252,8 @@ impl Default for CommonArgs {
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
initial_cwnd_packets: 10,
enable_ecn: false,
use_ect1: false,
}
}
}
Expand Down Expand Up @@ -289,6 +300,8 @@ Options:
--session-file PATH File used to cache a TLS session for resumption.
--source-port PORT Source port to use when connecting to the server [default: 0].
--initial-cwnd-packets PACKETS The initial congestion window size in terms of packet count [default: 10].
--enable-ecn Enable ECN support.
--use-ect1 Use ECT(1) instead of ECT(0).
-h --help Show this screen.
";

Expand Down Expand Up @@ -464,6 +477,8 @@ Options:
--disable-gso Disable GSO (linux only).
--disable-pacing Disable pacing (linux only).
--initial-cwnd-packets PACKETS The initial congestion window size in terms of packet count [default: 10].
--enable-ecn Enable ECN support.
--use-ect1 Use ECT(1) instead of ECT(0).
-h --help Show this screen.
";

Expand Down
26 changes: 15 additions & 11 deletions apps/src/bin/quiche-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::rc::Rc;

use std::cell::RefCell;

use quiche_apps::recvfrom::recv_from;
use ring::rand::*;

use quiche_apps::args::*;
Expand Down Expand Up @@ -127,6 +128,8 @@ fn main() {
config.set_max_stream_window(conn_args.max_stream_window);

config.enable_pacing(pacing);
config.enable_ecn(conn_args.enable_ecn);
config.set_ecn_use_ect1(conn_args.use_ect1);

let mut keylog = None;

Expand Down Expand Up @@ -202,7 +205,12 @@ fn main() {
break 'read;
}

let (len, from) = match socket.recv_from(&mut buf) {
let (len, recv_info) = match recv_from(
&socket,
local_addr,
&mut buf,
conn_args.enable_ecn,
) {
Ok(v) => v,

Err(e) => {
Expand Down Expand Up @@ -270,7 +278,7 @@ fn main() {

let out = &out[..len];

if let Err(e) = socket.send_to(out, from) {
if let Err(e) = socket.send_to(out, recv_info.from) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
Expand All @@ -295,7 +303,7 @@ fn main() {
warn!("Doing stateless retry");

let scid = quiche::ConnectionId::from_ref(&scid);
let new_token = mint_token(&hdr, &from);
let new_token = mint_token(&hdr, &recv_info.from);

let len = quiche::retry(
&hdr.scid,
Expand All @@ -309,7 +317,7 @@ fn main() {

let out = &out[..len];

if let Err(e) = socket.send_to(out, from) {
if let Err(e) = socket.send_to(out, recv_info.from) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
Expand All @@ -320,7 +328,7 @@ fn main() {
continue 'read;
}

odcid = validate_token(&from, token);
odcid = validate_token(&recv_info.from, token);

// The token was not valid, meaning the retry failed, so
// drop the packet.
Expand Down Expand Up @@ -348,7 +356,7 @@ fn main() {
&scid,
odcid.as_ref(),
local_addr,
from,
recv_info.from,
&mut config,
)
.unwrap();
Expand Down Expand Up @@ -404,11 +412,6 @@ fn main() {
clients.get_mut(cid).unwrap()
};

let recv_info = quiche::RecvInfo {
to: local_addr,
from,
};

// Process potentially coalesced packets.
let read = match client.conn.recv(pkt_buf, recv_info) {
Ok(v) => v,
Expand Down Expand Up @@ -585,6 +588,7 @@ fn main() {
client.max_datagram_size,
pacing,
enable_gso,
conn_args.enable_ecn,
) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
Expand Down
37 changes: 29 additions & 8 deletions apps/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

use crate::args::*;
use crate::common::*;
use crate::recvfrom::recv_from;
use crate::sendto::send_to;

use std::net::ToSocketAddrs;

Expand Down Expand Up @@ -131,6 +133,9 @@ pub fn connect(
config.set_max_connection_window(conn_args.max_window);
config.set_max_stream_window(conn_args.max_stream_window);

config.enable_ecn(conn_args.enable_ecn);
config.set_ecn_use_ect1(conn_args.use_ect1);

let mut keylog = None;

if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
Expand Down Expand Up @@ -224,7 +229,15 @@ pub fn connect(

let (write, send_info) = conn.send(&mut out).expect("initial send failed");

while let Err(e) = socket.send_to(&out[..write], send_info.to) {
while let Err(e) = send_to(
&socket,
&out[..write],
&send_info,
MAX_DATAGRAM_SIZE,
false,
false,
conn_args.enable_ecn,
) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!(
"{} -> {}: send() would block",
Expand Down Expand Up @@ -274,7 +287,12 @@ pub fn connect(

let local_addr = socket.local_addr().unwrap();
'read: loop {
let (len, from) = match socket.recv_from(&mut buf) {
let (len, recv_info) = match recv_from(
socket,
local_addr,
&mut buf,
conn_args.enable_ecn,
) {
Ok(v) => v,

Err(e) => {
Expand Down Expand Up @@ -304,11 +322,6 @@ pub fn connect(

pkt_count += 1;

let recv_info = quiche::RecvInfo {
to: local_addr,
from,
};

// Process potentially coalesced packets.
let read = match conn.recv(&mut buf[..len], recv_info) {
Ok(v) => v,
Expand Down Expand Up @@ -528,7 +541,15 @@ pub fn connect(
},
};

if let Err(e) = socket.send_to(&out[..write], send_info.to) {
if let Err(e) = send_to(
socket,
&out[..write],
&send_info,
MAX_DATAGRAM_SIZE,
false,
false,
conn_args.enable_ecn,
) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!(
"{} -> {}: send() would block",
Expand Down
144 changes: 144 additions & 0 deletions apps/src/cmsg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (C) 2022, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Taken from https://github.com/mxinden/udp-socket/blob/master/src/cmsg.rs,
// which is itself inspired from
// https://github.com/quinn-rs/quinn/blob/main/quinn-udp/src/cmsg.rs.

use std::mem;
use std::ptr;

#[derive(Copy, Clone)]
#[repr(align(8))] // Conservative bound for align_of<cmsghdr>
pub struct Aligned<T>(pub T);

/// Helper to encode a series of control messages ("cmsgs") to a buffer for use
/// in `sendmsg`.
///
/// The operation must be "finished" for the msghdr to be usable, either by
/// calling `finish` explicitly or by dropping the `Encoder`.
pub struct Encoder<'a> {
hdr: &'a mut libc::msghdr,
cmsg: Option<&'a mut libc::cmsghdr>,
len: usize,
}

impl<'a> Encoder<'a> {
/// # Safety
/// - `hdr.msg_control` must be a suitably aligned pointer to
/// `hdr.msg_controllen` bytes that can be safely written
/// - The `Encoder` must be dropped before `hdr` is passed to a system call,
/// and must not be leaked.
pub unsafe fn new(hdr: &'a mut libc::msghdr) -> Self {
Self {
cmsg: libc::CMSG_FIRSTHDR(hdr).as_mut(),
hdr,
len: 0,
}
}

/// Append a control message to the buffer.
///
/// # Panics
/// - If insufficient buffer space remains.
/// - If `T` has stricter alignment requirements than `cmsghdr`
pub fn push<T: Copy + ?Sized>(
&mut self, level: libc::c_int, ty: libc::c_int, value: T,
) {
assert!(mem::align_of::<T>() <= mem::align_of::<libc::cmsghdr>());
let space =
unsafe { libc::CMSG_SPACE(mem::size_of_val(&value) as _) as usize };
assert!(
self.hdr.msg_controllen >= self.len + space,
"control message buffer too small"
);
let cmsg = self.cmsg.take().expect("no control buffer space remaining");
cmsg.cmsg_level = level;
cmsg.cmsg_type = ty;
cmsg.cmsg_len =
unsafe { libc::CMSG_LEN(mem::size_of_val(&value) as _) } as _;
unsafe {
ptr::write(libc::CMSG_DATA(cmsg) as *const T as *mut T, value);
}
self.len += space;
self.cmsg = unsafe { libc::CMSG_NXTHDR(self.hdr, cmsg).as_mut() };
}

/// Finishes appending control messages to the buffer
pub fn finish(self) {
// Delegates to the `Drop` impl
}
}

// Statically guarantees that the encoding operation is "finished" before the
// control buffer is read by `sendmsg`.
impl<'a> Drop for Encoder<'a> {
fn drop(&mut self) {
self.hdr.msg_controllen = self.len as _;
}
}

/// # Safety
///
/// `cmsg` must refer to a cmsg containing a payload of type `T`
pub unsafe fn decode<T: Copy>(cmsg: &libc::cmsghdr) -> T {
assert!(mem::align_of::<T>() <= mem::align_of::<libc::cmsghdr>());
debug_assert_eq!(
cmsg.cmsg_len,
libc::CMSG_LEN(mem::size_of::<T>() as _) as usize
);
ptr::read(libc::CMSG_DATA(cmsg) as *const T)
}

pub struct Iter<'a> {
hdr: &'a libc::msghdr,
cmsg: Option<&'a libc::cmsghdr>,
}

impl<'a> Iter<'a> {
/// # Safety
///
/// `hdr.msg_control` must point to memory outliving `'a` which can be
/// soundly read for the lifetime of the constructed `Iter` and contains
/// a buffer of cmsgs, i.e. is aligned for `cmsghdr`, is fully
/// initialized, and has correct internal links.
pub unsafe fn new(hdr: &'a libc::msghdr) -> Self {
Self {
hdr,
cmsg: libc::CMSG_FIRSTHDR(hdr).as_ref(),
}
}
}

impl<'a> Iterator for Iter<'a> {
type Item = &'a libc::cmsghdr;

fn next(&mut self) -> Option<&'a libc::cmsghdr> {
let current = self.cmsg.take()?;
self.cmsg = unsafe { libc::CMSG_NXTHDR(self.hdr, current).as_ref() };
Some(current)
}
}
Loading
Loading