Skip to content

Commit

Permalink
tokio-utils: add miri test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
nurmohammed840 committed Oct 12, 2024
1 parent 46f17cc commit dd5d065
Show file tree
Hide file tree
Showing 20 changed files with 106 additions and 84 deletions.
5 changes: 1 addition & 4 deletions tokio-test/macros/src/expend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ pub fn tokio_test(args: TokenStream, item_fn: ItemFn) -> TokenStream {
}
}
}
panic!(
"unknown config `{}`",
meta.path().to_token_stream().to_string()
)
panic!("unknown config `{}`", meta.path().to_token_stream())
});
let runtime_type = quote(|t| {
if id_multi_thread {
Expand Down
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ hashbrown = { version = "0.14.0", default-features = false, optional = true }
tokio = { version = "1.0.0", path = "../tokio", features = ["full"] }
tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
tokio-test-macros = { path = "../tokio-test/macros" }

async-stream = "0.3.0"
futures = "0.3.0"
Expand Down
5 changes: 3 additions & 2 deletions tokio-util/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,16 @@ mod tests {
use super::*;
use tokio::io::{repeat, AsyncReadExt, Repeat};
use tokio_stream::{once, Once, StreamExt};
use tokio_test_macros::tokio_test;

#[tokio::test]
#[tokio_test(miri)]
async fn either_is_stream() {
let mut either: Either<Once<u32>, Once<u32>> = Either::Left(once(1));

assert_eq!(Some(1u32), either.next().await);
}

#[tokio::test]
#[tokio_test(miri)]
async fn either_is_async_read() {
let mut buffer = [0; 3];
let mut either: Either<Repeat, Repeat> = Either::Right(repeat(0b101));
Expand Down
5 changes: 3 additions & 2 deletions tokio-util/tests/abort_on_drop.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use tokio::sync::oneshot;
use tokio_test_macros::tokio_test;
use tokio_util::task::AbortOnDropHandle;

#[tokio::test]
#[tokio_test]
async fn aborts_task_on_drop() {
let (mut tx, rx) = oneshot::channel::<bool>();
let handle = tokio::spawn(async move {
Expand All @@ -13,7 +14,7 @@ async fn aborts_task_on_drop() {
assert!(tx.is_closed());
}

#[tokio::test]
#[tokio_test]
async fn aborts_task_directly() {
let (mut tx, rx) = oneshot::channel::<bool>();
let handle = tokio::spawn(async move {
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/tests/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use futures_io::SeekFrom;
use futures_util::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tempfile::NamedTempFile;
use tokio::fs::OpenOptions;
use tokio_test_macros::tokio_test;
use tokio_util::compat::TokioAsyncWriteCompatExt;

#[tokio::test]
#[tokio_test]
async fn compat_file_seek() -> futures_util::io::Result<()> {
let temp_file = NamedTempFile::new()?;
let mut file = OpenOptions::new()
Expand Down
1 change: 1 addition & 0 deletions tokio-util/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::time::*;
use tokio_util::context::RuntimeExt;

#[test]
#[cfg_attr(miri, ignore)]
fn tokio_context_with_another_runtime() {
let rt1 = Builder::new_multi_thread()
.worker_threads(1)
Expand Down
5 changes: 3 additions & 2 deletions tokio-util/tests/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use tokio_stream::StreamExt;
use tokio_test::assert_ok;
use tokio_test_macros::tokio_test;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};

use bytes::{Buf, BufMut, BytesMut};
Expand Down Expand Up @@ -97,7 +98,7 @@ impl tokio::io::AsyncRead for DontReadIntoThis {
}
}

#[tokio::test]
#[tokio_test(miri)]
async fn can_read_from_existing_buf() {
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default());
parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]);
Expand All @@ -109,7 +110,7 @@ async fn can_read_from_existing_buf() {
assert_eq!(framed.codec().read_bytes, 4);
}

#[tokio::test]
#[tokio_test(miri)]
async fn can_read_from_existing_buf_after_codec_changed() {
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default());
parts.read_buf = BytesMut::from(&[0, 0, 0, 42, 0, 0, 0, 0, 0, 0, 0, 84][..]);
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/tests/framed_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures_core::stream::Stream;
use tokio_test_macros::tokio_test;
use std::{io, pin::Pin};
use tokio_test::{assert_ready, io::Builder, task};
use tokio_util::codec::{BytesCodec, FramedRead};
Expand All @@ -16,7 +17,7 @@ macro_rules! assert_read {
}};
}

#[tokio::test]
#[tokio_test(miri)]
async fn return_none_after_error() {
let mut io = FramedRead::new(
Builder::new()
Expand Down
7 changes: 4 additions & 3 deletions tokio-util/tests/io_inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_test_macros::tokio_test;
use tokio_util::io::{InspectReader, InspectWriter};

/// An AsyncRead implementation that works byte-by-byte, to catch out callers
Expand All @@ -28,7 +29,7 @@ impl AsyncRead for SmallReader {
}
}

#[tokio::test]
#[tokio_test(miri)]
async fn read_tee() {
let contents = b"This could be really long, you know".to_vec();
let reader = SmallReader {
Expand Down Expand Up @@ -110,7 +111,7 @@ impl AsyncWrite for SmallWriter {
}
}

#[tokio::test]
#[tokio_test(miri)]
async fn write_tee() {
let mut altout: Vec<u8> = Vec::new();
let mut writeout = SmallWriter {
Expand Down Expand Up @@ -157,7 +158,7 @@ async fn write_all_vectored<W: AsyncWrite + Unpin>(
Ok(res)
}

#[tokio::test]
#[tokio_test(miri)]
async fn write_tee_vectored() {
let mut altout: Vec<u8> = Vec::new();
let mut writeout = SmallWriter {
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/tests/io_reader_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tokio_stream::StreamExt;
use tokio_test_macros::tokio_test;

/// produces at most `remaining` zeros, that returns error.
/// each time it reads at most 31 byte.
Expand Down Expand Up @@ -34,7 +35,7 @@ impl AsyncRead for Reader {
}
}

#[tokio::test]
#[tokio_test] // Too slow on miri
async fn correct_behavior_on_errors() {
let reader = Reader { remaining: 8000 };
let mut stream = tokio_util::io::ReaderStream::new(reader);
Expand Down
5 changes: 3 additions & 2 deletions tokio-util/tests/io_sink_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

use bytes::Bytes;
use futures_util::SinkExt;
use tokio_test_macros::tokio_test;
use std::io::{self, Error, ErrorKind};
use tokio::io::AsyncWriteExt;
use tokio_util::codec::{Encoder, FramedWrite};
use tokio_util::io::{CopyToBytes, SinkWriter};
use tokio_util::sync::PollSender;

#[tokio::test]
#[tokio_test(miri)]
async fn test_copied_sink_writer() -> Result<(), Error> {
// Construct a channel pair to send data across and wrap a pollable sink.
// Note that the sink must mimic a writable object, e.g. have `std::io::Error`
Expand Down Expand Up @@ -51,7 +52,7 @@ impl<'a> Encoder<&'a [u8]> for SliceEncoder {
}
}

#[tokio::test]
#[tokio_test(miri)]
async fn test_direct_sink_writer() -> Result<(), Error> {
// We define a framed writer which accepts byte slices
// and 'reverse' this construction immediately.
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/tests/io_stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
use bytes::Bytes;
use tokio::io::AsyncReadExt;
use tokio_stream::iter;
use tokio_test_macros::tokio_test;
use tokio_util::io::StreamReader;

#[tokio::test]
#[tokio_test(miri)]
async fn test_stream_reader() -> std::io::Result<()> {
let stream = iter(vec![
std::io::Result::Ok(Bytes::from_static(&[])),
Expand Down
9 changes: 5 additions & 4 deletions tokio-util/tests/io_sync_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::error::Error;
use std::io::{Cursor, Read, Result as IoResult, Write};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_test_macros::tokio_test;
use tokio_util::io::SyncIoBridge;

async fn test_reader_len(
Expand All @@ -21,15 +22,15 @@ async fn test_reader_len(
Ok(())
}

#[tokio::test]
#[tokio_test]
async fn test_async_read_to_sync() -> Result<(), Box<dyn Error>> {
test_reader_len(tokio::io::empty(), 0).await?;
let buf = b"hello world";
test_reader_len(Cursor::new(buf), buf.len()).await?;
Ok(())
}

#[tokio::test]
#[tokio_test]
async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
let mut dest = Vec::new();
let src = b"hello world";
Expand All @@ -43,7 +44,7 @@ async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[tokio::test]
#[tokio_test]
async fn test_into_inner() -> Result<(), Box<dyn Error>> {
let mut buf = Vec::new();
SyncIoBridge::new(tokio::io::empty())
Expand All @@ -55,7 +56,7 @@ async fn test_into_inner() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[tokio::test]
#[tokio_test]
async fn test_shutdown() -> Result<(), Box<dyn Error>> {
let (s1, mut s2) = tokio::io::duplex(1024);
let (_rh, wh) = tokio::io::split(s1);
Expand Down
25 changes: 13 additions & 12 deletions tokio-util/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::sink::SinkExt;
use tokio_test_macros::tokio_test;
use std::future::poll_fn;
use tokio::sync::mpsc::channel;
use tokio_test::task::spawn;
Expand All @@ -7,7 +8,7 @@ use tokio_test::{
};
use tokio_util::sync::PollSender;

#[tokio::test]
#[tokio_test(miri)]
async fn simple() {
let (send, mut recv) = channel(3);
let mut send = PollSender::new(send);
Expand All @@ -30,7 +31,7 @@ async fn simple() {
send.send_item(42).unwrap();
}

#[tokio::test]
#[tokio_test(miri)]
async fn simple_ref() {
let v = [1, 2, 3i32];

Expand All @@ -53,7 +54,7 @@ async fn simple_ref() {
send.send_item(&42).unwrap();
}

#[tokio::test]
#[tokio_test(miri)]
async fn repeated_poll_reserve() {
let (send, mut recv) = channel::<i32>(1);
let mut send = PollSender::new(send);
Expand All @@ -66,7 +67,7 @@ async fn repeated_poll_reserve() {
assert_eq!(recv.recv().await.unwrap(), 1);
}

#[tokio::test]
#[tokio_test(miri)]
async fn abort_send() {
let (send, mut recv) = channel(3);
let mut send = PollSender::new(send);
Expand Down Expand Up @@ -95,7 +96,7 @@ async fn abort_send() {
assert_eq!(recv.recv().await.unwrap(), 5);
}

#[tokio::test]
#[tokio_test(miri)]
async fn close_sender_last() {
let (send, mut recv) = channel::<i32>(3);
let mut send = PollSender::new(send);
Expand All @@ -109,7 +110,7 @@ async fn close_sender_last() {
assert!(assert_ready!(recv_task.poll()).is_none());
}

#[tokio::test]
#[tokio_test(miri)]
async fn close_sender_not_last() {
let (send, mut recv) = channel::<i32>(3);
let mut send = PollSender::new(send);
Expand All @@ -129,7 +130,7 @@ async fn close_sender_not_last() {
assert!(assert_ready!(recv_task.poll()).is_none());
}

#[tokio::test]
#[tokio_test(miri)]
async fn close_sender_before_reserve() {
let (send, mut recv) = channel::<i32>(3);
let mut send = PollSender::new(send);
Expand All @@ -146,7 +147,7 @@ async fn close_sender_before_reserve() {
assert_ready_err!(reserve.poll());
}

#[tokio::test]
#[tokio_test(miri)]
async fn close_sender_after_pending_reserve() {
let (send, mut recv) = channel::<i32>(1);
let mut send = PollSender::new(send);
Expand All @@ -171,7 +172,7 @@ async fn close_sender_after_pending_reserve() {
assert_ready_err!(reserve.poll());
}

#[tokio::test]
#[tokio_test(miri)]
async fn close_sender_after_successful_reserve() {
let (send, mut recv) = channel::<i32>(3);
let mut send = PollSender::new(send);
Expand All @@ -192,7 +193,7 @@ async fn close_sender_after_successful_reserve() {
assert_ready_ok!(reserve.poll());
}

#[tokio::test]
#[tokio_test(miri)]
async fn abort_send_after_pending_reserve() {
let (send, mut recv) = channel::<i32>(1);
let mut send = PollSender::new(send);
Expand All @@ -214,7 +215,7 @@ async fn abort_send_after_pending_reserve() {
assert_eq!(send.get_ref().unwrap().capacity(), 0);
}

#[tokio::test]
#[tokio_test(miri)]
async fn abort_send_after_successful_reserve() {
let (send, mut recv) = channel::<i32>(1);
let mut send = PollSender::new(send);
Expand All @@ -231,7 +232,7 @@ async fn abort_send_after_successful_reserve() {
assert_eq!(send.get_ref().unwrap().capacity(), 1);
}

#[tokio::test]
#[tokio_test(miri)]
async fn closed_when_receiver_drops() {
let (send, _) = channel::<i32>(1);
let mut send = PollSender::new(send);
Expand Down
Loading

0 comments on commit dd5d065

Please sign in to comment.