Skip to content

Commit

Permalink
fix: properly close fd when return-fd op cancelled failed (#318)
Browse files Browse the repository at this point in the history
* fix: properly close fd when return-fd op cancelled failed

* fix feature

* test

* fix clippy

* remove useless cfg feature

* remove syscall_u32 and add doc comment for syscall macro and MaybeFd
  • Loading branch information
ihciah authored Nov 6, 2024
1 parent 90c9c3e commit 704e7d5
Show file tree
Hide file tree
Showing 40 changed files with 580 additions and 343 deletions.
1 change: 1 addition & 0 deletions monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ tempfile = "3.2"
# use nightly only feature flags
unstable = []
# async-cancel will push a async-cancel entry into sq when op is canceled
# strongly recommend to enable this feature
async-cancel = []
# enanle zero copy(enable SOCK_ZEROCOPY + MSG_ZEROCOPY flag)
# WARNING: this feature may cause performance degradation
Expand Down
15 changes: 4 additions & 11 deletions monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,13 @@ impl Inner {
}
}

#[allow(unused)]
fn drop_op<T: 'static>(&self, index: usize, data: &mut Option<T>) {
#[cfg(all(target_os = "linux", feature = "iouring"))]
#[inline]
fn drop_op<T: 'static>(&self, index: usize, data: &mut Option<T>, skip_cancel: bool) {
match self {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Inner::Uring(this) => UringInner::drop_op(this, index, data),
Inner::Uring(this) => UringInner::drop_op(this, index, data, skip_cancel),
#[cfg(feature = "legacy")]
Inner::Legacy(_) => {}
#[cfg(all(
not(feature = "legacy"),
not(all(target_os = "linux", feature = "iouring"))
))]
_ => {
util::feature_panic();
}
}
}

Expand Down
114 changes: 92 additions & 22 deletions monoio/src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod symlink;
mod splice;

/// In-flight operation
pub(crate) struct Op<T: 'static> {
pub(crate) struct Op<T: 'static + OpAble> {
// Driver running the operation
pub(super) driver: driver::Inner,

Expand All @@ -58,19 +58,98 @@ pub(crate) struct Completion<T> {
/// Operation completion meta info.
#[derive(Debug)]
pub(crate) struct CompletionMeta {
pub(crate) result: io::Result<u32>,
pub(crate) result: io::Result<MaybeFd>,
#[allow(unused)]
pub(crate) flags: u32,
}

/// MaybeFd is a wrapper for fd or a normal number. If it is marked as fd, it will close the fd when
/// dropped.
/// Use `into_inner` to take the inner fd or number and skip the drop.
///
/// This wrapper is designed to be used in the syscall return value. It can prevent fd leak when the
/// operation is cancelled.
#[derive(Debug)]
pub(crate) struct MaybeFd {
is_fd: bool,
fd: u32,
}

impl MaybeFd {
#[inline]
pub(crate) unsafe fn new_result(fdr: io::Result<u32>, is_fd: bool) -> io::Result<Self> {
fdr.map(|fd| Self { is_fd, fd })
}

#[inline]
pub(crate) unsafe fn new_fd_result(fdr: io::Result<u32>) -> io::Result<Self> {
fdr.map(|fd| Self { is_fd: true, fd })
}

#[inline]
pub(crate) fn new_non_fd_result(fdr: io::Result<u32>) -> io::Result<Self> {
fdr.map(|fd| Self { is_fd: false, fd })
}

#[inline]
pub(crate) const unsafe fn new_fd(fd: u32) -> Self {
Self { is_fd: true, fd }
}

#[inline]
pub(crate) const fn new_non_fd(fd: u32) -> Self {
Self { is_fd: false, fd }
}

#[inline]
pub(crate) const fn into_inner(self) -> u32 {
let fd = self.fd;
std::mem::forget(self);
fd
}

#[inline]
pub(crate) const fn zero() -> Self {
Self {
is_fd: false,
fd: 0,
}
}

#[inline]
pub(crate) fn fd(&self) -> u32 {
self.fd
}
}

impl Drop for MaybeFd {
fn drop(&mut self) {
// The fd close only executed when:
// 1. the operation is cancelled
// 2. the cancellation failed
// 3. the returned result is a fd
// So this is a relatively cold path. For simplicity, we just do a close syscall here
// instead of pushing close op.
if self.is_fd {
unsafe {
libc::close(self.fd as libc::c_int);
}
}
}
}

pub(crate) trait OpAble {
#[cfg(all(target_os = "linux", feature = "iouring"))]
const RET_IS_FD: bool = false;
#[cfg(all(target_os = "linux", feature = "iouring"))]
const SKIP_CANCEL: bool = false;
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry;

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_interest(&self) -> Option<(super::ready::Direction, usize)>;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32>;
fn legacy_call(&mut self) -> io::Result<MaybeFd>;
}

/// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way.
Expand All @@ -85,10 +164,7 @@ pub(crate) trait PollLegacy {
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
impl<T> PollLegacy for T
where
T: OpAble,
{
impl<T: OpAble> PollLegacy for T {
#[cfg(feature = "legacy")]
#[inline]
fn poll_legacy(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta> {
Expand All @@ -113,35 +189,26 @@ where
}
}

impl<T> Op<T> {
impl<T: OpAble> Op<T> {
/// Submit an operation to uring.
///
/// `state` is stored during the operation tracking any state submitted to
/// the kernel.
pub(super) fn submit_with(data: T) -> io::Result<Op<T>>
where
T: OpAble,
{
pub(super) fn submit_with(data: T) -> io::Result<Op<T>> {
driver::CURRENT.with(|this| this.submit_with(data))
}

/// Try submitting an operation to uring
#[allow(unused)]
pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>>
where
T: OpAble,
{
pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>> {
if driver::CURRENT.is_set() {
Op::submit_with(data)
} else {
Err(io::ErrorKind::Other.into())
}
}

pub(crate) fn op_canceller(&self) -> OpCanceller
where
T: OpAble,
{
pub(crate) fn op_canceller(&self) -> OpCanceller {
#[cfg(feature = "legacy")]
if is_legacy() {
return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() {
Expand Down Expand Up @@ -181,9 +248,12 @@ where
}
}

impl<T> Drop for Op<T> {
#[cfg(all(target_os = "linux", feature = "iouring"))]
impl<T: OpAble> Drop for Op<T> {
#[inline]
fn drop(&mut self) {
self.driver.drop_op(self.index, &mut self.data);
self.driver
.drop_op(self.index, &mut self.data, T::SKIP_CANCEL);
}
}

Expand Down
36 changes: 16 additions & 20 deletions monoio/src/driver/op/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
use io_uring::{opcode, types};
#[cfg(windows)]
use {
crate::syscall,
std::os::windows::prelude::AsRawSocket,
windows_sys::Win32::Networking::WinSock::{
accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE,
Expand All @@ -18,9 +17,7 @@ use {

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::driver::ready::Direction;
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use crate::syscall_u32;
use super::{driver::ready::Direction, MaybeFd};

/// Accept
pub(crate) struct Accept {
Expand Down Expand Up @@ -54,6 +51,9 @@ impl Op<Accept> {
}

impl OpAble for Accept {
#[cfg(all(target_os = "linux", feature = "iouring"))]
const RET_IS_FD: bool = true;

#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry {
opcode::Accept::new(
Expand All @@ -71,16 +71,16 @@ impl OpAble for Accept {
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
let fd = self.fd.as_raw_socket();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;

syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET)
crate::syscall!(accept@FD(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET)
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
let fd = self.fd.as_raw_fd();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;
Expand All @@ -102,12 +102,10 @@ impl OpAble for Accept {
target_os = "netbsd",
target_os = "openbsd"
))]
return syscall_u32!(accept4(
fd,
addr,
len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
));
return {
let flag = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
crate::syscall!(accept4@FD(fd, addr, len, flag))
};

// But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
// OSes inherit the non-blocking flag from the listener, so we just have to
Expand All @@ -119,13 +117,11 @@ impl OpAble for Accept {
target_os = "redox"
))]
return {
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
.inspect_err(|_| {
let _ = syscall_u32!(close(stream_fd));
})?;
Ok(stream_fd as _)
let stream_fd = crate::syscall!(accept@FD(fd, addr, len))?;
let fd = stream_fd.fd() as libc::c_int;
crate::syscall!(fcntl@RAW(fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
crate::syscall!(fcntl@RAW(fd, libc::F_SETFL, libc::O_NONBLOCK))?;
Ok(stream_fd)
};
}
}
16 changes: 9 additions & 7 deletions monoio/src/driver/op/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use std::os::unix::io::RawFd;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(windows)]
use {
crate::syscall, std::os::windows::io::RawSocket,
windows_sys::Win32::Networking::WinSock::closesocket,
};
use {std::os::windows::io::RawSocket, windows_sys::Win32::Networking::WinSock::closesocket};

#[cfg(any(feature = "legacy", feature = "poll-io"))]
use super::MaybeFd;
use super::{Op, OpAble};

pub(crate) struct Close {
Expand All @@ -33,6 +32,9 @@ impl Op<Close> {
}

impl OpAble for Close {
#[cfg(all(target_os = "linux", feature = "iouring"))]
const SKIP_CANCEL: bool = true;

#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry {
opcode::Close::new(types::Fd(self.fd)).build()
Expand All @@ -45,11 +47,11 @@ impl OpAble for Close {
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
fn legacy_call(&mut self) -> io::Result<MaybeFd> {
#[cfg(unix)]
return crate::syscall_u32!(close(self.fd));
return crate::syscall!(close@NON_FD(self.fd));

#[cfg(windows)]
return syscall!(closesocket(self.fd as _), PartialEq::ne, 0);
return crate::syscall!(closesocket@NON_FD(self.fd as _), PartialEq::ne, 0);
}
}
Loading

0 comments on commit 704e7d5

Please sign in to comment.