Skip to content

Commit

Permalink
feat: addition of module field, pending tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Aug 7, 2024
1 parent 6683811 commit 93ec454
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 89 deletions.
16 changes: 8 additions & 8 deletions src/agent/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
cmake_minimum_required(VERSION 3.22)

project(queue_test LANGUAGES CXX)
project(queue LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)

set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread")

include_directories(${CMAKE_SOURCE_DIR}/include)

add_library(queue_test
src/sqlitestorage.cpp
src/queue.cpp
)

find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)

target_include_directories(queue_test PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue_test PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt)
add_library(queue
src/sqlitestorage.cpp
src/queue.cpp
)

target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt)

if(BUILD_TESTS)
add_subdirectory(tests)
Expand Down
58 changes: 29 additions & 29 deletions src/agent/queue/include/persistence.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,54 +21,54 @@ class Persistence
virtual ~Persistence() = default;

/**
* @brief
*
* @param message
* @param queueName
* @brief
*
* @param message
* @param queueName
*/
virtual void Store(const json& message, const std::string& queueName) = 0;
virtual void Store(const json& message, const std::string& tableName, const std::string& moduleName = "") = 0;

/**
* @brief
*
* @param id
* @param queueName
* @return json
* @brief
*
* @param id
* @param queueName
* @return json
*/
virtual json Retrieve(int id, const std::string& queueName) = 0;

/**
* @brief
*
* @param n
* @param queueName
* @return json
* @brief
*
* @param n
* @param queueName
* @return json
*/
virtual json RetrieveMultiple(int n, const std::string& queueName) = 0;
virtual json RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief
*
* @param id
* @param queueName
* @return int
* @brief
*
* @param id
* @param queueName
* @return int
*/
virtual int Remove(int id, const std::string& queueName) = 0;

/**
* @brief
*
* @param n
* @param queueName
* @return int
* @brief
*
* @param n
* @param queueName
* @return int
*/
virtual int RemoveMultiple(int n, const std::string& queueName) = 0;

/**
* @brief Get the Element Count object
*
* @param queueName
* @return int
*
* @param queueName
* @return int
*/
virtual int GetElementCount(const std::string& queueName) = 0;
};
Expand Down
15 changes: 11 additions & 4 deletions src/agent/queue/include/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,15 @@ class MultiTypeQueue
: m_maxItems(size)
, m_timeout(timeout)
{
m_persistenceDest = PersistenceFactory::createPersistence(
"SQLite3", {static_cast<std::string>(DEFAULT_DB_PATH), m_vMessageTypeStrings});
try
{
m_persistenceDest = PersistenceFactory::createPersistence(
"SQLite3", {static_cast<std::string>(DEFAULT_DB_PATH), m_vMessageTypeStrings});
}
catch (const std::exception& e)
{
std::cerr << "Error creating persistence: " << e.what() << '\n';
}
}

// Delete copy constructor
Expand Down Expand Up @@ -99,7 +106,7 @@ class MultiTypeQueue
* @param type
* @return Message
*/
Message getLastMessage(MessageType type);
Message getLastMessage(MessageType type, const std::string module = "");

/**
* @brief Returns N messages from a queue
Expand All @@ -108,7 +115,7 @@ class MultiTypeQueue
* @param messageQuantity quantity of messages to return
* @return Message Json data othe messages fetched
*/
Message getNMessages(MessageType type, int messageQuantity);
Message getNMessages(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief deletes a message from a queue
Expand Down
4 changes: 3 additions & 1 deletion src/agent/queue/include/shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ class Message
public:
MessageType type;
nlohmann::json data;
Message(MessageType t, nlohmann::json d)
std::string moduleName;
Message(MessageType t, nlohmann::json d, std::string mN = "")
: type(t)
, data(d)
, moduleName(mN)
{
}
};
Expand Down
7 changes: 3 additions & 4 deletions src/agent/queue/include/sqlitestorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
class SQLiteStorage : public Persistence
{
public:

SQLiteStorage(const std::string& dbName, const std::vector<std::string> tableName);

// Delete copy constructor
Expand All @@ -49,8 +48,9 @@ class SQLiteStorage : public Persistence
*
* @param message The JSON message to store.
*/
void Store(const json& message, const std::string& tableName) override;
void Store(const json& message, const std::string& tableName, const std::string& moduleName = "") override;

// TODO: pending tests!
/**
* @brief Retrieve a JSON message by its ID.
*
Expand All @@ -65,7 +65,7 @@ class SQLiteStorage : public Persistence
* @param n The number of messages to retrieve.
* @return A vector of retrieved JSON messages.
*/
json RetrieveMultiple(int n, const std::string& tableName) override;
json RetrieveMultiple(int n, const std::string& tableName, const std::string& moduleName = "") override;

/**
* @brief Remove a JSON message by its ID.
Expand Down Expand Up @@ -127,7 +127,6 @@ class SQLiteStorage : public Persistence

// TODO: should it be atomic?
bool m_db_in_use = false;

};

#endif // SQLITE_STORAGE_H
2 changes: 1 addition & 1 deletion src/agent/queue/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

int main()
{
MultiTypeQueue queue(10);
MultiTypeQueue queue(100);

return 0;
}
23 changes: 11 additions & 12 deletions src/agent/queue/src/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait)
m_cv.wait_for(
lock, m_timeout, [&, this] { return m_persistenceDest->GetElementCount(sMessageType) < m_maxItems; });
}
// FIXME
// else
// {
// std::cout << "Can failed because os full queue" << std::endl;
// }

auto storedMessages = m_persistenceDest->GetElementCount(sMessageType);
size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0;
Expand All @@ -46,7 +41,7 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait)
{
for (const auto& singleMessageData : messageData)
{
m_persistenceDest->Store(singleMessageData, sMessageType);
m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
m_cv.notify_all();
}
}
Expand All @@ -57,7 +52,7 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait)
}
else
{
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type));
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
m_cv.notify_all();
}
}
Expand All @@ -77,13 +72,17 @@ void MultiTypeQueue::timeoutPush(std::vector<Message> messages)
}
}

Message MultiTypeQueue::getLastMessage(MessageType type)
Message MultiTypeQueue::getLastMessage(MessageType type, const std::string moduleName)
{
Message result(type, {});
Message result(type, "{}"_json, moduleName);
// std::unique_lock<std::mutex> mapLock(m_mapMutex);
if (m_mapMessageTypeName.contains(type))
{
result.data = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type));
auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName);
if (!resultData.empty())
{
result.data = resultData;
}
}
else
{
Expand All @@ -93,12 +92,12 @@ Message MultiTypeQueue::getLastMessage(MessageType type)
return result;
}

Message MultiTypeQueue::getNMessages(MessageType type, int messageQuantity)
Message MultiTypeQueue::getNMessages(MessageType type, int messageQuantity, const std::string moduleName)
{
Message result(type, {});
if (m_mapMessageTypeName.contains(type))
{
result.data = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type));
result.data = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
}
else
{
Expand Down
Loading

0 comments on commit 93ec454

Please sign in to comment.