Skip to content

Commit

Permalink
feat: addition of module field, pending tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Aug 6, 2024
1 parent 6683811 commit b518212
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 60 deletions.
16 changes: 8 additions & 8 deletions src/agent/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
cmake_minimum_required(VERSION 3.22)

project(queue_test LANGUAGES CXX)
project(queue LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)

set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread")

include_directories(${CMAKE_SOURCE_DIR}/include)

add_library(queue_test
src/sqlitestorage.cpp
src/queue.cpp
)

find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)

target_include_directories(queue_test PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue_test PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt)
add_library(queue
src/sqlitestorage.cpp
src/queue.cpp
)

target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt)

if(BUILD_TESTS)
add_subdirectory(tests)
Expand Down
4 changes: 2 additions & 2 deletions src/agent/queue/include/persistence.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Persistence
* @param message
* @param queueName
*/
virtual void Store(const json& message, const std::string& queueName) = 0;
virtual void Store(const json& message, const std::string& tableName, const std::string& moduleName = "") = 0;

/**
* @brief
Expand All @@ -44,7 +44,7 @@ class Persistence
* @param queueName
* @return json
*/
virtual json RetrieveMultiple(int n, const std::string& queueName) = 0;
virtual json RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief
Expand Down
15 changes: 11 additions & 4 deletions src/agent/queue/include/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,15 @@ class MultiTypeQueue
: m_maxItems(size)
, m_timeout(timeout)
{
m_persistenceDest = PersistenceFactory::createPersistence(
"SQLite3", {static_cast<std::string>(DEFAULT_DB_PATH), m_vMessageTypeStrings});
try
{
m_persistenceDest = PersistenceFactory::createPersistence(
"SQLite3", {static_cast<std::string>(DEFAULT_DB_PATH), m_vMessageTypeStrings});
}
catch (const std::exception& e)
{
std::cerr << "Error creating persistence: " << e.what() << '\n';
}
}

// Delete copy constructor
Expand Down Expand Up @@ -99,7 +106,7 @@ class MultiTypeQueue
* @param type
* @return Message
*/
Message getLastMessage(MessageType type);
Message getLastMessage(MessageType type, const std::string module = "");

/**
* @brief Returns N messages from a queue
Expand All @@ -108,7 +115,7 @@ class MultiTypeQueue
* @param messageQuantity quantity of messages to return
* @return Message Json data othe messages fetched
*/
Message getNMessages(MessageType type, int messageQuantity);
Message getNMessages(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief deletes a message from a queue
Expand Down
4 changes: 3 additions & 1 deletion src/agent/queue/include/shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ class Message
public:
MessageType type;
nlohmann::json data;
Message(MessageType t, nlohmann::json d)
std::string moduleName;
Message(MessageType t, nlohmann::json d, std::string mN = "")
: type(t)
, data(d)
, moduleName(mN)
{
}
};
Expand Down
5 changes: 3 additions & 2 deletions src/agent/queue/include/sqlitestorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ class SQLiteStorage : public Persistence
*
* @param message The JSON message to store.
*/
void Store(const json& message, const std::string& tableName) override;
void Store(const json& message, const std::string& tableName, const std::string& moduleName = "") override;

// TODO: pending tests!
/**
* @brief Retrieve a JSON message by its ID.
*
Expand All @@ -65,7 +66,7 @@ class SQLiteStorage : public Persistence
* @param n The number of messages to retrieve.
* @return A vector of retrieved JSON messages.
*/
json RetrieveMultiple(int n, const std::string& tableName) override;
json RetrieveMultiple(int n, const std::string& tableName, const std::string& moduleName = "") override;

/**
* @brief Remove a JSON message by its ID.
Expand Down
138 changes: 137 additions & 1 deletion src/agent/queue/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,143 @@

int main()
{
MultiTypeQueue queue(10);

const std::map<MessageType, std::string> mapMessageTypeName {
{MessageType::STATELESS, "STATELESS"},
{MessageType::STATEFUL, "STATEFUL"},
{MessageType::COMMAND, "COMMAND"}};

//CLEANUP
std::string filePath = DEFAULT_DB_PATH;
for (const auto& entry : std::filesystem::directory_iterator("."))
{
std::string fileFullPath = entry.path();
size_t found = fileFullPath.find(filePath);
if (found != std::string::npos)
{
std::error_code ec;
std::filesystem::remove(fileFullPath, ec);
}
}

int items = 100000;
MultiTypeQueue queue(items);

auto messageConsumer = [&](int& count, MessageType messageType)
{
for (int i = 0; i < count; ++i)
{
queue.popLastMessage(messageType);
}
};

auto messageGetPop = [&](int& count, MessageType messageType)
{
for (int i = 0; i < count; ++i)
{
queue.getLastMessage(messageType);
queue.popLastMessage(messageType);
}
};

auto messageProducer = [&](int& count, MessageType messageType)
{
for (int i = 0; i < count; ++i)
{
const json dataContent1 = R"({{"Data": "for )" + mapMessageTypeName.at(messageType) + std::to_string(i) + R"("}})";
queue.timeoutPush(Message(messageType, dataContent1), true);
}
};

auto start = std::chrono::high_resolution_clock::now();
messageProducer(items, MessageType::COMMAND);
auto endCom = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endCom - start);
std::cout << "Filling Command took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl;

messageProducer(items, MessageType::STATELESS);
auto endSL = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration_cast<std::chrono::microseconds>(endSL - endCom);
std::cout << "Filling SL took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl;

messageProducer(items, MessageType::STATEFUL);
auto endSF = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration_cast<std::chrono::microseconds>(endSF - endSL);
std::cout << "Filling SF took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl;

duration = std::chrono::duration_cast<std::chrono::microseconds>(endSF - start);
std::cout << "Filling everything took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl;

std::cout << "1- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl;
std::cout << "1- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl;
std::cout << "1- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl;

// std::thread consumerThread1(messageConsumer, std::ref(items), MessageType::COMMAND);
// std::thread consumerThread2(messageConsumer, std::ref(items), MessageType::STATELESS);
// std::thread consumerThread3(messageConsumer, std::ref(items), MessageType::STATEFUL);


// auto startConsuming = std::chrono::high_resolution_clock::now();
// if (consumerThread1.joinable())
// {
// consumerThread1.join();
// }

// if (consumerThread2.joinable())
// {
// consumerThread2.join();
// }

// if (consumerThread3.joinable())
// {
// consumerThread3.join();
// }

// auto endConsuming = std::chrono::high_resolution_clock::now();
// duration = std::chrono::duration_cast<std::chrono::microseconds>(endConsuming - startConsuming);
// std::cout << "Consuming everything took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl;

// std::cout << "2- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl;
// std::cout << "2- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl;
// std::cout << "2- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl;

// // FIll again
// messageProducer(items, MessageType::COMMAND);
// messageProducer(items, MessageType::STATELESS);
// messageProducer(items, MessageType::STATEFUL);

// std::cout << "3- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl;
// std::cout << "3- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl;
// std::cout << "3- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl;

// std::thread consumerThread11(messageGetPop, std::ref(items), MessageType::COMMAND);
// std::thread consumerThread12(messageGetPop, std::ref(items), MessageType::STATELESS);
// std::thread consumerThread13(messageGetPop, std::ref(items), MessageType::STATEFUL);


// auto startGettingPopping = std::chrono::high_resolution_clock::now();
// if (consumerThread11.joinable())
// {
// consumerThread11.join();
// }

// if (consumerThread12.joinable())
// {
// consumerThread12.join();
// }

// if (consumerThread13.joinable())
// {
// consumerThread13.join();
// }

// auto endGettingPopping = std::chrono::high_resolution_clock::now();
// duration = std::chrono::duration_cast<std::chrono::microseconds>(endGettingPopping - startGettingPopping);
// std::cout << "Getting Popping everything took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl;

// std::cout << "4- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl;
// std::cout << "4- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl;
// std::cout << "4- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl;

return 0;
}
23 changes: 11 additions & 12 deletions src/agent/queue/src/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait)
m_cv.wait_for(
lock, m_timeout, [&, this] { return m_persistenceDest->GetElementCount(sMessageType) < m_maxItems; });
}
// FIXME
// else
// {
// std::cout << "Can failed because os full queue" << std::endl;
// }

auto storedMessages = m_persistenceDest->GetElementCount(sMessageType);
size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0;
Expand All @@ -46,7 +41,7 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait)
{
for (const auto& singleMessageData : messageData)
{
m_persistenceDest->Store(singleMessageData, sMessageType);
m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
m_cv.notify_all();
}
}
Expand All @@ -57,7 +52,7 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait)
}
else
{
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type));
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
m_cv.notify_all();
}
}
Expand All @@ -77,13 +72,17 @@ void MultiTypeQueue::timeoutPush(std::vector<Message> messages)
}
}

Message MultiTypeQueue::getLastMessage(MessageType type)
Message MultiTypeQueue::getLastMessage(MessageType type, const std::string moduleName)
{
Message result(type, {});
Message result(type, "{}"_json, moduleName);
// std::unique_lock<std::mutex> mapLock(m_mapMutex);
if (m_mapMessageTypeName.contains(type))
{
result.data = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type));
auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName);
if (!resultData.empty())
{
result.data = resultData;
}
}
else
{
Expand All @@ -93,12 +92,12 @@ Message MultiTypeQueue::getLastMessage(MessageType type)
return result;
}

Message MultiTypeQueue::getNMessages(MessageType type, int messageQuantity)
Message MultiTypeQueue::getNMessages(MessageType type, int messageQuantity, const std::string moduleName)
{
Message result(type, {});
if (m_mapMessageTypeName.contains(type))
{
result.data = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type));
result.data = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
}
else
{
Expand Down
Loading

0 comments on commit b518212

Please sign in to comment.