Skip to content

Commit

Permalink
Replace Broadcast helper with tokio::sync::Notify
Browse files Browse the repository at this point in the history
Drops a chunk of complex and under-audited synchronization code in
favor of a similar, better-tested and more efficient primitive from
tokio.
  • Loading branch information
Ralith committed Dec 22, 2021
1 parent fa69cb2 commit f88fdd6
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 125 deletions.
62 changes: 0 additions & 62 deletions quinn/src/broadcast.rs

This file was deleted.

75 changes: 31 additions & 44 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use futures_core::Stream;
use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId};
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::Notify;
use tokio::time::{sleep_until, Instant as TokioInstant, Sleep};
use tracing::info_span;
use udp::UdpState;

use crate::{
broadcast::{self, Broadcast},
mutex::Mutex,
poll_fn,
recv_stream::RecvStream,
Expand Down Expand Up @@ -325,21 +325,8 @@ impl Connection {
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
let mut state = broadcast::State::default();
poll_fn(move |cx| {
let mut conn = self.0.lock("OpenUni::next");
if let Some(ref e) = conn.error {
return Poll::Ready(Err(e.clone()));
}
if let Some(id) = conn.inner.streams().open(Dir::Uni) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Poll::Ready(Ok(SendStream::new(self.0.clone(), id, is_0rtt)));
}
conn.uni_opening.register(cx, &mut state);
Poll::Pending
})
.await
let (id, is_0rtt) = self.open(Dir::Uni).await?;
Ok(SendStream::new(self.0.clone(), id, is_0rtt))
}

/// Initiate a new outgoing bidirectional stream.
Expand All @@ -348,24 +335,30 @@ impl Connection {
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let mut state = broadcast::State::default();
poll_fn(move |cx| {
let mut conn = self.0.lock("OpenBi::next");
if let Some(ref e) = conn.error {
return Poll::Ready(Err(e.clone()));
}
if let Some(id) = conn.inner.streams().open(Dir::Bi) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Poll::Ready(Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
)));
let (id, is_0rtt) = self.open(Dir::Bi).await?;
Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
))
}

async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> {
loop {
let opening;
{
let mut conn = self.0.lock("open");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
return Ok((id, is_0rtt));
}
opening = conn.stream_opening[dir as usize].clone();
opening.notified() // Must be called, but not awaited, while the connection lock is held
}
conn.bi_opening.register(cx, &mut state);
Poll::Pending
})
.await
.await
}
}

/// Close the connection immediately.
Expand Down Expand Up @@ -671,8 +664,7 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
uni_opening: Broadcast::new(),
bi_opening: Broadcast::new(),
stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())],
incoming_uni_streams_reader: None,
incoming_bi_streams_reader: None,
datagram_reader: None,
Expand Down Expand Up @@ -732,8 +724,7 @@ pub struct ConnectionInner {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
uni_opening: Broadcast,
bi_opening: Broadcast,
stream_opening: [Arc<Notify>; 2],
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
datagram_reader: Option<Waker>,
Expand Down Expand Up @@ -856,11 +847,7 @@ impl ConnectionInner {
}
}
Stream(StreamEvent::Available { dir }) => {
let tasks = match dir {
Dir::Uni => &mut self.uni_opening,
Dir::Bi => &mut self.bi_opening,
};
tasks.wake();
self.stream_opening[dir as usize].notify_one();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
Expand Down Expand Up @@ -952,8 +939,8 @@ impl ConnectionInner {
for (_, reader) in self.blocked_readers.drain() {
reader.wake()
}
self.uni_opening.wake();
self.bi_opening.wake();
self.stream_opening[Dir::Uni as usize].notify_waiters();
self.stream_opening[Dir::Bi as usize].notify_waiters();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
Expand Down
36 changes: 18 additions & 18 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ use proto::{
self as proto, ClientConfig, ConnectError, ConnectionHandle, DatagramEvent, ServerConfig,
};
use rustc_hash::FxHashMap;
use tokio::sync::Notify;
use udp::{RecvMeta, UdpSocket, UdpState, BATCH_SIZE};

use crate::{
broadcast::{self, Broadcast},
connection::Connecting,
poll_fn,
work_limiter::WorkLimiter,
ConnectionEvent, EndpointConfig, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND,
SEND_TIME_BOUND,
connection::Connecting, poll_fn, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig,
EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, SEND_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -226,16 +223,19 @@ impl Endpoint {
/// [`close()`]: Endpoint::close
/// [`Incoming`]: crate::Incoming
pub async fn wait_idle(&self) {
let mut state = broadcast::State::default();
poll_fn(move |cx| {
let endpoint = &mut *self.inner.lock().unwrap();
if endpoint.connections.is_empty() {
return Poll::Ready(());
loop {
let idle;
{
let endpoint = &mut *self.inner.lock().unwrap();
if endpoint.connections.is_empty() {
break;
}
idle = endpoint.idle.clone();
// Must be called, but not awaited, while the endpoint lock is held
idle.notified()
}
endpoint.idle.register(cx, &mut state);
Poll::Pending
})
.await;
.await;
}
}
}

Expand Down Expand Up @@ -321,7 +321,7 @@ pub(crate) struct EndpointInner {
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
idle: Broadcast,
idle: Arc<Notify>,
}

impl EndpointInner {
Expand Down Expand Up @@ -442,7 +442,7 @@ impl EndpointInner {
if e.is_drained() {
self.connections.senders.remove(&ch);
if self.connections.is_empty() {
self.idle.wake();
self.idle.notify_waiters();
}
}
if let Some(event) = self.inner.handle_event(ch, e) {
Expand Down Expand Up @@ -581,7 +581,7 @@ impl EndpointRef {
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
idle: Broadcast::new(),
idle: Arc::new(Notify::new()),
})))
}
}
Expand Down
1 change: 0 additions & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ macro_rules! ready {
};
}

mod broadcast;
mod connection;
mod endpoint;
mod mutex;
Expand Down

0 comments on commit f88fdd6

Please sign in to comment.