From 8c6d3490e3116c0686dcda0a0391e52f9c815389 Mon Sep 17 00:00:00 2001 From: jr0me Date: Fri, 9 Aug 2024 19:44:21 +0200 Subject: [PATCH] temp notes: Queue header has the same name as others, namely queue.hpp from boost this causes intellisense to fail to recognize symbols, bigger problems may arise. Some necessary changes to cmakes were made to allow compilation, aside from the queue header name, the cmake wasnt generating, some linking must be made public. Currently experimenting passing a lambda to the communicator with the strategy on how to get a message to send. This way the communciator can be kept decoupled from the queue implementation. --- src/agent/CMakeLists.txt | 4 +- .../communicator/include/communicator.hpp | 9 +++-- src/agent/communicator/src/communicator.cpp | 20 +++++++--- src/agent/include/agent.hpp | 4 +- src/agent/queue/CMakeLists.txt | 16 +++----- .../include/{queue.hpp => message_queue.hpp} | 0 src/agent/queue/src/main.cpp | 2 +- .../src/{queue.cpp => message_queue.cpp} | 2 +- src/agent/queue/tests/queue_test.cpp | 2 +- src/agent/src/agent.cpp | 37 +++++++++++++++++-- 10 files changed, 66 insertions(+), 30 deletions(-) rename src/agent/queue/include/{queue.hpp => message_queue.hpp} (100%) rename src/agent/queue/src/{queue.cpp => message_queue.cpp} (99%) diff --git a/src/agent/CMakeLists.txt b/src/agent/CMakeLists.txt index 51cda998b9..a411ed86a7 100644 --- a/src/agent/CMakeLists.txt +++ b/src/agent/CMakeLists.txt @@ -20,16 +20,16 @@ set(SOURCES add_subdirectory(agent_info) add_subdirectory(communicator) add_subdirectory(configuration_parser) +add_subdirectory(queue) find_package(OpenSSL REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) add_library(Agent ${SOURCES}) target_include_directories(Agent PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast) +target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo queue PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast) if(BUILD_TESTS) enable_testing() add_subdirectory(tests) - add_subdirectory(queue) endif() diff --git a/src/agent/communicator/include/communicator.hpp b/src/agent/communicator/include/communicator.hpp index 5d1f028921..a2fa5d9b4f 100644 --- a/src/agent/communicator/include/communicator.hpp +++ b/src/agent/communicator/include/communicator.hpp @@ -20,9 +20,12 @@ namespace communicator const std::function GetStringConfigValue); boost::asio::awaitable WaitForTokenExpirationAndAuthenticate(); - boost::asio::awaitable GetCommandsFromManager(std::queue& messageQueue); - boost::asio::awaitable StatefulMessageProcessingTask(std::queue& messageQueue); - boost::asio::awaitable StatelessMessageProcessingTask(std::queue& messageQueue); + boost::asio::awaitable GetCommandsFromManager(std::function getMessages, + std::function onSuccess); + boost::asio::awaitable StatefulMessageProcessingTask(std::function getMessages, + std::function onSuccess); + boost::asio::awaitable StatelessMessageProcessingTask(std::function getMessages, + std::function onSuccess); private: long GetTokenRemainingSecs() const; diff --git a/src/agent/communicator/src/communicator.cpp b/src/agent/communicator/src/communicator.cpp index 23c61d7c91..d4e5bae4e5 100644 --- a/src/agent/communicator/src/communicator.cpp +++ b/src/agent/communicator/src/communicator.cpp @@ -67,7 +67,8 @@ namespace communicator return std::max(0L, static_cast(m_tokenExpTimeInSeconds - now_seconds)); } - boost::asio::awaitable Communicator::GetCommandsFromManager(std::queue& messageQueue) + boost::asio::awaitable Communicator::GetCommandsFromManager(std::function getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -75,7 +76,8 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::get, m_managerIp, m_port, "/commands"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } boost::asio::awaitable Communicator::WaitForTokenExpirationAndAuthenticate() @@ -105,7 +107,9 @@ namespace communicator } } - boost::asio::awaitable Communicator::StatefulMessageProcessingTask(std::queue& messageQueue) + boost::asio::awaitable + Communicator::StatefulMessageProcessingTask(std::function getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -113,10 +117,13 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateful"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } - boost::asio::awaitable Communicator::StatelessMessageProcessingTask(std::queue& messageQueue) + boost::asio::awaitable + Communicator::StatelessMessageProcessingTask(std::function getMessages, + std::function onSuccess) { auto onAuthenticationFailed = [this]() { @@ -124,7 +131,8 @@ namespace communicator }; const auto reqParams = http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateless"); - co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed); + co_await http_client::Co_MessageProcessingTask( + m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess); } void Communicator::TryReAuthenticate() diff --git a/src/agent/include/agent.hpp b/src/agent/include/agent.hpp index c9f0c0cea3..165f34f7db 100644 --- a/src/agent/include/agent.hpp +++ b/src/agent/include/agent.hpp @@ -3,10 +3,10 @@ #include #include #include +#include #include #include -#include #include class Agent @@ -18,7 +18,7 @@ class Agent void Run(); private: - std::queue m_messageQueue; + MultiTypeQueue m_messageQueue; SignalHandler m_signalHandler; TaskManager m_taskManager; diff --git a/src/agent/queue/CMakeLists.txt b/src/agent/queue/CMakeLists.txt index f8e4283a01..0288de7e87 100644 --- a/src/agent/queue/CMakeLists.txt +++ b/src/agent/queue/CMakeLists.txt @@ -6,21 +6,15 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread") -include_directories(${CMAKE_SOURCE_DIR}/include) - find_package(SQLiteCpp REQUIRED) find_package(nlohmann_json REQUIRED) find_package(fmt REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) -add_library(queue - src/sqlitestorage.cpp - src/queue.cpp -) - -target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) +add_library(queue src/sqlitestorage.cpp src/message_queue.cpp) +target_include_directories(queue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS}) target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio) -if(BUILD_TESTS) - add_subdirectory(tests) -endif() +# if(BUILD_TESTS) +# add_subdirectory(tests) +# endif() diff --git a/src/agent/queue/include/queue.hpp b/src/agent/queue/include/message_queue.hpp similarity index 100% rename from src/agent/queue/include/queue.hpp rename to src/agent/queue/include/message_queue.hpp diff --git a/src/agent/queue/src/main.cpp b/src/agent/queue/src/main.cpp index 398d4e93ec..4fdf62c759 100644 --- a/src/agent/queue/src/main.cpp +++ b/src/agent/queue/src/main.cpp @@ -2,7 +2,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" int main() { diff --git a/src/agent/queue/src/queue.cpp b/src/agent/queue/src/message_queue.cpp similarity index 99% rename from src/agent/queue/src/queue.cpp rename to src/agent/queue/src/message_queue.cpp index 7c6671dc98..42ded13787 100644 --- a/src/agent/queue/src/queue.cpp +++ b/src/agent/queue/src/message_queue.cpp @@ -10,7 +10,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" int MultiTypeQueue::push(Message message, bool shouldWait) { diff --git a/src/agent/queue/tests/queue_test.cpp b/src/agent/queue/tests/queue_test.cpp index b18eafb641..80236a5ab3 100644 --- a/src/agent/queue/tests/queue_test.cpp +++ b/src/agent/queue/tests/queue_test.cpp @@ -12,7 +12,7 @@ #include #include -#include "queue.hpp" +#include "message_queue.hpp" #include "queue_test.hpp" using json = nlohmann::json; diff --git a/src/agent/src/agent.cpp b/src/agent/src/agent.cpp index 1e0ede2169..94e8cb68d6 100644 --- a/src/agent/src/agent.cpp +++ b/src/agent/src/agent.cpp @@ -1,7 +1,35 @@ #include +#include + +#include #include +namespace +{ + template + auto getMessagesFromQueue(MultiTypeQueue& multiTypeQueue) + { + return [&multiTypeQueue]() -> std::string + { + std::cout << "Getting messages from queue\n"; + const auto message = multiTypeQueue.getLastMessage(messageType); + return message.data.dump(); + }; + } + + template + auto popMessagesFromQueue(MultiTypeQueue& multiTypeQueue) + { + return [&multiTypeQueue](const std::string& response) -> void + { + std::cout << "Response: " << response << '\n'; + std::cout << "Popping messages from queue\n"; + multiTypeQueue.popLastMessage(messageType); + }; + } +} // namespace + Agent::Agent() : m_communicator(m_agentInfo.GetUUID(), [this](std::string table, std::string key) -> std::string @@ -18,9 +46,12 @@ Agent::~Agent() void Agent::Run() { m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate()); - m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(m_messageQueue)); - m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(m_messageQueue)); - m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(m_messageQueue)); + m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(getMessagesFromQueue(m_messageQueue), + popMessagesFromQueue(m_messageQueue))); + m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask( + getMessagesFromQueue(m_messageQueue), popMessagesFromQueue(m_messageQueue))); + m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask( + getMessagesFromQueue(m_messageQueue), popMessagesFromQueue(m_messageQueue))); m_signalHandler.WaitForSignal(); }