-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add TaskManager implementation using boost asio
This implementation uses boost asio io_context and awaitables. By enqueing either a callable object or an awaitable, we delegate to the io_context to schedule and manage the tasks within the pool.
- Loading branch information
Showing
6 changed files
with
150 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
#pragma once | ||
|
||
#include <itask_manager.hpp> | ||
|
||
#include <boost/asio/io_context.hpp> | ||
#include <boost/asio/awaitable.hpp> | ||
|
||
#include <functional> | ||
#include <thread> | ||
#include <vector> | ||
|
||
class TaskManager : public ITaskManager<boost::asio::awaitable<void>> | ||
{ | ||
public: | ||
TaskManager(); | ||
|
||
void start(size_t numThreads) override; | ||
void stop() override; | ||
|
||
void enqueueTask(std::function<void()> task) override; | ||
void enqueueTask(boost::asio::awaitable<void> task) override; | ||
|
||
private: | ||
boost::asio::io_context m_ioContext; | ||
boost::asio::io_context::work m_work; | ||
std::vector<std::thread> m_threads; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
#include <task_manager.hpp> | ||
|
||
#include <boost/asio/co_spawn.hpp> | ||
#include <boost/asio/detached.hpp> | ||
#include <boost/asio/post.hpp> | ||
|
||
TaskManager::TaskManager() | ||
: m_work(m_ioContext) | ||
{ | ||
} | ||
|
||
void TaskManager::start(size_t numThreads) | ||
{ | ||
stop(); | ||
|
||
for (size_t i = 0; i < numThreads; ++i) | ||
{ | ||
m_threads.emplace_back([this]() { m_ioContext.run(); }); | ||
} | ||
} | ||
|
||
void TaskManager::stop() | ||
{ | ||
m_ioContext.stop(); | ||
|
||
for (std::thread& thread : m_threads) | ||
{ | ||
if (thread.joinable()) | ||
{ | ||
thread.join(); | ||
} | ||
} | ||
m_threads.clear(); | ||
m_ioContext.reset(); | ||
} | ||
|
||
void TaskManager::enqueueTask(std::function<void()> task) | ||
{ | ||
boost::asio::post(m_ioContext, std::move(task)); | ||
} | ||
|
||
void TaskManager::enqueueTask(boost::asio::awaitable<void> task) | ||
{ | ||
boost::asio::co_spawn(m_ioContext, std::move(task), boost::asio::detached); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
#include <gtest/gtest.h> | ||
#include <task_manager.hpp> | ||
|
||
#include <atomic> | ||
#include <chrono> | ||
|
||
class TaskManagerTest : public ::testing::Test | ||
{ | ||
protected: | ||
TaskManager taskManager; | ||
|
||
void SetUp() override {} | ||
|
||
void TearDown() override {} | ||
}; | ||
|
||
TEST_F(TaskManagerTest, StartAndStop) | ||
{ | ||
taskManager.start(2); | ||
taskManager.stop(); | ||
} | ||
|
||
TEST_F(TaskManagerTest, EnqueueFunctionTask) | ||
{ | ||
taskManager.start(1); | ||
|
||
std::atomic<int> counter = 0; | ||
std::function<void()> task = [&counter]() | ||
{ | ||
++counter; | ||
}; | ||
|
||
taskManager.enqueueTask(task); | ||
|
||
std::this_thread::sleep_for(std::chrono::milliseconds(100)); | ||
|
||
EXPECT_EQ(counter, 1); | ||
|
||
taskManager.stop(); | ||
} | ||
|
||
TEST_F(TaskManagerTest, EnqueueCoroutineTask) | ||
{ | ||
taskManager.start(1); | ||
|
||
std::atomic<int> counter = 0; | ||
auto coroutineTask = [&counter]() -> boost::asio::awaitable<void> | ||
{ | ||
++counter; | ||
co_return; | ||
}; | ||
|
||
taskManager.enqueueTask(coroutineTask()); | ||
|
||
std::this_thread::sleep_for(std::chrono::milliseconds(100)); | ||
|
||
EXPECT_EQ(counter, 1); | ||
|
||
taskManager.stop(); | ||
} | ||
|
||
int main(int argc, char** argv) | ||
{ | ||
::testing::InitGoogleTest(&argc, argv); | ||
return RUN_ALL_TESTS(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
"name": "wazuh-agent-mvp", | ||
"version": "5.0.0", | ||
"dependencies": [ | ||
"boost-asio", | ||
"gtest" | ||
] | ||
} |