Skip to content

Commit

Permalink
Add TimedReadAwaitable for timed read operation
Browse files Browse the repository at this point in the history
  • Loading branch information
longhao-li committed Sep 25, 2024
1 parent 72b74f0 commit 4fb5930
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 3 deletions.
110 changes: 110 additions & 0 deletions include/nyaio/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,116 @@ class ReadAwaitable {
std::uint64_t m_offset;
};

/// @class TimedReadAwaitable
/// @brief
/// Awaitable object for async read operation with timeout support.
class TimedReadAwaitable {
public:
using Self = TimedReadAwaitable;

/// @brief
/// Create a new @c TimedReadAwaitable for async read operation with timeout support.
/// @param file
/// File descriptor to be read from.
/// @param[out] buffer
/// Pointer to start of the buffer to store the read data.
/// @param size
/// Expected size in byte of data to be read.
/// @param offset
/// Offset in byte of the file to start reading. Pass -1 to read from current file pointer.
/// For file descriptors that do not support random access, this value should be 0.
template <class Rep, class Period>
requires(std::ratio_less_equal_v<std::nano, Period>)
TimedReadAwaitable(int file, void *buffer, std::uint32_t size, std::uint64_t offset,
std::chrono::duration<Rep, Period> timeout) noexcept
: m_promise(), m_file(file), m_size(size), m_buffer(buffer), m_offset(offset), m_timeout() {
auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count();
if (nano <= 0)
return;

m_timeout.tv_sec = static_cast<std::uint64_t>(nano) / 1000000000ULL;
m_timeout.tv_nsec = static_cast<std::uint64_t>(nano) % 1000000000ULL;
}

/// @brief
/// C++20 coroutine API method. Always execute @c await_suspend().
/// @return
/// This function always returns @c false.
[[nodiscard]]
static constexpr auto await_ready() noexcept -> bool {
return false;
}

/// @brief
/// Prepare for async read operation and suspend the coroutine.
/// @tparam T
/// Type of promise of current coroutine.
/// @param coro
/// Current coroutine handle.
template <class T>
requires(std::is_base_of_v<detail::PromiseBase, T>)
auto await_suspend(std::coroutine_handle<T> coro) noexcept -> void {
auto &promise = static_cast<detail::PromiseBase &>(coro.promise());
m_promise = &promise;
auto *worker = promise.worker;

// Prepare for reading.
io_uring_sqe *sqe = worker->pollSubmissionQueueEntry();
while (sqe == nullptr) [[unlikely]] {
worker->submit();
sqe = worker->pollSubmissionQueueEntry();
}

sqe->opcode = IORING_OP_READ;
sqe->fd = m_file;
sqe->off = m_offset;
sqe->addr = reinterpret_cast<std::uintptr_t>(m_buffer);
sqe->len = m_size;
sqe->user_data = reinterpret_cast<std::uintptr_t>(&promise);

// Prepare for timeout.
if (m_timeout.tv_sec != 0 || m_timeout.tv_nsec != 0) {
io_uring_sqe *timeSqe = worker->pollSubmissionQueueEntry();
while (timeSqe == nullptr) [[unlikely]] {
worker->submit();
timeSqe = worker->pollSubmissionQueueEntry();
}

sqe->flags = IOSQE_IO_LINK;

// Prepare timeout event.
timeSqe->opcode = IORING_OP_LINK_TIMEOUT;
timeSqe->fd = -1;
timeSqe->addr = reinterpret_cast<std::uintptr_t>(&m_timeout);
timeSqe->len = 1;
timeSqe->user_data = 0;
timeSqe->timeout_flags = 0;
}

worker->flushSubmissionQueue();
}

/// @brief
/// Resume this coroutine and get result of the async read operation.
/// @return
/// A struct that contains number of bytes read and an error code. The error code is
/// @c std::errc{} if succeeded and the number of bytes read is valid. The return value is
/// @c std::errc::operation_canceled if timeout occured.
auto await_resume() const noexcept -> SystemIoResult {
if (m_promise->ioResult < 0) [[unlikely]]
return {0, std::errc{-m_promise->ioResult}};
return {static_cast<std::uint32_t>(m_promise->ioResult), std::errc{}};
}

private:
detail::PromiseBase *m_promise;
int m_file;
std::uint32_t m_size;
void *m_buffer;
std::uint64_t m_offset;
__kernel_timespec m_timeout;
};

/// @class WriteAwaitable
/// @brief
/// Awaitable object for async write operation.
Expand Down
11 changes: 8 additions & 3 deletions src/nyaio/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,15 +398,20 @@ auto IoContextWorker::run() noexcept -> void {
do {
auto *p = reinterpret_cast<PromiseBase *>(static_cast<std::uintptr_t>(cqe->user_data));

// p may be null for linked timeout event.
if (p == nullptr) [[unlikely]] {
consumeCompletionQueueEntries(1);
cqe = pollCompletionQueueEntry();
continue;
}

p->ioFlags = cqe->flags;
p->ioResult = cqe->res;

// It is safe to mark current cqe as consumed.
consumeCompletionQueueEntries(1);

if (p->isCancelled()) [[unlikely]] {
p->release();
} else {
if (!p->isCancelled()) [[likely]] {
auto &stack = p->stackBottomPromise();
p->coroutine().resume();
if (stack.coroutine().done())
Expand Down
45 changes: 45 additions & 0 deletions test/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,51 @@ TEST_CASE("[task] ReadAwaitable") {

namespace {

auto timedReadAwaitableTask(IoContext &ctx) noexcept -> Task<> {
{ // Normal read.
int zero = ::open("/dev/zero", O_RDONLY);
CHECK(zero >= 0);

constexpr char zeros[1024] = {};
char buffer[1024];

for (std::size_t i = 0; i < std::size(buffer); ++i) {
auto [bytes, error] = co_await TimedReadAwaitable(zero, buffer, i, 0, 1s);
CHECK(error == std::errc{});
CHECK(std::memcmp(buffer, zeros, bytes) == 0);
}

::close(zero);
}

{ // Timeout.
int pipes[2];
CHECK(::pipe2(pipes, O_CLOEXEC) == 0);

char buffer[1024];
for (std::size_t i = 0; i < 5; ++i) {
auto [bytes, error] =
co_await TimedReadAwaitable(pipes[0], buffer, sizeof(buffer), 0, 100ms);
CHECK(error == std::errc::operation_canceled);
}

::close(pipes[0]);
::close(pipes[1]);
}

ctx.stop();
}

} // namespace

TEST_CASE("[task] TimedReadAwaitable") {
IoContext ctx(1);
ctx.schedule(timedReadAwaitableTask(ctx));
ctx.run();
}

namespace {

auto writeAwaitableTask(IoContext &ctx) noexcept -> Task<> {
int null = ::open("/dev/null", O_WRONLY);
CHECK(null >= 0);
Expand Down

0 comments on commit 4fb5930

Please sign in to comment.