From 1d1f0d49875c4424817d2684dbbd581978d5eabb Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Sat, 15 Jun 2024 19:49:41 +0200 Subject: [PATCH] promise: improve promise core state machine --- clang-format.sh | 2 +- test/promise_test.cc | 14 +++++++++- uvco/promise/multipromise.h | 7 +++-- uvco/promise/promise.cc | 22 +++++++++------- uvco/promise/promise.h | 11 +++++--- uvco/promise/promise_core.cc | 20 +++++++-------- uvco/promise/promise_core.h | 50 +++++++++++++++++++++--------------- uvco/promise/select.h | 4 +-- 8 files changed, 79 insertions(+), 51 deletions(-) diff --git a/clang-format.sh b/clang-format.sh index f00fef8..9c6ca51 100755 --- a/clang-format.sh +++ b/clang-format.sh @@ -1,2 +1,2 @@ #!/bin/bash -fd '.+\.(cc|h|hpp|cpp|c)$' src test -x clang-format -i +fd '.+\.(cc|h|hpp|cpp|c)$' uvco test -x clang-format -i diff --git a/test/promise_test.cc b/test/promise_test.cc index 5fa4889..b30cb15 100644 --- a/test/promise_test.cc +++ b/test/promise_test.cc @@ -28,9 +28,21 @@ TEST(PromiseTest, moveCtor) { TEST(PromiseTest, awaitTwice) { auto setup = [](const Loop &loop) -> uvco::Promise { - Promise promise = []() -> uvco::Promise { co_return 1; }(); + Promise promise = []() -> uvco::Promise { + co_await yield(); + co_return 1; + }(); EXPECT_EQ(co_await promise, 1); + EXPECT_THROW({ co_await promise; }, UvcoException); + }; + run_loop(setup); +} + +TEST(PromiseTest, awaitTwiceImmediateReturn) { + auto setup = [](const Loop &loop) -> uvco::Promise { + Promise promise = []() -> uvco::Promise { co_return 1; }(); + EXPECT_EQ(co_await promise, 1); EXPECT_THROW({ co_await promise; }, UvcoException); }; diff --git a/uvco/promise/multipromise.h b/uvco/promise/multipromise.h index 59f0f60..2ac8b8a 100644 --- a/uvco/promise/multipromise.h +++ b/uvco/promise/multipromise.h @@ -162,9 +162,7 @@ template class MultiPromise { } /// Obtain the next value yielded by a generator coroutine. - Promise> next() { - co_return (co_await *this); - } + Promise> next() { co_return (co_await *this); } /// Return an awaiter for this MultiPromise, which resumes the waiting /// coroutine once the generator yields its next value. @@ -200,7 +198,8 @@ template class MultiPromise { /// receiving values from a generating (yielding) coroutine. This awaiter is /// used when applying the `co_await` operator on a `MultiPromise`. struct MultiPromiseAwaiter_ { - constexpr explicit MultiPromiseAwaiter_(SharedCore_ core) : core_{std::move(core)} {} + constexpr explicit MultiPromiseAwaiter_(SharedCore_ core) + : core_{std::move(core)} {} MultiPromiseAwaiter_(MultiPromiseAwaiter_ &&) = delete; MultiPromiseAwaiter_(const MultiPromiseAwaiter_ &) = delete; MultiPromiseAwaiter_ &operator=(MultiPromiseAwaiter_ &&) = delete; diff --git a/uvco/promise/promise.cc b/uvco/promise/promise.cc index 7376e9e..07d6f6b 100644 --- a/uvco/promise/promise.cc +++ b/uvco/promise/promise.cc @@ -22,7 +22,7 @@ Promise::PromiseAwaiter_ Promise::operator co_await() const { bool Promise::PromiseAwaiter_::await_suspend( std::coroutine_handle<> handle) const { - BOOST_ASSERT(!core_.ready && !core_.exception); + BOOST_ASSERT(!core_.ready_ && !core_.exception_); BOOST_ASSERT_MSG(!core_.willResume(), "promise is already being waited on!\n"); core_.setHandle(handle); @@ -30,14 +30,18 @@ bool Promise::PromiseAwaiter_::await_suspend( } bool Promise::PromiseAwaiter_::await_ready() const { - return core_.ready || core_.exception; + return core_.ready_ || core_.exception_; } void Promise::PromiseAwaiter_::await_resume() const { - if (core_.exception) { - std::rethrow_exception(core_.exception.value()); + if (core_.stale()) { + throw UvcoException( + "co_await called on previously finished promise (void)"); } - BOOST_ASSERT(core_.ready); + if (core_.exception_) { + std::rethrow_exception(core_.exception_.value()); + } + BOOST_ASSERT(core_.ready_); } Promise::Promise() : core_{makeRefCounted>()} {} @@ -78,12 +82,12 @@ Promise::~Promise() { } } -bool Promise::ready() const { return core_->ready; } +bool Promise::ready() const { return core_->ready_; } void Promise::unwrap() { if (ready()) { - if (core_->exception) { - std::rethrow_exception(core_->exception.value()); + if (core_->exception_) { + std::rethrow_exception(core_->exception_.value()); } } else { throw UvcoException(UV_EAGAIN, "unwrap called on unfulfilled promise"); @@ -95,7 +99,7 @@ PromiseHandle Promise::handle() { } void Coroutine::return_void() { - core_->ready = true; + core_->ready_ = true; core_->resume(); } diff --git a/uvco/promise/promise.h b/uvco/promise/promise.h index 7ccb90b..ac4408f 100644 --- a/uvco/promise/promise.h +++ b/uvco/promise/promise.h @@ -126,6 +126,7 @@ template class Promise { } } + /// Return a handle that can be used to cancel the coroutine. PromiseHandle handle() { return PromiseHandle{core_}; } /// Part of the coroutine protocol: called by `co_await p` where `p` is a @@ -174,7 +175,7 @@ template class Promise { /// Part of the coroutine protocol: returns `true` if the promise is already /// fulfilled. - [[nodiscard]] bool await_ready() const { return core_.slot.has_value(); } + [[nodiscard]] bool await_ready() const { return core_.ready(); } /// Part of the coroutine protocol: returns if suspension is desired (always /// true), and stores the awaiting coroutine state in the `PromiseCore`. [[nodiscard]] bool await_suspend(std::coroutine_handle<> handle) const { @@ -186,6 +187,10 @@ template class Promise { /// Part of the coroutine protocol: extracts the resulting value from the /// promise core and returns it. T await_resume() const { + if (core_.stale()) { + throw UvcoException( + "co_await called on previously finished promise (T)"); + } if (core_.slot.has_value()) { switch (core_.slot->index()) { case 0: { @@ -288,8 +293,8 @@ template class Coroutine { using SharedCore_ = PromiseCore_ *; public: - // Coroutine object is pinned within the coroutine frame; copy/move is - // disallowed. + /// Coroutine object lives and is pinned within the coroutine frame; copy/move + /// is disallowed. Coroutine() : core_{makeRefCounted()} {} Coroutine(const Coroutine &other) = delete; Coroutine &operator=(const Coroutine &other) = delete; diff --git a/uvco/promise/promise_core.cc b/uvco/promise/promise_core.cc index eafea95..9e1f401 100644 --- a/uvco/promise/promise_core.cc +++ b/uvco/promise/promise_core.cc @@ -24,17 +24,17 @@ void PromiseCore::setHandle(std::coroutine_handle<> handle) { } bool PromiseCore::willResume() const { return handle_.has_value(); } - -bool PromiseCore::finished() const { - return state_ == PromiseState::finished; +bool PromiseCore::ready() const { return exception_ || ready_; } +bool PromiseCore::stale() const { + return state_ == PromiseState::finished && !ready(); } void PromiseCore::resume() { if (handle_) { BOOST_ASSERT(state_ == PromiseState::waitedOn); + state_ = PromiseState::resuming; auto resumeHandle = *handle_; handle_.reset(); - state_ = PromiseState::running; Loop::enqueue(resumeHandle); } else { // If a coroutine returned immediately, or nobody is co_awaiting the result. @@ -43,7 +43,7 @@ void PromiseCore::resume() { } PromiseCore::~PromiseCore() { - BOOST_ASSERT(state_ != PromiseState::running); + BOOST_ASSERT(state_ != PromiseState::resuming); if (state_ == PromiseState::init) { fmt::print(stderr, "void Promise not finished\n"); } @@ -56,15 +56,15 @@ PromiseCore::~PromiseCore() { void PromiseCore::except(std::exception_ptr exc) { BOOST_ASSERT(state_ == PromiseState::init || state_ == PromiseState::waitedOn); - exception = std::move(exc); - ready = true; + exception_ = std::move(exc); + ready_ = true; } void PromiseCore::cancel() { if (state_ == PromiseState::waitedOn) { - BOOST_ASSERT(!exception); - if (!exception) { - exception = std::make_exception_ptr( + BOOST_ASSERT(!exception_); + if (!exception_) { + exception_ = std::make_exception_ptr( UvcoException(UV_ECANCELED, "Promise cancelled")); } resume(); diff --git a/uvco/promise/promise_core.h b/uvco/promise/promise_core.h index 92c7261..51072ae 100644 --- a/uvco/promise/promise_core.h +++ b/uvco/promise/promise_core.h @@ -31,9 +31,16 @@ namespace uvco { /// After the caller has been run (and suspended again), the state is /// `finished`, and no more operations may be executed on this promise. enum class PromiseState { + /// After construction, as the associated coroutine is about to start, up to + /// the first suspension point and the following co_await. init = 0, + /// After the coroutine has reached a suspension point and another coroutine + /// has started co_awaiting it. waitedOn = 1, - running = 2, + /// After the coroutine has been resumed, and is scheduled to be run on the + /// next Loop turn. + resuming = 2, + /// finished = 3, }; @@ -99,9 +106,10 @@ template class PromiseCore : public RefCounted> { /// Checks if a coroutine is waiting on this core. bool willResume() { return handle_.has_value(); } + /// Checks if a value is present in the slot. [[nodiscard]] bool ready() const { return slot.has_value(); } - [[nodiscard]] bool finished() const { - return state_ == PromiseState::finished; + [[nodiscard]] bool stale() const { + return state_ == PromiseState::finished && !ready(); } /// Resume a suspended coroutine waiting on the associated coroutine by @@ -111,7 +119,7 @@ template class PromiseCore : public RefCounted> { virtual void resume() { if (handle_) { BOOST_ASSERT(state_ == PromiseState::waitedOn); - state_ = PromiseState::running; + state_ = PromiseState::resuming; auto resume = *handle_; handle_.reset(); Loop::enqueue(resume); @@ -121,26 +129,25 @@ template class PromiseCore : public RefCounted> { // returned a value. (await_ready() == true) } - // Note: with asynchronous resumption (Loop::enqueue), this state machine - // is a bit faulty. The promise awaiter is resumed in state `finished`. - // However, this works out fine for the purpose of enforcing the - // "protocol" of interactions with the promise core: a promise can be - // destroyed without the resumption having run, but that is an issue in - // the loop or the result of a premature termination. switch (state_) { case PromiseState::init: - case PromiseState::running: + // Coroutine returns but nobody has awaited yet. This is fine. + state_ = PromiseState::finished; + break; + case PromiseState::resuming: + // Not entirely correct, but the resumed awaiting coroutine is not coming + // back to us. state_ = PromiseState::finished; break; case PromiseState::waitedOn: - // It is possible that set_resume() was called in a stack originating at - // resume(), thus updating the state. In that case, the state should be - // preserved. - state_ = PromiseState::waitedOn; + // state is waitedOn, but no handle is set - that's an error. + BOOST_ASSERT_MSG( + false, + "PromiseCore::resume() called without handle in state waitedOn"); break; case PromiseState::finished: // Happens in MultiPromiseCore on co_return if the co_awaiter has lost - // interest. Harmless if !resume_ (asserted above). + // interest. Harmless if !handle_ (asserted above). break; } } @@ -191,20 +198,21 @@ template <> class PromiseCore : public RefCounted> { /// See `PromiseCore::set_resume`. void setHandle(std::coroutine_handle<> handle); - /// See `PromiseCore::reset_resume`. + /// See `PromiseCore::resetHandle`. void resetHandle(); - /// See `PromiseCore::will_resume`. + /// See `PromiseCore::willResume`. [[nodiscard]] bool willResume() const; - [[nodiscard]] bool finished() const; + [[nodiscard]] bool ready() const; + [[nodiscard]] bool stale() const; /// See `PromiseCore::resume`. void resume(); void except(std::exception_ptr exc); - bool ready = false; + bool ready_ = false; - std::optional exception; + std::optional exception_; private: std::optional> handle_; diff --git a/uvco/promise/select.h b/uvco/promise/select.h index 3f08bfd..4af0745 100644 --- a/uvco/promise/select.h +++ b/uvco/promise/select.h @@ -58,8 +58,8 @@ template class SelectSet { BOOST_ASSERT_MSG(!resumed_, "A select set can only be used once"); std::apply( [handle](auto &&...promise) { - ((!promise.core()->finished() ? promise.core()->setHandle(handle) - : (void)0), + ((!promise.core()->stale() ? promise.core()->setHandle(handle) + : (void)0), ...); }, promises_);