Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: drop mutex guard when DuplexStream I/O op is pending #8

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ license = "MIT"
repository = "https://github.com/SteveLauC/monoio-duplex"

[dependencies]
async-lock = "3.4.0"
bytes = "1.7.2"
monoio = "0.2.4"
futures = "0.3.31"

[dev-dependencies]
tokio = { version = "1.41.0", features = ["sync"] }
117 changes: 99 additions & 18 deletions src/duplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

use super::simplex::SimplexStream;

use async_lock::Mutex;
use futures::FutureExt;
use monoio::io::AsyncReadRent;
use monoio::io::AsyncWriteRent;
use std::rc::Rc;
use std::sync::Mutex;

/// Create a new pair of `DuplexStream`s that act like a pair of connected sockets.
///
Expand Down Expand Up @@ -78,48 +79,90 @@ pub struct DuplexStream {

impl Drop for DuplexStream {
fn drop(&mut self) {
futures::executor::block_on(async {
// notify the other side of the closure
self.write.lock().await.close_write();
self.read.lock().await.close_read();
})
// notify the other side of the closure
self.write.lock().unwrap().close_write();
self.read.lock().unwrap().close_read();
}
}

/// A helper macro to `.await` the I/O function, used in our I/O traits
/// implementations.
///
/// Different from a plain `function().await`, it drops the lock guard if future
/// `function()` is pending.
macro_rules! await_io_future {
// For functions that do not have arugments: flush/shutdown
($trait:ident, $function:ident, $guard:expr) => {{
let opt_read_ready = <SimplexStream as $trait>::$function(&mut *$guard).now_or_never();

match opt_read_ready {
Some(result) => result,
None => {
// drop the Mutex guard or it could deadlock
// https://github.com/SteveLauC/monoio-duplex/issues/7
drop($guard);
std::future::pending().await
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future polls to this future will always return Pending due to this line, the state of this future has been changed, which should never be done!

}
}
}};

// For functions with a `buf` arugment: read/readv/write/writev
($trait:ident, $function:ident, $guard:expr, $buf:expr) => {{
let opt_read_ready =
<SimplexStream as $trait>::$function(&mut *$guard, $buf).now_or_never();

match opt_read_ready {
Some(result) => result,
None => {
// drop the Mutex guard or it could deadlock
// https://github.com/SteveLauC/monoio-duplex/issues/7
drop($guard);
std::future::pending().await
}
}
}};
}

impl AsyncReadRent for DuplexStream {
#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn read<T: monoio::buf::IoBufMut>(&mut self, buf: T) -> monoio::BufResult<usize, T> {
let mut read_simplex = self.read.lock().await;
<SimplexStream as AsyncReadRent>::read(&mut *read_simplex, buf).await
let mut read_simplex = self.read.lock().unwrap();
await_io_future!(AsyncReadRent, read, read_simplex, buf)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn readv<T: monoio::buf::IoVecBufMut>(&mut self, buf: T) -> monoio::BufResult<usize, T> {
let mut read_simplex = self.read.lock().await;
<SimplexStream as AsyncReadRent>::readv(&mut *read_simplex, buf).await
let mut read_simplex = self.read.lock().unwrap();
await_io_future!(AsyncReadRent, readv, read_simplex, buf)
}
}

impl AsyncWriteRent for DuplexStream {
#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn write<T: monoio::buf::IoBuf>(&mut self, buf: T) -> monoio::BufResult<usize, T> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::write(&mut *write_simplex, buf).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, write, write_simplex, buf)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn writev<T: monoio::buf::IoVecBuf>(
&mut self,
buf_vec: T,
) -> monoio::BufResult<usize, T> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::writev(&mut *write_simplex, buf_vec).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, writev, write_simplex, buf_vec)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn flush(&mut self) -> std::io::Result<()> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::flush(&mut *write_simplex).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, flush, write_simplex)
}

#[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point.
async fn shutdown(&mut self) -> std::io::Result<()> {
let mut write_simplex = self.write.lock().await;
<SimplexStream as AsyncWriteRent>::shutdown(&mut *write_simplex).await
let mut write_simplex = self.write.lock().unwrap();
await_io_future!(AsyncWriteRent, shutdown, write_simplex)
}
}

Expand Down Expand Up @@ -180,4 +223,42 @@ mod tests {
std::io::ErrorKind::BrokenPipe
);
}

#[monoio::test(enable_timer = true)]
async fn pending_read_will_not_hold_mutex_gaurd() {
let (mut client, server) = duplex(100);
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

monoio::spawn(async move {
// send task start signal
tx.send(()).unwrap();
// this line should block
let (_result, _buf) = client.read(vec![0_u8; 10]).await;
});

rx.await.unwrap();

drop(server);
}

#[monoio::test(enable_timer = true)]
async fn pending_write_will_not_hold_mutex_gaurd() {
let (mut client, server) = duplex(10);
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

// write 10 bytes to make future writes pending
let (write_result, _buf) = client.write(vec![0_u8; 10]).await;
assert_eq!(write_result.unwrap(), 10);

monoio::spawn(async move {
// send task start signal
tx.send(()).unwrap();
// this line should block
let (_result, _buf) = client.write(vec![0_u8; 10]).await;
});

rx.await.unwrap();

drop(server);
}
}