Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ack receiver timestamps #1992

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bdd1115
Implement TransportParameters
sterlingdeng Sep 19, 2024
cb6d5c4
Implement AckTimestampsConfig
sterlingdeng Sep 19, 2024
8dc4579
Implement Ack Timestamps Frame
sterlingdeng Sep 19, 2024
d83b684
Implement ReceiverTimestamp data structure
sterlingdeng Sep 19, 2024
3366153
small cleanup
sterlingdeng Sep 19, 2024
94cb098
Process TransportParameters from peer if available
sterlingdeng Sep 19, 2024
76ddf51
Update spaces to include ReceiverTimestamps
sterlingdeng Sep 19, 2024
a5dae4a
Process ACK frames with timestamps
sterlingdeng Sep 19, 2024
ead292b
Send ACK frame with timestamps
sterlingdeng Sep 19, 2024
c98b93f
Update congestion controller interface
sterlingdeng Sep 19, 2024
3fd8f2e
Fix lint
sterlingdeng Sep 19, 2024
3af9df4
Remove feature flag
sterlingdeng Sep 24, 2024
6c56bc1
Set created_at time on Connection
sterlingdeng Sep 24, 2024
9d9ee23
minor cleanup
sterlingdeng Sep 24, 2024
e47ade6
Fix lint
sterlingdeng Sep 24, 2024
6ac89db
Fix unit tests
sterlingdeng Sep 25, 2024
96153e6
simplify into on_ack_received
sterlingdeng Sep 26, 2024
ea3bb6a
add comment
sterlingdeng Sep 26, 2024
caa86bd
revert some changes
sterlingdeng Sep 27, 2024
1e3c936
fix easier PR comments
sterlingdeng Oct 1, 2024
1cc4043
refactor on nits
sterlingdeng Oct 1, 2024
67af041
refactor transport parameter
sterlingdeng Oct 1, 2024
55d207d
refactor nits
sterlingdeng Oct 1, 2024
e85e8f1
consistent naming
sterlingdeng Oct 1, 2024
64acada
lint and test
sterlingdeng Oct 1, 2024
fa004c5
refactor some unidiomatic code
sterlingdeng Oct 2, 2024
53bcc35
refactor some unwraps
sterlingdeng Oct 2, 2024
7bee845
fix lint
sterlingdeng Oct 2, 2024
8bc98b1
timestamp as duration
Oct 22, 2024
bb8df6b
best effort timestamp send
Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions quinn-proto/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct TransportConfig {
pub(crate) mtu_discovery_config: Option<MtuDiscoveryConfig>,
pub(crate) ack_frequency_config: Option<AckFrequencyConfig>,

pub(crate) ack_timestamp_config: Option<AckTimestampsConfig>,

pub(crate) persistent_congestion_threshold: u32,
pub(crate) keep_alive_interval: Option<Duration>,
pub(crate) crypto_buffer_size: usize,
Expand Down Expand Up @@ -223,6 +225,15 @@ impl TransportConfig {
self
}

/// Specifies the ACK timestamp config.
/// Defaults to `None`, which disables receiving acknowledgement timestamps from the sender.
/// If `Some`, TransportParameters are sent to the peer to enable acknowledgement timestamps
/// if supported.
pub fn ack_timestamp_config(&mut self, value: Option<AckTimestampsConfig>) -> &mut Self {
self.ack_timestamp_config = value;
self
}

/// Number of consecutive PTOs after which network is considered to be experiencing persistent congestion.
pub fn persistent_congestion_threshold(&mut self, value: u32) -> &mut Self {
self.persistent_congestion_threshold = value;
Expand Down Expand Up @@ -360,6 +371,8 @@ impl Default for TransportConfig {
congestion_controller_factory: Arc::new(congestion::CubicConfig::default()),

enable_segmentation_offload: true,

ack_timestamp_config: None,
}
}
}
Expand Down Expand Up @@ -390,6 +403,7 @@ impl fmt::Debug for TransportConfig {
deterministic_packet_numbers: _,
congestion_controller_factory: _,
enable_segmentation_offload,
ack_timestamp_config,
} = self;
fmt.debug_struct("TransportConfig")
.field("max_concurrent_bidi_streams", max_concurrent_bidi_streams)
Expand All @@ -416,10 +430,51 @@ impl fmt::Debug for TransportConfig {
.field("datagram_send_buffer_size", datagram_send_buffer_size)
.field("congestion_controller_factory", &"[ opaque ]")
.field("enable_segmentation_offload", enable_segmentation_offload)
.field("ack_timestamp_config", ack_timestamp_config)
.finish()
}
}

/// Parameters for controlling the peer's acknowledgements with receiver timestamps.
#[derive(Clone, Debug)]
pub struct AckTimestampsConfig {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is public, then the main config struct should have a single config setter that accepts an Option<AckTimestampsConfig>. If you want setters for individual fields directly on the main struct, then it's confusing for this to be public.

pub(crate) max_timestamps_per_ack: VarInt,
pub(crate) exponent: VarInt,
pub(crate) basis: std::time::Instant,
}

impl AckTimestampsConfig {
/// Sets the maximum number of timestamp entries per ACK frame.
pub fn max_timestamps_per_ack(&mut self, value: VarInt) -> &mut Self {
self.max_timestamps_per_ack = value;
self
}

/// Timestamp values are divided by the exponent value provided. This reduces the size of the
/// VARINT for loss in precision. A exponent of 0 represents microsecond precision.
pub fn exponent(&mut self, value: VarInt) -> &mut Self {
self.exponent = value;
self
}

/// Sets the time base for which all timestamps are anchored on.
/// Defaults to Instant::now of when the default struct was created.
pub fn basis(&mut self, instant: std::time::Instant) -> &mut Self {
sterlingdeng marked this conversation as resolved.
Show resolved Hide resolved
self.basis = instant;
self
}
}

impl Default for AckTimestampsConfig {
fn default() -> Self {
Self {
max_timestamps_per_ack: 10u32.into(),
exponent: 0u32.into(),
basis: std::time::Instant::now(),
}
}
}

/// Parameters for controlling the peer's acknowledgement frequency
///
/// The parameters provided in this config will be sent to the peer at the beginning of the
Expand Down
14 changes: 14 additions & 0 deletions quinn-proto/src/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ pub trait Controller: Send + Sync {
) {
}

#[allow(unused_variables)]
/// Packet deliveries were confirmed with timestamps information.
fn on_ack_packet(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these actually changes you'd look to make to the on_ack() interface which you're adding a separate method for to keep it semver-compatible?

If so, should this default implementation call self.on_ack()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I did it this way to prevent any breaking changes if there were users of the on_ack method. Yea, I think it makes sense for the default implementation to all self.on_ack, and then we can remove the congestion.on_ack call from on_packet_acked and just have it call congestion.on_ack_packet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn on_ack_packet(
fn on_ack_timestamped(

&mut self,
pn: u64,
now: Instant,
sent: Instant,
received: Option<Instant>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

received should use a type other than Instant to represent that it is not comparable to sent.

bytes: u64,
app_limited: bool,
rtt: &RttEstimator,
) {
}

/// Packets are acked in batches, all with the same `now` argument. This indicates one of those batches has completed.
#[allow(unused_variables)]
fn on_end_acks(
Expand Down
111 changes: 104 additions & 7 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use crate::{
coding::BufMutExt,
config::{ServerConfig, TransportConfig},
crypto::{self, KeyPair, Keys, PacketKey},
frame,
frame::{Close, Datagram, FrameStruct},
frame::{self, Close, Datagram, FrameStruct},
packet::{
FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
PacketNumber, PartialDecode, SpaceId,
Expand All @@ -37,6 +36,8 @@ use crate::{
VarInt, MAX_STREAM_COUNT, MIN_INITIAL_SIZE, TIMER_GRANULARITY,
};

use crate::config::AckTimestampsConfig;
sterlingdeng marked this conversation as resolved.
Show resolved Hide resolved

mod ack_frequency;
use ack_frequency::AckFrequencyState;

Expand Down Expand Up @@ -65,7 +66,10 @@ use paths::{PathData, PathResponses};

mod send_buffer;

mod spaces;
mod receiver_timestamps;
pub(crate) use receiver_timestamps::{PacketTimestamp, ReceiverTimestamps};

pub(crate) mod spaces;
#[cfg(fuzzing)]
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
Expand Down Expand Up @@ -227,6 +231,11 @@ pub struct Connection {
/// no outgoing application data.
app_limited: bool,

//
// Ack Receive Timestamps
//
peer_ack_timestamp_cfg: Option<AckTimestampsConfig>,
sterlingdeng marked this conversation as resolved.
Show resolved Hide resolved

streams: StreamsState,
/// Surplus remote CIDs for future use on new paths
rem_cids: CidQueue,
Expand All @@ -238,6 +247,8 @@ pub struct Connection {
stats: ConnectionStats,
/// QUIC version used for the connection.
version: u32,
/// Created at time instant.
epoch: Instant,
}

impl Connection {
Expand Down Expand Up @@ -337,6 +348,8 @@ impl Connection {
&TransportParameters::default(),
)),

peer_ack_timestamp_cfg: None,

pto_count: 0,

app_limited: false,
Expand All @@ -357,6 +370,7 @@ impl Connection {
rng,
stats: ConnectionStats::default(),
version,
epoch: now,
};
if side.is_client() {
// Kick off the connection
Expand Down Expand Up @@ -817,6 +831,7 @@ impl Connection {
&mut self.spaces[space_id],
buf,
&mut self.stats,
None,
);
}

Expand Down Expand Up @@ -1343,6 +1358,13 @@ impl Connection {
if ack.largest >= self.spaces[space].next_packet_number {
return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
}

if ack.timestamps.is_some() != self.config.ack_timestamp_config.is_some() {
return Err(TransportError::PROTOCOL_VIOLATION(
"ack with timestamps expectation mismatched",
));
}
sterlingdeng marked this conversation as resolved.
Show resolved Hide resolved

let new_largest = {
let space = &mut self.spaces[space];
if space
Expand Down Expand Up @@ -1371,13 +1393,21 @@ impl Connection {
}
}

let mut timestamp_iter = self.config.ack_timestamp_config.as_ref().map(|cfg| {
let decoder = ack.timestamp_iter(cfg.basis, cfg.exponent.0).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should at least have a comment about why this unwrap() is safe.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed it so it does a if let Some check

let mut v: tinyvec::TinyVec<[PacketTimestamp; 10]> = tinyvec::TinyVec::new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: import TinyVec directly, and avoid type annotations. The code here is looking pretty unidiomatic... Can use rev() to reverse the direction on the source iterator directly which should be faster, and collect() into the TinyVec (are you sure we need to allocate here instead of yielding the iterator directly)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem possible to rev() on the source iterator because its yielding elements directly from the frame. It's similar to the AckIter implementation where each iteration yields a value as it reads from the Ack frame. The values yielded by the AckIter are added to an ArrayRangeSet which is space optimized for handling ranges, and is backed by a TinyVec.

What I'm essentially trying to do is match a packet number and it's timestamp (if available) together by "zipping" both together. When reading the values from the frame (for both packet number and timestamp), the values are produced from high packet number to low, but when we use them later on in the code, they're used from low to high, hence why I do a reverse.

One option to reduce an allocation and avoid the need of reverseing is to attempt to zip the packet number and timestamps together as the values are yielded when reading from the frame. However, the issue that I run into is that the array_range_set is space optimized so it only records the ranges. If I wanted to zip the packet number and timestamps together, the array_range_set would have to be modified to associate a packet number value to a timestamp; example tuple (packet_number, Option<timestamp>).

However, I don't think it makes sense to modify the existing array_range_set for this draft, so I opted to handle it this way, which is to read all the timestamps value into a Vec, and then handle the timestamp and packet_number matching later in the code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay nvm, I think I found a better solution - going to test it and report back..

decoder.for_each(|elt| v.push(elt));
v.reverse();
v.into_iter().peekable()
});

if newly_acked.is_empty() {
return Ok(());
}

let mut ack_eliciting_acked = false;
for packet in newly_acked.elts() {
if let Some(info) = self.spaces[space].take(packet) {
if let Some(mut info) = self.spaces[space].take(packet) {
if let Some(acked) = info.largest_acked {
// Assume ACKs for all packets below the largest acknowledged in `packet` have
// been received. This can cause the peer to spuriously retransmit if some of
Expand All @@ -1399,6 +1429,25 @@ impl Connection {
// Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
self.ack_frequency.on_acked(packet);

if let Some(timestamp_iter) = timestamp_iter.as_mut() {
while let Some(v) = timestamp_iter.peek() {
sterlingdeng marked this conversation as resolved.
Show resolved Hide resolved
match v.packet_number.cmp(&packet) {
cmp::Ordering::Less => {
let _ = timestamp_iter.next();
}
cmp::Ordering::Equal => {
// Unwrap safety is guaranteed because a value was validated
// to exist using peek
let ts = timestamp_iter.next().unwrap();
info.time_received = Some(ts.timestamp);
}
cmp::Ordering::Greater => {
break;
}
}
}
}

self.on_packet_acked(now, packet, info);
}
}
Expand Down Expand Up @@ -1497,6 +1546,16 @@ impl Connection {
self.app_limited,
&self.path.rtt,
);

self.path.congestion.on_ack_packet(
pn,
now,
info.time_sent,
info.time_received,
info.size.into(),
self.app_limited,
&self.path.rtt,
);
}

// Update state for confirmed delivery of frames
Expand Down Expand Up @@ -2184,8 +2243,10 @@ impl Connection {
}
Ok((packet, number)) => {
let span = match number {
Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
None => trace_span!("recv", space = ?packet.header.space()),
Some(pn) => {
trace_span!("recv", space = ?packet.header.space(), pn, side=?self.side)
}
None => trace_span!("recv", space = ?packet.header.space(), side=?self.side),
};
let _guard = span.enter();

Expand Down Expand Up @@ -3047,6 +3108,7 @@ impl Connection {
space,
buf,
&mut self.stats,
self.peer_ack_timestamp_cfg.clone(),
);
}

Expand Down Expand Up @@ -3231,6 +3293,7 @@ impl Connection {
space: &mut PacketSpace,
buf: &mut Vec<u8>,
stats: &mut ConnectionStats,
timestamp_config: Option<AckTimestampsConfig>,
) {
debug_assert!(!space.pending_acks.ranges().is_empty());

Expand All @@ -3255,7 +3318,21 @@ impl Connection {
delay_micros
);

frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
frame::Ack::encode(
delay as _,
space.pending_acks.ranges(),
ecn,
timestamp_config.map(|cfg| {
(
space.pending_acks.receiver_timestamps_as_ref().unwrap(),
cfg.basis,
cfg.exponent.0,
cfg.max_timestamps_per_ack.0,
)
}),
buf,
);

stats.frame_tx.acks += 1;
}

Expand Down Expand Up @@ -3309,6 +3386,26 @@ impl Connection {
self.path.mtud.on_peer_max_udp_payload_size_received(
u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
);

{
self.peer_ack_timestamp_cfg = if let (Some(max_timestamps_per_ack), Some(exponent)) = (
params.max_recv_timestamps_per_ack,
params.receive_timestamps_exponent,
) {
for space in self.spaces.iter_mut() {
space
.pending_acks
.set_receiver_timestamp(max_timestamps_per_ack.0 as usize);
}
Some(AckTimestampsConfig {
exponent,
max_timestamps_per_ack,
basis: self.epoch,
})
} else {
None
};
}
sterlingdeng marked this conversation as resolved.
Show resolved Hide resolved
}

fn decrypt_packet(
Expand Down
2 changes: 2 additions & 0 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ impl PacketBuilder {
ack_eliciting,
retransmits: sent.retransmits,
stream_frames: sent.stream_frames,

time_received: None,
};

conn.path
Expand Down
Loading