diff --git a/src/agent/queue/CMakeLists.txt b/src/agent/queue/CMakeLists.txt index ebc1e389f5..f8e4283a01 100644 --- a/src/agent/queue/CMakeLists.txt +++ b/src/agent/queue/CMakeLists.txt @@ -21,6 +21,6 @@ add_library(queue target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio) -# if(BUILD_TESTS) -# add_subdirectory(tests) -# endif() +if(BUILD_TESTS) + add_subdirectory(tests) +endif() diff --git a/src/agent/queue/include/queue.hpp b/src/agent/queue/include/queue.hpp index 73d1c03ca7..f400aa8ab6 100644 --- a/src/agent/queue/include/queue.hpp +++ b/src/agent/queue/include/queue.hpp @@ -132,7 +132,8 @@ class MultiTypeQueue * @param messageQuantity quantity of messages to return * @return boost::asio::awaitable */ - boost::asio::awaitable getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = ""); + boost::asio::awaitable + getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = ""); /** * @brief Returns N messages from a queue * diff --git a/src/agent/queue/src/queue.cpp b/src/agent/queue/src/queue.cpp index 77aa60d322..9052c3ccb1 100644 --- a/src/agent/queue/src/queue.cpp +++ b/src/agent/queue/src/queue.cpp @@ -138,7 +138,8 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName) return result; } -boost::asio::awaitable MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName) +boost::asio::awaitable +MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName) { boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor); @@ -151,7 +152,8 @@ boost::asio::awaitable MultiTypeQueue::getNextNAwaitable(MessageType ty co_await timer.async_wait(boost::asio::use_awaitable); } - auto resultData = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName); + auto resultData = + m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName); if (!resultData.empty()) { result.data = resultData; diff --git a/src/agent/queue/src/sqlitestorage.cpp b/src/agent/queue/src/sqlitestorage.cpp index 36e62e24fc..fde8f4a8e5 100644 --- a/src/agent/queue/src/sqlitestorage.cpp +++ b/src/agent/queue/src/sqlitestorage.cpp @@ -200,10 +200,6 @@ json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableName, const } } - if (!messages.empty()) - { - std::reverse(messages.begin(), messages.end()); - } return messages; } catch (const SQLite::Exception& e) diff --git a/src/agent/queue/tests/queue_test.cpp b/src/agent/queue/tests/queue_test.cpp index c1f9599d01..e458050d7a 100644 --- a/src/agent/queue/tests/queue_test.cpp +++ b/src/agent/queue/tests/queue_test.cpp @@ -230,7 +230,7 @@ TEST_F(QueueTest, SinglePushPopFullWithTimeout) EXPECT_NE(items, SMALL_QUEUE_CAPACITY); } -// Accesing different types of queues +// Accesing different types of queues from several threads TEST_F(QueueTest, MultithreadDifferentType) { MultiTypeQueue queue(BIG_QUEUE_CAPACITY); @@ -255,7 +255,7 @@ TEST_F(QueueTest, MultithreadDifferentType) { for (int i = 0; i < count; ++i) { - const json dataContent = R"({{"Data", "for STATELESS)" + std::to_string(i) + R"("}})"; + const json dataContent = R"({{"Data", "Number )" + std::to_string(i) + R"("}})"; EXPECT_EQ(queue.push(Message(MessageType::STATELESS, dataContent)), 1); EXPECT_EQ(queue.push(Message(MessageType::STATEFUL, dataContent)), 1); } @@ -279,10 +279,8 @@ TEST_F(QueueTest, MultithreadDifferentType) consumerThread2.join(); } - EXPECT_NE(0, queue.storedItems(MessageType::STATELESS)); - EXPECT_NE(0, queue.storedItems(MessageType::STATEFUL)); - EXPECT_FALSE(queue.isEmpty(MessageType::STATELESS)); - EXPECT_FALSE(queue.isEmpty(MessageType::STATEFUL)); + EXPECT_EQ(5, queue.storedItems(MessageType::STATELESS)); + EXPECT_EQ(5, queue.storedItems(MessageType::STATEFUL)); // Consume the rest of the messages std::thread consumerThread12(consumerStateLess, std::ref(itemsToConsume)); @@ -298,23 +296,21 @@ TEST_F(QueueTest, MultithreadDifferentType) consumerThread22.join(); } - // FIXME: this doesn't match - EXPECT_EQ(0, queue.storedItems(MessageType::STATELESS)); - EXPECT_EQ(0, queue.storedItems(MessageType::STATEFUL)); EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS)); EXPECT_TRUE(queue.isEmpty(MessageType::STATEFUL)); } -// Accesing same queue +// Accesing same queue from 2 different threads TEST_F(QueueTest, MultithreadSameType) { MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + auto messageType = MessageType::COMMAND; auto consumerCommand1 = [&](int& count) { for (int i = 0; i < count; ++i) { - queue.pop(MessageType::COMMAND); + queue.pop(messageType); } }; @@ -322,7 +318,7 @@ TEST_F(QueueTest, MultithreadSameType) { for (int i = 0; i < count; ++i) { - queue.pop(MessageType::COMMAND); + queue.pop(messageType); } }; @@ -331,7 +327,7 @@ TEST_F(QueueTest, MultithreadSameType) for (int i = 0; i < count; ++i) { const json dataContent = R"({{"Data": "for COMMAND)" + std::to_string(i) + R"("}})"; - EXPECT_EQ(queue.push(Message(MessageType::COMMAND, dataContent)), 1); + EXPECT_EQ(queue.push(Message(messageType, dataContent)), 1); } }; @@ -340,7 +336,7 @@ TEST_F(QueueTest, MultithreadSameType) messageProducer(itemsToInsert); - EXPECT_EQ(itemsToInsert, queue.storedItems(MessageType::COMMAND)); + EXPECT_EQ(itemsToInsert, queue.storedItems(messageType)); std::thread consumerThread1(consumerCommand1, std::ref(itemsToConsume)); std::thread messageProducerThread1(consumerCommand2, std::ref(itemsToConsume)); @@ -355,7 +351,7 @@ TEST_F(QueueTest, MultithreadSameType) consumerThread1.join(); } - EXPECT_TRUE(queue.isEmpty(MessageType::COMMAND)); + EXPECT_TRUE(queue.isEmpty(messageType)); } // Push Multiple with single message and data array, @@ -364,7 +360,6 @@ TEST_F(QueueTest, PushMultipleSeveralSingleGets) { MultiTypeQueue queue(BIG_QUEUE_CAPACITY); const MessageType messageType {MessageType::STATELESS}; - // TODO: double check array of objects const Message messageToSend {messageType, multipleDataContent}; EXPECT_EQ(3, queue.push(messageToSend)); @@ -393,10 +388,32 @@ TEST_F(QueueTest, PushMultipleWithMessageVector) messages.push_back({messageType, multipleDataContent}); } EXPECT_EQ(messages.size(), 3); - EXPECT_EQ(3, queue.push(messages)); + EXPECT_EQ(queue.push(messages), 3); EXPECT_EQ(queue.storedItems(MessageType::STATELESS), 3); } +//push message vector with a mutiple data element +TEST_F(QueueTest, PushVectorWithAMultipleInside) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + std::vector messages; + + // triple data content message + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, multipleDataContent}; + messages.push_back(messageToSend); + + // triple message vector + for (std::string i : {"0", "1", "2"}) + { + const json dataContent = {"content " + i}; + messages.push_back({messageType, dataContent}); + } + + EXPECT_EQ(6, queue.push(messages)); +} + // Push Multiple, pop multiples TEST_F(QueueTest, PushMultipleGetMultiple) { @@ -424,10 +441,10 @@ TEST_F(QueueTest, PushMultipleGetMultipleWithModule) // Altough we're asking for 10 messages only the availables are returned. auto messagesReceived = queue.getNextN(MessageType::STATELESS, 10); - int i = 3; + int i = 0; for (auto singleMessage : messagesReceived) { - EXPECT_EQ("content " + std::to_string(i--), singleMessage.data.at("data")); + EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.at("data")); } EXPECT_EQ(0, queue.storedItems(MessageType::STATELESS, "fakemodule")); @@ -450,10 +467,10 @@ TEST_F(QueueTest, PushSinglesleGetMultipleWithModule) auto messagesReceived = queue.getNextN(MessageType::STATELESS, 10); EXPECT_EQ(5, messagesReceived.size()); - int i = 5; + int i = 0; for (auto singleMessage : messagesReceived) { - auto val = i--; + auto val = ++i; EXPECT_EQ("content-" + std::to_string(val), singleMessage.data.at("data")); EXPECT_EQ("module-" + std::to_string(val), singleMessage.data.at("module")); } @@ -462,7 +479,7 @@ TEST_F(QueueTest, PushSinglesleGetMultipleWithModule) EXPECT_EQ(1, messageReceivedContent1.size()); } -TEST_F(QueueTest, getNextAwaitableBase) +TEST_F(QueueTest, GetNextAwaitableBase) { MultiTypeQueue queue(BIG_QUEUE_CAPACITY); boost::asio::io_context io_context; @@ -473,8 +490,8 @@ TEST_F(QueueTest, getNextAwaitableBase) [&queue]() -> boost::asio::awaitable { auto messageReceived = co_await queue.getNextNAwaitable(MessageType::STATELESS, 2); - EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-2"); - EXPECT_EQ(messageReceived.data.at(1).at("data"), "content-1"); + EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-1"); + EXPECT_EQ(messageReceived.data.at(1).at("data"), "content-2"); }, boost::asio::detached); @@ -484,7 +501,7 @@ TEST_F(QueueTest, getNextAwaitableBase) { std::this_thread::sleep_for(std::chrono::seconds(2)); const MessageType messageType {MessageType::STATELESS}; - const json multipleDataContent = {"content-1","content-2","content-3"}; + const json multipleDataContent = {"content-1", "content-2", "content-3"}; const Message messageToSend {messageType, multipleDataContent}; EXPECT_EQ(queue.push(messageToSend), 3); // io_context.stop(); @@ -494,7 +511,7 @@ TEST_F(QueueTest, getNextAwaitableBase) producer.join(); } -TEST_F(QueueTest, pushAwaitable) +TEST_F(QueueTest, PushAwaitable) { MultiTypeQueue queue(SMALL_QUEUE_CAPACITY); boost::asio::io_context io_context; @@ -508,7 +525,7 @@ TEST_F(QueueTest, pushAwaitable) } EXPECT_TRUE(queue.isFull(MessageType::STATEFUL)); - EXPECT_EQ(queue.storedItems(MessageType::STATEFUL),2); + EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2); // Coroutine that waits till there's space to push a new message boost::asio::co_spawn( @@ -518,10 +535,10 @@ TEST_F(QueueTest, pushAwaitable) const MessageType messageType {MessageType::STATEFUL}; const json dataContent = {"content-1"}; const Message messageToSend {messageType, dataContent}; - EXPECT_EQ(queue.storedItems(MessageType::STATEFUL),2); + EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2); auto messagesPushed = co_await queue.pushAwaitable(messageToSend); EXPECT_EQ(messagesPushed, 1); - EXPECT_EQ(queue.storedItems(MessageType::STATEFUL),2); + EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2); }, boost::asio::detached); @@ -530,7 +547,7 @@ TEST_F(QueueTest, pushAwaitable) [&queue, &io_context]() { std::this_thread::sleep_for(std::chrono::seconds(2)); - EXPECT_EQ(queue.popN(MessageType::STATEFUL,1), 1); + EXPECT_EQ(queue.popN(MessageType::STATEFUL, 1), 1); // TODO: double check this behavior, is it mandatory to stop the context here? // io_context.stop(); }); @@ -540,3 +557,32 @@ TEST_F(QueueTest, pushAwaitable) EXPECT_TRUE(queue.isFull(MessageType::STATEFUL)); } + +TEST_F(QueueTest, FifoOrderCheck) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + // complete the queue with messages + const MessageType messageType {MessageType::STATEFUL}; + for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + { + const json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"; + EXPECT_EQ(queue.push({messageType, dataContent}), 1); + } + + auto messageReceivedVector = queue.getNextN(messageType, 10); + EXPECT_EQ(messageReceivedVector.size(), 10); + int i = 0; + for (auto singleMessage : messageReceivedVector) + { + EXPECT_EQ(singleMessage.data.at("data"), R"({"Data" : "for STATEFUL)" + std::to_string(++i) + R"("})"); + } + + // Kepp the order of the message: FIFO + for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + { + auto messageReceived = queue.getNextN(messageType, 1); + EXPECT_EQ(messageReceived.at(0).data.at("data"), R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"); + EXPECT_TRUE(queue.pop(messageType)); + } +} diff --git a/src/agent/queue/tests/sqlitestorage_test.cpp b/src/agent/queue/tests/sqlitestorage_test.cpp index 5f04df57d4..fdb36e4769 100644 --- a/src/agent/queue/tests/sqlitestorage_test.cpp +++ b/src/agent/queue/tests/sqlitestorage_test.cpp @@ -133,10 +133,10 @@ TEST_F(SQLiteStorageTest, RetrieveMultipleMessagesWithModule) json retrievedMessages = storage->RetrieveMultiple(4, tableName, moduleName); EXPECT_EQ(retrievedMessages.size(), 4); - int i = 4; + int i = 0; for (auto singleMessage : retrievedMessages) { - EXPECT_EQ("value" + std::to_string(i--), singleMessage.at("data").at("key")); + EXPECT_EQ("value" + std::to_string(++i), singleMessage.at("data").at("key")); } }