
sync: add Sender<T>::closed future #6685

Open · wants to merge 3 commits into master

Conversation

@evanrittenhouse (Contributor) commented Jul 14, 2024:

Motivation

Adds a closed future to broadcast::Sender<T>, similar to the oneshot and mpsc variants, which completes when all subscribed receivers have been dropped.

Solution

This is a simple poll_fn that wraps a check around shared.tail.rx_cnt, returning Ready when the remaining receiver count is 0.

Closes: #6649

The github-actions bot added the R-loom-sync (Run loom sync tests on this PR) label on Jul 14, 2024.
Comment on lines 835 to 862
pub async fn closed(&self) {
    std::future::poll_fn(|_| {
        let tail = self.shared.tail.lock();

        if tail.closed || tail.rx_cnt == 0 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await;
}
@tglane (Contributor) commented Jul 14, 2024:

I might be wrong here, but I think this future will only get polled once because you are not waking the task up again.

Consider this example:

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = tokio::sync::broadcast::channel(16);

    tokio::spawn(async move {
        let r = rx1.recv().await.unwrap();
        println!("Rx1 received: {}", r);
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    });

    tx.send(20).unwrap();

    println!("Waiting for subscribers to drop");
    tx.closed().await;
    println!("Subscribers dropped");
}

Here the call to closed will block because the future is only polled once, right when you call it. The subscriber has not been dropped at that point, so the poll_fn returns Pending. And because the future is never polled again, it will never resolve after that.

Contributor:

Indeed, this is not correct. When returning Pending, you must register for wakeups so that you are polled again when progress can be made.

Compare with the mpsc channel:

/// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,
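
For illustration, a minimal sketch of how such a Notify can be used to register for wakeups (the Shared struct and rx_closed flag here are hypothetical stand-ins, not the actual mpsc internals):

    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::Notify;

    struct Shared {
        // Notifies tasks listening for the receivers being dropped.
        notify_rx_closed: Notify,
        // Hypothetical stand-in for the real channel state.
        rx_closed: AtomicBool,
    }

    impl Shared {
        async fn closed(&self) {
            // Request the notification *before* checking the flag, so a
            // close that lands between the check and the await is not
            // missed (the closing side sets the flag before notifying).
            let notified = self.notify_rx_closed.notified();
            if self.rx_closed.load(Ordering::Acquire) {
                return;
            }
            notified.await;
        }
    }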

Contributor Author:

True, and I guess poll_fn is flagged as unstable in CI anyway. I'll find another way around this - thanks, both.

Contributor:

Tokio has a poll_fn implementation you can use if you find that you need it.

@Darksonn added the A-tokio (Area: The main tokio crate) and M-sync (Module: tokio/sync) labels on Jul 14, 2024.
@Darksonn (Contributor):

What's the status on this?

@evanrittenhouse (Contributor, Author):

Work has been nuts - I haven't had much time for anything else. This is on my backlog, though.

@Darksonn (Contributor):

No rush from my side. Just wanted to check in.

@evanrittenhouse (Contributor, Author) commented Aug 19, 2024:

Pushed up what should be closer to the proper implementation. I'd like to add a task.is_woken() check after each rx is dropped in the integration test, but for some reason, dropping rx2 doesn't wake the task (though it does make it Ready).

I'll dig more into that when I have a bit more time, as well as fix the various CI issues and clean up the code (for example, the tail lock will be dropped when it goes out of scope).

tokio/src/sync/broadcast.rs (review thread, resolved)
Comment on lines 849 to 861
self.shared.notify_rx_drop.notified().await;

poll_fn(|_| {
    let tail = self.shared.tail.lock();

    if tail.closed || tail.rx_cnt == 0 {
        return Poll::Ready(());
    }

    drop(tail);
    return Poll::Pending;
})
.await;
Contributor:

The poll_fn needs to interact with the notified() future for this to work. Check this out for inspiration:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    let mut this = self.project();
    loop {
        if this.cancellation_token.is_cancelled() {
            return Poll::Ready(());
        }
        // No wakeups can be lost here because there is always a call to
        // `is_cancelled` between the creation of the future and the call to
        // `poll`, and the code that sets the cancelled flag does so before
        // waking the `Notified`.
        if this.future.as_mut().poll(cx).is_pending() {
            return Poll::Pending;
        }
        this.future.set(this.cancellation_token.inner.notified());
    }
}

You should be able to do something similar without creating a whole future struct if you use tokio::pin! on the notified() future.
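
For instance, a hedged sketch of that suggestion (notify_rx_drop and the no_receivers flag are stand-ins taken from this discussion, not the final code; std::future::poll_fn is used for brevity, though as noted above CI flagged it and tokio has an internal equivalent):

    use std::future::{poll_fn, Future};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::task::Poll;
    use tokio::sync::Notify;

    async fn closed(notify_rx_drop: &Notify, no_receivers: &AtomicBool) {
        let notified = notify_rx_drop.notified();
        tokio::pin!(notified);

        poll_fn(|cx| loop {
            if no_receivers.load(Ordering::Acquire) {
                return Poll::Ready(());
            }
            // Polling `Notified` registers our waker, so returning
            // `Pending` here does not strand the task.
            if notified.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            // A notification was consumed; re-arm the future and loop
            // around to re-check the flag.
            notified.set(notify_rx_drop.notified());
        })
        .await;
    }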

@evanrittenhouse (Contributor, Author) commented Aug 31, 2024:

If we now notify only once, when the last receiver drops, can we just await the Notified directly? I originally had this block because I notified on every Rx drop, so I had to include an additional check to ensure that we had just dropped the last Rx.

I'm also trying to get a better understanding of the internals here. For my own understanding: this doesn't work because, after the first call, the internal Future state machine has progressed to the point where we're already in the poll_fn and have a Pending from a non-terminal Rx dropping. That means we'd never await the notified() again. Is that correct?

Contributor:

Yes, Notified doesn't have spurious wakeups in the scenario where we are using it, so it should work. Well, except that we probably want it to return immediately if the channel is already closed when we call it. We can do it like this:

loop {
    let notified = self.shared.notify_rx_drop.notified();
    if self.is_closed() { return; }
    notified.await;
}
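
(The Notified is requested before the closed check for the same reason given in the CancellationToken snippet above: the closing side sets the flag before notifying, so a notification that lands between the check and the await is not lost.)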

@evanrittenhouse (Contributor, Author) commented Sep 22, 2024:

Sure, done. I used tail.closed instead of self.is_closed() as I couldn't find an is_closed on Sender, which means an extra lock.

For my own curiosity, what's the advantage of the loop above vs. a simple self.shared.notify_rx_drop.notified().await, if Notified doesn't suffer from spurious wakeups? Now that we only notify once when the last receiver drops, it may be a nice way to avoid that extra lock I mentioned.

Contributor:

If you don't use a loop, then it changes the behavior of the test I requested in #6685 (comment). Which behavior we want is up for discussion.
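
(Without the loop, a closed() future that is already waiting would resolve on the first notification even if a new receiver subscribed between the last drop and the poll; with the loop, it re-checks the closed state and keeps waiting in that case.)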

tokio/src/sync/broadcast.rs (two outdated review threads, resolved)
@evanrittenhouse marked this pull request as a draft on August 19, 2024.
@evanrittenhouse force-pushed the evanrittenhouse/6649 branch 3 times, most recently from eb0d891 to 43be386, on September 22, 2024.
@evanrittenhouse marked this pull request as ready for review on September 22, 2024.
Comment on lines +652 to +674
drop(rx);
assert!(!task.is_woken());
assert_pending!(task.poll());

drop(rx2);
assert!(task.is_woken());
assert_ready!(task.poll());
Contributor:

Could you add another test that calls subscribe between dropping the last receiver and polling closed(), to see what happens in that case?

Contributor Author:

Yeah, will do - sorry for the delay!

@evanrittenhouse (Contributor, Author) commented Nov 3, 2024:

Done - I simply re-open the channel if the rx_cnt is 0 when calling new_receiver. This allows us to re-create the closed() future, which I try to demonstrate in the new broadcast_sender_closed_with_extra_subscribe test.
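
For illustration, a sketch of roughly what such a test could look like, using the tokio-test helpers already used in this PR (the test name and exact assertions here are guesses based on the discussion, and the Pending result relies on the re-open behavior described above):

    use tokio_test::{assert_pending, assert_ready, task};

    #[test]
    fn closed_with_extra_subscribe_sketch() {
        let (tx, rx) = tokio::sync::broadcast::channel::<i32>(16);

        // Drop the last receiver, then subscribe again before polling
        // closed().
        drop(rx);
        let rx2 = tx.subscribe();

        // The channel was re-opened, so closed() should not resolve yet.
        let mut closed = task::spawn(tx.closed());
        assert_pending!(closed.poll());

        // Dropping the new receiver wakes and resolves the future.
        drop(rx2);
        assert!(closed.is_woken());
        assert_ready!(closed.poll());
    }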

Member:

Hmm, it seems that this change made another test fail.

Now, even after dropping all senders, the channel no longer seems to be marked as closed when resubscribe is called afterwards.

We have to re-open the channel if a new receiver has subscribed
after closing the channel.
Comment on lines +879 to +883
if tail.closed {
    // Potentially need to re-open the channel, if a new receiver has been
    // added between calls to poll()
    tail.closed = false;
}
Contributor:

This causes the test tests/sync_broadcast::resubscribe_to_closed_channel to fail, because it also reopens the channel when the sender was dropped beforehand.
Maybe instead of checking tail.closed to decide whether to reopen the channel, we could check for tail.rx_cnt == 0 before the call to tail.rx_cnt.checked_add(1)? In that case the channel would only be reopened if it was closed because no more receivers were "alive".
I only ran the sync tests locally real quick and got no failures, so maybe it's something to work with?
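
A toy model of that suggestion (this Tail is a hypothetical stand-in modeled on the discussion, not the real tokio type):

    struct Tail {
        rx_cnt: usize,
        closed: bool,
    }

    fn new_receiver(tail: &mut Tail) {
        // Re-open only when the channel was closed because every receiver
        // was dropped, not because the sender itself was dropped.
        if tail.rx_cnt == 0 {
            tail.closed = false;
        }
        tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("receiver count overflow");
    }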

Contributor Author:

Thanks for the pointers! I've been meaning to get back to this and study the code a bit more, but some stuff has cropped up in my personal life, so I haven't had time. Sorry if I'm causing unnecessary work for anyone.

Contributor:

No worries, there isn't a rush from our side. Things take the time they take.

Labels: A-tokio (Area: The main tokio crate), M-sync (Module: tokio/sync), R-loom-sync (Run loom sync tests on this PR)
Projects: none yet
Development

Successfully merging this pull request may close this issue:

Add broadcast::Sender::closed Future (#6649)
4 participants