Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
notes:

Queue header has the same name as others, namely queue.hpp from boost this
causes intellisense to fail to recognize symbols, bigger problems may arise.

Some necessary changes to cmakes were made to allow compilation, aside
from the queue header name, the cmake wasnt generating, some linking
must be made public.

Currently experimenting passing a lambda to the communicator with the
strategy on how to get a message to send. This way the communciator
can be kept decoupled from the queue implementation.
  • Loading branch information
jr0me committed Aug 12, 2024
1 parent 2e93ebd commit 0b062b7
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ set(SOURCES
add_subdirectory(agent_info)
add_subdirectory(communicator)
add_subdirectory(configuration_parser)
add_subdirectory(queue)

find_package(OpenSSL REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)

add_library(Agent ${SOURCES})
target_include_directories(Agent PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast)
target_link_libraries(Agent PUBLIC ConfigurationParser Communicator AgentInfo queue PRIVATE OpenSSL::SSL OpenSSL::Crypto Boost::asio Boost::beast)

if(BUILD_TESTS)
enable_testing()
add_subdirectory(tests)
add_subdirectory(queue)
endif()
12 changes: 9 additions & 3 deletions src/agent/communicator/include/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ namespace communicator
const std::function<std::string(std::string, std::string)> GetStringConfigValue);

boost::asio::awaitable<void> WaitForTokenExpirationAndAuthenticate();
boost::asio::awaitable<void> GetCommandsFromManager(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> StatefulMessageProcessingTask(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void> StatelessMessageProcessingTask(std::queue<std::string>& messageQueue);
boost::asio::awaitable<void>
GetCommandsFromManager(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void>
StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);
boost::asio::awaitable<void>
StatelessMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess);

private:
long GetTokenRemainingSecs() const;
Expand Down
11 changes: 6 additions & 5 deletions src/agent/communicator/include/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ namespace http_client
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess = {});

boost::asio::awaitable<void> Co_MessageProcessingTask(const std::string& token,
HttpRequestParams params,
std::function<std::string()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess = {});
boost::asio::awaitable<void>
Co_MessageProcessingTask(const std::string& token,
HttpRequestParams params,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess = {});

boost::beast::http::response<boost::beast::http::dynamic_body> PerformHttpRequest(const HttpRequestParams& params);

Expand Down
21 changes: 15 additions & 6 deletions src/agent/communicator/src/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,18 @@ namespace communicator
return std::max(0L, static_cast<long>(m_tokenExpTimeInSeconds - now_seconds));
}

boost::asio::awaitable<void> Communicator::GetCommandsFromManager(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::GetCommandsFromManager(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::get, m_managerIp, m_port, "/commands");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

boost::asio::awaitable<void> Communicator::WaitForTokenExpirationAndAuthenticate()
Expand Down Expand Up @@ -105,26 +108,32 @@ namespace communicator
}
}

boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateful");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(std::queue<std::string>& messageQueue)
boost::asio::awaitable<void>
Communicator::StatelessMessageProcessingTask(std::function<boost::asio::awaitable<std::string>()> getMessages,
std::function<void(const std::string&)> onSuccess)
{
auto onAuthenticationFailed = [this]()
{
TryReAuthenticate();
};
const auto reqParams =
http_client::HttpRequestParams(boost::beast::http::verb::post, m_managerIp, m_port, "/stateless");
co_await http_client::Co_MessageProcessingTask(m_token, reqParams, {}, onAuthenticationFailed);
co_await http_client::Co_MessageProcessingTask(
m_token, reqParams, getMessages, onAuthenticationFailed, onSuccess);
}

void Communicator::TryReAuthenticate()
Expand Down
13 changes: 7 additions & 6 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ namespace http_client
std::cout << "Response body: " << boost::beast::buffers_to_string(res.body().data()) << std::endl;
}

boost::asio::awaitable<void> Co_MessageProcessingTask(const std::string& token,
HttpRequestParams reqParams,
std::function<std::string()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess)
boost::asio::awaitable<void>
Co_MessageProcessingTask(const std::string& token,
HttpRequestParams reqParams,
std::function<boost::asio::awaitable<std::string>()> messageGetter,
std::function<void()> onUnauthorized,
std::function<void(const std::string&)> onSuccess)
{
using namespace std::chrono_literals;

Expand Down Expand Up @@ -129,7 +130,7 @@ namespace http_client
continue;
}

reqParams.body = messageGetter ? messageGetter() : "";
reqParams.body = messageGetter ? co_await messageGetter() : "";
reqParams.token = token;
auto req = CreateHttpRequest(reqParams);

Expand Down
4 changes: 2 additions & 2 deletions src/agent/include/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#include <agent_info.hpp>
#include <communicator.hpp>
#include <configuration_parser.hpp>
#include <message_queue.hpp>
#include <signal_handler.hpp>
#include <task_manager.hpp>

#include <queue>
#include <string>

class Agent
Expand All @@ -18,7 +18,7 @@ class Agent
void Run();

private:
std::queue<std::string> m_messageQueue;
MultiTypeQueue m_messageQueue;

SignalHandler m_signalHandler;
TaskManager m_taskManager;
Expand Down
16 changes: 5 additions & 11 deletions src/agent/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,15 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)

set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wunused -pthread")

include_directories(${CMAKE_SOURCE_DIR}/include)

find_package(SQLiteCpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(fmt REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio beast)

add_library(queue
src/sqlitestorage.cpp
src/queue.cpp
)

target_include_directories(queue PRIVATE include ${SQLiteCpp_INCLUDE_DIRS})
add_library(queue src/sqlitestorage.cpp src/message_queue.cpp)
target_include_directories(queue PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PRIVATE ${SQLiteCpp_INCLUDE_DIRS})
target_link_libraries(queue PRIVATE SQLiteCpp nlohmann_json::nlohmann_json fmt::fmt Boost::asio)

if(BUILD_TESTS)
add_subdirectory(tests)
endif()
# if(BUILD_TESTS)
# add_subdirectory(tests)
# endif()
File renamed without changes.
2 changes: 1 addition & 1 deletion src/agent/queue/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <string>
#include <thread>

#include "queue.hpp"
#include "message_queue.hpp"

int main()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <utility>
#include <vector>

#include "queue.hpp"
#include "message_queue.hpp"

int MultiTypeQueue::push(Message message, bool shouldWait)
{
Expand Down
2 changes: 1 addition & 1 deletion src/agent/queue/tests/queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <boost/asio.hpp>
#include <boost/asio/experimental/co_spawn.hpp>

#include "queue.hpp"
#include "message_queue.hpp"
#include "queue_test.hpp"

using json = nlohmann::json;
Expand Down
37 changes: 34 additions & 3 deletions src/agent/src/agent.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,35 @@
#include <agent.hpp>

#include <shared.hpp>

#include <string>
#include <thread>

namespace
{
template<MessageType T>
auto getMessagesFromQueue(MultiTypeQueue& multiTypeQueue)
{
return [&multiTypeQueue]() -> boost::asio::awaitable<std::string>
{
std::cout << "Getting messages from queue\n";
const auto message = co_await multiTypeQueue.getNextAwaitable(T);
co_return message.data.dump();
};
}

template<MessageType T>
auto popMessagesFromQueue(MultiTypeQueue& multiTypeQueue)
{
return [&multiTypeQueue](const std::string& response) -> void
{
std::cout << "Response: " << response << '\n';
std::cout << "Popping messages from queue\n";
multiTypeQueue.pop(T);
};
}
} // namespace

Agent::Agent()
: m_communicator(m_agentInfo.GetUUID(),
[this](std::string table, std::string key) -> std::string
Expand All @@ -18,9 +46,12 @@ Agent::~Agent()
void Agent::Run()
{
m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate());
m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(m_messageQueue));
m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(getMessagesFromQueue<COMMAND>(m_messageQueue),
popMessagesFromQueue<COMMAND>(m_messageQueue)));
m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(
getMessagesFromQueue<STATEFUL>(m_messageQueue), popMessagesFromQueue<STATEFUL>(m_messageQueue)));
m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(
getMessagesFromQueue<STATELESS>(m_messageQueue), popMessagesFromQueue<STATELESS>(m_messageQueue)));

m_signalHandler.WaitForSignal();
}

0 comments on commit 0b062b7

Please sign in to comment.