Skip to content

Commit f0efdab

Browse files
committed
Improve scheduling performance
1 parent 6027761 commit f0efdab

File tree

7 files changed

+73
-80
lines changed

7 files changed

+73
-80
lines changed

test/channel_test.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,35 @@ TEST(ChannelTest, DISABLED_basicWriteReadBench) {
103103
run_loop(setup);
104104
}
105105

106+
TEST(ChannelTest, DISABLED_generatorWriteReadBench) {
107+
static constexpr int N_iter = 1000000;
108+
109+
auto reader = [](Channel<int> &chan) -> Promise<void> {
110+
MultiPromise<int> gen = chan.getAll();
111+
for (int i = 1; i < N_iter; ++i) {
112+
EXPECT_EQ(i, co_await gen);
113+
}
114+
};
115+
116+
auto writer = [](Channel<int> &chan) -> Promise<void> {
117+
for (int i = 1; i < N_iter; ++i) {
118+
co_await chan.put(i);
119+
}
120+
};
121+
122+
auto setup = [&](const Loop &) -> Promise<void> {
123+
Channel<int> chan{2};
124+
125+
Promise<void> readerCoroutine = reader(chan);
126+
Promise<void> writerCoroutine = writer(chan);
127+
128+
co_await readerCoroutine;
129+
co_await writerCoroutine;
130+
};
131+
132+
run_loop(setup);
133+
}
134+
106135
TEST(ChannelTest, blockingRead) {
107136

108137
auto drain = [](Channel<int> &chan) -> Promise<void> {

uvco/bounded_queue.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ template <typename T> class BoundedQueue {
4343
}
4444
/// Pop an item from the queue.
4545
T get() {
46-
if (empty()) {
46+
if (empty()) [[unlikely]] {
4747
throw UvcoException(UV_EAGAIN, "queue is empty");
4848
}
4949
T element = std::move(queue_.at(tail_++));

uvco/channel.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ template <typename T> class Channel {
144144
if (!waiters_.hasSpace()) {
145145
throw UvcoException(
146146
UV_EBUSY,
147-
"only one coroutine can wait for reading/writing a channel");
147+
"too many coroutines waiting for reading/writing a channel");
148148
}
149149
waiters_.put(handle);
150150
return true;

uvco/loop/scheduler.cc

Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// uvco (c) 2023 Lewin Bormann. See LICENSE for specific terms.
22

3+
#include <array>
34
#include <fmt/core.h>
45
#include <uv.h>
56

@@ -14,37 +15,6 @@ namespace uvco {
1415

1516
namespace {
1617

17-
using BloomFilter = std::size_t;
18-
19-
size_t splitmix64(size_t x) {
20-
size_t z = (x + 0x9e3779b97f4a7c15);
21-
z = (z ^ (z >> 30)) * 0xbf58476d1ce4e5b9;
22-
z = (z ^ (z >> 27)) * 0x94d049bb133111eb;
23-
return z ^ (z >> 31);
24-
}
25-
26-
bool haveSeenOrAdd(BloomFilter &filter, std::coroutine_handle<> handle) {
27-
static_assert(sizeof(BloomFilter) == 8, "BloomFilter is not 64 bits");
28-
const size_t hash = splitmix64((size_t)handle.address());
29-
const unsigned index1 = hash % 64;
30-
const unsigned index2 = (hash >> 6U) % 64;
31-
const unsigned index3 = (hash >> 12U) % 64;
32-
const unsigned index4 = (hash >> 18U) % 64;
33-
const unsigned index5 = (hash >> 24U) % 64;
34-
35-
// More than the first 32 bits appear to not gain much.
36-
const size_t bloomIndex = (1U << index1) | (1U << index2) | (1U << index3) |
37-
(1U << index4) | (1U << index5);
38-
39-
if ((filter & bloomIndex) == bloomIndex) {
40-
// Potentially a false positive.
41-
return true;
42-
}
43-
// Definitely not seen before.
44-
filter |= bloomIndex;
45-
return false;
46-
}
47-
4818
unsigned findFirstIndexOf(std::span<const std::coroutine_handle<>> handles,
4919
std::coroutine_handle<> handle) {
5020
return std::ranges::find_if(
@@ -61,39 +31,35 @@ void Scheduler::runAll() {
6131
unsigned turns = 0;
6232

6333
while (!resumableActive_.empty() && turns < maxTurnsBeforeReturning) {
64-
BloomFilter seenHandles = 0;
6534
resumableRunning_.swap(resumableActive_);
6635
for (unsigned i = 0; i < resumableRunning_.size(); ++i) {
6736
auto &coro = resumableRunning_[i];
68-
6937
// Defend against resuming the same coroutine twice in the same loop pass.
7038
// This happens when SelectSet selects two coroutines which return at the
7139
// same time. Resuming the same handle twice is not good, very bad, and
7240
// will usually at least cause a heap use-after-free.
7341

74-
// Explicitly written in an explicit way :)
75-
if (!haveSeenOrAdd(seenHandles, coro)) [[likely]] {
76-
coro.resume();
77-
} else if (findFirstIndexOf(resumableRunning_, coro) == i) {
42+
// Check if this coroutine handle has already been resumed. This has
43+
// quadratic complexity, but appears to be faster than e.g. a Bloom
44+
// filter, because it takes fewer calculations and is a nice linear search
45+
// over a usually short vector.
46+
if (findFirstIndexOf(resumableRunning_, coro) == i) {
7847
// This is only true if the coroutine is a false positive in the bloom
7948
// filter, and has not been run before. The linear search is slow (but
8049
// not too slow), and only happens in the case of a false positive.
8150
coro.resume();
82-
} else {
83-
// This is most likely a SelectSet being awaited, with two coroutines
84-
// being ready at the same time.
8551
}
8652
}
8753
resumableRunning_.clear();
8854
++turns;
8955
}
9056
}
9157

92-
void Scheduler::close() { BOOST_ASSERT(resumableActive_.empty()); }
58+
void Scheduler::close() { BOOST_ASSERT(!resumableActive_.empty()); }
9359

9460
void Scheduler::enqueue(std::coroutine_handle<> handle) {
9561
// Use of moved-out Scheduler?
96-
BOOST_ASSERT(resumableActive_.capacity() != 0);
62+
BOOST_ASSERT(resumableActive_.capacity() > 0);
9763

9864
if (run_mode_ == RunMode::Immediate) {
9965
handle.resume();
@@ -108,9 +74,8 @@ void Scheduler::setUpLoop(uv_loop_t *loop) { uv_loop_set_data(loop, this); }
10874
Scheduler::~Scheduler() = default;
10975

11076
Scheduler::Scheduler(RunMode mode) : run_mode_{mode} {
111-
static constexpr size_t resumableBufferSize = 16;
112-
resumableActive_.reserve(resumableBufferSize);
113-
resumableRunning_.reserve(resumableBufferSize);
77+
resumableActive_.reserve(16);
78+
resumableRunning_.reserve(16);
11479
}
11580

11681
} // namespace uvco

uvco/loop/scheduler.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#pragma once
44

5+
#include <array>
56
#include <boost/assert.hpp>
67
#include <uv.h>
78

@@ -62,9 +63,9 @@ class Scheduler {
6263
explicit Scheduler(RunMode mode = RunMode::Deferred);
6364

6465
Scheduler(const Scheduler &) = delete;
65-
Scheduler(Scheduler &&) = default;
66+
Scheduler(Scheduler &&) = delete;
6667
Scheduler &operator=(const Scheduler &) = delete;
67-
Scheduler &operator=(Scheduler &&) = default;
68+
Scheduler &operator=(Scheduler &&) = delete;
6869
~Scheduler();
6970

7071
/// Set up scheduler with event loop. This is required for all uvco
@@ -88,10 +89,10 @@ class Scheduler {
8889
[[nodiscard]] bool empty() const { return resumableActive_.empty(); }
8990

9091
private:
91-
// Vectors of coroutine handles to be resumed.
92-
std::vector<std::coroutine_handle<>> resumableActive_ = {};
93-
// Vectors of coroutines currently being resumed (while in runAll()).
94-
std::vector<std::coroutine_handle<>> resumableRunning_ = {};
92+
// Vector of coroutine handles to be resumed.
93+
std::vector<std::coroutine_handle<>> resumableActive_;
94+
// Vector of coroutines currently being resumed (while in runAll()).
95+
std::vector<std::coroutine_handle<>> resumableRunning_;
9596

9697
RunMode run_mode_;
9798
};

uvco/promise/multipromise.h

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ template <typename T> class MultiPromiseCore : public PromiseCore<T> {
6262
/// next value.
6363
///
6464
/// Called by a `MultiPromiseAwaiter_` when the `MultiPromise` is
65-
/// `co_await`ed, so that the generator coroutine can continue yielding the
66-
/// next value.
65+
/// `co_await`ed, after a value has been successfully awaited.
6766
void resumeGenerator() {
6867
BOOST_ASSERT(PromiseCore<T>::state_ != PromiseState::finished);
6968
if (generatorHandle_) {
@@ -100,15 +99,17 @@ template <typename T> class MultiPromiseCore : public PromiseCore<T> {
10099
}
101100
}
102101

103-
/// Mark generator as finished, yielding no more values.
104-
///
105-
/// Called when the generator coroutine returns or throws.
102+
/// Mark generator as finished, yielding no more values. Called from within
103+
/// the MultiPromise upon return_value and unhandled_exception. From hereon
104+
/// awaiting the generator will either rethrow the thrown exception, or yield
105+
/// nullopt.
106106
void terminated() { terminated_ = true; }
107107

108108
/// Check if the generator has been cancelled or has returned.
109109
[[nodiscard]] bool isTerminated() const { return terminated_; }
110110

111111
private:
112+
/// Coroutine handle referring to the suspended generator.
112113
std::optional<std::coroutine_handle<>> generatorHandle_;
113114
/// Set to true once the generator coroutine has returned or has been
114115
/// cancelled.
@@ -161,7 +162,8 @@ template <typename T> class MultiPromise {
161162
}
162163
}
163164

164-
/// Obtain the next value yielded by a generator coroutine.
165+
/// Obtain the next value yielded by a generator coroutine. This is less
166+
/// efficient than awaiting the multipromise directly.
165167
Promise<std::optional<T>> next() { co_return (co_await *this); }
166168

167169
/// Return an awaiter for this MultiPromise, which resumes the waiting
@@ -174,7 +176,8 @@ template <typename T> class MultiPromise {
174176
return MultiPromiseAwaiter_{core_};
175177
}
176178

177-
/// Returns true if a value is available.
179+
/// Returns true if a value is available, or the generator has returned or
180+
/// thrown.
178181
bool ready() { return core_->slot.has_value() || core_->isTerminated(); }
179182

180183
/// Immediately cancel the suspended generator coroutine. This will drop all
@@ -230,21 +233,19 @@ template <typename T> class MultiPromise {
230233
if (!core_->slot) {
231234
// Terminated by co_return
232235
return std::nullopt;
233-
} else {
234-
switch (core_->slot->index()) {
235-
case 0: {
236-
std::optional<T> result = std::move(std::get<0>(core_->slot.value()));
237-
core_->slot.reset();
238-
return std::move(result);
239-
}
240-
case 1:
241-
// Terminated by exception
242-
BOOST_ASSERT(core_->isTerminated());
243-
std::rethrow_exception(std::get<1>(core_->slot.value()));
244-
default:
245-
throw UvcoException(
246-
"MultiPromiseAwaiter_::await_resume: invalid slot");
247-
}
236+
}
237+
switch (core_->slot->index()) {
238+
[[likely]] case 0: {
239+
std::optional<T> result = std::move(std::get<0>(core_->slot.value()));
240+
core_->slot.reset();
241+
return std::move(result);
242+
}
243+
case 1:
244+
// Terminated by exception
245+
BOOST_ASSERT(core_->isTerminated());
246+
std::rethrow_exception(std::get<1>(core_->slot.value()));
247+
default:
248+
throw UvcoException("MultiPromiseAwaiter_::await_resume: invalid slot");
248249
}
249250
}
250251

uvco/promise/promise.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,8 @@ template <typename T> class Promise {
151151
default:
152152
throw UvcoException("PromiseAwaiter_::await_resume: invalid slot");
153153
}
154-
} else {
155-
throw UvcoException("unwrap called on unfulfilled promise");
156154
}
155+
throw UvcoException("unwrap called on unfulfilled promise");
157156
}
158157

159158
protected:
@@ -203,10 +202,8 @@ template <typename T> class Promise {
203202
default:
204203
throw UvcoException("PromiseAwaiter_::await_resume: invalid slot");
205204
}
206-
} else {
207-
throw UvcoException(
208-
"await_resume called on unfulfilled promise (bug?)");
209205
}
206+
throw UvcoException("await_resume called on unfulfilled promise (bug?)");
210207
}
211208

212209
PromiseCore_ &core_;

0 commit comments

Comments
 (0)