Skip to content

Commit

Permalink
mpsc: test PermitIterator and reserve_many
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore committed Dec 17, 2023
1 parent d9bf134 commit 2ee4f76
Showing 1 changed file with 48 additions and 8 deletions.
56 changes: 48 additions & 8 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,31 +550,51 @@ async fn try_reserve_many_fails() {
for i in 4..20 {
let (tx, mut rx) = mpsc::channel(i);

let mut permit = assert_ok!(tx.try_reserve_many(i - 2));
let mut permit1 = assert_ok!(tx.try_reserve_many(i - 2));

// This should fail, there is only two remaining permits
match assert_err!(tx.try_reserve_many(3)) {
TrySendError::Full(()) => {}
_ => panic!(),
}

permit.next().unwrap().send("foo");
permit.next().unwrap().send("foo");
permit1.next().unwrap().send("foo");
permit1.next().unwrap().send("foo");

assert_eq!(rx.recv().await, Some("foo"));
assert_eq!(rx.recv().await, Some("foo"));

// There are now 4 remaining permits because of the 2 sends/recv
let _permit = assert_ok!(tx.try_reserve_many(4));
let permit2 = assert_ok!(tx.try_reserve_many(4));

// Dropping permit iterator releases the remaining slots.
drop(permit);
drop(_permit);

let _permit = assert_ok!(tx.try_reserve_many(i));
drop(permit1);
drop(permit2);

// It is possible to reserve all the permits
assert_ok!(tx.try_reserve_many(i));

// This should fail because the channel is closed
drop(rx);
match assert_err!(tx.try_reserve_many(3)) {
TrySendError::Closed(()) => {}
_ => panic!(),
};
}
}

#[maybe_tokio_test]
async fn reserve_many_and_send() {
let (tx, mut rx) = mpsc::channel(100);
for i in 1..100 {
for permit in assert_ok!(tx.reserve_many(i).await) {
permit.send("foo");
assert_eq!(rx.recv().await, Some("foo"));
}
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
}

#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
Expand All @@ -594,6 +614,25 @@ async fn drop_permit_releases_permit() {
assert_ready_ok!(reserve2.poll());
}

#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_iterator_releases_permits() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
let (tx1, _rx) = mpsc::channel::<i32>(10);
let tx2 = tx1.clone();

let permits = assert_ok!(tx1.reserve_many(10).await);

let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(10));
assert_pending!(reserve2.poll());

drop(permits);

assert!(reserve2.is_woken());
assert_ready_ok!(reserve2.poll());
}

#[maybe_tokio_test]
async fn dropping_rx_closes_channel() {
let (tx, rx) = mpsc::channel(100);
Expand All @@ -603,6 +642,7 @@ async fn dropping_rx_closes_channel() {

drop(rx);
assert_err!(tx.reserve().await);
assert_err!(tx.reserve_many(10).await);
assert_eq!(1, Arc::strong_count(&msg));
}

Expand Down

0 comments on commit 2ee4f76

Please sign in to comment.