Skip to content

Commit

Permalink
stream: implement efficient in-place read
Browse files Browse the repository at this point in the history
  • Loading branch information
dermesser committed Jun 24, 2024
1 parent 9eff971 commit 9b8e975
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 42 deletions.
24 changes: 21 additions & 3 deletions test/stream_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

#include <boost/assert.hpp>
#include <gtest/gtest.h>
#include <sys/socket.h>
#include <uv.h>
Expand Down Expand Up @@ -124,24 +125,41 @@ TEST(PipeTest, largeWriteRead) {
auto [read, write] = pipe(loop);

for (unsigned i = 0; i < 10; ++i) {
co_await write.write(std::string(buffer.data(), buffer.size()));
EXPECT_EQ(buffer.size(), co_await write.write(
std::string(buffer.data(), buffer.size())));
}
co_await write.close();

size_t bytesRead{};

while (true) {
auto chunk = co_await read.read();
// Read random chunk size.
auto chunk = co_await read.read(732);
if (!chunk.has_value()) {
break;
}
bytesRead += chunk->size();
}

BOOST_ASSERT(bytesRead == 10240);
co_await read.close();
};

run_loop(setup);
}

TEST(PipeTest, readIntoBuffer) {
auto setup = [&](const Loop &loop) -> uvco::Promise<void> {
auto [read, write] = pipe(loop);

co_await write.write("Hello");
std::array<char, 32> buffer{};
size_t bytesRead = co_await read.read(buffer);
EXPECT_EQ(bytesRead, 5);
EXPECT_EQ(std::string(buffer.data(), bytesRead), "Hello");
co_await read.close();
co_await write.close();
};
run_loop(setup);
}

} // namespace
11 changes: 10 additions & 1 deletion test/tcp-broadcaster.exe.cc
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@

#include <boost/program_options.hpp>
#include <boost/program_options/detail/parsers.hpp>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/value_semantic.hpp>
#include <boost/program_options/variables_map.hpp>
#include <fmt/core.h>
#include <fmt/format.h>

#include "uvco/name_resolution.h"
#include "uvco/promise/multipromise.h"
#include "uvco/promise/promise.h"
#include "uvco/run.h"
#include "uvco/stream.h"
#include "uvco/tcp.h"
#include "uvco/tcp_stream.h"

#include <cstdint>
#include <cstdio>
Expand All @@ -16,7 +22,10 @@
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

struct Options {
const uvco::Loop *loop;
Expand Down Expand Up @@ -55,7 +64,7 @@ class Hub {

Promise<void> broadcast(std::string_view from, std::string_view what) {
const std::string message = fmt::format("{} says: {}", from, what);
std::vector<Promise<uv_status>> promises;
std::vector<Promise<size_t>> promises;
promises.reserve(clients_.size());

for (const auto &clientPtr : clients_) {
Expand Down
77 changes: 50 additions & 27 deletions uvco/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <boost/assert.hpp>
#include <fmt/core.h>
#include <span>
#include <uv.h>
#include <uv/unix.h>

Expand Down Expand Up @@ -46,17 +47,32 @@ TtyStream TtyStream::tty(const Loop &loop, int fd) {
return TtyStream{std::move(tty)};
}

Promise<std::optional<std::string>> StreamBase::read() {
Promise<std::optional<std::string>> StreamBase::read(size_t maxSize) {
// This is a promise root function, i.e. origin of a promise.
InStreamAwaiter_ awaiter{*this};
std::optional<std::string> buf = co_await awaiter;
std::string buf(maxSize, '\0');
InStreamAwaiter_ awaiter{*this, buf};
const size_t nRead = co_await awaiter;
if (nRead == 0) {
// EOF.
co_return std::nullopt;
}
buf.resize(nRead);
co_return buf;
}

Promise<uv_status> StreamBase::write(std::string buf) {
Promise<size_t> StreamBase::read(std::span<char> buffer) {
InStreamAwaiter_ awaiter{*this, buffer};
size_t n = co_await awaiter;
co_return n;
}

Promise<size_t> StreamBase::write(std::string buf) {
OutStreamAwaiter_ awaiter{*this, std::move(buf)};
uv_status status = co_await awaiter;
co_return status;
if (status < 0) {
throw UvcoException{status, "StreamBase::write() encountered error"};
}
co_return static_cast<size_t>(status);
}

Promise<void> StreamBase::shutdown() {
Expand Down Expand Up @@ -93,7 +109,7 @@ bool StreamBase::InStreamAwaiter_::await_ready() {
start_read();
stop_read();
}
return slot_.has_value();
return status_.has_value();
}

bool StreamBase::InStreamAwaiter_::await_suspend(
Expand All @@ -106,44 +122,51 @@ bool StreamBase::InStreamAwaiter_::await_suspend(
return true;
}

std::optional<std::string> StreamBase::InStreamAwaiter_::await_resume() {
if (!slot_ && !stream_.stream_) {
size_t StreamBase::InStreamAwaiter_::await_resume() {
if (!status_ && !stream_.stream_) {
return {};
}
BOOST_ASSERT(slot_);
std::optional<std::string> result = std::move(*slot_);
slot_.reset();
BOOST_ASSERT(status_);
stream_.reader_.reset();
return result;
if (status_ && *status_ == UV_EOF) {
return 0;
}
if (status_ && *status_ < 0) {
throw UvcoException{static_cast<uv_status>(*status_),
"StreamBase::read() encountered error"};
}
BOOST_ASSERT(status_.value() >= 0);
return static_cast<size_t>(status_.value());
}

// Provides the InStreamAwaiter_'s span buffer to libuv.
void StreamBase::InStreamAwaiter_::allocate(uv_handle_t *handle,
size_t /*suggested_size*/,
uv_buf_t *buf) {
const InStreamAwaiter_ *awaiter = (InStreamAwaiter_ *)handle->data;
BOOST_ASSERT(awaiter != nullptr);
buf->base = awaiter->buffer_.data();
buf->len = awaiter->buffer_.size();
}

void StreamBase::InStreamAwaiter_::start_read() {
uv_read_start(&stream_.stream(), allocator, onInStreamRead);
uv_read_start(&stream_.stream(), StreamBase::InStreamAwaiter_::allocate,
StreamBase::InStreamAwaiter_::onInStreamRead);
}

void StreamBase::InStreamAwaiter_::stop_read() {
uv_read_stop(&stream_.stream());
}

// buf is not used, because it is an alias to awaiter->buffer_.
void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream,
ssize_t nread,
const uv_buf_t *buf) {
const uv_buf_t * /*buf*/) {
auto *awaiter = (InStreamAwaiter_ *)stream->data;
BOOST_ASSERT(awaiter != nullptr);
awaiter->stop_read();
awaiter->status_ = nread;

if (nread == UV_EOF) {
awaiter->slot_ = std::optional<std::string>{};
} else if (nread >= 0) {
std::string line{buf->base, static_cast<size_t>(nread)};
awaiter->slot_ = std::move(line);
} else {
// Some error; assume EOF.
// TODO: propagate error.
awaiter->slot_ = std::optional<std::string>{};
}

freeUvBuf(buf);
if (awaiter->handle_) {
auto handle = awaiter->handle_.value();
awaiter->handle_.reset();
Expand All @@ -154,7 +177,7 @@ void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream,

StreamBase::OutStreamAwaiter_::OutStreamAwaiter_(StreamBase &stream,
std::string_view buffer)
: stream_{stream}, buffer_{buffer}, write_{} {}
: buffer_{buffer}, write_{}, stream_{stream} {}

std::array<uv_buf_t, 1> StreamBase::OutStreamAwaiter_::prepare_buffers() const {
std::array<uv_buf_t, 1> bufs{};
Expand Down
35 changes: 24 additions & 11 deletions uvco/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <boost/assert.hpp>
#include <fmt/core.h>
#include <span>
#include <string_view>
#include <uv.h>
#include <uv/unix.h>
Expand Down Expand Up @@ -39,18 +40,26 @@ class StreamBase {
StreamBase &operator=(StreamBase &&) = default;
virtual ~StreamBase();

/// Read available data (up to 4 kB) from stream. Returns an empty optional on
/// EOF or closed handle (`close()`).
static constexpr size_t defaultMaxReadSize = 4096;

/// Read available data (up to `maxSize` bytes) from stream. Returns
/// `std::nullopt` on EOF or closed handle (`close()`).
///
/// Throws `UvcoException` on error.
///
/// Implementation note: the actual read start occurs in the awaiter,
/// whereas most other types of I/O start in the promise root function.
/// For streams it is more convenient to do it like this, but there is no
/// deeper reason.
/// NOTE: Consider using `read(std::span<char>)` for better performance.
///
/// NOTE: only one reader is allowed to be active at a time. If a read is
/// started while another is still active, uvco will abort the process (in
/// Debug mode), or ignore the first read (in Release mode).
[[nodiscard]] Promise<std::optional<std::string>> read();
[[nodiscard]] Promise<std::optional<std::string>>
read(size_t maxSize = defaultMaxReadSize);

/// Read available data (up to `buffer.size()` bytes) from stream. Returns
/// the number of bytes read, or 0 on EOF or closed handle (`close()`).
///
/// Throws `UvcoException` on error.
Promise<size_t> read(std::span<char> buffer);

/// Write a buffer to the stream. A copy of `buf` is taken because it is
/// undetermined when the actual write will occur. Await the result if the
Expand All @@ -60,7 +69,7 @@ class StreamBase {
/// NOTE: only one writer is allowed to be active at a time. If two writes
/// are started simultaneously, the process will be aborted in Debug mode, or
/// the first `write()` coroutine will not return in Release mode.
[[nodiscard]] Promise<uv_status> write(std::string buf);
[[nodiscard]] Promise<size_t> write(std::string buf);

/// Shut down stream for writing. This is a half-close; the other side
/// can still write. The result of `shutdown()` *must be `co_await`ed*.
Expand Down Expand Up @@ -105,20 +114,24 @@ class StreamBase {
};

struct InStreamAwaiter_ {
explicit InStreamAwaiter_(StreamBase &stream) : stream_{stream} {}
explicit InStreamAwaiter_(StreamBase &stream, std::span<char> buffer)
: stream_{stream}, buffer_{buffer} {}

bool await_ready();
bool await_suspend(std::coroutine_handle<> handle);
std::optional<std::string> await_resume();
size_t await_resume();

void start_read();
void stop_read();

static void allocate(uv_handle_t *handle, size_t suggested_size,
uv_buf_t *buf);
static void onInStreamRead(uv_stream_t *stream, ssize_t nread,
const uv_buf_t *buf);

StreamBase &stream_;
std::optional<std::optional<std::string>> slot_;
std::span<char> buffer_;
std::optional<ssize_t> status_;
std::optional<std::coroutine_handle<>> handle_;
};

Expand Down

0 comments on commit 9b8e975

Please sign in to comment.