Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Agent's Queue Component #28

Merged
merged 31 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6ab8fca
feat: initial commit of queue component
LucioDonda Jul 4, 2024
b710c15
feat: code style, test and additional methods
LucioDonda Jul 5, 2024
7f4b7ac
feat: add corrections on maps and helper functions plus tests WIP
LucioDonda Jul 10, 2024
106f528
feat: data persistence stage for the queue module
cborla Jul 12, 2024
330bb2e
fix: compiling with tests
LucioDonda Jul 12, 2024
1dc6f54
perf: enabling polymorphism in persistence
LucioDonda Jul 12, 2024
646dc6d
test: additional test cases and behavior correction
LucioDonda Jul 12, 2024
af616b9
fix: all test passing
LucioDonda Jul 15, 2024
b50a805
feat: add push and pop for multiple messages
LucioDonda Jul 17, 2024
8224c99
feat: rename STATEs enum values
LucioDonda Jul 17, 2024
778793c
feat: change sqlite library to SQLiteCpp, improvement in exceptions
cborla Jul 17, 2024
2a36513
fix: clean code from warning messages
cborla Jul 17, 2024
b7d28cf
feat: timeout on push and removing filestorage
LucioDonda Jul 19, 2024
88cdad2
feat: separing queue with header and building as library
LucioDonda Jul 20, 2024
3567cb6
fix: change to a correct use of clang-format
LucioDonda Jul 22, 2024
7ec08f9
feat: unify cmakes and vcpkg usage
LucioDonda Jul 22, 2024
545614f
feat: multithreading fis sqlitestorage level, tests and def on headers
LucioDonda Jul 29, 2024
1f71942
feat: three DB files approach
LucioDonda Jul 31, 2024
ca453f4
feat: single persistence approach
LucioDonda Aug 1, 2024
a4a5b2b
feat: addition of module field, pending tests
LucioDonda Aug 6, 2024
3b3b58b
feat: add module field on the other methods
LucioDonda Aug 7, 2024
30f3e6b
feat: getting N message and tests for it
LucioDonda Aug 8, 2024
9a4cc9a
feat: returning rows modified, message vector, test and cleaning
LucioDonda Aug 8, 2024
117ce4f
style: queue functions renaming
LucioDonda Aug 9, 2024
1ce35f5
feat: push and get awaitable approach with base tests
LucioDonda Aug 11, 2024
ad08082
feat: awaitable improvement for N messages
LucioDonda Aug 12, 2024
2e93ebd
fix: correct fifo order with additional tests correction
LucioDonda Aug 12, 2024
05591d7
fix: cmakes changes, headers and memeber names
LucioDonda Aug 13, 2024
fff83e9
feat: directory and files renaming
LucioDonda Aug 13, 2024
ce05ec6
fix: pr corrections and cleaning
LucioDonda Aug 13, 2024
942a905
fix: completing method and parameters description
LucioDonda Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(SOURCES
add_subdirectory(agent_info)
add_subdirectory(communicator)
add_subdirectory(configuration_parser)
add_subdirectory(agent_queue)

find_package(OpenSSL REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)
Expand Down
26 changes: 26 additions & 0 deletions src/agent/agent_queue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.22)

project(AgentQueue LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_BUILD_TYPE RelWithDebInfo)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Wextra -Wunused -pthread")

find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio)

add_library(AgentQueue
src/sqlitestorage.cpp
src/agent_queue.cpp
)

target_include_directories(AgentQueue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(AgentQueue PUBLIC nlohmann_json::nlohmann_json Boost::asio PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio)

if(BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
endif()
197 changes: 197 additions & 0 deletions src/agent/agent_queue/include/agent_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#ifndef QUEUE_H
#define QUEUE_H

#include <any>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <vector>

#include <boost/asio.hpp>
#include <cassert>

#include "shared.hpp"
#include "sqlitestorage.h"

// TODO: move to a configuration setting
constexpr int DEFAULT_MAX = 10000;
constexpr int DEFAULT_TIMEOUT_S = 3;

// Factory class
class PersistenceFactory
{
public:
static std::unique_ptr<Persistence> createPersistence(const std::string& type, const std::vector<std::any>& args)
{
if (type == "SQLite3")
{
if (args.size() != 2 || !std::any_cast<std::string>(&args[0]) ||
!std::any_cast<std::vector<std::string>>(&args[1]))
{
throw std::invalid_argument("SQLite3 requires db name and table names as arguments");
}
return std::make_unique<SQLiteStorage>(std::any_cast<std::string>(args[0]),
std::any_cast<std::vector<std::string>>(args[1]));
}
throw std::runtime_error("Unknown persistence type");
}
};

/**
* @brief
*
*/
class MultiTypeQueue
{
private:
const std::vector<std::string> m_vMessageTypeStrings {"STATELESS", "STATEFUL", "COMMAND"};
const std::map<MessageType, std::string> m_mapMessageTypeName {
{MessageType::STATELESS, "STATELESS"},
{MessageType::STATEFUL, "STATEFUL"},
{MessageType::COMMAND, "COMMAND"},
};
const int m_maxItems;
const std::chrono::seconds m_timeout;
std::unique_ptr<Persistence> m_persistenceDest;
std::mutex m_mtx;
std::condition_variable m_cv;

public:
// Create a vector with 3 PersistedQueue elements
MultiTypeQueue(int size = DEFAULT_MAX, int timeout = DEFAULT_TIMEOUT_S)
: m_maxItems(size)
, m_timeout(timeout)
{
try
{
m_persistenceDest = PersistenceFactory::createPersistence(
"SQLite3", {static_cast<std::string>(DEFAULT_DB_PATH), m_vMessageTypeStrings});
}
catch (const std::exception& e)
{
std::cerr << "Error creating persistence: " << e.what() << '\n';
}
}

// Delete copy constructor
MultiTypeQueue(const MultiTypeQueue&) = delete;

// Delete copy assignment operator
MultiTypeQueue& operator=(const MultiTypeQueue&) = delete;

// Delete move constructor
MultiTypeQueue(MultiTypeQueue&&) = delete;

// Delete move assignment operator
MultiTypeQueue& operator=(MultiTypeQueue&&) = delete;

~MultiTypeQueue() {};

/**
* @brief pushes a message
*
* @param message to be pushed
* @param shouldWait when true, the function will wait until the message is pushed
* @return int number of messages pushed
*/
int push(Message message, bool shouldWait = false);

/**
* @brief pushes a message
*
* @param message to be pushed
* @return boost::asio::awaitable<int> number of messages pushed
*/
boost::asio::awaitable<int> pushAwaitable(Message message);

/**
* @brief pushes a vector of messages
*
* @param messages to be pushed
* @param shouldWait when true, the function will wait until the message is pushed
* @return int number of messages pushed
*/
int push(std::vector<Message> messages);

/**
* @brief Get the Last Message object
*
* @param type of the queue to be used as source
* @return Message type object taken from the queue
*/
Message getNext(MessageType type, const std::string module = "");

/**
* @brief Get the Next Awaitable object
*
* @param type of the queue to be used as source
* @param module module name
* @param messageQuantity quantity of messages to return
* @return boost::asio::awaitable<Message>
*/
boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "");
/**
* @brief Returns N messages from a queue
*
* @param type Of the queue to be used as source
* @param moduleName
* @param messageQuantity quantity of messages to return
* @return Message Json data othe messages fetched
*/
std::vector<Message> getNextN(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief deletes a message from a queue
*
* @param type MessageType queue to pop
* @param moduleName
* @return true popped succesfully
* @return false wasn't able to pop message
*/
bool pop(MessageType type, const std::string moduleName = "");

/**
* @brief
*
* @param type
* @param moduleName
* @param messageQuantity
* @return Number of messages deleted
*/
int popN(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief Checks emptyness of a queue
*
* @param type
* @param moduleName
* @return true when queue empty
* @return false otherwise
*/
bool isEmpty(MessageType type, const std::string moduleName = "");

/**
* @brief Checks fullness of a queue
*
* @param type
* @param moduleName
* @return true when queue is full
* @return false otherwise
*/
bool isFull(MessageType type, const std::string moduleName = "");

/**
* @brief Get the Items By Type object
*
* @param type
* @param moduleName
* @return true
* @return false
*/
int storedItems(MessageType type, const std::string moduleName = "");
};

#endif // QUEUE_H
83 changes: 83 additions & 0 deletions src/agent/agent_queue/include/persistence.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#ifndef PERSISTENCE_H
#define PERSISTENCE_H

#include <nlohmann/json.hpp>
#include <string>
#include <vector>

using json = nlohmann::json;

/**
* @brief Interface for persistence storage.
*
* This interface defines methods for storing, retrieving, and removing JSON messages.
*/
class Persistence
{
public:
/**
* @brief Virtual destructor.
*/
virtual ~Persistence() = default;

/**
* @brief
*
* @param message
* @param queueName
* @param moduleName
* @return int
*/
virtual int Store(const json& message, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief
*
* @param id
* @param queueName
* @param moduleName
* @return json
*/
virtual json Retrieve(int id, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief
*
* @param n
* @param queueName
* @param moduleName
* @return json
*/
virtual json RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief
*
* @param id
* @param queueName
* @param moduleName
* @return int
*/
virtual int Remove(int id, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief
*
* @param n
* @param queueName
* @param moduleName
* @return int
*/
virtual int RemoveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief Get the Element Count object
*
* @param queueName
* @param moduleName
* @return int
*/
virtual int GetElementCount(const std::string& queueName, const std::string& moduleName = "") = 0;
};

#endif // PERSISTENCE_H
41 changes: 41 additions & 0 deletions src/agent/agent_queue/include/shared.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef SHARED_H
#define SHARED_H

#include <map>

#include <nlohmann/json.hpp>

// TODO: should be moved to Config
constexpr char DEFAULT_FILE_PATH[] = "/home/vagrant/FILE_";
constexpr char DEFAULT_DB_PATH[] = "queue.db";

/**
* @brief Types of messages enum
*
*/
enum MessageType
{
STATELESS,
STATEFUL,
COMMAND
};

/**
* @brief Wrapper for Message data and type
*
*/
class Message
{
public:
MessageType type;
nlohmann::json data;
std::string moduleName;
Message(MessageType t, nlohmann::json d, std::string mN = "")
: type(t)
, data(d)
, moduleName(mN)
{
}
};

#endif // SHARED_H
Loading
Loading