Skip to content

Commit

Permalink
fix: correct fifo order with additional tests correction
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Aug 12, 2024
1 parent ad08082 commit bd687de
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 42 deletions.
6 changes: 3 additions & 3 deletions src/agent/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ add_library(queue
target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio)

# if(BUILD_TESTS)
# add_subdirectory(tests)
# endif()
if(BUILD_TESTS)
add_subdirectory(tests)
endif()
3 changes: 2 additions & 1 deletion src/agent/queue/include/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class MultiTypeQueue
* @param messageQuantity quantity of messages to return
* @return boost::asio::awaitable<Message>
*/
boost::asio::awaitable<Message> getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "");
boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "");
/**
* @brief Returns N messages from a queue
*
Expand Down
6 changes: 4 additions & 2 deletions src/agent/queue/src/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName)
return result;
}

boost::asio::awaitable<Message> MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName)
boost::asio::awaitable<Message>
MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName)
{
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

Expand All @@ -151,7 +152,8 @@ boost::asio::awaitable<Message> MultiTypeQueue::getNextNAwaitable(MessageType ty
co_await timer.async_wait(boost::asio::use_awaitable);
}

auto resultData = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
auto resultData =
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
if (!resultData.empty())
{
result.data = resultData;
Expand Down
4 changes: 0 additions & 4 deletions src/agent/queue/src/sqlitestorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableName, const
}
}

if (!messages.empty())
{
std::reverse(messages.begin(), messages.end());
}
return messages;
}
catch (const SQLite::Exception& e)
Expand Down
106 changes: 76 additions & 30 deletions src/agent/queue/tests/queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ TEST_F(QueueTest, SinglePushPopFullWithTimeout)
EXPECT_NE(items, SMALL_QUEUE_CAPACITY);
}

// Accesing different types of queues
// Accesing different types of queues from several threads
TEST_F(QueueTest, MultithreadDifferentType)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);
Expand All @@ -255,7 +255,7 @@ TEST_F(QueueTest, MultithreadDifferentType)
{
for (int i = 0; i < count; ++i)
{
const json dataContent = R"({{"Data", "for STATELESS)" + std::to_string(i) + R"("}})";
const json dataContent = R"({{"Data", "Number )" + std::to_string(i) + R"("}})";
EXPECT_EQ(queue.push(Message(MessageType::STATELESS, dataContent)), 1);
EXPECT_EQ(queue.push(Message(MessageType::STATEFUL, dataContent)), 1);
}
Expand All @@ -279,10 +279,8 @@ TEST_F(QueueTest, MultithreadDifferentType)
consumerThread2.join();
}

EXPECT_NE(0, queue.storedItems(MessageType::STATELESS));
EXPECT_NE(0, queue.storedItems(MessageType::STATEFUL));
EXPECT_FALSE(queue.isEmpty(MessageType::STATELESS));
EXPECT_FALSE(queue.isEmpty(MessageType::STATEFUL));
EXPECT_EQ(5, queue.storedItems(MessageType::STATELESS));
EXPECT_EQ(5, queue.storedItems(MessageType::STATEFUL));

// Consume the rest of the messages
std::thread consumerThread12(consumerStateLess, std::ref(itemsToConsume));
Expand All @@ -298,31 +296,29 @@ TEST_F(QueueTest, MultithreadDifferentType)
consumerThread22.join();
}

// FIXME: this doesn't match
EXPECT_EQ(0, queue.storedItems(MessageType::STATELESS));
EXPECT_EQ(0, queue.storedItems(MessageType::STATEFUL));
EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS));
EXPECT_TRUE(queue.isEmpty(MessageType::STATEFUL));
}

// Accesing same queue
// Accesing same queue from 2 different threads
TEST_F(QueueTest, MultithreadSameType)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);
auto messageType = MessageType::COMMAND;

auto consumerCommand1 = [&](int& count)
{
for (int i = 0; i < count; ++i)
{
queue.pop(MessageType::COMMAND);
queue.pop(messageType);
}
};

auto consumerCommand2 = [&](int& count)
{
for (int i = 0; i < count; ++i)
{
queue.pop(MessageType::COMMAND);
queue.pop(messageType);
}
};

Expand All @@ -331,7 +327,7 @@ TEST_F(QueueTest, MultithreadSameType)
for (int i = 0; i < count; ++i)
{
const json dataContent = R"({{"Data": "for COMMAND)" + std::to_string(i) + R"("}})";
EXPECT_EQ(queue.push(Message(MessageType::COMMAND, dataContent)), 1);
EXPECT_EQ(queue.push(Message(messageType, dataContent)), 1);
}
};

Expand All @@ -340,7 +336,7 @@ TEST_F(QueueTest, MultithreadSameType)

messageProducer(itemsToInsert);

EXPECT_EQ(itemsToInsert, queue.storedItems(MessageType::COMMAND));
EXPECT_EQ(itemsToInsert, queue.storedItems(messageType));

std::thread consumerThread1(consumerCommand1, std::ref(itemsToConsume));
std::thread messageProducerThread1(consumerCommand2, std::ref(itemsToConsume));
Expand All @@ -355,7 +351,7 @@ TEST_F(QueueTest, MultithreadSameType)
consumerThread1.join();
}

EXPECT_TRUE(queue.isEmpty(MessageType::COMMAND));
EXPECT_TRUE(queue.isEmpty(messageType));
}

// Push Multiple with single message and data array,
Expand All @@ -364,7 +360,6 @@ TEST_F(QueueTest, PushMultipleSeveralSingleGets)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);
const MessageType messageType {MessageType::STATELESS};
// TODO: double check array of objects
const Message messageToSend {messageType, multipleDataContent};

EXPECT_EQ(3, queue.push(messageToSend));
Expand Down Expand Up @@ -393,10 +388,32 @@ TEST_F(QueueTest, PushMultipleWithMessageVector)
messages.push_back({messageType, multipleDataContent});
}
EXPECT_EQ(messages.size(), 3);
EXPECT_EQ(3, queue.push(messages));
EXPECT_EQ(queue.push(messages), 3);
EXPECT_EQ(queue.storedItems(MessageType::STATELESS), 3);
}

//push message vector with a mutiple data element
TEST_F(QueueTest, PushVectorWithAMultipleInside)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);

std::vector<Message> messages;

// triple data content message
const MessageType messageType {MessageType::STATELESS};
const Message messageToSend {messageType, multipleDataContent};
messages.push_back(messageToSend);

// triple message vector
for (std::string i : {"0", "1", "2"})
{
const json dataContent = {"content " + i};
messages.push_back({messageType, dataContent});
}

EXPECT_EQ(6, queue.push(messages));
}

// Push Multiple, pop multiples
TEST_F(QueueTest, PushMultipleGetMultiple)
{
Expand Down Expand Up @@ -424,10 +441,10 @@ TEST_F(QueueTest, PushMultipleGetMultipleWithModule)

// Altough we're asking for 10 messages only the availables are returned.
auto messagesReceived = queue.getNextN(MessageType::STATELESS, 10);
int i = 3;
int i = 0;
for (auto singleMessage : messagesReceived)
{
EXPECT_EQ("content " + std::to_string(i--), singleMessage.data.at("data"));
EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.at("data"));
}

EXPECT_EQ(0, queue.storedItems(MessageType::STATELESS, "fakemodule"));
Expand All @@ -450,10 +467,10 @@ TEST_F(QueueTest, PushSinglesleGetMultipleWithModule)

auto messagesReceived = queue.getNextN(MessageType::STATELESS, 10);
EXPECT_EQ(5, messagesReceived.size());
int i = 5;
int i = 0;
for (auto singleMessage : messagesReceived)
{
auto val = i--;
auto val = ++i;
EXPECT_EQ("content-" + std::to_string(val), singleMessage.data.at("data"));
EXPECT_EQ("module-" + std::to_string(val), singleMessage.data.at("module"));
}
Expand All @@ -462,7 +479,7 @@ TEST_F(QueueTest, PushSinglesleGetMultipleWithModule)
EXPECT_EQ(1, messageReceivedContent1.size());
}

TEST_F(QueueTest, getNextAwaitableBase)
TEST_F(QueueTest, GetNextAwaitableBase)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);
boost::asio::io_context io_context;
Expand All @@ -473,8 +490,8 @@ TEST_F(QueueTest, getNextAwaitableBase)
[&queue]() -> boost::asio::awaitable<void>
{
auto messageReceived = co_await queue.getNextNAwaitable(MessageType::STATELESS, 2);
EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-2");
EXPECT_EQ(messageReceived.data.at(1).at("data"), "content-1");
EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-1");
EXPECT_EQ(messageReceived.data.at(1).at("data"), "content-2");
},
boost::asio::detached);

Expand All @@ -484,7 +501,7 @@ TEST_F(QueueTest, getNextAwaitableBase)
{
std::this_thread::sleep_for(std::chrono::seconds(2));
const MessageType messageType {MessageType::STATELESS};
const json multipleDataContent = {"content-1","content-2","content-3"};
const json multipleDataContent = {"content-1", "content-2", "content-3"};
const Message messageToSend {messageType, multipleDataContent};
EXPECT_EQ(queue.push(messageToSend), 3);
// io_context.stop();
Expand All @@ -494,7 +511,7 @@ TEST_F(QueueTest, getNextAwaitableBase)
producer.join();
}

TEST_F(QueueTest, pushAwaitable)
TEST_F(QueueTest, PushAwaitable)
{
MultiTypeQueue queue(SMALL_QUEUE_CAPACITY);
boost::asio::io_context io_context;
Expand All @@ -508,7 +525,7 @@ TEST_F(QueueTest, pushAwaitable)
}

EXPECT_TRUE(queue.isFull(MessageType::STATEFUL));
EXPECT_EQ(queue.storedItems(MessageType::STATEFUL),2);
EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2);

// Coroutine that waits till there's space to push a new message
boost::asio::co_spawn(
Expand All @@ -518,10 +535,10 @@ TEST_F(QueueTest, pushAwaitable)
const MessageType messageType {MessageType::STATEFUL};
const json dataContent = {"content-1"};
const Message messageToSend {messageType, dataContent};
EXPECT_EQ(queue.storedItems(MessageType::STATEFUL),2);
EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2);
auto messagesPushed = co_await queue.pushAwaitable(messageToSend);
EXPECT_EQ(messagesPushed, 1);
EXPECT_EQ(queue.storedItems(MessageType::STATEFUL),2);
EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2);
},
boost::asio::detached);

Expand All @@ -530,7 +547,7 @@ TEST_F(QueueTest, pushAwaitable)
[&queue, &io_context]()
{
std::this_thread::sleep_for(std::chrono::seconds(2));
EXPECT_EQ(queue.popN(MessageType::STATEFUL,1), 1);
EXPECT_EQ(queue.popN(MessageType::STATEFUL, 1), 1);
// TODO: double check this behavior, is it mandatory to stop the context here?
// io_context.stop();
});
Expand All @@ -540,3 +557,32 @@ TEST_F(QueueTest, pushAwaitable)

EXPECT_TRUE(queue.isFull(MessageType::STATEFUL));
}

TEST_F(QueueTest, FifoOrderCheck)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);

// complete the queue with messages
const MessageType messageType {MessageType::STATEFUL};
for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
{
const json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})";
EXPECT_EQ(queue.push({messageType, dataContent}), 1);
}

auto messageReceivedVector = queue.getNextN(messageType, 10);
EXPECT_EQ(messageReceivedVector.size(), 10);
int i = 0;
for (auto singleMessage : messageReceivedVector)
{
EXPECT_EQ(singleMessage.data.at("data"), R"({"Data" : "for STATEFUL)" + std::to_string(++i) + R"("})");
}

// Kepp the order of the message: FIFO
for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
{
auto messageReceived = queue.getNextN(messageType, 1);
EXPECT_EQ(messageReceived.at(0).data.at("data"), R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})");
EXPECT_TRUE(queue.pop(messageType));
}
}
4 changes: 2 additions & 2 deletions src/agent/queue/tests/sqlitestorage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ TEST_F(SQLiteStorageTest, RetrieveMultipleMessagesWithModule)
json retrievedMessages = storage->RetrieveMultiple(4, tableName, moduleName);
EXPECT_EQ(retrievedMessages.size(), 4);

int i = 4;
int i = 0;
for (auto singleMessage : retrievedMessages)
{
EXPECT_EQ("value" + std::to_string(i--), singleMessage.at("data").at("key"));
EXPECT_EQ("value" + std::to_string(++i), singleMessage.at("data").at("key"));
}
}

Expand Down

0 comments on commit bd687de

Please sign in to comment.