From bc039d9ecd141f5c0aec3e8b4ee63887324ba05a Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Sat, 23 Jul 2022 11:58:32 -0700 Subject: [PATCH 01/11] report ECN counts This adds support for reporting incoming ECN counts via the "ECN Counts" field in ACK frames. Applications can report the ECN marking for incoming packets through the new `ecn` field of `RecvInfo`, and internally quiche will track the ECN counters, and creates ACK frames with the ECN Counts field. TODO: * Fetch ECN markings for incoming packets in apps. --- apps/src/bin/quiche-server.rs | 1 + apps/src/client.rs | 1 + fuzz/src/packet_recv_client.rs | 2 +- fuzz/src/packet_recv_server.rs | 2 +- quiche/examples/client.rs | 1 + quiche/examples/http3-client.rs | 1 + quiche/examples/http3-server.rs | 1 + quiche/examples/server.rs | 1 + quiche/include/quiche.h | 3 + quiche/src/ffi.rs | 2 + quiche/src/frame.rs | 13 +--- quiche/src/lib.rs | 110 +++++++++++++++++++++++++++++++- quiche/src/packet.rs | 11 ++++ tools/http3_test/src/runner.rs | 1 + 14 files changed, 136 insertions(+), 14 deletions(-) diff --git a/apps/src/bin/quiche-server.rs b/apps/src/bin/quiche-server.rs index ac632dc064..fa06b7ec4e 100644 --- a/apps/src/bin/quiche-server.rs +++ b/apps/src/bin/quiche-server.rs @@ -407,6 +407,7 @@ fn main() { let recv_info = quiche::RecvInfo { to: local_addr, from, + ecn: 0, }; // Process potentially coalesced packets. diff --git a/apps/src/client.rs b/apps/src/client.rs index 86fb650776..b6a0c2e3ee 100644 --- a/apps/src/client.rs +++ b/apps/src/client.rs @@ -307,6 +307,7 @@ pub fn connect( let recv_info = quiche::RecvInfo { to: local_addr, from, + ecn: 0, }; // Process potentially coalesced packets. diff --git a/fuzz/src/packet_recv_client.rs b/fuzz/src/packet_recv_client.rs index 7fc25bf7e2..5b693f92b8 100644 --- a/fuzz/src/packet_recv_client.rs +++ b/fuzz/src/packet_recv_client.rs @@ -46,7 +46,7 @@ fuzz_target!(|data: &[u8]| { ) .unwrap(); - let info = quiche::RecvInfo { from, to }; + let info = quiche::RecvInfo { from, to, ecn: 0 }; conn.recv(&mut buf, info).ok(); }); diff --git a/fuzz/src/packet_recv_server.rs b/fuzz/src/packet_recv_server.rs index 51a1612077..63bf0cd09e 100644 --- a/fuzz/src/packet_recv_server.rs +++ b/fuzz/src/packet_recv_server.rs @@ -47,7 +47,7 @@ fuzz_target!(|data: &[u8]| { quiche::accept(&SCID, None, to, from, &mut CONFIG.lock().unwrap()) .unwrap(); - let info = quiche::RecvInfo { from, to }; + let info = quiche::RecvInfo { from, to, ecn: 0 }; conn.recv(&mut buf, info).ok(); }); diff --git a/quiche/examples/client.rs b/quiche/examples/client.rs index 9a5fb3957a..b82d3a65c1 100644 --- a/quiche/examples/client.rs +++ b/quiche/examples/client.rs @@ -172,6 +172,7 @@ fn main() { let recv_info = quiche::RecvInfo { to: socket.local_addr().unwrap(), from, + ecn: 0, }; // Process potentially coalesced packets. diff --git a/quiche/examples/http3-client.rs b/quiche/examples/http3-client.rs index 92646e0dfd..6eb7622e1f 100644 --- a/quiche/examples/http3-client.rs +++ b/quiche/examples/http3-client.rs @@ -191,6 +191,7 @@ fn main() { let recv_info = quiche::RecvInfo { to: local_addr, from, + ecn: 0, }; // Process potentially coalesced packets. diff --git a/quiche/examples/http3-server.rs b/quiche/examples/http3-server.rs index 3314e50799..d9f827adf0 100644 --- a/quiche/examples/http3-server.rs +++ b/quiche/examples/http3-server.rs @@ -292,6 +292,7 @@ fn main() { let recv_info = quiche::RecvInfo { to: socket.local_addr().unwrap(), from, + ecn: 0, }; // Process potentially coalesced packets. diff --git a/quiche/examples/server.rs b/quiche/examples/server.rs index 496b51caa6..376b6d3687 100644 --- a/quiche/examples/server.rs +++ b/quiche/examples/server.rs @@ -289,6 +289,7 @@ fn main() { let recv_info = quiche::RecvInfo { to: socket.local_addr().unwrap(), from, + ecn: 0, }; // Process potentially coalesced packets. diff --git a/quiche/include/quiche.h b/quiche/include/quiche.h index c0d1406b7c..5f964f6ba4 100644 --- a/quiche/include/quiche.h +++ b/quiche/include/quiche.h @@ -332,6 +332,9 @@ typedef struct { // The local address the packet was received on. struct sockaddr *to; socklen_t to_len; + + // The ECN marking on the incoming packet. + uint8_t ecn; } quiche_recv_info; // Processes QUIC packets received from the peer. diff --git a/quiche/src/ffi.rs b/quiche/src/ffi.rs index 0183730de4..4a0804c56f 100644 --- a/quiche/src/ffi.rs +++ b/quiche/src/ffi.rs @@ -720,6 +720,7 @@ pub struct RecvInfo<'a> { from_len: socklen_t, to: &'a sockaddr, to_len: socklen_t, + ecn: u8, } impl<'a> From<&RecvInfo<'a>> for crate::RecvInfo { @@ -727,6 +728,7 @@ impl<'a> From<&RecvInfo<'a>> for crate::RecvInfo { crate::RecvInfo { from: std_addr_from_c(info.from, info.from_len), to: std_addr_from_c(info.to, info.to_len), + ecn: info.ecn, } } } diff --git a/quiche/src/frame.rs b/quiche/src/frame.rs index 3ecfcd3707..a8ebf14f81 100644 --- a/quiche/src/frame.rs +++ b/quiche/src/frame.rs @@ -47,13 +47,6 @@ pub const MAX_DGRAM_OVERHEAD: usize = 2; pub const MAX_STREAM_OVERHEAD: usize = 12; pub const MAX_STREAM_SIZE: u64 = 1 << 62; -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct EcnCounts { - ect0_count: u64, - ect1_count: u64, - ecn_ce_count: u64, -} - #[derive(Clone, PartialEq, Eq)] pub enum Frame { Padding { @@ -65,7 +58,7 @@ pub enum Frame { ACK { ack_delay: u64, ranges: ranges::RangeSet, - ecn_counts: Option, + ecn_counts: Option, }, ResetStream { @@ -1207,7 +1200,7 @@ fn parse_ack_frame(ty: u64, b: &mut octets::Octets) -> Result { } let ecn_counts = if first & 0x01 != 0 { - let ecn = EcnCounts { + let ecn = packet::EcnCounts { ect0_count: b.get_varint()?, ect1_count: b.get_varint()?, ecn_ce_count: b.get_varint()?, @@ -1427,7 +1420,7 @@ mod tests { ranges.insert(15..19); ranges.insert(3000..5000); - let ecn_counts = Some(EcnCounts { + let ecn_counts = Some(packet::EcnCounts { ect0_count: 100, ect1_count: 200, ecn_ce_count: 300, diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index c0286e4102..48f15c5366 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -116,13 +116,14 @@ //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); //! # let peer = "127.0.0.1:1234".parse().unwrap(); //! # let local = "127.0.0.1:4321".parse().unwrap(); +//! # let ecn = 0; //! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config)?; //! let to = socket.local_addr().unwrap(); //! //! loop { //! let (read, from) = socket.recv_from(&mut buf).unwrap(); //! -//! let recv_info = quiche::RecvInfo { from, to }; +//! let recv_info = quiche::RecvInfo { from, to, ecn }; //! //! let read = match conn.recv(&mut buf[..read], recv_info) { //! Ok(v) => v, @@ -636,6 +637,14 @@ pub struct RecvInfo { /// The local address the packet was received on. pub to: SocketAddr, + + /// The ECN marking on the incoming packet. + /// + /// `0` means no ECN marking has been received, while non-zero values are + /// described in [RFC3168]. + /// + /// [RFC3168]: https://datatracker.ietf.org/doc/html/rfc3168#section-5 + pub ecn: u8, } /// Ancillary information about outgoing packets. @@ -2104,6 +2113,7 @@ impl Connection { /// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); /// # let peer = "127.0.0.1:1234".parse().unwrap(); /// # let local = socket.local_addr().unwrap(); + /// # let ecn = 0; /// # let mut conn = quiche::accept(&scid, None, local, peer, &mut config)?; /// loop { /// let (read, from) = socket.recv_from(&mut buf).unwrap(); @@ -2111,6 +2121,7 @@ impl Connection { /// let recv_info = quiche::RecvInfo { /// from, /// to: local, + /// ecn, /// }; /// /// let read = match conn.recv(&mut buf[..read], recv_info) { @@ -2131,6 +2142,10 @@ impl Connection { return Err(Error::BufferTooShort); } + if info.ecn > 0x03 { + return Err(Error::InvalidPacket); + } + let recv_pid = self.paths.path_id_from_addrs(&(info.to, info.from)); if let Some(recv_pid) = recv_pid { @@ -2972,6 +2987,30 @@ impl Connection { self.pkt_num_spaces[epoch].largest_rx_pkt_num = cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn); + if info.ecn > 0 { + // Create the `EcnCounts` struct if this is the first non-zero ECN + // value received. + if self.pkt_num_spaces[epoch].ecn_counts.is_none() { + self.pkt_num_spaces[epoch].ecn_counts = + Some(packet::EcnCounts::default()); + } + + if let Some(ref mut ecn_counts) = + self.pkt_num_spaces[epoch].ecn_counts + { + match info.ecn { + // ECN capable (ECT(0)). + 0x01 => ecn_counts.ect0_count += 1, + 0x02 => ecn_counts.ect1_count += 1, + + // Congestion event (ECN-CE). + 0x03 => ecn_counts.ecn_ce_count += 1, + + _ => unreachable!(), + }; + } + } + if !probing { self.pkt_num_spaces[epoch].largest_rx_non_probing_pkt_num = cmp::max( self.pkt_num_spaces[epoch].largest_rx_non_probing_pkt_num, @@ -3575,7 +3614,7 @@ impl Connection { let frame = frame::Frame::ACK { ack_delay, ranges: pkt_space.recv_pkt_need_ack.clone(), - ecn_counts: None, // sending ECN is not supported at this time + ecn_counts: pkt_space.ecn_counts, }; // When a PING frame needs to be sent, avoid sending the ACK if @@ -8282,6 +8321,7 @@ pub mod testing { let info = RecvInfo { to: server_path.peer_addr(), from: server_path.local_addr(), + ecn: 0, }; self.client.recv(buf, info) @@ -8292,6 +8332,7 @@ pub mod testing { let info = RecvInfo { to: client_path.peer_addr(), from: client_path.local_addr(), + ecn: 0, }; self.server.recv(buf, info) @@ -8345,6 +8386,7 @@ pub mod testing { let info = RecvInfo { to: active_path.local_addr(), from: active_path.peer_addr(), + ecn: 0, }; conn.recv(&mut buf[..len], info)?; @@ -8369,6 +8411,7 @@ pub mod testing { let info = RecvInfo { to: si.to, from: si.from, + ecn: 0, }; conn.recv(&mut pkt, info)?; @@ -15304,6 +15347,7 @@ mod tests { let ri = RecvInfo { to: si.to, from: si.from, + ecn: 0, }; assert_eq!(pipe.server.recv(&mut buf[..sent], ri), Ok(sent)); @@ -15348,6 +15392,7 @@ mod tests { let ri = RecvInfo { to: si.to, from: si.from, + ecn: 0, }; assert_eq!(pipe.server.recv(&mut buf[..sent], ri), Ok(sent)); @@ -15365,6 +15410,7 @@ mod tests { let ri = RecvInfo { to: si.to, from: si.from, + ecn: 0, }; assert_eq!(pipe.server.recv(&mut buf[..sent], ri), Ok(sent)); @@ -15383,6 +15429,7 @@ mod tests { let ri = RecvInfo { to: si.to, from: si.from, + ecn: 0, }; assert_eq!(pipe.server.recv(&mut buf[..sent], ri), Ok(sent)); @@ -15400,6 +15447,7 @@ mod tests { let ri = RecvInfo { to: si.to, from: si.from, + ecn: 0, }; assert_eq!(pipe.server.recv(&mut buf[..sent], ri), Ok(sent)); @@ -16301,6 +16349,7 @@ mod tests { .recv(&mut pkt_buf[..written], RecvInfo { to: server_addr, from: client_addr_2, + ecn: 0, }) .expect("server receive path challenge"); @@ -16310,6 +16359,63 @@ mod tests { .paths_iter(server_addr) .any(|path| path == client_addr_2)); } + + /// Tests that incoming ECN values are reported back to the sender. + #[test] + fn ecn_reporting() { + let mut buf = [0; 65535]; + + let mut pipe = testing::Pipe::new().unwrap(); + assert_eq!(pipe.handshake(), Ok(())); + + let pkt_type = packet::Type::Short; + + let frames = [frame::Frame::Stream { + stream_id: 0, + data: stream::RangeBuf::from(b"aaaaa", 0, true), + }]; + + let written = + testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf) + .unwrap(); + + let active_path = pipe.server.paths.get_active().unwrap(); + + let info = RecvInfo { + from: active_path.peer_addr(), + to: active_path.local_addr(), + ecn: 0x03, + }; + + // Server receives packet with ECN markings. + let len = pipe.server.recv(&mut buf[..written], info).unwrap(); + assert_eq!(len, written); + + // Server sends ACK_ECN freame. + let (len, _) = pipe.server.send(&mut buf).unwrap(); + + let frames = + testing::decode_pkt(&mut pipe.client, &mut buf[..len]).unwrap(); + let mut iter = frames.iter(); + + let mut ranges = ranges::RangeSet::default(); + ranges.push_item(0); + + let ecn_counts = Some(packet::EcnCounts { + ect0_count: 0, + ect1_count: 0, + ecn_ce_count: 1, + }); + + assert_eq!( + iter.next(), + Some(&frame::Frame::ACK { + ack_delay: 0, + ranges, + ecn_counts, + }) + ); + } } pub use crate::packet::ConnectionId; diff --git a/quiche/src/packet.rs b/quiche/src/packet.rs index b785f6eed5..12f74972c4 100644 --- a/quiche/src/packet.rs +++ b/quiche/src/packet.rs @@ -870,6 +870,8 @@ pub struct PktNumSpace { pub key_update: Option, + pub ecn_counts: Option, + pub crypto_open: Option, pub crypto_seal: Option, @@ -898,6 +900,8 @@ impl PktNumSpace { key_update: None, + ecn_counts: None, + crypto_open: None, crypto_seal: None, @@ -988,6 +992,13 @@ impl PktNumWindow { } } +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct EcnCounts { + pub ect0_count: u64, + pub ect1_count: u64, + pub ecn_ce_count: u64, +} + #[cfg(test)] mod tests { use super::*; diff --git a/tools/http3_test/src/runner.rs b/tools/http3_test/src/runner.rs index 75c7e450e3..6adcdd588c 100644 --- a/tools/http3_test/src/runner.rs +++ b/tools/http3_test/src/runner.rs @@ -187,6 +187,7 @@ pub fn run( let recv_info = quiche::RecvInfo { from, to: local_addr, + ecn: 0, }; // Process potentially coalesced packets. From 92443182c26bfddd69ab306f6097b0d6c551073a Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Sat, 23 Jul 2022 12:43:10 -0700 Subject: [PATCH 02/11] mark outgoing packets with ECN TODO: * Set ECN markings for outgoing packets in apps. * Probe ECN for each path. * Validate ECN counts in ACKs. --- quiche/include/quiche.h | 3 ++ quiche/src/lib.rs | 11 ++++- quiche/src/recovery/bbr/mod.rs | 7 ++++ quiche/src/recovery/bbr2/mod.rs | 6 +++ quiche/src/recovery/delivery_rate.rs | 1 + quiche/src/recovery/mod.rs | 63 +++++++++++++++++++++++++++- 6 files changed, 88 insertions(+), 3 deletions(-) diff --git a/quiche/include/quiche.h b/quiche/include/quiche.h index 5f964f6ba4..9c1f7fcc77 100644 --- a/quiche/include/quiche.h +++ b/quiche/include/quiche.h @@ -352,6 +352,9 @@ typedef struct { // The time to send the packet out. struct timespec at; + + // The ECN marking for the outgoing packet. + uint8_t ecn; } quiche_send_info; // Writes a single QUIC packet to be sent to the peer. diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index 48f15c5366..ec7295948f 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -376,6 +376,7 @@ //! [boring]: https://crates.io/crates/boring //! [qlog]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-main-schema +#![allow(clippy::too_many_arguments)] #![allow(clippy::upper_case_acronyms)] #![warn(missing_docs)] #![cfg_attr(docsrs, feature(doc_cfg))] @@ -662,6 +663,9 @@ pub struct SendInfo { /// /// [Pacing]: index.html#pacing pub at: time::Instant, + + /// The ECN marking for the outgoing packet. + pub ecn: u8, } /// Represents information carried by `CONNECTION_CLOSE` frames. @@ -3330,6 +3334,8 @@ impl Connection { to: send_path.peer_addr(), at: send_path.recovery.get_packet_send_time(), + + ecn: 0x01, // ECT(0) }; Ok((done, info)) @@ -6682,7 +6688,9 @@ impl Connection { frame::Frame::Ping => (), frame::Frame::ACK { - ranges, ack_delay, .. + ranges, + ack_delay, + ecn_counts, } => { let ack_delay = ack_delay .checked_mul(2_u64.pow( @@ -6709,6 +6717,7 @@ impl Connection { let (lost_packets, lost_bytes) = p.recovery.on_ack_received( &ranges, ack_delay, + ecn_counts, epoch, handshake_status, now, diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs index 07683dc0ae..fa143009d6 100644 --- a/quiche/src/recovery/bbr/mod.rs +++ b/quiche/src/recovery/bbr/mod.rs @@ -454,6 +454,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -525,6 +526,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -594,6 +596,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -648,6 +651,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -714,6 +718,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -787,6 +792,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -843,6 +849,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, diff --git a/quiche/src/recovery/bbr2/mod.rs b/quiche/src/recovery/bbr2/mod.rs index 46ed13baf1..48b79d2feb 100644 --- a/quiche/src/recovery/bbr2/mod.rs +++ b/quiche/src/recovery/bbr2/mod.rs @@ -766,6 +766,7 @@ mod tests { .on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -836,6 +837,7 @@ mod tests { .on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -908,6 +910,7 @@ mod tests { .on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -961,6 +964,7 @@ mod tests { .on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -1029,6 +1033,7 @@ mod tests { .on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -1084,6 +1089,7 @@ mod tests { .on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, diff --git a/quiche/src/recovery/delivery_rate.rs b/quiche/src/recovery/delivery_rate.rs index 590bac22cb..a5a866e6c6 100644 --- a/quiche/src/recovery/delivery_rate.rs +++ b/quiche/src/recovery/delivery_rate.rs @@ -374,6 +374,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index a7f0b35ae7..0106c062a7 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -142,6 +142,8 @@ pub struct Recovery { max_datagram_size: usize, + ecn_ce_counts: [u64; packet::Epoch::count()], + cubic_state: cubic::State, // HyStart++. @@ -267,6 +269,8 @@ impl Recovery { max_datagram_size: recovery_config.max_send_udp_payload_size, + ecn_ce_counts: [0, 0, 0], + cc_ops: recovery_config.cc_ops, delivery_rate: delivery_rate::Rate::default(), @@ -437,8 +441,9 @@ impl Recovery { #[allow(clippy::too_many_arguments)] pub fn on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, - epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, - trace_id: &str, newly_acked: &mut Vec, + ecn_counts: Option, epoch: packet::Epoch, + handshake_status: HandshakeStatus, now: Instant, trace_id: &str, + newly_acked: &mut Vec, ) -> Result<(usize, usize)> { let largest_acked = ranges.last().unwrap(); @@ -459,6 +464,7 @@ impl Recovery { let mut has_ack_eliciting = false; let mut largest_newly_acked_pkt_num = 0; + let mut largest_newly_acked_size = 0; let mut largest_newly_acked_sent_time = now; let mut undo_cwnd = false; @@ -531,6 +537,7 @@ impl Recovery { } largest_newly_acked_pkt_num = unacked.pkt_num; + largest_newly_acked_size = unacked.size; largest_newly_acked_sent_time = unacked.time_sent; self.acked[epoch].extend(unacked.frames.drain(..)); @@ -593,6 +600,36 @@ impl Recovery { } } + if let Some(ecn_counts) = ecn_counts { + // See if we can not do better. + let largest_unacked = newly_acked.last().unwrap(); + let largest_sent = Sent { + pkt_num: largest_unacked.pkt_num, + frames: SmallVec::new(), + time_sent: largest_unacked.time_sent, + time_acked: Some(now), + time_lost: None, + size: largest_unacked.size, + ack_eliciting: true, + in_flight: true, + delivered: largest_unacked.delivered, + delivered_time: largest_unacked.delivered_time, + first_sent_time: largest_unacked.first_sent_time, + is_app_limited: largest_unacked.is_app_limited, + tx_in_flight: largest_unacked.tx_in_flight, + lost: largest_unacked.lost, + has_data: true, + }; + self.process_ecn( + ecn_counts, + largest_newly_acked_size, + // We just checked that that list was not empty. + &largest_sent, + epoch, + now, + ); + } + // Detect and mark lost packets without removing them from the sent // packets list. let (lost_packets, lost_bytes) = @@ -1057,6 +1094,22 @@ impl Recovery { } } + fn process_ecn( + &mut self, ecn_counts: packet::EcnCounts, largest_acked_size: usize, + largest_acked_sent: &Sent, epoch: packet::Epoch, now: Instant, + ) { + if ecn_counts.ecn_ce_count > self.ecn_ce_counts[epoch] { + self.ecn_ce_counts[epoch] = ecn_counts.ecn_ce_count; + + self.congestion_event( + largest_acked_size, + largest_acked_sent, + epoch, + now, + ); + } + } + fn congestion_event( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, @@ -1624,6 +1677,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -1714,6 +1768,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -1872,6 +1927,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -2040,6 +2096,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -2060,6 +2117,7 @@ mod tests { r.on_ack_received( &acked, 25, + None, packet::Epoch::Application, HandshakeStatus::default(), now, @@ -2142,6 +2200,7 @@ mod tests { r.on_ack_received( &acked, 10, + None, packet::Epoch::Application, HandshakeStatus::default(), now, From 932ec2a08a1d39685719e891712ca5d31616b3ff Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Thu, 6 Oct 2022 14:34:18 +0200 Subject: [PATCH 03/11] ECN ACK validation and probing on paths --- quiche/src/ecn.rs | 415 +++++++++++++++++++++++++++ quiche/src/lib.rs | 40 ++- quiche/src/recovery/bbr/mod.rs | 7 + quiche/src/recovery/bbr2/mod.rs | 6 + quiche/src/recovery/cubic.rs | 11 + quiche/src/recovery/delivery_rate.rs | 3 + quiche/src/recovery/mod.rs | 115 +++++--- quiche/src/recovery/reno.rs | 4 + 8 files changed, 557 insertions(+), 44 deletions(-) create mode 100644 quiche/src/ecn.rs diff --git a/quiche/src/ecn.rs b/quiche/src/ecn.rs new file mode 100644 index 0000000000..162c8f5785 --- /dev/null +++ b/quiche/src/ecn.rs @@ -0,0 +1,415 @@ +// Copyright (C) 2022, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::packet; +use crate::packet::Epoch; + +const ECN_VALIDATION_COUNT: u64 = 10; +const ECN_LOSS_THRESHOLD: u64 = 3; + +pub const ECN_NOT_ECT: u8 = 0; +pub const ECN_ECT1: u8 = 1; +pub const ECN_ECT0: u8 = 2; +pub const ECN_CE: u8 = 3; + +#[derive(Debug)] +enum EcnState { + /// Send up to 10 packets marked to ECT(0) or ECT(1) and move to + /// `Validating`. + Probing(u8), + /// Stop sending ECN-enabled packets, but wait for possible ACK reception + /// confirming ECN usage or disabling it. + Validating(u8), + /// Validation succeeded, always send ECT(0) or ECT(1) packets. + Capable(u8), + /// Either ECN is disabled or validation failed. No ECN-enabled packet is + /// sent. + Unsupported, +} + +/// Structure keeping the ECN state. +#[derive(Debug)] +pub struct Ecn { + state: EcnState, + ecn_pkts_sent: u64, + ecn_pkts_lost: u64, + last_ecn_counts: [packet::EcnCounts; packet::Epoch::count()], + enabled: bool, + use_ect1: bool, +} + +impl Ecn { + /// Creates a new `Ecn` structure. + pub fn new( + enabled: bool, use_ect1: bool, + base_ecn_counts: [packet::EcnCounts; packet::Epoch::count()], + ) -> Ecn { + let state = if enabled { + let ect_value = if use_ect1 { ECN_ECT1 } else { ECN_ECT0 }; + EcnState::Probing(ect_value) + } else { + EcnState::Unsupported + }; + Ecn { + state, + ecn_pkts_sent: 0, + ecn_pkts_lost: 0, + last_ecn_counts: base_ecn_counts, + enabled, + use_ect1, + } + } + + /// An endpoint thus attempts to use ECN and validates this for each new + /// connection, when switching to a server's preferred address, and on + /// active connection migration to a new path. + pub fn reset( + &mut self, base_ecn_counts: [packet::EcnCounts; packet::Epoch::count()], + ) { + self.state = if self.enabled { + let ect_value = if self.use_ect1 { ECN_ECT1 } else { ECN_ECT0 }; + EcnState::Probing(ect_value) + } else { + EcnState::Unsupported + }; + self.ecn_pkts_sent = 0; + self.last_ecn_counts = base_ecn_counts; + } + + /// Returns the ECN value to assign to the packet to send. + /// + /// By calling this method, we assume that the packet to be marked is + /// actually sent. When probing a path for ECN-capability, this method may + /// return different values if too many packets were sent (calling this + /// method) before getting any acknowledgment ([`on_ack_received()`]). + /// + /// [`on_ack_received()`]: struct.Ecn.html#method.on_ack_received + pub fn get_ecn_value_to_send(&mut self) -> u8 { + match self.state { + EcnState::Probing(ect_value) => { + self.ecn_pkts_sent += 1; + if self.ecn_pkts_sent >= ECN_VALIDATION_COUNT { + self.state = EcnState::Validating(ect_value); + } + ect_value + }, + + EcnState::Capable(ect_value) => ect_value, + + EcnState::Validating(_) | EcnState::Unsupported => ECN_NOT_ECT, + } + } + + /// Returns whether the next packet to send will be ECN-marked. + pub fn is_next_sent_pkt_ecn_marked(&self) -> bool { + matches!(self.state, EcnState::Probing(_) | EcnState::Capable(_)) + } + + /// This method should be called when we receive an ACK frame increasing the + /// largest acknowledged packet number that acknowledged ECN-marked sent + /// packets. + /// + /// This returns the number of newly acknowledged packets with ECN-CE mark. + pub fn on_ack_received( + &mut self, epoch: Epoch, newly_ecn_marked_acked: u64, + ecn_counts: Option, + ) -> u64 { + if matches!(self.state, EcnState::Unsupported) { + return 0; + } + // If an ACK frame newly acknowledges a packet that the endpoint sent + // with either the ECT(0) or ECT(1) codepoint set, ECN validation fails + // if the corresponding ECN counts are not present in the ACK frame. + let ecn_counts = match ecn_counts { + Some(e) => e, + None => { + warn!("received ACK without ECN counts; disable ECN"); + self.state = EcnState::Unsupported; + return 0; + }, + }; + + let last_ecn_counts = self.last_ecn_counts[epoch]; + + // Validation will fail when an endpoint receives a non-zero ECN count + // corresponding to an ECT codepoint that it never applied. + if (self.use_ect1 && ecn_counts.ect0_count != last_ecn_counts.ect0_count) || + (!self.use_ect1 && + ecn_counts.ect1_count != last_ecn_counts.ect1_count) + { + warn!("received ECN count increase for unsent marking; disable ECN"); + self.state = EcnState::Unsupported; + return 0; + } + + // Strange reordering cases or buggy implementations may provide lower + // ECN counts than the last remembered values. Skip their processing. + if ecn_counts.ect0_count < last_ecn_counts.ect0_count || + ecn_counts.ect1_count < last_ecn_counts.ect1_count || + ecn_counts.ecn_ce_count < last_ecn_counts.ecn_ce_count + { + return 0; + } + + // ECN validation also fails if the sum of the increase in ECT(0) and + // ECN-CE counts is less than the number of newly acknowledged packets + // that were originally sent with an ECT(0) marking. Similarly, ECN + // validation fails if the sum of the increases to ECT(1) and ECN-CE + // counts is less than the number of newly acknowledged packets sent + // with an ECT(1) marking. + let ecn_count_increase = ecn_counts.ect0_count - + last_ecn_counts.ect0_count + + ecn_counts.ect1_count - + last_ecn_counts.ect1_count + + ecn_counts.ecn_ce_count - + last_ecn_counts.ecn_ce_count; + if ecn_count_increase < newly_ecn_marked_acked { + warn!("sum of ECN count increase lower than number of ECN-marked acked packets; disable ECN"); + self.state = EcnState::Unsupported; + return 0; + } + + let ecn_ce_increase = + ecn_counts.ecn_ce_count - last_ecn_counts.ecn_ce_count; + match self.state { + EcnState::Probing(e) | EcnState::Validating(e) => + self.state = EcnState::Capable(e), + _ => {}, + } + self.last_ecn_counts[epoch] = ecn_counts; + // Reset the number of lost packets. + self.ecn_pkts_lost = 0; + + ecn_ce_increase + } + + /// This method should be called when a non-zero number of ECN-marked sent + /// packets were declared lost. + pub fn on_packets_lost( + &mut self, lost_ecn_marked_ack_eliciting_packets: u64, + ) { + if matches!(self.state, EcnState::Unsupported) { + return; + } + self.ecn_pkts_lost += lost_ecn_marked_ack_eliciting_packets; + if self.ecn_pkts_lost >= ECN_LOSS_THRESHOLD { + warn!( + "Lost {} ECN-marked packets; disable ECN", + self.ecn_pkts_lost + ); + self.state = EcnState::Unsupported; + } + } +} + +#[cfg(test)] +mod tests { + use crate::ecn::ECN_LOSS_THRESHOLD; + use crate::packet; + + use super::Ecn; + use super::ECN_ECT0; + use super::ECN_ECT1; + use super::ECN_NOT_ECT; + use super::ECN_VALIDATION_COUNT; + + #[test] + fn disabled_ecn() { + let mut ecn = Ecn::new(false, false, [packet::EcnCounts::default(); 3]); + for _ in 0..1000 { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + } + } + + #[test] + fn ecn_marked_pkt_acked_without_ecn_counts() { + let mut ecn = Ecn::new(true, false, [packet::EcnCounts::default(); 3]); + + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + + // The packet gets acknowledged, but in an ACK frame without ECN counts. + let ce_events = ecn.on_ack_received(packet::Epoch::Application, 1, None); + assert_eq!(ce_events, 0); + // This disables ECN marks. + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + } + + #[test] + fn received_ecn_counts_ect1_when_marking_ect0() { + let mut ecn_counts = [packet::EcnCounts::default(); 3]; + let mut ecn = Ecn::new(true, false, ecn_counts); + + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + + // The packet gets acknowledged, but in an ACK frame increasing the ECN + // count of an ECT different from the one being used. + ecn_counts[packet::Epoch::Application].ect1_count += 1; + let ce_events = ecn.on_ack_received( + packet::Epoch::Application, + 1, + Some(ecn_counts[packet::Epoch::Application]), + ); + assert_eq!(ce_events, 0); + // This disables ECN marks. + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + } + + #[test] + fn received_ecn_counts_ect0_when_marking_ect1() { + let mut ecn_counts = [packet::EcnCounts::default(); 3]; + let mut ecn = Ecn::new(true, true, ecn_counts); + + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT1); + + // The packet gets acknowledged, but in an ACK frame increasing the ECN + // count of an ECT different from the one being used. + ecn_counts[packet::Epoch::Application].ect0_count += 1; + let ce_events = ecn.on_ack_received( + packet::Epoch::Application, + 1, + Some(ecn_counts[packet::Epoch::Application]), + ); + assert_eq!(ce_events, 0); + // This disables ECN marks. + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + } + + #[test] + fn sum_of_counts_lower_than_acked_ecn_marked_pkts() { + let mut ecn_counts = [packet::EcnCounts::default(); 3]; + let mut ecn = Ecn::new(true, false, ecn_counts); + + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + + // The packet gets acknowledged, but the ACK frame increases the ECN + // counters such that it includes a lower number of packets than the + // numbers of ECN-marked packets actually acknowledged. + ecn_counts[packet::Epoch::Application].ect0_count += 3; + ecn_counts[packet::Epoch::Application].ecn_ce_count += 2; + let ce_events = ecn.on_ack_received( + packet::Epoch::Application, + 6, + Some(ecn_counts[packet::Epoch::Application]), + ); + assert_eq!(ce_events, 0); + // This disables ECN marks. + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + } + + #[test] + fn probing_ect0_then_validated() { + let mut ecn_counts = [packet::EcnCounts::default(); 3]; + let mut ecn = Ecn::new(true, false, ecn_counts); + for _ in 0..ECN_VALIDATION_COUNT { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + } + // Once we sent ECN_VALIDATION_COUNT packets, wait for any ECN + // validation before sending ECN marks again. + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + + // Our first 10 packets got acked correctly. ECN should now be enabled. + ecn_counts[packet::Epoch::Application].ect0_count += 8; + ecn_counts[packet::Epoch::Application].ecn_ce_count += 2; + let ce_events = ecn.on_ack_received( + packet::Epoch::Application, + 10, + Some(ecn_counts[packet::Epoch::Application]), + ); + assert_eq!(ce_events, 2); + for _ in 0..1000 { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + } + } + + #[test] + fn probing_ect1_then_validated() { + let mut ecn_counts = [packet::EcnCounts::default(); 3]; + let mut ecn = Ecn::new(true, true, ecn_counts); + for _ in 0..ECN_VALIDATION_COUNT { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT1); + } + // Once we sent ECN_VALIDATION_COUNT packets, wait for any ECN + // validation before sending ECN marks again. + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + + // Our first 10 packets got acked correctly. ECN should now be enabled. + ecn_counts[packet::Epoch::Application].ect1_count += 8; + ecn_counts[packet::Epoch::Application].ecn_ce_count += 2; + let ce_events = ecn.on_ack_received( + packet::Epoch::Application, + 10, + Some(ecn_counts[packet::Epoch::Application]), + ); + assert_eq!(ce_events, 2); + for _ in 0..1000 { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT1); + } + } + + #[test] + fn probing_ect0_but_lost_packets() { + let mut ecn_counts = [packet::EcnCounts::default(); 3]; + let mut ecn = Ecn::new(true, false, ecn_counts); + for _ in 0..3 { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + } + ecn.on_packets_lost(2); + + // Our first 10 packets got acked correctly. ECN should now be enabled. + ecn_counts[packet::Epoch::Application].ect0_count += 1; + let ce_events = ecn.on_ack_received( + packet::Epoch::Application, + 1, + Some(ecn_counts[packet::Epoch::Application]), + ); + assert_eq!(ce_events, 0); + for _ in 0..ECN_LOSS_THRESHOLD { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), true); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_ECT0); + } + ecn.on_packets_lost(ECN_LOSS_THRESHOLD); + for _ in 0..1000 { + assert_eq!(ecn.is_next_sent_pkt_ecn_marked(), false); + assert_eq!(ecn.get_ecn_value_to_send(), ECN_NOT_ECT); + } + } +} diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index ec7295948f..22df41a145 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -742,6 +742,9 @@ pub struct Config { max_stream_window: u64, disable_dcid_reuse: bool, + + ecn_enabled: bool, + ecn_use_ect1: bool, } // See https://quicwg.org/base-drafts/rfc9000.html#section-15 @@ -807,6 +810,9 @@ impl Config { max_stream_window: stream::MAX_STREAM_WINDOW, disable_dcid_reuse: false, + + ecn_enabled: false, + ecn_use_ect1: false, }) } @@ -1262,6 +1268,21 @@ impl Config { pub fn set_disable_dcid_reuse(&mut self, v: bool) { self.disable_dcid_reuse = v; } + + /// Sets whether the QUIC connection should send packets with ECN support. + /// + /// The default value is `false`. + pub fn enable_ecn(&mut self, v: bool) { + self.ecn_enabled = v; + } + + /// Sets whether the QUIC connection should send packets with ECN support + /// with ECT(1) marking instead of ECT(0). + /// + /// The default value is `false`. + pub fn set_ecn_use_ect1(&mut self, v: bool) { + self.ecn_use_ect1 = v; + } } /// A QUIC connection. @@ -2991,7 +3012,7 @@ impl Connection { self.pkt_num_spaces[epoch].largest_rx_pkt_num = cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn); - if info.ecn > 0 { + if info.ecn != ecn::ECN_NOT_ECT { // Create the `EcnCounts` struct if this is the first non-zero ECN // value received. if self.pkt_num_spaces[epoch].ecn_counts.is_none() { @@ -3003,14 +3024,13 @@ impl Connection { self.pkt_num_spaces[epoch].ecn_counts { match info.ecn { - // ECN capable (ECT(0)). - 0x01 => ecn_counts.ect0_count += 1, - 0x02 => ecn_counts.ect1_count += 1, + ecn::ECN_ECT0 => ecn_counts.ect0_count += 1, - // Congestion event (ECN-CE). - 0x03 => ecn_counts.ecn_ce_count += 1, + ecn::ECN_ECT1 => ecn_counts.ect1_count += 1, - _ => unreachable!(), + ecn::ECN_CE => ecn_counts.ecn_ce_count += 1, + + e => warn!("{} invalid ECN value {}", self.trace_id, e), }; } } @@ -3327,7 +3347,7 @@ impl Connection { done += pad_len; } - let send_path = self.paths.get(send_pid)?; + let send_path = self.paths.get_mut(send_pid)?; let info = SendInfo { from: send_path.local_addr(), @@ -3335,7 +3355,7 @@ impl Connection { at: send_path.recovery.get_packet_send_time(), - ecn: 0x01, // ECT(0) + ecn: send_path.recovery.ecn.get_ecn_value_to_send(), }; Ok((done, info)) @@ -4373,6 +4393,7 @@ impl Connection { tx_in_flight: 0, lost: 0, has_data, + ecn_marked: path.recovery.ecn.is_next_sent_pkt_ecn_marked(), }; if in_flight && is_app_limited { @@ -16442,6 +16463,7 @@ pub use crate::stream::StreamIter; mod cid; mod crypto; mod dgram; +mod ecn; #[cfg(feature = "ffi")] mod ffi; mod flowcontrol; diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs index fa143009d6..906cf4308f 100644 --- a/quiche/src/recovery/bbr/mod.rs +++ b/quiche/src/recovery/bbr/mod.rs @@ -432,6 +432,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -503,6 +504,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -573,6 +575,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -625,6 +628,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -698,6 +702,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -770,6 +775,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -825,6 +831,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( diff --git a/quiche/src/recovery/bbr2/mod.rs b/quiche/src/recovery/bbr2/mod.rs index 48b79d2feb..881b047c10 100644 --- a/quiche/src/recovery/bbr2/mod.rs +++ b/quiche/src/recovery/bbr2/mod.rs @@ -744,6 +744,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -814,6 +815,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -887,6 +889,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -938,6 +941,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1011,6 +1015,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1065,6 +1070,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( diff --git a/quiche/src/recovery/cubic.rs b/quiche/src/recovery/cubic.rs index 43babcbfad..fba04e34e6 100644 --- a/quiche/src/recovery/cubic.rs +++ b/quiche/src/recovery/cubic.rs @@ -490,6 +490,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; // Send initcwnd full MSS packets to become no longer app limited @@ -542,6 +543,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; // Send initcwnd full MSS packets to become no longer app limited @@ -621,6 +623,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.congestion_event( @@ -665,6 +668,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; // Trigger congestion event to update ssthresh @@ -741,6 +745,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.congestion_event( @@ -806,6 +811,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; // 1st round. @@ -962,6 +968,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; // 1st round. @@ -1116,6 +1123,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.congestion_event( @@ -1177,6 +1185,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; let prev_cwnd = r.cwnd(); @@ -1252,6 +1261,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.congestion_event( @@ -1318,6 +1328,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.congestion_event( diff --git a/quiche/src/recovery/delivery_rate.rs b/quiche/src/recovery/delivery_rate.rs index a5a866e6c6..0e1934876b 100644 --- a/quiche/src/recovery/delivery_rate.rs +++ b/quiche/src/recovery/delivery_rate.rs @@ -244,6 +244,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.on_packet_sent( @@ -312,6 +313,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.on_packet_sent( @@ -353,6 +355,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.on_packet_sent( diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index 0106c062a7..8dfa9b15e8 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -36,6 +36,7 @@ use std::collections::VecDeque; use crate::Config; use crate::Result; +use crate::ecn; use crate::frame; use crate::minmax; use crate::packet; @@ -142,7 +143,7 @@ pub struct Recovery { max_datagram_size: usize, - ecn_ce_counts: [u64; packet::Epoch::count()], + pub ecn: ecn::Ecn, cubic_state: cubic::State, @@ -183,6 +184,8 @@ pub struct RecoveryConfig { pacing: bool, max_pacing_rate: Option, initial_congestion_window_packets: usize, + pub ecn_enabled: bool, + pub ecn_use_ect1: bool, } impl RecoveryConfig { @@ -196,6 +199,8 @@ impl RecoveryConfig { max_pacing_rate: config.max_pacing_rate, initial_congestion_window_packets: config .initial_congestion_window_packets, + ecn_enabled: config.ecn_enabled, + ecn_use_ect1: config.ecn_use_ect1, } } } @@ -269,7 +274,11 @@ impl Recovery { max_datagram_size: recovery_config.max_send_udp_payload_size, - ecn_ce_counts: [0, 0, 0], + ecn: ecn::Ecn::new( + recovery_config.ecn_enabled, + recovery_config.ecn_use_ect1, + [packet::EcnCounts::default(); packet::Epoch::count()], + ), cc_ops: recovery_config.cc_ops, @@ -467,6 +476,8 @@ impl Recovery { let mut largest_newly_acked_size = 0; let mut largest_newly_acked_sent_time = now; + let mut newly_ecn_marked_acked = 0; + let mut undo_cwnd = false; let max_rtt = cmp::max(self.latest_rtt, self.rtt()); @@ -547,6 +558,10 @@ impl Recovery { self.in_flight_count[epoch].saturating_sub(1); } + if unacked.ecn_marked { + newly_ecn_marked_acked += 1; + } + newly_acked.push(Acked { pkt_num: unacked.pkt_num, @@ -600,35 +615,34 @@ impl Recovery { } } - if let Some(ecn_counts) = ecn_counts { - // See if we can not do better. - let largest_unacked = newly_acked.last().unwrap(); - let largest_sent = Sent { - pkt_num: largest_unacked.pkt_num, - frames: SmallVec::new(), - time_sent: largest_unacked.time_sent, - time_acked: Some(now), - time_lost: None, - size: largest_unacked.size, - ack_eliciting: true, - in_flight: true, - delivered: largest_unacked.delivered, - delivered_time: largest_unacked.delivered_time, - first_sent_time: largest_unacked.first_sent_time, - is_app_limited: largest_unacked.is_app_limited, - tx_in_flight: largest_unacked.tx_in_flight, - lost: largest_unacked.lost, - has_data: true, - }; - self.process_ecn( - ecn_counts, - largest_newly_acked_size, - // We just checked that that list was not empty. - &largest_sent, - epoch, - now, - ); - } + // See if we can not do better. + let largest_unacked = newly_acked.last().unwrap(); + let largest_sent = Sent { + pkt_num: largest_unacked.pkt_num, + frames: SmallVec::new(), + time_sent: largest_unacked.time_sent, + time_acked: Some(now), + time_lost: None, + size: largest_unacked.size, + ack_eliciting: true, + in_flight: true, + delivered: largest_unacked.delivered, + delivered_time: largest_unacked.delivered_time, + first_sent_time: largest_unacked.first_sent_time, + is_app_limited: largest_unacked.is_app_limited, + tx_in_flight: largest_unacked.tx_in_flight, + lost: largest_unacked.lost, + has_data: true, + ecn_marked: true, + }; + self.process_ecn( + newly_ecn_marked_acked, + ecn_counts, + largest_newly_acked_size, + &largest_sent, + epoch, + now, + ); // Detect and mark lost packets without removing them from the sent // packets list. @@ -956,6 +970,7 @@ impl Recovery { let mut lost_packets = 0; let mut lost_bytes = 0; + let mut lost_ecn_marked_ack_eliciting_packets = 0; let mut largest_lost_pkt = None; @@ -993,6 +1008,10 @@ impl Recovery { ); } + if unacked.ack_eliciting && unacked.ecn_marked { + lost_ecn_marked_ack_eliciting_packets += 1; + } + lost_packets += 1; self.lost_count += 1; } else { @@ -1014,6 +1033,9 @@ impl Recovery { self.on_packets_lost(lost_bytes, &pkt, epoch, now); } + self.ecn + .on_packets_lost(lost_ecn_marked_ack_eliciting_packets); + self.drain_packets(epoch, now); (lost_packets, lost_bytes) @@ -1095,12 +1117,15 @@ impl Recovery { } fn process_ecn( - &mut self, ecn_counts: packet::EcnCounts, largest_acked_size: usize, + &mut self, newly_ecn_marked_acked: u64, + ecn_counts: Option, largest_acked_size: usize, largest_acked_sent: &Sent, epoch: packet::Epoch, now: Instant, ) { - if ecn_counts.ecn_ce_count > self.ecn_ce_counts[epoch] { - self.ecn_ce_counts[epoch] = ecn_counts.ecn_ce_count; - + if self + .ecn + .on_ack_received(epoch, newly_ecn_marked_acked, ecn_counts) > + 0 + { self.congestion_event( largest_acked_size, largest_acked_sent, @@ -1325,6 +1350,8 @@ pub struct Sent { pub lost: u64, pub has_data: bool, + + pub ecn_marked: bool, } impl std::fmt::Debug for Sent { @@ -1570,6 +1597,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1598,6 +1626,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1626,6 +1655,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1654,6 +1684,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1716,6 +1747,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1744,6 +1776,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1819,6 +1852,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1847,6 +1881,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1875,6 +1910,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1903,6 +1939,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -1989,6 +2026,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2017,6 +2055,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2045,6 +2084,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2073,6 +2113,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2173,6 +2214,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2234,6 +2276,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2267,6 +2310,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( @@ -2297,6 +2341,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; r.on_packet_sent( diff --git a/quiche/src/recovery/reno.rs b/quiche/src/recovery/reno.rs index c3e93dee18..41009c4850 100644 --- a/quiche/src/recovery/reno.rs +++ b/quiche/src/recovery/reno.rs @@ -220,6 +220,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; // Send initcwnd full MSS packets to become no longer app limited @@ -273,6 +274,7 @@ mod tests { tx_in_flight: 0, lost: 0, has_data: false, + ecn_marked: false, }; // Send initcwnd full MSS packets to become no longer app limited @@ -354,6 +356,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; r.congestion_event( @@ -395,6 +398,7 @@ mod tests { has_data: false, tx_in_flight: 0, lost: 0, + ecn_marked: false, }; // Trigger congestion event to update ssthresh From 2abe10bb99bc89720ab366b65bc302103de542fd Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Thu, 6 Oct 2022 16:24:29 +0200 Subject: [PATCH 04/11] making quiche apps ECN-capable --- apps/src/args.rs | 15 ++++ apps/src/bin/quiche-server.rs | 26 +++--- apps/src/client.rs | 38 ++++++-- apps/src/cmsg.rs | 144 +++++++++++++++++++++++++++++++ apps/src/common.rs | 36 ++++++++ apps/src/lib.rs | 2 + apps/src/recvfrom.rs | 158 ++++++++++++++++++++++++++++++++++ apps/src/sendto.rs | 42 ++++++++- 8 files changed, 439 insertions(+), 22 deletions(-) create mode 100644 apps/src/cmsg.rs create mode 100644 apps/src/recvfrom.rs diff --git a/apps/src/args.rs b/apps/src/args.rs index 24234a9c7c..56c8fa25dc 100644 --- a/apps/src/args.rs +++ b/apps/src/args.rs @@ -54,6 +54,8 @@ pub struct CommonArgs { pub qpack_max_table_capacity: Option, pub qpack_blocked_streams: Option, pub initial_cwnd_packets: u64, + pub enable_ecn: bool, + pub use_ect1: bool, } /// Creates a new `CommonArgs` structure using the provided [`Docopt`]. @@ -80,6 +82,8 @@ pub struct CommonArgs { /// --qpack-max-table-capacity BYTES Max capacity of dynamic QPACK decoding. /// --qpack-blocked-streams STREAMS Limit of blocked streams while decoding. /// --initial-cwnd-packets Size of initial congestion window, in packets. +/// --enable-ecn Enable ECN support. +/// --use-ect1 Use ECT(1) instead of ECT(0). /// /// [`Docopt`]: https://docs.rs/docopt/1.1.0/docopt/ impl Args for CommonArgs { @@ -191,6 +195,9 @@ impl Args for CommonArgs { .parse::() .unwrap(); + let enable_ecn = args.get_bool("--enable-ecn"); + let use_ect1 = args.get_bool("--use-ect1"); + CommonArgs { alpns, max_data, @@ -214,6 +221,8 @@ impl Args for CommonArgs { qpack_max_table_capacity, qpack_blocked_streams, initial_cwnd_packets, + enable_ecn, + use_ect1, } } } @@ -243,6 +252,8 @@ impl Default for CommonArgs { qpack_max_table_capacity: None, qpack_blocked_streams: None, initial_cwnd_packets: 10, + enable_ecn: false, + use_ect1: false, } } } @@ -289,6 +300,8 @@ Options: --session-file PATH File used to cache a TLS session for resumption. --source-port PORT Source port to use when connecting to the server [default: 0]. --initial-cwnd-packets PACKETS The initial congestion window size in terms of packet count [default: 10]. + --enable-ecn Enable ECN support. + --use-ect1 Use ECT(1) instead of ECT(0). -h --help Show this screen. "; @@ -464,6 +477,8 @@ Options: --disable-gso Disable GSO (linux only). --disable-pacing Disable pacing (linux only). --initial-cwnd-packets PACKETS The initial congestion window size in terms of packet count [default: 10]. + --enable-ecn Enable ECN support. + --use-ect1 Use ECT(1) instead of ECT(0). -h --help Show this screen. "; diff --git a/apps/src/bin/quiche-server.rs b/apps/src/bin/quiche-server.rs index fa06b7ec4e..e13a9808ea 100644 --- a/apps/src/bin/quiche-server.rs +++ b/apps/src/bin/quiche-server.rs @@ -41,6 +41,7 @@ use std::rc::Rc; use std::cell::RefCell; +use quiche_apps::recvfrom::recv_from; use ring::rand::*; use quiche_apps::args::*; @@ -127,6 +128,7 @@ fn main() { config.set_max_stream_window(conn_args.max_stream_window); config.enable_pacing(pacing); + config.enable_ecn(conn_args.enable_ecn); let mut keylog = None; @@ -202,7 +204,12 @@ fn main() { break 'read; } - let (len, from) = match socket.recv_from(&mut buf) { + let (len, recv_info) = match recv_from( + &socket, + local_addr, + &mut buf, + conn_args.enable_ecn, + ) { Ok(v) => v, Err(e) => { @@ -270,7 +277,7 @@ fn main() { let out = &out[..len]; - if let Err(e) = socket.send_to(out, from) { + if let Err(e) = socket.send_to(out, recv_info.from) { if e.kind() == std::io::ErrorKind::WouldBlock { trace!("send() would block"); break; @@ -295,7 +302,7 @@ fn main() { warn!("Doing stateless retry"); let scid = quiche::ConnectionId::from_ref(&scid); - let new_token = mint_token(&hdr, &from); + let new_token = mint_token(&hdr, &recv_info.from); let len = quiche::retry( &hdr.scid, @@ -309,7 +316,7 @@ fn main() { let out = &out[..len]; - if let Err(e) = socket.send_to(out, from) { + if let Err(e) = socket.send_to(out, recv_info.from) { if e.kind() == std::io::ErrorKind::WouldBlock { trace!("send() would block"); break; @@ -320,7 +327,7 @@ fn main() { continue 'read; } - odcid = validate_token(&from, token); + odcid = validate_token(&recv_info.from, token); // The token was not valid, meaning the retry failed, so // drop the packet. @@ -348,7 +355,7 @@ fn main() { &scid, odcid.as_ref(), local_addr, - from, + recv_info.from, &mut config, ) .unwrap(); @@ -404,12 +411,6 @@ fn main() { clients.get_mut(cid).unwrap() }; - let recv_info = quiche::RecvInfo { - to: local_addr, - from, - ecn: 0, - }; - // Process potentially coalesced packets. let read = match client.conn.recv(pkt_buf, recv_info) { Ok(v) => v, @@ -586,6 +587,7 @@ fn main() { client.max_datagram_size, pacing, enable_gso, + conn_args.enable_ecn, ) { if e.kind() == std::io::ErrorKind::WouldBlock { trace!("send() would block"); diff --git a/apps/src/client.rs b/apps/src/client.rs index b6a0c2e3ee..7256176d70 100644 --- a/apps/src/client.rs +++ b/apps/src/client.rs @@ -26,6 +26,8 @@ use crate::args::*; use crate::common::*; +use crate::recvfrom::recv_from; +use crate::sendto::send_to; use std::net::ToSocketAddrs; @@ -131,6 +133,9 @@ pub fn connect( config.set_max_connection_window(conn_args.max_window); config.set_max_stream_window(conn_args.max_stream_window); + config.enable_ecn(conn_args.enable_ecn); + config.set_ecn_use_ect1(conn_args.use_ect1); + let mut keylog = None; if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") { @@ -224,7 +229,15 @@ pub fn connect( let (write, send_info) = conn.send(&mut out).expect("initial send failed"); - while let Err(e) = socket.send_to(&out[..write], send_info.to) { + while let Err(e) = send_to( + &socket, + &out[..write], + &send_info, + MAX_DATAGRAM_SIZE, + false, + false, + conn_args.enable_ecn, + ) { if e.kind() == std::io::ErrorKind::WouldBlock { trace!( "{} -> {}: send() would block", @@ -274,7 +287,12 @@ pub fn connect( let local_addr = socket.local_addr().unwrap(); 'read: loop { - let (len, from) = match socket.recv_from(&mut buf) { + let (len, recv_info) = match recv_from( + socket, + local_addr, + &mut buf, + conn_args.enable_ecn, + ) { Ok(v) => v, Err(e) => { @@ -304,12 +322,6 @@ pub fn connect( pkt_count += 1; - let recv_info = quiche::RecvInfo { - to: local_addr, - from, - ecn: 0, - }; - // Process potentially coalesced packets. let read = match conn.recv(&mut buf[..len], recv_info) { Ok(v) => v, @@ -529,7 +541,15 @@ pub fn connect( }, }; - if let Err(e) = socket.send_to(&out[..write], send_info.to) { + if let Err(e) = send_to( + socket, + &out[..write], + &send_info, + MAX_DATAGRAM_SIZE, + false, + false, + conn_args.enable_ecn, + ) { if e.kind() == std::io::ErrorKind::WouldBlock { trace!( "{} -> {}: send() would block", diff --git a/apps/src/cmsg.rs b/apps/src/cmsg.rs new file mode 100644 index 0000000000..e69652f188 --- /dev/null +++ b/apps/src/cmsg.rs @@ -0,0 +1,144 @@ +// Copyright (C) 2022, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Taken from https://github.com/mxinden/udp-socket/blob/master/src/cmsg.rs, +// which is itself inspired from +// https://github.com/quinn-rs/quinn/blob/main/quinn-udp/src/cmsg.rs. + +use std::mem; +use std::ptr; + +#[derive(Copy, Clone)] +#[repr(align(8))] // Conservative bound for align_of +pub struct Aligned(pub T); + +/// Helper to encode a series of control messages ("cmsgs") to a buffer for use +/// in `sendmsg`. +/// +/// The operation must be "finished" for the msghdr to be usable, either by +/// calling `finish` explicitly or by dropping the `Encoder`. +pub struct Encoder<'a> { + hdr: &'a mut libc::msghdr, + cmsg: Option<&'a mut libc::cmsghdr>, + len: usize, +} + +impl<'a> Encoder<'a> { + /// # Safety + /// - `hdr.msg_control` must be a suitably aligned pointer to + /// `hdr.msg_controllen` bytes that can be safely written + /// - The `Encoder` must be dropped before `hdr` is passed to a system call, + /// and must not be leaked. + pub unsafe fn new(hdr: &'a mut libc::msghdr) -> Self { + Self { + cmsg: libc::CMSG_FIRSTHDR(hdr).as_mut(), + hdr, + len: 0, + } + } + + /// Append a control message to the buffer. + /// + /// # Panics + /// - If insufficient buffer space remains. + /// - If `T` has stricter alignment requirements than `cmsghdr` + pub fn push( + &mut self, level: libc::c_int, ty: libc::c_int, value: T, + ) { + assert!(mem::align_of::() <= mem::align_of::()); + let space = + unsafe { libc::CMSG_SPACE(mem::size_of_val(&value) as _) as usize }; + assert!( + self.hdr.msg_controllen as usize >= self.len + space, + "control message buffer too small" + ); + let cmsg = self.cmsg.take().expect("no control buffer space remaining"); + cmsg.cmsg_level = level; + cmsg.cmsg_type = ty; + cmsg.cmsg_len = + unsafe { libc::CMSG_LEN(mem::size_of_val(&value) as _) } as _; + unsafe { + ptr::write(libc::CMSG_DATA(cmsg) as *const T as *mut T, value); + } + self.len += space; + self.cmsg = unsafe { libc::CMSG_NXTHDR(self.hdr, cmsg).as_mut() }; + } + + /// Finishes appending control messages to the buffer + pub fn finish(self) { + // Delegates to the `Drop` impl + } +} + +// Statically guarantees that the encoding operation is "finished" before the +// control buffer is read by `sendmsg`. +impl<'a> Drop for Encoder<'a> { + fn drop(&mut self) { + self.hdr.msg_controllen = self.len as _; + } +} + +/// # Safety +/// +/// `cmsg` must refer to a cmsg containing a payload of type `T` +pub unsafe fn decode(cmsg: &libc::cmsghdr) -> T { + assert!(mem::align_of::() <= mem::align_of::()); + debug_assert_eq!( + cmsg.cmsg_len as usize, + libc::CMSG_LEN(mem::size_of::() as _) as usize + ); + ptr::read(libc::CMSG_DATA(cmsg) as *const T) +} + +pub struct Iter<'a> { + hdr: &'a libc::msghdr, + cmsg: Option<&'a libc::cmsghdr>, +} + +impl<'a> Iter<'a> { + /// # Safety + /// + /// `hdr.msg_control` must point to memory outliving `'a` which can be + /// soundly read for the lifetime of the constructed `Iter` and contains + /// a buffer of cmsgs, i.e. is aligned for `cmsghdr`, is fully + /// initialized, and has correct internal links. + pub unsafe fn new(hdr: &'a libc::msghdr) -> Self { + Self { + hdr, + cmsg: libc::CMSG_FIRSTHDR(hdr).as_ref(), + } + } +} + +impl<'a> Iterator for Iter<'a> { + type Item = &'a libc::cmsghdr; + + fn next(&mut self) -> Option<&'a libc::cmsghdr> { + let current = self.cmsg.take()?; + self.cmsg = unsafe { libc::CMSG_NXTHDR(self.hdr, current).as_ref() }; + Some(current) + } +} diff --git a/apps/src/common.rs b/apps/src/common.rs index 746e27a7b6..dc8915b274 100644 --- a/apps/src/common.rs +++ b/apps/src/common.rs @@ -1640,3 +1640,39 @@ impl HttpConn for Http3Conn { } } } + +// The two following functions were copied from +// https://github.com/rust-lang/socket2/blob/master/src/sys/unix.rs. + +#[cfg(target_os = "linux")] +pub(crate) mod macro_helper { + /// Helper macro to execute a system call that returns an `io::Result`. + macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + #[allow(unused_unsafe)] + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; + } + + pub(crate) use syscall; +} + +#[cfg(target_os = "linux")] +pub(crate) unsafe fn setsockopt( + fd: libc::c_int, opt: libc::c_int, val: libc::c_int, payload: T, +) -> std::io::Result<()> { + let payload = std::ptr::addr_of!(payload).cast(); + macro_helper::syscall!(setsockopt( + fd, + opt, + val, + payload, + std::mem::size_of::() as libc::socklen_t, + )) + .map(|_| ()) +} diff --git a/apps/src/lib.rs b/apps/src/lib.rs index 6c4941a6b9..26c9cb2fb0 100644 --- a/apps/src/lib.rs +++ b/apps/src/lib.rs @@ -29,5 +29,7 @@ extern crate log; pub mod args; pub mod client; +pub mod cmsg; pub mod common; +pub mod recvfrom; pub mod sendto; diff --git a/apps/src/recvfrom.rs b/apps/src/recvfrom.rs new file mode 100644 index 0000000000..8430a55733 --- /dev/null +++ b/apps/src/recvfrom.rs @@ -0,0 +1,158 @@ +// Copyright (C) 2022, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use quiche::RecvInfo; +use std::io; +use std::net::SocketAddr; + +#[cfg(target_os = "linux")] +fn set_ecn_support(socket: &mio::net::UdpSocket) -> std::io::Result<()> { + use crate::common::setsockopt; + use libc::IPPROTO_IP; + use libc::IPPROTO_IPV6; + use libc::IPV6_RECVTCLASS; + use libc::IP_RECVTOS; + use std::os::unix::io::AsRawFd; + + let recv_ecn: u32 = 1; + + // Opportunistically set support on both address families. + let res1 = unsafe { + setsockopt(socket.as_raw_fd(), IPPROTO_IP, IP_RECVTOS, recv_ecn) + }; + let res2 = unsafe { + setsockopt(socket.as_raw_fd(), IPPROTO_IPV6, IPV6_RECVTCLASS, recv_ecn) + }; + if res1.is_ok() { + res1 + } else { + res2 + } +} + +#[cfg(not(target_os = "linux"))] +fn set_ecn_support(_socket: &mio::net::UdpSocket) -> std::io::Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "ecn not supported", + )) +} + +// The following is taken from https://github.com/mxinden/udp-socket, +// with some adaptations. + +#[cfg(target_os = "linux")] +fn recvmsg( + socket: &mio::net::UdpSocket, local_addr: SocketAddr, buf: &mut [u8], +) -> std::io::Result<(usize, RecvInfo)> { + use std::io::IoSliceMut; + use std::mem::MaybeUninit; + use std::os::unix::io::AsRawFd; + + use libc::CMSG_LEN; + + use crate::cmsg; + use crate::common; + + let mut iov = IoSliceMut::new(buf); + let mut name = MaybeUninit::::uninit(); + let mut ctrl = cmsg::Aligned(MaybeUninit::< + [u8; (std::mem::size_of::() as _)], + >::uninit()); + let mut hdr = unsafe { std::mem::zeroed::() }; + + hdr.msg_name = name.as_mut_ptr() as _; + hdr.msg_namelen = std::mem::size_of::() as _; + hdr.msg_iov = (&mut iov) as *mut IoSliceMut as *mut libc::iovec; + hdr.msg_iovlen = 1; + hdr.msg_control = ctrl.0.as_mut_ptr() as _; + hdr.msg_controllen = CMSG_LEN as _; + hdr.msg_flags = 0; + + let read = common::macro_helper::syscall!(recvmsg( + socket.as_raw_fd(), + (&mut hdr) as *mut libc::msghdr, + 0 + ))?; + + let name = unsafe { name.assume_init() }; + let mut ecn_bits = 0; + + let cmsg_iter = unsafe { cmsg::Iter::new(&hdr) }; + for cmsg in cmsg_iter { + match (cmsg.cmsg_level, cmsg.cmsg_type) { + // FreeBSD uses IP_RECVTOS here, and we can be liberal because cmsgs + // are opt-in. + (libc::IPPROTO_IP, libc::IP_TOS) | + (libc::IPPROTO_IP, libc::IP_RECVTOS) => unsafe { + ecn_bits = cmsg::decode::(cmsg); + }, + (libc::IPPROTO_IPV6, libc::IPV6_TCLASS) => unsafe { + ecn_bits = cmsg::decode::(cmsg) as u8; + }, + _ => {}, + } + } + + let source = match libc::c_int::from(name.ss_family) { + libc::AF_INET => unsafe { + SocketAddr::V4(std::ptr::read(&name as *const _ as _)) + }, + libc::AF_INET6 => unsafe { + SocketAddr::V6(std::ptr::read(&name as *const _ as _)) + }, + _ => unreachable!(), + }; + + Ok((read as usize, RecvInfo { + from: source, + to: local_addr, + ecn: ecn_bits, + })) +} + +#[cfg(not(target_os = "linux"))] +fn recvmsg( + socket: &mio::net::UdpSocket, local_addr: SocketAddr, buf: &mut [u8], +) -> std::io::Result<(usize, RecvInfo)> { + let (len, from) = socket.recv_from(buf)?; + Ok((len, RecvInfo { + from, + to: local_addr, + ecn: 0, + })) +} + +pub fn recv_from( + socket: &mio::net::UdpSocket, local_addr: SocketAddr, buf: &mut [u8], + enable_ecn: bool, +) -> io::Result<(usize, RecvInfo)> { + if enable_ecn { + set_ecn_support(socket).ok(); + } + + recvmsg(socket, local_addr, buf) +} diff --git a/apps/src/sendto.rs b/apps/src/sendto.rs index 83ab855581..80c23b9fbf 100644 --- a/apps/src/sendto.rs +++ b/apps/src/sendto.rs @@ -93,13 +93,53 @@ fn send_to_gso_pacing( panic!("send_to_gso() should not be called on non-linux platforms"); } +#[cfg(target_os = "linux")] +fn set_ecn_value( + socket: &mio::net::UdpSocket, send_info: &quiche::SendInfo, +) -> std::io::Result<()> { + use crate::common::setsockopt; + use libc::c_int; + use libc::IPPROTO_IP; + use libc::IPPROTO_IPV6; + use libc::IPV6_TCLASS; + use libc::IP_TOS; + use std::os::unix::io::AsRawFd; + + let ecn: u32 = send_info.ecn as u32; + let res6 = unsafe { + setsockopt(socket.as_raw_fd(), IPPROTO_IPV6, IPV6_TCLASS, ecn as c_int) + }; + let res4 = unsafe { + setsockopt(socket.as_raw_fd(), IPPROTO_IP, IP_TOS, ecn as c_int) + }; + if res6.is_ok() { + res6 + } else { + res4 + } +} + +#[cfg(not(target_os = "linux"))] +fn set_ecn_value( + _socket: &mio::net::UdpSocket, _send_info: &quiche::SendInfo, +) -> std::io::Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "ecn not supported", + )) +} + /// A wrapper function of send_to(). /// - when GSO and SO_TXTIME enabled, send a packet using send_to_gso(). /// Otherwise, send packet using socket.send_to(). pub fn send_to( socket: &mio::net::UdpSocket, buf: &[u8], send_info: &quiche::SendInfo, - segment_size: usize, pacing: bool, enable_gso: bool, + segment_size: usize, pacing: bool, enable_gso: bool, enable_ecn: bool, ) -> io::Result { + if enable_ecn { + set_ecn_value(socket, send_info).ok(); + } + if pacing && enable_gso { match send_to_gso_pacing(socket, buf, send_info, segment_size) { Ok(v) => { From 42cc5e1313a18ea00d49c41f831c002e3da6b2a6 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Thu, 13 Oct 2022 14:05:16 +0200 Subject: [PATCH 05/11] build (windows and nginx) and test (multiarch) fixes --- apps/src/lib.rs | 1 + nginx/nginx-1.16.patch | 6 ++++-- quiche/src/ffi.rs | 4 ++++ quiche/src/lib.rs | 20 ++++++++++++-------- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/apps/src/lib.rs b/apps/src/lib.rs index 26c9cb2fb0..483ebff6fd 100644 --- a/apps/src/lib.rs +++ b/apps/src/lib.rs @@ -29,6 +29,7 @@ extern crate log; pub mod args; pub mod client; +#[cfg(target_os = "linux")] pub mod cmsg; pub mod common; pub mod recvfrom; diff --git a/nginx/nginx-1.16.patch b/nginx/nginx-1.16.patch index b0a58f22e1..8ea241c15c 100644 --- a/nginx/nginx-1.16.patch +++ b/nginx/nginx-1.16.patch @@ -20,7 +20,7 @@ Subject: [PATCH] Initial QUIC and HTTP/3 implementation using quiche src/event/ngx_event.h | 4 - src/event/ngx_event_posted.c | 1 + src/event/ngx_event_posted.h | 3 + - src/event/ngx_event_quic.c | 902 +++++++++ + src/event/ngx_event_quic.c | 904 +++++++++ src/event/ngx_event_quic.h | 63 + src/event/ngx_event_udp.c | 39 +- src/http/modules/ngx_http_ssl_module.c | 13 +- @@ -638,7 +638,7 @@ new file mode 100644 index 000000000..bc73cffc9 --- /dev/null +++ b/src/event/ngx_event_quic.c -@@ -0,0 +1,902 @@ +@@ -0,0 +1,904 @@ + +/* + * Copyright (C) Cloudflare, Inc. @@ -866,6 +866,7 @@ index 000000000..bc73cffc9 + c->socklen, + c->local_sockaddr, + c->local_socklen, ++ 0, + }; + + /* Process the client's Initial packet, which was saved into c->buffer by @@ -932,6 +933,7 @@ index 000000000..bc73cffc9 + c->socklen, + c->local_sockaddr, + c->local_socklen, ++ 0, + }; + + ssize_t done = quiche_conn_recv(c->quic->conn, b, n, &recv_info); diff --git a/quiche/src/ffi.rs b/quiche/src/ffi.rs index 4a0804c56f..8b108b51c5 100644 --- a/quiche/src/ffi.rs +++ b/quiche/src/ffi.rs @@ -758,6 +758,8 @@ pub struct SendInfo { to_len: socklen_t, at: timespec, + + ecn: u8, } #[no_mangle] @@ -777,6 +779,8 @@ pub extern fn quiche_conn_send( std_time_to_c(&info.at, &mut out_info.at); + out_info.ecn = info.ecn; + v as ssize_t }, diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index 22df41a145..3992ba55aa 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -16437,14 +16437,18 @@ mod tests { ecn_ce_count: 1, }); - assert_eq!( - iter.next(), - Some(&frame::Frame::ACK { - ack_delay: 0, - ranges, - ecn_counts, - }) - ); + let ack = iter.next(); + assert!(matches!(ack, Some(&frame::Frame::ACK { .. }))); + let (ack_ranges, ack_ecn_counts) = match ack { + Some(frame::Frame::ACK { + ranges: r, + ecn_counts: ec, + .. + }) => (r, ec), + _ => unreachable!(), + }; + assert_eq!(*ack_ranges, ranges); + assert_eq!(*ack_ecn_counts, ecn_counts); } } From 6f704b6318f12f90e6646899853294749b3da8ab Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Wed, 19 Oct 2022 09:19:58 +0200 Subject: [PATCH 06/11] keep ECN counts when migrating --- quiche/src/ecn.rs | 5 ++ quiche/src/lib.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++- quiche/src/path.rs | 9 +++- 3 files changed, 131 insertions(+), 3 deletions(-) diff --git a/quiche/src/ecn.rs b/quiche/src/ecn.rs index 162c8f5785..6351ccb14e 100644 --- a/quiche/src/ecn.rs +++ b/quiche/src/ecn.rs @@ -223,6 +223,11 @@ impl Ecn { self.state = EcnState::Unsupported; } } + + /// Returns the latest ECN counters. + pub fn ecn_counts(&self) -> [packet::EcnCounts; packet::Epoch::count()] { + self.last_ecn_counts + } } #[cfg(test)] diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index 3992ba55aa..2ab6018807 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -8441,7 +8441,7 @@ pub mod testing { let info = RecvInfo { to: si.to, from: si.from, - ecn: 0, + ecn: si.ecn, }; conn.recv(&mut pkt, info)?; @@ -16450,6 +16450,124 @@ mod tests { assert_eq!(*ack_ranges, ranges); assert_eq!(*ack_ecn_counts, ecn_counts); } + + #[test] + fn connection_migration_with_ecn() { + let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config + .set_application_protos(&[b"proto1", b"proto2"]) + .unwrap(); + config.verify_peer(false); + config.set_active_connection_id_limit(3); + config.set_initial_max_data(20000); + config.set_initial_max_stream_data_bidi_local(20000); + config.set_initial_max_stream_data_bidi_remote(20000); + config.set_initial_max_stream_data_uni(20000); + config.set_initial_max_streams_bidi(10); + config.enable_ecn(true); + + let mut pipe = pipe_with_exchanged_cids(&mut config, 16, 16, 2); + + let server_addr = testing::Pipe::server_addr(); + let client_addr_2 = "127.0.0.1:5678".parse().unwrap(); + + // Case 1: the client first probes the new address, the server too, and + // then migrates. + assert_eq!(pipe.client.probe_path(client_addr_2, server_addr), Ok(1)); + assert_eq!(pipe.advance(), Ok(())); + assert_eq!( + pipe.client.path_event_next(), + Some(PathEvent::Validated(client_addr_2, server_addr)) + ); + assert_eq!(pipe.client.path_event_next(), None); + assert_eq!( + pipe.server.path_event_next(), + Some(PathEvent::New(server_addr, client_addr_2)) + ); + assert_eq!( + pipe.server.path_event_next(), + Some(PathEvent::Validated(server_addr, client_addr_2)) + ); + assert_eq!( + pipe.client.is_path_validated(client_addr_2, server_addr), + Ok(true) + ); + assert_eq!( + pipe.server.is_path_validated(server_addr, client_addr_2), + Ok(true) + ); + // Send some data. + let data_buf = [0x42; 10000]; + assert_eq!(pipe.client.stream_send(0, &data_buf[..], true), Ok(10000)); + assert_eq!(pipe.advance(), Ok(())); + let ecn_counts = pipe + .client + .paths + .get_active() + .unwrap() + .recovery + .ecn + .ecn_counts(); + assert_eq!(pipe.client.migrate(client_addr_2, server_addr), Ok(1)); + assert_eq!(pipe.client.stream_send(4, b"data", true), Ok(4)); + assert_eq!(pipe.advance(), Ok(())); + assert_eq!( + pipe.client + .paths + .get_active() + .expect("no active") + .local_addr(), + client_addr_2 + ); + assert_eq!( + pipe.client + .paths + .get_active() + .expect("no active") + .peer_addr(), + server_addr + ); + assert_eq!( + pipe.server.path_event_next(), + Some(PathEvent::PeerMigrated(server_addr, client_addr_2)) + ); + assert_eq!(pipe.server.path_event_next(), None); + assert_eq!( + pipe.server + .paths + .get_active() + .expect("no active") + .local_addr(), + server_addr + ); + assert_eq!( + pipe.server + .paths + .get_active() + .expect("no active") + .peer_addr(), + client_addr_2 + ); + let ecn_counts_2 = pipe + .client + .paths + .get_active() + .unwrap() + .recovery + .ecn + .ecn_counts(); + assert_eq!(ecn_counts[0], ecn_counts_2[0]); + assert_eq!(ecn_counts[1], ecn_counts_2[1]); + assert_eq!(ecn_counts[2].ect0_count + 1, ecn_counts_2[2].ect0_count); + assert_eq!(ecn_counts[2].ect1_count, ecn_counts_2[2].ect1_count); + assert_eq!(ecn_counts[2].ecn_ce_count, ecn_counts_2[2].ecn_ce_count); + } } pub use crate::packet::ConnectionId; diff --git a/quiche/src/path.rs b/quiche/src/path.rs index 9839c41227..c1384df6cc 100644 --- a/quiche/src/path.rs +++ b/quiche/src/path.rs @@ -37,6 +37,7 @@ use slab::Slab; use crate::Error; use crate::Result; +use crate::packet::EcnCounts; use crate::recovery; use crate::recovery::HandshakeStatus; @@ -769,12 +770,16 @@ impl PathMap { pub fn set_active_path(&mut self, path_id: usize) -> Result<()> { let is_server = self.is_server; - if let Ok(old_active_path) = self.get_active_mut() { + let old_ecn_counts = if let Ok(old_active_path) = self.get_active_mut() { old_active_path.active = false; - } + old_active_path.recovery.ecn.ecn_counts() + } else { + [EcnCounts::default(); crate::packet::Epoch::count()] + }; let new_active_path = self.get_mut(path_id)?; new_active_path.active = true; + new_active_path.recovery.ecn.reset(old_ecn_counts); if is_server { if new_active_path.validated() { From 3d6ef617048ed64f0bfb7110a5901de1a808204f Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Wed, 26 Oct 2022 10:06:41 +0200 Subject: [PATCH 07/11] provide `use_ect1` argument in server config --- apps/src/bin/quiche-server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/src/bin/quiche-server.rs b/apps/src/bin/quiche-server.rs index e13a9808ea..5d3c36177b 100644 --- a/apps/src/bin/quiche-server.rs +++ b/apps/src/bin/quiche-server.rs @@ -129,6 +129,7 @@ fn main() { config.enable_pacing(pacing); config.enable_ecn(conn_args.enable_ecn); + config.set_ecn_use_ect1(conn_args.use_ect1); let mut keylog = None; From f68a31a05b127661f74daa32483a8f6298f60649 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Wed, 16 Nov 2022 14:01:36 +0100 Subject: [PATCH 08/11] recvfrom: don't assume memory layout --- apps/src/recvfrom.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/apps/src/recvfrom.rs b/apps/src/recvfrom.rs index 8430a55733..a0fa402cdf 100644 --- a/apps/src/recvfrom.rs +++ b/apps/src/recvfrom.rs @@ -70,6 +70,10 @@ fn recvmsg( ) -> std::io::Result<(usize, RecvInfo)> { use std::io::IoSliceMut; use std::mem::MaybeUninit; + use std::net::Ipv4Addr; + use std::net::Ipv6Addr; + use std::net::SocketAddrV4; + use std::net::SocketAddrV6; use std::os::unix::io::AsRawFd; use libc::CMSG_LEN; @@ -118,11 +122,24 @@ fn recvmsg( } let source = match libc::c_int::from(name.ss_family) { - libc::AF_INET => unsafe { - SocketAddr::V4(std::ptr::read(&name as *const _ as _)) + libc::AF_INET => { + let addr = + unsafe { &*(&name as *const _ as *const libc::sockaddr_in) }; + let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + let port = u16::from_be(addr.sin_port); + SocketAddr::V4(SocketAddrV4::new(ip, port)) }, - libc::AF_INET6 => unsafe { - SocketAddr::V6(std::ptr::read(&name as *const _ as _)) + libc::AF_INET6 => { + let addr = + unsafe { &*(&name as *const _ as *const libc::sockaddr_in6) }; + let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + let port = u16::from_be(addr.sin6_port); + SocketAddr::V6(SocketAddrV6::new( + ip, + port, + addr.sin6_flowinfo, + addr.sin6_scope_id, + )) }, _ => unreachable!(), }; From 533abc1c4705c4fdc1044702353a70dd250aa979 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Wed, 10 Jan 2024 17:57:03 +0100 Subject: [PATCH 09/11] remove useless parentheses + clippy fixes --- apps/src/cmsg.rs | 4 ++-- apps/src/recvfrom.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/src/cmsg.rs b/apps/src/cmsg.rs index e69652f188..dff59a9198 100644 --- a/apps/src/cmsg.rs +++ b/apps/src/cmsg.rs @@ -72,7 +72,7 @@ impl<'a> Encoder<'a> { let space = unsafe { libc::CMSG_SPACE(mem::size_of_val(&value) as _) as usize }; assert!( - self.hdr.msg_controllen as usize >= self.len + space, + self.hdr.msg_controllen >= self.len + space, "control message buffer too small" ); let cmsg = self.cmsg.take().expect("no control buffer space remaining"); @@ -107,7 +107,7 @@ impl<'a> Drop for Encoder<'a> { pub unsafe fn decode(cmsg: &libc::cmsghdr) -> T { assert!(mem::align_of::() <= mem::align_of::()); debug_assert_eq!( - cmsg.cmsg_len as usize, + cmsg.cmsg_len, libc::CMSG_LEN(mem::size_of::() as _) as usize ); ptr::read(libc::CMSG_DATA(cmsg) as *const T) diff --git a/apps/src/recvfrom.rs b/apps/src/recvfrom.rs index a0fa402cdf..fe1fac8f87 100644 --- a/apps/src/recvfrom.rs +++ b/apps/src/recvfrom.rs @@ -84,7 +84,7 @@ fn recvmsg( let mut iov = IoSliceMut::new(buf); let mut name = MaybeUninit::::uninit(); let mut ctrl = cmsg::Aligned(MaybeUninit::< - [u8; (std::mem::size_of::() as _)], + [u8; std::mem::size_of::() as _], >::uninit()); let mut hdr = unsafe { std::mem::zeroed::() }; From c24a9aac171cb8a4aacbc41f1deac73380cab169 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Wed, 19 Oct 2022 17:03:49 +0200 Subject: [PATCH 10/11] prague: initial potential complete implementation --- quiche/src/recovery/bbr/mod.rs | 15 + quiche/src/recovery/bbr2/mod.rs | 15 + quiche/src/recovery/cubic.rs | 15 + quiche/src/recovery/mod.rs | 52 ++- quiche/src/recovery/prague.rs | 588 ++++++++++++++++++++++++++++++++ quiche/src/recovery/reno.rs | 17 +- 6 files changed, 686 insertions(+), 16 deletions(-) create mode 100644 quiche/src/recovery/prague.rs diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/bbr/mod.rs index 906cf4308f..1e52d9fbf5 100644 --- a/quiche/src/recovery/bbr/mod.rs +++ b/quiche/src/recovery/bbr/mod.rs @@ -42,6 +42,7 @@ pub static BBR: CongestionControlOps = CongestionControlOps { on_packet_sent, on_packets_acked, congestion_event, + process_ecn, collapse_cwnd, checkpoint, rollback, @@ -335,6 +336,20 @@ fn congestion_event( } } +fn process_ecn( + r: &mut Recovery, _newly_ecn_marked_acked: u64, new_ce_marks: u64, + _acked_bytes: usize, largest_sent: &Sent, epoch: packet::Epoch, now: Instant, +) { + if new_ce_marks > 0 { + r.congestion_event( + new_ce_marks as usize * r.max_datagram_size, + largest_sent, + epoch, + now, + ); + } +} + fn collapse_cwnd(r: &mut Recovery) { r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r); diff --git a/quiche/src/recovery/bbr2/mod.rs b/quiche/src/recovery/bbr2/mod.rs index 881b047c10..d29156384f 100644 --- a/quiche/src/recovery/bbr2/mod.rs +++ b/quiche/src/recovery/bbr2/mod.rs @@ -42,6 +42,7 @@ pub static BBR2: CongestionControlOps = CongestionControlOps { on_packet_sent, on_packets_acked, congestion_event, + process_ecn, collapse_cwnd, checkpoint, rollback, @@ -610,6 +611,20 @@ fn congestion_event( } } +fn process_ecn( + r: &mut Recovery, _newly_ecn_marked_acked: u64, new_ce_marks: u64, + _acked_bytes: usize, largest_sent: &Sent, epoch: packet::Epoch, now: Instant, +) { + if new_ce_marks > 0 { + r.congestion_event( + new_ce_marks as usize * r.max_datagram_size, + largest_sent, + epoch, + now, + ); + } +} + fn collapse_cwnd(r: &mut Recovery) { // BBROnEnterRTO() r.bbr2_state.prior_cwnd = per_ack::bbr2_save_cwnd(r); diff --git a/quiche/src/recovery/cubic.rs b/quiche/src/recovery/cubic.rs index fba04e34e6..a5b483e531 100644 --- a/quiche/src/recovery/cubic.rs +++ b/quiche/src/recovery/cubic.rs @@ -51,6 +51,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps { on_packet_sent, on_packets_acked, congestion_event, + process_ecn, collapse_cwnd, checkpoint, rollback, @@ -396,6 +397,20 @@ fn congestion_event( } } +fn process_ecn( + r: &mut Recovery, _newly_ecn_marked_acked: u64, new_ce_marks: u64, + _acked_bytes: usize, largest_sent: &Sent, epoch: packet::Epoch, now: Instant, +) { + if new_ce_marks > 0 { + r.congestion_event( + new_ce_marks as usize * r.max_datagram_size, + largest_sent, + epoch, + now, + ); + } +} + fn checkpoint(r: &mut Recovery) { r.cubic_state.prior.congestion_window = r.congestion_window; r.cubic_state.prior.ssthresh = r.ssthresh; diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index 8dfa9b15e8..240ce76db5 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -174,6 +174,9 @@ pub struct Recovery { /// Initial congestion window size in terms of packet count. initial_congestion_window_packets: usize, + + /// Prague state. + prague_state: prague::State, } pub struct RecoveryConfig { @@ -313,6 +316,8 @@ impl Recovery { initial_congestion_window_packets: recovery_config .initial_congestion_window_packets, + + prague_state: prague::State::default(), } } @@ -473,7 +478,7 @@ impl Recovery { let mut has_ack_eliciting = false; let mut largest_newly_acked_pkt_num = 0; - let mut largest_newly_acked_size = 0; + let mut newly_acked_size = 0; let mut largest_newly_acked_sent_time = now; let mut newly_ecn_marked_acked = 0; @@ -548,7 +553,7 @@ impl Recovery { } largest_newly_acked_pkt_num = unacked.pkt_num; - largest_newly_acked_size = unacked.size; + newly_acked_size += unacked.size; largest_newly_acked_sent_time = unacked.time_sent; self.acked[epoch].extend(unacked.frames.drain(..)); @@ -638,7 +643,7 @@ impl Recovery { self.process_ecn( newly_ecn_marked_acked, ecn_counts, - largest_newly_acked_size, + newly_acked_size, &largest_sent, epoch, now, @@ -1118,16 +1123,18 @@ impl Recovery { fn process_ecn( &mut self, newly_ecn_marked_acked: u64, - ecn_counts: Option, largest_acked_size: usize, + ecn_counts: Option, acked_size: usize, largest_acked_sent: &Sent, epoch: packet::Epoch, now: Instant, ) { - if self - .ecn - .on_ack_received(epoch, newly_ecn_marked_acked, ecn_counts) > - 0 - { - self.congestion_event( - largest_acked_size, + let new_ce_marks = + self.ecn + .on_ack_received(epoch, newly_ecn_marked_acked, ecn_counts); + if newly_ecn_marked_acked > 0 { + (self.cc_ops.process_ecn)( + self, + newly_ecn_marked_acked, + new_ce_marks, + acked_size, largest_acked_sent, epoch, now, @@ -1199,13 +1206,15 @@ impl Recovery { #[repr(C)] pub enum CongestionControlAlgorithm { /// Reno congestion control algorithm. `reno` in a string form. - Reno = 0, + Reno = 0, /// CUBIC congestion control algorithm (default). `cubic` in a string form. - CUBIC = 1, + CUBIC = 1, /// BBR congestion control algorithm. `bbr` in a string form. - BBR = 2, + BBR = 2, /// BBRv2 congestion control algorithm. `bbr2` in a string form. - BBR2 = 3, + BBR2 = 3, + /// Prague congestion control algorithm. `prague` in a string form. + Prague = 4, } impl FromStr for CongestionControlAlgorithm { @@ -1220,6 +1229,7 @@ impl FromStr for CongestionControlAlgorithm { "cubic" => Ok(CongestionControlAlgorithm::CUBIC), "bbr" => Ok(CongestionControlAlgorithm::BBR), "bbr2" => Ok(CongestionControlAlgorithm::BBR2), + "prague" => Ok(CongestionControlAlgorithm::Prague), _ => Err(crate::Error::CongestionControl), } @@ -1248,6 +1258,16 @@ pub struct CongestionControlOps { now: Instant, ), + pub process_ecn: fn( + r: &mut Recovery, + newly_ecn_marked_acked: u64, + new_ce_marks: u64, + acked_bytes: usize, + sent: &Sent, + epoch: packet::Epoch, + now: Instant, + ), + pub collapse_cwnd: fn(r: &mut Recovery), pub checkpoint: fn(r: &mut Recovery), @@ -1267,6 +1287,7 @@ impl From for &'static CongestionControlOps { CongestionControlAlgorithm::CUBIC => &cubic::CUBIC, CongestionControlAlgorithm::BBR => &bbr::BBR, CongestionControlAlgorithm::BBR2 => &bbr2::BBR2, + CongestionControlAlgorithm::Prague => &prague::PRAGUE, } } } @@ -2374,5 +2395,6 @@ mod cubic; mod delivery_rate; mod hystart; mod pacer; +mod prague; mod prr; mod reno; diff --git a/quiche/src/recovery/prague.rs b/quiche/src/recovery/prague.rs new file mode 100644 index 0000000000..0dece86bd7 --- /dev/null +++ b/quiche/src/recovery/prague.rs @@ -0,0 +1,588 @@ +// Copyright (C) 2022, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +//! Prague Congestion Control + +use std::time::Duration; +use std::time::Instant; + +use crate::packet; +use crate::recovery; +use crate::recovery::reno; + +use crate::recovery::Acked; +use crate::recovery::CongestionControlOps; +use crate::recovery::Recovery; +use crate::recovery::Sent; + +pub static PRAGUE: CongestionControlOps = CongestionControlOps { + on_init, + reset, + on_packet_sent, + on_packets_acked, + congestion_event, + process_ecn, + collapse_cwnd, + checkpoint, + rollback, + has_custom_pacing, + debug_fmt, +}; + +/// The gain of the EWMA (1/16). +const G: f64 = 1.0 / 16.0; + +// Minimum virtual RTT used to reduce the bias of small RTTs. +const RTT_VIRT_MIN: Duration = Duration::from_millis(25); + +// The number of iterations before it starts being in reduced RTT-dependence. +const D: u32 = 500; + +/// Prague State Variables. +#[derive(Debug, Default)] +pub struct State { + /// Largest sent packet number set at the start of the round for EWMA, + /// different from the common state that tracks round for CWR. + largest_sent_pn: [u64; packet::Epoch::count()], + + /// Moving Average of ECN Feedback, as defined in Section 2.3.2 of + /// https://www.ietf.org/archive/id/draft-briscoe-iccrg-prague-congestion-control-01.html + alpha: f64, + + /// The rtt_virt/srtt ratio, ensured to be >= 1.0. + rtt_virt_ratio: f64, + + reduced_due_to_ce: bool, + + /// The number of newly marked CE packets during this RTT. + newly_ce_marked: u64, + + /// The number of newly acknowledged packets during this RTT. + newly_acknowledged: u64, + + /// The number of CE congestion events. + ce_event_cnt: u64, + + /// Time of the first packet sent ever. + first_sent_packet_time: Option, +} + +fn on_init(_r: &mut Recovery) {} + +fn reset(r: &mut Recovery) { + r.prague_state = State::default(); + r.prague_state.alpha = 1.0; +} + +fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, now: Instant) { + if r.prague_state.first_sent_packet_time.is_none() { + r.prague_state.first_sent_packet_time = Some(now); + } + reno::on_packet_sent(r, sent_bytes, now); +} + +fn update_pacer_state(r: &mut Recovery, now: Instant) { + let mut rate = if r.congestion_window < r.ssthresh { + 2.0 * r.congestion_window as f64 + } else { + r.congestion_window as f64 + }; + rate /= r + .smoothed_rtt + .unwrap_or(recovery::INITIAL_RTT) + .as_secs_f64(); + + // Burst queue of at least 250 us. + let burst = rate / (2.0_f64).powi(12); + + r.pacer + .update(burst.round() as usize, rate.round() as u64, now); +} + +fn on_packets_acked( + r: &mut Recovery, packets: &mut Vec, epoch: packet::Epoch, + now: Instant, +) { + for pkt in packets.drain(..) { + on_packet_acked(r, &pkt, epoch, now); + } +} + +fn ca_after_ce(r: &mut Recovery, acked_bytes: usize) { + let increase = (1.0 / r.prague_state.rtt_virt_ratio.powi(2)) * + (acked_bytes as f64 / r.congestion_window as f64); + r.congestion_window += increase.round() as usize; +} + +fn on_packet_acked( + r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant, +) { + r.bytes_in_flight = r.bytes_in_flight.saturating_sub(packet.size); + + if r.in_congestion_recovery(packet.time_sent) { + return; + } + + if r.app_limited { + return; + } + + if r.congestion_window < r.ssthresh { + // In Slow slart, bytes_acked_sl is used for counting + // acknowledged bytes. + r.bytes_acked_sl += packet.size; + + if r.hystart.in_css(epoch) { + r.congestion_window += r.hystart.css_cwnd_inc(r.max_datagram_size); + } else { + r.congestion_window += r.max_datagram_size; + } + + if r.hystart.on_packet_acked(epoch, packet, r.latest_rtt, now) { + // Exit to congestion avoidance if CSS ends. + r.ssthresh = r.congestion_window; + } + } else if r.prague_state.reduced_due_to_ce { + ca_after_ce(r, packet.size); + } else { + // Congestion avoidance. + r.bytes_acked_ca += packet.size; + + if r.bytes_acked_ca >= r.congestion_window { + r.bytes_acked_ca -= r.congestion_window; + r.congestion_window += r.max_datagram_size; + } + } +} + +fn congestion_event( + r: &mut Recovery, lost_bytes: usize, largest_lost_pkt: &Sent, + epoch: packet::Epoch, now: Instant, +) { + if r.in_congestion_recovery(largest_lost_pkt.time_sent) { + return; + } + r.prague_state.reduced_due_to_ce = false; + // FIXME only rely on reno for now. We could use cubic instead. + reno::congestion_event(r, lost_bytes, largest_lost_pkt, epoch, now); + + update_pacer_state(r, now); +} + +fn rtt_elapsed(r: &Recovery, epoch: packet::Epoch) -> bool { + r.prague_state.largest_sent_pn[epoch] == 0 || + r.largest_acked_pkt[epoch] > r.prague_state.largest_sent_pn[epoch] +} + +fn update_alpha( + r: &mut Recovery, newly_ecn_marked_acked: u64, new_ce_counts: u64, + epoch: packet::Epoch, +) { + r.prague_state.newly_acknowledged += newly_ecn_marked_acked; + r.prague_state.newly_ce_marked += new_ce_counts; + + // Update alpha only once per RTT. + if !rtt_elapsed(r, epoch) { + return; + } + + if r.prague_state.newly_acknowledged == 0 { + error!("This should not happen"); + return; + } + + let frac = new_ce_counts as f64 / newly_ecn_marked_acked as f64; + r.prague_state.alpha += G * (frac - r.prague_state.alpha); + + // Start a new round. + r.prague_state.largest_sent_pn[epoch] = r.largest_sent_pkt[epoch]; + r.prague_state.newly_acknowledged = 0; + r.prague_state.newly_ce_marked = 0; +} + +fn enter_cwr(r: &mut Recovery, time_sent: Instant) { + r.prague_state.ce_event_cnt += 1; + + let in_congestion_recovery = r.in_congestion_recovery(time_sent); + if in_congestion_recovery { + return; + } + + // When we receive a CE-mark, we set ssthres to + // (1 - alpha/2) * cwnd + // and set cwnd to that value. + let cwnd_float = r.congestion_window as f64; + let reduction = (cwnd_float * r.prague_state.alpha) / 2.0; + r.congestion_window = (cwnd_float - reduction).round() as usize; + + // Ensure at least 2 MSS. + if r.congestion_window < + recovery::MINIMUM_WINDOW_PACKETS * r.max_datagram_size + { + r.congestion_window = r.max_datagram_size; + } + r.ssthresh = r.congestion_window; + + r.prague_state.reduced_due_to_ce = true; +} + +fn update_virt_rtt_ratio(r: &mut Recovery, now: Instant) { + let srtt = r.smoothed_rtt.unwrap_or(recovery::INITIAL_RTT); + let time_since_begin = + now - r.prague_state.first_sent_packet_time.unwrap_or(now); + if srtt >= RTT_VIRT_MIN || time_since_begin <= srtt * D { + r.prague_state.rtt_virt_ratio = 1.0; + } else { + r.prague_state.rtt_virt_ratio = + RTT_VIRT_MIN.as_secs_f64() / srtt.as_secs_f64(); + } +} + +fn process_ecn( + r: &mut Recovery, newly_ecn_marked_acked: u64, new_ce_marks: u64, + acked_bytes: usize, largest_sent: &Sent, epoch: packet::Epoch, now: Instant, +) { + if newly_ecn_marked_acked > 0 { + update_alpha(r, newly_ecn_marked_acked, new_ce_marks, epoch); + } + if new_ce_marks == 0 { + return; + } + trace!( + "{} bytes were ACKed with {} packets newly CE marked", + acked_bytes, + new_ce_marks + ); + + update_virt_rtt_ratio(r, now); + + // Do not re-enter CWR twice in a RTT. + if !rtt_elapsed(r, epoch) { + return; + } + + enter_cwr(r, largest_sent.time_sent); + + update_pacer_state(r, now); + + r.prague_state.largest_sent_pn[epoch] = r.largest_sent_pkt[epoch]; +} + +fn collapse_cwnd(r: &mut Recovery) { + reno::collapse_cwnd(r); +} + +fn checkpoint(_r: &mut Recovery) {} + +fn rollback(_r: &mut Recovery) -> bool { + // From reno + true +} + +fn has_custom_pacing() -> bool { + true +} + +fn debug_fmt(_r: &Recovery, _f: &mut std::fmt::Formatter) -> std::fmt::Result { + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::recovery; + + use super::*; + use smallvec::smallvec; + + #[test] + fn prague_init() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + + let r = Recovery::new(&cfg); + + assert!(r.cwnd() > 0); + assert_eq!(r.bytes_in_flight, 0); + } + + #[test] + fn prague_send() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + + let mut r = Recovery::new(&cfg); + + let now = Instant::now(); + + r.on_packet_sent_cc(1000, now); + + assert_eq!(r.bytes_in_flight, 1000); + } + + #[test] + fn prague_slow_start() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + + let mut r = Recovery::new(&cfg); + + let now = Instant::now(); + + let p = recovery::Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: r.max_datagram_size, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: std::time::Instant::now(), + first_sent_time: std::time::Instant::now(), + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + ecn_marked: false, + }; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..r.initial_congestion_window_packets { + r.on_packet_sent_cc(p.size, now); + } + + let cwnd_prev = r.cwnd(); + + let mut acked = vec![Acked { + pkt_num: p.pkt_num, + time_sent: p.time_sent, + size: p.size, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }]; + + r.on_packets_acked(&mut acked, packet::Epoch::Application, now); + + // Check if cwnd increased by packet size (slow start). + assert_eq!(r.cwnd(), cwnd_prev + p.size); + } + + #[test] + fn prague_slow_start_multi_acks() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + + let mut r = Recovery::new(&cfg); + + let now = Instant::now(); + + let p = recovery::Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: r.max_datagram_size, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: std::time::Instant::now(), + first_sent_time: std::time::Instant::now(), + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + ecn_marked: false, + }; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..r.initial_congestion_window_packets { + r.on_packet_sent_cc(p.size, now); + } + + let cwnd_prev = r.cwnd(); + + let mut acked = vec![ + Acked { + pkt_num: p.pkt_num, + time_sent: p.time_sent, + size: p.size, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }, + Acked { + pkt_num: p.pkt_num, + time_sent: p.time_sent, + size: p.size, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }, + Acked { + pkt_num: p.pkt_num, + time_sent: p.time_sent, + size: p.size, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }, + ]; + + r.on_packets_acked(&mut acked, packet::Epoch::Application, now); + + // Acked 3 packets. + assert_eq!(r.cwnd(), cwnd_prev + p.size * 3); + } + + #[test] + fn prague_congestion_event() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + + let mut r = Recovery::new(&cfg); + + let prev_cwnd = r.cwnd(); + + let now = Instant::now(); + + let p = recovery::Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: r.max_datagram_size, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: std::time::Instant::now(), + first_sent_time: std::time::Instant::now(), + is_app_limited: false, + has_data: false, + tx_in_flight: 0, + lost: 0, + ecn_marked: false, + }; + + r.congestion_event( + r.max_datagram_size, + &p, + packet::Epoch::Application, + now, + ); + + // In Reno, after congestion event, cwnd will be cut in half. + assert_eq!(prev_cwnd / 2, r.cwnd()); + } + + #[test] + fn prague_congestion_avoidance() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + + let mut r = Recovery::new(&cfg); + let now = Instant::now(); + let prev_cwnd = r.cwnd(); + + // Fill up bytes_in_flight to avoid app_limited=true + r.on_packet_sent_cc(20000, now); + + let p = recovery::Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: r.max_datagram_size, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: std::time::Instant::now(), + first_sent_time: std::time::Instant::now(), + is_app_limited: false, + has_data: false, + tx_in_flight: 0, + lost: 0, + ecn_marked: false, + }; + + // Trigger congestion event to update ssthresh + r.congestion_event( + r.max_datagram_size, + &p, + packet::Epoch::Application, + now, + ); + + // After congestion event, cwnd will be reduced. + let cur_cwnd = + (prev_cwnd as f64 * recovery::LOSS_REDUCTION_FACTOR) as usize; + assert_eq!(r.cwnd(), cur_cwnd); + + let rtt = Duration::from_millis(100); + + let mut acked = vec![Acked { + pkt_num: 0, + // To exit from recovery + time_sent: now + rtt, + // More than cur_cwnd to increase cwnd + size: 8000, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }]; + + // Ack more than cwnd bytes with rtt=100ms + r.update_rtt(rtt, Duration::from_millis(0), now); + r.on_packets_acked(&mut acked, packet::Epoch::Application, now + rtt * 2); + + // After acking more than cwnd, expect cwnd increased by MSS + assert_eq!(r.cwnd(), cur_cwnd + r.max_datagram_size); + } +} diff --git a/quiche/src/recovery/reno.rs b/quiche/src/recovery/reno.rs index 41009c4850..c1e1dcfe0e 100644 --- a/quiche/src/recovery/reno.rs +++ b/quiche/src/recovery/reno.rs @@ -45,6 +45,7 @@ pub static RENO: CongestionControlOps = CongestionControlOps { on_packet_sent, on_packets_acked, congestion_event, + process_ecn, collapse_cwnd, checkpoint, rollback, @@ -108,7 +109,7 @@ fn on_packet_acked( } } -fn congestion_event( +pub(crate) fn congestion_event( r: &mut Recovery, _lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, ) { @@ -139,6 +140,20 @@ fn congestion_event( } } +fn process_ecn( + r: &mut Recovery, _newly_ecn_marked_acked: u64, new_ce_marks: u64, + _acked_bytes: usize, largest_sent: &Sent, epoch: packet::Epoch, now: Instant, +) { + if new_ce_marks > 0 { + r.congestion_event( + new_ce_marks as usize * r.max_datagram_size, + largest_sent, + epoch, + now, + ); + } +} + pub fn collapse_cwnd(r: &mut Recovery) { r.congestion_window = r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS; r.bytes_acked_sl = 0; From 4812368d060bc6f0a25f34bef1552ea0c5213275 Mon Sep 17 00:00:00 2001 From: Quentin De Coninck Date: Wed, 26 Oct 2022 17:58:15 +0200 Subject: [PATCH 11/11] prague: add a first test and fixes --- quiche/src/recovery/prague.rs | 188 +++++++++++++++++++++++++++++++++- 1 file changed, 184 insertions(+), 4 deletions(-) diff --git a/quiche/src/recovery/prague.rs b/quiche/src/recovery/prague.rs index 0dece86bd7..f24c4705e4 100644 --- a/quiche/src/recovery/prague.rs +++ b/quiche/src/recovery/prague.rs @@ -72,10 +72,13 @@ pub struct State { /// https://www.ietf.org/archive/id/draft-briscoe-iccrg-prague-congestion-control-01.html alpha: f64, + frac_acked_without_ce: f64, + /// The rtt_virt/srtt ratio, ensured to be >= 1.0. rtt_virt_ratio: f64, reduced_due_to_ce: bool, + saw_ce_mark: bool, /// The number of newly marked CE packets during this RTT. newly_ce_marked: u64, @@ -133,8 +136,14 @@ fn on_packets_acked( fn ca_after_ce(r: &mut Recovery, acked_bytes: usize) { let increase = (1.0 / r.prague_state.rtt_virt_ratio.powi(2)) * - (acked_bytes as f64 / r.congestion_window as f64); - r.congestion_window += increase.round() as usize; + acked_bytes as f64 * + r.prague_state.frac_acked_without_ce; + r.bytes_acked_ca += increase.round() as usize; + + if r.bytes_acked_ca >= r.congestion_window { + r.bytes_acked_ca -= r.congestion_window; + r.congestion_window += r.max_datagram_size; + } } fn on_packet_acked( @@ -214,8 +223,16 @@ fn update_alpha( return; } + // If this is the first CE mark, set alpha to 1. let frac = new_ce_counts as f64 / newly_ecn_marked_acked as f64; - r.prague_state.alpha += G * (frac - r.prague_state.alpha); + if !r.prague_state.saw_ce_mark && new_ce_counts > 0 { + r.prague_state.alpha = 1.0; + r.prague_state.saw_ce_mark = true; + } else { + r.prague_state.alpha += G * (frac - r.prague_state.alpha); + } + // Register the fraction of Non CE-marked / total acked for additive increase. + r.prague_state.frac_acked_without_ce = 1.0 - frac; // Start a new round. r.prague_state.largest_sent_pn[epoch] = r.largest_sent_pkt[epoch]; @@ -242,7 +259,8 @@ fn enter_cwr(r: &mut Recovery, time_sent: Instant) { if r.congestion_window < recovery::MINIMUM_WINDOW_PACKETS * r.max_datagram_size { - r.congestion_window = r.max_datagram_size; + r.congestion_window = + recovery::MINIMUM_WINDOW_PACKETS * r.max_datagram_size; } r.ssthresh = r.congestion_window; @@ -312,6 +330,7 @@ fn debug_fmt(_r: &Recovery, _f: &mut std::fmt::Formatter) -> std::fmt::Result { #[cfg(test)] mod tests { + use crate::packet::EcnCounts; use crate::recovery; use super::*; @@ -585,4 +604,165 @@ mod tests { // After acking more than cwnd, expect cwnd increased by MSS assert_eq!(r.cwnd(), cur_cwnd + r.max_datagram_size); } + + #[test] + fn prague_process_ecn() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Prague); + cfg.ecn_enabled = true; + + let mut r = Recovery::new(&cfg); + let now = Instant::now(); + let mut total_sent = 0; + + let p = recovery::Sent { + pkt_num: 0, + frames: smallvec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: r.max_datagram_size, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: std::time::Instant::now(), + first_sent_time: std::time::Instant::now(), + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + ecn_marked: false, + }; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..r.initial_congestion_window_packets { + r.on_packet_sent_cc(p.size, now); + total_sent += p.size; + } + + let cwnd_prev = r.cwnd(); + + let a = Acked { + pkt_num: p.pkt_num, + time_sent: p.time_sent, + size: p.size, + delivered: 0, + delivered_time: now, + first_sent_time: now, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }; + + let mut acked = vec![a.clone(); r.initial_congestion_window_packets]; + + // Let's have one packet that received CE mark. + r.process_ecn( + r.initial_congestion_window_packets as u64, + Some(EcnCounts { + ect0_count: r.initial_congestion_window_packets as u64 - 1, + ect1_count: 0, + ecn_ce_count: 1, + }), + total_sent, + &p, + packet::Epoch::Application, + now, + ); + + r.on_packets_acked(&mut acked, packet::Epoch::Application, now); + + // Alpha should now be 1. + assert_eq!(r.prague_state.alpha, 1.0); + assert_eq!(r.ssthresh, cwnd_prev / 2); + assert_eq!(r.bytes_in_flight, 0); + // We increase of 12000 * 0.9 = 10800. As we only increase cwnd per + // max_datagram_size steps, this means we should increase by 1200 the + // cwnd and keep (10800 - 6000) = 4800 in bytes_acked_ca. + assert_eq!(r.cwnd(), r.ssthresh + r.max_datagram_size); + assert_eq!(r.bytes_acked_ca, 4800); + + // Make a round without any ECN feedback. + total_sent = 0; + let cwnd_prev = r.cwnd(); + let sshthres_prev = r.ssthresh; + let nb_sent = cwnd_prev / r.max_datagram_size; + for _ in 0..nb_sent { + r.on_packet_sent_cc(p.size, now); + total_sent += p.size; + } + let mut acked = vec![a.clone(); nb_sent]; + let tot_ect0 = + r.initial_congestion_window_packets as u64 - 1 + nb_sent as u64; + + r.process_ecn( + nb_sent as u64, + Some(EcnCounts { + ect0_count: tot_ect0, + ect1_count: 0, + ecn_ce_count: 1, + }), + total_sent, + &p, + packet::Epoch::Application, + now, + ); + + r.on_packets_acked(&mut acked, packet::Epoch::Application, now); + + // Alpha should be 1 - G. + assert_eq!(r.prague_state.alpha, 1.0 - G); + assert_eq!(r.ssthresh, sshthres_prev); + assert_eq!(r.bytes_in_flight, 0); + // We increase of 7200. As we only increase cwnd per + // max_datagram_size steps, this means we should increase by 1200 the + // cwnd and keep (4800 + 7200 - 7200) = 4800 in bytes_acked_ca. + assert_eq!(r.cwnd(), cwnd_prev + r.max_datagram_size); + assert_eq!(r.bytes_acked_ca, 4800); + + // A last round with 2 CE feedbacks. + total_sent = 0; + let cwnd_prev = r.cwnd(); + let alpha_prev = r.prague_state.alpha; + let nb_sent = cwnd_prev / r.max_datagram_size; + for _ in 0..nb_sent { + r.on_packet_sent_cc(p.size, now); + total_sent += p.size; + } + let mut acked = vec![a.clone(); nb_sent]; + + r.process_ecn( + nb_sent as u64, + Some(EcnCounts { + ect0_count: tot_ect0 + nb_sent as u64 - 2, + ect1_count: 0, + ecn_ce_count: 3, + }), + total_sent, + &p, + packet::Epoch::Application, + now, + ); + + r.on_packets_acked(&mut acked, packet::Epoch::Application, now); + + // Alpha should be (1 - G) + G * (2 / 7 - (1 - G)). + assert_eq!( + r.prague_state.alpha, + alpha_prev + G * (2.0 / 7.0 - alpha_prev) + ); + assert_eq!( + r.ssthresh, + ((1.0 - r.prague_state.alpha / 2.0) * cwnd_prev as f64).round() + as usize + ); + assert_eq!(r.bytes_in_flight, 0); + // We increase of 8400 * 5 / 7 = 6000. The cwnd being 4634 and the + // remaining CA being 4800, we increase of two packets. + assert_eq!(r.cwnd(), r.ssthresh + 2 * r.max_datagram_size); + // This means (4800 + 6000 - 4634 - 5834) = 332. 331 because of + // rounding... + assert_eq!(r.bytes_acked_ca, 331); + } }