Draft
242 changes: 227 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/configuration/default/config.hcl
@@ -714,7 +714,7 @@ internal "metrics" {
# mermin_pipeline_duration_seconds = [0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]
# mermin_export_batch_size = [1.0, 10.0, 50.0, 100.0, 250.0, 500.0, 1000.0]
# mermin_k8s_watcher_ip_index_update_duration_seconds = [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
# mermin_taskmanager_shutdown_duration_seconds = [0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0] # Only when debug_metrics_enabled = true
# mermin_shutdown_duration_seconds = [0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0] # Only when debug_metrics_enabled = true
# }

# Endpoints available:
8 changes: 4 additions & 4 deletions docs/configuration/reference/internal-prometheus-metrics.md
@@ -178,9 +178,9 @@ You can customize these bucket sizes inside a `histogram_buckets` block.
}
```

- `mermin_taskmanager_shutdown_duration_seconds` attribute
- `mermin_shutdown_duration_seconds` attribute

Custom buckets for the `mermin_taskmanager_shutdown_duration_seconds` histogram metric. This metric tracks the duration of shutdown operations.
Custom buckets for the `mermin_shutdown_duration_seconds` histogram metric. This metric tracks the duration of shutdown operations.

**Type:** Array of numbers

@@ -200,7 +200,7 @@ You can customize these bucket sizes inside a `histogram_buckets` block.
internal "metrics" {
debug_metrics_enabled = true
histogram_buckets {
mermin_taskmanager_shutdown_duration_seconds = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
mermin_shutdown_duration_seconds = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
}
}
```
@@ -214,7 +214,7 @@ You can customize these bucket sizes inside a `histogram_buckets` block.
mermin_pipeline_duration_seconds = [0.0001, 0.001, 0.01, 0.1, 1.0, 5.0, 10.0]
mermin_export_batch_size = [10, 50, 100, 500, 1000]
mermin_k8s_watcher_ip_index_update_duration_seconds = [0.001, 0.01, 0.1, 0.5, 1.0]
mermin_taskmanager_shutdown_duration_seconds = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
mermin_shutdown_duration_seconds = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
}
}
```
8 changes: 0 additions & 8 deletions docs/contributor-guide/development-workflow.md
@@ -109,14 +109,6 @@ Running the eBPF agent requires elevated privileges. Use the `--config` flag to
cargo run --release --config 'target."cfg(all())".runner="sudo -E"' -- --config local/config.hcl
```

**Using YAML:**

If you prefer YAML format, you can convert your HCL config on-the-fly:

```shell
cargo run --release --config 'target."cfg(all())".runner="sudo -E"' -- --config <(fmtconvert -from hcl -to yaml local/config.hcl)
```

> The `sudo -E` command runs the program as root while preserving the user's environment variables, which is
> necessary for `cargo` to find the correct binary.

33 changes: 24 additions & 9 deletions docs/internal-monitoring/internal-metrics.md
@@ -50,7 +50,7 @@ Metrics are categorized into logical subsystems that correspond to different com
- `flow`: Metrics on the Flow Spans
- `interface`: Network interface-related metrics
- `k8s`: For Kubernetes watcher metrics
- `taskmanager`: Internal Mermin tasks metrics
- `shutdown`: Shutdown lifecycle metrics

### eBPF Metrics (`mermin_ebpf_*`)

@@ -273,20 +273,34 @@ These metrics offer insight into the internal pipelines used for data mutation (

**Default buckets:** `[0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]` (10μs to 60s)

### TaskManager Metrics (`mermin_taskmanager_*`)
### Shutdown Metrics (`mermin_shutdown_*`)

These metrics track the number and type of active background tasks managed by Mermin.
These metrics track the shutdown lifecycle of Mermin components.

- `mermin_taskmanager_tasks_active`
- `mermin_shutdown_duration_seconds`

Current number of active tasks across all task types.
Duration of shutdown operations.

**Type:** `gauge`
**Type:** `histogram`

**Unit:** seconds

**Default buckets:** `[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0]` (100ms to 120s)

- `mermin_shutdown_timeouts_total`

**Unit:** tasks (count)
Total number of shutdown operations that timed out.

**Type:** `counter`

- `mermin_shutdown_flows_total`

Total flow spans processed during shutdown.

**Type:** `counter`

**Labels:**
- `task`: Task names are dynamic and correspond to spawned background tasks (e.g., watcher tasks, producer tasks)
- `status`: `preserved`, `lost`
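
Review note: the three shutdown metrics documented above can be read together as the outcome of a bounded drain. The following is a minimal, std-only sketch under stated assumptions, not Mermin's implementation. `ShutdownStats`, `drain_on_shutdown`, and the simulated per-flow flush cost are hypothetical names and simplifications introduced only for illustration; the comments map each field to the metric it would plausibly feed.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical stand-in for the shutdown metrics (not Mermin's types).
#[derive(Debug, Default, PartialEq)]
struct ShutdownStats {
    /// Would feed mermin_shutdown_flows_total{status="preserved"}.
    preserved: u64,
    /// Would feed mermin_shutdown_flows_total{status="lost"}.
    lost: u64,
    /// Would feed mermin_shutdown_timeouts_total.
    timeouts: u64,
    /// Would be observed into mermin_shutdown_duration_seconds.
    duration: Duration,
}

/// Drain pending flow spans until the queue is empty or the deadline
/// would be exceeded. Time is simulated (a fixed cost per flow) so the
/// sketch stays deterministic; a real agent would measure wall-clock time.
fn drain_on_shutdown(
    pending: &mut VecDeque<u64>,
    cost_per_flow: Duration,
    deadline: Duration,
) -> ShutdownStats {
    let mut stats = ShutdownStats::default();
    let mut elapsed = Duration::ZERO;

    while let Some(_flow) = pending.pop_front() {
        if elapsed + cost_per_flow > deadline {
            // The flow just popped plus everything still queued is lost.
            stats.lost = 1 + pending.len() as u64;
            stats.timeouts = 1;
            pending.clear();
            break;
        }
        elapsed += cost_per_flow;
        stats.preserved += 1;
    }

    stats.duration = elapsed;
    stats
}
```

With five pending flows, a simulated cost of 100 ms per flow, and a 350 ms deadline, this model preserves three flows, loses two, and records one timeout.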

## Label Values Reference

@@ -307,6 +321,7 @@ This section provides a quick reference for all label values used across metrics
| `event` | `apply`, `delete`, `init`, `init_done`, `error` |
| `kind` | `Pod`, `Service`, `Node`, `Deployment`, `ReplicaSet`, `DaemonSet`, `StatefulSet`, `EndpointSlice`, etc. |
| `stage` | `flow_producer_out`, `k8s_decorator_out`, `export_out` |
| `status` (shutdown) | `preserved`, `lost` |

## Histogram Buckets

@@ -317,7 +332,7 @@ Histogram metrics use configurable bucket boundaries. The default buckets are op
| `mermin_pipeline_duration_seconds` | `[0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]` | 10μs to 60s |
| `mermin_export_batch_size` | `[1, 10, 50, 100, 250, 500, 1000]` | 1 to 1000 spans |
| `mermin_k8s_watcher_ip_index_update_duration_seconds` | `[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]` | 1ms to 1s |
| `mermin_taskmanager_shutdown_duration_seconds` (debug) | `[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0]` | 100ms to 120s |
| `mermin_shutdown_duration_seconds` (debug) | `[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0]` | 100ms to 120s |
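
Review note: for readers tuning these boundaries, it may help to recall that Prometheus histogram buckets are cumulative, so each observation is counted in every bucket whose upper bound (`le`) is at least the observed value, plus the implicit `+Inf` bucket. The sketch below illustrates that counting rule with plain Rust; `bucket_counts` is a hypothetical helper written for this note and is not part of Mermin or of any Prometheus client library.

```rust
/// Count observations into cumulative buckets, Prometheus-style.
/// `bounds` are the configured upper bounds in ascending order; the
/// returned vector has one extra trailing slot for the implicit +Inf bucket.
fn bucket_counts(bounds: &[f64], observations: &[f64]) -> Vec<u64> {
    let mut counts = vec![0u64; bounds.len() + 1];
    for &value in observations {
        for (i, &le) in bounds.iter().enumerate() {
            // Cumulative: every bucket with le >= value is incremented.
            if value <= le {
                counts[i] += 1;
            }
        }
        // The +Inf bucket always counts the observation.
        counts[bounds.len()] += 1;
    }
    counts
}
```

For example, with the default shutdown buckets `[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0]` and shutdown durations of 0.05 s, 0.7 s, 12 s, and 200 s, the counts come out as `[1, 1, 2, 2, 2, 3, 3, 3, 4]`. The 200 s observation only appears in `+Inf`, which is a hint that the top boundary would need to be raised if such shutdowns were common.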

## Grafana Dashboard

2 changes: 1 addition & 1 deletion mermin-ebpf/src/main.rs
@@ -16,7 +16,7 @@
//! ## Supported Protocols
//!
//! ### Layer 2
//! - Ethernet II (DIX)
//! - Ethernet II
//!
//! ### Layer 3
//! - IPv4 (with variable-length options via IHL)
1 change: 1 addition & 0 deletions mermin/Cargo.toml
@@ -43,6 +43,7 @@ log = { workspace = true }
mermin-common = { path = "../mermin-common", features = ["user"] }
moka = { version = "0.12", features = ["future"] }
network-types = { path = "../network-types" }
notify = "6.1"
netlink-packet-core = "0.8.1"
netlink-packet-route = "0.25.1"
netlink-sys = "0.8.7"
8 changes: 4 additions & 4 deletions mermin/src/error.rs
@@ -1,7 +1,7 @@
use thiserror::Error;

use crate::{
health::HealthError, k8s::K8sError, otlp::OtlpError, runtime::context::ContextError,
health::HealthError, k8s::K8sError, otlp::OtlpError, runtime::conf::ConfError,
span::producer::BootTimeError,
};

@@ -20,9 +20,9 @@ pub enum MerminError {
#[error("health check error: {0}")]
Health(#[from] HealthError),

/// Runtime context initialization errors
#[error("context error: {0}")]
Context(#[from] ContextError),
/// Runtime initialization errors
#[error("configuration error: {0}")]
Conf(#[from] ConfError),

/// Boot time calculation errors
#[error("boot time error: {0}")]
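
Review note: the `Context` to `Conf` rename above relies on `thiserror`'s `#[from]` attribute, which generates a `From` impl so that `?` can convert the inner error into `MerminError`. The sketch below spells out roughly the same behavior by hand using only the standard library. The `ConfError::Missing` variant, its message, and the `load`/`run` helpers are assumptions made for illustration; the real `ConfError` in `runtime::conf` is not shown in this diff.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for `runtime::conf::ConfError`.
#[derive(Debug)]
enum ConfError {
    Missing(String),
}

impl fmt::Display for ConfError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConfError::Missing(path) => write!(f, "missing config file: {}", path),
        }
    }
}

impl Error for ConfError {}

/// Reduced version of the top-level error with only the renamed variant.
#[derive(Debug)]
enum MerminError {
    Conf(ConfError),
}

impl fmt::Display for MerminError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Mirrors #[error("configuration error: {0}")].
            MerminError::Conf(e) => write!(f, "configuration error: {}", e),
        }
    }
}

impl Error for MerminError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            MerminError::Conf(e) => Some(e),
        }
    }
}

// Hand-written equivalent of the conversion that #[from] derives.
impl From<ConfError> for MerminError {
    fn from(e: ConfError) -> Self {
        MerminError::Conf(e)
    }
}

/// Hypothetical loader that always fails, to exercise the conversion.
fn load(path: &str) -> Result<(), ConfError> {
    Err(ConfError::Missing(path.to_string()))
}

/// `?` converts ConfError into MerminError through the From impl.
fn run() -> Result<(), MerminError> {
    load("local/config.hcl")?;
    Ok(())
}
```

One consequence worth checking during review: any log line or test that matched the old `context error:` prefix would need to be updated to `configuration error:`.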
56 changes: 3 additions & 53 deletions mermin/src/iface/threads.rs
@@ -10,8 +10,8 @@ use std::{mem, os::fd::RawFd, sync::Arc, thread, time};

use crossbeam::channel::{Receiver, Sender};
use libc::{
AF_NETLINK, EFD_NONBLOCK, NETLINK_ADD_MEMBERSHIP, POLLERR, POLLHUP, POLLIN, POLLNVAL, SOCK_RAW,
SOL_NETLINK, bind, c_void, eventfd, poll, pollfd, recv, setsockopt, sockaddr_nl, socket,
AF_NETLINK, NETLINK_ADD_MEMBERSHIP, POLLERR, POLLHUP, POLLIN, POLLNVAL, SOCK_RAW, SOL_NETLINK,
bind, c_void, poll, pollfd, recv, setsockopt, sockaddr_nl, socket,
};
use netlink_packet_core::{NetlinkBuffer, NetlinkMessage, NetlinkPayload};
use netlink_packet_route::{
@@ -26,7 +26,7 @@ use super::{
controller::IfaceController,
types::{ControllerCommand, ControllerEvent, NetlinkEvent},
};
use crate::error::MerminError;
use crate::{error::MerminError, runtime::component::ShutdownEventFd};

/// Default timeout for waiting for controller thread to become ready (in seconds)
pub const CONTROLLER_READY_TIMEOUT_SECS: u64 = 30;
@@ -59,56 +59,6 @@ impl Drop for NetlinkSocket {
}
}

/// RAII wrapper for eventfd used to signal shutdown to netlink thread.
/// Automatically closes the eventfd when dropped.
pub struct ShutdownEventFd(RawFd);

impl ShutdownEventFd {
pub fn new() -> Result<Self, std::io::Error> {
// SAFETY: eventfd() is safe to call, we check for errors
let fd = unsafe { eventfd(0, EFD_NONBLOCK) };
if fd < 0 {
return Err(std::io::Error::last_os_error());
}
Ok(Self(fd))
}

fn as_raw_fd(&self) -> RawFd {
self.0
}

/// Signal shutdown by writing to the eventfd
pub fn signal(&self) -> Result<(), std::io::Error> {
let val: u64 = 1;
// SAFETY: self.0 is valid, val is properly initialized
let ret = unsafe {
libc::write(
self.0,
&val as *const u64 as *const c_void,
mem::size_of::<u64>(),
)
};
if ret < 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
}
}

impl Drop for ShutdownEventFd {
fn drop(&mut self) {
// SAFETY: self.0 is a valid file descriptor that we own
unsafe {
libc::close(self.0);
}
trace!(
event.name = "interface_controller.netlink.shutdown_fd_closed",
fd = self.0,
"shutdown eventfd closed via RAII cleanup"
);
}
}

/// Netlink multicast group for link events (interface up/down)
const RTNLGRP_LINK: i32 = 1;

15 changes: 10 additions & 5 deletions mermin/src/k8s/decorator.rs
@@ -49,15 +49,20 @@ impl<'a> Decorator<'a> {
}
}

/// Replace the extract rules with new values (for hot-reload).
pub fn update_extract_rules(
&mut self,
source_extract_rules: Vec<String>,
dest_extract_rules: Vec<String>,
) {
self.source_extract_rules = source_extract_rules;
self.dest_extract_rules = dest_extract_rules;
}

/// Decorate a flow span with K8s metadata, with automatic fallback to undecorated span on error.
///
/// This is the primary method for the K8s decorator pipeline. It ensures that spans are
/// NEVER dropped - if decoration fails for any reason, the original undecorated span is returned.
///
/// # Returns
/// A tuple of (FlowSpan, Option<K8sError>) where:
/// - FlowSpan is either decorated (on success) or the original undecorated span (on error)
/// - Option<K8sError> is Some(error) if decoration failed, None if successful
pub async fn decorate_or_fallback(&self, flow_span: FlowSpan) -> (FlowSpan, Option<K8sError>) {
match self.decorate(&flow_span).await {
Ok(decorated_span) => (decorated_span, None),
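
Review note: since this hunk drops the `# Returns` doc section, here is a small synchronous sketch of the fallback contract that `decorate_or_fallback` describes, namely that a span is returned whether or not decoration succeeds. The `FlowSpan` and `K8sError` definitions, the `pod` field, and the lookup rule inside `decorate` are simplified assumptions for illustration; the real method is `async` and operates on Mermin's own types.

```rust
/// Simplified, hypothetical flow span (the real type has many more fields).
#[derive(Debug, Clone, PartialEq)]
struct FlowSpan {
    src_ip: String,
    pod: Option<String>,
}

/// Simplified, hypothetical error type.
#[derive(Debug, PartialEq)]
enum K8sError {
    NotFound(String),
}

/// Pretend lookup: only one address is known to the cluster index.
fn decorate(span: &FlowSpan) -> Result<FlowSpan, K8sError> {
    if span.src_ip == "10.0.0.1" {
        let mut decorated = span.clone();
        decorated.pod = Some("web-0".to_string());
        Ok(decorated)
    } else {
        Err(K8sError::NotFound(span.src_ip.clone()))
    }
}

/// Never drops the span: on error the original, undecorated span is
/// returned together with the error so the caller can count or log it.
fn decorate_or_fallback(span: FlowSpan) -> (FlowSpan, Option<K8sError>) {
    match decorate(&span) {
        Ok(decorated) => (decorated, None),
        Err(err) => (span, Some(err)),
    }
}
```

Returning the error alongside the span, rather than a `Result`, keeps the export path unconditional while still letting the caller record the failure.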