Skip to content

Commit f7b34fd

Browse files
committed
refactor: use max batch size only in communicator
get messages callback will now take an int parameter for the batch size
1 parent e2b32c4 commit f7b34fd

File tree

10 files changed

+95
-100
lines changed

10 files changed

+95
-100
lines changed

src/agent/communicator/include/communicator.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ namespace communicator
8888
/// @brief Processes messages in a stateful manner
8989
/// @param getMessages A function to retrieve a message from the queue
9090
/// @param onSuccess A callback function to execute when a message is processed
91-
boost::asio::awaitable<void>
92-
StatefulMessageProcessingTask(std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
93-
std::function<void(const int, const std::string&)> onSuccess);
91+
boost::asio::awaitable<void> StatefulMessageProcessingTask(
92+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
93+
std::function<void(const int, const std::string&)> onSuccess);
9494

9595
/// @brief Processes messages in a stateless manner
9696
/// @param getMessages A function to retrieve a message from the queue
9797
/// @param onSuccess A callback function to execute when a message is processed
9898
boost::asio::awaitable<void> StatelessMessageProcessingTask(
99-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
99+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
100100
std::function<void(const int, const std::string&)> onSuccess);
101101

102102
/// @brief Retrieves group configuration from the manager

src/agent/communicator/include/http_client.hpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,16 @@ namespace http_client
4444
/// @param onSuccess Callback for successful request completion
4545
/// @param loopRequestCondition Condition to continue looping requests
4646
/// @return Awaitable task for the HTTP request
47-
boost::asio::awaitable<void>
48-
Co_PerformHttpRequest(std::shared_ptr<std::string> token,
49-
HttpRequestParams params,
50-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> messageGetter,
51-
std::function<void()> onUnauthorized,
52-
std::time_t connectionRetry,
53-
std::time_t batchInterval,
54-
int batchSize,
55-
std::function<void(const int, const std::string&)> onSuccess = {},
56-
std::function<bool()> loopRequestCondition = {}) override;
47+
boost::asio::awaitable<void> Co_PerformHttpRequest(
48+
std::shared_ptr<std::string> token,
49+
HttpRequestParams params,
50+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
51+
std::function<void()> onUnauthorized,
52+
std::time_t connectionRetry,
53+
std::time_t batchInterval,
54+
int batchSize,
55+
std::function<void(const int, const std::string&)> onSuccess = {},
56+
std::function<bool()> loopRequestCondition = {}) override;
5757

5858
/// @brief Performs a synchronous HTTP request
5959
/// @param params Parameters for the request

src/agent/communicator/include/ihttp_client.hpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ namespace http_client
4242
/// @param onSuccess Action to take on successful request
4343
/// @param loopRequestCondition Condition to continue the request loop
4444
/// @return Awaitable task for the HTTP request
45-
virtual boost::asio::awaitable<void>
46-
Co_PerformHttpRequest(std::shared_ptr<std::string> token,
47-
HttpRequestParams params,
48-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> messageGetter,
49-
std::function<void()> onUnauthorized,
50-
std::time_t connectionRetry,
51-
std::time_t batchInterval,
52-
int batchSize,
53-
std::function<void(const int, const std::string&)> onSuccess = {},
54-
std::function<bool()> loopRequestCondition = {}) = 0;
45+
virtual boost::asio::awaitable<void> Co_PerformHttpRequest(
46+
std::shared_ptr<std::string> token,
47+
HttpRequestParams params,
48+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
49+
std::function<void()> onUnauthorized,
50+
std::time_t connectionRetry,
51+
std::time_t batchInterval,
52+
int batchSize,
53+
std::function<void(const int, const std::string&)> onSuccess = {},
54+
std::function<bool()> loopRequestCondition = {}) = 0;
5555

5656
/// @brief Perform an HTTP request and receive the response
5757
/// @param params The parameters for the request

src/agent/communicator/src/communicator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ namespace communicator
133133
}
134134

135135
boost::asio::awaitable<void> Communicator::StatefulMessageProcessingTask(
136-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
136+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
137137
std::function<void(const int, const std::string&)> onSuccess)
138138
{
139139
auto onAuthenticationFailed = [this]()
@@ -162,7 +162,7 @@ namespace communicator
162162
}
163163

164164
boost::asio::awaitable<void> Communicator::StatelessMessageProcessingTask(
165-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
165+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> getMessages,
166166
std::function<void(const int, const std::string&)> onSuccess)
167167
{
168168
auto onAuthenticationFailed = [this]()

src/agent/communicator/src/http_client.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ namespace http_client
9898
boost::asio::awaitable<void> HttpClient::Co_PerformHttpRequest(
9999
std::shared_ptr<std::string> token,
100100
HttpRequestParams reqParams,
101-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> messageGetter,
101+
std::function<boost::asio::awaitable<std::tuple<int, std::string>>(const int)> messageGetter,
102102
std::function<void()> onUnauthorized,
103103
std::time_t connectionRetry,
104104
std::time_t batchInterval,
@@ -165,7 +165,7 @@ namespace http_client
165165
LogError("Loop request condition is true.");
166166
// print batch size
167167
LogError("Batch size: {}", batchSize);
168-
const auto messages = co_await messageGetter();
168+
const auto messages = co_await messageGetter(batchSize);
169169
messagesCount = std::get<0>(messages);
170170

171171
if (messagesCount >= batchSize || batchTimeoutTimer.expiry() <= std::chrono::steady_clock::now())

src/agent/communicator/tests/communicator_test.cpp

Lines changed: 57 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines)
2121

2222
using namespace testing;
23+
using GetMessagesFuncType = std::function<boost::asio::awaitable<intStringTuple>(const int)>;
2324

2425
namespace
2526
{
@@ -51,32 +52,34 @@ TEST(CommunicatorTest, StatefulMessageProcessingTask_Success)
5152
{
5253
auto mockHttpClient = std::make_unique<MockHttpClient>();
5354

54-
auto getMessages = []() -> boost::asio::awaitable<std::tuple<int, std::string>>
55+
auto getMessages = [](const int) -> boost::asio::awaitable<intStringTuple>
5556
{
56-
co_return std::tuple<int, std::string> {1, std::string("message-content")};
57+
co_return intStringTuple {1, std::string("message-content")};
5758
};
5859

5960
std::function<void(const int, const std::string&)> onSuccess = [](const int, const std::string& message)
6061
{
6162
EXPECT_EQ(message, "message-content");
6263
};
6364

65+
auto MockCo_PerformHttpRequest =
66+
[](std::shared_ptr<std::string>,
67+
http_client::HttpRequestParams,
68+
GetMessagesFuncType pGetMessages,
69+
std::function<void()>,
70+
[[maybe_unused]] std::time_t connectionRetry,
71+
[[maybe_unused]] std::time_t batchInterval,
72+
[[maybe_unused]] int batchSize,
73+
std::function<void(const int, const std::string&)> pOnSuccess,
74+
[[maybe_unused]] std::function<bool()> loopRequestCondition) -> boost::asio::awaitable<void>
75+
{
76+
const auto message = co_await pGetMessages(1);
77+
pOnSuccess(std::get<0>(message), std::get<1>(message));
78+
co_return;
79+
};
80+
6481
EXPECT_CALL(*mockHttpClient, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
65-
.WillOnce(Invoke(
66-
[](std::shared_ptr<std::string>,
67-
http_client::HttpRequestParams,
68-
std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> pGetMessages,
69-
std::function<void()>,
70-
[[maybe_unused]] std::time_t connectionRetry,
71-
[[maybe_unused]] std::time_t batchInterval,
72-
[[maybe_unused]] int batchSize,
73-
std::function<void(const int, const std::string&)> pOnSuccess,
74-
[[maybe_unused]] std::function<bool()> loopRequestCondition) -> boost::asio::awaitable<void>
75-
{
76-
const auto message = co_await pGetMessages();
77-
pOnSuccess(std::get<0>(message), std::get<1>(message));
78-
co_return;
79-
}));
82+
.WillOnce(Invoke(MockCo_PerformHttpRequest));
8083

8184
communicator::Communicator communicator(std::move(mockHttpClient), "uuid", "key", nullptr, FUNC);
8285

@@ -109,22 +112,24 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
109112
return std::nullopt;
110113
}));
111114

115+
auto MockCo_PerformHttpRequest =
116+
[](std::shared_ptr<std::string> token,
117+
http_client::HttpRequestParams,
118+
[[maybe_unused]] GetMessagesFuncType pGetMessages,
119+
[[maybe_unused]] std::function<void()> onUnauthorized,
120+
[[maybe_unused]] std::time_t connectionRetry,
121+
[[maybe_unused]] std::time_t batchInterval,
122+
[[maybe_unused]] int batchSize,
123+
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
124+
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
125+
{
126+
EXPECT_TRUE(token->empty());
127+
co_return;
128+
};
129+
112130
// A following call to Co_PerformHttpRequest should not have a token
113131
EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
114-
.WillOnce(Invoke(
115-
[](std::shared_ptr<std::string> token,
116-
http_client::HttpRequestParams,
117-
[[maybe_unused]] std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
118-
[[maybe_unused]] std::function<void()> onUnauthorized,
119-
[[maybe_unused]] std::time_t connectionRetry,
120-
[[maybe_unused]] std::time_t batchInterval,
121-
[[maybe_unused]] int batchSize,
122-
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
123-
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
124-
{
125-
EXPECT_TRUE(token->empty());
126-
co_return;
127-
}));
132+
.WillOnce(Invoke(MockCo_PerformHttpRequest));
128133

129134
boost::asio::io_context ioContext;
130135

@@ -134,8 +139,8 @@ TEST(CommunicatorTest, WaitForTokenExpirationAndAuthenticate_FailedAuthenticatio
134139
{
135140
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
136141
co_await communicatorPtr->StatelessMessageProcessingTask(
137-
[]() -> boost::asio::awaitable<std::tuple<int, std::string>>
138-
{ co_return std::tuple<int, std::string>(1, std::string {"message"}); },
142+
[](const int) -> boost::asio::awaitable<intStringTuple>
143+
{ co_return intStringTuple(1, std::string {"message"}); },
139144
[]([[maybe_unused]] const int, const std::string&) {});
140145
}(),
141146
boost::asio::detached);
@@ -167,22 +172,24 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
167172
}));
168173

169174
std::string capturedToken;
175+
176+
auto MockCo_PerformHttpRequest =
177+
[&capturedToken](std::shared_ptr<std::string> token,
178+
http_client::HttpRequestParams,
179+
[[maybe_unused]] GetMessagesFuncType pGetMessages,
180+
[[maybe_unused]] std::function<void()> onUnauthorized,
181+
[[maybe_unused]] std::time_t connectionRetry,
182+
[[maybe_unused]] std::time_t batchInterval,
183+
[[maybe_unused]] int batchSize,
184+
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
185+
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
186+
{
187+
capturedToken = *token;
188+
co_return;
189+
};
190+
170191
EXPECT_CALL(*mockHttpClientPtr, Co_PerformHttpRequest(_, _, _, _, _, _, _, _, _))
171-
.WillOnce(Invoke(
172-
[&capturedToken](
173-
std::shared_ptr<std::string> token,
174-
http_client::HttpRequestParams,
175-
[[maybe_unused]] std::function<boost::asio::awaitable<std::tuple<int, std::string>>()> getMessages,
176-
[[maybe_unused]] std::function<void()> onUnauthorized,
177-
[[maybe_unused]] std::time_t connectionRetry,
178-
[[maybe_unused]] std::time_t batchInterval,
179-
[[maybe_unused]] int batchSize,
180-
[[maybe_unused]] std::function<void(const int, const std::string&)> onSuccess,
181-
[[maybe_unused]] std::function<bool()> loopCondition) -> boost::asio::awaitable<void>
182-
{
183-
capturedToken = *token;
184-
co_return;
185-
}));
192+
.WillOnce(Invoke(MockCo_PerformHttpRequest));
186193

187194
boost::asio::io_context ioContext;
188195

@@ -192,8 +199,8 @@ TEST(CommunicatorTest, StatelessMessageProcessingTask_CallsWithValidToken)
192199
{
193200
co_await communicatorPtr->WaitForTokenExpirationAndAuthenticate();
194201
co_await communicatorPtr->StatelessMessageProcessingTask(
195-
[]() -> boost::asio::awaitable<std::tuple<int, std::string>>
196-
{ co_return std::tuple<int, std::string>(1, std::string {"message"}); },
202+
[](const int) -> boost::asio::awaitable<intStringTuple>
203+
{ co_return intStringTuple(1, std::string {"message"}); },
197204
[]([[maybe_unused]] const int, const std::string&) {});
198205
}(),
199206
boost::asio::detached);

src/agent/communicator/tests/http_client_test.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_Success)
216216
SetupMockSocketReadExpectations(boost::beast::http::status::ok);
217217

218218
auto getMessagesCalled = false;
219-
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
219+
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
220220
{
221221
getMessagesCalled = true;
222222
co_return std::tuple<int, std::string>(1, "test message");
@@ -270,7 +270,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_CallbacksNotCalledIfCannotConnect)
270270
SetupMockSocketConnectExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address));
271271

272272
auto getMessagesCalled = false;
273-
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
273+
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
274274
{
275275
getMessagesCalled = true;
276276
co_return std::tuple<int, std::string>(1, "test message");
@@ -318,7 +318,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncWriteFails
318318
SetupMockSocketWriteExpectations(boost::system::errc::make_error_code(boost::system::errc::bad_address));
319319

320320
auto getMessagesCalled = false;
321-
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
321+
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
322322
{
323323
getMessagesCalled = true;
324324
co_return std::tuple<int, std::string>(1, "test message");
@@ -374,7 +374,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_OnSuccessNotCalledIfAsyncReadFails)
374374
boost::system::errc::make_error_code(boost::system::errc::bad_address));
375375

376376
auto getMessagesCalled = false;
377-
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
377+
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
378378
{
379379
getMessagesCalled = true;
380380
co_return std::tuple<int, std::string>(1, "test message");
@@ -429,7 +429,7 @@ TEST_F(HttpClientTest, Co_PerformHttpRequest_UnauthorizedCalledWhenAuthorization
429429
SetupMockSocketReadExpectations(boost::beast::http::status::unauthorized);
430430

431431
auto getMessagesCalled = false;
432-
auto getMessages = [&getMessagesCalled]() -> boost::asio::awaitable<std::tuple<int, std::string>>
432+
auto getMessages = [&getMessagesCalled](const int) -> boost::asio::awaitable<std::tuple<int, std::string>>
433433
{
434434
getMessagesCalled = true;
435435
co_return std::tuple<int, std::string>(1, "test message");

src/agent/communicator/tests/mocks/mock_http_client.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class MockHttpClient : public http_client::IHttpClient
1616
Co_PerformHttpRequest,
1717
(std::shared_ptr<std::string> token,
1818
http_client::HttpRequestParams params,
19-
std::function<boost::asio::awaitable<intStringTuple>()> messageGetter,
19+
std::function<boost::asio::awaitable<intStringTuple>(const int)> messageGetter,
2020
std::function<void()> onUnauthorized,
2121
std::time_t connectionRetry,
2222
std::time_t batchInterval,

src/agent/include/agent.hpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,4 @@ class Agent
6969

7070
/// @brief Centralized configuration
7171
centralized_configuration::CentralizedConfiguration m_centralizedConfiguration;
72-
73-
/// @brief Maximum messages batch size
74-
int m_maxBatchingSize = config::agent::DEFAULT_BATCH_SIZE;
7572
};

src/agent/src/agent.cpp

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,6 @@ Agent::Agent(const std::string& configFilePath, std::unique_ptr<ISignalHandler>
3131
[this](std::function<void()> task) { m_taskManager.EnqueueTask(std::move(task)); })
3232
, m_commandHandler(m_dataPath)
3333
{
34-
m_maxBatchingSize =
35-
m_configurationParser.GetConfig<int>("agent", "max_batching_size").value_or(config::agent::DEFAULT_BATCH_SIZE);
36-
37-
if (m_maxBatchingSize < 1)
38-
{
39-
LogWarn("max_batching_size cannot be lower than 1s. Using default value.");
40-
m_maxBatchingSize = config::agent::DEFAULT_BATCH_SIZE;
41-
}
42-
4334
m_centralizedConfiguration.SetGroupIdFunction([this](const std::vector<std::string>& groups)
4435
{ return m_agentInfo.SetGroups(groups); });
4536

@@ -66,22 +57,22 @@ void Agent::Run()
6657
[this](const int, const std::string& response) { PushCommandsToQueue(m_messageQueue, response); }));
6758

6859
m_taskManager.EnqueueTask(m_communicator.StatefulMessageProcessingTask(
69-
[this]()
60+
[this](const int numMessages)
7061
{
7162
return GetMessagesFromQueue(m_messageQueue,
7263
MessageType::STATEFUL,
73-
m_maxBatchingSize,
64+
numMessages,
7465
[this]() { return m_agentInfo.GetMetadataInfo(false); });
7566
},
7667
[this]([[maybe_unused]] const int messageCount, const std::string&)
7768
{ PopMessagesFromQueue(m_messageQueue, MessageType::STATEFUL, messageCount); }));
7869

7970
m_taskManager.EnqueueTask(m_communicator.StatelessMessageProcessingTask(
80-
[this]()
71+
[this](const int numMessages)
8172
{
8273
return GetMessagesFromQueue(m_messageQueue,
8374
MessageType::STATELESS,
84-
m_maxBatchingSize,
75+
numMessages,
8576
[this]() { return m_agentInfo.GetMetadataInfo(false); });
8677
},
8778
[this]([[maybe_unused]] const int messageCount, const std::string&)

0 commit comments

Comments
 (0)