diff --git a/src/agent/queue/CMakeLists.txt b/src/agent/queue/CMakeLists.txt index 02547f30ea8..783b9cc5fd2 100644 --- a/src/agent/queue/CMakeLists.txt +++ b/src/agent/queue/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.22) -project(queue_test LANGUAGES CXX) +project(queue LANGUAGES CXX) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) @@ -8,17 +8,17 @@ set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread") include_directories(${CMAKE_SOURCE_DIR}/include) -add_library(queue_test - src/sqlitestorage.cpp - src/queue.cpp -) - find_package(SQLiteCpp REQUIRED) find_package(nlohmann_json REQUIRED) find_package(fmt REQUIRED) -target_include_directories(queue_test PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) -target_link_libraries(queue_test PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt) +add_library(queue + src/sqlitestorage.cpp + src/queue.cpp +) + +target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS}) +target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt) if(BUILD_TESTS) add_subdirectory(tests) diff --git a/src/agent/queue/include/persistence.h b/src/agent/queue/include/persistence.h index 7b49f08bfbe..605d7f4da43 100644 --- a/src/agent/queue/include/persistence.h +++ b/src/agent/queue/include/persistence.h @@ -26,7 +26,7 @@ class Persistence * @param message * @param queueName */ - virtual void Store(const json& message, const std::string& queueName) = 0; + virtual void Store(const json& message, const std::string& tableName, const std::string& moduleName = "") = 0; /** * @brief @@ -44,7 +44,7 @@ class Persistence * @param queueName * @return json */ - virtual json RetrieveMultiple(int n, const std::string& queueName) = 0; + virtual json RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0; /** * @brief diff --git a/src/agent/queue/include/queue.hpp b/src/agent/queue/include/queue.hpp index c12626de916..d6b4a009b30 100644 --- a/src/agent/queue/include/queue.hpp +++ b/src/agent/queue/include/queue.hpp @@ -61,8 +61,15 @@ class MultiTypeQueue : m_maxItems(size) , m_timeout(timeout) { - m_persistenceDest = PersistenceFactory::createPersistence( - "SQLite3", {static_cast(DEFAULT_DB_PATH), m_vMessageTypeStrings}); + try + { + m_persistenceDest = PersistenceFactory::createPersistence( + "SQLite3", {static_cast(DEFAULT_DB_PATH), m_vMessageTypeStrings}); + } + catch (const std::exception& e) + { + std::cerr << "Error creating persistence: " << e.what() << '\n'; + } } // Delete copy constructor @@ -99,7 +106,7 @@ class MultiTypeQueue * @param type * @return Message */ - Message getLastMessage(MessageType type); + Message getLastMessage(MessageType type, const std::string module = ""); /** * @brief Returns N messages from a queue @@ -108,7 +115,7 @@ class MultiTypeQueue * @param messageQuantity quantity of messages to return * @return Message Json data othe messages fetched */ - Message getNMessages(MessageType type, int messageQuantity); + Message getNMessages(MessageType type, int messageQuantity, const std::string moduleName = ""); /** * @brief deletes a message from a queue diff --git a/src/agent/queue/include/shared.hpp b/src/agent/queue/include/shared.hpp index a89eb2c4c15..0561f0db8ea 100644 --- a/src/agent/queue/include/shared.hpp +++ b/src/agent/queue/include/shared.hpp @@ -29,9 +29,11 @@ class Message public: MessageType type; nlohmann::json data; - Message(MessageType t, nlohmann::json d) + std::string moduleName; + Message(MessageType t, nlohmann::json d, std::string mN = "") : type(t) , data(d) + , moduleName(mN) { } }; diff --git a/src/agent/queue/include/sqlitestorage.h b/src/agent/queue/include/sqlitestorage.h index f86a7ac23f1..b946d27600a 100644 --- a/src/agent/queue/include/sqlitestorage.h +++ b/src/agent/queue/include/sqlitestorage.h @@ -49,8 +49,9 @@ class SQLiteStorage : public Persistence * * @param message The JSON message to store. */ - void Store(const json& message, const std::string& tableName) override; + void Store(const json& message, const std::string& tableName, const std::string& moduleName = "") override; + // TODO: pending tests! /** * @brief Retrieve a JSON message by its ID. * @@ -65,7 +66,7 @@ class SQLiteStorage : public Persistence * @param n The number of messages to retrieve. * @return A vector of retrieved JSON messages. */ - json RetrieveMultiple(int n, const std::string& tableName) override; + json RetrieveMultiple(int n, const std::string& tableName, const std::string& moduleName = "") override; /** * @brief Remove a JSON message by its ID. diff --git a/src/agent/queue/src/main.cpp b/src/agent/queue/src/main.cpp index ae8983c9baa..29755ca5f8c 100644 --- a/src/agent/queue/src/main.cpp +++ b/src/agent/queue/src/main.cpp @@ -6,7 +6,143 @@ int main() { - MultiTypeQueue queue(10); + + const std::map mapMessageTypeName { + {MessageType::STATELESS, "STATELESS"}, + {MessageType::STATEFUL, "STATEFUL"}, + {MessageType::COMMAND, "COMMAND"}}; + + //CLEANUP + std::string filePath = DEFAULT_DB_PATH; + for (const auto& entry : std::filesystem::directory_iterator(".")) + { + std::string fileFullPath = entry.path(); + size_t found = fileFullPath.find(filePath); + if (found != std::string::npos) + { + std::error_code ec; + std::filesystem::remove(fileFullPath, ec); + } + } + + int items = 100000; + MultiTypeQueue queue(items); + + auto messageConsumer = [&](int& count, MessageType messageType) + { + for (int i = 0; i < count; ++i) + { + queue.popLastMessage(messageType); + } + }; + + auto messageGetPop = [&](int& count, MessageType messageType) + { + for (int i = 0; i < count; ++i) + { + queue.getLastMessage(messageType); + queue.popLastMessage(messageType); + } + }; + + auto messageProducer = [&](int& count, MessageType messageType) + { + for (int i = 0; i < count; ++i) + { + const json dataContent1 = R"({{"Data": "for )" + mapMessageTypeName.at(messageType) + std::to_string(i) + R"("}})"; + queue.timeoutPush(Message(messageType, dataContent1), true); + } + }; + + auto start = std::chrono::high_resolution_clock::now(); + messageProducer(items, MessageType::COMMAND); + auto endCom = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endCom - start); + std::cout << "Filling Command took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl; + + messageProducer(items, MessageType::STATELESS); + auto endSL = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration_cast(endSL - endCom); + std::cout << "Filling SL took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl; + + messageProducer(items, MessageType::STATEFUL); + auto endSF = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration_cast(endSF - endSL); + std::cout << "Filling SF took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl; + + duration = std::chrono::duration_cast(endSF - start); + std::cout << "Filling everything took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl; + + std::cout << "1- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl; + std::cout << "1- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl; + std::cout << "1- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl; + + // std::thread consumerThread1(messageConsumer, std::ref(items), MessageType::COMMAND); + // std::thread consumerThread2(messageConsumer, std::ref(items), MessageType::STATELESS); + // std::thread consumerThread3(messageConsumer, std::ref(items), MessageType::STATEFUL); + + + // auto startConsuming = std::chrono::high_resolution_clock::now(); + // if (consumerThread1.joinable()) + // { + // consumerThread1.join(); + // } + + // if (consumerThread2.joinable()) + // { + // consumerThread2.join(); + // } + + // if (consumerThread3.joinable()) + // { + // consumerThread3.join(); + // } + + // auto endConsuming = std::chrono::high_resolution_clock::now(); + // duration = std::chrono::duration_cast(endConsuming - startConsuming); + // std::cout << "Consuming everything took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl; + + // std::cout << "2- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl; + // std::cout << "2- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl; + // std::cout << "2- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl; + + // // FIll again + // messageProducer(items, MessageType::COMMAND); + // messageProducer(items, MessageType::STATELESS); + // messageProducer(items, MessageType::STATEFUL); + + // std::cout << "3- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl; + // std::cout << "3- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl; + // std::cout << "3- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl; + + // std::thread consumerThread11(messageGetPop, std::ref(items), MessageType::COMMAND); + // std::thread consumerThread12(messageGetPop, std::ref(items), MessageType::STATELESS); + // std::thread consumerThread13(messageGetPop, std::ref(items), MessageType::STATEFUL); + + + // auto startGettingPopping = std::chrono::high_resolution_clock::now(); + // if (consumerThread11.joinable()) + // { + // consumerThread11.join(); + // } + + // if (consumerThread12.joinable()) + // { + // consumerThread12.join(); + // } + + // if (consumerThread13.joinable()) + // { + // consumerThread13.join(); + // } + + // auto endGettingPopping = std::chrono::high_resolution_clock::now(); + // duration = std::chrono::duration_cast(endGettingPopping - startGettingPopping); + // std::cout << "Getting Popping everything took " << duration.count() << " microseconds, per message: " << duration.count()/items << std::endl; + + // std::cout << "4- Inserted into command: " << queue.getItemsByType(MessageType::COMMAND) << std::endl; + // std::cout << "4- Inserted into stateful: " << queue.getItemsByType(MessageType::STATEFUL) << std::endl; + // std::cout << "4- Inserted into stateless: " << queue.getItemsByType(MessageType::STATELESS) << std::endl; return 0; } diff --git a/src/agent/queue/src/queue.cpp b/src/agent/queue/src/queue.cpp index 574ac028726..2d11337c31e 100644 --- a/src/agent/queue/src/queue.cpp +++ b/src/agent/queue/src/queue.cpp @@ -27,11 +27,6 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait) m_cv.wait_for( lock, m_timeout, [&, this] { return m_persistenceDest->GetElementCount(sMessageType) < m_maxItems; }); } - // FIXME - // else - // { - // std::cout << "Can failed because os full queue" << std::endl; - // } auto storedMessages = m_persistenceDest->GetElementCount(sMessageType); size_t spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0; @@ -46,7 +41,7 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait) { for (const auto& singleMessageData : messageData) { - m_persistenceDest->Store(singleMessageData, sMessageType); + m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName); m_cv.notify_all(); } } @@ -57,7 +52,7 @@ bool MultiTypeQueue::timeoutPush(Message message, bool shouldWait) } else { - m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type)); + m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName); m_cv.notify_all(); } } @@ -77,13 +72,17 @@ void MultiTypeQueue::timeoutPush(std::vector messages) } } -Message MultiTypeQueue::getLastMessage(MessageType type) +Message MultiTypeQueue::getLastMessage(MessageType type, const std::string moduleName) { - Message result(type, {}); + Message result(type, "{}"_json, moduleName); // std::unique_lock mapLock(m_mapMutex); if (m_mapMessageTypeName.contains(type)) { - result.data = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type)); + auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName); + if (!resultData.empty()) + { + result.data = resultData; + } } else { @@ -93,12 +92,12 @@ Message MultiTypeQueue::getLastMessage(MessageType type) return result; } -Message MultiTypeQueue::getNMessages(MessageType type, int messageQuantity) +Message MultiTypeQueue::getNMessages(MessageType type, int messageQuantity, const std::string moduleName) { Message result(type, {}); if (m_mapMessageTypeName.contains(type)) { - result.data = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type)); + result.data = m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName); } else { diff --git a/src/agent/queue/src/sqlitestorage.cpp b/src/agent/queue/src/sqlitestorage.cpp index c9b7de94138..dc17628a2a6 100644 --- a/src/agent/queue/src/sqlitestorage.cpp +++ b/src/agent/queue/src/sqlitestorage.cpp @@ -29,7 +29,8 @@ SQLiteStorage::~SQLiteStorage() {} void SQLiteStorage::InitializeTable(const std::string& tableName) { // TODO: all queries should be in the same place. - constexpr std::string_view CREATE_TABLE_QUERY {"CREATE TABLE IF NOT EXISTS {} (message TEXT NOT NULL);"}; + constexpr std::string_view CREATE_TABLE_QUERY { + "CREATE TABLE IF NOT EXISTS {} (module TEXT, message TEXT NOT NULL);"}; auto createTableQuery = fmt::format(CREATE_TABLE_QUERY, tableName); std::lock_guard lock(m_mutex); try @@ -56,11 +57,11 @@ void SQLiteStorage::releaseDatabaseAccess() m_cv.notify_one(); } -void SQLiteStorage::Store(const json& message, const std::string& tableName) +void SQLiteStorage::Store(const json& message, const std::string& tableName, const std::string& moduleName) { waitForDatabaseAccess(); - constexpr std::string_view INSERT_QUERY {"INSERT INTO {} (message) VALUES (?);"}; - std::string insertQuery = fmt::format(INSERT_QUERY, tableName); + constexpr std::string_view INSERT_QUERY {"INSERT INTO {} (module, message) VALUES (\"{}\", ?);"}; + std::string insertQuery = fmt::format(INSERT_QUERY, tableName, moduleName); SQLite::Statement query = SQLite::Statement(*m_db, insertQuery); if (message.is_array()) @@ -75,7 +76,7 @@ void SQLiteStorage::Store(const json& message, const std::string& tableName) } catch (const SQLite::Exception& e) { - std::cerr << "2 " << e.what() << '\n'; + std::cerr << "Error SqliteStorage Store: " << e.what() << '\n'; } // Reset the query to reuse it for the next message query.reset(); @@ -104,6 +105,7 @@ json SQLiteStorage::Retrieve(int id, const std::string& tableName) json message; if (query.executeStep()) { + // TODO: Pending retrieve module message = json::parse(query.getColumn(0).getString()); } else @@ -114,31 +116,69 @@ json SQLiteStorage::Retrieve(int id, const std::string& tableName) } catch (const SQLite::Exception& e) { - std::cerr << "Error retrieving message: " << e.what() << std::endl; - throw; + std::cerr << "Error SQLiteStorage retrieve: " << e.what() << std::endl; + return {}; } } -json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableName) +json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableName, const std::string& moduleName) { + std::string selectQuery; + if (moduleName.empty()) + { + constexpr std::string_view SELECT_MULTIPLE_QUERY {"SELECT module, message FROM {} ORDER BY rowid ASC LIMIT ?;"}; + selectQuery = fmt::format(SELECT_MULTIPLE_QUERY, tableName); + } + else + { + constexpr std::string_view SELECT_QUERY { + "SELECT module, message FROM {} WHERE module LIKE \"{}\" ORDER BY rowid ASC LIMIT ?;"}; + selectQuery = fmt::format(SELECT_QUERY, tableName, moduleName); + } + try { - constexpr std::string_view SELECT_MULTIPLE_QUERY {"SELECT message FROM {} ORDER BY rowid ASC LIMIT ?;"}; - std::string selectQuery = fmt::format(SELECT_MULTIPLE_QUERY, tableName); SQLite::Statement query(*m_db, selectQuery); query.bind(1, n); json messages = json::array(); while (query.executeStep()) { - messages.push_back(json::parse(query.getColumn(0).getString())); + // getting data json + std::string dataString; + std::string moduleString; + + // TODO: recheck type! + if (query.getColumnCount() == 2 && query.getColumn(1).getType() == 3 && query.getColumn(0).getType() == 3) + { + moduleString = query.getColumn(0).getString(); + dataString = query.getColumn(1).getString(); + + json outputJson = {{"module", ""}, {"data", {}}}; + + if (!dataString.empty()) + { + outputJson["data"] = json::parse(dataString); + } + + if (!moduleString.empty()) + { + outputJson["module"] = moduleString; + } + + messages.push_back(outputJson); + } + } + + if (!messages.empty()) + { + std::reverse(messages.begin(), messages.end()); } - std::reverse(messages.begin(), messages.end()); return messages; } catch (const SQLite::Exception& e) { - std::cerr << "Error retrieving multiple messages: " << e.what() << std::endl; - throw; + std::cerr << "Error SQLiteStorage retrieve multiple: " << e.what() << std::endl; + return {}; } } @@ -157,8 +197,8 @@ int SQLiteStorage::Remove(int id, const std::string& tableName) } catch (const SQLite::Exception& e) { - std::cerr << "Error removing message: " << e.what() << std::endl; - throw; + std::cerr << "Error SQLiteStorage remove: " << e.what() << std::endl; + return {}; } } @@ -180,8 +220,8 @@ int SQLiteStorage::RemoveMultiple(int n, const std::string& tableName) } catch (const SQLite::Exception& e) { - std::cerr << "Error removing multiple messages: " << e.what() << std::endl; - throw; + std::cerr << "Error SQLiteStorage remove multiple: " << e.what() << std::endl; + return {}; } } @@ -201,7 +241,7 @@ int SQLiteStorage::GetElementCount(const std::string& tableName) } catch (const SQLite::Exception& e) { - std::cerr << "Error getting element count: " << e.what() << std::endl; - throw; + std::cerr << "Error SQLiteStorage get element count: " << e.what() << std::endl; + return {}; } } diff --git a/src/agent/queue/tests/CMakeLists.txt b/src/agent/queue/tests/CMakeLists.txt index 1a45cf81790..40b9490bdfe 100644 --- a/src/agent/queue/tests/CMakeLists.txt +++ b/src/agent/queue/tests/CMakeLists.txt @@ -17,7 +17,7 @@ add_executable(test_queue target_link_libraries(test_queue ${GTEST_LIBRARIES} ${GTEST_MAIN_LIBRARIES} - queue_test + queue SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt) diff --git a/src/agent/queue/tests/queue_test.cpp b/src/agent/queue/tests/queue_test.cpp index 225121164ee..75c6e6aea99 100644 --- a/src/agent/queue/tests/queue_test.cpp +++ b/src/agent/queue/tests/queue_test.cpp @@ -137,9 +137,8 @@ TEST_F(QueueTest, SinglePushGetNotEmpty) auto typeReceived = messageResponse.type; EXPECT_TRUE(typeSend == typeReceived); - auto dataResponse = messageResponse.data[0].template get(); - auto dataToSend = messageToSend.data.template get(); - EXPECT_EQ(dataResponse, dataToSend); + auto dataResponse = messageResponse.data.at(0).at("data"); + EXPECT_EQ(dataResponse, baseDataContent); EXPECT_FALSE(queue.isEmptyByType(MessageType::STATELESS)); } @@ -158,9 +157,8 @@ TEST_F(QueueTest, SinglePushPopEmpty) auto typeReceived = messageResponse.type; EXPECT_TRUE(typeSend == typeReceived); - auto dataResponse = messageResponse.data[0].template get(); - auto dataToSend = messageToSend.data.template get(); - EXPECT_EQ(dataResponse, dataToSend); + auto dataResponse = messageResponse.data.at(0).at("data"); + EXPECT_EQ(dataResponse, baseDataContent); queue.popLastMessage(MessageType::STATELESS); EXPECT_TRUE(queue.isEmptyByType(MessageType::STATELESS)); @@ -193,12 +191,37 @@ TEST_F(QueueTest, SingleGetPopOnEmpty) auto messageResponse = queue.getLastMessage(MessageType::STATEFUL); EXPECT_EQ(messageResponse.type, MessageType::STATEFUL); - EXPECT_EQ(messageResponse.data[0], nullptr); + EXPECT_EQ(messageResponse.data, "{}"_json); queue.popLastMessage(MessageType::STATEFUL); EXPECT_TRUE(queue.isEmptyByType(MessageType::STATEFUL)); } +TEST_F(QueueTest, SinglePushGetWithModule) +{ + MultiTypeQueue queue(BIG_QUEUE_QTTY); + const MessageType messageType {MessageType::STATELESS}; + const std::string moduleFakeName = "fake-module"; + const std::string moduleName = "module"; + const Message messageToSend {messageType, baseDataContent, moduleName}; + + queue.timeoutPush(messageToSend); + auto messageResponseWrongModule = queue.getLastMessage(MessageType::STATELESS, moduleFakeName); + + auto typeSend = messageToSend.type; + auto typeReceived = messageResponseWrongModule.type; + EXPECT_TRUE(typeSend == typeReceived); + + EXPECT_EQ(messageResponseWrongModule.data, "{}"_json); + + auto messageResponseCorrectModule = queue.getLastMessage(MessageType::STATELESS, moduleName); + + auto dataResponse = messageResponseCorrectModule.data.at(0).at("data"); + EXPECT_EQ(dataResponse, baseDataContent); + + EXPECT_EQ(moduleName, messageResponseCorrectModule.moduleName); +} + // Push, get and check while the queue is full TEST_F(QueueTest, SinglePushPopFullWithTimeout) { @@ -368,7 +391,7 @@ TEST_F(QueueTest, MultiplePushSeveralSingleGets) for (int i : {0, 1, 2}) { auto messageResponse = queue.getLastMessage(MessageType::STATELESS); - auto responseData = messageResponse.data[0].template get(); + auto responseData = messageResponse.data.at(0).at("data"); auto sentData = messageToSend.data[i].template get(); EXPECT_EQ(responseData, sentData); queue.popLastMessage(MessageType::STATELESS); diff --git a/src/vcpkg.json b/src/vcpkg.json index abcdd2104e1..798c6607ef7 100644 --- a/src/vcpkg.json +++ b/src/vcpkg.json @@ -11,7 +11,8 @@ "nlohmann-json", "openssl", "sqlitecpp", - "toml11" + "toml11", + "fmt" ], "overrides": [ {