diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 536c5a848e8..b84f28c82dd 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -27,6 +27,7 @@ futures-core = "0.3.0" [dev-dependencies] tokio = { version = "1.2.0", path = "../tokio", features = ["full"] } futures-util = "0.3.0" +tokio-test-macros = { path = "./macros" } [package.metadata.docs.rs] all-features = true diff --git a/tokio-test/macros/Cargo.toml b/tokio-test/macros/Cargo.toml new file mode 100644 index 00000000000..770aaa9ca1d --- /dev/null +++ b/tokio-test/macros/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "tokio-test-macros" +version = "0.0.0" +edition = "2021" + +[lib] +proc-macro = true + +[dependencies] +syn = { version = "2", features = ["full"] } +quote2 = "0.9" \ No newline at end of file diff --git a/tokio-test/macros/src/expend.rs b/tokio-test/macros/src/expend.rs new file mode 100644 index 00000000000..d9359c58215 --- /dev/null +++ b/tokio-test/macros/src/expend.rs @@ -0,0 +1,118 @@ +use quote2::{proc_macro2::TokenStream, quote, utils::quote_rep, Quote, ToTokens}; +use syn::{ + parse::{Parse, ParseStream, Parser}, + punctuated::Punctuated, + Attribute, Meta, MetaNameValue, Signature, Token, Visibility, +}; + +type AttributeArgs = Punctuated; + +pub struct ItemFn { + pub attrs: Vec, + pub vis: Visibility, + pub sig: Signature, + pub body: TokenStream, +} + +impl Parse for ItemFn { + fn parse(input: ParseStream) -> Result { + Ok(Self { + attrs: input.call(Attribute::parse_outer)?, + vis: input.parse()?, + sig: input.parse()?, + body: input.parse()?, + }) + } +} + +pub fn tokio_test(args: TokenStream, item_fn: ItemFn) -> TokenStream { + let metadata = match AttributeArgs::parse_terminated.parse2(args) { + Ok(args) => args, + Err(err) => return err.into_compile_error(), + }; + + let has_miri_cfg = metadata.iter().any(|meta| meta.path().is_ident("miri")); + let id_multi_thread = metadata.iter().any(|meta| match meta { + Meta::NameValue(meta) if meta.path.is_ident("flavor") => { + match meta.value.to_token_stream().to_string().as_str() { + "multi_thread" => true, + "current_thread" => false, + val => panic!("unknown `flavor = {val}`, expected: multi_thread | current_thread"), + } + } + _ => false, + }); + let config = quote_rep(metadata, |t, meta| { + for key in ["miri", "flavor"] { + if meta.path().is_ident(key) { + return; + } + } + if let Meta::NameValue(MetaNameValue { path, value, .. }) = &meta { + for key in ["worker_threads", "start_paused"] { + if path.is_ident(key) { + quote!(t, { .#path(#value) }); + return; + } + } + } + panic!("unknown config `{}`", meta.path().to_token_stream()) + }); + let runtime_type = quote(|t| { + if id_multi_thread { + quote!(t, { new_multi_thread }); + } else { + quote!(t, { new_current_thread }); + } + }); + let ignore_miri = quote(|t| { + if !has_miri_cfg { + quote!(t, { #[cfg_attr(miri, ignore)] }); + } + }); + let miri_test_executor = quote(|t| { + if has_miri_cfg { + quote!(t, { + if cfg!(miri) { + return tokio_test::task::spawn(body).block_on(); + } + }); + } + }); + + let ItemFn { + attrs, + vis, + mut sig, + body, + } = item_fn; + + let async_keyword = sig.asyncness.take(); + let attrs = quote_rep(attrs, |t, attr| { + quote!(t, { #attr }); + }); + + let mut out = TokenStream::new(); + quote!(out, { + #attrs + #ignore_miri + #[::core::prelude::v1::test] + #vis #sig { + let body = #async_keyword #body; + let body= ::std::pin::pin!(body); + + #miri_test_executor + + #[allow(clippy::expect_used, clippy::diverging_sub_expression, clippy::needless_return)] + { + return tokio::runtime::Builder::#runtime_type() + #config + .enable_all() + .build() + .expect("Failed building the Runtime") + .block_on(body); + } + } + }); + out +} diff --git a/tokio-test/macros/src/lib.rs b/tokio-test/macros/src/lib.rs new file mode 100644 index 00000000000..23e38668573 --- /dev/null +++ b/tokio-test/macros/src/lib.rs @@ -0,0 +1,7 @@ +mod expend; +use proc_macro::TokenStream; + +#[proc_macro_attribute] +pub fn tokio_test(args: TokenStream, item: TokenStream) -> TokenStream { + expend::tokio_test(args.into(), syn::parse_macro_input!(item)).into() +} diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index 2e646d44bf8..12660373878 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -26,11 +26,10 @@ //! ``` use std::future::Future; -use std::mem; use std::ops; use std::pin::Pin; use std::sync::{Arc, Condvar, Mutex}; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use std::task::{Context, Poll, Wake, Waker}; use tokio_stream::Stream; @@ -123,6 +122,34 @@ impl Spawn { let fut = self.future.as_mut(); self.task.enter(|cx| fut.poll(cx)) } + + /// Run a future to completion on the current thread. + /// + /// This function will block the caller until the given future has completed. + /// + /// Note: This does not create a Tokio runtime, and therefore does not support + /// Tokio-specific asynchronous APIs, such as [tokio::time::sleep]. + pub fn block_on(&mut self) -> T::Output { + loop { + match self.poll() { + Poll::Ready(val) => return val, + Poll::Pending => { + let mut guard = self.task.waker.state.lock().unwrap(); + let state = *guard; + + if state == WAKE { + continue; + } + + assert_eq!(state, IDLE); + *guard = SLEEP; + let guard = self.task.waker.condvar.wait(guard).unwrap(); + assert_eq!(*guard, WAKE); + drop(guard); + } + }; + } + } } impl Spawn { @@ -171,9 +198,8 @@ impl MockTask { F: FnOnce(&mut Context<'_>) -> R, { self.waker.clear(); - let waker = self.waker(); + let waker = Waker::from(self.waker.clone()); let mut cx = Context::from_waker(&waker); - f(&mut cx) } @@ -189,13 +215,6 @@ impl MockTask { fn waker_ref_count(&self) -> usize { Arc::strong_count(&self.waker) } - - fn waker(&self) -> Waker { - unsafe { - let raw = to_raw(self.waker.clone()); - Waker::from_raw(raw) - } - } } impl Default for MockTask { @@ -226,8 +245,10 @@ impl ThreadWaker { _ => unreachable!(), } } +} - fn wake(&self) { +impl Wake for ThreadWaker { + fn wake(self: Arc) { // First, try transitioning from IDLE -> NOTIFY, this does not require a lock. let mut state = self.state.lock().unwrap(); let prev = *state; @@ -247,39 +268,3 @@ impl ThreadWaker { self.condvar.notify_one(); } } - -static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker); - -unsafe fn to_raw(waker: Arc) -> RawWaker { - RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE) -} - -unsafe fn from_raw(raw: *const ()) -> Arc { - Arc::from_raw(raw as *const ThreadWaker) -} - -unsafe fn clone(raw: *const ()) -> RawWaker { - let waker = from_raw(raw); - - // Increment the ref count - mem::forget(waker.clone()); - - to_raw(waker) -} - -unsafe fn wake(raw: *const ()) { - let waker = from_raw(raw); - waker.wake(); -} - -unsafe fn wake_by_ref(raw: *const ()) { - let waker = from_raw(raw); - waker.wake(); - - // We don't actually own a reference to the unparker - mem::forget(waker); -} - -unsafe fn drop_waker(raw: *const ()) { - let _ = from_raw(raw); -} diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs index efaaf510f07..926c4d00a8d 100644 --- a/tokio-test/tests/block_on.rs +++ b/tokio-test/tests/block_on.rs @@ -4,6 +4,7 @@ use tokio::time::{sleep_until, Duration, Instant}; use tokio_test::block_on; #[test] +#[cfg_attr(miri, ignore)] fn async_block() { assert_eq!(4, block_on(async { 4 })); } @@ -13,11 +14,13 @@ async fn five() -> u8 { } #[test] +#[cfg_attr(miri, ignore)] fn async_fn() { assert_eq!(5, block_on(five())); } #[test] +#[cfg_attr(miri, ignore)] fn test_sleep() { let deadline = Instant::now() + Duration::from_millis(100); diff --git a/tokio-test/tests/io.rs b/tokio-test/tests/io.rs index 5f2ed427cd3..e70530573bd 100644 --- a/tokio-test/tests/io.rs +++ b/tokio-test/tests/io.rs @@ -5,7 +5,12 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time::{Duration, Instant}; use tokio_test::io::Builder; -#[tokio::test] +mod tokio { + pub use ::tokio::*; + pub use ::tokio_test_macros::tokio_test as test; +} + +#[tokio::test(miri)] async fn read() { let mut mock = Builder::new().read(b"hello ").read(b"world!").build(); @@ -18,7 +23,7 @@ async fn read() { assert_eq!(&buf[..n], b"world!"); } -#[tokio::test] +#[tokio::test(miri)] async fn read_error() { let error = io::Error::new(io::ErrorKind::Other, "cruel"); let mut mock = Builder::new() @@ -43,7 +48,7 @@ async fn read_error() { assert_eq!(&buf[..n], b"world!"); } -#[tokio::test] +#[tokio::test(miri)] async fn write() { let mut mock = Builder::new().write(b"hello ").write(b"world!").build(); @@ -51,7 +56,7 @@ async fn write() { mock.write_all(b"world!").await.expect("write 2"); } -#[tokio::test] +#[tokio::test(miri)] async fn write_with_handle() { let (mut mock, mut handle) = Builder::new().build_with_handle(); handle.write(b"hello "); @@ -61,7 +66,7 @@ async fn write_with_handle() { mock.write_all(b"world!").await.expect("write 2"); } -#[tokio::test] +#[tokio::test(miri)] async fn read_with_handle() { let (mut mock, mut handle) = Builder::new().build_with_handle(); handle.read(b"hello "); @@ -74,7 +79,7 @@ async fn read_with_handle() { assert_eq!(&buf[..], b"world!"); } -#[tokio::test] +#[tokio::test(miri)] async fn write_error() { let error = io::Error::new(io::ErrorKind::Other, "cruel"); let mut mock = Builder::new() @@ -82,6 +87,7 @@ async fn write_error() { .write_error(error) .write(b"world!") .build(); + mock.write_all(b"hello ").await.expect("write 1"); match mock.write_all(b"whoa").await { @@ -95,14 +101,14 @@ async fn write_error() { mock.write_all(b"world!").await.expect("write 2"); } -#[tokio::test] +#[tokio::test(miri)] #[should_panic] async fn mock_panics_read_data_left() { use tokio_test::io::Builder; Builder::new().read(b"read").build(); } -#[tokio::test] +#[tokio::test(miri)] #[should_panic] async fn mock_panics_write_data_left() { use tokio_test::io::Builder; diff --git a/tokio-test/tests/stream_mock.rs b/tokio-test/tests/stream_mock.rs index a54ea838a5b..652fdb8b352 100644 --- a/tokio-test/tests/stream_mock.rs +++ b/tokio-test/tests/stream_mock.rs @@ -2,7 +2,12 @@ use futures_util::StreamExt; use std::time::Duration; use tokio_test::stream_mock::StreamMockBuilder; -#[tokio::test] +mod tokio { + pub use ::tokio::*; + pub use ::tokio_test_macros::tokio_test as test; +} + +#[tokio::test(miri)] async fn test_stream_mock_empty() { let mut stream_mock = StreamMockBuilder::::new().build(); @@ -10,7 +15,7 @@ async fn test_stream_mock_empty() { assert_eq!(stream_mock.next().await, None); } -#[tokio::test] +#[tokio::test(miri)] async fn test_stream_mock_items() { let mut stream_mock = StreamMockBuilder::new().next(1).next(2).build(); @@ -35,14 +40,14 @@ async fn test_stream_mock_wait() { assert_eq!(stream_mock.next().await, None); } -#[tokio::test] +#[tokio::test(miri)] #[should_panic(expected = "StreamMock was dropped before all actions were consumed")] async fn test_stream_mock_drop_without_consuming_all() { let stream_mock = StreamMockBuilder::new().next(1).next(2).build(); drop(stream_mock); } -#[tokio::test] +#[tokio::test(miri)] #[should_panic(expected = "test panic was not masked")] async fn test_stream_mock_drop_during_panic_doesnt_mask_panic() { let _stream_mock = StreamMockBuilder::new().next(1).next(2).build(); diff --git a/tokio-test/tests/task.rs b/tokio-test/tests/task.rs index a464bf9b7c5..ef46c1d492f 100644 --- a/tokio-test/tests/task.rs +++ b/tokio-test/tests/task.rs @@ -1,5 +1,8 @@ +use std::future::poll_fn; use std::pin::Pin; use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; use tokio_stream::Stream; use tokio_test::task; @@ -23,3 +26,48 @@ fn test_spawn_stream_size_hint() { let spawn = task::spawn(SizedStream); assert_eq!(spawn.size_hint(), (100, Some(200))); } + +#[test] +#[cfg_attr(miri, ignore)] +fn test_spawn_block_on() { + let job = thread::spawn(move || { + task::spawn(async { + let mut poll_once = false; + poll_fn(|cx| { + if poll_once { + return Poll::Ready(()); + } + assert!(!poll_once); + poll_once = true; + cx.waker().wake_by_ref(); + Poll::Pending + }) + .await; + + let mut once = false; + poll_fn(|cx| { + if once { + return Poll::Ready(()); + } + let waker = cx.waker().clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(333)); + waker.wake(); + }); + assert!(!once); + once = true; + Poll::Pending + }) + .await; + }) + .block_on(); + }); + + let job2 = thread::spawn(|| { + task::spawn(async { std::future::pending::<()>().await }).block_on(); + }); + + thread::sleep(Duration::from_secs(2)); + assert!(job.is_finished()); + assert!(!job2.is_finished()); +} diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 12f70be862e..c8f78194a91 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -51,6 +51,7 @@ hashbrown = { version = "0.14.0", default-features = false, optional = true } tokio = { version = "1.0.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.4.0", path = "../tokio-test" } tokio-stream = { version = "0.1", path = "../tokio-stream" } +tokio-test-macros = { path = "../tokio-test/macros" } async-stream = "0.3.0" futures = "0.3.0" diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs index e7fec9546b3..eb4c211d886 100644 --- a/tokio-util/src/either.rs +++ b/tokio-util/src/either.rs @@ -202,15 +202,16 @@ mod tests { use super::*; use tokio::io::{repeat, AsyncReadExt, Repeat}; use tokio_stream::{once, Once, StreamExt}; + use tokio_test_macros::tokio_test; - #[tokio::test] + #[tokio_test(miri)] async fn either_is_stream() { let mut either: Either, Once> = Either::Left(once(1)); assert_eq!(Some(1u32), either.next().await); } - #[tokio::test] + #[tokio_test(miri)] async fn either_is_async_read() { let mut buffer = [0; 3]; let mut either: Either = Either::Right(repeat(0b101)); diff --git a/tokio-util/tests/abort_on_drop.rs b/tokio-util/tests/abort_on_drop.rs index c7dcee35aac..df8dd9445b1 100644 --- a/tokio-util/tests/abort_on_drop.rs +++ b/tokio-util/tests/abort_on_drop.rs @@ -1,7 +1,8 @@ use tokio::sync::oneshot; +use tokio_test_macros::tokio_test; use tokio_util::task::AbortOnDropHandle; -#[tokio::test] +#[tokio_test] async fn aborts_task_on_drop() { let (mut tx, rx) = oneshot::channel::(); let handle = tokio::spawn(async move { @@ -13,7 +14,7 @@ async fn aborts_task_on_drop() { assert!(tx.is_closed()); } -#[tokio::test] +#[tokio_test] async fn aborts_task_directly() { let (mut tx, rx) = oneshot::channel::(); let handle = tokio::spawn(async move { diff --git a/tokio-util/tests/compat.rs b/tokio-util/tests/compat.rs index 1c77081f898..16664110f47 100644 --- a/tokio-util/tests/compat.rs +++ b/tokio-util/tests/compat.rs @@ -6,9 +6,10 @@ use futures_io::SeekFrom; use futures_util::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tempfile::NamedTempFile; use tokio::fs::OpenOptions; +use tokio_test_macros::tokio_test; use tokio_util::compat::TokioAsyncWriteCompatExt; -#[tokio::test] +#[tokio_test] async fn compat_file_seek() -> futures_util::io::Result<()> { let temp_file = NamedTempFile::new()?; let mut file = OpenOptions::new() diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 2ec8258d0bf..05c1cb47c49 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -7,6 +7,7 @@ use tokio::time::*; use tokio_util::context::RuntimeExt; #[test] +#[cfg_attr(miri, ignore)] fn tokio_context_with_another_runtime() { let rt1 = Builder::new_multi_thread() .worker_threads(1) diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs index ec8cdf00d09..07efd6b0e48 100644 --- a/tokio-util/tests/framed.rs +++ b/tokio-util/tests/framed.rs @@ -2,6 +2,7 @@ use tokio_stream::StreamExt; use tokio_test::assert_ok; +use tokio_test_macros::tokio_test; use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts}; use bytes::{Buf, BufMut, BytesMut}; @@ -97,7 +98,7 @@ impl tokio::io::AsyncRead for DontReadIntoThis { } } -#[tokio::test] +#[tokio_test(miri)] async fn can_read_from_existing_buf() { let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default()); parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]); @@ -109,7 +110,7 @@ async fn can_read_from_existing_buf() { assert_eq!(framed.codec().read_bytes, 4); } -#[tokio::test] +#[tokio_test(miri)] async fn can_read_from_existing_buf_after_codec_changed() { let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default()); parts.read_buf = BytesMut::from(&[0, 0, 0, 42, 0, 0, 0, 0, 0, 0, 0, 84][..]); diff --git a/tokio-util/tests/framed_stream.rs b/tokio-util/tests/framed_stream.rs index 76d8af7b7d6..056ff239822 100644 --- a/tokio-util/tests/framed_stream.rs +++ b/tokio-util/tests/framed_stream.rs @@ -1,6 +1,7 @@ use futures_core::stream::Stream; use std::{io, pin::Pin}; use tokio_test::{assert_ready, io::Builder, task}; +use tokio_test_macros::tokio_test; use tokio_util::codec::{BytesCodec, FramedRead}; macro_rules! pin { @@ -16,7 +17,7 @@ macro_rules! assert_read { }}; } -#[tokio::test] +#[tokio_test(miri)] async fn return_none_after_error() { let mut io = FramedRead::new( Builder::new() diff --git a/tokio-util/tests/io_inspect.rs b/tokio-util/tests/io_inspect.rs index ee8b3f0c604..eb526853368 100644 --- a/tokio-util/tests/io_inspect.rs +++ b/tokio-util/tests/io_inspect.rs @@ -5,6 +5,7 @@ use std::{ task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio_test_macros::tokio_test; use tokio_util::io::{InspectReader, InspectWriter}; /// An AsyncRead implementation that works byte-by-byte, to catch out callers @@ -28,7 +29,7 @@ impl AsyncRead for SmallReader { } } -#[tokio::test] +#[tokio_test(miri)] async fn read_tee() { let contents = b"This could be really long, you know".to_vec(); let reader = SmallReader { @@ -110,7 +111,7 @@ impl AsyncWrite for SmallWriter { } } -#[tokio::test] +#[tokio_test(miri)] async fn write_tee() { let mut altout: Vec = Vec::new(); let mut writeout = SmallWriter { @@ -157,7 +158,7 @@ async fn write_all_vectored( Ok(res) } -#[tokio::test] +#[tokio_test(miri)] async fn write_tee_vectored() { let mut altout: Vec = Vec::new(); let mut writeout = SmallWriter { diff --git a/tokio-util/tests/io_reader_stream.rs b/tokio-util/tests/io_reader_stream.rs index e30cd85164c..5ad362e3012 100644 --- a/tokio-util/tests/io_reader_stream.rs +++ b/tokio-util/tests/io_reader_stream.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, ReadBuf}; use tokio_stream::StreamExt; +use tokio_test_macros::tokio_test; /// produces at most `remaining` zeros, that returns error. /// each time it reads at most 31 byte. @@ -34,7 +35,7 @@ impl AsyncRead for Reader { } } -#[tokio::test] +#[tokio_test] // Too slow on miri async fn correct_behavior_on_errors() { let reader = Reader { remaining: 8000 }; let mut stream = tokio_util::io::ReaderStream::new(reader); diff --git a/tokio-util/tests/io_sink_writer.rs b/tokio-util/tests/io_sink_writer.rs index e76870be41d..0f7fb11557c 100644 --- a/tokio-util/tests/io_sink_writer.rs +++ b/tokio-util/tests/io_sink_writer.rs @@ -4,11 +4,12 @@ use bytes::Bytes; use futures_util::SinkExt; use std::io::{self, Error, ErrorKind}; use tokio::io::AsyncWriteExt; +use tokio_test_macros::tokio_test; use tokio_util::codec::{Encoder, FramedWrite}; use tokio_util::io::{CopyToBytes, SinkWriter}; use tokio_util::sync::PollSender; -#[tokio::test] +#[tokio_test(miri)] async fn test_copied_sink_writer() -> Result<(), Error> { // Construct a channel pair to send data across and wrap a pollable sink. // Note that the sink must mimic a writable object, e.g. have `std::io::Error` @@ -51,7 +52,7 @@ impl<'a> Encoder<&'a [u8]> for SliceEncoder { } } -#[tokio::test] +#[tokio_test(miri)] async fn test_direct_sink_writer() -> Result<(), Error> { // We define a framed writer which accepts byte slices // and 'reverse' this construction immediately. diff --git a/tokio-util/tests/io_stream_reader.rs b/tokio-util/tests/io_stream_reader.rs index 59759941c51..1811b05c0da 100644 --- a/tokio-util/tests/io_stream_reader.rs +++ b/tokio-util/tests/io_stream_reader.rs @@ -3,9 +3,10 @@ use bytes::Bytes; use tokio::io::AsyncReadExt; use tokio_stream::iter; +use tokio_test_macros::tokio_test; use tokio_util::io::StreamReader; -#[tokio::test] +#[tokio_test(miri)] async fn test_stream_reader() -> std::io::Result<()> { let stream = iter(vec![ std::io::Result::Ok(Bytes::from_static(&[])), diff --git a/tokio-util/tests/io_sync_bridge.rs b/tokio-util/tests/io_sync_bridge.rs index 50d0e89617c..f1d2928fe20 100644 --- a/tokio-util/tests/io_sync_bridge.rs +++ b/tokio-util/tests/io_sync_bridge.rs @@ -4,6 +4,7 @@ use std::error::Error; use std::io::{Cursor, Read, Result as IoResult, Write}; use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_test_macros::tokio_test; use tokio_util::io::SyncIoBridge; async fn test_reader_len( @@ -21,7 +22,7 @@ async fn test_reader_len( Ok(()) } -#[tokio::test] +#[tokio_test] async fn test_async_read_to_sync() -> Result<(), Box> { test_reader_len(tokio::io::empty(), 0).await?; let buf = b"hello world"; @@ -29,7 +30,7 @@ async fn test_async_read_to_sync() -> Result<(), Box> { Ok(()) } -#[tokio::test] +#[tokio_test] async fn test_async_write_to_sync() -> Result<(), Box> { let mut dest = Vec::new(); let src = b"hello world"; @@ -43,7 +44,7 @@ async fn test_async_write_to_sync() -> Result<(), Box> { Ok(()) } -#[tokio::test] +#[tokio_test] async fn test_into_inner() -> Result<(), Box> { let mut buf = Vec::new(); SyncIoBridge::new(tokio::io::empty()) @@ -55,7 +56,7 @@ async fn test_into_inner() -> Result<(), Box> { Ok(()) } -#[tokio::test] +#[tokio_test] async fn test_shutdown() -> Result<(), Box> { let (s1, mut s2) = tokio::io::duplex(1024); let (_rh, wh) = tokio::io::split(s1); diff --git a/tokio-util/tests/mpsc.rs b/tokio-util/tests/mpsc.rs index d53c81c2a9a..20df84991a7 100644 --- a/tokio-util/tests/mpsc.rs +++ b/tokio-util/tests/mpsc.rs @@ -5,9 +5,10 @@ use tokio_test::task::spawn; use tokio_test::{ assert_ok, assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok, }; +use tokio_test_macros::tokio_test; use tokio_util::sync::PollSender; -#[tokio::test] +#[tokio_test(miri)] async fn simple() { let (send, mut recv) = channel(3); let mut send = PollSender::new(send); @@ -30,7 +31,7 @@ async fn simple() { send.send_item(42).unwrap(); } -#[tokio::test] +#[tokio_test(miri)] async fn simple_ref() { let v = [1, 2, 3i32]; @@ -53,7 +54,7 @@ async fn simple_ref() { send.send_item(&42).unwrap(); } -#[tokio::test] +#[tokio_test(miri)] async fn repeated_poll_reserve() { let (send, mut recv) = channel::(1); let mut send = PollSender::new(send); @@ -66,7 +67,7 @@ async fn repeated_poll_reserve() { assert_eq!(recv.recv().await.unwrap(), 1); } -#[tokio::test] +#[tokio_test(miri)] async fn abort_send() { let (send, mut recv) = channel(3); let mut send = PollSender::new(send); @@ -95,7 +96,7 @@ async fn abort_send() { assert_eq!(recv.recv().await.unwrap(), 5); } -#[tokio::test] +#[tokio_test(miri)] async fn close_sender_last() { let (send, mut recv) = channel::(3); let mut send = PollSender::new(send); @@ -109,7 +110,7 @@ async fn close_sender_last() { assert!(assert_ready!(recv_task.poll()).is_none()); } -#[tokio::test] +#[tokio_test(miri)] async fn close_sender_not_last() { let (send, mut recv) = channel::(3); let mut send = PollSender::new(send); @@ -129,7 +130,7 @@ async fn close_sender_not_last() { assert!(assert_ready!(recv_task.poll()).is_none()); } -#[tokio::test] +#[tokio_test(miri)] async fn close_sender_before_reserve() { let (send, mut recv) = channel::(3); let mut send = PollSender::new(send); @@ -146,7 +147,7 @@ async fn close_sender_before_reserve() { assert_ready_err!(reserve.poll()); } -#[tokio::test] +#[tokio_test(miri)] async fn close_sender_after_pending_reserve() { let (send, mut recv) = channel::(1); let mut send = PollSender::new(send); @@ -171,7 +172,7 @@ async fn close_sender_after_pending_reserve() { assert_ready_err!(reserve.poll()); } -#[tokio::test] +#[tokio_test(miri)] async fn close_sender_after_successful_reserve() { let (send, mut recv) = channel::(3); let mut send = PollSender::new(send); @@ -192,7 +193,7 @@ async fn close_sender_after_successful_reserve() { assert_ready_ok!(reserve.poll()); } -#[tokio::test] +#[tokio_test(miri)] async fn abort_send_after_pending_reserve() { let (send, mut recv) = channel::(1); let mut send = PollSender::new(send); @@ -214,7 +215,7 @@ async fn abort_send_after_pending_reserve() { assert_eq!(send.get_ref().unwrap().capacity(), 0); } -#[tokio::test] +#[tokio_test(miri)] async fn abort_send_after_successful_reserve() { let (send, mut recv) = channel::(1); let mut send = PollSender::new(send); @@ -231,7 +232,7 @@ async fn abort_send_after_successful_reserve() { assert_eq!(send.get_ref().unwrap().capacity(), 1); } -#[tokio::test] +#[tokio_test(miri)] async fn closed_when_receiver_drops() { let (send, _) = channel::(1); let mut send = PollSender::new(send); diff --git a/tokio-util/tests/panic.rs b/tokio-util/tests/panic.rs index 853f132fc2f..52b0d67380c 100644 --- a/tokio-util/tests/panic.rs +++ b/tokio-util/tests/panic.rs @@ -89,6 +89,7 @@ fn local_pool_handle_new_panic_caller() -> Result<(), Box> { } #[test] +#[cfg_attr(miri, ignore)] fn local_pool_handle_spawn_pinned_by_idx_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); @@ -104,7 +105,9 @@ fn local_pool_handle_spawn_pinned_by_idx_panic_caller() -> Result<(), Box Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); @@ -126,6 +129,7 @@ fn delay_queue_insert_at_panic_caller() -> Result<(), Box> { } #[test] +#[cfg_attr(miri, ignore)] fn delay_queue_insert_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); @@ -143,6 +147,7 @@ fn delay_queue_insert_panic_caller() -> Result<(), Box> { } #[test] +#[cfg_attr(miri, ignore)] fn delay_queue_remove_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); @@ -162,6 +167,7 @@ fn delay_queue_remove_panic_caller() -> Result<(), Box> { } #[test] +#[cfg_attr(miri, ignore)] fn delay_queue_reset_at_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); @@ -183,6 +189,7 @@ fn delay_queue_reset_at_panic_caller() -> Result<(), Box> { } #[test] +#[cfg_attr(miri, ignore)] fn delay_queue_reset_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); @@ -201,6 +208,7 @@ fn delay_queue_reset_panic_caller() -> Result<(), Box> { } #[test] +#[cfg_attr(miri, ignore)] fn delay_queue_reserve_panic_caller() -> Result<(), Box> { let panic_location_file = test_panic(|| { let rt = basic(); diff --git a/tokio-util/tests/poll_semaphore.rs b/tokio-util/tests/poll_semaphore.rs index fe947f9164a..3ff3914bef8 100644 --- a/tokio-util/tests/poll_semaphore.rs +++ b/tokio-util/tests/poll_semaphore.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::sync::Arc; use std::task::Poll; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio_test_macros::tokio_test; use tokio_util::sync::PollSemaphore; type SemRet = Option; @@ -21,7 +22,7 @@ fn semaphore_poll_many( tokio_test::task::spawn(fut) } -#[tokio::test] +#[tokio_test(miri)] async fn it_works() { let sem = Arc::new(Semaphore::new(1)); let mut poll_sem = PollSemaphore::new(sem.clone()); @@ -43,7 +44,7 @@ async fn it_works() { assert!(semaphore_poll(&mut poll_sem).await.is_none()); } -#[tokio::test] +#[tokio_test(miri)] async fn can_acquire_many_permits() { let sem = Arc::new(Semaphore::new(4)); let mut poll_sem = PollSemaphore::new(sem.clone()); @@ -68,7 +69,7 @@ async fn can_acquire_many_permits() { assert_eq!(sem.available_permits(), 0); } -#[tokio::test] +#[tokio_test(miri)] async fn can_poll_different_amounts_of_permits() { let sem = Arc::new(Semaphore::new(4)); let mut poll_sem = PollSemaphore::new(sem.clone()); diff --git a/tokio-util/tests/spawn_pinned.rs b/tokio-util/tests/spawn_pinned.rs index 110fdab0abc..9c441ed7157 100644 --- a/tokio-util/tests/spawn_pinned.rs +++ b/tokio-util/tests/spawn_pinned.rs @@ -6,10 +6,11 @@ use std::rc::Rc; use std::sync::Arc; use tokio::sync::Barrier; +use tokio_test_macros::tokio_test; use tokio_util::task; /// Simple test of running a !Send future via spawn_pinned -#[tokio::test] +#[tokio_test(miri)] async fn can_spawn_not_send_future() { let pool = task::LocalPoolHandle::new(1); @@ -53,7 +54,7 @@ fn cannot_create_zero_sized_pool() { } /// We should be able to spawn multiple futures onto the pool at the same time. -#[tokio::test] +#[tokio_test(miri)] async fn can_spawn_multiple_futures() { let pool = task::LocalPoolHandle::new(2); @@ -72,7 +73,7 @@ async fn can_spawn_multiple_futures() { /// A panic in the spawned task causes the join handle to return an error. /// But, you can continue to spawn tasks. -#[tokio::test] +#[tokio_test(miri)] #[cfg(panic = "unwind")] async fn task_panic_propagates() { let pool = task::LocalPoolHandle::new(1); @@ -97,7 +98,7 @@ async fn task_panic_propagates() { /// A panic during task creation causes the join handle to return an error. /// But, you can continue to spawn tasks. -#[tokio::test] +#[tokio_test(miri)] #[cfg(panic = "unwind")] async fn callback_panic_does_not_kill_worker() { let pool = task::LocalPoolHandle::new(1); @@ -124,7 +125,7 @@ async fn callback_panic_does_not_kill_worker() { /// Canceling the task via the returned join handle cancels the spawned task /// (which has a different, internal join handle). -#[tokio::test] +#[tokio_test(miri)] async fn task_cancellation_propagates() { let pool = task::LocalPoolHandle::new(1); let notify_dropped = Arc::new(()); @@ -161,7 +162,7 @@ async fn task_cancellation_propagates() { /// Tasks should be given to the least burdened worker. When spawning two tasks /// on a pool with two empty workers the tasks should be spawned on separate /// workers. -#[tokio::test] +#[tokio_test(miri)] async fn tasks_are_balanced() { let pool = task::LocalPoolHandle::new(2); @@ -198,7 +199,7 @@ async fn tasks_are_balanced() { assert_ne!(thread_id1, thread_id2); } -#[tokio::test] +#[tokio_test(miri)] async fn spawn_by_idx() { let pool = task::LocalPoolHandle::new(3); let barrier = Arc::new(Barrier::new(4)); diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 8757f8b5c6e..fab8f6f05f6 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -66,7 +66,7 @@ async fn test_with_sleep() { assert!(matches!(map.join_next().await, None)); } -#[tokio::test] +#[tokio_test(miri)] async fn test_abort_on_drop() { let mut map = JoinMap::new(); @@ -91,7 +91,7 @@ async fn test_abort_on_drop() { } } -#[tokio::test] +#[tokio_test(miri)] async fn alternating() { let mut map = JoinMap::new(); @@ -110,7 +110,7 @@ async fn alternating() { } } -#[tokio::test] +#[tokio_test(miri)] async fn test_keys() { use std::collections::HashSet; diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index 855b82dd40e..63df114b57c 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -5,6 +5,7 @@ use futures::StreamExt; use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; +use tokio_test_macros::tokio_test; use tokio_util::time::DelayQueue; macro_rules! poll { @@ -22,7 +23,7 @@ macro_rules! assert_ready_some { }}; } -#[tokio::test] +#[tokio_test] async fn single_immediate_delay() { time::pause(); @@ -38,7 +39,7 @@ async fn single_immediate_delay() { assert!(entry.is_none()) } -#[tokio::test] +#[tokio_test] async fn multi_immediate_delays() { time::pause(); @@ -67,7 +68,7 @@ async fn multi_immediate_delays() { assert_eq!("3", res[2]); } -#[tokio::test] +#[tokio_test] async fn single_short_delay() { time::pause(); @@ -91,7 +92,7 @@ async fn single_short_delay() { assert!(entry.is_none()); } -#[tokio::test] +#[tokio_test] #[cfg_attr(miri, ignore)] // Too slow on miri. async fn multi_delay_at_start() { time::pause(); @@ -134,7 +135,7 @@ async fn multi_delay_at_start() { println!("finished multi_delay_start"); } -#[tokio::test] +#[tokio_test] async fn insert_in_past_fires_immediately() { println!("running insert_in_past_fires_immediately"); time::pause(); @@ -150,7 +151,7 @@ async fn insert_in_past_fires_immediately() { println!("finished insert_in_past_fires_immediately"); } -#[tokio::test] +#[tokio_test] async fn remove_entry() { time::pause(); @@ -169,7 +170,7 @@ async fn remove_entry() { assert!(entry.is_none()); } -#[tokio::test] +#[tokio_test] async fn reset_entry() { time::pause(); @@ -203,7 +204,7 @@ async fn reset_entry() { } // Reproduces tokio-rs/tokio#849. -#[tokio::test] +#[tokio_test] async fn reset_much_later() { time::pause(); @@ -225,7 +226,7 @@ async fn reset_much_later() { } // Reproduces tokio-rs/tokio#849. -#[tokio::test] +#[tokio_test] async fn reset_twice() { time::pause(); @@ -256,7 +257,7 @@ async fn reset_twice() { /// deadline in the future. Validate that this leaves the entry and queue in an /// internally consistent state by running an additional reset on the entry /// before polling it to completion. -#[tokio::test] +#[tokio_test] async fn repeatedly_reset_entry_inserted_as_expired() { time::pause(); @@ -284,7 +285,7 @@ async fn repeatedly_reset_entry_inserted_as_expired() { assert!(entry.is_none()); } -#[tokio::test] +#[tokio_test] async fn remove_expired_item() { time::pause(); @@ -304,7 +305,7 @@ async fn remove_expired_item() { /// 0th slot of the internal timer wheel — that is, entries whose expiration /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b) /// is equal to the wheel's current elapsed time. -#[tokio::test] +#[tokio_test] async fn remove_at_timer_wheel_threshold() { time::pause(); @@ -332,7 +333,7 @@ async fn remove_at_timer_wheel_threshold() { } } -#[tokio::test] +#[tokio_test] async fn expires_before_last_insert() { time::pause(); @@ -358,7 +359,7 @@ async fn expires_before_last_insert() { assert_eq!(entry, "bar"); } -#[tokio::test] +#[tokio_test] async fn multi_reset() { time::pause(); @@ -395,7 +396,7 @@ async fn multi_reset() { assert!(entry.is_none()) } -#[tokio::test] +#[tokio_test] async fn expire_first_key_when_reset_to_expire_earlier() { time::pause(); @@ -418,7 +419,7 @@ async fn expire_first_key_when_reset_to_expire_earlier() { assert_eq!(entry, "one"); } -#[tokio::test] +#[tokio_test] async fn expire_second_key_when_reset_to_expire_earlier() { time::pause(); @@ -441,7 +442,7 @@ async fn expire_second_key_when_reset_to_expire_earlier() { assert_eq!(entry, "two"); } -#[tokio::test] +#[tokio_test] async fn reset_first_expiring_item_to_expire_later() { time::pause(); @@ -463,7 +464,7 @@ async fn reset_first_expiring_item_to_expire_later() { assert_eq!(entry, "two"); } -#[tokio::test] +#[tokio_test] async fn insert_before_first_after_poll() { time::pause(); @@ -489,7 +490,7 @@ async fn insert_before_first_after_poll() { assert_eq!(entry, "two"); } -#[tokio::test] +#[tokio_test] async fn insert_after_ready_poll() { time::pause(); @@ -522,7 +523,7 @@ async fn insert_after_ready_poll() { assert_eq!("3", res[2]); } -#[tokio::test] +#[tokio_test] async fn reset_later_after_slot_starts() { time::pause(); @@ -559,7 +560,7 @@ async fn reset_later_after_slot_starts() { assert_eq!(entry, "foo"); } -#[tokio::test] +#[tokio_test] async fn reset_inserted_expired() { time::pause(); @@ -584,7 +585,7 @@ async fn reset_inserted_expired() { assert_eq!(queue.len(), 0); } -#[tokio::test] +#[tokio_test] async fn reset_earlier_after_slot_starts() { time::pause(); @@ -621,7 +622,7 @@ async fn reset_earlier_after_slot_starts() { assert_eq!(entry, "foo"); } -#[tokio::test] +#[tokio_test] async fn insert_in_past_after_poll_fires_immediately() { time::pause(); @@ -644,7 +645,7 @@ async fn insert_in_past_after_poll_fires_immediately() { assert_eq!(entry, "bar"); } -#[tokio::test] +#[tokio_test] async fn delay_queue_poll_expired_when_empty() { let mut delay_queue = task::spawn(DelayQueue::new()); let key = delay_queue.insert(0, std::time::Duration::from_secs(10)); @@ -654,7 +655,7 @@ async fn delay_queue_poll_expired_when_empty() { assert!(assert_ready!(poll!(delay_queue)).is_none()); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] async fn compact_expire_empty() { let mut queue = task::spawn(DelayQueue::new()); @@ -677,7 +678,7 @@ async fn compact_expire_empty() { assert_eq!(queue.capacity(), 0); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] async fn compact_remove_empty() { let mut queue = task::spawn(DelayQueue::new()); @@ -695,7 +696,7 @@ async fn compact_remove_empty() { assert_eq!(queue.capacity(), 0); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] // Trigger a re-mapping of keys in the slab due to a `compact` call and // test removal of re-mapped keys async fn compact_remove_remapped_keys() { @@ -736,7 +737,7 @@ async fn compact_remove_remapped_keys() { assert_eq!(queue.capacity(), 1); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] async fn compact_change_deadline() { let mut queue = task::spawn(DelayQueue::new()); @@ -788,7 +789,7 @@ async fn compact_change_deadline() { assert!(entry.is_none()); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] async fn item_expiry_greater_than_wheel() { // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted. let mut queue = DelayQueue::new(); @@ -805,7 +806,7 @@ async fn item_expiry_greater_than_wheel() { } #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")] -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] #[cfg(panic = "unwind")] async fn remove_after_compact() { let now = Instant::now(); @@ -823,7 +824,7 @@ async fn remove_after_compact() { } #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")] -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] #[cfg(panic = "unwind")] async fn remove_after_compact_poll() { let now = Instant::now(); @@ -843,7 +844,7 @@ async fn remove_after_compact_poll() { assert!(panic.is_err()); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] async fn peek() { let mut queue = task::spawn(DelayQueue::new()); @@ -881,7 +882,7 @@ async fn peek() { assert!(queue.peek().is_none()); } -#[tokio::test(start_paused = true)] +#[tokio_test(start_paused = true)] async fn wake_after_remove_last() { let mut queue = task::spawn(DelayQueue::new()); let key = queue.insert("foo", ms(1000)); diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index 6e31b3a5394..9a0172c73c4 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -4,6 +4,7 @@ use tokio::net::UdpSocket; use tokio_stream::StreamExt; +use tokio_test_macros::tokio_test; use tokio_util::codec::{Decoder, Encoder, LinesCodec}; use tokio_util::udp::UdpFramed; @@ -24,7 +25,7 @@ use std::sync::Arc; ), allow(unused_assignments) )] -#[tokio::test] +#[tokio_test(miri)] async fn send_framed_byte_codec() -> std::io::Result<()> { let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?; let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?; @@ -99,7 +100,7 @@ impl Encoder<&[u8]> for ByteCodec { } } -#[tokio::test] +#[tokio_test(miri)] async fn send_framed_lines_codec() -> std::io::Result<()> { let a_soc = UdpSocket::bind("127.0.0.1:0").await?; let b_soc = UdpSocket::bind("127.0.0.1:0").await?; @@ -120,7 +121,7 @@ async fn send_framed_lines_codec() -> std::io::Result<()> { Ok(()) } -#[tokio::test] +#[tokio_test(miri)] async fn framed_half() -> std::io::Result<()> { let a_soc = Arc::new(UdpSocket::bind("127.0.0.1:0").await?); let b_soc = a_soc.clone();