From 1ce35f5b3d8876bb56b4919e6af56f93324382d0 Mon Sep 17 00:00:00 2001 From: LucioDonda Date: Sun, 11 Aug 2024 19:31:10 -0300 Subject: [PATCH] feat: push and get awaitable approach with base tests --- src/agent/queue/CMakeLists.txt | 3 +- src/agent/queue/include/queue.hpp | 24 ++++++++- src/agent/queue/src/queue.cpp | 79 +++++++++++++++++++++++++-- src/agent/queue/tests/CMakeLists.txt | 3 +- src/agent/queue/tests/queue_test.cpp | 81 ++++++++++++++++++++++++++++ 5 files changed, 183 insertions(+), 7 deletions(-) diff --git a/src/agent/queue/CMakeLists.txt b/src/agent/queue/CMakeLists.txt index 783b9cc5fd..f8e4283a01 100644 --- a/src/agent/queue/CMakeLists.txt +++ b/src/agent/queue/CMakeLists.txt @@ -11,6 +11,7 @@ include_directories(${CMAKE_SOURCE_DIR}/include) find_package(SQLiteCpp REQUIRED) find_package(nlohmann_json REQUIRED) find_package(fmt REQUIRED) +find_package(Boost REQUIRED COMPONENTS asio beast) add_library(queue src/sqlitestorage.cpp @@ -18,7 +19,7 @@ add_library(queue ) target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) -target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt) +target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio) if(BUILD_TESTS) add_subdirectory(tests) diff --git a/src/agent/queue/include/queue.hpp b/src/agent/queue/include/queue.hpp index 482a37691d..8158ad53fd 100644 --- a/src/agent/queue/include/queue.hpp +++ b/src/agent/queue/include/queue.hpp @@ -9,6 +9,10 @@ #include #include +#include +// #include +#include + #include "shared.hpp" #include "sqlitestorage.h" @@ -95,6 +99,14 @@ class MultiTypeQueue */ int push(Message message, bool shouldWait = false); + /** + * @brief pushes a message + * + * @param message to be pushed + * @return boost::asio::awaitable number of messages pushed + */ + boost::asio::awaitable pushAwaitable(Message message); + /** * @brief pushes a vector of messages * @@ -107,11 +119,19 @@ class MultiTypeQueue /** * @brief Get the Last Message object * - * @param type - * @return Message + * @param type of the queue to be used as source + * @return Message type object taken from the queue */ Message getNext(MessageType type, const std::string module = ""); + /** + * @brief Get the Next Awaitable object + * + * @param type of the queue to be used as source + * @param module module name + * @return boost::asio::awaitable + */ + boost::asio::awaitable getNextAwaitable(MessageType type, const std::string module = ""); /** * @brief Returns N messages from a queue * diff --git a/src/agent/queue/src/queue.cpp b/src/agent/queue/src/queue.cpp index ca001131f0..7c6671dc98 100644 --- a/src/agent/queue/src/queue.cpp +++ b/src/agent/queue/src/queue.cpp @@ -43,9 +43,51 @@ int MultiTypeQueue::push(Message message, bool shouldWait) m_cv.notify_all(); } } - else + } + else + { + result = + m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName); + m_cv.notify_all(); + } + } + } + else + { + std::cout << "error didn't find the queue" << std::endl; + } + return result; +} + +boost::asio::awaitable MultiTypeQueue::pushAwaitable(Message message) +{ + int result = 0; + boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor); + + if (m_mapMessageTypeName.contains(message.type)) + { + auto sMessageType = m_mapMessageTypeName.at(message.type); + + while (m_persistenceDest->GetElementCount(sMessageType) >= m_maxItems) + { + timer.expires_after(std::chrono::milliseconds(100)); + co_await timer.async_wait(boost::asio::use_awaitable); + } + + auto storedMessages = m_persistenceDest->GetElementCount(sMessageType); + size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0; + if (spaceAvailable) + { + auto messageData = message.data; + if (messageData.is_array()) + { + if (messageData.size() <= spaceAvailable) { - result = false; + for (const auto& singleMessageData : messageData) + { + result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName); + m_cv.notify_all(); + } } } else @@ -60,7 +102,7 @@ int MultiTypeQueue::push(Message message, bool shouldWait) { std::cout << "error didn't find the queue" << std::endl; } - return result; + co_return result; } int MultiTypeQueue::push(std::vector messages) @@ -96,6 +138,37 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName) return result; } +boost::asio::awaitable MultiTypeQueue::getNextAwaitable(MessageType type, const std::string moduleName) +{ + boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor); + + Message result(type, "{}"_json, moduleName); + if (m_mapMessageTypeName.contains(type)) + { + while (isEmpty(type)) + { + timer.expires_after(std::chrono::milliseconds(100)); + co_await timer.async_wait(boost::asio::use_awaitable); + } + + auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName); + if (!resultData.empty()) + { + result.data = resultData; + if (moduleName.empty()) + { + result.moduleName = result.data.at(0).at("module"); + } + } + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + co_return result; +} + std::vector MultiTypeQueue::getNextN(MessageType type, int messageQuantity, const std::string moduleName) { std::vector result; diff --git a/src/agent/queue/tests/CMakeLists.txt b/src/agent/queue/tests/CMakeLists.txt index 40b9490bdf..aea65975ff 100644 --- a/src/agent/queue/tests/CMakeLists.txt +++ b/src/agent/queue/tests/CMakeLists.txt @@ -20,7 +20,8 @@ target_link_libraries(test_queue queue SQLiteCpp nlohmann_json::nlohmann_json - fmt::fmt) + fmt::fmt + Boost::asio) # Create a test executable for SQLiteStorage tests add_executable(test_sqlitestorage diff --git a/src/agent/queue/tests/queue_test.cpp b/src/agent/queue/tests/queue_test.cpp index 1023c524fa..b18eafb641 100644 --- a/src/agent/queue/tests/queue_test.cpp +++ b/src/agent/queue/tests/queue_test.cpp @@ -9,6 +9,9 @@ #include #include +#include +#include + #include "queue.hpp" #include "queue_test.hpp" @@ -459,3 +462,81 @@ TEST_F(QueueTest, PushSinglesleGetMultipleWithModule) auto messageReceivedContent1 = queue.getNextN(MessageType::STATELESS, 10, "module-1"); EXPECT_EQ(1, messageReceivedContent1.size()); } + +TEST_F(QueueTest, getNextAwaitable) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + boost::asio::io_context io_context; + + const MessageType messageType {MessageType::STATEFUL}; + const json multipleDataContent = {"content-1"}; + const Message messageToSend {messageType, multipleDataContent}; + + // Coroutine that waits till there's a message of the needed type on the queue + boost::asio::co_spawn( + io_context, + [&queue]() -> boost::asio::awaitable + { + auto messageReceived = co_await queue.getNextAwaitable(MessageType::STATELESS); + EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-0"); + }, + boost::asio::detached); + + // Simulate the addition of needed message to the queue after some time + std::thread producer( + [&queue, &io_context]() + { + std::this_thread::sleep_for(std::chrono::seconds(2)); + const MessageType messageType {MessageType::STATELESS}; + const json multipleDataContent = {"content-0"}; + const Message messageToSend {messageType, multipleDataContent}; + EXPECT_EQ(queue.push(messageToSend), 1); + io_context.stop(); + }); + + io_context.run(); + producer.join(); +} + +TEST_F(QueueTest, pushAwaitable) +{ + MultiTypeQueue queue(SMALL_QUEUE_CAPACITY); + boost::asio::io_context io_context; + + // complete the queue with messages + const MessageType messageType {MessageType::STATEFUL}; + for (int i : {1, 2}) + { + const json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"; + EXPECT_EQ(queue.push({messageType, dataContent}), 1); + } + + EXPECT_TRUE(queue.isFull(MessageType::STATEFUL)); + + // Coroutine that waits till there's space on the to push a new messagequeue + boost::asio::co_spawn( + io_context, + [&queue]() -> boost::asio::awaitable + { + const MessageType messageType {MessageType::STATEFUL}; + const json multipleDataContent = {"content-1"}; + const Message messageToSend {messageType, multipleDataContent}; + auto messagesPushed = co_await queue.pushAwaitable(messageToSend); + EXPECT_EQ(messagesPushed, 1); + }, + boost::asio::detached); + + // Simulate poping one message tll there's space to push a new one + std::thread consumer( + [&queue, &io_context]() + { + std::this_thread::sleep_for(std::chrono::seconds(2)); + EXPECT_EQ(queue.pop(MessageType::STATEFUL), 1); + io_context.stop(); + }); + + io_context.run(); + consumer.join(); + + EXPECT_TRUE(queue.isFull(MessageType::STATEFUL)); +}