diff --git a/src/agent/CMakeLists.txt b/src/agent/CMakeLists.txt index b7a95cb808..c5743b9454 100644 --- a/src/agent/CMakeLists.txt +++ b/src/agent/CMakeLists.txt @@ -20,6 +20,7 @@ set(SOURCES add_subdirectory(agent_info) add_subdirectory(communicator) add_subdirectory(configuration_parser) +add_subdirectory(agent_queue) find_package(OpenSSL REQUIRED) find_package(Boost REQUIRED COMPONENTS asio beast) diff --git a/src/agent/agent_queue/CMakeLists.txt b/src/agent/agent_queue/CMakeLists.txt new file mode 100644 index 0000000000..6d9280b30b --- /dev/null +++ b/src/agent/agent_queue/CMakeLists.txt @@ -0,0 +1,26 @@ +cmake_minimum_required(VERSION 3.22) + +project(AgentQueue LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED True) +set(CMAKE_BUILD_TYPE RelWithDebInfo) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Wextra -Wunused -pthread") + +find_package(SQLiteCpp REQUIRED) +find_package(nlohmann_json REQUIRED) +find_package(fmt REQUIRED) +find_package(Boost REQUIRED COMPONENTS asio) + +add_library(AgentQueue + src/sqlitestorage.cpp + src/agent_queue.cpp +) + +target_include_directories(AgentQueue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS}) +target_link_libraries(AgentQueue PUBLIC nlohmann_json::nlohmann_json Boost::asio PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio) + +if(BUILD_TESTS) + enable_testing() + add_subdirectory(tests) +endif() diff --git a/src/agent/agent_queue/include/agent_queue.hpp b/src/agent/agent_queue/include/agent_queue.hpp new file mode 100644 index 0000000000..9ce98d5281 --- /dev/null +++ b/src/agent/agent_queue/include/agent_queue.hpp @@ -0,0 +1,203 @@ +#ifndef QUEUE_H +#define QUEUE_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "shared.hpp" +#include "sqlitestorage.h" + +// TODO: move to a configuration setting +constexpr int DEFAULT_MAX = 10000; +constexpr int DEFAULT_TIMEOUT_S = 3; + +// Factory class +class PersistenceFactory +{ +public: + static std::unique_ptr createPersistence(const std::string& type, const std::vector& args) + { + if (type == "SQLite3") + { + if (args.size() != 2 || !std::any_cast(&args[0]) || + !std::any_cast>(&args[1])) + { + throw std::invalid_argument("SQLite3 requires db name and table names as arguments"); + } + return std::make_unique(std::any_cast(args[0]), + std::any_cast>(args[1])); + } + throw std::runtime_error("Unknown persistence type"); + } +}; + +class MultiTypeQueue +{ +private: + const std::vector m_vMessageTypeStrings {"STATELESS", "STATEFUL", "COMMAND"}; + const std::map m_mapMessageTypeName { + {MessageType::STATELESS, "STATELESS"}, + {MessageType::STATEFUL, "STATEFUL"}, + {MessageType::COMMAND, "COMMAND"}, + }; + const int m_maxItems; + const std::chrono::seconds m_timeout; + std::unique_ptr m_persistenceDest; + std::mutex m_mtx; + std::condition_variable m_cv; + +public: + MultiTypeQueue(int size = DEFAULT_MAX, int timeout = DEFAULT_TIMEOUT_S) + : m_maxItems(size) + , m_timeout(timeout) + { + try + { + m_persistenceDest = PersistenceFactory::createPersistence( + "SQLite3", {static_cast(DEFAULT_DB_PATH), m_vMessageTypeStrings}); + } + catch (const std::exception& e) + { + std::cerr << "Error creating persistence: " << e.what() << '\n'; + } + } + + /** + * @brief Delete copy constructor + */ + MultiTypeQueue(const MultiTypeQueue&) = delete; + + /** + * @brief Delete copy assignment operator + */ + MultiTypeQueue& operator=(const MultiTypeQueue&) = delete; + + /** + * @brief Delete move constructor + */ + MultiTypeQueue(MultiTypeQueue&&) = delete; + + /** + * @brief Delete move assignment operator + */ + MultiTypeQueue& operator=(MultiTypeQueue&&) = delete; + + /** + * @brief Destructor. + */ + ~MultiTypeQueue() {}; + + /** + * @brief pushes a message + * + * @param message to be pushed + * @param shouldWait when true, the function will wait until the message is pushed + * @return int number of messages pushed + */ + int push(Message message, bool shouldWait = false); + + /** + * @brief pushes a message + * + * @param message to be pushed + * @return boost::asio::awaitable number of messages pushed + */ + boost::asio::awaitable pushAwaitable(Message message); + + /** + * @brief pushes a vector of messages + * + * @param messages vector of messages to be pushed + * @return int number of messages pushed + */ + int push(std::vector messages); + + /** + * @brief Get the next Message object + * + * @param type of the queue to be used as source + * @param module module name + * @return Message type object taken from the queue + */ + Message getNext(MessageType type, const std::string module = ""); + + /** + * @brief Get the Next Awaitable object + * + * @param type of the queue to be used as source + * @param moduleName module name + * @param messageQuantity quantity of messages to return + * @return boost::asio::awaitable awaitable object taken from the queue + */ + boost::asio::awaitable + getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = ""); + + /** + * @brief Returns N messages from a queue + * + * @param type Of the queue to be used as source + * @param moduleName module name + * @param messageQuantity quantity of messages to return + * @return Message Json data othe messages fetched + */ + std::vector getNextN(MessageType type, int messageQuantity, const std::string moduleName = ""); + + /** + * @brief deletes a message from a queue + * + * @param type MessageType queue to pop + * @param moduleName + * @return true when popped succesfully + * @return false if it wasn't able to pop message + */ + bool pop(MessageType type, const std::string moduleName = ""); + + /** + * @brief deletes N messages from a queue + * + * @param type MessageType queue to pop + * @param moduleName module name + * @param messageQuantity quantity of messages to pop + * @return Number of messages deleted + */ + int popN(MessageType type, int messageQuantity, const std::string moduleName = ""); + + /** + * @brief Checks emptyness of a queue + * + * @param type MessageType + * @param moduleName module name + * @return true when queue empty + * @return false otherwise + */ + bool isEmpty(MessageType type, const std::string moduleName = ""); + + /** + * @brief Checks fullness of a queue + * + * @param type MessageType + * @param moduleName module name + * @return true when queue is full + * @return false otherwise + */ + bool isFull(MessageType type, const std::string moduleName = ""); + + /** + * @brief Get the Items By Type object + * + * @param type MessageType + * @param moduleName module name + * @return int number of items in the queue. + */ + int storedItems(MessageType type, const std::string moduleName = ""); +}; + +#endif // QUEUE_H diff --git a/src/agent/agent_queue/include/persistence.h b/src/agent/agent_queue/include/persistence.h new file mode 100644 index 0000000000..7f859403f6 --- /dev/null +++ b/src/agent/agent_queue/include/persistence.h @@ -0,0 +1,83 @@ +#ifndef PERSISTENCE_H +#define PERSISTENCE_H + +#include +#include +#include + +using json = nlohmann::json; + +/** + * @brief Interface for persistence storage. + * + * This interface defines methods for storing, retrieving, and removing JSON messages. + */ +class Persistence +{ +public: + /** + * @brief Virtual destructor. + */ + virtual ~Persistence() = default; + + /** + * @brief Store a JSON message in the specified queue. + * + * @param message The JSON message to be stored. + * @param queueName The name of the queue. + * @param moduleName The name of the module. + * @return int The number of messages stored. + */ + virtual int Store(const json& message, const std::string& queueName, const std::string& moduleName = "") = 0; + + /** + * @brief Retrieve a JSON message from the specified queue. + * + * @param id rowid of the message to be retrieved. + * @param queueName The name of the queue. + * @param moduleName The name of the module. + * @return json The retrieved JSON message. + */ + virtual json Retrieve(int id, const std::string& queueName, const std::string& moduleName = "") = 0; + + /** + * @brief Retrieve multiple JSON messages from the specified queue. + * + * @param n number of messages to be retrieved. + * @param queueName The name of the queue. + * @param moduleName The name of the module. + * @return json The retrieved JSON messages. + */ + virtual json RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0; + + /** + * @brief Remove a JSON message from the specified queue. + * + * @param id number of messages to be removed. + * @param queueName The name of the queue. + * @param moduleName The name of the module. + * @return int The number of messages removed. + */ + virtual int Remove(int id, const std::string& queueName, const std::string& moduleName = "") = 0; + + /** + * @brief Remove multiple JSON messages from the specified queue. + * + * @param n number of messages to be removed. + * @param queueName The name of the queue. + * @param moduleName The name of the module. + * @return int The number of messages removed. + */ + virtual int RemoveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0; + + /** + * @brief Get the quantity of elements stored in the specified queue. + * + * @param queueName The name of the queue. + * @param moduleName The name of the module. + * @return int The quantity of elements stored in the specified queue. + */ + virtual int GetElementCount(const std::string& queueName, const std::string& moduleName = "") = 0; +}; + +#endif // PERSISTENCE_H diff --git a/src/agent/agent_queue/include/shared.hpp b/src/agent/agent_queue/include/shared.hpp new file mode 100644 index 0000000000..c2b07d3022 --- /dev/null +++ b/src/agent/agent_queue/include/shared.hpp @@ -0,0 +1,41 @@ +#ifndef SHARED_H +#define SHARED_H + +#include + +#include + +// TODO: should be moved to Config +constexpr char DEFAULT_FILE_PATH[] = "/home/vagrant/FILE_"; +constexpr char DEFAULT_DB_PATH[] = "queue.db"; + +/** + * @brief Types of messages enum + * + */ +enum MessageType +{ + STATELESS, + STATEFUL, + COMMAND +}; + +/** + * @brief Wrapper for Message, contains the message type, the json data and the module name. + * + */ +class Message +{ +public: + MessageType type; + nlohmann::json data; + std::string moduleName; + Message(MessageType t, nlohmann::json d, std::string mN = "") + : type(t) + , data(d) + , moduleName(mN) + { + } +}; + +#endif // SHARED_H diff --git a/src/agent/agent_queue/include/sqlitestorage.h b/src/agent/agent_queue/include/sqlitestorage.h new file mode 100644 index 0000000000..aa1bda09c6 --- /dev/null +++ b/src/agent/agent_queue/include/sqlitestorage.h @@ -0,0 +1,163 @@ +#ifndef SQLITE_STORAGE_H +#define SQLITE_STORAGE_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "persistence.h" + +/** + * @brief SQLite implementation of the Persistence interface. + * + * This class provides methods to store, retrieve, and remove JSON messages + * in a SQLite database. + */ +class SQLiteStorage : public Persistence +{ +public: + SQLiteStorage(const std::string& dbName, const std::vector tableName); + + /** + * @brief Delete copy constructor + */ + SQLiteStorage(const SQLiteStorage&) = delete; + + /** + * @brief Delete copy assignment operator + */ + SQLiteStorage& operator=(const SQLiteStorage&) = delete; + + /** + * @brief Delete move constructor + */ + SQLiteStorage(SQLiteStorage&&) = delete; + + /** + * @brief Delete move assignment operator + */ + SQLiteStorage& operator=(SQLiteStorage&&) = delete; + + /** + * @brief Destructor. + */ + ~SQLiteStorage() override; + + /** + * @brief Store a JSON message in the storage. + * + * @param message The JSON message to store. + * @param tableName The name of the table to store the message in. + * @param moduleName The name of the module that created the message. + * @return The number of stored elements. + */ + int Store(const json& message, const std::string& tableName, const std::string& moduleName = "") override; + + /** + * @brief Retrieve a JSON message by its ID. + * + * @param id The ID of the message to retrieve. + * @param tableName The name of the table to retrieve the message from. + * @param moduleName The name of the module that created the message. + * @return The retrieved JSON message. + */ + json Retrieve(int id, const std::string& tableName, const std::string& moduleName = "") override; + + /** + * @brief Retrieve multiple JSON messages. + * + * @param n The number of messages to retrieve. + * @param tableName The name of the table to retrieve the message from. + * @param moduleName The name of the module that created the message. + * @return A vector of retrieved JSON messages. + */ + json RetrieveMultiple(int n, const std::string& tableName, const std::string& moduleName = "") override; + + /** + * @brief Remove a JSON message by its ID. + * + * @param id The number the message to remove. + * @param tableName The name of the table to remove the message from. + * @param moduleName The name of the module that created the message. + * @return The number of removed elements. + */ + int Remove(int id, const std::string& tableName, const std::string& moduleName = "") override; + + /** + * @brief Remove multiple JSON messages. + * + * @param n The number of messages to remove. + * @param tableName The name of the table to remove the message from. + * @param moduleName The name of the module that created the message. + * @return The number of removed elements. + */ + int RemoveMultiple(int n, const std::string& tableName, const std::string& moduleName = "") override; + + /** + * @brief Get the number of elements in the table. + * @param tableName The name of the table to retrieve the message from. + * @param moduleName The name of the module that created the message. + * @return The number of elements in the table. + */ + int GetElementCount(const std::string& tableName, const std::string& moduleName = "") override; + +private: + /** + * @brief Initialize the table in the SQLite database. + * This method creates the table if it does not already exist. + * @param tableName The name of the table to initialize. + */ + void InitializeTable(const std::string& tableName); + + /** + * @brief Private method for waiting for database access. + * + */ + void waitForDatabaseAccess(); + + /** + * @brief Private method for releasing database access. + * + */ + void releaseDatabaseAccess(); + + /** + * @brief The name of the SQLite database file. + */ + const std::string m_dbName; + + /** + * @brief The name of the table to use for storing messages. + */ + const std::string m_tableName; + + /** + * @brief Pointer to the SQLite database connection. + */ + std::unique_ptr m_db; + + /** + * @brief Mutex to ensure thread-safe operations. + */ + std::mutex m_mutex; + + /** + * @brief condition variable to wait for database access. + */ + std::condition_variable m_cv; + + /** + * @brief flag for notifying the use of the db. + */ + bool m_dbInUse = false; +}; + +#endif // SQLITE_STORAGE_H diff --git a/src/agent/agent_queue/src/agent_queue.cpp b/src/agent/agent_queue/src/agent_queue.cpp new file mode 100644 index 0000000000..791a4373f8 --- /dev/null +++ b/src/agent/agent_queue/src/agent_queue.cpp @@ -0,0 +1,262 @@ +#include +#include +#include +#include + +#include "agent_queue.hpp" + +int MultiTypeQueue::push(Message message, bool shouldWait) +{ + int result = 0; + + if (m_mapMessageTypeName.contains(message.type)) + { + auto sMessageType = m_mapMessageTypeName.at(message.type); + + // Wait until the queue is not full + if (shouldWait) + { + std::unique_lock lock(m_mtx); + m_cv.wait_for( + lock, m_timeout, [&, this] { return m_persistenceDest->GetElementCount(sMessageType) < m_maxItems; }); + } + + auto storedMessages = m_persistenceDest->GetElementCount(sMessageType); + size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0; + if (spaceAvailable) + { + auto messageData = message.data; + if (messageData.is_array()) + { + if (messageData.size() <= spaceAvailable) + { + for (const auto& singleMessageData : messageData) + { + result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName); + m_cv.notify_all(); + } + } + } + else + { + result = + m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName); + m_cv.notify_all(); + } + } + } + else + { + std::cout << "error didn't find the queue" << std::endl; + } + return result; +} + +boost::asio::awaitable MultiTypeQueue::pushAwaitable(Message message) +{ + int result = 0; + boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor); + + if (m_mapMessageTypeName.contains(message.type)) + { + auto sMessageType = m_mapMessageTypeName.at(message.type); + + while (m_persistenceDest->GetElementCount(sMessageType) >= m_maxItems) + { + timer.expires_after(std::chrono::milliseconds(100)); + co_await timer.async_wait(boost::asio::use_awaitable); + } + + auto storedMessages = m_persistenceDest->GetElementCount(sMessageType); + size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0; + if (spaceAvailable) + { + auto messageData = message.data; + if (messageData.is_array()) + { + if (messageData.size() <= spaceAvailable) + { + for (const auto& singleMessageData : messageData) + { + result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName); + m_cv.notify_all(); + } + } + } + else + { + result = + m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName); + m_cv.notify_all(); + } + } + } + else + { + std::cout << "error didn't find the queue" << std::endl; + } + co_return result; +} + +int MultiTypeQueue::push(std::vector messages) +{ + int result = 0; + for (const auto& singleMessage : messages) + { + result += push(singleMessage); + } + return result; +} + +Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName) +{ + Message result(type, "{}"_json, moduleName); + if (m_mapMessageTypeName.contains(type)) + { + auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName); + if (!resultData.empty()) + { + result.data = resultData; + if (moduleName.empty()) + { + result.moduleName = result.data.at(0).at("module"); + } + } + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return result; +} + +boost::asio::awaitable +MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName) +{ + boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor); + + Message result(type, "{}"_json, moduleName); + if (m_mapMessageTypeName.contains(type)) + { + while (isEmpty(type)) + { + timer.expires_after(std::chrono::milliseconds(100)); + co_await timer.async_wait(boost::asio::use_awaitable); + } + + auto resultData = + m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName); + if (!resultData.empty()) + { + result.data = resultData; + if (moduleName.empty()) + { + result.moduleName = result.data.at(0).at("module"); + } + } + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + co_return result; +} + +std::vector MultiTypeQueue::getNextN(MessageType type, int messageQuantity, const std::string moduleName) +{ + std::vector result; + if (m_mapMessageTypeName.contains(type)) + { + auto arrayData = + m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName); + for (auto singleJson : arrayData) + { + auto finalModuleName = moduleName; + if (moduleName.empty()) + { + finalModuleName = singleJson["module"]; + } + result.push_back(Message(type, singleJson, finalModuleName)); + } + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return result; +} + +bool MultiTypeQueue::pop(MessageType type, const std::string moduleName) +{ + bool result = false; + if (m_mapMessageTypeName.contains(type)) + { + result = m_persistenceDest->RemoveMultiple(1, m_mapMessageTypeName.at(type), moduleName); + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return result; +} + +int MultiTypeQueue::popN(MessageType type, int messageQuantity, const std::string moduleName) +{ + int result = 0; + if (m_mapMessageTypeName.contains(type)) + { + result = m_persistenceDest->RemoveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName); + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return result; +} + +bool MultiTypeQueue::isEmpty(MessageType type, const std::string moduleName) +{ + if (m_mapMessageTypeName.contains(type)) + { + return m_persistenceDest->GetElementCount(m_mapMessageTypeName.at(type), moduleName) == 0; + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return false; +} + +bool MultiTypeQueue::isFull(MessageType type, const std::string moduleName) +{ + if (m_mapMessageTypeName.contains(type)) + { + return m_persistenceDest->GetElementCount(m_mapMessageTypeName.at(type), moduleName) == m_maxItems; + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return false; +} + +int MultiTypeQueue::storedItems(MessageType type, const std::string moduleName) +{ + if (m_mapMessageTypeName.contains(type)) + { + return m_persistenceDest->GetElementCount(m_mapMessageTypeName.at(type), moduleName); + } + else + { + // TODO: error handling and logging + std::cout << "error didn't find the queue" << std::endl; + } + return false; +} diff --git a/src/agent/agent_queue/src/sqlitestorage.cpp b/src/agent/agent_queue/src/sqlitestorage.cpp new file mode 100644 index 0000000000..357c0e52f7 --- /dev/null +++ b/src/agent/agent_queue/src/sqlitestorage.cpp @@ -0,0 +1,310 @@ +#include "sqlitestorage.h" +#include +#include +#include + +SQLiteStorage::SQLiteStorage(const std::string& dbName, const std::vector tableNames) + : m_dbName(dbName) + , m_db(make_unique(dbName, SQLite::OPEN_READWRITE | SQLite::OPEN_CREATE)) +{ + try + { + // Open the database in WAL mode + m_db->exec("PRAGMA journal_mode=WAL;"); + for (auto table : tableNames) + { + InitializeTable(table); + } + } + catch (const std::exception& e) + { + std::cerr << "Error initializing database: " << e.what() << std::endl; + throw; + } +} + +SQLiteStorage::~SQLiteStorage() {} + +void SQLiteStorage::InitializeTable(const std::string& tableName) +{ + // TODO: all queries should be in the same place. + constexpr std::string_view CREATE_TABLE_QUERY { + "CREATE TABLE IF NOT EXISTS {} (module TEXT, message TEXT NOT NULL);"}; + auto createTableQuery = fmt::format(CREATE_TABLE_QUERY, tableName); + std::lock_guard lock(m_mutex); + try + { + m_db->exec(createTableQuery); + } + catch (const std::exception& e) + { + std::cerr << "Error initializing table: " << e.what() << std::endl; + throw; + } +} +void SQLiteStorage::waitForDatabaseAccess() +{ + std::unique_lock lock(m_mutex); + m_cv.wait(lock, [this] { return !m_dbInUse; }); + m_dbInUse = true; +} + +void SQLiteStorage::releaseDatabaseAccess() +{ + std::lock_guard lock(m_mutex); + m_dbInUse = false; + m_cv.notify_one(); +} + +int SQLiteStorage::Store(const json& message, const std::string& tableName, const std::string& moduleName) +{ + constexpr std::string_view INSERT_QUERY {"INSERT INTO {} (module, message) VALUES (\"{}\", ?);"}; + std::string insertQuery = fmt::format(INSERT_QUERY, tableName, moduleName); + int result = 0; + + waitForDatabaseAccess(); + SQLite::Statement query = SQLite::Statement(*m_db, insertQuery); + + if (message.is_array()) + { + SQLite::Transaction transaction(*m_db); + for (const auto& singleMessageData : message) + { + try + { + query.bind(1, singleMessageData.dump()); + result += query.exec(); + } + catch (const SQLite::Exception& e) + { + std::cerr << "Error SqliteStorage Store: " << e.what() << '\n'; + break; + } + // Reset the query to reuse it for the next message + query.reset(); + } + transaction.commit(); + } + else + { + SQLite::Transaction transaction(*m_db); + query.bind(1, message.dump()); + result = query.exec(); + transaction.commit(); + } + releaseDatabaseAccess(); + + return result; +} + +// TODO: we shouldn't use rowid outside the table itself +json SQLiteStorage::Retrieve(int id, const std::string& tableName, const std::string& moduleName) +{ + + std::string selectQuery; + if (moduleName.empty()) + { + constexpr std::string_view SELECT_QUERY {"SELECT module, message FROM {} WHERE rowid = ?;"}; + selectQuery = fmt::format(SELECT_QUERY, tableName); + } + else + { + constexpr std::string_view SELECT_QUERY { + "SELECT module, message FROM {} WHERE module LIKE \"{}\" AND rowid = ?;"}; + selectQuery = fmt::format(SELECT_QUERY, tableName, moduleName); + } + + try + { + SQLite::Statement query(*m_db, selectQuery); + query.bind(1, id); + json outputJson = {{"module", ""}, {"data", {}}}; + if (query.executeStep()) + { + std::string dataString; + std::string moduleString; + + if (query.getColumnCount() == 2 && query.getColumn(1).getType() == SQLite::TEXT && + query.getColumn(0).getType() == SQLite::TEXT) + { + moduleString = query.getColumn(0).getString(); + dataString = query.getColumn(1).getString(); + + if (!dataString.empty()) + { + outputJson["data"] = json::parse(dataString); + } + + if (!moduleString.empty()) + { + outputJson["module"] = moduleString; + } + } + } + + return outputJson; + } + catch (const SQLite::Exception& e) + { + std::cerr << "Error SQLiteStorage retrieve: " << e.what() << std::endl; + return {}; + } +} + +json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableName, const std::string& moduleName) +{ + std::string selectQuery; + if (moduleName.empty()) + { + constexpr std::string_view SELECT_MULTIPLE_QUERY {"SELECT module, message FROM {} ORDER BY rowid ASC LIMIT ?;"}; + selectQuery = fmt::format(SELECT_MULTIPLE_QUERY, tableName); + } + else + { + constexpr std::string_view SELECT_MULTIPLE_QUERY { + "SELECT module, message FROM {} WHERE module LIKE \"{}\" ORDER BY rowid ASC LIMIT ?;"}; + selectQuery = fmt::format(SELECT_MULTIPLE_QUERY, tableName, moduleName); + } + + try + { + SQLite::Statement query(*m_db, selectQuery); + query.bind(1, n); + json messages = json::array(); + while (query.executeStep()) + { + // getting data json + std::string dataString; + std::string moduleString; + + if (query.getColumnCount() == 2 && query.getColumn(1).getType() == SQLite::TEXT && + query.getColumn(0).getType() == SQLite::TEXT) + { + moduleString = query.getColumn(0).getString(); + dataString = query.getColumn(1).getString(); + + json outputJson = {{"module", ""}, {"data", {}}}; + + if (!dataString.empty()) + { + outputJson["data"] = json::parse(dataString); + } + + if (!moduleString.empty()) + { + outputJson["module"] = moduleString; + } + + messages.push_back(outputJson); + } + } + + return messages; + } + catch (const SQLite::Exception& e) + { + std::cerr << "Error SQLiteStorage retrieve multiple: " << e.what() << std::endl; + return {}; + } +} + +int SQLiteStorage::Remove(int id, const std::string& tableName, const std::string& moduleName) +{ + std::string deleteQuery; + if (moduleName.empty()) + { + constexpr std::string_view DELETE_QUERY {"DELETE FROM {} WHERE rowid = ?;"}; + deleteQuery = fmt::format(DELETE_QUERY, tableName); + } + else + { + constexpr std::string_view DELETE_QUERY {"DELETE FROM {} WHERE module LIKE \"{}\" AND rowid = ?;"}; + deleteQuery = fmt::format(DELETE_QUERY, tableName, moduleName); + } + + try + { + SQLite::Statement query(*m_db, deleteQuery); + query.bind(1, id); + SQLite::Transaction transaction(*m_db); + query.exec(); + transaction.commit(); + return 1; + } + catch (const SQLite::Exception& e) + { + std::cerr << "Error SQLiteStorage remove: " << e.what() << std::endl; + return {}; + } +} + +int SQLiteStorage::RemoveMultiple(int n, const std::string& tableName, const std::string& moduleName) +{ + std::string deleteQuery; + int rowsModified = 0; + if (moduleName.empty()) + { + constexpr std::string_view DELETE_MULTIPLE_QUERY { + "DELETE FROM {} WHERE rowid IN (SELECT rowid FROM {} ORDER BY rowid ASC LIMIT ?);"}; + deleteQuery = fmt::format(DELETE_MULTIPLE_QUERY, tableName, tableName); + } + else + { + constexpr std::string_view DELETE_MULTIPLE_QUERY { + "DELETE FROM {} WHERE module LIKE \"{}\" AND rowid IN (SELECT rowid FROM {} WHERE module LIKE \"{}\" ORDER " + "BY rowid ASC LIMIT ?);"}; + deleteQuery = fmt::format(DELETE_MULTIPLE_QUERY, tableName, moduleName, tableName, moduleName); + } + + try + { + waitForDatabaseAccess(); + SQLite::Statement query(*m_db, deleteQuery); + SQLite::Transaction transaction(*m_db); + query.bind(1, n); + rowsModified = query.exec(); + transaction.commit(); + releaseDatabaseAccess(); + return rowsModified; + } + catch (const SQLite::Exception& e) + { + std::cerr << "Error SQLiteStorage remove multiple: " << e.what() << std::endl; + return rowsModified; + } +} + +int SQLiteStorage::GetElementCount(const std::string& tableName, const std::string& moduleName) +{ + std::string countQuery; + if (moduleName.empty()) + { + constexpr std::string_view COUNT_QUERY {"SELECT COUNT(*) FROM {}"}; + countQuery = fmt::format(COUNT_QUERY, tableName); + } + else + { + constexpr std::string_view COUNT_QUERY {"SELECT COUNT(*) FROM {} WHERE module LIKE \"{}\""}; + countQuery = fmt::format(COUNT_QUERY, tableName, moduleName); + } + + try + { + SQLite::Statement query(*m_db, countQuery); + int count = 0; + if (query.executeStep()) + { + count = query.getColumn(0).getInt(); + } + else + { + std::cerr << "Error SQLiteStorage get element count." << std::endl; + } + return count; + } + catch (const SQLite::Exception& e) + { + std::cerr << "Error SQLiteStorage get element count: " << e.what() << std::endl; + return {}; + } +} diff --git a/src/agent/agent_queue/tests/CMakeLists.txt b/src/agent/agent_queue/tests/CMakeLists.txt new file mode 100644 index 0000000000..3da2988d63 --- /dev/null +++ b/src/agent/agent_queue/tests/CMakeLists.txt @@ -0,0 +1,26 @@ +find_package(GTest REQUIRED) + +# AgentQueue tests +add_executable(test_AgentQueue queue_test.cpp) + +target_link_libraries(test_AgentQueue PUBLIC + AgentQueue + GTest::gtest + GTest::gtest_main + GTest::gmock + GTest::gmock_main) +add_test(NAME AgentQueueTest COMMAND test_AgentQueue) + + +# SQLiteStorage tests +add_executable(test_sqlitestorage sqlitestorage_test.cpp) + +target_link_libraries(test_sqlitestorage + AgentQueue + SQLiteCpp + GTest::gtest + GTest::gtest_main + GTest::gmock + GTest::gmock_main) +add_test(NAME SqliteStorageTest COMMAND test_sqlitestorage) + diff --git a/src/agent/agent_queue/tests/main.cpp b/src/agent/agent_queue/tests/main.cpp new file mode 100644 index 0000000000..9bb465e024 --- /dev/null +++ b/src/agent/agent_queue/tests/main.cpp @@ -0,0 +1,7 @@ +#include + +int main(int argc, char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/agent/agent_queue/tests/queue_test.cpp b/src/agent/agent_queue/tests/queue_test.cpp new file mode 100644 index 0000000000..364eb15298 --- /dev/null +++ b/src/agent/agent_queue/tests/queue_test.cpp @@ -0,0 +1,588 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "agent_queue.hpp" +#include "queue_test.hpp" + +using json = nlohmann::json; + +#define BIG_QUEUE_CAPACITY 10 +#define SMALL_QUEUE_CAPACITY 2 + +const json baseDataContent = R"({{"data": "for STATELESS_0"}})"; +const json multipleDataContent = {"content 1", "content 2", "content 3"}; + +/// Helper functions + +// Unescape Strings +std::string unescape_string(const std::string& str) +{ + std::string result; + result.reserve(str.length()); + + for (size_t i = 0; i < str.length(); ++i) + { + if (str[i] == '\\' && i + 1 < str.length()) + { + switch (str[i + 1]) + { + case '\\': result += '\\'; break; + case '\"': result += '\"'; break; + case '/': result += '/'; break; + case 'b': result += '\b'; break; + case 'f': result += '\f'; break; + case 'n': result += '\n'; break; + case 'r': result += '\r'; break; + case 't': result += '\t'; break; + default: result += str[i + 1]; + } + ++i; + } + else + { + result += str[i]; + } + } + + return result; +} + +void cleanPersistence() +{ + std::string filePath = DEFAULT_DB_PATH; + for (const auto& entry : std::filesystem::directory_iterator(".")) + { + std::string fileFullPath = entry.path(); + size_t found = fileFullPath.find(filePath); + if (found != std::string::npos) + { + std::error_code ec; + std::filesystem::remove(fileFullPath, ec); + } + } +} + +/// Test Methods + +void QueueTest::SetUp() +{ + cleanPersistence(); +}; + +void QueueTest::TearDown() {}; + +/// TESTS + +// JSON Basic methods. Move or delete if JSON Wrapper is done +TEST_F(JsonTest, JSONConversionComparisson) +{ + json uj1 = {{"version", 1}, {"type", "integer"}}; + // From string. If not unescape then it throws errors + json uj2 = json::parse(unescape_string(R"({\"type\":\"integer\",\"version\":1})")); + + nlohmann::ordered_json oj1 = {{"version", 1}, {"type", "integer"}}; + nlohmann::ordered_json oj2 = {{"type", "integer"}, {"version", 1}}; + EXPECT_FALSE(oj1 == oj2); + + auto versionUj1 = uj1["version"].template get(); + auto versionUj2 = uj2["version"].template get(); + EXPECT_EQ(versionUj1, versionUj2); + + auto versionUj12 = uj1.at("version"); + auto versionUj22 = uj2.at("version"); + EXPECT_EQ(versionUj12, versionUj22); + + auto typeUj1 = uj1["type"].template get(); + auto typeUj2 = uj2["type"].template get(); + EXPECT_EQ(typeUj1, typeUj2); + + auto typeUj12 = uj1.at("type"); + auto typeUj22 = uj2.at("type"); + EXPECT_EQ(typeUj12, typeUj22); +} + +TEST_F(JsonTest, JSONArrays) +{ + // create JSON values + json j_object = {{"one", 1}, {"two", 2}, {"three", 3}}; + json j_array = {1, 2, 4, 8, 16}; + // TODO: test string: const std::string multipleDataContent = R"({"data": {"content 1", "content 2", "content 3"})"; + + // call is_array() + EXPECT_FALSE(j_object.is_array()); + EXPECT_TRUE(j_array.is_array()); + EXPECT_EQ(5, j_array.size()); + EXPECT_TRUE(multipleDataContent.is_array()); + + int i = 0; + for (auto& singleMessage : multipleDataContent.items()) + { + EXPECT_EQ(singleMessage.value(), "content " + std::to_string(++i)); + } +} + +// Push, get and check the queue is not empty +TEST_F(QueueTest, SinglePushGetNotEmpty) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, baseDataContent}; + + EXPECT_EQ(queue.push(messageToSend), 1); + auto messageResponse = queue.getNext(MessageType::STATELESS); + + auto typeSend = messageToSend.type; + auto typeReceived = messageResponse.type; + EXPECT_TRUE(typeSend == typeReceived); + + auto dataResponse = messageResponse.data.at(0).at("data"); + EXPECT_EQ(dataResponse, baseDataContent); + + EXPECT_FALSE(queue.isEmpty(MessageType::STATELESS)); +} + +// push and pop on a non-full queue +TEST_F(QueueTest, SinglePushPopEmpty) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, baseDataContent}; + + EXPECT_EQ(queue.push(messageToSend), 1); + auto messageResponse = queue.getNext(MessageType::STATELESS); + auto dataResponse = messageResponse.data.at(0).at("data"); + EXPECT_EQ(dataResponse, baseDataContent); + EXPECT_EQ(messageType, messageResponse.type); + + auto messageResponseStateFul = queue.getNext(MessageType::STATEFUL); + // TODO: this behavior can be change to return an empty message (type and module empty) + EXPECT_EQ(messageResponseStateFul.type, MessageType::STATEFUL); + EXPECT_EQ(messageResponseStateFul.data, "{}"_json); + + queue.pop(MessageType::STATELESS); + EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS)); + + queue.pop(MessageType::STATELESS); + EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS)); +} + +TEST_F(QueueTest, SinglePushGetWithModule) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleFakeName = "fake-module"; + const std::string moduleName = "module"; + const Message messageToSend {messageType, baseDataContent, moduleName}; + + EXPECT_EQ(queue.push(messageToSend), 1); + auto messageResponseWrongModule = queue.getNext(MessageType::STATELESS, moduleFakeName); + + auto typeSend = messageToSend.type; + auto typeReceived = messageResponseWrongModule.type; + EXPECT_TRUE(typeSend == typeReceived); + + EXPECT_EQ(messageResponseWrongModule.moduleName, moduleFakeName); + EXPECT_EQ(messageResponseWrongModule.data, "{}"_json); + + auto messageResponseCorrectModule = queue.getNext(MessageType::STATELESS, moduleName); + + auto dataResponse = messageResponseCorrectModule.data.at(0).at("data"); + EXPECT_EQ(dataResponse, baseDataContent); + + EXPECT_EQ(moduleName, messageResponseCorrectModule.moduleName); +} + +// Push, get and check while the queue is full +TEST_F(QueueTest, SinglePushPopFullWithTimeout) +{ + MultiTypeQueue queue(SMALL_QUEUE_CAPACITY); + + // complete the queue with messages + const MessageType messageType {MessageType::COMMAND}; + for (int i : {1, 2}) + { + const json dataContent = R"({"Data" : "for COMMAND)" + std::to_string(i) + R"("})"; + EXPECT_EQ(queue.push({messageType, dataContent}), 1); + } + + const json dataContent = R"({"Data" : "for COMMAND3"})"; + Message exampleMessage {messageType, dataContent}; + EXPECT_EQ(queue.push({messageType, dataContent}, true), 0); + + auto items = queue.storedItems(MessageType::COMMAND); + EXPECT_EQ(items, SMALL_QUEUE_CAPACITY); + EXPECT_TRUE(queue.isFull(MessageType::COMMAND)); + EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS)); + + queue.pop(MessageType::COMMAND); + items = queue.storedItems(MessageType::COMMAND); + EXPECT_NE(items, SMALL_QUEUE_CAPACITY); +} + +// Accesing different types of queues from several threads +TEST_F(QueueTest, MultithreadDifferentType) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + auto consumerStateLess = [&](int& count) + { + for (int i = 0; i < count; ++i) + { + queue.pop(MessageType::STATELESS); + } + }; + + auto consumerStateFull = [&](int& count) + { + for (int i = 0; i < count; ++i) + { + queue.pop(MessageType::STATEFUL); + } + }; + + auto messageProducer = [&](int& count) + { + for (int i = 0; i < count; ++i) + { + const json dataContent = R"({{"Data", "Number )" + std::to_string(i) + R"("}})"; + EXPECT_EQ(queue.push(Message(MessageType::STATELESS, dataContent)), 1); + EXPECT_EQ(queue.push(Message(MessageType::STATEFUL, dataContent)), 1); + } + }; + + int itemsToInsert = 10; + int itemsToConsume = 5; + + messageProducer(itemsToInsert); + + std::thread consumerThread1(consumerStateLess, std::ref(itemsToConsume)); + std::thread consumerThread2(consumerStateFull, std::ref(itemsToConsume)); + + if (consumerThread1.joinable()) + { + consumerThread1.join(); + } + + if (consumerThread2.joinable()) + { + consumerThread2.join(); + } + + EXPECT_EQ(5, queue.storedItems(MessageType::STATELESS)); + EXPECT_EQ(5, queue.storedItems(MessageType::STATEFUL)); + + // Consume the rest of the messages + std::thread consumerThread12(consumerStateLess, std::ref(itemsToConsume)); + std::thread consumerThread22(consumerStateFull, std::ref(itemsToConsume)); + + if (consumerThread12.joinable()) + { + consumerThread12.join(); + } + + if (consumerThread22.joinable()) + { + consumerThread22.join(); + } + + EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS)); + EXPECT_TRUE(queue.isEmpty(MessageType::STATEFUL)); +} + +// Accesing same queue from 2 different threads +TEST_F(QueueTest, MultithreadSameType) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + auto messageType = MessageType::COMMAND; + + auto consumerCommand1 = [&](int& count) + { + for (int i = 0; i < count; ++i) + { + queue.pop(messageType); + } + }; + + auto consumerCommand2 = [&](int& count) + { + for (int i = 0; i < count; ++i) + { + queue.pop(messageType); + } + }; + + auto messageProducer = [&](int& count) + { + for (int i = 0; i < count; ++i) + { + const json dataContent = R"({{"Data": "for COMMAND)" + std::to_string(i) + R"("}})"; + EXPECT_EQ(queue.push(Message(messageType, dataContent)), 1); + } + }; + + int itemsToInsert = 10; + int itemsToConsume = 5; + + messageProducer(itemsToInsert); + + EXPECT_EQ(itemsToInsert, queue.storedItems(messageType)); + + std::thread consumerThread1(consumerCommand1, std::ref(itemsToConsume)); + std::thread messageProducerThread1(consumerCommand2, std::ref(itemsToConsume)); + + if (messageProducerThread1.joinable()) + { + messageProducerThread1.join(); + } + + if (consumerThread1.joinable()) + { + consumerThread1.join(); + } + + EXPECT_TRUE(queue.isEmpty(messageType)); +} + +// Push Multiple with single message and data array, +// several gets, checks and pops +TEST_F(QueueTest, PushMultipleSeveralSingleGets) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, multipleDataContent}; + + EXPECT_EQ(3, queue.push(messageToSend)); + + for (int i : {0, 1, 2}) + { + auto messageResponse = queue.getNext(MessageType::STATELESS); + auto responseData = messageResponse.data.at(0).at("data"); + auto sentData = messageToSend.data[i].template get(); + EXPECT_EQ(responseData, sentData); + queue.pop(MessageType::STATELESS); + } + + EXPECT_EQ(queue.storedItems(MessageType::STATELESS), 0); +} + +TEST_F(QueueTest, PushMultipleWithMessageVector) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + std::vector messages; + const MessageType messageType {MessageType::STATELESS}; + for (std::string i : {"0", "1", "2"}) + { + const json multipleDataContent = {"content " + i}; + messages.push_back({messageType, multipleDataContent}); + } + EXPECT_EQ(messages.size(), 3); + EXPECT_EQ(queue.push(messages), 3); + EXPECT_EQ(queue.storedItems(MessageType::STATELESS), 3); +} + +// push message vector with a mutiple data element +TEST_F(QueueTest, PushVectorWithAMultipleInside) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + std::vector messages; + + // triple data content message + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, multipleDataContent}; + messages.push_back(messageToSend); + + // triple message vector + for (std::string i : {"0", "1", "2"}) + { + const json dataContent = {"content " + i}; + messages.push_back({messageType, dataContent}); + } + + EXPECT_EQ(6, queue.push(messages)); +} + +// Push Multiple, pop multiples +TEST_F(QueueTest, PushMultipleGetMultiple) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + const MessageType messageType {MessageType::STATELESS}; + const Message messageToSend {messageType, multipleDataContent}; + + EXPECT_EQ(3, queue.push(messageToSend)); + EXPECT_EQ(queue.storedItems(MessageType::STATELESS), 3); + EXPECT_EQ(queue.popN(MessageType::STATELESS, 1), 1); + EXPECT_EQ(queue.popN(MessageType::STATELESS, 3), 2); + EXPECT_TRUE(queue.isEmpty(MessageType::STATELESS)); + EXPECT_EQ(0, queue.storedItems(MessageType::STATELESS)); +} + +// Push Multiple, pop multiples +TEST_F(QueueTest, PushMultipleGetMultipleWithModule) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleName = "testModule"; + const Message messageToSend {messageType, multipleDataContent, moduleName}; + + EXPECT_EQ(3, queue.push(messageToSend)); + + // Altough we're asking for 10 messages only the availables are returned. + auto messagesReceived = queue.getNextN(MessageType::STATELESS, 10); + int i = 0; + for (auto singleMessage : messagesReceived) + { + EXPECT_EQ("content " + std::to_string(++i), singleMessage.data.at("data")); + } + + EXPECT_EQ(0, queue.storedItems(MessageType::STATELESS, "fakemodule")); + EXPECT_EQ(3, queue.storedItems(MessageType::STATELESS)); + EXPECT_EQ(3, queue.storedItems(MessageType::STATELESS, moduleName)); +} + +TEST_F(QueueTest, PushSinglesleGetMultipleWithModule) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + for (std::string i : {"1", "2", "3", "4", "5"}) + { + const MessageType messageType {MessageType::STATELESS}; + const json multipleDataContent = {"content-" + i}; + const std::string moduleName = "module-" + i; + const Message messageToSend {messageType, multipleDataContent, moduleName}; + EXPECT_EQ(1, queue.push(messageToSend)); + } + + auto messagesReceived = queue.getNextN(MessageType::STATELESS, 10); + EXPECT_EQ(5, messagesReceived.size()); + int i = 0; + for (auto singleMessage : messagesReceived) + { + auto val = ++i; + EXPECT_EQ("content-" + std::to_string(val), singleMessage.data.at("data")); + EXPECT_EQ("module-" + std::to_string(val), singleMessage.data.at("module")); + } + + auto messageReceivedContent1 = queue.getNextN(MessageType::STATELESS, 10, "module-1"); + EXPECT_EQ(1, messageReceivedContent1.size()); +} + +TEST_F(QueueTest, GetNextAwaitableBase) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + boost::asio::io_context io_context; + + // Coroutine that waits till there's a message of the needed type on the queue + boost::asio::co_spawn( + io_context, + [&queue]() -> boost::asio::awaitable + { + auto messageReceived = co_await queue.getNextNAwaitable(MessageType::STATELESS, 2); + EXPECT_EQ(messageReceived.data.at(0).at("data"), "content-1"); + EXPECT_EQ(messageReceived.data.at(1).at("data"), "content-2"); + }, + boost::asio::detached); + + // Simulate the addition of needed message to the queue after some time + std::thread producer( + [&queue, &io_context]() + { + std::this_thread::sleep_for(std::chrono::seconds(2)); + const MessageType messageType {MessageType::STATELESS}; + const json multipleDataContent = {"content-1", "content-2", "content-3"}; + const Message messageToSend {messageType, multipleDataContent}; + EXPECT_EQ(queue.push(messageToSend), 3); + // io_context.stop(); + }); + + io_context.run(); + producer.join(); +} + +TEST_F(QueueTest, PushAwaitable) +{ + MultiTypeQueue queue(SMALL_QUEUE_CAPACITY); + boost::asio::io_context io_context; + + // complete the queue with messages + const MessageType messageType {MessageType::STATEFUL}; + for (int i : {1, 2}) + { + const json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"; + EXPECT_EQ(queue.push({messageType, dataContent}), 1); + } + + EXPECT_TRUE(queue.isFull(MessageType::STATEFUL)); + EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2); + + // Coroutine that waits till there's space to push a new message + boost::asio::co_spawn( + io_context, + [&queue]() -> boost::asio::awaitable + { + const MessageType messageType {MessageType::STATEFUL}; + const json dataContent = {"content-1"}; + const Message messageToSend {messageType, dataContent}; + EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2); + auto messagesPushed = co_await queue.pushAwaitable(messageToSend); + EXPECT_EQ(messagesPushed, 1); + EXPECT_EQ(queue.storedItems(MessageType::STATEFUL), 2); + }, + boost::asio::detached); + + // Simulate poping one message so there's space to push a new one + std::thread consumer( + [&queue, &io_context]() + { + std::this_thread::sleep_for(std::chrono::seconds(2)); + EXPECT_EQ(queue.popN(MessageType::STATEFUL, 1), 1); + // TODO: double check this behavior, is it mandatory to stop the context here? + // io_context.stop(); + }); + + io_context.run(); + consumer.join(); + + EXPECT_TRUE(queue.isFull(MessageType::STATEFUL)); +} + +TEST_F(QueueTest, FifoOrderCheck) +{ + MultiTypeQueue queue(BIG_QUEUE_CAPACITY); + + // complete the queue with messages + const MessageType messageType {MessageType::STATEFUL}; + for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + { + const json dataContent = R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"; + EXPECT_EQ(queue.push({messageType, dataContent}), 1); + } + + auto messageReceivedVector = queue.getNextN(messageType, 10); + EXPECT_EQ(messageReceivedVector.size(), 10); + int i = 0; + for (auto singleMessage : messageReceivedVector) + { + EXPECT_EQ(singleMessage.data.at("data"), R"({"Data" : "for STATEFUL)" + std::to_string(++i) + R"("})"); + } + + // Keep the order of the message: FIFO + for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + { + auto messageReceived = queue.getNextN(messageType, 1); + EXPECT_EQ(messageReceived.at(0).data.at("data"), R"({"Data" : "for STATEFUL)" + std::to_string(i) + R"("})"); + EXPECT_TRUE(queue.pop(messageType)); + } +} diff --git a/src/agent/agent_queue/tests/queue_test.hpp b/src/agent/agent_queue/tests/queue_test.hpp new file mode 100644 index 0000000000..22ab133e43 --- /dev/null +++ b/src/agent/agent_queue/tests/queue_test.hpp @@ -0,0 +1,36 @@ +/* + * Wazuh SyscollectorImp + * Copyright (C) 2015, Wazuh Inc. + * July 10, 2024. + * + * This program is free software; you can redistribute it + * and/or modify it under the terms of the GNU General Public + * License (version 2) as published by the FSF - Free Software + * Foundation. + */ +#ifndef _QUEUE_TEST_H +#define _QUEUE_TEST_H +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +class QueueTest : public ::testing::Test +{ +protected: + QueueTest() = default; + virtual ~QueueTest() = default; + + void SetUp() override; + void TearDown() override; +}; + +class JsonTest : public ::testing::Test +{ +protected: + JsonTest() = default; + virtual ~JsonTest() = default; + + void SetUp() override {}; + void TearDown() override {}; +}; + +#endif //_QUEUE_TEST_H diff --git a/src/agent/agent_queue/tests/sqlitestorage_test.cpp b/src/agent/agent_queue/tests/sqlitestorage_test.cpp new file mode 100644 index 0000000000..fdb36e4769 --- /dev/null +++ b/src/agent/agent_queue/tests/sqlitestorage_test.cpp @@ -0,0 +1,277 @@ +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include + +#include "sqlitestorage.h" + +using json = nlohmann::json; + +class SQLiteStorageTest : public ::testing::Test +{ +protected: + const std::string dbName = "testdb.db"; + const std::string tableName = "test_table"; + const std::string moduleName = "moduleX"; + const std::vector m_vMessageTypeStrings {"test_table", "test_table2"}; + SQLiteStorage* storage; + + void SetUp() override + { + // Ensure the database file does not already exist + std::string filePath = dbName; + for (const auto& entry : std::filesystem::directory_iterator(".")) + { + std::string fileFullPath = entry.path(); + size_t found = fileFullPath.find(filePath); + if (found != std::string::npos) + { + std::error_code ec; + std::filesystem::remove(fileFullPath, ec); + } + } + + storage = new SQLiteStorage(dbName, m_vMessageTypeStrings); + } + + void TearDown() override + { + // Clean up: Delete the SQLiteStorage instance and remove the database file + delete storage; + std::error_code ec; + if (std::filesystem::exists(dbName.c_str())) + { + std::filesystem::remove(dbName.c_str()); + if (ec) + { + std::cerr << "Error removing file: " << ec.message() << std::endl; + } + } + } +}; + +TEST_F(SQLiteStorageTest, StoreSingleMessage) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 1); + EXPECT_EQ(storage->Store(message, tableName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 2); +} + +TEST_F(SQLiteStorageTest, StoreSingleMessageWithModule) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName, moduleName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 1); + EXPECT_EQ(storage->Store(message, tableName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 2); + EXPECT_EQ(storage->GetElementCount(tableName, "unavailableModuleName"), 0); + EXPECT_EQ(storage->GetElementCount(tableName, moduleName), 1); +} + +TEST_F(SQLiteStorageTest, StoreMultipleMessages) +{ + json messages = json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + EXPECT_EQ(storage->Store(messages, tableName), 2); + EXPECT_EQ(storage->GetElementCount(tableName), 2); +} + +TEST_F(SQLiteStorageTest, StoreMultipleMessagesWithModule) +{ + json messages = json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + EXPECT_EQ(storage->Store(messages, tableName, moduleName), 2); + EXPECT_EQ(storage->GetElementCount(tableName), 2); + EXPECT_EQ(storage->GetElementCount(tableName, moduleName), 2); + EXPECT_EQ(storage->GetElementCount(tableName, "unavailableModuleName"), 0); +} + +TEST_F(SQLiteStorageTest, RetrieveMessage) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName), 1); + json retrievedMessage = storage->Retrieve(1, tableName); + EXPECT_EQ(retrievedMessage.at("data").at("key"), "value"); +} + +TEST_F(SQLiteStorageTest, RetrieveMessageWithModule) +{ + json message = {{"key", "value"}}; + storage->Store(message, tableName, moduleName); + json retrievedMessage = storage->Retrieve(1, tableName, "unavailableModuleName"); + EXPECT_EQ(retrievedMessage.at("data"), nullptr); + retrievedMessage = storage->Retrieve(1, tableName, moduleName); + EXPECT_EQ(retrievedMessage.at("data").at("key"), "value"); +} + +TEST_F(SQLiteStorageTest, RetrieveMultipleMessages) +{ + json messages = json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + storage->Store(messages, tableName); + json retrievedMessages = storage->RetrieveMultiple(2, tableName); + EXPECT_EQ(retrievedMessages.size(), 2); +} + +TEST_F(SQLiteStorageTest, RetrieveMultipleMessagesWithModule) +{ + json messages = json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + messages.push_back({{"key", "value3"}}); + messages.push_back({{"key", "value4"}}); + storage->Store(messages, tableName, moduleName); + json retrievedMessages = storage->RetrieveMultiple(4, tableName, moduleName); + EXPECT_EQ(retrievedMessages.size(), 4); + + int i = 0; + for (auto singleMessage : retrievedMessages) + { + EXPECT_EQ("value" + std::to_string(++i), singleMessage.at("data").at("key")); + } +} + +TEST_F(SQLiteStorageTest, RemoveMessage) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName), 1); + EXPECT_EQ(storage->Remove(1, tableName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 0); +} + +TEST_F(SQLiteStorageTest, RemoveMessageWithModule) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName, moduleName), 1); + EXPECT_EQ(storage->Remove(1, tableName), 1); + EXPECT_EQ(storage->Store(message, tableName, moduleName), 1); + EXPECT_EQ(storage->Remove(1, tableName, "unavailableModuleName"), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 1); + EXPECT_EQ(storage->Remove(1, tableName, moduleName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 0); +} + +TEST_F(SQLiteStorageTest, RemoveMultipleMessages) +{ + json messages = json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + EXPECT_EQ(storage->Store(messages, tableName), 2); + EXPECT_EQ(storage->RemoveMultiple(2, tableName), 2); + EXPECT_EQ(storage->GetElementCount(tableName), 0); +} + +TEST_F(SQLiteStorageTest, RemoveMultipleMessagesWithModule) +{ + json messages = json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + EXPECT_EQ(storage->Store(messages, tableName, moduleName), 2); + EXPECT_EQ(storage->RemoveMultiple(2, tableName, "unavailableModuleName"), 0); + EXPECT_EQ(storage->GetElementCount(tableName), 2); + EXPECT_EQ(storage->RemoveMultiple(2, tableName, moduleName), 2); + EXPECT_EQ(storage->GetElementCount(tableName), 0); +} + +TEST_F(SQLiteStorageTest, GetElementCount) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 1); +} + +TEST_F(SQLiteStorageTest, GetElementCountWithModule) +{ + json message = {{"key", "value"}}; + EXPECT_EQ(storage->Store(message, tableName, moduleName), 1); + EXPECT_EQ(storage->GetElementCount(tableName), 1); + EXPECT_EQ(storage->GetElementCount(tableName, moduleName), 1); + EXPECT_EQ(storage->GetElementCount(tableName, "unavailableModuleName"), 0); +} + +class SQLiteStorageMultithreadedTest : public ::testing::Test +{ +protected: + const std::string dbName = "testdb"; + const std::vector m_vMessageTypeStrings {"test_table1", "test_table2", "test_table3"}; + + void SetUp() override + { + // Clean up: Delete the SQLiteStorage instances and remove the database file + std::error_code ec; + if (std::filesystem::exists(dbName.c_str())) + { + std::filesystem::remove(dbName.c_str()); + if (ec) + { + std::cerr << "Error removing file: " << ec.message() << std::endl; + } + } + } + + void TearDown() override {} +}; + +void storeMessages(SQLiteStorage& storage, const json& messages, const std::string& tableName) +{ + for (const auto& message : messages) + { + storage.Store(message, tableName); + } +} + +void retrieveMessages(SQLiteStorage& storage, + int count, + std::vector& retrievedMessages, + const std::string& tableName) +{ + retrievedMessages = storage.RetrieveMultiple(count, tableName); +} + +void removeMessages(SQLiteStorage& storage, int count, const std::string& tableName) +{ + storage.RemoveMultiple(count, tableName); +} + +TEST_F(SQLiteStorageMultithreadedTest, MultithreadedStoreAndRetrieve) +{ + size_t messagesToStore = 100; + + SQLiteStorage storage1(dbName, m_vMessageTypeStrings); + + json messages1 = json::array(); + json messages2 = json::array(); + for (size_t i = 0; i < messagesToStore; i++) + { + messages1.push_back({{"key" + std::to_string(i), "value" + std::to_string(i)}}); + messages2.push_back({{std::to_string(i) + "key", std::to_string(i) + "value"}}); + } + + // Create threads for storing messages + std::thread thread1(storeMessages, std::ref(storage1), messages1, m_vMessageTypeStrings[0]); + std::thread thread2(storeMessages, std::ref(storage1), messages2, m_vMessageTypeStrings[0]); + + // Join the threads to ensure they complete + thread1.join(); + thread2.join(); + + EXPECT_EQ(storage1.GetElementCount(m_vMessageTypeStrings[0]), 2 * messagesToStore); + + std::thread thread3(removeMessages, std::ref(storage1), messagesToStore, m_vMessageTypeStrings[0]); + std::thread thread4(removeMessages, std::ref(storage1), messagesToStore, m_vMessageTypeStrings[0]); + + // Join the threads to ensure they complete + thread3.join(); + thread4.join(); + + EXPECT_EQ(storage1.GetElementCount(m_vMessageTypeStrings[0]), 0); +} diff --git a/src/vcpkg.json b/src/vcpkg.json index 65ad1a5c38..984eb01f8c 100644 --- a/src/vcpkg.json +++ b/src/vcpkg.json @@ -5,6 +5,7 @@ "boost-asio", "boost-beast", "boost-uuid", + "fmt", "gtest", "jwt-cpp", "nlohmann-json",