diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 0792be5cc1..79de418ff2 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -16,60 +16,116 @@ using namespace icinga; -CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc) +/** + * Acquires a slot for CPU-bound work. + * + * If and as long as the lock-free TryAcquireSlot() doesn't succeed, + * subscribes to the slow path by waiting on a condition variable. + * It is woken up by Done() which is called by the destructor. + * + * @param yc Needed to asynchronously wait for the condition variable. + * @param strand Where to post the wake-up of the condition variable. + */ +CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand) : m_Done(false) { - auto& ioEngine (IoEngine::Get()); + VERIFY(strand.running_in_this_thread()); - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); + auto& ie (IoEngine::Get()); + Shared::Ptr cv; + + while (!TryAcquireSlot()) { + if (!cv) { + cv = Shared::Make(ie.GetIoContext()); + } - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; + { + std::unique_lock lock (ie.m_CpuBoundWaitingMutex); + + // The above lines may take a little bit, so let's optimistically re-check. + // Also mitigate lost wake-ups by re-checking during the lock: + // + // During our lock, Done() can't retrieve the subscribers to wake up, + // so any ongoing wake-up is either done at this point or has not started yet. + // If such a wake-up is done, it's a lost wake-up to us unless we re-check here + // whether the slot being freed (just before the wake-up) is still available. + if (TryAcquireSlot()) { + break; + } + + // If the (hypothetical) slot mentioned above was taken by another coroutine, + // there are no free slots again, just as if no wake-ups happened just now. + ie.m_CpuBoundWaiting.emplace_back(strand, cv); } - break; + cv->Wait(yc); } } -CpuBoundWork::~CpuBoundWork() +/** + * Tries to acquire a slot for CPU-bound work. + * + * Specifically, decrements the number of free slots (semaphore) by one, + * but only if it's currently greater than zero. + * Not falling below zero requires an atomic#compare_exchange_weak() loop + * instead of a simple atomic#fetch_sub() call, but it's also atomic. + * + * @return Whether a slot was acquired. + */ +bool CpuBoundWork::TryAcquireSlot() { - if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); + auto& ie (IoEngine::Get()); + auto freeSlots (ie.m_CpuBoundSemaphore.load()); + + while (freeSlots > 0u) { + // If ie.m_CpuBoundSemaphore was changed after the last load, + // compare_exchange_weak() will load its latest value into freeSlots for us to retry until... + if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1u)) { + // ... either we successfully decrement ie.m_CpuBoundSemaphore by one, ... + return true; + } } + + // ... or it becomes zero due to another coroutine. + return false; } +/** + * Releases the own slot acquired by the constructor (TryAcquireSlot()) if not already done. + * + * Precisely, increments the number of free slots (semaphore) by one. + * Also wakes up all waiting constructors (slow path) if necessary. + */ void CpuBoundWork::Done() { if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); - m_Done = true; - } -} - -IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc) - : yc(yc) -{ - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); -} -IoBoundWorkSlot::~IoBoundWorkSlot() -{ - auto& ioEngine (IoEngine::Get()); - - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); - - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; + auto& ie (IoEngine::Get()); + + // The constructor takes the slow path only if the semaphore is full, + // so we only have to wake up constructors if the semaphore was full. + // This works because after fetch_add(), TryAcquireSlot() (fast path) will succeed. + if (ie.m_CpuBoundSemaphore.fetch_add(1) == 0u) { + // So now there are only slow path subscribers from just before the fetch_add() to be woken up. + // Precisely, only subscribers from just before the fetch_add() which turned 0 to 1. + + decltype(ie.m_CpuBoundWaiting) subscribers; + + { + // Locking after fetch_add() is safe because a delayed wake-up is fine. + // Wake-up of constructors which subscribed after the fetch_add() is also not a problem. + // In worst case, they will just re-subscribe to the slow path. + // Lost wake-ups are mitigated by the constructor, see its implementation comments. + std::unique_lock lock (ie.m_CpuBoundWaitingMutex); + std::swap(subscribers, ie.m_CpuBoundWaiting); + } + + // Again, a delayed wake-up is fine, hence unlocked. + for (auto& [strand, cv] : subscribers) { + boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); }); + } } - - break; } } @@ -85,9 +141,8 @@ boost::asio::io_context& IoEngine::GetIoContext() return m_IoContext; } -IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) +IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)) { - m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u); for (auto& thread : m_Threads) { @@ -173,6 +228,30 @@ void AsioDualEvent::WaitForClear(boost::asio::yield_context yc) m_IsFalse.Wait(std::move(yc)); } +AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io) + : m_Timer(io) +{ + m_Timer.expires_at(boost::posix_time::pos_infin); +} + +void AsioConditionVariable::Wait(boost::asio::yield_context yc) +{ + boost::system::error_code ec; + m_Timer.async_wait(yc[ec]); +} + +bool AsioConditionVariable::NotifyOne() +{ + boost::system::error_code ec; + return m_Timer.cancel_one(ec); +} + +size_t AsioConditionVariable::NotifyAll() +{ + boost::system::error_code ec; + return m_Timer.cancel(ec); +} + /** * Cancels any pending timeout callback. * diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 0883d7810e..f2477184d4 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include #include +#include #include #if BOOST_VERSION >= 108700 @@ -37,36 +39,41 @@ namespace icinga class CpuBoundWork { public: - CpuBoundWork(boost::asio::yield_context yc); + CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&); CpuBoundWork(const CpuBoundWork&) = delete; CpuBoundWork(CpuBoundWork&&) = delete; CpuBoundWork& operator=(const CpuBoundWork&) = delete; CpuBoundWork& operator=(CpuBoundWork&&) = delete; - ~CpuBoundWork(); + + inline ~CpuBoundWork() + { + Done(); + } void Done(); private: + static bool TryAcquireSlot(); + bool m_Done; }; /** - * Scope break for CPU-bound work done in an I/O thread + * Condition variable which doesn't block I/O threads * * @ingroup base */ -class IoBoundWorkSlot +class AsioConditionVariable { public: - IoBoundWorkSlot(boost::asio::yield_context yc); - IoBoundWorkSlot(const IoBoundWorkSlot&) = delete; - IoBoundWorkSlot(IoBoundWorkSlot&&) = delete; - IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete; - IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete; - ~IoBoundWorkSlot(); + AsioConditionVariable(boost::asio::io_context& io); + + void Wait(boost::asio::yield_context yc); + bool NotifyOne(); + size_t NotifyAll(); private: - boost::asio::yield_context yc; + boost::asio::deadline_timer m_Timer; }; /** @@ -77,7 +84,6 @@ class IoBoundWorkSlot class IoEngine { friend CpuBoundWork; - friend IoBoundWorkSlot; public: IoEngine(const IoEngine&) = delete; @@ -133,12 +139,6 @@ class IoEngine #endif // BOOST_VERSION >= 108700 } - static inline - void YieldCurrentCoroutine(boost::asio::yield_context yc) - { - Get().m_AlreadyExpiredTimer.async_wait(yc); - } - private: IoEngine(); @@ -149,8 +149,10 @@ class IoEngine boost::asio::io_context m_IoContext; boost::asio::executor_work_guard m_KeepAlive; std::vector m_Threads; - boost::asio::deadline_timer m_AlreadyExpiredTimer; - std::atomic_int_fast32_t m_CpuBoundSemaphore; + + std::atomic_uint_fast32_t m_CpuBoundSemaphore; + std::mutex m_CpuBoundWaitingMutex; + std::vector::Ptr>> m_CpuBoundWaiting; }; class TerminateIoThread : public std::exception diff --git a/lib/remote/eventshandler.cpp b/lib/remote/eventshandler.cpp index 1b7798c04a..8a2d592638 100644 --- a/lib/remote/eventshandler.cpp +++ b/lib/remote/eventshandler.cpp @@ -100,8 +100,6 @@ bool EventsHandler::HandleRequest( EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery); - IoBoundWorkSlot dontLockTheIoThread (yc); - response.result(http::status::ok); response.set(http::field::content_type, "application/json"); response.StartStreaming(true); diff --git a/lib/remote/httpmessage.cpp b/lib/remote/httpmessage.cpp index 18e5a30164..242077c7eb 100644 --- a/lib/remote/httpmessage.cpp +++ b/lib/remote/httpmessage.cpp @@ -156,6 +156,12 @@ void HttpResponse::StartStreaming(bool checkForDisconnect) chunked(true); if (checkForDisconnect) { + auto work (m_CpuBoundWork.lock()); + + if (work) { + work->Done(); + } + ASSERT(m_Server); m_Server->StartDetectClientSideShutdown(); } diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp index 10d00fd498..2376a6453d 100644 --- a/lib/remote/httpmessage.hpp +++ b/lib/remote/httpmessage.hpp @@ -3,6 +3,7 @@ #pragma once #include "base/dictionary.hpp" +#include "base/io-engine.hpp" #include "base/json.hpp" #include "base/tlsstream.hpp" #include "remote/apiuser.hpp" @@ -10,6 +11,8 @@ #include "remote/url.hpp" #include #include +#include +#include namespace icinga { @@ -217,6 +220,11 @@ class HttpResponse : public boost::beast::http::response cbw) + { + m_CpuBoundWork = std::move(cbw); + } + /** * Writes as much of the response as is currently available. * @@ -273,6 +281,7 @@ class HttpResponse : public boost::beast::http::response; Serializer m_Serializer{*this}; bool m_SerializationStarted = false; + std::weak_ptr m_CpuBoundWork; HttpServerConnection::Ptr m_Server; Shared::Ptr m_Stream; diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index d8befd2114..946e4e22c7 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -413,15 +413,17 @@ void ProcessRequest( HttpResponse& response, const WaitGroup::Ptr& waitGroup, std::chrono::steady_clock::duration& cpuBoundWorkTime, - boost::asio::yield_context& yc + boost::asio::yield_context& yc, + boost::asio::io_context::strand& strand ) { try { // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. auto start (std::chrono::steady_clock::now()); - CpuBoundWork handlingRequest (yc); + auto handlingRequest (std::make_shared(yc, strand)); cpuBoundWorkTime = std::chrono::steady_clock::now() - start; + response.SetCpuBoundWork(handlingRequest); HttpHandler::ProcessRequest(waitGroup, request, response, yc); response.body().Finish(); } catch (const std::exception& ex) { @@ -521,7 +523,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc) m_Seen = std::numeric_limits::max(); - ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc); + ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc, m_IoStrand); if (!request.keep_alive() || !m_ConnectionReusable) { break; diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index b859448672..7e897b3e53 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -91,7 +91,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) auto start (ch::steady_clock::now()); try { - CpuBoundWork handleMessage (yc); + CpuBoundWork handleMessage (yc, m_IoStrand); // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. cpuBoundDuration = ch::steady_clock::now() - start;