Skip to content

Commit

Permalink
feat: push and get awaitable approach with base tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Aug 12, 2024
1 parent 117ce4f commit 1ce35f5
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/agent/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ include_directories(${CMAKE_SOURCE_DIR}/include)
find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)

add_library(queue
src/sqlitestorage.cpp
src/queue.cpp
)

target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt)
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio)

if(BUILD_TESTS)
add_subdirectory(tests)
Expand Down
24 changes: 22 additions & 2 deletions src/agent/queue/include/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include <unordered_map>
#include <vector>

#include <boost/asio.hpp>
// #include <boost/asio/experimental/awaitable_operators.hpp>
#include <cassert>

#include "shared.hpp"
#include "sqlitestorage.h"

Expand Down Expand Up @@ -95,6 +99,14 @@ class MultiTypeQueue
*/
int push(Message message, bool shouldWait = false);

/**
* @brief pushes a message
*
* @param message to be pushed
* @return boost::asio::awaitable<int> number of messages pushed
*/
boost::asio::awaitable<int> pushAwaitable(Message message);

/**
* @brief pushes a vector of messages
*
Expand All @@ -107,11 +119,19 @@ class MultiTypeQueue
/**
* @brief Get the Last Message object
*
* @param type
* @return Message
* @param type of the queue to be used as source
* @return Message type object taken from the queue
*/
Message getNext(MessageType type, const std::string module = "");

/**
* @brief Get the Next Awaitable object
*
* @param type of the queue to be used as source
* @param module module name
* @return boost::asio::awaitable<Message>
*/
boost::asio::awaitable<Message> getNextAwaitable(MessageType type, const std::string module = "");
/**
* @brief Returns N messages from a queue
*
Expand Down
79 changes: 76 additions & 3 deletions src/agent/queue/src/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,51 @@ int MultiTypeQueue::push(Message message, bool shouldWait)
m_cv.notify_all();
}
}
else
}
else
{
result =
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
m_cv.notify_all();
}
}
}
else
{
std::cout << "error didn't find the queue" << std::endl;
}
return result;
}

boost::asio::awaitable<int> MultiTypeQueue::pushAwaitable(Message message)
{
int result = 0;
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

if (m_mapMessageTypeName.contains(message.type))
{
auto sMessageType = m_mapMessageTypeName.at(message.type);

while (m_persistenceDest->GetElementCount(sMessageType) >= m_maxItems)
{
timer.expires_after(std::chrono::milliseconds(100));
co_await timer.async_wait(boost::asio::use_awaitable);
}

auto storedMessages = m_persistenceDest->GetElementCount(sMessageType);
size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0;
if (spaceAvailable)
{
auto messageData = message.data;
if (messageData.is_array())
{
if (messageData.size() <= spaceAvailable)
{
result = false;
for (const auto& singleMessageData : messageData)
{
result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
m_cv.notify_all();
}
}
}
else
Expand All @@ -60,7 +102,7 @@ int MultiTypeQueue::push(Message message, bool shouldWait)
{
std::cout << "error didn't find the queue" << std::endl;
}
return result;
co_return result;
}

int MultiTypeQueue::push(std::vector<Message> messages)
Expand Down Expand Up @@ -96,6 +138,37 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName)
return result;
}

boost::asio::awaitable<Message> MultiTypeQueue::getNextAwaitable(MessageType type, const std::string moduleName)
{
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

Message result(type, "{}"_json, moduleName);
if (m_mapMessageTypeName.contains(type))
{
while (isEmpty(type))
{
timer.expires_after(std::chrono::milliseconds(100));
co_await timer.async_wait(boost::asio::use_awaitable);
}

auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName);
if (!resultData.empty())
{
result.data = resultData;
if (moduleName.empty())
{
result.moduleName = result.data.at(0).at("module");
}
}
}
else
{
// TODO: error handling and logging
std::cout << "error didn't find the queue" << std::endl;
}
co_return result;
}

std::vector<Message> MultiTypeQueue::getNextN(MessageType type, int messageQuantity, const std::string moduleName)
{
std::vector<Message> result;
Expand Down
3 changes: 2 additions & 1 deletion src/agent/queue/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ target_link_libraries(test_queue
queue
SQLiteCpp
nlohmann_json::nlohmann_json
fmt::fmt)
fmt::fmt
Boost::asio)

# Create a test executable for SQLiteStorage tests
add_executable(test_sqlitestorage
Expand Down
81 changes: 81 additions & 0 deletions src/agent/queue/tests/queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <stop_token>
#include <thread>

#include <boost/asio.hpp>
#include <boost/asio/experimental/co_spawn.hpp>

#include "queue.hpp"
#include "queue_test.hpp"

Expand Down Expand Up @@ -459,3 +462,81 @@ TEST_F(QueueTest, PushSinglesleGetMultipleWithModule)
auto messageReceivedContent1 = queue.getNextN(MessageType::STATELESS, 10, "module-1");
EXPECT_EQ(1, messageReceivedContent1.size());
}

TEST_F(QueueTest, getNextAwaitable)
{
MultiTypeQueue queue(BIG_QUEUE_CAPACITY);
boost::asio::io_context io_context;

const MessageType messageType {MessageType::STATEFUL};
const json multipleDataContent = {"content-1"};
const Message messageToSend {messageType, multipleDataContent};

// Coroutine that waits till there's a message of the needed type on the queue
boost::asio::co_spawn(
io_context,
[&queue]() -> boost::asio::awaitable<void>
{
auto messageReceived = co_await queue.getNextAwaitable(MessageType::STATELESS);
EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-0");
},
boost::asio::detached);

// Simulate the addition of needed message to the queue after some time
std::thread producer(
[&queue, &io_context]()
{
std::this_thread::sleep_for(std::chrono::seconds(2));
const MessageType messageType {MessageType::STATELESS};
const json multipleDataContent = {"content-0"};
const Message messageToSend {messageType, multipleDataContent};
EXPECT_EQ(queue.push(messageToSend), 1);
io_context.stop();
});

io_context.run();
producer.join();
}

TEST_F(QueueTest, pushAwaitable)
{
MultiTypeQueue queue(SMALL_QUEUE_CAPACITY);
boost::asio::io_context io_context;

// complete the queue with messages
const MessageType messageType {MessageType::STATEFUL};
for (int i : {1, 2})
{
const json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})";
EXPECT_EQ(queue.push({messageType, dataContent}), 1);
}

EXPECT_TRUE(queue.isFull(MessageType::STATEFUL));

// Coroutine that waits till there's space on the to push a new messagequeue
boost::asio::co_spawn(
io_context,
[&queue]() -> boost::asio::awaitable<void>
{
const MessageType messageType {MessageType::STATEFUL};
const json multipleDataContent = {"content-1"};
const Message messageToSend {messageType, multipleDataContent};
auto messagesPushed = co_await queue.pushAwaitable(messageToSend);
EXPECT_EQ(messagesPushed, 1);
},
boost::asio::detached);

// Simulate poping one message tll there's space to push a new one
std::thread consumer(
[&queue, &io_context]()
{
std::this_thread::sleep_for(std::chrono::seconds(2));
EXPECT_EQ(queue.pop(MessageType::STATEFUL), 1);
io_context.stop();
});

io_context.run();
consumer.join();

EXPECT_TRUE(queue.isFull(MessageType::STATEFUL));
}

0 comments on commit 1ce35f5

Please sign in to comment.