diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f454c1f18de..12e0e9d9adb 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -550,7 +550,7 @@ async fn try_reserve_many_fails() { for i in 4..20 { let (tx, mut rx) = mpsc::channel(i); - let mut permit = assert_ok!(tx.try_reserve_many(i - 2)); + let mut permit1 = assert_ok!(tx.try_reserve_many(i - 2)); // This should fail, there is only two remaining permits match assert_err!(tx.try_reserve_many(3)) { @@ -558,23 +558,43 @@ async fn try_reserve_many_fails() { _ => panic!(), } - permit.next().unwrap().send("foo"); - permit.next().unwrap().send("foo"); + permit1.next().unwrap().send("foo"); + permit1.next().unwrap().send("foo"); assert_eq!(rx.recv().await, Some("foo")); assert_eq!(rx.recv().await, Some("foo")); // There are now 4 remaining permits because of the 2 sends/recv - let _permit = assert_ok!(tx.try_reserve_many(4)); + let permit2 = assert_ok!(tx.try_reserve_many(4)); // Dropping permit iterator releases the remaining slots. - drop(permit); - drop(_permit); - - let _permit = assert_ok!(tx.try_reserve_many(i)); + drop(permit1); + drop(permit2); + + // It is possible to reserve all the permits + assert_ok!(tx.try_reserve_many(i)); + + // This should fail because the channel is closed + drop(rx); + match assert_err!(tx.try_reserve_many(3)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; } } +#[maybe_tokio_test] +async fn reserve_many_and_send() { + let (tx, mut rx) = mpsc::channel(100); + for i in 1..100 { + for permit in assert_ok!(tx.reserve_many(i).await) { + permit.send("foo"); + assert_eq!(rx.recv().await, Some("foo")); + } + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } +} + #[tokio::test] #[cfg(feature = "full")] async fn drop_permit_releases_permit() { @@ -594,6 +614,25 @@ async fn drop_permit_releases_permit() { assert_ready_ok!(reserve2.poll()); } +#[tokio::test] +#[cfg(feature = "full")] +async fn drop_permit_iterator_releases_permits() { + // poll_ready reserves capacity, ensure that the capacity is released if tx + // is dropped w/o sending a value. + let (tx1, _rx) = mpsc::channel::(10); + let tx2 = tx1.clone(); + + let permits = assert_ok!(tx1.reserve_many(10).await); + + let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(10)); + assert_pending!(reserve2.poll()); + + drop(permits); + + assert!(reserve2.is_woken()); + assert_ready_ok!(reserve2.poll()); +} + #[maybe_tokio_test] async fn dropping_rx_closes_channel() { let (tx, rx) = mpsc::channel(100); @@ -603,6 +642,7 @@ async fn dropping_rx_closes_channel() { drop(rx); assert_err!(tx.reserve().await); + assert_err!(tx.reserve_many(10).await); assert_eq!(1, Arc::strong_count(&msg)); }