Skip to content

Commit

Permalink
ThreadPool: optional limit for jobs queue
Browse files Browse the repository at this point in the history
For very busy servers, the internal jobs queue where accepted
sockets are enqueued can grow without limit.
This is a problem for two reasons:
 - queueing too much work causes the server to respond with huge latency,
   resulting in repeated timeouts on the clients; it is definitely
   better to reject the connection early, so that the client
   receives the backpressure signal as soon as the queue is
   becoming too large
 - the jobs list can eventually cause an out of memory condition
  • Loading branch information
vmaffione committed Dec 22, 2023
1 parent 37f8dc4 commit db4bbb6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,17 @@ If you want to set the thread count at runtime, there is no convenient way... Bu
svr.new_task_queue = [] { return new ThreadPool(12); };
```
You can also provide an optional parameter to limit the maximum number
of pending requests, i.e. requests `accept()`ed by the listener but
still waiting to be serviced by worker threads.
```cpp
svr.new_task_queue = [] { return new ThreadPool(/*num_threads=*/12, /*max_queued_requests=*/18); };
```

The default limit is 0 (unlimited). Once the limit is reached, the listener
will shut down the client connection.

### Override the default thread pool with yours

You can supply your own thread pool implementation according to your need.
Expand All @@ -444,8 +455,10 @@ public:
pool_.start_with_thread_count(n);
}

virtual void enqueue(std::function<void()> fn) override {
pool_.enqueue(fn);
virtual bool enqueue(std::function<void()> fn) override {
/* Return true if the task was actually enqueued, or false
* if the caller must drop the corresponding connection. */
return pool_.enqueue(fn);
}

virtual void shutdown() override {
Expand Down
16 changes: 12 additions & 4 deletions httplib.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,15 +582,15 @@ class TaskQueue {
TaskQueue() = default;
virtual ~TaskQueue() = default;

virtual void enqueue(std::function<void()> fn) = 0;
virtual bool enqueue(std::function<void()> fn) = 0;
virtual void shutdown() = 0;

virtual void on_idle() {}
};

class ThreadPool : public TaskQueue {
public:
explicit ThreadPool(size_t n) : shutdown_(false) {
explicit ThreadPool(size_t n, size_t mqr = 0) : shutdown_(false), max_queued_requests_(mqr) {
while (n) {
threads_.emplace_back(worker(*this));
n--;
Expand All @@ -600,13 +600,17 @@ class ThreadPool : public TaskQueue {
ThreadPool(const ThreadPool &) = delete;
~ThreadPool() override = default;

void enqueue(std::function<void()> fn) override {
// Queues `fn` for execution by a worker thread.
// Returns true if the task was accepted, or false when the optional queue
// limit is active (max_queued_requests_ > 0) and the pending-jobs list is
// already full — in that case the caller is expected to drop the
// corresponding connection (backpressure instead of unbounded growth).
bool enqueue(std::function<void()> fn) override {
{
std::unique_lock<std::mutex> lock(mutex_);
// Reject early rather than letting jobs_ grow without limit.
if (max_queued_requests_ > 0 && jobs_.size() >= max_queued_requests_) {
return false;
}
jobs_.push_back(std::move(fn));
}

// Notify after releasing the lock so the woken worker does not
// immediately block trying to acquire mutex_.
cond_.notify_one();
return true;
}

void shutdown() override {
Expand Down Expand Up @@ -656,6 +660,7 @@ class ThreadPool : public TaskQueue {
std::list<std::function<void()>> jobs_;

bool shutdown_;
size_t max_queued_requests_ = 0;

std::condition_variable cond_;
std::mutex mutex_;
Expand Down Expand Up @@ -6242,7 +6247,10 @@ inline bool Server::listen_internal() {
#endif
}

task_queue->enqueue([this, sock]() { process_and_close_socket(sock); });
if (!task_queue->enqueue([this, sock]() { process_and_close_socket(sock); })) {
detail::shutdown_socket(sock);
detail::close_socket(sock);
}
}

task_queue->shutdown();
Expand Down
30 changes: 26 additions & 4 deletions test/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6506,18 +6506,40 @@ TEST(SocketStream, is_writable_INET) {
#endif // #ifndef _WIN32

TEST(TaskQueueTest, IncreaseAtomicInteger) {
static constexpr unsigned int number_of_task{1000000};
static constexpr unsigned int number_of_tasks{1000000};
std::atomic_uint count{0};
std::unique_ptr<TaskQueue> task_queue{
new ThreadPool{CPPHTTPLIB_THREAD_POOL_COUNT}};

for (unsigned int i = 0; i < number_of_task; ++i) {
task_queue->enqueue(
for (unsigned int i = 0; i < number_of_tasks; ++i) {
auto queued = task_queue->enqueue(
[&count] { count.fetch_add(1, std::memory_order_relaxed); });
EXPECT_TRUE(queued);
}

EXPECT_NO_THROW(task_queue->shutdown());
EXPECT_EQ(number_of_task, count.load());
EXPECT_EQ(number_of_tasks, count.load());
}

// Exercises ThreadPool's optional queue limit: with a single worker thread
// and a small max_queued_requests, enqueue() returns false whenever the
// pending-jobs list is full. The test checks that every task that was
// accepted (enqueue() returned true) is executed exactly once.
TEST(TaskQueueTest, IncreaseAtomicIntegerWithQueueLimit) {
static constexpr unsigned int number_of_tasks{1000000};
static constexpr unsigned int qlimit{2};
unsigned int queued_count{0};
std::atomic_uint count{0};
std::unique_ptr<TaskQueue> task_queue{
new ThreadPool{/*num_threads=*/1, qlimit}};

for (unsigned int i = 0; i < number_of_tasks; ++i) {
// enqueue() reports whether the task was accepted or rejected.
if (task_queue->enqueue(
[&count] { count.fetch_add(1, std::memory_order_relaxed); })) {
queued_count++;
}
}

EXPECT_NO_THROW(task_queue->shutdown());
// Every accepted task must have run; the exact number accepted depends on
// scheduling, but it can never be below the queue limit itself.
EXPECT_EQ(queued_count, count.load());
EXPECT_TRUE(queued_count <= number_of_tasks);
EXPECT_TRUE(queued_count >= qlimit);
}

TEST(RedirectTest, RedirectToUrlWithQueryParameters) {
Expand Down

0 comments on commit db4bbb6

Please sign in to comment.