Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
notes:

Queue header has the same name as others, namely queue.hpp from boost this
causes intellisense to fail to recognize symbols, bigger problems may arise.

Some necessary changes to cmakes were made to allow compilation, aside
from the queue header name, the cmake wasnt generating, some linking
must be made public.

Currently experimenting passing a lambda to the communicator with the
strategy on how to get a message to send. This way the communciator
can be kept decoupled from the queue implementation.
  • Loading branch information
jr0me committed Aug 12, 2024
1 parent 1ce35f5 commit 8c6d349
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ set(SOURCES
add_subdirectory(agent_info)
add_subdirectory(communicator)
add_subdirectory(configuration_parser)
add_subdirectory(queue)

find_package(OpenSSL REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)

add_library(Agent ${SOURCES})
target_include_directories(Agent PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast)
target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo queue PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast)

if(BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
add_subdirectory(queue)
endif()
9 changes: 6 additions & 3 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ namespace communicator
const std::function<std::string(std::string, std::string)> GetStringConfigValue);

boost::asio::awaitable<void> WaitForTokenExpirationAndAuthenticate();
boost::asio::awaitable<void> GetCommandsFromManager(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> StatefulMessageProcessingTask(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> StatelessMessageProcessingTask(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> GetCommandsFromManager(std::function<std::string()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void> StatefulMessageProcessingTask(std::function<std::string()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void> StatelessMessageProcessingTask(std::function<std::string()> getMessages,
std::function<void(const std::string&)> onSuccess);

private:
long GetTokenRemainingSecs() const;
Expand Down
20 changes: 14 additions & 6 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ namespace communicator
return std::max(0L, static_cast<long>(m_tokenExpTimeInSeconds - now_seconds));
}

boost::asio::awaitable<void> Communicator::GetCommandsFromManager(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void> Communicator::GetCommandsFromManager(std::function<std::string()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::get, m_managerIp, m_port, "/commands");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

boost::asio::awaitable<void> Communicator::WaitForTokenExpirationAndAuthenticate()
Expand Down Expand Up @@ -105,26 +107,32 @@ namespace communicator
}
}

boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::StatefulMessageProcessingTask(std::function<std::string()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateful");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::StatelessMessageProcessingTask(std::function<std::string()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateless");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

void Communicator::TryReAuthenticate()
Expand Down
4 changes: 2 additions & 2 deletions src/agent/include/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#include <agent_info.hpp>
#include <communicator.hpp>
#include <configuration_parser.hpp>
#include <message_queue.hpp>
#include <signal_handler.hpp>
#include <task_manager.hpp>

#include <queue>
#include <string>

class Agent
Expand All @@ -18,7 +18,7 @@ class Agent
void Run();

private:
std::queue<std::string> m_messageQueue;
MultiTypeQueue m_messageQueue;

SignalHandler m_signalHandler;
TaskManager m_taskManager;
Expand Down
16 changes: 5 additions & 11 deletions src/agent/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,15 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)

set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread")

include_directories(${CMAKE_SOURCE_DIR}/include)

find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)

add_library(queue
src/sqlitestorage.cpp
src/queue.cpp
)

target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
add_library(queue src/sqlitestorage.cpp src/message_queue.cpp)
target_include_directories(queue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio)

if(BUILD_TESTS)
add_subdirectory(tests)
endif()
# if(BUILD_TESTS)
# add_subdirectory(tests)
# endif()
File renamed without changes.
2 changes: 1 addition & 1 deletion src/agent/queue/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <string>
#include <thread>

#include "queue.hpp"
#include "message_queue.hpp"

int main()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <utility>
#include <vector>

#include "queue.hpp"
#include "message_queue.hpp"

int MultiTypeQueue::push(Message message, bool shouldWait)
{
Expand Down
2 changes: 1 addition & 1 deletion src/agent/queue/tests/queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <boost/asio.hpp>
#include <boost/asio/experimental/co_spawn.hpp>

#include "queue.hpp"
#include "message_queue.hpp"
#include "queue_test.hpp"

using json = nlohmann::json;
Expand Down
37 changes: 34 additions & 3 deletions src/agent/src/agent.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,35 @@
#include <agent.hpp>

#include <shared.hpp>

#include <string>
#include <thread>

namespace
{
template<MessageType messageType>
auto getMessagesFromQueue(MultiTypeQueue& multiTypeQueue)
{
return [&multiTypeQueue]() -> std::string
{
std::cout << "Getting messages from queue\n";
const auto message = multiTypeQueue.getLastMessage(messageType);
return message.data.dump();
};
}

template<MessageType messageType>
auto popMessagesFromQueue(MultiTypeQueue& multiTypeQueue)
{
return [&multiTypeQueue](const std::string& response) -> void
{
std::cout << "Response: " << response << '\n';
std::cout << "Popping messages from queue\n";
multiTypeQueue.popLastMessage(messageType);
};
}
} // namespace

Agent::Agent()
: m_communicator(m_agentInfo.GetUUID(),
[this](std::string table, std::string key) -> std::string
Expand All @@ -18,9 +46,12 @@ Agent::~Agent()
void Agent::Run()
{
m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate());
m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(getMessagesFromQueue<COMMAND>(m_messageQueue),
popMessagesFromQueue<COMMAND>(m_messageQueue)));
m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(
getMessagesFromQueue<STATEFUL>(m_messageQueue), popMessagesFromQueue<STATEFUL>(m_messageQueue)));
m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(
getMessagesFromQueue<STATELESS>(m_messageQueue), popMessagesFromQueue<STATELESS>(m_messageQueue)));

m_signalHandler.WaitForSignal();
}

0 comments on commit 8c6d349

Please sign in to comment.