Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ack receiver timestamps #1992

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bdd1115
Implement TransportParameters
sterlingdeng Sep 19, 2024
cb6d5c4
Implement AckTimestampsConfig
sterlingdeng Sep 19, 2024
8dc4579
Implement Ack Timestamps Frame
sterlingdeng Sep 19, 2024
d83b684
Implement ReceiverTimestamp data structure
sterlingdeng Sep 19, 2024
3366153
small cleanup
sterlingdeng Sep 19, 2024
94cb098
Process TransportParameters from peer if available
sterlingdeng Sep 19, 2024
76ddf51
Update spaces to include ReceiverTimestamps
sterlingdeng Sep 19, 2024
a5dae4a
Process ACK frames with timestamps
sterlingdeng Sep 19, 2024
ead292b
Send ACK frame with timestamps
sterlingdeng Sep 19, 2024
c98b93f
Update congestion controller interface
sterlingdeng Sep 19, 2024
3fd8f2e
Fix lint
sterlingdeng Sep 19, 2024
3af9df4
Remove feature flag
sterlingdeng Sep 24, 2024
6c56bc1
Set created_at time on Connection
sterlingdeng Sep 24, 2024
9d9ee23
minor cleanup
sterlingdeng Sep 24, 2024
e47ade6
Fix lint
sterlingdeng Sep 24, 2024
6ac89db
Fix unit tests
sterlingdeng Sep 25, 2024
96153e6
simplify into on_ack_received
sterlingdeng Sep 26, 2024
ea3bb6a
add comment
sterlingdeng Sep 26, 2024
caa86bd
revert some changes
sterlingdeng Sep 27, 2024
1e3c936
fix easier PR comments
sterlingdeng Oct 1, 2024
1cc4043
refactor on nits
sterlingdeng Oct 1, 2024
67af041
refactor transport parameter
sterlingdeng Oct 1, 2024
55d207d
refactor nits
sterlingdeng Oct 1, 2024
e85e8f1
consistent naming
sterlingdeng Oct 1, 2024
64acada
lint and test
sterlingdeng Oct 1, 2024
fa004c5
refactor some unidiomatic code
sterlingdeng Oct 2, 2024
53bcc35
refactor some unwraps
sterlingdeng Oct 2, 2024
7bee845
fix lint
sterlingdeng Oct 2, 2024
8bc98b1
timestamp as duration
Oct 22, 2024
bb8df6b
best effort timestamp send
Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions quinn-proto/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct TransportConfig {
pub(crate) min_mtu: u16,
pub(crate) mtu_discovery_config: Option<MtuDiscoveryConfig>,
pub(crate) ack_frequency_config: Option<AckFrequencyConfig>,
pub(crate) ack_timestamps_config: AckTimestampsConfig,

pub(crate) persistent_congestion_threshold: u32,
pub(crate) keep_alive_interval: Option<Duration>,
Expand Down Expand Up @@ -223,6 +224,21 @@ impl TransportConfig {
self
}

/// Specifies the ACK timestamp config.
/// Defaults to `None`, which disables receiving acknowledgement timestamps from the sender.
/// If `Some`, TransportParameters are sent to the peer to enable acknowledgement timestamps
/// if supported.
pub fn max_ack_timestamps(&mut self, value: VarInt) -> &mut Self {
self.ack_timestamps_config.max_timestamps_per_ack = Some(value);
Comment on lines +231 to +232
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn max_ack_timestamps(&mut self, value: VarInt) -> &mut Self {
self.ack_timestamps_config.max_timestamps_per_ack = Some(value);
pub fn max_ack_timestamps(&mut self, value: Option<VarInt>) -> &mut Self {
self.ack_timestamps_config.max_timestamps_per_ack = value;
  • What happens if someone specifies Some(0)? Document.
  • Is it useful to users for the quantity to be configurable, or should this just be a bool flag? We don't expose configuration for the maximum number of ACK ranges, for example.

self
}

/// Specifies the exponent used when encoding the timestamps.
pub fn ack_timestamps_exponent(&mut self, value: VarInt) -> &mut Self {
self.ack_timestamps_config.exponent = value;
self
}

/// Number of consecutive PTOs after which network is considered to be experiencing persistent congestion.
pub fn persistent_congestion_threshold(&mut self, value: u32) -> &mut Self {
self.persistent_congestion_threshold = value;
Expand Down Expand Up @@ -360,6 +376,8 @@ impl Default for TransportConfig {
congestion_controller_factory: Arc::new(congestion::CubicConfig::default()),

enable_segmentation_offload: true,

ack_timestamps_config: AckTimestampsConfig::default(),
}
}
}
Expand Down Expand Up @@ -390,6 +408,7 @@ impl fmt::Debug for TransportConfig {
deterministic_packet_numbers: _,
congestion_controller_factory: _,
enable_segmentation_offload,
ack_timestamps_config,
} = self;
fmt.debug_struct("TransportConfig")
.field("max_concurrent_bidi_streams", max_concurrent_bidi_streams)
Expand All @@ -416,10 +435,44 @@ impl fmt::Debug for TransportConfig {
.field("datagram_send_buffer_size", datagram_send_buffer_size)
.field("congestion_controller_factory", &"[ opaque ]")
.field("enable_segmentation_offload", enable_segmentation_offload)
.field("ack_timestamps_config", ack_timestamps_config)
.finish()
}
}

/// Parameters for controlling the peer's acknowledgements with receiver timestamps.
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
pub struct AckTimestampsConfig {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is public, then the main config struct should have a single config setter that accepts an Option<AckTimestampsConfig>. If you want setters for individual fields directly on the main struct, then it's confusing for this to be public.

/// If max_timestamp_per_ack is None, this feature is disabled.
pub(crate) max_timestamps_per_ack: Option<VarInt>,
pub(crate) exponent: VarInt,
}

impl AckTimestampsConfig {
/// Sets the maximum number of timestamp entries per ACK frame.
pub fn max_timestamps_per_ack(&mut self, value: VarInt) -> &mut Self {
self.max_timestamps_per_ack = Some(value);
self
}

/// Timestamp values are divided by the exponent value provided. This reduces the size of the
/// VARINT for loss in precision. A exponent of 0 represents microsecond precision.
pub fn exponent(&mut self, value: VarInt) -> &mut Self {
self.exponent = value;
self
}
}

impl Default for AckTimestampsConfig {
fn default() -> Self {
Self {
max_timestamps_per_ack: None,
// Default to 0 as per draft.
exponent: 0u32.into(),
}
}
}

/// Parameters for controlling the peer's acknowledgement frequency
///
/// The parameters provided in this config will be sent to the peer at the beginning of the
Expand Down
17 changes: 16 additions & 1 deletion quinn-proto/src/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::connection::RttEstimator;
use std::any::Any;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

mod bbr;
mod cubic;
Expand Down Expand Up @@ -34,6 +34,21 @@ pub trait Controller: Send + Sync {
) {
}

#[allow(unused_variables)]
/// Packet deliveries were confirmed with timestamps information.
fn on_ack_timestamped(
&mut self,
pn: u64,
now: Instant,
sent: Instant,
received: Option<Duration>,
bytes: u64,
app_limited: bool,
rtt: &RttEstimator,
) {
self.on_ack(now, sent, bytes, app_limited, rtt);
}

/// Packets are acked in batches, all with the same `now` argument. This indicates one of those batches has completed.
#[allow(unused_variables)]
fn on_end_acks(
Expand Down
75 changes: 68 additions & 7 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use bytes::{Bytes, BytesMut};
use frame::StreamMetaVec;
use frame::{AckTimestampEncodeParams, StreamMetaVec};
use rand::{rngs::StdRng, Rng, SeedableRng};
use thiserror::Error;
use tracing::{debug, error, trace, trace_span, warn};
Expand All @@ -18,10 +18,9 @@ use crate::{
cid_generator::ConnectionIdGenerator,
cid_queue::CidQueue,
coding::BufMutExt,
config::{ServerConfig, TransportConfig},
config::{AckTimestampsConfig, ServerConfig, TransportConfig},
crypto::{self, KeyPair, Keys, PacketKey},
frame,
frame::{Close, Datagram, FrameStruct},
frame::{self, Close, Datagram, FrameStruct},
packet::{
FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
PacketNumber, PartialDecode, SpaceId,
Expand Down Expand Up @@ -65,7 +64,10 @@ use paths::{PathData, PathResponses};

mod send_buffer;

mod spaces;
mod receiver_timestamps;
pub(crate) use receiver_timestamps::{PacketTimestamp, ReceiverTimestamps};

pub(crate) mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
Expand Down Expand Up @@ -227,6 +229,10 @@ pub struct Connection {
/// no outgoing application data.
app_limited: bool,

// Ack Receive Timestamps
// The timestamp config of the peer.
ack_timestamps_cfg: AckTimestampsConfig,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option?


streams: StreamsState,
/// Surplus remote CIDs for future use on new paths
rem_cids: CidQueue,
Expand All @@ -238,6 +244,8 @@ pub struct Connection {
stats: ConnectionStats,
/// QUIC version used for the connection.
version: u32,
/// Created at time instant.
epoch: Instant,
}

impl Connection {
Expand Down Expand Up @@ -337,6 +345,8 @@ impl Connection {
&TransportParameters::default(),
)),

ack_timestamps_cfg: AckTimestampsConfig::default(),

pto_count: 0,

app_limited: false,
Expand All @@ -357,6 +367,7 @@ impl Connection {
rng,
stats: ConnectionStats::default(),
version,
epoch: now,
};
if side.is_client() {
// Kick off the connection
Expand Down Expand Up @@ -817,6 +828,7 @@ impl Connection {
&mut self.spaces[space_id],
buf,
&mut self.stats,
self.ack_timestamps_cfg,
);
}

Expand Down Expand Up @@ -1343,6 +1355,7 @@ impl Connection {
if ack.largest >= self.spaces[space].next_packet_number {
return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
}

let new_largest = {
let space = &mut self.spaces[space];
if space
Expand Down Expand Up @@ -1371,6 +1384,23 @@ impl Connection {
}
}

let timestamp_iter = ack.timestamps_iter(self.config.ack_timestamps_config.exponent.0);
if let (Some(max), Some(iter)) = (
self.config.ack_timestamps_config.max_timestamps_per_ack,
timestamp_iter,
) {
let packet_space = &mut self.spaces[space];
for (i, pkt) in iter.enumerate() {
if i > max.0 as usize {
warn!("peer is sending more timestamps than max requested");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The draft describes this as non-conformant behavior. Should this be a transport error?

break;
}
if let Some(sent_packet) = packet_space.get_mut_sent_packet(pkt.packet_number) {
sent_packet.time_received = Some(pkt.timestamp);
}
}
}

if newly_acked.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -1490,9 +1520,11 @@ impl Connection {
if info.ack_eliciting && self.path.challenge.is_none() {
// Only pass ACKs to the congestion controller if we are not validating the current
// path, so as to ignore any ACKs from older paths still coming in.
self.path.congestion.on_ack(
self.path.congestion.on_ack_timestamped(
pn,
now,
info.time_sent,
info.time_received,
info.size.into(),
self.app_limited,
&self.path.rtt,
Expand Down Expand Up @@ -3047,6 +3079,7 @@ impl Connection {
space,
buf,
&mut self.stats,
self.ack_timestamps_cfg,
);
}

Expand Down Expand Up @@ -3231,6 +3264,7 @@ impl Connection {
space: &mut PacketSpace,
buf: &mut Vec<u8>,
stats: &mut ConnectionStats,
ack_timestamps_config: AckTimestampsConfig,
) {
debug_assert!(!space.pending_acks.ranges().is_empty());

Expand All @@ -3255,7 +3289,26 @@ impl Connection {
delay_micros
);

frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
frame::Ack::encode(
delay as _,
space.pending_acks.ranges(),
ecn,
ack_timestamps_config.max_timestamps_per_ack.map(|max| {
AckTimestampEncodeParams {
// Safety: If peer_timestamp_config is set, receiver_timestamps must be set.
receiver_timestamps: space.pending_acks.receiver_timestamps_as_ref().unwrap(),
exponent: ack_timestamps_config.exponent.0,
max_timestamps: max.0,
}
}),
buf,
);

if let Some(ts) = space.pending_acks.receiver_timestamps_as_mut() {
// Best effort / one try to send the timestamps to the peer.
ts.clear();
}

stats.frame_tx.acks += 1;
}

Expand Down Expand Up @@ -3309,6 +3362,14 @@ impl Connection {
self.path.mtud.on_peer_max_udp_payload_size_received(
u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
);
self.ack_timestamps_cfg = params.ack_timestamps_cfg;
if let Some(max) = params.ack_timestamps_cfg.max_timestamps_per_ack {
for space in self.spaces.iter_mut() {
space
.pending_acks
.set_receiver_timestamp(max.0 as usize, self.epoch);
}
};
}

fn decrypt_packet(
Expand Down
2 changes: 2 additions & 0 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ impl PacketBuilder {
ack_eliciting,
retransmits: sent.retransmits,
stream_frames: sent.stream_frames,

time_received: None,
};

conn.path
Expand Down
Loading
Loading