Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThreadPool: optional limit for jobs queue #1741

Merged
merged 1 commit into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,17 @@ If you want to set the thread count at runtime, there is no convenient way... Bu
svr.new_task_queue = [] { return new ThreadPool(12); };
```

You can also provide an optional parameter to limit the maximum number
of pending requests, i.e. requests `accept()`ed by the listener but
still waiting to be serviced by worker threads.

```cpp
svr.new_task_queue = [] { return new ThreadPool(/*num_threads=*/12, /*max_queued_requests=*/18); };
```

Default limit is 0 (unlimited). Once the limit is reached, the listener
will shut down the client connection.

### Override the default thread pool with yours

You can supply your own thread pool implementation according to your need.
Expand All @@ -444,8 +455,10 @@ public:
pool_.start_with_thread_count(n);
}

virtual void enqueue(std::function<void()> fn) override {
pool_.enqueue(fn);
virtual bool enqueue(std::function<void()> fn) override {
/* Return true if the task was actually enqueued, or false
* if the caller must drop the corresponding connection. */
return pool_.enqueue(fn);
}

virtual void shutdown() override {
Expand Down
18 changes: 14 additions & 4 deletions httplib.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,15 +582,16 @@ class TaskQueue {
TaskQueue() = default;
virtual ~TaskQueue() = default;

virtual void enqueue(std::function<void()> fn) = 0;
virtual bool enqueue(std::function<void()> fn) = 0;
virtual void shutdown() = 0;

virtual void on_idle() {}
};

class ThreadPool : public TaskQueue {
public:
explicit ThreadPool(size_t n) : shutdown_(false) {
explicit ThreadPool(size_t n, size_t mqr = 0)
: shutdown_(false), max_queued_requests_(mqr) {
while (n) {
threads_.emplace_back(worker(*this));
n--;
Expand All @@ -600,13 +601,17 @@ class ThreadPool : public TaskQueue {
ThreadPool(const ThreadPool &) = delete;
~ThreadPool() override = default;

void enqueue(std::function<void()> fn) override {
bool enqueue(std::function<void()> fn) override {
{
std::unique_lock<std::mutex> lock(mutex_);
if (max_queued_requests_ > 0 && jobs_.size() >= max_queued_requests_) {
return false;
}
jobs_.push_back(std::move(fn));
}

cond_.notify_one();
return true;
}

void shutdown() override {
Expand Down Expand Up @@ -656,6 +661,7 @@ class ThreadPool : public TaskQueue {
std::list<std::function<void()>> jobs_;

bool shutdown_;
size_t max_queued_requests_ = 0;

std::condition_variable cond_;
std::mutex mutex_;
Expand Down Expand Up @@ -6242,7 +6248,11 @@ inline bool Server::listen_internal() {
#endif
}

task_queue->enqueue([this, sock]() { process_and_close_socket(sock); });
if (!task_queue->enqueue(
[this, sock]() { process_and_close_socket(sock); })) {
detail::shutdown_socket(sock);
detail::close_socket(sock);
}
}

task_queue->shutdown();
Expand Down
93 changes: 89 additions & 4 deletions test/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6506,18 +6506,103 @@ TEST(SocketStream, is_writable_INET) {
#endif // #ifndef _WIN32

// Baseline ThreadPool test: with no queue limit configured, every enqueue
// must succeed, and after shutdown every enqueued task must have run.
TEST(TaskQueueTest, IncreaseAtomicInteger) {
  static constexpr unsigned int number_of_tasks{1000000};
  std::atomic_uint count{0};
  std::unique_ptr<TaskQueue> task_queue{
      new ThreadPool{CPPHTTPLIB_THREAD_POOL_COUNT}};

  for (unsigned int i = 0; i < number_of_tasks; ++i) {
    // Default ThreadPool has an unlimited queue, so enqueue never fails.
    auto queued = task_queue->enqueue(
        [&count] { count.fetch_add(1, std::memory_order_relaxed); });
    EXPECT_TRUE(queued);
  }

  // shutdown() drains the queue, so the counter must equal the task count.
  EXPECT_NO_THROW(task_queue->shutdown());
  EXPECT_EQ(number_of_tasks, count.load());
}

// With a queue limit in place some enqueues may be rejected; verify that
// exactly the accepted tasks run, and that at least qlimit enqueues succeed
// (even in the worst case, the first qlimit slots are always available).
TEST(TaskQueueTest, IncreaseAtomicIntegerWithQueueLimit) {
  static constexpr unsigned int number_of_tasks{1000000};
  static constexpr unsigned int qlimit{2};
  unsigned int accepted{0};
  std::atomic_uint executed{0};
  std::unique_ptr<TaskQueue> task_queue{
      new ThreadPool{/*num_threads=*/1, qlimit}};

  auto job = [&executed] { executed.fetch_add(1, std::memory_order_relaxed); };
  for (unsigned int i = 0; i < number_of_tasks; ++i) {
    // enqueue() returns false whenever the bounded queue is full;
    // count only the submissions that were actually accepted.
    if (task_queue->enqueue(job)) { ++accepted; }
  }

  EXPECT_NO_THROW(task_queue->shutdown());
  // Every accepted task — and nothing else — must have executed.
  EXPECT_EQ(accepted, executed.load());
  EXPECT_TRUE(accepted <= number_of_tasks);
  // The queue can always absorb at least qlimit tasks.
  EXPECT_TRUE(accepted >= qlimit);
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you insert EXPECT_TRUE(queued_count < number_of_task); at line 6540?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the adjustment. But I suggested < instead of <=, so that we can confirm that task_queue->enqueue fails at least once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although extremely unlikely, it is still possible that enqueue never fails.
The worker thread loop may be faster than the enqueue loop, in such a way that the latter will never find the queue full.

You can reproduce this condition by adding something like std::this_thread::sleep_for(std::chrono::microseconds(100)); in the enqueue loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we can add, instead, is EXPECT_TRUE(queued_count >= 2), because even in the worst case (worker thread starts after the enqueue loop) two enqueue operations will always succeed.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I would like to see something simpler which clearly shows that this edge case is properly handled. The original IncreaseAtomicInteger test isn't intended for this purpose, and I don't think it's a good idea to reuse it as base...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clang-format done.

What about the same test idea you proposed, but with proper synchronization?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much nicer!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you take a look at the failure in 'build (macOS-latest)' on GitHub Actions?
https://github.com/yhirose/cpp-httplib/actions/runs/7309110277/job/19916387047?pr=1741

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a logic bug in the test. It is now fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I've run the test 10000 times and it always succeeded.


// Deterministically exercise the queue-limit edge cases: fill the queue with
// blocking tasks, confirm further enqueues fail, then release the tasks and
// confirm the queue accepts work again.
//
// Fix: the trailing `EXPECT_EQ(number_of_task, count.load());` referenced
// identifiers that are not declared in this test (a leftover from the old
// IncreaseAtomicInteger test) and did not compile; it has been removed.
TEST(TaskQueueTest, MaxQueuedRequests) {
  static constexpr unsigned int qlimit{3};
  std::unique_ptr<TaskQueue> task_queue{new ThreadPool{1, qlimit}};
  std::condition_variable sem_cv;
  std::mutex sem_mtx;
  int credits = 0;
  bool queued;

  /* Fill up the queue with tasks that will block until we give them credits to
   * complete. */
  for (unsigned int n = 0; n <= qlimit;) {
    queued = task_queue->enqueue([&sem_mtx, &sem_cv, &credits] {
      std::unique_lock<std::mutex> lock(sem_mtx);
      while (credits <= 0) {
        sem_cv.wait(lock);
      }
      /* Consume the credit and signal the test code if they are all gone. */
      if (--credits == 0) { sem_cv.notify_one(); }
    });

    if (n < qlimit) {
      /* The first qlimit enqueues must succeed. */
      EXPECT_TRUE(queued);
    } else {
      /* The last one will succeed only when the worker thread
       * starts and dequeues the first blocking task. Although
       * not necessary for the correctness of this test, we sleep for
       * a short while to avoid busy waiting. */
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    if (queued) { n++; }
  }

  /* Further enqueues must fail since the queue is full. */
  for (auto i = 0; i < 4; i++) {
    queued = task_queue->enqueue([] {});
    EXPECT_FALSE(queued);
  }

  /* Give the credits to allow the previous tasks to complete. */
  {
    std::unique_lock<std::mutex> lock(sem_mtx);
    credits += qlimit + 1;
  }
  sem_cv.notify_all();

  /* Wait for all the credits to be consumed. */
  {
    std::unique_lock<std::mutex> lock(sem_mtx);
    while (credits > 0) {
      sem_cv.wait(lock);
    }
  }

  /* Check that we are able again to enqueue at least qlimit tasks. */
  for (unsigned int i = 0; i < qlimit; i++) {
    queued = task_queue->enqueue([] {});
    EXPECT_TRUE(queued);
  }

  EXPECT_NO_THROW(task_queue->shutdown());
}

TEST(RedirectTest, RedirectToUrlWithQueryParameters) {
Expand Down