From 0b062b7bccde9982faa8620e2b096519f2da0e96 Mon Sep 17 00:00:00 2001 From: jr0me Date: Fri, 9 Aug 2024 19:44:21 +0200 Subject: [PATCH] temp notes: Queue header has the same name as others, namely queue.hpp from boost this causes intellisense to fail to recognize symbols, bigger problems may arise. Some necessary changes to cmakes were made to allow compilation, aside from the queue header name, the cmake wasnt generating, some linking must be made public. Currently experimenting passing a lambda to the communicator with the strategy on how to get a message to send. This way the communciator can be kept decoupled from the queue implementation. --- src/agent/CMakeLists.txt | 4 +- .../communicator/include/communicator.hpp | 12 ++++-- .../communicator/include/http_client.hpp | 11 +++--- src/agent/communicator/src/communicator.cpp | 21 ++++++++--- src/agent/communicator/src/http_client.cpp | 13 ++++--- src/agent/include/agent.hpp | 4 +- src/agent/queue/CMakeLists.txt | 16 +++----- .../include/{queue.hpp => message_queue.hpp} | 0 src/agent/queue/src/main.cpp | 2 +- .../src/{queue.cpp => message_queue.cpp} | 2 +- src/agent/queue/tests/queue_test.cpp | 2 +- src/agent/src/agent.cpp | 37 +++++++++++++++++-- 12 files changed, 83 insertions(+), 41 deletions(-) rename src/agent/queue/include/{queue.hpp => message_queue.hpp} (100%) rename src/agent/queue/src/{queue.cpp => message_queue.cpp} (99%) diff --git a/src/agent/CMakeLists.txt b/src/agent/CMakeLists.txt index 51cda998b9a..a411ed86a75 100644 --- a/src/agent/CMakeLists.txt +++ b/src/agent/CMakeLists.txt @@ -20,16 +20,16 @@ set(SOURCES add_subdirectory(agent_info) add_subdirectory(communicator) add_subdirectory(configuration_parser) +add_subdirectory(queue) find_package(OpenSSL REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) add_library(Agent ${SOURCES}) target_include_directories(Agent PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast) +target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo queue PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast) if(BUILD_TESTS) enable_testing() add_subdirectory(tests) - add_subdirectory(queue) endif() diff --git a/src/agent/communicator/include/communicator.hpp b/src/agent/communicator/include/communicator.hpp index 5d1f0289215..d46685eb072 100644 --- a/src/agent/communicator/include/communicator.hpp +++ b/src/agent/communicator/include/communicator.hpp @@ -20,9 +20,15 @@ namespace communicator const std::function GetStringConfigValue); boost::asio::awaitable WaitForTokenExpirationAndAuthenticate(); - boost::asio::awaitable GetCommandsFromManager(std::queue& messageQueue); - boost::asio::awaitable StatefulMessageProcessingTask(std::queue& messageQueue); - boost::asio::awaitable StatelessMessageProcessingTask(std::queue& messageQueue); + boost::asio::awaitable + GetCommandsFromManager(std::function()> getMessages, + std::function onSuccess); + boost::asio::awaitable + StatefulMessageProcessingTask(std::function()> getMessages, + std::function onSuccess); + boost::asio::awaitable + StatelessMessageProcessingTask(std::function()> getMessages, + std::function onSuccess); private: long GetTokenRemainingSecs() const; diff --git a/src/agent/communicator/include/http_client.hpp b/src/agent/communicator/include/http_client.hpp index 0cc45e1b6b6..3d3c5618007 100644 --- a/src/agent/communicator/include/http_client.hpp +++ b/src/agent/communicator/include/http_client.hpp @@ -46,11 +46,12 @@ namespace http_client std::function onUnauthorized, std::function onSuccess = {}); - boost::asio::awaitable Co_MessageProcessingTask(const std::string& token, - HttpRequestParams params, - std::function messageGetter, - std::function onUnauthorized, - std::function onSuccess = {}); + boost::asio::awaitable + Co_MessageProcessingTask(const std::string& token, + HttpRequestParams params, + std::function()> messageGetter, + std::function onUnauthorized, + std::function onSuccess = {}); boost::beast::http::response PerformHttpRequest(const HttpRequestParams& params); diff --git a/src/agent/communicator/src/communicator.cpp b/src/agent/communicator/src/communicator.cpp index 23c61d7c910..e3e1146a91e 100644 --- a/src/agent/communicator/src/communicator.cpp +++ b/src/agent/communicator/src/communicator.cpp @@ -67,7 +67,9 @@ namespace communicator return std::max(0L, static_cast(m_tokenExpTimeInSeconds - now_seconds)); } - boost::asio::awaitable Communicator::GetCommandsFromManager(std::queue& messageQueue) + boost::asio::awaitable + Communicator::GetCommandsFromManager(std::function()> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -75,7 +77,8 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, m_managerIp, m_port, "/commands"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } boost::asio::awaitable Communicator::WaitForTokenExpirationAndAuthenticate() @@ -105,7 +108,9 @@ namespace communicator } } - boost::asio::awaitable Communicator::StatefulMessageProcessingTask(std::queue& messageQueue) + boost::asio::awaitable + Communicator::StatefulMessageProcessingTask(std::function()> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -113,10 +118,13 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateful"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } - boost::asio::awaitable Communicator::StatelessMessageProcessingTask(std::queue& messageQueue) + boost::asio::awaitable + Communicator::StatelessMessageProcessingTask(std::function()> getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -124,7 +132,8 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateless"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } void Communicator::TryReAuthenticate() diff --git a/src/agent/communicator/src/http_client.cpp b/src/agent/communicator/src/http_client.cpp index 9ffba759dbb..3fe8a1d5094 100644 --- a/src/agent/communicator/src/http_client.cpp +++ b/src/agent/communicator/src/http_client.cpp @@ -96,11 +96,12 @@ namespace http_client std::cout << "Response body: " << boost::beast::buffers_to_string(res.body().data()) << std::endl; } - boost::asio::awaitable Co_MessageProcessingTask(const std::string& token, - HttpRequestParams reqParams, - std::function messageGetter, - std::function onUnauthorized, - std::function onSuccess) + boost::asio::awaitable + Co_MessageProcessingTask(const std::string& token, + HttpRequestParams reqParams, + std::function()> messageGetter, + std::function onUnauthorized, + std::function onSuccess) { using namespace std::chrono_literals; @@ -129,7 +130,7 @@ namespace http_client continue; } - reqParams.body = messageGetter ? messageGetter() : ""; + reqParams.body = messageGetter ? co_await messageGetter() : ""; reqParams.token = token; auto req = CreateHttpRequest(reqParams); diff --git a/src/agent/include/agent.hpp b/src/agent/include/agent.hpp index c9f0c0cea3f..165f34f7db5 100644 --- a/src/agent/include/agent.hpp +++ b/src/agent/include/agent.hpp @@ -3,10 +3,10 @@ #include #include #include +#include #include #include -#include #include class Agent @@ -18,7 +18,7 @@ class Agent void Run(); private: - std::queue m_messageQueue; + MultiTypeQueue m_messageQueue; SignalHandler m_signalHandler; TaskManager m_taskManager; diff --git a/src/agent/queue/CMakeLists.txt b/src/agent/queue/CMakeLists.txt index f8e4283a011..0288de7e87d 100644 --- a/src/agent/queue/CMakeLists.txt +++ b/src/agent/queue/CMakeLists.txt @@ -6,21 +6,15 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread") -include_directories(${CMAKE_SOURCE_DIR}/include) - find_package(SQLiteCpp REQUIRED) find_package(nlohmann_json REQUIRED) find_package(fmt REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) -add_library(queue - src/sqlitestorage.cpp - src/queue.cpp -) - -target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) +add_library(queue src/sqlitestorage.cpp src/message_queue.cpp) +target_include_directories(queue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS}) target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio) -if(BUILD_TESTS) - add_subdirectory(tests) -endif() +# if(BUILD_TESTS) +# add_subdirectory(tests) +# endif() diff --git a/src/agent/queue/include/queue.hpp b/src/agent/queue/include/message_queue.hpp similarity index 100% rename from src/agent/queue/include/queue.hpp rename to src/agent/queue/include/message_queue.hpp diff --git a/src/agent/queue/src/main.cpp b/src/agent/queue/src/main.cpp index 398d4e93ecb..4fdf62c7590 100644 --- a/src/agent/queue/src/main.cpp +++ b/src/agent/queue/src/main.cpp @@ -2,7 +2,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" int main() { diff --git a/src/agent/queue/src/queue.cpp b/src/agent/queue/src/message_queue.cpp similarity index 99% rename from src/agent/queue/src/queue.cpp rename to src/agent/queue/src/message_queue.cpp index 9052c3ccb12..93a430e4c09 100644 --- a/src/agent/queue/src/queue.cpp +++ b/src/agent/queue/src/message_queue.cpp @@ -10,7 +10,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" int MultiTypeQueue::push(Message message, bool shouldWait) { diff --git a/src/agent/queue/tests/queue_test.cpp b/src/agent/queue/tests/queue_test.cpp index c706c16b905..c2719068d59 100644 --- a/src/agent/queue/tests/queue_test.cpp +++ b/src/agent/queue/tests/queue_test.cpp @@ -12,7 +12,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" #include "queue_test.hpp" using json = nlohmann::json; diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index 1e0ede21695..1e7efd5ccea 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -1,7 +1,35 @@ #include +#include + +#include #include +namespace +{ + template + auto getMessagesFromQueue(MultiTypeQueue& multiTypeQueue) + { + return [&multiTypeQueue]() -> boost::asio::awaitable + { + std::cout << "Getting messages from queue\n"; + const auto message = co_await multiTypeQueue.getNextAwaitable(T); + co_return message.data.dump(); + }; + } + + template + auto popMessagesFromQueue(MultiTypeQueue& multiTypeQueue) + { + return [&multiTypeQueue](const std::string& response) -> void + { + std::cout << "Response: " << response << '\n'; + std::cout << "Popping messages from queue\n"; + multiTypeQueue.pop(T); + }; + } +} // namespace + Agent::Agent() : m_communicator(m_agentInfo.GetUUID(), [this](std::string table, std::string key) -> std::string @@ -18,9 +46,12 @@ Agent::~Agent() void Agent::Run() { m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate()); - m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(m_messageQueue)); - m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(m_messageQueue)); - m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(m_messageQueue)); + m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(getMessagesFromQueue(m_messageQueue), + popMessagesFromQueue(m_messageQueue))); + m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask( + getMessagesFromQueue(m_messageQueue), popMessagesFromQueue(m_messageQueue))); + m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask( + getMessagesFromQueue(m_messageQueue), popMessagesFromQueue(m_messageQueue))); m_signalHandler.WaitForSignal(); }