diff --git a/src/agent/CMakeLists.txt b/src/agent/CMakeLists.txt index 51cda998b9a..a411ed86a75 100644 --- a/src/agent/CMakeLists.txt +++ b/src/agent/CMakeLists.txt @@ -20,16 +20,16 @@ set(SOURCES add_subdirectory(agent_info) add_subdirectory(communicator) add_subdirectory(configuration_parser) +add_subdirectory(queue) find_package(OpenSSL REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) add_library(Agent ${SOURCES}) target_include_directories(Agent PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast) +target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo queue PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast) if(BUILD_TESTS) enable_testing() add_subdirectory(tests) - add_subdirectory(queue) endif() diff --git a/src/agent/communicator/include/communicator.hpp b/src/agent/communicator/include/communicator.hpp index 5d1f0289215..d46685eb072 100644 --- a/src/agent/communicator/include/communicator.hpp +++ b/src/agent/communicator/include/communicator.hpp @@ -20,9 +20,15 @@ namespace communicator const std::function GetStringConfigValue); boost::asio::awaitable WaitForTokenExpirationAndAuthenticate(); - boost::asio::awaitable GetCommandsFromManager(std::queue& messageQueue); - boost::asio::awaitable StatefulMessageProcessingTask(std::queue& messageQueue); - boost::asio::awaitable StatelessMessageProcessingTask(std::queue& messageQueue); + boost::asio::awaitable + GetCommandsFromManager(std::function()> getMessages, + std::function onSuccess); + boost::asio::awaitable + StatefulMessageProcessingTask(std::function()> getMessages, + std::function onSuccess); + boost::asio::awaitable + StatelessMessageProcessingTask(std::function()> getMessages, + std::function onSuccess); private: long GetTokenRemainingSecs() const; diff --git a/src/agent/communicator/include/http_client.hpp b/src/agent/communicator/include/http_client.hpp index 0cc45e1b6b6..3d3c5618007 100644 --- a/src/agent/communicator/include/http_client.hpp +++ b/src/agent/communicator/include/http_client.hpp @@ -46,11 +46,12 @@ namespace http_client std::function onUnauthorized, std::function onSuccess = {}); - boost::asio::awaitable Co_MessageProcessingTask(const std::string& token, - HttpRequestParams params, - std::function messageGetter, - std::function onUnauthorized, - std::function onSuccess = {}); + boost::asio::awaitable + Co_MessageProcessingTask(const std::string& token, + HttpRequestParams params, + std::function()> messageGetter, + std::function onUnauthorized, + std::function onSuccess = {}); boost::beast::http::response PerformHttpRequest(const HttpRequestParams& params); diff --git a/src/agent/communicator/src/communicator.cpp b/src/agent/communicator/src/communicator.cpp index 23c61d7c910..e3e1146a91e 100644 --- a/src/agent/communicator/src/communicator.cpp +++ b/src/agent/communicator/src/communicator.cpp @@ -67,7 +67,9 @@ namespace communicator return std::max(0L, static_cast(m_tokenExpTimeInSeconds - now_seconds)); } - boost::asio::awaitable Communicator::GetCommandsFromManager(std::queue& messageQueue) + boost::asio::awaitable + Communicator::GetCommandsFromManager(std::function()> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -75,7 +77,8 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, m_managerIp, m_port, "/commands"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } boost::asio::awaitable Communicator::WaitForTokenExpirationAndAuthenticate() @@ -105,7 +108,9 @@ namespace communicator } } - boost::asio::awaitable Communicator::StatefulMessageProcessingTask(std::queue& messageQueue) + boost::asio::awaitable + Communicator::StatefulMessageProcessingTask(std::function()> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -113,10 +118,13 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateful"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } - boost::asio::awaitable Communicator::StatelessMessageProcessingTask(std::queue& messageQueue) + boost::asio::awaitable + Communicator::StatelessMessageProcessingTask(std::function()> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -124,7 +132,8 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateless"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } void Communicator::TryReAuthenticate() diff --git a/src/agent/communicator/src/http_client.cpp b/src/agent/communicator/src/http_client.cpp index 9ffba759dbb..3fe8a1d5094 100644 --- a/src/agent/communicator/src/http_client.cpp +++ b/src/agent/communicator/src/http_client.cpp @@ -96,11 +96,12 @@ namespace http_client std::cout << "Response body: " << boost::beast::buffers_to_string(res.body().data()) << std::endl; } - boost::asio::awaitable Co_MessageProcessingTask(const std::string& token, - HttpRequestParams reqParams, - std::function messageGetter, - std::function onUnauthorized, - std::function onSuccess) + boost::asio::awaitable + Co_MessageProcessingTask(const std::string& token, + HttpRequestParams reqParams, + std::function()> messageGetter, + std::function onUnauthorized, + std::function onSuccess) { using namespace std::chrono_literals; @@ -129,7 +130,7 @@ namespace http_client continue; } - reqParams.body = messageGetter ? messageGetter() : ""; + reqParams.body = messageGetter ? co_await messageGetter() : ""; reqParams.token = token; auto req = CreateHttpRequest(reqParams); diff --git a/src/agent/include/agent.hpp b/src/agent/include/agent.hpp index c9f0c0cea3f..165f34f7db5 100644 --- a/src/agent/include/agent.hpp +++ b/src/agent/include/agent.hpp @@ -3,10 +3,10 @@ #include #include #include +#include #include #include -#include #include class Agent @@ -18,7 +18,7 @@ class Agent void Run(); private: - std::queue m_messageQueue; + MultiTypeQueue m_messageQueue; SignalHandler m_signalHandler; TaskManager m_taskManager; diff --git a/src/agent/queue/CMakeLists.txt b/src/agent/queue/CMakeLists.txt index f8e4283a011..0288de7e87d 100644 --- a/src/agent/queue/CMakeLists.txt +++ b/src/agent/queue/CMakeLists.txt @@ -6,21 +6,15 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread") -include_directories(${CMAKE_SOURCE_DIR}/include) - find_package(SQLiteCpp REQUIRED) find_package(nlohmann_json REQUIRED) find_package(fmt REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) -add_library(queue - src/sqlitestorage.cpp - src/queue.cpp -) - -target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) +add_library(queue src/sqlitestorage.cpp src/message_queue.cpp) +target_include_directories(queue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS}) target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio) -if(BUILD_TESTS) - add_subdirectory(tests) -endif() +# if(BUILD_TESTS) +# add_subdirectory(tests) +# endif() diff --git a/src/agent/queue/include/queue.hpp b/src/agent/queue/include/message_queue.hpp similarity index 100% rename from src/agent/queue/include/queue.hpp rename to src/agent/queue/include/message_queue.hpp diff --git a/src/agent/queue/src/main.cpp b/src/agent/queue/src/main.cpp index 398d4e93ecb..4fdf62c7590 100644 --- a/src/agent/queue/src/main.cpp +++ b/src/agent/queue/src/main.cpp @@ -2,7 +2,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" int main() { diff --git a/src/agent/queue/src/queue.cpp b/src/agent/queue/src/message_queue.cpp similarity index 99% rename from src/agent/queue/src/queue.cpp rename to src/agent/queue/src/message_queue.cpp index 9052c3ccb12..93a430e4c09 100644 --- a/src/agent/queue/src/queue.cpp +++ b/src/agent/queue/src/message_queue.cpp @@ -10,7 +10,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" int MultiTypeQueue::push(Message message, bool shouldWait) { diff --git a/src/agent/queue/tests/queue_test.cpp b/src/agent/queue/tests/queue_test.cpp index c706c16b905..c2719068d59 100644 --- a/src/agent/queue/tests/queue_test.cpp +++ b/src/agent/queue/tests/queue_test.cpp @@ -12,7 +12,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" #include "queue_test.hpp" using json = nlohmann::json; diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index 1e0ede21695..1e7efd5ccea 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -1,7 +1,35 @@ #include +#include + +#include #include +namespace +{ + template + auto getMessagesFromQueue(MultiTypeQueue& multiTypeQueue) + { + return [&multiTypeQueue]() -> boost::asio::awaitable + { + std::cout << "Getting messages from queue\n"; + const auto message = co_await multiTypeQueue.getNextAwaitable(T); + co_return message.data.dump(); + }; + } + + template + auto popMessagesFromQueue(MultiTypeQueue& multiTypeQueue) + { + return [&multiTypeQueue](const std::string& response) -> void + { + std::cout << "Response: " << response << '\n'; + std::cout << "Popping messages from queue\n"; + multiTypeQueue.pop(T); + }; + } +} // namespace + Agent::Agent() : m_communicator(m_agentInfo.GetUUID(), [this](std::string table, std::string key) -> std::string @@ -18,9 +46,12 @@ Agent::~Agent() void Agent::Run() { m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate()); - m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(m_messageQueue)); - m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(m_messageQueue)); - m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(m_messageQueue)); + m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(getMessagesFromQueue(m_messageQueue), + popMessagesFromQueue(m_messageQueue))); + m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask( + getMessagesFromQueue(m_messageQueue), popMessagesFromQueue(m_messageQueue))); + m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask( + getMessagesFromQueue(m_messageQueue), popMessagesFromQueue(m_messageQueue))); m_signalHandler.WaitForSignal(); }