Skip to content

Commit

Permalink
promise: improve promise core state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
dermesser committed Jun 15, 2024
1 parent 3216212 commit 1d1f0d4
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 51 deletions.
2 changes: 1 addition & 1 deletion clang-format.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
fd '.+\.(cc|h|hpp|cpp|c)$' src test -x clang-format -i
fd '.+\.(cc|h|hpp|cpp|c)$' uvco test -x clang-format -i
14 changes: 13 additions & 1 deletion test/promise_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,21 @@ TEST(PromiseTest, moveCtor) {

TEST(PromiseTest, awaitTwice) {
auto setup = [](const Loop &loop) -> uvco::Promise<void> {
Promise<int> promise = []() -> uvco::Promise<int> { co_return 1; }();
Promise<int> promise = []() -> uvco::Promise<int> {
co_await yield();
co_return 1;
}();
EXPECT_EQ(co_await promise, 1);
EXPECT_THROW({ co_await promise; }, UvcoException);
};

run_loop(setup);
}

TEST(PromiseTest, awaitTwiceImmediateReturn) {
auto setup = [](const Loop &loop) -> uvco::Promise<void> {
Promise<int> promise = []() -> uvco::Promise<int> { co_return 1; }();
EXPECT_EQ(co_await promise, 1);
EXPECT_THROW({ co_await promise; }, UvcoException);
};

Expand Down
7 changes: 3 additions & 4 deletions uvco/promise/multipromise.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ template <typename T> class MultiPromise {
}

/// Obtain the next value yielded by a generator coroutine.
Promise<std::optional<T>> next() {
co_return (co_await *this);
}
Promise<std::optional<T>> next() { co_return (co_await *this); }

/// Return an awaiter for this MultiPromise, which resumes the waiting
/// coroutine once the generator yields its next value.
Expand Down Expand Up @@ -200,7 +198,8 @@ template <typename T> class MultiPromise {
/// receiving values from a generating (yielding) coroutine. This awaiter is
/// used when applying the `co_await` operator on a `MultiPromise`.
struct MultiPromiseAwaiter_ {
constexpr explicit MultiPromiseAwaiter_(SharedCore_ core) : core_{std::move(core)} {}
constexpr explicit MultiPromiseAwaiter_(SharedCore_ core)
: core_{std::move(core)} {}
MultiPromiseAwaiter_(MultiPromiseAwaiter_ &&) = delete;
MultiPromiseAwaiter_(const MultiPromiseAwaiter_ &) = delete;
MultiPromiseAwaiter_ &operator=(MultiPromiseAwaiter_ &&) = delete;
Expand Down
22 changes: 13 additions & 9 deletions uvco/promise/promise.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@ Promise<void>::PromiseAwaiter_ Promise<void>::operator co_await() const {

bool Promise<void>::PromiseAwaiter_::await_suspend(
std::coroutine_handle<> handle) const {
BOOST_ASSERT(!core_.ready && !core_.exception);
BOOST_ASSERT(!core_.ready_ && !core_.exception_);
BOOST_ASSERT_MSG(!core_.willResume(),
"promise is already being waited on!\n");
core_.setHandle(handle);
return true;
}

bool Promise<void>::PromiseAwaiter_::await_ready() const {
return core_.ready || core_.exception;
return core_.ready_ || core_.exception_;
}

void Promise<void>::PromiseAwaiter_::await_resume() const {
if (core_.exception) {
std::rethrow_exception(core_.exception.value());
if (core_.stale()) {
throw UvcoException(
"co_await called on previously finished promise (void)");
}
BOOST_ASSERT(core_.ready);
if (core_.exception_) {
std::rethrow_exception(core_.exception_.value());
}
BOOST_ASSERT(core_.ready_);
}

Promise<void>::Promise() : core_{makeRefCounted<PromiseCore<void>>()} {}
Expand Down Expand Up @@ -78,12 +82,12 @@ Promise<void>::~Promise() {
}
}

bool Promise<void>::ready() const { return core_->ready; }
bool Promise<void>::ready() const { return core_->ready_; }

void Promise<void>::unwrap() {
if (ready()) {
if (core_->exception) {
std::rethrow_exception(core_->exception.value());
if (core_->exception_) {
std::rethrow_exception(core_->exception_.value());
}
} else {
throw UvcoException(UV_EAGAIN, "unwrap called on unfulfilled promise");
Expand All @@ -95,7 +99,7 @@ PromiseHandle<void> Promise<void>::handle() {
}

void Coroutine<void>::return_void() {
core_->ready = true;
core_->ready_ = true;
core_->resume();
}

Expand Down
11 changes: 8 additions & 3 deletions uvco/promise/promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ template <typename T> class Promise {
}
}

/// Return a handle that can be used to cancel the coroutine.
PromiseHandle<T> handle() { return PromiseHandle<T>{core_}; }

/// Part of the coroutine protocol: called by `co_await p` where `p` is a
Expand Down Expand Up @@ -174,7 +175,7 @@ template <typename T> class Promise {

/// Part of the coroutine protocol: returns `true` if the promise is already
/// fulfilled.
[[nodiscard]] bool await_ready() const { return core_.slot.has_value(); }
[[nodiscard]] bool await_ready() const { return core_.ready(); }
/// Part of the coroutine protocol: returns if suspension is desired (always
/// true), and stores the awaiting coroutine state in the `PromiseCore`.
[[nodiscard]] bool await_suspend(std::coroutine_handle<> handle) const {
Expand All @@ -186,6 +187,10 @@ template <typename T> class Promise {
/// Part of the coroutine protocol: extracts the resulting value from the
/// promise core and returns it.
T await_resume() const {
if (core_.stale()) {
throw UvcoException(
"co_await called on previously finished promise (T)");
}
if (core_.slot.has_value()) {
switch (core_.slot->index()) {
case 0: {
Expand Down Expand Up @@ -288,8 +293,8 @@ template <typename T> class Coroutine {
using SharedCore_ = PromiseCore_ *;

public:
// Coroutine object is pinned within the coroutine frame; copy/move is
// disallowed.
/// Coroutine object lives and is pinned within the coroutine frame; copy/move
/// is disallowed.
Coroutine() : core_{makeRefCounted<PromiseCore_>()} {}
Coroutine(const Coroutine &other) = delete;
Coroutine &operator=(const Coroutine &other) = delete;
Expand Down
20 changes: 10 additions & 10 deletions uvco/promise/promise_core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ void PromiseCore<void>::setHandle(std::coroutine_handle<> handle) {
}

bool PromiseCore<void>::willResume() const { return handle_.has_value(); }

bool PromiseCore<void>::finished() const {
return state_ == PromiseState::finished;
bool PromiseCore<void>::ready() const { return exception_ || ready_; }
bool PromiseCore<void>::stale() const {
return state_ == PromiseState::finished && !ready();
}

void PromiseCore<void>::resume() {
if (handle_) {
BOOST_ASSERT(state_ == PromiseState::waitedOn);
state_ = PromiseState::resuming;
auto resumeHandle = *handle_;
handle_.reset();
state_ = PromiseState::running;
Loop::enqueue(resumeHandle);
} else {
// If a coroutine returned immediately, or nobody is co_awaiting the result.
Expand All @@ -43,7 +43,7 @@ void PromiseCore<void>::resume() {
}

PromiseCore<void>::~PromiseCore() {
BOOST_ASSERT(state_ != PromiseState::running);
BOOST_ASSERT(state_ != PromiseState::resuming);
if (state_ == PromiseState::init) {
fmt::print(stderr, "void Promise not finished\n");
}
Expand All @@ -56,15 +56,15 @@ PromiseCore<void>::~PromiseCore() {
void PromiseCore<void>::except(std::exception_ptr exc) {
BOOST_ASSERT(state_ == PromiseState::init ||
state_ == PromiseState::waitedOn);
exception = std::move(exc);
ready = true;
exception_ = std::move(exc);
ready_ = true;
}

void PromiseCore<void>::cancel() {
if (state_ == PromiseState::waitedOn) {
BOOST_ASSERT(!exception);
if (!exception) {
exception = std::make_exception_ptr(
BOOST_ASSERT(!exception_);
if (!exception_) {
exception_ = std::make_exception_ptr(
UvcoException(UV_ECANCELED, "Promise cancelled"));
}
resume();
Expand Down
50 changes: 29 additions & 21 deletions uvco/promise/promise_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ namespace uvco {
/// After the caller has been run (and suspended again), the state is
/// `finished`, and no more operations may be executed on this promise.
enum class PromiseState {
/// After construction, as the associated coroutine is about to start, up to
/// the first suspension point and the following co_await.
init = 0,
/// After the coroutine has reached a suspension point and another coroutine
/// has started co_awaiting it.
waitedOn = 1,
running = 2,
/// After the coroutine has been resumed, and is scheduled to be run on the
/// next Loop turn.
resuming = 2,
///
finished = 3,
};

Expand Down Expand Up @@ -99,9 +106,10 @@ template <typename T> class PromiseCore : public RefCounted<PromiseCore<T>> {

/// Checks if a coroutine is waiting on this core.
bool willResume() { return handle_.has_value(); }
/// Checks if a value is present in the slot.
[[nodiscard]] bool ready() const { return slot.has_value(); }
[[nodiscard]] bool finished() const {
return state_ == PromiseState::finished;
[[nodiscard]] bool stale() const {
return state_ == PromiseState::finished && !ready();
}

/// Resume a suspended coroutine waiting on the associated coroutine by
Expand All @@ -111,7 +119,7 @@ template <typename T> class PromiseCore : public RefCounted<PromiseCore<T>> {
virtual void resume() {
if (handle_) {
BOOST_ASSERT(state_ == PromiseState::waitedOn);
state_ = PromiseState::running;
state_ = PromiseState::resuming;
auto resume = *handle_;
handle_.reset();
Loop::enqueue(resume);
Expand All @@ -121,26 +129,25 @@ template <typename T> class PromiseCore : public RefCounted<PromiseCore<T>> {
// returned a value. (await_ready() == true)
}

// Note: with asynchronous resumption (Loop::enqueue), this state machine
// is a bit faulty. The promise awaiter is resumed in state `finished`.
// However, this works out fine for the purpose of enforcing the
// "protocol" of interactions with the promise core: a promise can be
// destroyed without the resumption having run, but that is an issue in
// the loop or the result of a premature termination.
switch (state_) {
case PromiseState::init:
case PromiseState::running:
// Coroutine returns but nobody has awaited yet. This is fine.
state_ = PromiseState::finished;
break;
case PromiseState::resuming:
// Not entirely correct, but the resumed awaiting coroutine is not coming
// back to us.
state_ = PromiseState::finished;
break;
case PromiseState::waitedOn:
// It is possible that set_resume() was called in a stack originating at
// resume(), thus updating the state. In that case, the state should be
// preserved.
state_ = PromiseState::waitedOn;
// state is waitedOn, but no handle is set - that's an error.
BOOST_ASSERT_MSG(
false,
"PromiseCore::resume() called without handle in state waitedOn");
break;
case PromiseState::finished:
// Happens in MultiPromiseCore on co_return if the co_awaiter has lost
// interest. Harmless if !resume_ (asserted above).
// interest. Harmless if !handle_ (asserted above).
break;
}
}
Expand Down Expand Up @@ -191,20 +198,21 @@ template <> class PromiseCore<void> : public RefCounted<PromiseCore<void>> {
/// See `PromiseCore::set_resume`.
void setHandle(std::coroutine_handle<> handle);

/// See `PromiseCore::reset_resume`.
/// See `PromiseCore::resetHandle`.
void resetHandle();
/// See `PromiseCore::will_resume`.
/// See `PromiseCore::willResume`.
[[nodiscard]] bool willResume() const;
[[nodiscard]] bool finished() const;
[[nodiscard]] bool ready() const;
[[nodiscard]] bool stale() const;

/// See `PromiseCore::resume`.
void resume();

void except(std::exception_ptr exc);

bool ready = false;
bool ready_ = false;

std::optional<std::exception_ptr> exception;
std::optional<std::exception_ptr> exception_;

private:
std::optional<std::coroutine_handle<>> handle_;
Expand Down
4 changes: 2 additions & 2 deletions uvco/promise/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ template <typename... Ts> class SelectSet {
BOOST_ASSERT_MSG(!resumed_, "A select set can only be used once");
std::apply(
[handle](auto &&...promise) {
((!promise.core()->finished() ? promise.core()->setHandle(handle)
: (void)0),
((!promise.core()->stale() ? promise.core()->setHandle(handle)
: (void)0),
...);
},
promises_);
Expand Down

0 comments on commit 1d1f0d4

Please sign in to comment.