Skip to content

Commit 88af117

Browse files
committed
Use the new internal service for network connections
This is achieved by replacing the NetworkConnectionInfoServiceComm component with the Output component. TODO: - Receiving messages from sensor is not fully implemented, but the mechanisms are already in place for it. - NetworkStatusNotifierTest is broken.
1 parent 7d26ea8 commit 88af117

26 files changed

+979
-793
lines changed

.github/workflows/integration-test-containers.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ jobs:
4848
- integration-tests/images.yml
4949
- integration-tests/Dockerfile
5050
- .github/workflows/integration-test-containers.yml
51+
- collector/proto/third_party/stackrox
5152
5253
build-test-image:
5354
name: Build the integration test image

collector/lib/Channel.h

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#ifndef COLLECTOR_CHANNEL_H
2+
#define COLLECTOR_CHANNEL_H
3+
4+
#include <atomic>
5+
#include <condition_variable>
6+
#include <cstddef>
7+
#include <iterator>
8+
#include <mutex>
9+
#include <queue>
10+
11+
namespace collector {
12+
template <typename T>
13+
class Channel {
14+
public:
15+
using value_type = T;
16+
17+
Channel(std::size_t capacity = 0)
18+
: capacity_(capacity) {
19+
}
20+
21+
bool IsClosed() {
22+
return closed_.load();
23+
}
24+
25+
void Close() {
26+
closed_.store(true);
27+
cv_.notify_all();
28+
}
29+
30+
friend Channel<T>& operator<<(Channel<T>& ch, const T& data) {
31+
if (ch.IsClosed()) {
32+
// If the channel is closed we simply drop messages
33+
return ch;
34+
}
35+
36+
std::unique_lock<std::mutex> lock{ch.mutex_};
37+
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
38+
ch.cv_.wait(lock, [&ch] { return ch.queue_.size() < ch.capacity_; });
39+
}
40+
41+
ch.queue_.push(data);
42+
ch.cv_.notify_one();
43+
44+
return ch;
45+
}
46+
47+
friend Channel<T>& operator<<(Channel<T>& ch, T&& data) {
48+
if (ch.IsClosed()) {
49+
// If the channel is closed we simply drop messages
50+
return ch;
51+
}
52+
53+
std::unique_lock<std::mutex> lock{ch.mutex_};
54+
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
55+
ch.cv_.wait(lock, [&ch] { return ch.queue_.size() < ch.capacity_; });
56+
}
57+
58+
ch.queue_.push(std::move(data));
59+
ch.cv_.notify_one();
60+
61+
return ch;
62+
}
63+
64+
friend Channel<T>& operator>>(Channel<T>& ch, T& out) {
65+
std::unique_lock<std::mutex> lock{ch.mutex_};
66+
if (ch.IsClosed() && ch.queue_.empty()) {
67+
return ch;
68+
}
69+
70+
ch.cv_.wait(lock, [&ch] { return !ch.queue_.empty() || ch.IsClosed(); });
71+
if (!ch.queue_.empty()) {
72+
out = std::move(ch.queue_.front());
73+
ch.queue_.pop();
74+
}
75+
76+
ch.cv_.notify_one();
77+
return ch;
78+
}
79+
80+
bool Empty() {
81+
std::unique_lock<std::mutex> lock{mutex_};
82+
return queue_.empty();
83+
}
84+
85+
struct Iterator {
86+
Iterator(Channel<T>& ch) : ch_(ch) {}
87+
88+
using iterator_category = std::input_iterator_tag;
89+
using value_type = T;
90+
using reference = T&;
91+
using pointer = T*;
92+
93+
Iterator operator++() { return *this; }
94+
reference operator*() {
95+
ch_ >> value_;
96+
return value_;
97+
}
98+
99+
bool operator!=(Iterator& /*unused*/) const {
100+
std::unique_lock<std::mutex> lock{ch_.mutex_};
101+
ch_.cv_.wait(lock, [this] { return !ch_.queue_.empty() || ch_.IsClosed(); });
102+
103+
return !(ch_.IsClosed() && ch_.queue_.empty());
104+
}
105+
106+
private:
107+
Channel<T>& ch_;
108+
value_type value_;
109+
};
110+
111+
Iterator begin() { return Iterator{*this}; }
112+
Iterator end() { return Iterator{*this}; }
113+
114+
private:
115+
std::size_t capacity_;
116+
std::queue<T> queue_;
117+
std::mutex mutex_;
118+
std::condition_variable cv_;
119+
std::atomic<bool> closed_{false};
120+
};
121+
} // namespace collector
122+
123+
#endif

collector/lib/CollectorService.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
4040
// Network tracking
4141
if (!config_.grpc_channel || !config_.DisableNetworkFlows()) {
4242
// In case if no GRPC is used, continue to setup networking infrasturcture
43-
// with empty grpc_channel. NetworkConnectionInfoServiceComm will pick it
44-
// up and use stdout instead.
43+
// with empty grpc_channel. output_ will pick it up and use stdout instead.
4544
conn_tracker_ = std::make_shared<ConnectionTracker>();
4645
UnorderedSet<L4ProtoPortPair> ignored_l4proto_port_pairs(config_.IgnoredL4ProtoPortPairs());
4746
conn_tracker_->UpdateIgnoredL4ProtoPortPairs(std::move(ignored_l4proto_port_pairs));
@@ -52,6 +51,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
5251
conn_tracker_,
5352
config_,
5453
&system_inspector_,
54+
&output_,
5555
exporter_.GetRegistry().get());
5656

5757
auto network_signal_handler = std::make_unique<NetworkSignalHandler>(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats());
@@ -98,6 +98,12 @@ void CollectorService::RunForever() {
9898

9999
CLOG(INFO) << "Network scrape interval set to " << config_.ScrapeInterval() << " seconds";
100100

101+
auto should_stop = [this] {
102+
return control_->load(std::memory_order_relaxed) == STOP_COLLECTOR;
103+
};
104+
105+
output_.WaitReady(should_stop);
106+
101107
if (net_status_notifier_) {
102108
net_status_notifier_->Start();
103109
}
@@ -108,8 +114,7 @@ void CollectorService::RunForever() {
108114

109115
system_inspector_.Start();
110116

111-
ControlValue cv;
112-
while ((cv = control_->load(std::memory_order_relaxed)) != STOP_COLLECTOR) {
117+
while (!should_stop()) {
113118
system_inspector_.Run(*control_);
114119
CLOG(DEBUG) << "Interrupted collector!";
115120
}
@@ -123,12 +128,6 @@ void CollectorService::RunForever() {
123128
CLOG(INFO) << "Shutting down collector.";
124129
}
125130

126-
bool CollectorService::WaitForGRPCServer() {
127-
std::string error_str;
128-
auto interrupt = [this] { return control_->load(std::memory_order_relaxed) == STOP_COLLECTOR; };
129-
return WaitForChannelReady(config_.grpc_channel, interrupt);
130-
}
131-
132131
bool CollectorService::InitKernel() {
133132
auto& startup_diagnostics = StartupDiagnostics::GetInstance();
134133
std::string cm_name(CollectionMethodName(config_.GetCollectionMethod()));

collector/lib/CollectorService.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ class CollectorService {
3030
bool InitKernel();
3131

3232
private:
33-
bool WaitForGRPCServer();
34-
3533
CollectorConfig& config_;
3634
output::Output output_;
3735
system_inspector::Service system_inspector_;
@@ -53,7 +51,6 @@ class CollectorService {
5351
std::shared_ptr<ConnectionTracker> conn_tracker_;
5452
std::unique_ptr<NetworkStatusNotifier> net_status_notifier_;
5553
std::shared_ptr<ProcessStore> process_store_;
56-
std::shared_ptr<NetworkConnectionInfoServiceComm> network_connection_info_service_comm_;
5754
};
5855

5956
} // namespace collector

collector/lib/NetworkConnectionInfoServiceComm.cpp

Lines changed: 0 additions & 62 deletions
This file was deleted.

collector/lib/NetworkConnectionInfoServiceComm.h

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)