Skip to content

Commit

Permalink
fix: drop mutex guard when DuplexStream I/O op is pending (#8)
Browse files Browse the repository at this point in the history
* fix: drop mutex guard when DuplexStream I/O op is pending

* chore: remove a println from test
  • Loading branch information
SteveLauC authored Oct 24, 2024
1 parent baf540a commit b2b333b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ license = "MIT"
repository = "https://github.com/SteveLauC/monoio-duplex"

[dependencies]
async-lock = "3.4.0"
bytes = "1.7.2"
monoio = "0.2.4"
futures = "0.3.31"

[dev-dependencies]
tokio = { version = "1.41.0", features = ["sync"] }
117 changes: 99 additions & 18 deletions src/duplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
use super::simplex::SimplexStream;

use async_lock::Mutex;
use futures::FutureExt;
use monoio::io::AsyncReadRent;
use monoio::io::AsyncWriteRent;
use std::rc::Rc;
use std::sync::Mutex;

/// Create a new pair of `DuplexStream`s that act like a pair of connected sockets.
///
Expand Down Expand Up @@ -78,48 +79,90 @@ pub struct DuplexStream {

impl Drop for DuplexStream {
fn drop(&mut self) {
futures::executor::block_on(async {
// notify the other side of the closure
self.write.lock().await.close_write();
self.read.lock().await.close_read();
})
// notify the other side of the closure
self.write.lock().unwrap().close_write();
self.read.lock().unwrap().close_read();
}
}

/// A helper macro to `.await` the I/O function, used in our I/O traits
/// implementations.
///
/// Different from a plain `function().await`, it drops the lock guard if future
/// `function()` is pending.
macro_rules! await_io_future {
// For functions that do not have arugments: flush/shutdown
($trait:ident, $function:ident, $guard:expr) => {{
let opt_read_ready = <SimplexStream as $trait>::$function(&mut *$guard).now_or_never();

match opt_read_ready {
Some(result) => result,
None => {
// drop the Mutex guard or it could deadlock
// https://github.com/SteveLauC/monoio-duplex/issues/7
drop($guard);
std::future::pending().await
}
}
}};

// For functions with a `buf` arugment: read/readv/write/writev
($trait:ident, $function:ident, $guard:expr, $buf:expr) => {{
let opt_read_ready =
<SimplexStream as $trait>::$function(&mut *$guard, $buf).now_or_never();

match opt_read_ready {
Some(result) => result,
None => {
// drop the Mutex guard or it could deadlock
// https://github.com/SteveLauC/monoio-duplex/issues/7
drop($guard);
std::future::pending().await
}
}
}};
}

impl AsyncReadRent for DuplexStream {
#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn read<T: monoio::buf::IoBufMut>(&mut self, buf: T) -> monoio::BufResult<usize, T> {
let mut read_simplex = self.read.lock().await;
<SimplexStream as AsyncReadRent>::read(&mut *read_simplex, buf).await
let mut read_simplex = self.read.lock().unwrap();
await_io_future!(AsyncReadRent, read, read_simplex, buf)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn readv<T: monoio::buf::IoVecBufMut>(&mut self, buf: T) -> monoio::BufResult<usize, T> {
let mut read_simplex = self.read.lock().await;
<SimplexStream as AsyncReadRent>::readv(&mut *read_simplex, buf).await
let mut read_simplex = self.read.lock().unwrap();
await_io_future!(AsyncReadRent, readv, read_simplex, buf)
}
}

impl AsyncWriteRent for DuplexStream {
#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn write<T: monoio::buf::IoBuf>(&mut self, buf: T) -> monoio::BufResult<usize, T> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::write(&mut *write_simplex, buf).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, write, write_simplex, buf)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn writev<T: monoio::buf::IoVecBuf>(
&mut self,
buf_vec: T,
) -> monoio::BufResult<usize, T> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::writev(&mut *write_simplex, buf_vec).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, writev, write_simplex, buf_vec)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn flush(&mut self) -> std::io::Result<()> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::flush(&mut *write_simplex).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, flush, write_simplex)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn shutdown(&mut self) -> std::io::Result<()> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::shutdown(&mut *write_simplex).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, shutdown, write_simplex)
}
}

Expand Down Expand Up @@ -180,4 +223,42 @@ mod tests {
std::io::ErrorKind::BrokenPipe
);
}

#[monoio::test(enable_timer = true)]
async fn pending_read_will_not_hold_mutex_gaurd() {
let (mut client, server) = duplex(100);
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

monoio::spawn(async move {
// send task start signal
tx.send(()).unwrap();
// this line should block
let (_result, _buf) = client.read(vec![0_u8; 10]).await;
});

rx.await.unwrap();

drop(server);
}

#[monoio::test(enable_timer = true)]
async fn pending_write_will_not_hold_mutex_gaurd() {
let (mut client, server) = duplex(10);
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

// write 10 bytes to make future writes pending
let (write_result, _buf) = client.write(vec![0_u8; 10]).await;
assert_eq!(write_result.unwrap(), 10);

monoio::spawn(async move {
// send task start signal
tx.send(()).unwrap();
// this line should block
let (_result, _buf) = client.write(vec![0_u8; 10]).await;
});

rx.await.unwrap();

drop(server);
}
}

0 comments on commit b2b333b

Please sign in to comment.