Skip to content

Commit f658abd

Browse files
committed
Use the new internal service for network connections
This is achieved by replacing the NetworkConnectionInfoServiceComm component with the Output component. TODO: - Receiving messages from sensor is not fully implemented, but the mechanisms are already in place for it. - NetworkStatusNotifierTest is broken.
1 parent 0527c42 commit f658abd

24 files changed

+575
-534
lines changed

collector/lib/Channel.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#ifndef COLLECTOR_CHANNEL_H
2+
#define COLLECTOR_CHANNEL_H
3+
4+
#include <condition_variable>
5+
#include <cstddef>
6+
#include <mutex>
7+
#include <queue>
8+
9+
namespace collector {
10+
template <typename T>
11+
class Channel {
12+
public:
13+
Channel(std::size_t capacity = 0)
14+
: capacity_(capacity) {
15+
}
16+
17+
friend Channel<T>& operator<<(Channel<T>& ch, const T& data) {
18+
std::unique_lock<std::mutex> lock{ch.mutex_};
19+
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
20+
ch.cv_.wait(lock, [&ch]() { return ch.queue_.size() < ch.capacity_; });
21+
}
22+
23+
ch.queue_.push(data);
24+
ch.cv_.notify_one();
25+
return ch;
26+
}
27+
28+
friend Channel<T>& operator<<(Channel<T>& ch, T&& data) {
29+
std::unique_lock<std::mutex> lock{ch.mutex_};
30+
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
31+
ch.cv_.wait(lock, [&ch]() { return ch.queue_.size() < ch.capacity_; });
32+
}
33+
34+
ch.queue_.emplace(std::move(data));
35+
ch.cv_.notify_one();
36+
return ch;
37+
}
38+
39+
friend Channel<T>& operator>>(Channel<T>& ch, T& out) {
40+
std::unique_lock<std::mutex> lock{ch.mutex_};
41+
ch.cv_.wait(lock, [&ch]() { return ch.queue_.size() > 0; });
42+
43+
out = std::move(ch.queue_.front());
44+
ch.queue_.pop();
45+
46+
ch.cv_.notify_one();
47+
return ch;
48+
}
49+
50+
private:
51+
std::size_t capacity_;
52+
std::queue<T> queue_;
53+
std::mutex mutex_;
54+
std::condition_variable cv_;
55+
};
56+
} // namespace collector
57+
58+
#endif

collector/lib/CollectorService.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
5252
conn_tracker_,
5353
config_,
5454
&system_inspector_,
55+
&output_,
5556
exporter_.GetRegistry().get());
5657

5758
auto network_signal_handler = std::make_unique<NetworkSignalHandler>(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats());

collector/lib/CollectorService.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ class CollectorService {
5454
std::shared_ptr<ConnectionTracker> conn_tracker_;
5555
std::unique_ptr<NetworkStatusNotifier> net_status_notifier_;
5656
std::shared_ptr<ProcessStore> process_store_;
57-
std::shared_ptr<NetworkConnectionInfoServiceComm> network_connection_info_service_comm_;
5857
};
5958

6059
} // namespace collector

collector/lib/NetworkConnectionInfoServiceComm.cpp

Lines changed: 0 additions & 62 deletions
This file was deleted.

collector/lib/NetworkConnectionInfoServiceComm.h

Lines changed: 0 additions & 66 deletions
This file was deleted.

collector/lib/NetworkStatusNotifier.cpp

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -114,24 +114,10 @@ void NetworkStatusNotifier::Run() {
114114
auto next_attempt = std::chrono::system_clock::now();
115115

116116
while (thread_.PauseUntil(next_attempt)) {
117-
comm_->ResetClientContext();
118-
119-
if (!comm_->WaitForConnectionReady([this] { return thread_.should_stop(); })) {
120-
break;
121-
}
122-
123-
auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::NetworkFlowsControlMessage* msg) { OnRecvControlMessage(msg); });
124-
125-
RunSingle(client_writer.get());
117+
RunSingle();
126118
if (thread_.should_stop()) {
127119
return;
128120
}
129-
auto status = client_writer->Finish(std::chrono::seconds(5));
130-
if (status.ok()) {
131-
CLOG(ERROR) << "Error streaming network connection info: server hung up unexpectedly";
132-
} else {
133-
CLOG(ERROR) << "Error streaming network connection info: " << status.error_message();
134-
}
135121
next_attempt = std::chrono::system_clock::now() + std::chrono::seconds(10);
136122
}
137123

@@ -144,7 +130,6 @@ void NetworkStatusNotifier::Start() {
144130
}
145131

146132
void NetworkStatusNotifier::Stop() {
147-
comm_->TryCancel();
148133
thread_.Stop();
149134
}
150135

@@ -217,15 +202,13 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() {
217202
return true;
218203
}
219204

220-
void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer) {
221-
WaitUntilWriterStarted(writer, 10);
222-
205+
void NetworkStatusNotifier::RunSingle() {
223206
ConnMap old_conn_state;
224207
AdvertisedEndpointMap old_cep_state;
225208
auto next_scrape = std::chrono::system_clock::now();
226209
int64_t time_at_last_scrape = NowMicros();
227210

228-
while (writer->Sleep(next_scrape)) {
211+
while (thread_.PauseUntil(next_scrape)) {
229212
CLOG(DEBUG) << "Starting network status notification";
230213
next_scrape = std::chrono::system_clock::now() + std::chrono::seconds(config_.ScrapeInterval());
231214

@@ -272,7 +255,7 @@ void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnect
272255
}
273256

274257
WITH_TIMER(CollectorStats::net_write_message) {
275-
if (!writer->Write(*msg, next_scrape)) {
258+
if (output_->SendMsg(*msg) != SignalHandler::PROCESSED) {
276259
CLOG(ERROR) << "Failed to write network connection info";
277260
return;
278261
}

collector/lib/NetworkStatusNotifier.h

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
#include "CollectorConfig.h"
1010
#include "CollectorConnectionStats.h"
1111
#include "ConnTracker.h"
12-
#include "NetworkConnectionInfoServiceComm.h"
12+
#include "DuplexGRPC.h"
1313
#include "ProcfsScraper.h"
1414
#include "ProtoAllocator.h"
1515
#include "StoppableThread.h"
16+
#include "output/Output.h"
1617

1718
namespace collector {
1819

@@ -21,11 +22,12 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
2122
NetworkStatusNotifier(std::shared_ptr<ConnectionTracker> conn_tracker,
2223
const CollectorConfig& config,
2324
system_inspector::Service* inspector,
25+
output::Output* output,
2426
prometheus::Registry* registry)
2527
: conn_scraper_(std::make_unique<ConnScraper>(config, inspector)),
2628
conn_tracker_(std::move(conn_tracker)),
2729
config_(config),
28-
comm_(std::make_unique<NetworkConnectionInfoServiceComm>(config.grpc_channel)) {
30+
output_(output) {
2931
if (config_.EnableConnectionStats()) {
3032
connections_total_reporter_ = {{registry,
3133
"rox_connections_total",
@@ -56,18 +58,6 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
5658
conn_scraper_ = std::move(cs);
5759
}
5860

59-
/**
60-
* Replace the communications object.
61-
*
62-
* This is meant to make testing easier by swapping in a mock object.
63-
*
64-
* @params comm A unique pointer to the new instance of communications
65-
* to use.
66-
*/
67-
void ReplaceComm(std::unique_ptr<INetworkConnectionInfoServiceComm>&& comm) {
68-
comm_ = std::move(comm);
69-
}
70-
7161
private:
7262
FRIEND_TEST(NetworkStatusNotifierTest, RateLimitedConnections);
7363

@@ -85,7 +75,7 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
8575
void Run();
8676
void WaitUntilWriterStarted(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer, int wait_time);
8777
bool UpdateAllConnsAndEndpoints();
88-
void RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer);
78+
void RunSingle();
8979
void ReceivePublicIPs(const sensor::IPAddressList& public_ips);
9080
void ReceiveIPNetworks(const sensor::IPNetworkList& networks);
9181

@@ -97,7 +87,7 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
9787
std::shared_ptr<ConnectionTracker> conn_tracker_;
9888

9989
const CollectorConfig& config_;
100-
std::unique_ptr<INetworkConnectionInfoServiceComm> comm_;
90+
output::Output* output_;
10191

10292
std::optional<CollectorConnectionStats<unsigned int>> connections_total_reporter_;
10393
std::optional<CollectorConnectionStats<float>> connections_rate_reporter_;

collector/lib/SignalServiceClient.cpp

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)