Skip to content

Commit 2ebc20b

Browse files
Add extra test with sneaky subscribe()
We have to re-open the channel if a new receiver has subscribed after closing the channel.
1 parent 12d46d0 commit 2ebc20b

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

tokio/src/sync/broadcast.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,11 @@ fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
875875
assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
876876

877877
tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
878+
if tail.closed {
879+
// Potentially need to re-open the channel, if a new receiver has been added between calls
880+
// to poll()
881+
tail.closed = false;
882+
}
878883

879884
let next = tail.pos;
880885

tokio/tests/sync_broadcast.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,3 +657,37 @@ fn broadcast_sender_closed() {
657657
assert!(task.is_woken());
658658
assert_ready!(task.poll());
659659
}
660+
661+
#[test]
662+
fn broadcast_sender_closed_with_extra_subscribe() {
663+
let (tx, rx) = broadcast::channel::<()>(1);
664+
let rx2 = tx.subscribe();
665+
666+
let mut task = task::spawn(tx.closed());
667+
assert_pending!(task.poll());
668+
669+
drop(rx);
670+
assert!(!task.is_woken());
671+
assert_pending!(task.poll());
672+
673+
drop(rx2);
674+
assert!(task.is_woken());
675+
676+
let rx3 = tx.subscribe();
677+
assert_pending!(task.poll());
678+
679+
drop(rx3);
680+
assert!(task.is_woken());
681+
assert_ready!(task.poll());
682+
683+
let mut task2 = task::spawn(tx.closed());
684+
assert_ready!(task2.poll());
685+
686+
let rx4 = tx.subscribe();
687+
let mut task3 = task::spawn(tx.closed());
688+
assert_pending!(task3.poll());
689+
690+
drop(rx4);
691+
assert!(task3.is_woken());
692+
assert_ready!(task3.poll());
693+
}

0 commit comments

Comments
 (0)