diff --git a/test/stream_test.cc b/test/stream_test.cc index 4687e97..a276055 100644 --- a/test/stream_test.cc +++ b/test/stream_test.cc @@ -1,4 +1,5 @@ +#include #include #include #include @@ -124,24 +125,41 @@ TEST(PipeTest, largeWriteRead) { auto [read, write] = pipe(loop); for (unsigned i = 0; i < 10; ++i) { - co_await write.write(std::string(buffer.data(), buffer.size())); + EXPECT_EQ(buffer.size(), co_await write.write( + std::string(buffer.data(), buffer.size()))); } co_await write.close(); size_t bytesRead{}; while (true) { - auto chunk = co_await read.read(); + // Read random chunk size. + auto chunk = co_await read.read(732); if (!chunk.has_value()) { break; } bytesRead += chunk->size(); } - + BOOST_ASSERT(bytesRead == 10240); co_await read.close(); }; run_loop(setup); } +TEST(PipeTest, readIntoBuffer) { + auto setup = [&](const Loop &loop) -> uvco::Promise { + auto [read, write] = pipe(loop); + + co_await write.write("Hello"); + std::array buffer{}; + size_t bytesRead = co_await read.read(buffer); + EXPECT_EQ(bytesRead, 5); + EXPECT_EQ(std::string(buffer.data(), bytesRead), "Hello"); + co_await read.close(); + co_await write.close(); + }; + run_loop(setup); +} + } // namespace diff --git a/test/tcp-broadcaster.exe.cc b/test/tcp-broadcaster.exe.cc index 4d05933..8db9341 100644 --- a/test/tcp-broadcaster.exe.cc +++ b/test/tcp-broadcaster.exe.cc @@ -1,13 +1,19 @@ #include +#include +#include +#include #include +#include #include +#include "uvco/name_resolution.h" #include "uvco/promise/multipromise.h" #include "uvco/promise/promise.h" #include "uvco/run.h" #include "uvco/stream.h" #include "uvco/tcp.h" +#include "uvco/tcp_stream.h" #include #include @@ -16,7 +22,10 @@ #include #include #include +#include #include +#include +#include struct Options { const uvco::Loop *loop; @@ -55,7 +64,7 @@ class Hub { Promise broadcast(std::string_view from, std::string_view what) { const std::string message = fmt::format("{} says: {}", from, what); - std::vector> promises; + std::vector> promises; promises.reserve(clients_.size()); for (const auto &clientPtr : clients_) { diff --git a/uvco/stream.cc b/uvco/stream.cc index 225cb99..fecb59e 100644 --- a/uvco/stream.cc +++ b/uvco/stream.cc @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -46,17 +47,32 @@ TtyStream TtyStream::tty(const Loop &loop, int fd) { return TtyStream{std::move(tty)}; } -Promise> StreamBase::read() { +Promise> StreamBase::read(size_t maxSize) { // This is a promise root function, i.e. origin of a promise. - InStreamAwaiter_ awaiter{*this}; - std::optional buf = co_await awaiter; + std::string buf(maxSize, '\0'); + InStreamAwaiter_ awaiter{*this, buf}; + const size_t nRead = co_await awaiter; + if (nRead == 0) { + // EOF. + co_return std::nullopt; + } + buf.resize(nRead); co_return buf; } -Promise StreamBase::write(std::string buf) { +Promise StreamBase::read(std::span buffer) { + InStreamAwaiter_ awaiter{*this, buffer}; + size_t n = co_await awaiter; + co_return n; +} + +Promise StreamBase::write(std::string buf) { OutStreamAwaiter_ awaiter{*this, std::move(buf)}; uv_status status = co_await awaiter; - co_return status; + if (status < 0) { + throw UvcoException{status, "StreamBase::write() encountered error"}; + } + co_return static_cast(status); } Promise StreamBase::shutdown() { @@ -93,7 +109,7 @@ bool StreamBase::InStreamAwaiter_::await_ready() { start_read(); stop_read(); } - return slot_.has_value(); + return status_.has_value(); } bool StreamBase::InStreamAwaiter_::await_suspend( @@ -106,44 +122,51 @@ bool StreamBase::InStreamAwaiter_::await_suspend( return true; } -std::optional StreamBase::InStreamAwaiter_::await_resume() { - if (!slot_ && !stream_.stream_) { +size_t StreamBase::InStreamAwaiter_::await_resume() { + if (!status_ && !stream_.stream_) { return {}; } - BOOST_ASSERT(slot_); - std::optional result = std::move(*slot_); - slot_.reset(); + BOOST_ASSERT(status_); stream_.reader_.reset(); - return result; + if (status_ && *status_ == UV_EOF) { + return 0; + } + if (status_ && *status_ < 0) { + throw UvcoException{static_cast(*status_), + "StreamBase::read() encountered error"}; + } + BOOST_ASSERT(status_.value() >= 0); + return static_cast(status_.value()); +} + +// Provides the InStreamAwaiter_'s span buffer to libuv. +void StreamBase::InStreamAwaiter_::allocate(uv_handle_t *handle, + size_t /*suggested_size*/, + uv_buf_t *buf) { + const InStreamAwaiter_ *awaiter = (InStreamAwaiter_ *)handle->data; + BOOST_ASSERT(awaiter != nullptr); + buf->base = awaiter->buffer_.data(); + buf->len = awaiter->buffer_.size(); } void StreamBase::InStreamAwaiter_::start_read() { - uv_read_start(&stream_.stream(), allocator, onInStreamRead); + uv_read_start(&stream_.stream(), StreamBase::InStreamAwaiter_::allocate, + StreamBase::InStreamAwaiter_::onInStreamRead); } void StreamBase::InStreamAwaiter_::stop_read() { uv_read_stop(&stream_.stream()); } +// buf is not used, because it is an alias to awaiter->buffer_. void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream, ssize_t nread, - const uv_buf_t *buf) { + const uv_buf_t * /*buf*/) { auto *awaiter = (InStreamAwaiter_ *)stream->data; BOOST_ASSERT(awaiter != nullptr); awaiter->stop_read(); + awaiter->status_ = nread; - if (nread == UV_EOF) { - awaiter->slot_ = std::optional{}; - } else if (nread >= 0) { - std::string line{buf->base, static_cast(nread)}; - awaiter->slot_ = std::move(line); - } else { - // Some error; assume EOF. - // TODO: propagate error. - awaiter->slot_ = std::optional{}; - } - - freeUvBuf(buf); if (awaiter->handle_) { auto handle = awaiter->handle_.value(); awaiter->handle_.reset(); @@ -154,7 +177,7 @@ void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream, StreamBase::OutStreamAwaiter_::OutStreamAwaiter_(StreamBase &stream, std::string_view buffer) - : stream_{stream}, buffer_{buffer}, write_{} {} + : buffer_{buffer}, write_{}, stream_{stream} {} std::array StreamBase::OutStreamAwaiter_::prepare_buffers() const { std::array bufs{}; diff --git a/uvco/stream.h b/uvco/stream.h index 48e970b..b748c04 100644 --- a/uvco/stream.h +++ b/uvco/stream.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -39,18 +40,26 @@ class StreamBase { StreamBase &operator=(StreamBase &&) = default; virtual ~StreamBase(); - /// Read available data (up to 4 kB) from stream. Returns an empty optional on - /// EOF or closed handle (`close()`). + static constexpr size_t defaultMaxReadSize = 4096; + + /// Read available data (up to `maxSize` bytes) from stream. Returns + /// `std::nullopt` on EOF or closed handle (`close()`). + /// + /// Throws `UvcoException` on error. /// - /// Implementation note: the actual read start occurs in the awaiter, - /// whereas most other types of I/O start in the promise root function. - /// For streams it is more convenient to do it like this, but there is no - /// deeper reason. + /// NOTE: Consider using `read(std::span)` for better performance. /// /// NOTE: only one reader is allowed to be active at a time. If a read is /// started while another is still active, uvco will abort the process (in /// Debug mode), or ignore the first read (in Release mode). - [[nodiscard]] Promise> read(); + [[nodiscard]] Promise> + read(size_t maxSize = defaultMaxReadSize); + + /// Read available data (up to `buffer.size()` bytes) from stream. Returns + /// the number of bytes read, or 0 on EOF or closed handle (`close()`). + /// + /// Throws `UvcoException` on error. + Promise read(std::span buffer); /// Write a buffer to the stream. A copy of `buf` is taken because it is /// undetermined when the actual write will occur. Await the result if the @@ -60,7 +69,7 @@ class StreamBase { /// NOTE: only one writer is allowed to be active at a time. If two writes /// are started simultaneously, the process will be aborted in Debug mode, or /// the first `write()` coroutine will not return in Release mode. - [[nodiscard]] Promise write(std::string buf); + [[nodiscard]] Promise write(std::string buf); /// Shut down stream for writing. This is a half-close; the other side /// can still write. The result of `shutdown()` *must be `co_await`ed*. @@ -105,20 +114,24 @@ class StreamBase { }; struct InStreamAwaiter_ { - explicit InStreamAwaiter_(StreamBase &stream) : stream_{stream} {} + explicit InStreamAwaiter_(StreamBase &stream, std::span buffer) + : stream_{stream}, buffer_{buffer} {} bool await_ready(); bool await_suspend(std::coroutine_handle<> handle); - std::optional await_resume(); + size_t await_resume(); void start_read(); void stop_read(); + static void allocate(uv_handle_t *handle, size_t suggested_size, + uv_buf_t *buf); static void onInStreamRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); StreamBase &stream_; - std::optional> slot_; + std::span buffer_; + std::optional status_; std::optional> handle_; };