From d711eec8502f2d72a14742baad944a200bf3308c Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 5 Sep 2025 16:29:39 +0200 Subject: [PATCH 1/2] Remove CpuBoundWork The idea of CpuBoundWork was to prevent too many coroutines from performing long-running actions at the same time so that some worker threads are always available for other tasks. However, in practice, once all slots provided by CpuBoundWork were used, this would also block the handling of JSON-RPC messages, effectively bringing down the whole cluster communication. A replacement is added in the following commit. --- doc/19-technical-concepts.md | 15 -------- lib/base/io-engine.cpp | 58 ----------------------------- lib/base/io-engine.hpp | 44 ---------------------- lib/remote/eventshandler.cpp | 2 - lib/remote/httpserverconnection.cpp | 19 ++-------- lib/remote/jsonrpcconnection.cpp | 21 ++--------- 6 files changed, 7 insertions(+), 152 deletions(-) diff --git a/doc/19-technical-concepts.md b/doc/19-technical-concepts.md index d268ea7cf34..cc443bab921 100644 --- a/doc/19-technical-concepts.md +++ b/doc/19-technical-concepts.md @@ -1148,21 +1148,6 @@ hidden in Boost ASIO, Beast, Coroutine and Context libraries. #### Data Exchange: Coroutines and I/O Engine -Light-weight and fast operations such as connection handling or TLS handshakes -are performed in the default `IoBoundWorkSlot` pool inside the I/O engine. - -The I/O engine has another pool available: `CpuBoundWork`. - -This is used for processing CPU intensive tasks, such as handling a HTTP request. -Depending on the available CPU cores, this is limited to `std::thread::hardware_concurrency() * 3u / 2u`. - -``` -1 core * 3 / 2 = 1 -2 cores * 3 / 2 = 3 -8 cores * 3 / 2 = 12 -16 cores * 3 / 2 = 24 -``` - The I/O engine itself is used with all network I/O in Icinga, not only the cluster and the REST API. Features such as Graphite, InfluxDB, etc. also consume its functionality. diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 0792be5cc1f..8a3dda0c94b 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -16,63 +16,6 @@ using namespace icinga; -CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc) - : m_Done(false) -{ - auto& ioEngine (IoEngine::Get()); - - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); - - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; - } - - break; - } -} - -CpuBoundWork::~CpuBoundWork() -{ - if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); - } -} - -void CpuBoundWork::Done() -{ - if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); - - m_Done = true; - } -} - -IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc) - : yc(yc) -{ - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); -} - -IoBoundWorkSlot::~IoBoundWorkSlot() -{ - auto& ioEngine (IoEngine::Get()); - - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); - - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; - } - - break; - } -} - LazyInit> IoEngine::m_Instance ([]() { return std::unique_ptr(new IoEngine()); }); IoEngine& IoEngine::Get() @@ -88,7 +31,6 @@ boost::asio::io_context& IoEngine::GetIoContext() IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) { m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); - m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u); for (auto& thread : m_Threads) { thread = std::thread(&IoEngine::RunEventLoop, this); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 0883d7810e9..efc03f8307f 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -29,46 +29,6 @@ namespace icinga { -/** - * Scope lock for CPU-bound work done in an I/O thread - * - * @ingroup base - */ -class CpuBoundWork -{ -public: - CpuBoundWork(boost::asio::yield_context yc); - CpuBoundWork(const CpuBoundWork&) = delete; - CpuBoundWork(CpuBoundWork&&) = delete; - CpuBoundWork& operator=(const CpuBoundWork&) = delete; - CpuBoundWork& operator=(CpuBoundWork&&) = delete; - ~CpuBoundWork(); - - void Done(); - -private: - bool m_Done; -}; - -/** - * Scope break for CPU-bound work done in an I/O thread - * - * @ingroup base - */ -class IoBoundWorkSlot -{ -public: - IoBoundWorkSlot(boost::asio::yield_context yc); - IoBoundWorkSlot(const IoBoundWorkSlot&) = delete; - IoBoundWorkSlot(IoBoundWorkSlot&&) = delete; - IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete; - IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete; - ~IoBoundWorkSlot(); - -private: - boost::asio::yield_context yc; -}; - /** * Async I/O engine * @@ -76,9 +36,6 @@ class IoBoundWorkSlot */ class IoEngine { - friend CpuBoundWork; - friend IoBoundWorkSlot; - public: IoEngine(const IoEngine&) = delete; IoEngine(IoEngine&&) = delete; @@ -150,7 +107,6 @@ class IoEngine boost::asio::executor_work_guard m_KeepAlive; std::vector m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; - std::atomic_int_fast32_t m_CpuBoundSemaphore; }; class TerminateIoThread : public std::exception diff --git a/lib/remote/eventshandler.cpp b/lib/remote/eventshandler.cpp index 1b7798c04ac..8a2d592638f 100644 --- a/lib/remote/eventshandler.cpp +++ b/lib/remote/eventshandler.cpp @@ -100,8 +100,6 @@ bool EventsHandler::HandleRequest( EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery); - IoBoundWorkSlot dontLockTheIoThread (yc); - response.result(http::status::ok); response.set(http::field::content_type, "application/json"); response.StartStreaming(true); diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index 39fa2d79def..f61686b4e04 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -416,16 +416,10 @@ void ProcessRequest( HttpRequest& request, HttpResponse& response, const WaitGroup::Ptr& waitGroup, - std::chrono::steady_clock::duration& cpuBoundWorkTime, boost::asio::yield_context& yc ) { try { - // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. - auto start (std::chrono::steady_clock::now()); - CpuBoundWork handlingRequest (yc); - cpuBoundWorkTime = std::chrono::steady_clock::now() - start; - HttpHandler::ProcessRequest(waitGroup, request, response, yc); response.body().Finish(); } catch (const std::exception& ex) { @@ -497,14 +491,9 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc) << ", user: " << (request.User() ? request.User()->GetName() : "") << ", agent: " << request[http::field::user_agent]; //operator[] - Returns the value for a field, or "" if it does not exist. - ch::steady_clock::duration cpuBoundWorkTime(0); - Defer addRespCode ([&response, start, &logMsg, &cpuBoundWorkTime]() { - logMsg << ", status: " << response.result() << ")"; - if (cpuBoundWorkTime >= ch::seconds(1)) { - logMsg << " waited " << ch::duration_cast(cpuBoundWorkTime).count() << "ms on semaphore and"; - } - - logMsg << " took total " << ch::duration_cast(ch::steady_clock::now() - start).count() << "ms."; + Defer addRespCode ([&response, start, &logMsg]() { + logMsg << ", status: " << response.result() << ")" << " took total " + << ch::duration_cast(ch::steady_clock::now() - start).count() << "ms."; }); if (!HandleAccessControl(request, response, yc)) { @@ -525,7 +514,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc) m_Seen = ch::steady_clock::time_point::max(); - ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc); + ProcessRequest(request, response, m_WaitGroup, yc); if (!request.keep_alive() || !m_ConnectionReusable) { break; diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 0dab1ed5f8c..21b1845c617 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -87,15 +87,9 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) } String rpcMethod("UNKNOWN"); - ch::steady_clock::duration cpuBoundDuration(0); auto start (ch::steady_clock::now()); try { - CpuBoundWork handleMessage (yc); - - // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. - cpuBoundDuration = ch::steady_clock::now() - start; - Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); if (String method = message->Get("method"); !method.IsEmpty()) { rpcMethod = std::move(method); @@ -112,23 +106,14 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection"); msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity - << "' (took total " << toMilliseconds(total) << "ms"; - - if (cpuBoundDuration >= ch::seconds(1)) { - msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; - } - msg << ")."; + << "' (took total " << toMilliseconds(total) << "ms" << ")."; } catch (const std::exception& ex) { auto total = ch::steady_clock::now() - start; Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection"); msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '" - << m_Identity << "' (took total " << toMilliseconds(total) << "ms"; - - if (cpuBoundDuration >= ch::seconds(1)) { - msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; - } - msg << "): " << DiagnosticInformation(ex); + << m_Identity << "' (took total " << toMilliseconds(total) << "ms" << "): " + << DiagnosticInformation(ex); break; } From 644009ff140e033b86ede7398f835b2d4ff3bb34 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 26 Sep 2025 10:35:49 +0200 Subject: [PATCH 2/2] Limit the number of concurrent HTTP requests This serves as a replacement for CpuBoundWork removed in the previous commit. The core idea is still similar, the main differences are: - It is no longer used during JSON-RPC message handling. The corresponding handlers are rather quick, so they don't block the threads for long. Additionally, JSON-RPC message handling is essential for an Icinga 2 cluster to work, so making them wait for something makes little sense. - There's no more operation to wait a slot. Instead, HTTP requests are now rejected with a 503 Service Unavailable error if there's no slot available. This means that if there is too much load on an Icinga 2 instance from HTTP requests, this shows in error messages instead of more and more waiting requests accumulating and increased response times. This commit does not limit the total number of running HTTP requests. In particular, those streaming the response (for example using chunked encoding) are no longer counted once they enter the streaming phase. There is a very specific reason for this: otherwise, a slow or malicious client would block the slot for the whole time it's reading the response, which could take a while. If that happens on multiple connections, the whole pool could be blocked by clients reading responses very slowly. --- lib/base/io-engine.cpp | 33 ++++++++++++++++++++++++++++- lib/base/io-engine.hpp | 6 ++++++ lib/remote/httpmessage.cpp | 20 +++++++++++++++++ lib/remote/httpmessage.hpp | 4 ++++ lib/remote/httpserverconnection.cpp | 28 ++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 1 deletion(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 8a3dda0c94b..c04b9b78760 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -28,7 +28,12 @@ boost::asio::io_context& IoEngine::GetIoContext() return m_IoContext; } -IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) +IoEngine::IoEngine() + : m_IoContext(), + m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), + m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), + m_AlreadyExpiredTimer(m_IoContext), + m_SlowSlotsAvailable(Configuration::Concurrency) { m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); @@ -66,6 +71,32 @@ void IoEngine::RunEventLoop() } } +/** + * Try to acquire a slot for a slow operation. This is intended to limit the number of concurrent slow operations. In + * case no slot is returned, the caller should reject the operation (for example by sending an HTTP error) to prevent an + * overload situation. + * + * @return A RAII-style object representing the slot. operator bool() can be used to check if the operation was + * successful and the caller now owns a slot. Its destructor automatically releases the slot. + */ +IoEngine::SlowSlot IoEngine::TryAcquireSlowSlot() +{ + // This is basically an ad-hoc (partial) semaphore implementation. + // TODO(C++20): Use std::counting_semaphore instead. + + std::unique_lock lock(m_SlowSlotsMutex); + if (m_SlowSlotsAvailable > 0) { + m_SlowSlotsAvailable--; + lock.unlock(); + + return std::make_unique([this] { + std::unique_lock lock(m_SlowSlotsMutex); + m_SlowSlotsAvailable++; + }); + } + return {}; +} + AsioEvent::AsioEvent(boost::asio::io_context& io, bool init) : m_Timer(io) { diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index efc03f8307f..42fa1487116 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -5,6 +5,7 @@ #include "base/atomic.hpp" #include "base/debug.hpp" +#include "base/defer.hpp" #include "base/exception.hpp" #include "base/lazy-init.hpp" #include "base/logger.hpp" @@ -96,6 +97,9 @@ class IoEngine Get().m_AlreadyExpiredTimer.async_wait(yc); } + using SlowSlot = std::unique_ptr; + SlowSlot TryAcquireSlowSlot(); + private: IoEngine(); @@ -107,6 +111,8 @@ class IoEngine boost::asio::executor_work_guard m_KeepAlive; std::vector m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; + std::mutex m_SlowSlotsMutex; + int m_SlowSlotsAvailable; }; class TerminateIoThread : public std::exception diff --git a/lib/remote/httpmessage.cpp b/lib/remote/httpmessage.cpp index 18e5a301641..cf750d493a7 100644 --- a/lib/remote/httpmessage.cpp +++ b/lib/remote/httpmessage.cpp @@ -129,6 +129,10 @@ void HttpResponse::Flush(boost::asio::yield_context yc) prepare_payload(); } + // If this request currently owns a slow request slot, release it as is must not be held while sending to the client + // (otherwise, waiting for a slow client to read its data would block the slot). + m_SlowSlot.reset(); + m_SerializationStarted = true; if (!m_Serializer.is_header_done()) { @@ -194,3 +198,19 @@ JsonEncoder HttpResponse::GetJsonEncoder(bool pretty) { return JsonEncoder{std::make_shared(*this), pretty}; } + +/** + * Tries to acquire a slow slot using ApiListener::TryAcquireSlowSlot(). If this was successful, that slot will be + * owned by this HttpResponse object and is automatically released when the next write operation is performed using + * Flush() or any other method that calls it. + * + * In case no ApiListener is configured (only relevant for tests), no limiting of concurrent requests takes place and + * this method always returns true to allow callers to just check the return value to determine whether to continue. + * + * @return Whether the operation was successful and the handler can continue. + */ +bool HttpResponse::TryAcquireSlowSlot() +{ + m_SlowSlot = IoEngine::Get().TryAcquireSlowSlot(); + return bool(m_SlowSlot); +} diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp index 10d00fd4989..1aeb516a1b5 100644 --- a/lib/remote/httpmessage.hpp +++ b/lib/remote/httpmessage.hpp @@ -5,6 +5,7 @@ #include "base/dictionary.hpp" #include "base/json.hpp" #include "base/tlsstream.hpp" +#include "remote/apilistener.hpp" #include "remote/apiuser.hpp" #include "remote/httpserverconnection.hpp" #include "remote/url.hpp" @@ -269,6 +270,8 @@ class HttpResponse : public boost::beast::http::response; Serializer m_Serializer{*this}; @@ -276,6 +279,7 @@ class HttpResponse : public boost::beast::http::response::Ptr m_Stream; + IoEngine::SlowSlot m_SlowSlot; }; } // namespace icinga diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index f61686b4e04..380f303548e 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -420,6 +420,34 @@ void ProcessRequest( ) { try { + /* Place some restrictions on the total number of HTTP requests handled concurrently to prevent HTTP requests + * from hogging the entire coroutine thread pool by running too many requests handlers at once that don't + * regularly yield, starving other coroutines. + * + * We need to consider two types of handlers here: + * + * 1. Those performing a more or less expensive operation and then returning the whole response at once. + * Not too many of such handlers should run concurrently. + * 2. Those already streaming the response while they are running, for example using chunked transfer encoding. + * For these, we assume that they will frequently yield to other coroutines, in particular when writing parts + * of the response to the client, or in case of EventsHandler also when waiting for new events. + * + * The following approach handles both of this automatically: we acquire one of a limited number of slots for + * each request and release it automatically the first time anything (either the full response after the handler + * finished or the first chunk from within the handler) is written using the response object. This means that + * we don't have to handle acquiring or releasing that slot inside individual handlers. + * + * Overall, this is more or less a safeguard preventing long-running HTTP handlers from taking down the entire + * Icinga 2 process by blocking the execution of JSON-RPC message handlers. In general, (new) HTTP handlers + * shouldn't rely on this behavior but rather ensure that they are quick or at least yield regularly. + */ + if (!response.TryAcquireSlowSlot()) { + HttpUtility::SendJsonError(response, request.Params(), 503, + "Too many requests already in progress, please try again later."); + response.Flush(yc); + return; + } + HttpHandler::ProcessRequest(waitGroup, request, response, yc); response.body().Finish(); } catch (const std::exception& ex) {