Skip to content

Commit

Permalink
Merge pull request #28 from wazuh/16-implement-queue-component
Browse files Browse the repository at this point in the history
New Agent's Queue Component
  • Loading branch information
TomasTurina authored Aug 13, 2024
2 parents 5fa3e5b + 942a905 commit a65a0e8
Show file tree
Hide file tree
Showing 14 changed files with 2,024 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(SOURCES
add_subdirectory(agent_info)
add_subdirectory(communicator)
add_subdirectory(configuration_parser)
add_subdirectory(agent_queue)

find_package(OpenSSL REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)
Expand Down
26 changes: 26 additions & 0 deletions src/agent/agent_queue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.22)

project(AgentQueue LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_BUILD_TYPE RelWithDebInfo)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -Wextra -Wunused -pthread")

find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio)

add_library(AgentQueue
src/sqlitestorage.cpp
src/agent_queue.cpp
)

target_include_directories(AgentQueue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(AgentQueue PUBLIC nlohmann_json::nlohmann_json Boost::asio PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio)

if(BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
endif()
203 changes: 203 additions & 0 deletions src/agent/agent_queue/include/agent_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#ifndef QUEUE_H
#define QUEUE_H

#include <any>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <vector>

#include <boost/asio.hpp>
#include <cassert>

#include "shared.hpp"
#include "sqlitestorage.h"

// TODO: move to a configuration setting
constexpr int DEFAULT_MAX = 10000;
constexpr int DEFAULT_TIMEOUT_S = 3;

// Factory class
class PersistenceFactory
{
public:
static std::unique_ptr<Persistence> createPersistence(const std::string& type, const std::vector<std::any>& args)
{
if (type == "SQLite3")
{
if (args.size() != 2 || !std::any_cast<std::string>(&args[0]) ||
!std::any_cast<std::vector<std::string>>(&args[1]))
{
throw std::invalid_argument("SQLite3 requires db name and table names as arguments");
}
return std::make_unique<SQLiteStorage>(std::any_cast<std::string>(args[0]),
std::any_cast<std::vector<std::string>>(args[1]));
}
throw std::runtime_error("Unknown persistence type");
}
};

class MultiTypeQueue
{
private:
const std::vector<std::string> m_vMessageTypeStrings {"STATELESS", "STATEFUL", "COMMAND"};
const std::map<MessageType, std::string> m_mapMessageTypeName {
{MessageType::STATELESS, "STATELESS"},
{MessageType::STATEFUL, "STATEFUL"},
{MessageType::COMMAND, "COMMAND"},
};
const int m_maxItems;
const std::chrono::seconds m_timeout;
std::unique_ptr<Persistence> m_persistenceDest;
std::mutex m_mtx;
std::condition_variable m_cv;

public:
MultiTypeQueue(int size = DEFAULT_MAX, int timeout = DEFAULT_TIMEOUT_S)
: m_maxItems(size)
, m_timeout(timeout)
{
try
{
m_persistenceDest = PersistenceFactory::createPersistence(
"SQLite3", {static_cast<std::string>(DEFAULT_DB_PATH), m_vMessageTypeStrings});
}
catch (const std::exception& e)
{
std::cerr << "Error creating persistence: " << e.what() << '\n';
}
}

/**
* @brief Delete copy constructor
*/
MultiTypeQueue(const MultiTypeQueue&) = delete;

/**
* @brief Delete copy assignment operator
*/
MultiTypeQueue& operator=(const MultiTypeQueue&) = delete;

/**
* @brief Delete move constructor
*/
MultiTypeQueue(MultiTypeQueue&&) = delete;

/**
* @brief Delete move assignment operator
*/
MultiTypeQueue& operator=(MultiTypeQueue&&) = delete;

/**
* @brief Destructor.
*/
~MultiTypeQueue() {};

/**
* @brief pushes a message
*
* @param message to be pushed
* @param shouldWait when true, the function will wait until the message is pushed
* @return int number of messages pushed
*/
int push(Message message, bool shouldWait = false);

/**
* @brief pushes a message
*
* @param message to be pushed
* @return boost::asio::awaitable<int> number of messages pushed
*/
boost::asio::awaitable<int> pushAwaitable(Message message);

/**
* @brief pushes a vector of messages
*
* @param messages vector of messages to be pushed
* @return int number of messages pushed
*/
int push(std::vector<Message> messages);

/**
* @brief Get the next Message object
*
* @param type of the queue to be used as source
* @param module module name
* @return Message type object taken from the queue
*/
Message getNext(MessageType type, const std::string module = "");

/**
* @brief Get the Next Awaitable object
*
* @param type of the queue to be used as source
* @param moduleName module name
* @param messageQuantity quantity of messages to return
* @return boost::asio::awaitable<Message> awaitable object taken from the queue
*/
boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief Returns N messages from a queue
*
* @param type Of the queue to be used as source
* @param moduleName module name
* @param messageQuantity quantity of messages to return
* @return Message Json data othe messages fetched
*/
std::vector<Message> getNextN(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief deletes a message from a queue
*
* @param type MessageType queue to pop
* @param moduleName
* @return true when popped succesfully
* @return false if it wasn't able to pop message
*/
bool pop(MessageType type, const std::string moduleName = "");

/**
* @brief deletes N messages from a queue
*
* @param type MessageType queue to pop
* @param moduleName module name
* @param messageQuantity quantity of messages to pop
* @return Number of messages deleted
*/
int popN(MessageType type, int messageQuantity, const std::string moduleName = "");

/**
* @brief Checks emptyness of a queue
*
* @param type MessageType
* @param moduleName module name
* @return true when queue empty
* @return false otherwise
*/
bool isEmpty(MessageType type, const std::string moduleName = "");

/**
* @brief Checks fullness of a queue
*
* @param type MessageType
* @param moduleName module name
* @return true when queue is full
* @return false otherwise
*/
bool isFull(MessageType type, const std::string moduleName = "");

/**
* @brief Get the Items By Type object
*
* @param type MessageType
* @param moduleName module name
* @return int number of items in the queue.
*/
int storedItems(MessageType type, const std::string moduleName = "");
};

#endif // QUEUE_H
83 changes: 83 additions & 0 deletions src/agent/agent_queue/include/persistence.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#ifndef PERSISTENCE_H
#define PERSISTENCE_H

#include <nlohmann/json.hpp>
#include <string>
#include <vector>

using json = nlohmann::json;

/**
* @brief Interface for persistence storage.
*
* This interface defines methods for storing, retrieving, and removing JSON messages.
*/
class Persistence
{
public:
/**
* @brief Virtual destructor.
*/
virtual ~Persistence() = default;

/**
* @brief Store a JSON message in the specified queue.
*
* @param message The JSON message to be stored.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @return int The number of messages stored.
*/
virtual int Store(const json& message, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief Retrieve a JSON message from the specified queue.
*
* @param id rowid of the message to be retrieved.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @return json The retrieved JSON message.
*/
virtual json Retrieve(int id, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief Retrieve multiple JSON messages from the specified queue.
*
* @param n number of messages to be retrieved.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @return json The retrieved JSON messages.
*/
virtual json RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief Remove a JSON message from the specified queue.
*
* @param id number of messages to be removed.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @return int The number of messages removed.
*/
virtual int Remove(int id, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief Remove multiple JSON messages from the specified queue.
*
* @param n number of messages to be removed.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @return int The number of messages removed.
*/
virtual int RemoveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;

/**
* @brief Get the quantity of elements stored in the specified queue.
*
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @return int The quantity of elements stored in the specified queue.
*/
virtual int GetElementCount(const std::string& queueName, const std::string& moduleName = "") = 0;
};

#endif // PERSISTENCE_H
41 changes: 41 additions & 0 deletions src/agent/agent_queue/include/shared.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef SHARED_H
#define SHARED_H

#include <map>

#include <nlohmann/json.hpp>

// TODO: should be moved to Config
constexpr char DEFAULT_FILE_PATH[] = "/home/vagrant/FILE_";
constexpr char DEFAULT_DB_PATH[] = "queue.db";

/**
* @brief Types of messages enum
*
*/
enum MessageType
{
STATELESS,
STATEFUL,
COMMAND
};

/**
* @brief Wrapper for Message, contains the message type, the json data and the module name.
*
*/
class Message
{
public:
MessageType type;
nlohmann::json data;
std::string moduleName;
Message(MessageType t, nlohmann::json d, std::string mN = "")
: type(t)
, data(d)
, moduleName(mN)
{
}
};

#endif // SHARED_H
Loading

0 comments on commit a65a0e8

Please sign in to comment.