Skip to content

Commit

Permalink
feat: adding size measurement to sqlite storage
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioDonda committed Dec 5, 2024
1 parent 57c46ee commit 88f1c95
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 14 deletions.
3 changes: 3 additions & 0 deletions src/agent/multitype_queue/include/imultitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,7 @@ class IMultiTypeQueue
* @return int The number of items in the queue.
*/
virtual int storedItems(MessageType type, const std::string moduleName = "") = 0;

//TODO: doc
virtual int sizePerType(MessageType type) = 0;
};
3 changes: 3 additions & 0 deletions src/agent/multitype_queue/include/multitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,7 @@ class MultiTypeQueue : public IMultiTypeQueue
* @copydoc IMultiTypeQueue::storedItems(MessageType, const std::string)
*/
int storedItems(MessageType type, const std::string moduleName = "") override;

//TODO: doc
int sizePerType(MessageType type) override;
};
4 changes: 4 additions & 0 deletions src/agent/multitype_queue/include/persistence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ class Persistence
* @return int The quantity of elements stored in the specified queue.
*/
virtual int GetElementCount(const std::string& queueName, const std::string& moduleName = "") = 0;


//TODO:doc
virtual int GetElementsStoredSize(const std::string& tableName) = 0;
};
29 changes: 17 additions & 12 deletions src/agent/multitype_queue/src/multitype_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ boost::asio::awaitable<int> MultiTypeQueue::pushAwaitable(Message message)
co_await timer.async_wait(boost::asio::use_awaitable);
}

const auto storedMessages = static_cast<size_t>(m_persistenceDest->GetElementCount(sMessageType));
const auto spaceAvailable = (m_maxItems > storedMessages) ? m_maxItems - storedMessages : 0;
if (spaceAvailable)
const auto storedItems = static_cast<size_t>(m_persistenceDest->GetElementCount(sMessageType));
const auto availableItems = (m_maxItems > storedItems) ? m_maxItems - storedItems : 0;
if (availableItems)
{
auto messageData = message.data;
if (messageData.is_array())
{
if (messageData.size() <= spaceAvailable)
if (messageData.size() <= availableItems)
{
for (const auto& singleMessageData : messageData)
{
Expand Down Expand Up @@ -155,7 +155,6 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName,
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return result;
Expand All @@ -180,7 +179,6 @@ boost::asio::awaitable<std::vector<Message>> MultiTypeQueue::getNextNAwaitable(M
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
co_return result;
Expand All @@ -204,7 +202,6 @@ std::vector<Message> MultiTypeQueue::getNextN(MessageType type,
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return result;
Expand All @@ -219,7 +216,6 @@ bool MultiTypeQueue::pop(MessageType type, const std::string moduleName)
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return result;
Expand All @@ -234,7 +230,6 @@ int MultiTypeQueue::popN(MessageType type, int messageQuantity, const std::strin
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return result;
Expand All @@ -248,7 +243,6 @@ bool MultiTypeQueue::isEmpty(MessageType type, const std::string moduleName)
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return false;
Expand All @@ -263,7 +257,6 @@ bool MultiTypeQueue::isFull(MessageType type, const std::string moduleName)
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return false;
Expand All @@ -277,7 +270,19 @@ int MultiTypeQueue::storedItems(MessageType type, const std::string moduleName)
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return false;
}

int MultiTypeQueue::sizePerType(MessageType type)
{
if (m_mapMessageTypeName.contains(type))
{
return m_persistenceDest->GetElementsStoredSize(m_mapMessageTypeName.at(type));
}
else
{
LogError("Error didn't find the queue.");
}
return false;
Expand Down
31 changes: 29 additions & 2 deletions src/agent/multitype_queue/src/sqlitestorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ int SQLiteStorage::Remove(int id, const std::string& tableName, const std::strin
catch (const std::exception& e)
{
LogError("Error during Remove operation: {}.", e.what());
return {};
return 0;
}
}

Expand Down Expand Up @@ -349,6 +349,33 @@ int SQLiteStorage::GetElementCount(const std::string& tableName, const std::stri
catch (const std::exception& e)
{
LogError("Error during GetElementCount operation: {}.", e.what());
return {};
return 0;
}
}

int SQLiteStorage::GetElementsStoredSize(const std::string& tableName)
{
std::string sizeQuery = fmt::format(
R"(SELECT SUM(LENGTH(module_name) + LENGTH(module_type) + LENGTH(metadata) + LENGTH(message)) AS total_bytes FROM {};)",
tableName);

try
{
SQLite::Statement query(*m_db, sizeQuery);
int count = 0;
if (query.executeStep())
{
count = query.getColumn(0).getInt();
}
else
{
LogError("Error SQLiteStorage get element count.");
}
return count;
}
catch (const std::exception& e)
{
LogError("Error during GetElementCount operation: {}.", e.what());
return 0;
}
}
4 changes: 4 additions & 0 deletions src/agent/multitype_queue/src/sqlitestorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ class SQLiteStorage : public Persistence
*/
int GetElementCount(const std::string& tableName, const std::string& moduleName = "") override;


//TODO: doc
int GetElementsStoredSize(const std::string& tableName) override;

private:
/**
* @brief Initialize the table in the SQLite database.
Expand Down
18 changes: 18 additions & 0 deletions src/agent/multitype_queue/tests/sqlitestorage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,24 @@ TEST_F(SQLiteStorageTest, GetElementCountWithModule)
EXPECT_EQ(storage->GetElementCount(tableName, "unavailableModuleName"), 0);
}

TEST_F(SQLiteStorageTest, MessagesSizes)
{
auto messages = nlohmann::json::array();
messages.push_back({{"key", "value1"}});
messages.push_back({{"key", "value2"}});
auto val = storage->Store(messages, tableName);
EXPECT_EQ(val, 2);

auto retrievedMessages = storage->GetElementsStoredSize(tableName);
EXPECT_EQ(retrievedMessages, 32);

val = storage->Store(messages, tableName);
EXPECT_EQ(val, 2);

retrievedMessages = storage->GetElementsStoredSize(tableName);
EXPECT_EQ(retrievedMessages, 64);
}

class SQLiteStorageMultithreadedTest : public ::testing::Test
{
protected:
Expand Down

0 comments on commit 88f1c95

Please sign in to comment.