Skip to content

Commit

Permalink
Add BPF level metrics (#1505)
Browse files Browse the repository at this point in the history
* Add BPF level metrics

* make fmt

* Fix linter

* check error

* Add integration test

* Add integration test

* Fix compilation

* change query

* Skip test
  • Loading branch information
marctc authored Jan 14, 2025
1 parent 750f626 commit c47142f
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 3 deletions.
Binary file modified examples/quickstart/golang/golang
Binary file not shown.
1 change: 1 addition & 0 deletions pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
FeatureSpan = "application_span"
FeatureGraph = "application_service_graph"
FeatureProcess = "application_process"
FeatureEBPF = "ebpf"
)

type MetricsConfig struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/export/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (p *PrometheusConfig) NetworkMetricsEnabled() bool {
return slices.Contains(p.Features, otel.FeatureNetwork)
}

// EBPFEnabled reports whether the "ebpf" feature (BPF program/map level
// metrics) is listed in the Prometheus exporter's feature set.
func (p *PrometheusConfig) EBPFEnabled() bool {
	return slices.Contains(p.Features, otel.FeatureEBPF)
}

func (p *PrometheusConfig) EndpointEnabled() bool {
return p.Port != 0 || p.Registry != nil
}
Expand Down
273 changes: 273 additions & 0 deletions pkg/export/prom/prom_bpf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
package prom

import (
"context"
"encoding"
"log/slog"
"strconv"
"time"

"github.com/cilium/ebpf"
"github.com/mariomac/pipes/pipe"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sys/unix"

"github.com/grafana/beyla/pkg/internal/connector"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
)

// BPFCollector implements prometheus.Collector for collecting metrics about currently loaded eBPF programs.
type BPFCollector struct {
	cfg         *PrometheusConfig
	promConnect *connector.PrometheusManager
	bgCtx       context.Context
	ctxInfo     *global.ContextInfo
	log         *slog.Logger

	// probeLatencyDesc describes the per-program latency histogram.
	probeLatencyDesc *prometheus.Desc
	// mapSizeDesc describes the per-map entry-count metric.
	mapSizeDesc *prometheus.Desc
	// progs keeps per-program stats between scrapes so histogram
	// increments can be computed as deltas against the previous scrape.
	progs map[ebpf.ProgramID]*BPFProgram
}

// BPFProgram holds the kernel-reported run statistics of one eBPF program
// across two consecutive scrapes, plus the histogram buckets built from the
// deltas between them.
type BPFProgram struct {
	runTime      time.Duration      // cumulative run time at the latest scrape
	runCount     uint64             // cumulative run count at the latest scrape
	prevRunTime  time.Duration      // run time at the previous scrape
	prevRunCount uint64             // run count at the previous scrape
	buckets      map[float64]uint64 // histogram: bucket upper bound (s) -> count
}

// bucketKeysSeconds are the upper bounds (in seconds) of the probe-latency
// histogram buckets, ranging from 100ns to 5ms. They must be listed in
// ascending order: updateBuckets relies on that ordering.
var bucketKeysSeconds = []float64{
	0.0000001,
	0.0000005,
	0.000001,
	0.000002,
	0.000005,
	0.00001,
	0.00002,
	0.00005,
	0.0001,
	0.0002,
	0.0005,
	0.001,
	0.002,
	0.005,
}

// BPFMetrics returns a pipeline final-node provider that exposes metrics
// about the currently loaded eBPF programs and maps through the shared
// Prometheus endpoint. It resolves to a no-op node unless both the
// Prometheus endpoint and the "ebpf" feature are enabled.
func BPFMetrics(
	ctx context.Context,
	ctxInfo *global.ContextInfo,
	cfg *PrometheusConfig,
) pipe.FinalProvider[[]request.Span] {
	return func() (pipe.FinalFunc[[]request.Span], error) {
		// Both conditions are required. The previous check used `&&`
		// ("skip only when both are disabled"), which wrongly activated BPF
		// metrics whenever the endpoint was enabled for any other feature,
		// making the "ebpf" feature flag ineffective.
		if !cfg.EndpointEnabled() || !cfg.EBPFEnabled() {
			return pipe.IgnoreFinal[[]request.Span](), nil
		}
		collector := newBPFCollector(ctx, ctxInfo, cfg)
		return collector.reportMetrics, nil
	}
}

// newBPFCollector builds a BPFCollector and registers it with the shared
// Prometheus manager so its metrics are served on the configured port/path.
func newBPFCollector(ctx context.Context, ctxInfo *global.ContextInfo, cfg *PrometheusConfig) *BPFCollector {
	latencyDesc := prometheus.NewDesc(
		prometheus.BuildFQName("bpf", "probe", "latency_seconds"),
		"Latency of the probe in seconds",
		[]string{"probe_id", "probe_type", "probe_name"},
		nil,
	)
	entriesDesc := prometheus.NewDesc(
		prometheus.BuildFQName("bpf", "map", "entries_total"),
		"Number of entries in the map",
		[]string{"map_id", "map_name", "map_type", "max_entries"},
		nil,
	)
	collector := &BPFCollector{
		cfg:              cfg,
		log:              slog.With("component", "prom.BPFCollector"),
		bgCtx:            ctx,
		ctxInfo:          ctxInfo,
		promConnect:      ctxInfo.Prometheus,
		progs:            map[ebpf.ProgramID]*BPFProgram{},
		probeLatencyDesc: latencyDesc,
		mapSizeDesc:      entriesDesc,
	}
	// Register the collector with the shared Prometheus endpoint.
	collector.promConnect.Register(cfg.Port, cfg.Path, collector)
	return collector
}

// reportMetrics is the pipeline final-node function. It ignores the incoming
// span stream (metrics are gathered on scrape, not per span) and only makes
// sure the shared Prometheus HTTP endpoint is serving.
func (bc *BPFCollector) reportMetrics(_ <-chan []request.Span) {
	go bc.promConnect.StartHTTP(bc.bgCtx)
}

// Describe implements prometheus.Collector. It must advertise every
// descriptor this collector can emit from Collect; the map-entries
// descriptor was previously missing, so those metrics were emitted as
// undescribed ("unchecked") metrics.
func (bc *BPFCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- bc.probeLatencyDesc
	ch <- bc.mapSizeDesc
}

// Collect implements prometheus.Collector. On every scrape it walks all
// eBPF programs and maps currently loaded in the kernel and emits one
// metric per object.
func (bc *BPFCollector) Collect(ch chan<- prometheus.Metric) {
	bc.log.Debug("Collecting eBPF metrics")
	bc.collectProbesMetrics(ch)
	bc.collectMapMetrics(ch)
}

// collectProbesMetrics iterates over every eBPF program loaded in the
// kernel and emits one latency histogram per program.
func (bc *BPFCollector) collectProbesMetrics(ch chan<- prometheus.Metric) {
	// Enable kernel-side BPF run-time statistics collection. The returned
	// closer is deliberately discarded: stats must stay enabled for the
	// process lifetime so the counters keep advancing between scrapes.
	_, err := ebpf.EnableStats(unix.BPF_STATS_RUN_TIME)
	if err != nil {
		bc.log.Error("failed to enable runtime stats", "error", err)
	}

	id, err := ebpf.ProgramGetNextID(0)
	if err != nil {
		bc.log.Error("failed to get first program ID", "ID", id, "error", err)
	}

	for id != 0 {
		bc.collectProgramMetrics(ch, id)
		// Always advance to the next ID, even when the current program
		// failed to load: the previous code `continue`d without advancing,
		// spinning forever on the same ID.
		next, err := ebpf.ProgramGetNextID(id)
		if err != nil {
			// No more programs (or iteration error): stop.
			break
		}
		id = next
	}
}

// collectProgramMetrics loads one program by ID, updates its stored run
// statistics and emits its latency histogram. Extracted as a helper so the
// program FD is closed at the end of each iteration rather than piling up
// deferred Close calls (and FDs) until the whole scan finishes.
func (bc *BPFCollector) collectProgramMetrics(ch chan<- prometheus.Metric, id ebpf.ProgramID) {
	program, err := ebpf.NewProgramFromID(id)
	if err != nil {
		bc.log.Error("failed to load program", "ID", id, "error", err)
		return
	}
	defer program.Close()

	info, err := program.Info()
	if err != nil {
		bc.log.Error("failed to get program info", "ID", id, "error", err)
		return
	}

	// Runtime/RunCount are only available when BPF stats are enabled;
	// otherwise they report zero values, which we still export.
	runTime, _ := info.Runtime()
	runCount, _ := info.RunCount()
	idStr := strconv.FormatUint(uint64(id), 10)

	// Roll the previous scrape's totals into prev* and store the new ones,
	// so updateBuckets can work on the delta.
	probe, ok := bc.progs[id]
	if !ok {
		probe = &BPFProgram{
			runTime:  runTime,
			runCount: runCount,
		}
		bc.progs[id] = probe
	} else {
		probe.prevRunTime = probe.runTime
		probe.prevRunCount = probe.runCount
		probe.runTime = runTime
		probe.runCount = runCount
	}
	probe.updateBuckets()

	ch <- prometheus.MustNewConstHistogram(
		bc.probeLatencyDesc,
		runCount,
		runTime.Seconds(),
		probe.buckets,
		idStr,
		info.Type.String(),
		info.Name,
	)
}

// collectMapMetrics iterates over every eBPF map loaded in the kernel and
// emits its current number of entries.
func (bc *BPFCollector) collectMapMetrics(ch chan<- prometheus.Metric) {
	id, err := ebpf.MapGetNextID(0)
	if err != nil {
		bc.log.Error("failed to get first map ID", "ID", id, "error", err)
	}

	for id != 0 {
		bc.collectOneMapMetrics(ch, id)
		// Always advance, even when the current map failed to load: the
		// previous code `continue`d without advancing, looping forever on
		// the same ID.
		next, err := ebpf.MapGetNextID(id)
		if err != nil {
			// No more maps (or iteration error): stop.
			break
		}
		id = next
	}
}

// collectOneMapMetrics loads one map by ID, counts its entries and emits the
// entry-count metric. Extracted as a helper so the map FD is closed at the
// end of each iteration instead of deferring all Closes to the end of the
// whole scan.
func (bc *BPFCollector) collectOneMapMetrics(ch chan<- prometheus.Metric, id ebpf.MapID) {
	m, err := ebpf.NewMapFromID(id)
	if err != nil {
		bc.log.Error("failed to load map", "ID", id, "error", err)
		return
	}
	defer m.Close()

	info, err := m.Info()
	if err != nil {
		bc.log.Error("failed to get map info", "ID", id, "error", err)
		return
	}

	// Count entries by iterating with throwaway decoders.
	// This snippet is copied from digitalocean-labs/ebpf_exporter
	// https://github.com/digitalocean-labs/ebpf_exporter/blob/main/collectors/map.go
	var count uint64
	throwawayKey := discardEncoding{}
	throwawayValues := make(sliceDiscardEncoding, 0)
	iter := m.Iterate()
	for iter.Next(&throwawayKey, &throwawayValues) {
		count++
	}
	if err := iter.Err(); err != nil {
		// Previously the error was silently dropped; log it so truncated
		// counts are diagnosable.
		bc.log.Error("failed to iterate map", "ID", id, "error", err)
		return
	}

	// NOTE(review): the value is exposed as a counter named *_total, but an
	// entry count can shrink — semantically this is a gauge. Left unchanged
	// to keep the published metric name/type stable.
	ch <- prometheus.MustNewConstMetric(
		bc.mapSizeDesc,
		prometheus.CounterValue,
		float64(count),
		strconv.FormatUint(uint64(id), 10),
		info.Name,
		info.Type.String(),
		strconv.FormatUint(uint64(info.MaxEntries), 10),
	)
}

// updateBuckets folds the average per-run latency observed since the
// previous scrape into the program's histogram buckets.
//
// prometheus.MustNewConstHistogram expects CUMULATIVE bucket counts: each
// bucket must include every observation with a value <= its upper bound.
// The previous implementation incremented only the first matching bucket
// and then stopped, producing non-monotonic (malformed) histograms.
func (bp *BPFProgram) updateBuckets() {
	// Delta of runtime and run count since the previous scrape.
	deltaTime := bp.runTime - bp.prevRunTime
	deltaCount := bp.runCount - bp.prevRunCount
	if deltaCount == 0 {
		// No runs in this interval: nothing to record (matches the old
		// behavior, which skipped all buckets when deltaCount was 0).
		return
	}

	// Average latency of a single run during the last interval. Only the
	// average is available: the kernel exposes aggregate run time/count,
	// not per-run samples.
	avgLatency := deltaTime.Seconds() / float64(deltaCount)

	if bp.buckets == nil {
		bp.buckets = make(map[float64]uint64, len(bucketKeysSeconds))
	}
	// Cumulative update: add the new observations to every bucket whose
	// upper bound covers the observed average latency. Observations above
	// the largest bound land only in the implicit +Inf bucket (the total
	// count passed to MustNewConstHistogram).
	for _, bound := range bucketKeysSeconds {
		if avgLatency <= bound {
			bp.buckets[bound] += deltaCount
		}
	}
}

// Compile-time assertions that both discard decoders satisfy
// encoding.BinaryUnmarshaler, as required by ebpf map iterators.
var (
	_ encoding.BinaryUnmarshaler = (*discardEncoding)(nil)
	_ encoding.BinaryUnmarshaler = (*sliceDiscardEncoding)(nil)
)

// discardEncoding implements encoding.BinaryUnmarshaler for eBPF map keys/values
// such that everything is discarded. (The original comment said BinaryMarshaler,
// but only UnmarshalBinary is implemented.)
type discardEncoding struct {
}

// UnmarshalBinary discards the input bytes; used to count entries without decoding them.
func (de *discardEncoding) UnmarshalBinary(_ []byte) error {
	return nil
}

// sliceDiscardEncoding implements encoding.BinaryUnmarshaler for eBPF per-cpu map
// values such that everything is discarded. (The original comment said
// BinaryMarshaler, but only UnmarshalBinary is implemented.)
type sliceDiscardEncoding []discardEncoding

// UnmarshalBinary discards the per-CPU values; used to count entries without decoding them.
func (sde *sliceDiscardEncoding) UnmarshalBinary(_ []byte) error {
	return nil
}
6 changes: 5 additions & 1 deletion pkg/internal/pipe/instrumenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type nodesMap struct {
Metrics pipe.Final[[]request.Span]
Traces pipe.Final[[]request.Span]
Prometheus pipe.Final[[]request.Span]
BpfMetrics pipe.Final[[]request.Span]
Printer pipe.Final[[]request.Span]

ProcessReport pipe.Final[[]request.Span]
Expand Down Expand Up @@ -66,7 +67,9 @@ func otelMetrics(n *nodesMap) *pipe.Final[[]request.Span] { re
func otelTraces(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Traces }
func printer(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Printer }
func prometheus(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Prometheus }
func processReport(n *nodesMap) *pipe.Final[[]request.Span] { return &n.ProcessReport }
// bpfMetrics selects the BPF-metrics final node of the pipeline graph.
func bpfMetrics(n *nodesMap) *pipe.Final[[]request.Span] { return &n.BpfMetrics }

func processReport(n *nodesMap) *pipe.Final[[]request.Span] { return &n.ProcessReport }

// builder with injectable instantiators for unit testing
type graphFunctions struct {
Expand Down Expand Up @@ -115,6 +118,7 @@ func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global.
config.Traces.Grafana = &gb.config.Grafana.OTLP
pipe.AddFinalProvider(gnb, otelTraces, otel.TracesReceiver(ctx, config.Traces, gb.ctxInfo, config.Attributes.Select))
pipe.AddFinalProvider(gnb, prometheus, prom.PrometheusEndpoint(ctx, gb.ctxInfo, &config.Prometheus, config.Attributes.Select))
pipe.AddFinalProvider(gnb, bpfMetrics, prom.BPFMetrics(ctx, gb.ctxInfo, &config.Prometheus))
pipe.AddFinalProvider(gnb, alloyTraces, alloy.TracesReceiver(ctx, gb.ctxInfo, &config.TracesReceiver, config.Attributes.Select))

pipe.AddFinalProvider(gnb, printer, debug.PrinterNode(config.TracePrinter))
Expand Down
4 changes: 2 additions & 2 deletions test/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ services:
GOCOVERDIR: "/coverage"
BEYLA_TRACE_PRINTER: "json"
BEYLA_OPEN_PORT: "${BEYLA_OPEN_PORT}"
BEYLA_OTEL_METRICS_FEATURES: "application,application_span,application_process,application_service_graph"
BEYLA_PROMETHEUS_FEATURES: "application,application_span,application_process,application_service_graph"
BEYLA_OTEL_METRICS_FEATURES: "application,application_span,application_process,application_service_graph,ebpf"
BEYLA_PROMETHEUS_FEATURES: "application,application_span,application_process,application_service_graph,ebpf"
BEYLA_DISCOVERY_POLL_INTERVAL: 500ms
BEYLA_EXECUTABLE_NAME: "${BEYLA_EXECUTABLE_NAME}"
BEYLA_SKIP_GO_SPECIFIC_TRACERS: "${BEYLA_SKIP_GO_SPECIFIC_TRACERS}"
Expand Down
19 changes: 19 additions & 0 deletions test/integration/red_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,25 @@ func testPrometheusBeylaBuildInfo(t *testing.T) {
})
}

// testPrometheusBPFMetrics verifies that BPF-level metrics (probe latency
// histograms and map entry counts) reach Prometheus when the "ebpf" feature
// is enabled.
func testPrometheusBPFMetrics(t *testing.T) {
	// NOTE(review): presumably skipped because BPF runtime stats need kernel
	// support/privileges unavailable in this CI environment — confirm.
	t.Skip("BPF metrics are not available in the test environment")
	pq := prom.Client{HostPort: prometheusHostPort}
	var results []prom.Result
	// Expect latency histogram samples for at least one uprobe-attached program.
	test.Eventually(t, testTimeout, func(t require.TestingT) {
		var err error
		results, err = pq.Query(`bpf_probe_latency_seconds_count{probe_name=~"uprobe_.*"}`)
		require.NoError(t, err)
		require.NotEmpty(t, results)
	})

	// The clipped label value "ongoing_server_" reflects the kernel's
	// limit on BPF object names (truncated map name).
	test.Eventually(t, testTimeout, func(t require.TestingT) {
		var err error
		results, err = pq.Query(`bpf_map_entries_total{map_name="ongoing_server_"}`)
		require.NoError(t, err)
		require.NotEmpty(t, results)
	})
}

func testPrometheusNoBeylaEvents(t *testing.T) {
pq := prom.Client{HostPort: prometheusHostPort}
var results []prom.Result
Expand Down
1 change: 1 addition & 0 deletions test/integration/suites_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func TestSuite_PrometheusScrape(t *testing.T) {
t.Run("Internal Prometheus metrics", testInternalPrometheusExport)
t.Run("Testing Beyla Build Info metric", testPrometheusBeylaBuildInfo)
t.Run("Testing for no Beyla self metrics", testPrometheusNoBeylaEvents)
t.Run("Testing BPF metrics", testPrometheusBPFMetrics)

require.NoError(t, compose.Close())
}
Expand Down

0 comments on commit c47142f

Please sign in to comment.