Skip to content

Commit

Permalink
tokio: add builder function for watch::Sender
Browse files Browse the repository at this point in the history
Added a method that allows to build a Sender without specifying any Receiver.
  • Loading branch information
nicflower committed Sep 10, 2023
1 parent 95fb599 commit 94b5392
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,6 @@ mod state {
#[derive(Debug)]
pub(super) struct AtomicState(AtomicUsize);

impl Version {
/// Get the initial version when creating the channel.
pub(super) fn initial() -> Self {
Version(0)
}
}

impl StateSnapshot {
/// Extract the version from the state.
pub(super) fn version(self) -> Version {
Expand Down Expand Up @@ -458,23 +451,8 @@ mod state {
/// [`Sender`]: struct@Sender
/// [`Receiver`]: struct@Receiver
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared {
value: RwLock::new(init),
state: AtomicState::new(),
ref_count_rx: AtomicUsize::new(1),
notify_rx: big_notify::BigNotify::new(),
notify_tx: Notify::new(),
});

let tx = Sender {
shared: shared.clone(),
};

let rx = Receiver {
shared,
version: Version::initial(),
};

let tx = Sender::new(init);
let rx = tx.subscribe();
(tx, rx)
}

Expand Down Expand Up @@ -854,6 +832,32 @@ impl<T> Drop for Receiver<T> {
}

impl<T> Sender<T> {
/// Creates the sending-half of the [`watch`] channel.
///
/// See documentation of [`watch::channel`] for errors when calling this function.
/// Beware that attempting to send a value when no one subscribed to the channel will
/// return an error.
///
/// [`watch`]: crate::sync::watch
/// [`watch::channel`]: crate::sync::watch
///
/// # Examples
/// ```
/// let sender = tokio::sync::watch::Sender::new(0u8);
/// assert!(sender.send(3).is_err())
/// let _rec = sender.subscribe()
/// assert!(sender.send(4).is_ok())
/// ```
pub fn new(init: T) -> Self {
let shared = Arc::new(Shared {
value: RwLock::new(init),
state: AtomicState::new(),
ref_count_rx: AtomicUsize::new(0),
notify_rx: big_notify::BigNotify::new(),
notify_tx: Notify::new(),
});
Sender { shared }
}
/// Sends a new value via the channel, notifying all receivers.
///
/// This method fails if the channel is closed, which is the case when
Expand Down

0 comments on commit 94b5392

Please sign in to comment.