Skip to content

Commit

Permalink
feat: add TaskManager implementation using boost asio
Browse files Browse the repository at this point in the history
This implementation uses boost asio io_context and
awaitables. By enqueing either a callable object or
an awaitable, we delegate to the io_context to
schedule and manage the tasks within the pool.
  • Loading branch information
jr0me committed Jul 17, 2024
1 parent f5cc4ae commit 8df565f
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ include_directories(include)

set(SOURCES
src/agent.cpp
src/task_manager.cpp
)

set(HEADERS
include/agent.hpp
include/itask_manager.hpp
include/task_manager.hpp
)

add_library(agent ${SOURCES} ${HEADERS})

find_package(Boost REQUIRED COMPONENTS asio)

target_link_libraries(agent PRIVATE ${Boost_LIBRARIES})

if(BUILD_TESTS)
add_subdirectory(tests)
endif()
27 changes: 27 additions & 0 deletions src/agent/include/task_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <itask_manager.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/io_context.hpp>

#include <functional>
#include <thread>
#include <vector>

class TaskManager : public ITaskManager<boost::asio::awaitable<void>>
{
public:
TaskManager();

void start(size_t numThreads) override;
void stop() override;

void enqueueTask(std::function<void()> task) override;
void enqueueTask(boost::asio::awaitable<void> task) override;

private:
boost::asio::io_context m_ioContext;
boost::asio::io_context::work m_work;
std::vector<std::thread> m_threads;
};
45 changes: 45 additions & 0 deletions src/agent/src/task_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include <task_manager.hpp>

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/post.hpp>

TaskManager::TaskManager()
: m_work(m_ioContext)
{
}

void TaskManager::start(size_t numThreads)
{
stop();

for (size_t i = 0; i < numThreads; ++i)
{
m_threads.emplace_back([this]() { m_ioContext.run(); });
}
}

void TaskManager::stop()
{
m_ioContext.stop();

for (std::thread& thread : m_threads)
{
if (thread.joinable())
{
thread.join();
}
}
m_threads.clear();
m_ioContext.reset();
}

void TaskManager::enqueueTask(std::function<void()> task)
{
boost::asio::post(m_ioContext, std::move(task));
}

void TaskManager::enqueueTask(boost::asio::awaitable<void> task)
{
boost::asio::co_spawn(m_ioContext, std::move(task), boost::asio::detached);
}
5 changes: 5 additions & 0 deletions src/agent/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ find_package(GTest CONFIG REQUIRED)

set(TEST_SOURCES
agent_test.cpp
task_manager_test.cpp
)

add_executable(agent_test agent_test.cpp)
target_link_libraries(agent_test PRIVATE agent GTest::gtest)
add_test(NAME AgentTest COMMAND agent_test)

add_executable(task_manager_test task_manager_test.cpp)
target_link_libraries(task_manager_test PRIVATE agent GTest::gtest)
add_test(NAME TaskManagerTest COMMAND task_manager_test)
66 changes: 66 additions & 0 deletions src/agent/tests/task_manager_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include <gtest/gtest.h>
#include <task_manager.hpp>

#include <atomic>
#include <chrono>

class TaskManagerTest : public ::testing::Test
{
protected:
TaskManager taskManager;

void SetUp() override {}

void TearDown() override {}
};

TEST_F(TaskManagerTest, StartAndStop)
{
taskManager.start(2);
taskManager.stop();
}

TEST_F(TaskManagerTest, EnqueueFunctionTask)
{
taskManager.start(1);

std::atomic<int> counter = 0;
std::function<void()> task = [&counter]()
{
++counter;
};

taskManager.enqueueTask(task);

std::this_thread::sleep_for(std::chrono::milliseconds(100));

EXPECT_EQ(counter, 1);

taskManager.stop();
}

TEST_F(TaskManagerTest, EnqueueCoroutineTask)
{
taskManager.start(1);

std::atomic<int> counter = 0;
auto coroutineTask = [&counter]() -> boost::asio::awaitable<void>
{
++counter;
co_return;
};

taskManager.enqueueTask(coroutineTask());

std::this_thread::sleep_for(std::chrono::milliseconds(100));

EXPECT_EQ(counter, 1);

taskManager.stop();
}

int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/vcpkg.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "wazuh-agent-mvp",
"version": "5.0.0",
"dependencies": [
"boost-asio",
"gtest"
]
}

0 comments on commit 8df565f

Please sign in to comment.