Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: add register_buffers_* family of functions #217

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions io-uring-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ fn test<S: squeue::EntryMarker, C: cqueue::EntryMarker>(

// register
tests::register::test_register_files_sparse(&mut ring, &test)?;
tests::register_buffers::test_register_buffers(&mut ring, &test)?;
tests::register_buffers::test_register_buffers_update_tag(&mut ring, &test)?;
tests::register_buf_ring::test_register_buf_ring(&mut ring, &test)?;

// fs
Expand Down
1 change: 1 addition & 0 deletions io-uring-test/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ pub mod poll;
pub mod queue;
pub mod register;
pub mod register_buf_ring;
pub mod register_buffers;
pub mod regression;
pub mod timeout;
248 changes: 248 additions & 0 deletions io-uring-test/src/tests/register_buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
use std::{
fs::File,
io::{self, Error, Write},
os::fd::{AsRawFd, FromRawFd},
};

use crate::Test;
use io_uring::{cqueue, opcode, squeue, types, IoUring};
use libc::EFAULT;

pub fn test_register_buffers<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
ring: &mut IoUring<S, C>,
test: &Test,
) -> anyhow::Result<()> {
let mut buf = [0xde, 0xed, 0xbe, 0xef];
let tags = [0];
let iovecs = [libc::iovec {
iov_len: buf.len(),
iov_base: buf.as_mut_ptr() as _,
}];

test_register(ring, test, "register_buffers", None, |ring| unsafe {
ring.submitter().register_buffers(&iovecs)
})?;
test_register(
ring,
test,
"register_buffers_tags",
ring.params().is_feature_resource_tagging(),
|ring| unsafe { ring.submitter().register_buffers_tags(&iovecs, &tags) },
)?;
test_register(
ring,
test,
"register_buffers_sparse",
ring.params().is_feature_resource_tagging(),
|ring| ring.submitter().register_buffers_sparse(4),
)?;

return Ok(());

fn test_register<
S: squeue::EntryMarker,
C: cqueue::EntryMarker,
F: FnMut(&mut IoUring<S, C>) -> io::Result<()>,
P: Into<Option<bool>>,
>(
ring: &mut IoUring<S, C>,
test: &Test,
name: &str,
probe: P,
mut register: F,
) -> anyhow::Result<()> {
require!(
test;
test.probe.is_supported(opcode::ReadFixed::CODE);
probe.into().unwrap_or(true);
);

println!("test {name}");

ring.submitter().unregister_buffers().err().ok_or_else(|| {
anyhow::anyhow!("unregister_buffers should fail if not buffer table has been setup")
})?;

register(ring).map_err(|e| anyhow::anyhow!("{name} failed: {e}"))?;

// See that same call again, with any value, will fail because a direct table cannot be built
// over an existing one.
register(ring)
.err()
.ok_or_else(|| anyhow::anyhow!("{name} should not have succeeded twice in a row"))?;

// See that the direct table can be removed.
ring.submitter()
.unregister_buffers()
.map_err(|e| anyhow::anyhow!("unregister_buffers failed: {e}"))?;

// See that a second attempt to remove the direct table would fail.
ring.submitter().unregister_buffers().err().ok_or_else(|| {
anyhow::anyhow!("unregister_buffers should not have succeeded twice in a row")
})?;

Ok(())
}
}

const BUFFER_TAG: u64 = 0xbadcafe;
const TIMEOUT_TAG: u64 = 0xbadf00d;

pub fn test_register_buffers_update_tag<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
ring: &mut IoUring<S, C>,
test: &Test,
) -> anyhow::Result<()> {
require!(
test;
test.probe.is_supported(opcode::ReadFixed::CODE);
ring.params().is_feature_resource_tagging();
);

println!("test register_buffers_update_tag");

let (read, mut write) = create_pipe()?;
let mut buf = [0xde, 0xed, 0xbe, 0xef];
let mut tags = [BUFFER_TAG];
let iovecs = [libc::iovec {
iov_len: buf.len(),
iov_base: buf.as_mut_ptr() as _,
}];
let timeout = types::Timespec::new().nsec(50 * 1_000_000);
let timeout = opcode::Timeout::new(&timeout as _)
.build()
.user_data(TIMEOUT_TAG)
.into();
let read_sqe = opcode::ReadFixed::new(
types::Fd(read.as_raw_fd()),
buf.as_mut_ptr(),
buf.len() as _,
5,
)
.build()
.user_data(42)
.into();

// Register a buffer table and then immediately unregister it
ring.submitter().register_buffers_sparse(1)?;
ring.submitter().unregister_buffers()?;

// Push a timeout of 50ms
unsafe { ring.submission().push(&timeout).unwrap() };

// We should not receive any other entries than the timeout
check_only_timeout(ring)?;

// Register a sparse buffer table of 10 elements
ring.submitter().register_buffers_sparse(10)?;

// Try read the pipe using a sparse buffer
let cqe = {
write.write("yo".as_bytes())?;

unsafe { ring.submission().push(&read_sqe).unwrap() };

ring.submit_and_wait(1)?;

ring.completion().next().unwrap().into()
};

// We should get the correct user_data
if cqe.user_data() != 42 {
return Err(anyhow::anyhow!("unexpected completion queue entry"));
}

// EFAULT is to be expected with incorrect fixed buffers
if cqe.result() != -EFAULT {
return Err(anyhow::anyhow!("unexpected read result: {}", cqe.result()));
}

// Register a buffer at the index 5
unsafe {
ring.submitter()
.register_buffers_update_tag(5, &iovecs, &tags)?;

// Push a timeout of 50ms
ring.submission().push(&timeout).unwrap();
}

// We should not receive any other entries than the timeout
check_only_timeout(ring)?;

// Register a buffer at the same index 5, but this time with an empty tag.
let cqe = {
tags[0] = 0;

unsafe {
ring.submitter()
.register_buffers_update_tag(5, &iovecs, &tags)?;
}
ring.submit_and_wait(1)?;

ring.completion().next().unwrap().into()
};

// We should get a cqe with the first tag because we registered a
// new buffer at an index where a buffer was already registered.
if cqe.user_data() != BUFFER_TAG {
return Err(anyhow::anyhow!(
"expected completion queue to contain a buffer unregistered event"
));
}

// Try reading now that the buffer is registered at index 5
let cqe = {
unsafe {
ring.submission().push(&read_sqe).unwrap();
}

ring.submit_and_wait(1)?;

ring.completion().next().unwrap().into()
};

// We should get the correct user_data
if cqe.user_data() != 42 {
return Err(anyhow::anyhow!("unexpected completion queue entry"));
}

// We should read exactly two bytes
if cqe.result() != 2 {
return Err(anyhow::anyhow!("unexpected read result: {}", cqe.result()));
}

// The first two bytes of `buf` should be "yo"
if &buf[0..2] != "yo".as_bytes() {
return Err(anyhow::anyhow!("unexpected read buffer data: {:x?}", &buf));
}

return Ok(());
}

/// Create a pipe and return both ends as RAII `File` handles
fn create_pipe() -> io::Result<(File, File)> {
let mut fds = [-1, -1];

unsafe {
if libc::pipe(fds.as_mut_ptr()) == -1 {
Err(Error::last_os_error())
} else {
Ok((File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])))
}
}
}

/// Submit sqes and asserts the only cqe is a timeout entry
fn check_only_timeout<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
ring: &mut IoUring<S, C>,
) -> Result<(), anyhow::Error> {
ring.submit_and_wait(1)?;

if Into::<cqueue::Entry>::into(ring.completion().next().unwrap()).user_data() == TIMEOUT_TAG {
// There should not be any more entries in the queue
if ring.completion().next().is_none() {
return Ok(());
}
}

Err(anyhow::anyhow!("unexpected completion queue entry"))
}
106 changes: 106 additions & 0 deletions src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,110 @@ impl<'a> Submitter<'a> {
.map(drop)
}

/// Update a range of fixed buffers starting at `off`.
///
/// This is required to use buffers registered using
/// [`register_buffers_sparse`](Self::register_buffers_sparse),
/// although it can be also be used with [`register_buffers`](Self::register_buffers).
///
/// See [`register_buffers_tags`](Self::register_buffers_tags)
/// for more information about resource tagging.
///
/// Available since Linux 5.13.
///
/// # Safety
///
/// Developers must ensure that the `iov_base` and `iov_len` values are valid and will
/// be valid until buffers are unregistered or the ring destroyed, otherwise undefined
/// behaviour may occur.
pub unsafe fn register_buffers_update_tag(
&self,
off: u32,
bufs: &[libc::iovec],
tags: &[u64],
) -> io::Result<()> {
let rr = sys::io_uring_rsrc_update2 {
nr: bufs.len().min(tags.len()) as _,
data: bufs.as_ptr() as _,
tags: tags.as_ptr() as _,
offset: off,
..Default::default()
};
let rr = cast_ptr::<sys::io_uring_rsrc_update2>(&rr);
execute(
self.fd.as_raw_fd(),
sys::IORING_REGISTER_BUFFERS_UPDATE,
rr as *const _,
std::mem::size_of::<sys::io_uring_rsrc_update2>() as _,
)
.map(drop)
}

/// Variant of [`register_buffers`](Self::register_buffers)
/// with resource tagging.
///
/// `tags` should be the same length as `bufs` and contain the
/// tag value corresponding to the buffer at the same index.
///
/// If a tag is zero, then tagging for this particular resource
/// (a buffer in this case) is disabled. Otherwise, after the
/// resource had been unregistered and it's not used anymore,
/// a CQE will be posted with `user_data` set to the specified
/// tag and all other fields zeroed.
///
/// Available since Linux 5.13.
///
/// # Safety
///
/// Developers must ensure that the `iov_base` and `iov_len` values are valid and will
/// be valid until buffers are unregistered or the ring destroyed, otherwise undefined
/// behaviour may occur.
pub unsafe fn register_buffers_tags(
&self,
bufs: &[libc::iovec],
tags: &[u64],
) -> io::Result<()> {
let rr = sys::io_uring_rsrc_register {
nr: bufs.len().min(tags.len()) as _,
data: bufs.as_ptr() as _,
tags: tags.as_ptr() as _,
..Default::default()
};
let rr = cast_ptr::<sys::io_uring_rsrc_register>(&rr);
execute(
self.fd.as_raw_fd(),
sys::IORING_REGISTER_BUFFERS2,
rr as *const _,
std::mem::size_of::<sys::io_uring_rsrc_register>() as _,
)
.map(drop)
}

/// Registers an empty table of nr fixed buffers buffers.
///
/// These must be updated before use, using eg.
/// [`register_buffers_update_tag`](Self::register_buffers_update_tag).
///
/// See [`register_buffers`](Self::register_buffers)
/// for more information about fixed buffers.
///
/// Available since Linux 5.13.
pub fn register_buffers_sparse(&self, nr: u32) -> io::Result<()> {
let rr = sys::io_uring_rsrc_register {
nr,
flags: sys::IORING_RSRC_REGISTER_SPARSE,
..Default::default()
};
let rr = cast_ptr::<sys::io_uring_rsrc_register>(&rr);
execute(
self.fd.as_raw_fd(),
sys::IORING_REGISTER_BUFFERS2,
rr as *const _,
std::mem::size_of::<sys::io_uring_rsrc_register>() as _,
)
.map(drop)
}

/// Registers an empty file table of nr_files number of file descriptors. The sparse variant is
/// available in kernels 5.19 and later.
///
Expand Down Expand Up @@ -326,6 +430,8 @@ impl<'a> Submitter<'a> {
///
/// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
/// it will be cleaned up by the kernel automatically.
///
/// Available since Linux 5.1.
pub fn unregister_buffers(&self) -> io::Result<()> {
execute(
self.fd.as_raw_fd(),
Expand Down