diff --git a/src/agent/multitype_queue/include/imultitype_queue.hpp b/src/agent/multitype_queue/include/imultitype_queue.hpp index 23531228ac..8e14358a81 100644 --- a/src/agent/multitype_queue/include/imultitype_queue.hpp +++ b/src/agent/multitype_queue/include/imultitype_queue.hpp @@ -133,4 +133,7 @@ class IMultiTypeQueue * @return int The number of items in the queue. */ virtual int storedItems(MessageType type, const std::string moduleName = "") = 0; + + //TODO: doc + virtual int sizePerType(MessageType type) = 0; }; diff --git a/src/agent/multitype_queue/include/multitype_queue.hpp b/src/agent/multitype_queue/include/multitype_queue.hpp index bd16916e22..d47976733d 100644 --- a/src/agent/multitype_queue/include/multitype_queue.hpp +++ b/src/agent/multitype_queue/include/multitype_queue.hpp @@ -135,4 +135,7 @@ class MultiTypeQueue : public IMultiTypeQueue * @copydoc IMultiTypeQueue::storedItems(MessageType, const std::string) */ int storedItems(MessageType type, const std::string moduleName = "") override; + + //TODO: doc + int sizePerType(MessageType type) override; }; diff --git a/src/agent/multitype_queue/include/persistence.hpp b/src/agent/multitype_queue/include/persistence.hpp index a1af51e195..1ab3d21659 100644 --- a/src/agent/multitype_queue/include/persistence.hpp +++ b/src/agent/multitype_queue/include/persistence.hpp @@ -89,4 +89,8 @@ class Persistence * @return int The quantity of elements stored in the specified queue. */ virtual int GetElementCount(const std::string& queueName, const std::string& moduleName = "") = 0; + + + //TODO:doc + virtual int GetElementsStoredSize(const std::string& tableName) = 0; }; diff --git a/src/agent/multitype_queue/src/multitype_queue.cpp b/src/agent/multitype_queue/src/multitype_queue.cpp index e09f36d21e..6f22a4547e 100644 --- a/src/agent/multitype_queue/src/multitype_queue.cpp +++ b/src/agent/multitype_queue/src/multitype_queue.cpp @@ -94,14 +94,14 @@ boost::asio::awaitable MultiTypeQueue::pushAwaitable(Message message) co_await timer.async_wait(boost::asio::use_awaitable); } - const auto storedMessages = static_cast(m_persistenceDest->GetElementCount(sMessageType)); - const auto spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0; - if (spaceAvailable) + const auto storedItems = static_cast(m_persistenceDest->GetElementCount(sMessageType)); + const auto availableItems = (m_maxItems > storedItems) ? m_maxItems - storedItems : 0; + if (availableItems) { auto messageData = message.data; if (messageData.is_array()) { - if (messageData.size() <= spaceAvailable) + if (messageData.size() <= availableItems) { for (const auto& singleMessageData : messageData) { @@ -155,7 +155,6 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName, } else { - // TODO: error handling LogError("Error didn't find the queue."); } return result; @@ -180,7 +179,6 @@ boost::asio::awaitable> MultiTypeQueue::getNextNAwaitable(M } else { - // TODO: error handling LogError("Error didn't find the queue."); } co_return result; @@ -204,7 +202,6 @@ std::vector MultiTypeQueue::getNextN(MessageType type, } else { - // TODO: error handling LogError("Error didn't find the queue."); } return result; @@ -219,7 +216,6 @@ bool MultiTypeQueue::pop(MessageType type, const std::string moduleName) } else { - // TODO: error handling LogError("Error didn't find the queue."); } return result; @@ -234,7 +230,6 @@ int MultiTypeQueue::popN(MessageType type, int messageQuantity, const std::strin } else { - // TODO: error handling LogError("Error didn't find the queue."); } return result; @@ -248,7 +243,6 @@ bool MultiTypeQueue::isEmpty(MessageType type, const std::string moduleName) } else { - // TODO: error handling LogError("Error didn't find the queue."); } return false; @@ -263,7 +257,6 @@ bool MultiTypeQueue::isFull(MessageType type, const std::string moduleName) } else { - // TODO: error handling LogError("Error didn't find the queue."); } return false; @@ -277,7 +270,19 @@ int MultiTypeQueue::storedItems(MessageType type, const std::string moduleName) } else { - // TODO: error handling + LogError("Error didn't find the queue."); + } + return false; +} + +int MultiTypeQueue::sizePerType(MessageType type) +{ + if (m_mapMessageTypeName.contains(type)) + { + return m_persistenceDest->GetElementsStoredSize(m_mapMessageTypeName.at(type)); + } + else + { LogError("Error didn't find the queue."); } return false; diff --git a/src/agent/multitype_queue/src/sqlitestorage.cpp b/src/agent/multitype_queue/src/sqlitestorage.cpp index ea0ee67a6f..c031572d3b 100644 --- a/src/agent/multitype_queue/src/sqlitestorage.cpp +++ b/src/agent/multitype_queue/src/sqlitestorage.cpp @@ -278,7 +278,7 @@ int SQLiteStorage::Remove(int id, const std::string& tableName, const std::strin catch (const std::exception& e) { LogError("Error during Remove operation: {}.", e.what()); - return {}; + return 0; } } @@ -349,6 +349,33 @@ int SQLiteStorage::GetElementCount(const std::string& tableName, const std::stri catch (const std::exception& e) { LogError("Error during GetElementCount operation: {}.", e.what()); - return {}; + return 0; + } +} + +int SQLiteStorage::GetElementsStoredSize(const std::string& tableName) +{ + std::string sizeQuery = fmt::format( + R"(SELECT SUM(LENGTH(module_name) + LENGTH(module_type) + LENGTH(metadata) + LENGTH(message)) AS total_bytes FROM {};)", + tableName); + + try + { + SQLite::Statement query(*m_db, sizeQuery); + int count = 0; + if (query.executeStep()) + { + count = query.getColumn(0).getInt(); + } + else + { + LogError("Error SQLiteStorage get element count."); + } + return count; + } + catch (const std::exception& e) + { + LogError("Error during GetElementCount operation: {}.", e.what()); + return 0; } } diff --git a/src/agent/multitype_queue/src/sqlitestorage.hpp b/src/agent/multitype_queue/src/sqlitestorage.hpp index 25f2491058..16ca9c1a7c 100644 --- a/src/agent/multitype_queue/src/sqlitestorage.hpp +++ b/src/agent/multitype_queue/src/sqlitestorage.hpp @@ -120,6 +120,10 @@ class SQLiteStorage : public Persistence */ int GetElementCount(const std::string& tableName, const std::string& moduleName = "") override; + + //TODO: doc + int GetElementsStoredSize(const std::string& tableName) override; + private: /** * @brief Initialize the table in the SQLite database. diff --git a/src/agent/multitype_queue/tests/sqlitestorage_test.cpp b/src/agent/multitype_queue/tests/sqlitestorage_test.cpp index d021e14713..dceba7105a 100644 --- a/src/agent/multitype_queue/tests/sqlitestorage_test.cpp +++ b/src/agent/multitype_queue/tests/sqlitestorage_test.cpp @@ -196,6 +196,24 @@ TEST_F(SQLiteStorageTest, GetElementCountWithModule) EXPECT_EQ(storage->GetElementCount(tableName, "unavailableModuleName"), 0); } +TEST_F(SQLiteStorageTest, MessagesSizes) +{ + auto messages = nlohmann::json::array(); + messages.push_back({{"key", "value1"}}); + messages.push_back({{"key", "value2"}}); + auto val = storage->Store(messages, tableName); + EXPECT_EQ(val, 2); + + auto retrievedMessages = storage->GetElementsStoredSize(tableName); + EXPECT_EQ(retrievedMessages, 32); + + val = storage->Store(messages, tableName); + EXPECT_EQ(val, 2); + + retrievedMessages = storage->GetElementsStoredSize(tableName); + EXPECT_EQ(retrievedMessages, 64); +} + class SQLiteStorageMultithreadedTest : public ::testing::Test { protected: