Skip to content

Commit

Permalink
Acceptor: do not appear empty while adding
Browse files Browse the repository at this point in the history
Make sure that Acceptor::is_empty() returns false directly after
Acceptor::add(...).
  • Loading branch information
surban committed Jul 5, 2023
1 parent 79a899a commit b5d123e
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions aggligator-util/src/transport/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
time::Duration,
};
use tokio::{
sync::{broadcast, mpsc, oneshot, watch, Mutex, RwLock},
sync::{broadcast, mpsc, oneshot, watch, Mutex, OwnedSemaphorePermit, RwLock, Semaphore},
time::{sleep_until, Instant},
};

Expand Down Expand Up @@ -74,6 +74,7 @@ struct AcceptingTransportPack {
transport: ArcAcceptingTransport,
result_tx: oneshot::Sender<Result<()>>,
remove_rx: oneshot::Receiver<()>,
_permit: OwnedSemaphorePermit,
}

/// Builds a customized [`Acceptor`].
Expand Down Expand Up @@ -132,6 +133,7 @@ impl AcceptorBuilder {
task_cfg,
transport_tx,
transports_present_rx,
transports_being_added: Arc::new(Semaphore::new(Semaphore::MAX_PERMITS)),
error_rx,
active_transports,
no_transport_timeout,
Expand All @@ -148,6 +150,7 @@ pub struct Acceptor {
task_cfg: TaskCfgFn,
transport_tx: mpsc::UnboundedSender<AcceptingTransportPack>,
transports_present_rx: watch::Receiver<bool>,
transports_being_added: Arc<Semaphore>,
active_transports: Arc<RwLock<Vec<Weak<dyn AcceptingTransport>>>>,
error_rx: broadcast::Receiver<BoxLinkError>,
no_transport_timeout: Duration,
Expand Down Expand Up @@ -187,7 +190,12 @@ impl Acceptor {
let (result_tx, result_rx) = oneshot::channel();
let (remove_tx, remove_rx) = oneshot::channel();

let pack = AcceptingTransportPack { transport: Arc::new(transport), result_tx, remove_rx };
let pack = AcceptingTransportPack {
transport: Arc::new(transport),
result_tx,
remove_rx,
_permit: self.transports_being_added.clone().try_acquire_owned().unwrap(),
};
let _ = self.transport_tx.send(pack);

AcceptingTransportHandle { name, result_rx, remove_tx }
Expand All @@ -196,6 +204,7 @@ impl Acceptor {
/// Returns whether no transports are present.
pub fn is_empty(&self) -> bool {
!*self.transports_present_rx.borrow()
&& self.transports_being_added.available_permits() == Semaphore::MAX_PERMITS
}

/// Waits for an incoming connection and accepts it.
Expand Down Expand Up @@ -311,6 +320,9 @@ impl Acceptor {
active_transports.retain(|at| at.strong_count() > 0);
active_transports.push(Arc::downgrade(&transport_pack.transport));

// Notify of transport availability.
transports_present_tx.send_replace(true);

// Start transport task.
transport_tasks.push(Self::transport_task(
server.clone(),
Expand All @@ -330,7 +342,7 @@ impl Acceptor {
server: BoxServer, transport: AcceptingTransportPack, link_error_tx: broadcast::Sender<BoxLinkError>,
wrappers: Arc<Vec<BoxAcceptingWrapper>>,
) {
let AcceptingTransportPack { transport, result_tx, mut remove_rx } = transport;
let AcceptingTransportPack { transport, result_tx, mut remove_rx, _permit: _ } = transport;

let (tx, mut rx) = mpsc::channel(128);
let mut listener = transport.listen(tx);
Expand Down

0 comments on commit b5d123e

Please sign in to comment.