Skip to content

Commit

Permalink
feat: use MultiTypeQueue for messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jr0me committed Aug 16, 2024
1 parent 0049084 commit e209265
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 26 deletions.
3 changes: 2 additions & 1 deletion src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ project(Agent)

set(SOURCES
src/agent.cpp
src/task_manager.cpp
src/message_queue_utils.cpp
src/register.cpp
src/signal_handler.cpp
src/task_manager.cpp
)

add_subdirectory(agent_info)
Expand Down
10 changes: 7 additions & 3 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ namespace communicator
const std::function<std::string(std::string, std::string)> GetStringConfigValue);

boost::asio::awaitable<void> WaitForTokenExpirationAndAuthenticate();
boost::asio::awaitable<void> GetCommandsFromManager(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> StatefulMessageProcessingTask(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> StatelessMessageProcessingTask(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> GetCommandsFromManager(std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void>
StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void>
StatelessMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);

private:
long GetTokenRemainingSecs() const;
Expand Down
11 changes: 6 additions & 5 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ namespace http_client
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess = {});

boost::asio::awaitable<void> Co_MessageProcessingTask(const std::string& token,
HttpRequestParams params,
std::function<std::string()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess = {});
boost::asio::awaitable<void>
Co_MessageProcessingTask(const std::string& token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess = {});

boost::beast::http::response<boost::beast::http::dynamic_body> PerformHttpRequest(const HttpRequestParams& params);

Expand Down
18 changes: 12 additions & 6 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ namespace communicator
return std::max(0L, static_cast<long>(m_tokenExpTimeInSeconds - now_seconds));
}

boost::asio::awaitable<void> Communicator::GetCommandsFromManager(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void> Communicator::GetCommandsFromManager(std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::get, m_managerIp, m_port, "/commands");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed, onSuccess);
}

boost::asio::awaitable<void> Communicator::WaitForTokenExpirationAndAuthenticate()
Expand Down Expand Up @@ -119,26 +119,32 @@ namespace communicator
}
}

boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateful");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::StatelessMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateless");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

void Communicator::TryReAuthenticate()
Expand Down
13 changes: 7 additions & 6 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ namespace http_client
std::cout << "Response body: " << boost::beast::buffers_to_string(res.body().data()) << std::endl;
}

boost::asio::awaitable<void> Co_MessageProcessingTask(const std::string& token,
HttpRequestParams reqParams,
std::function<std::string()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess)
boost::asio::awaitable<void>
Co_MessageProcessingTask(const std::string& token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess)
{
using namespace std::chrono_literals;

Expand Down Expand Up @@ -131,7 +132,7 @@ namespace http_client

if (messageGetter != nullptr)
{
reqParams.body = messageGetter();
reqParams.body = co_await messageGetter();
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/agent/include/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#include <agent_info.hpp>
#include <communicator.hpp>
#include <configuration_parser.hpp>
#include <multitype_queue.hpp>
#include <signal_handler.hpp>
#include <task_manager.hpp>

#include <queue>
#include <string>

class Agent
Expand All @@ -18,7 +18,7 @@ class Agent
void Run();

private:
std::queue<std::string> m_messageQueue;
MultiTypeQueue m_messageQueue;

SignalHandler m_signalHandler;
TaskManager m_taskManager;
Expand Down
15 changes: 15 additions & 0 deletions src/agent/include/message_queue_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <message.hpp>

#include <boost/asio/awaitable.hpp>

#include <string>

class IMultiTypeQueue;

boost::asio::awaitable<std::string> getMessagesFromQueue(IMultiTypeQueue& multiTypeQueue, MessageType messageType);

void popMessagesFromQueue(IMultiTypeQueue& multiTypeQueue, MessageType messageType);

void pushCommandsToQueue(IMultiTypeQueue& multiTypeQueue, const std::string& commands);
6 changes: 6 additions & 0 deletions src/agent/multitype_queue/include/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ class Message
, moduleName(mN)
{
}

// Define equality operator
bool operator==(const Message& other) const
{
return type == other.type && data == other.data;
}
};
19 changes: 16 additions & 3 deletions src/agent/src/agent.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#include <agent.hpp>

#include <message.hpp>
#include <message_queue_utils.hpp>

#include <string>
#include <thread>
#include <vector>

Agent::Agent()
: m_communicator(m_agentInfo.GetUUID(),
Expand All @@ -18,9 +23,17 @@ Agent::~Agent()
void Agent::Run()
{
m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate());
m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(m_messageQueue));

m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(
[this](const std::string& response) { pushCommandsToQueue(m_messageQueue, response); }));

m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(
[this]() { return getMessagesFromQueue(m_messageQueue, STATEFUL); },
[this]([[maybe_unused]] const std::string& response) { popMessagesFromQueue(m_messageQueue, STATEFUL); }));

m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(
[this]() { return getMessagesFromQueue(m_messageQueue, STATELESS); },
[this]([[maybe_unused]] const std::string& response) { popMessagesFromQueue(m_messageQueue, STATELESS); }));

m_signalHandler.WaitForSignal();
}
49 changes: 49 additions & 0 deletions src/agent/src/message_queue_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <message_queue_utils.hpp>

#include <imultitype_queue.hpp>

#include <nlohmann/json.hpp>

#include <vector>

namespace
{
// This should eventually be replaced with a configuration parameter.
constexpr int NUM_EVENTS = 1;
} // namespace

boost::asio::awaitable<std::string> getMessagesFromQueue(IMultiTypeQueue& multiTypeQueue, MessageType messageType)
{
const auto message = co_await multiTypeQueue.getNextNAwaitable(messageType, NUM_EVENTS);

nlohmann::json jsonObj;
jsonObj["events"] = nlohmann::json::array();
jsonObj["events"].push_back(message.data);

co_return jsonObj.dump();
}

void popMessagesFromQueue(IMultiTypeQueue& multiTypeQueue, MessageType messageType)
{
multiTypeQueue.popN(messageType, NUM_EVENTS);
}

void pushCommandsToQueue(IMultiTypeQueue& multiTypeQueue, const std::string& commands)
{
const auto jsonObj = nlohmann::json::parse(commands);

if (jsonObj.contains("commands") && jsonObj["commands"].is_array())
{
std::vector<Message> messages;

for (const auto& command : jsonObj["commands"])
{
messages.emplace_back(MessageType::COMMAND, command);
}

if (!messages.empty())
{
multiTypeQueue.push(messages);
}
}
}
4 changes: 4 additions & 0 deletions src/agent/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ add_test(NAME RegisterTest COMMAND register_test)
add_executable(signal_handler_test signal_handler_test.cpp)
target_link_libraries(signal_handler_test PRIVATE Agent GTest::gtest)
add_test(NAME SignalHandlerTest COMMAND signal_handler_test)

add_executable(message_queue_utils_test message_queue_utils_test.cpp)
target_link_libraries(message_queue_utils_test PRIVATE Agent MultiTypeQueue GTest::gtest GTest::gmock)
add_test(NAME MessageQueueUtilsTest COMMAND message_queue_utils_test)
91 changes: 91 additions & 0 deletions src/agent/tests/message_queue_utils_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include <message_queue_utils.hpp>

#include <message.hpp>
#include <multitype_queue.hpp>

#include <boost/asio.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <nlohmann/json.hpp>

class MockMultiTypeQueue : public MultiTypeQueue
{
public:
MOCK_METHOD(boost::asio::awaitable<Message>,
getNextNAwaitable,
(MessageType, int, const std::string module),
(override));
MOCK_METHOD(int, popN, (MessageType, int, const std::string module), (override));
MOCK_METHOD(int, push, (std::vector<Message>), (override));
};

class MessageQueueUtilsTest : public ::testing::Test
{
protected:
boost::asio::io_context io_context;
MockMultiTypeQueue mockQueue;
};

TEST_F(MessageQueueUtilsTest, GetMessagesFromQueueTest)
{
Message testMessage {MessageType::STATEFUL, "test_data"};

EXPECT_CALL(mockQueue, getNextNAwaitable(MessageType::STATEFUL, 1, ""))
.WillOnce([this, &testMessage]() -> boost::asio::awaitable<Message> { co_return testMessage; });

io_context.restart();

auto result = boost::asio::co_spawn(
io_context, getMessagesFromQueue(mockQueue, MessageType::STATEFUL), boost::asio::use_future);

const auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(1);
io_context.run_until(timeout);

ASSERT_TRUE(result.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready);

const auto jsonResult = result.get();

nlohmann::json expectedJson;
expectedJson["events"] = nlohmann::json::array();
expectedJson["events"].push_back("test_data");

ASSERT_EQ(jsonResult, expectedJson.dump());
}

TEST_F(MessageQueueUtilsTest, PopMessagesFromQueueTest)
{
EXPECT_CALL(mockQueue, popN(MessageType::STATEFUL, 1, "")).Times(1);
popMessagesFromQueue(mockQueue, MessageType::STATEFUL);
}

TEST_F(MessageQueueUtilsTest, PushCommandsToQueueTest)
{
nlohmann::json commandsJson;
commandsJson["commands"] = nlohmann::json::array();
commandsJson["commands"].push_back("command_1");
commandsJson["commands"].push_back("command_2");

std::vector<Message> expectedMessages;
expectedMessages.emplace_back(MessageType::COMMAND, "command_1");
expectedMessages.emplace_back(MessageType::COMMAND, "command_2");

EXPECT_CALL(mockQueue, push(::testing::ContainerEq(expectedMessages))).Times(1);

pushCommandsToQueue(mockQueue, commandsJson.dump());
}

TEST_F(MessageQueueUtilsTest, NoCommandsToPushTest)
{
nlohmann::json commandsJson;
commandsJson["commands"] = nlohmann::json::array();

EXPECT_CALL(mockQueue, push(::testing::_)).Times(0);

pushCommandsToQueue(mockQueue, commandsJson.dump());
}

int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

0 comments on commit e209265

Please sign in to comment.