diff --git a/aggligator-util/src/transport/acceptor.rs b/aggligator-util/src/transport/acceptor.rs index 4565851..1b152a8 100644 --- a/aggligator-util/src/transport/acceptor.rs +++ b/aggligator-util/src/transport/acceptor.rs @@ -10,7 +10,7 @@ use std::{ time::Duration, }; use tokio::{ - sync::{broadcast, mpsc, oneshot, watch, Mutex, RwLock}, + sync::{broadcast, mpsc, oneshot, watch, Mutex, OwnedSemaphorePermit, RwLock, Semaphore}, time::{sleep_until, Instant}, }; @@ -74,6 +74,7 @@ struct AcceptingTransportPack { transport: ArcAcceptingTransport, result_tx: oneshot::Sender>, remove_rx: oneshot::Receiver<()>, + _permit: OwnedSemaphorePermit, } /// Builds a customized [`Acceptor`]. @@ -132,6 +133,7 @@ impl AcceptorBuilder { task_cfg, transport_tx, transports_present_rx, + transports_being_added: Arc::new(Semaphore::new(Semaphore::MAX_PERMITS)), error_rx, active_transports, no_transport_timeout, @@ -148,6 +150,7 @@ pub struct Acceptor { task_cfg: TaskCfgFn, transport_tx: mpsc::UnboundedSender, transports_present_rx: watch::Receiver, + transports_being_added: Arc, active_transports: Arc>>>, error_rx: broadcast::Receiver, no_transport_timeout: Duration, @@ -187,7 +190,12 @@ impl Acceptor { let (result_tx, result_rx) = oneshot::channel(); let (remove_tx, remove_rx) = oneshot::channel(); - let pack = AcceptingTransportPack { transport: Arc::new(transport), result_tx, remove_rx }; + let pack = AcceptingTransportPack { + transport: Arc::new(transport), + result_tx, + remove_rx, + _permit: self.transports_being_added.clone().try_acquire_owned().unwrap(), + }; let _ = self.transport_tx.send(pack); AcceptingTransportHandle { name, result_rx, remove_tx } @@ -196,6 +204,7 @@ impl Acceptor { /// Returns whether no transports are present. pub fn is_empty(&self) -> bool { !*self.transports_present_rx.borrow() + && self.transports_being_added.available_permits() == Semaphore::MAX_PERMITS } /// Waits for an incoming connection and accepts it. @@ -311,6 +320,9 @@ impl Acceptor { active_transports.retain(|at| at.strong_count() > 0); active_transports.push(Arc::downgrade(&transport_pack.transport)); + // Notify of transport availability. + transports_present_tx.send_replace(true); + // Start transport task. transport_tasks.push(Self::transport_task( server.clone(), @@ -330,7 +342,7 @@ impl Acceptor { server: BoxServer, transport: AcceptingTransportPack, link_error_tx: broadcast::Sender, wrappers: Arc>, ) { - let AcceptingTransportPack { transport, result_tx, mut remove_rx } = transport; + let AcceptingTransportPack { transport, result_tx, mut remove_rx, _permit: _ } = transport; let (tx, mut rx) = mpsc::channel(128); let mut listener = transport.listen(tx);