Skip to content

ROX-28527: Send networkflows through the new iservice #2099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: mauro/refactor-output
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration-test-containers.yml
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ jobs:
- integration-tests/images.yml
- integration-tests/Dockerfile
- .github/workflows/integration-test-containers.yml
- collector/proto/third_party/stackrox
build-test-image:
name: Build the integration test image
123 changes: 123 additions & 0 deletions collector/lib/Channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#ifndef COLLECTOR_CHANNEL_H
#define COLLECTOR_CHANNEL_H

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <iterator>
#include <mutex>
#include <queue>

namespace collector {
template <typename T>
class Channel {
public:
using value_type = T;

Channel(std::size_t capacity = 0)
: capacity_(capacity) {
}

bool IsClosed() {
return closed_.load();
}

void Close() {
closed_.store(true);
cv_.notify_all();
}

friend Channel<T>& operator<<(Channel<T>& ch, const T& data) {
if (ch.IsClosed()) {
// If the channel is closed we simply drop messages
return ch;
}

std::unique_lock<std::mutex> lock{ch.mutex_};
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
ch.cv_.wait(lock, [&ch] { return ch.queue_.size() < ch.capacity_; });
}

ch.queue_.push(data);
ch.cv_.notify_one();

return ch;
}

friend Channel<T>& operator<<(Channel<T>& ch, T&& data) {
if (ch.IsClosed()) {
// If the channel is closed we simply drop messages
return ch;
}

std::unique_lock<std::mutex> lock{ch.mutex_};
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
ch.cv_.wait(lock, [&ch] { return ch.queue_.size() < ch.capacity_; });
}

ch.queue_.push(std::move(data));
ch.cv_.notify_one();

return ch;
}

friend Channel<T>& operator>>(Channel<T>& ch, T& out) {
std::unique_lock<std::mutex> lock{ch.mutex_};
if (ch.IsClosed() && ch.queue_.empty()) {
return ch;
}

ch.cv_.wait(lock, [&ch] { return !ch.queue_.empty() || ch.IsClosed(); });
if (!ch.queue_.empty()) {
out = std::move(ch.queue_.front());
ch.queue_.pop();
}

ch.cv_.notify_one();
return ch;
}

bool Empty() {
std::unique_lock<std::mutex> lock{mutex_};
return queue_.empty();
}

struct Iterator {
Iterator(Channel<T>& ch) : ch_(ch) {}

using iterator_category = std::input_iterator_tag;
using value_type = T;
using reference = T&;
using pointer = T*;

Iterator operator++() { return *this; }
reference operator*() {
ch_ >> value_;
return value_;
}

bool operator!=(Iterator& /*unused*/) const {
std::unique_lock<std::mutex> lock{ch_.mutex_};
ch_.cv_.wait(lock, [this] { return !ch_.queue_.empty() || ch_.IsClosed(); });

return !(ch_.IsClosed() && ch_.queue_.empty());
}

private:
Channel<T>& ch_;
value_type value_;
};

Iterator begin() { return Iterator{*this}; }
Iterator end() { return Iterator{*this}; }

private:
std::size_t capacity_;
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cv_;
std::atomic<bool> closed_{false};
};
} // namespace collector

#endif
19 changes: 9 additions & 10 deletions collector/lib/CollectorService.cpp
Original file line number Diff line number Diff line change
@@ -40,8 +40,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
// Network tracking
if (!config_.grpc_channel || !config_.DisableNetworkFlows()) {
// In case if no GRPC is used, continue to setup networking infrasturcture
// with empty grpc_channel. NetworkConnectionInfoServiceComm will pick it
// up and use stdout instead.
// with empty grpc_channel. output_ will pick it up and use stdout instead.
conn_tracker_ = std::make_shared<ConnectionTracker>();
UnorderedSet<L4ProtoPortPair> ignored_l4proto_port_pairs(config_.IgnoredL4ProtoPortPairs());
conn_tracker_->UpdateIgnoredL4ProtoPortPairs(std::move(ignored_l4proto_port_pairs));
@@ -52,6 +51,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
conn_tracker_,
config_,
&system_inspector_,
&output_,
exporter_.GetRegistry().get());

auto network_signal_handler = std::make_unique<NetworkSignalHandler>(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats());
@@ -98,6 +98,12 @@ void CollectorService::RunForever() {

CLOG(INFO) << "Network scrape interval set to " << config_.ScrapeInterval() << " seconds";

auto should_stop = [this] {
return control_->load(std::memory_order_relaxed) == STOP_COLLECTOR;
};

output_.WaitReady(should_stop);

if (net_status_notifier_) {
net_status_notifier_->Start();
}
@@ -108,8 +114,7 @@ void CollectorService::RunForever() {

system_inspector_.Start();

ControlValue cv;
while ((cv = control_->load(std::memory_order_relaxed)) != STOP_COLLECTOR) {
while (!should_stop()) {
system_inspector_.Run(*control_);
CLOG(DEBUG) << "Interrupted collector!";
}
@@ -123,12 +128,6 @@ void CollectorService::RunForever() {
CLOG(INFO) << "Shutting down collector.";
}

bool CollectorService::WaitForGRPCServer() {
std::string error_str;
auto interrupt = [this] { return control_->load(std::memory_order_relaxed) == STOP_COLLECTOR; };
return WaitForChannelReady(config_.grpc_channel, interrupt);
}

bool CollectorService::InitKernel() {
auto& startup_diagnostics = StartupDiagnostics::GetInstance();
std::string cm_name(CollectionMethodName(config_.GetCollectionMethod()));
3 changes: 0 additions & 3 deletions collector/lib/CollectorService.h
Original file line number Diff line number Diff line change
@@ -30,8 +30,6 @@ class CollectorService {
bool InitKernel();

private:
bool WaitForGRPCServer();

CollectorConfig& config_;
output::Output output_;
system_inspector::Service system_inspector_;
@@ -53,7 +51,6 @@ class CollectorService {
std::shared_ptr<ConnectionTracker> conn_tracker_;
std::unique_ptr<NetworkStatusNotifier> net_status_notifier_;
std::shared_ptr<ProcessStore> process_store_;
std::shared_ptr<NetworkConnectionInfoServiceComm> network_connection_info_service_comm_;
};

} // namespace collector
62 changes: 0 additions & 62 deletions collector/lib/NetworkConnectionInfoServiceComm.cpp

This file was deleted.

63 changes: 0 additions & 63 deletions collector/lib/NetworkConnectionInfoServiceComm.h

This file was deleted.

Loading