diff --git a/examples/quickstart/golang/golang b/examples/quickstart/golang/golang
index 3d37bd08e..ba0e07f33 100755
Binary files a/examples/quickstart/golang/golang and b/examples/quickstart/golang/golang differ
diff --git a/pkg/export/otel/metrics.go b/pkg/export/otel/metrics.go
index 4a23a9cf6..c1c444a84 100644
--- a/pkg/export/otel/metrics.go
+++ b/pkg/export/otel/metrics.go
@@ -60,6 +60,7 @@ const (
 	FeatureSpan    = "application_span"
 	FeatureGraph   = "application_service_graph"
 	FeatureProcess = "application_process"
+	FeatureEBPF    = "ebpf"
 )
 
 type MetricsConfig struct {
diff --git a/pkg/export/prom/prom.go b/pkg/export/prom/prom.go
index d5aa65c80..b7c4056e4 100644
--- a/pkg/export/prom/prom.go
+++ b/pkg/export/prom/prom.go
@@ -139,6 +139,11 @@ func (p *PrometheusConfig) NetworkMetricsEnabled() bool {
 	return slices.Contains(p.Features, otel.FeatureNetwork)
 }
 
+// EBPFEnabled reports whether the "ebpf" feature is listed in Features.
+func (p *PrometheusConfig) EBPFEnabled() bool {
+	return slices.Contains(p.Features, otel.FeatureEBPF)
+}
+
 func (p *PrometheusConfig) EndpointEnabled() bool {
 	return p.Port != 0 || p.Registry != nil
 }
diff --git a/pkg/export/prom/prom_bpf.go b/pkg/export/prom/prom_bpf.go
new file mode 100644
index 000000000..785bec821
--- /dev/null
+++ b/pkg/export/prom/prom_bpf.go
@@ -0,0 +1,337 @@
+package prom
+
+import (
+	"context"
+	"encoding"
+	"io"
+	"log/slog"
+	"maps"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/cilium/ebpf"
+	"github.com/mariomac/pipes/pipe"
+	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/sys/unix"
+
+	"github.com/grafana/beyla/pkg/internal/connector"
+	"github.com/grafana/beyla/pkg/internal/pipe/global"
+	"github.com/grafana/beyla/pkg/internal/request"
+)
+
+// BPFCollector implements prometheus.Collector for collecting metrics about currently loaded eBPF programs.
+type BPFCollector struct {
+	cfg         *PrometheusConfig
+	promConnect *connector.PrometheusManager
+	bgCtx       context.Context
+	ctxInfo     *global.ContextInfo
+	log         *slog.Logger
+
+	probeLatencyDesc *prometheus.Desc
+	mapSizeDesc      *prometheus.Desc
+
+	// statsCloser is the handle returned by ebpf.EnableStats. It is kept here so
+	// that it stays referenced for as long as the collector itself.
+	statsCloser io.Closer
+
+	// mu guards progs, because a registry may call Collect concurrently.
+	mu    sync.Mutex
+	progs map[ebpf.ProgramID]*BPFProgram
+}
+
+// BPFProgram keeps the run statistics read for one eBPF program in the current
+// and in the previous collection, together with its histogram buckets.
+type BPFProgram struct {
+	runTime      time.Duration
+	runCount     uint64
+	prevRunTime  time.Duration
+	prevRunCount uint64
+	// buckets maps every bound of bucketKeysSeconds to its cumulative count.
+	buckets map[float64]uint64
+}
+
+// bucketKeysSeconds lists the histogram upper bounds, in seconds and ascending.
+var bucketKeysSeconds = []float64{
+	0.0000001,
+	0.0000005,
+	0.000001,
+	0.000002,
+	0.000005,
+	0.00001,
+	0.00002,
+	0.00005,
+	0.0001,
+	0.0002,
+	0.0005,
+	0.001,
+	0.002,
+	0.005,
+}
+
+// BPFMetrics returns the provider of a final pipeline node that exposes metrics
+// about eBPF programs and maps through the Prometheus endpoint. The node is
+// ignored unless the endpoint is enabled and the "ebpf" feature is selected.
+func BPFMetrics(
+	ctx context.Context,
+	ctxInfo *global.ContextInfo,
+	cfg *PrometheusConfig,
+) pipe.FinalProvider[[]request.Span] {
+	return func() (pipe.FinalFunc[[]request.Span], error) {
+		// Both conditions are required: without an endpoint there is nothing to
+		// register against, and without the feature the metrics were not requested.
+		if !cfg.EndpointEnabled() || !cfg.EBPFEnabled() {
+			return pipe.IgnoreFinal[[]request.Span](), nil
+		}
+		collector := newBPFCollector(ctx, ctxInfo, cfg)
+		return collector.reportMetrics, nil
+	}
+}
+
+// newBPFCollector builds the collector, enables the kernel run-time statistics
+// and registers the collector in the Prometheus manager for cfg.Port and cfg.Path.
+func newBPFCollector(ctx context.Context, ctxInfo *global.ContextInfo, cfg *PrometheusConfig) *BPFCollector {
+	c := &BPFCollector{
+		cfg:         cfg,
+		log:         slog.With("component", "prom.BPFCollector"),
+		bgCtx:       ctx,
+		ctxInfo:     ctxInfo,
+		promConnect: ctxInfo.Prometheus,
+		progs:       make(map[ebpf.ProgramID]*BPFProgram),
+		probeLatencyDesc: prometheus.NewDesc(
+			prometheus.BuildFQName("bpf", "probe", "latency_seconds"),
+			"Latency of the probe in seconds",
+			[]string{"probe_id", "probe_type", "probe_name"},
+			nil,
+		),
+		mapSizeDesc: prometheus.NewDesc(
+			prometheus.BuildFQName("bpf", "map", "entries_total"),
+			"Number of entries in the map",
+			[]string{"map_id", "map_name", "map_type", "max_entries"},
+			nil,
+		),
+	}
+	c.enableRuntimeStats()
+	// Register the collector
+	c.promConnect.Register(cfg.Port, cfg.Path, c)
+	return c
+}
+
+// enableRuntimeStats asks the kernel to account the run time and run count of
+// eBPF programs. The handle is stored instead of being discarded, and it is
+// closed once the background context is done.
+func (bc *BPFCollector) enableRuntimeStats() {
+	closer, err := ebpf.EnableStats(unix.BPF_STATS_RUN_TIME)
+	if err != nil {
+		bc.log.Error("failed to enable runtime stats", "error", err)
+		return
+	}
+	bc.statsCloser = closer
+	context.AfterFunc(bc.bgCtx, func() {
+		if err := closer.Close(); err != nil {
+			bc.log.Debug("failed to disable runtime stats", "error", err)
+		}
+	})
+}
+
+// reportMetrics is the final pipeline node. It ignores the spans channel and
+// only starts the Prometheus HTTP endpoint in the background.
+func (bc *BPFCollector) reportMetrics(_ <-chan []request.Span) {
+	go bc.promConnect.StartHTTP(bc.bgCtx)
+}
+
+// Describe sends the descriptors of every metric that Collect can emit.
+func (bc *BPFCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- bc.probeLatencyDesc
+	ch <- bc.mapSizeDesc
+}
+
+// Collect sends the current program and map metrics to ch.
+func (bc *BPFCollector) Collect(ch chan<- prometheus.Metric) {
+	bc.log.Debug("Collecting eBPF metrics")
+	bc.collectProbesMetrics(ch)
+	bc.collectMapMetrics(ch)
+}
+
+// collectProbesMetrics walks all the loaded eBPF programs and emits one latency
+// histogram for each of them.
+func (bc *BPFCollector) collectProbesMetrics(ch chan<- prometheus.Metric) {
+	bc.mu.Lock()
+	defer bc.mu.Unlock()
+
+	// Iterate over all eBPF programs
+	id, err := ebpf.ProgramGetNextID(0)
+	if err != nil {
+		bc.log.Error("failed to get first program ID", "ID", id, "error", err)
+	}
+
+	for id != 0 {
+		bc.collectProbeMetrics(ch, id)
+		// Advance even when the program could not be read; otherwise a single
+		// failing ID would be retried forever. The error is ignored on purpose:
+		// the loop relies on the returned ID being 0 when the walk cannot go on.
+		id, _ = ebpf.ProgramGetNextID(id)
+	}
+}
+
+// collectProbeMetrics emits the latency histogram of the program with the given
+// ID. It is a separate function so that the program handle is closed as soon
+// as that program is done, and not when the whole walk ends.
+func (bc *BPFCollector) collectProbeMetrics(ch chan<- prometheus.Metric, id ebpf.ProgramID) {
+	// Get the program from the ID
+	program, err := ebpf.NewProgramFromID(id)
+	if err != nil {
+		bc.log.Error("failed to load program", "ID", id, "error", err)
+		return
+	}
+	defer program.Close()
+
+	// Get program info
+	info, err := program.Info()
+	if err != nil {
+		bc.log.Error("failed to get program info", "ID", id, "error", err)
+		return
+	}
+
+	runtime, _ := info.Runtime()
+	runCount, _ := info.RunCount()
+
+	// Shift the last reading into the prev* fields; a program seen for the
+	// first time starts from zero.
+	probe, ok := bc.progs[id]
+	if !ok {
+		probe = &BPFProgram{}
+		bc.progs[id] = probe
+	}
+	probe.prevRunTime, probe.prevRunCount = probe.runTime, probe.runCount
+	probe.runTime, probe.runCount = runtime, runCount
+	probe.updateBuckets()
+
+	// Create the histogram metric. The buckets are copied because the metric
+	// keeps the map it is given, while updateBuckets mutates it later.
+	ch <- prometheus.MustNewConstHistogram(
+		bc.probeLatencyDesc,
+		runCount,
+		runtime.Seconds(),
+		maps.Clone(probe.buckets),
+		strconv.FormatUint(uint64(id), 10),
+		info.Type.String(),
+		info.Name,
+	)
+}
+
+// collectMapMetrics walks all the loaded eBPF maps and emits the number of
+// entries of each map that can be iterated.
+func (bc *BPFCollector) collectMapMetrics(ch chan<- prometheus.Metric) {
+	// Iterate over all eBPF maps
+	id, err := ebpf.MapGetNextID(0)
+	if err != nil {
+		bc.log.Error("failed to get first map ID", "ID", id, "error", err)
+	}
+
+	for id != 0 {
+		bc.collectMapSize(ch, id)
+		// Advance even when the map could not be read, as in collectProbesMetrics.
+		id, _ = ebpf.MapGetNextID(id)
+	}
+}
+
+// collectMapSize counts the entries of the map with the given ID and emits the
+// count. The map handle is closed before returning.
+func (bc *BPFCollector) collectMapSize(ch chan<- prometheus.Metric, id ebpf.MapID) {
+	// Get the map from the ID
+	m, err := ebpf.NewMapFromID(id)
+	if err != nil {
+		bc.log.Error("failed to load map", "ID", id, "error", err)
+		return
+	}
+	defer m.Close()
+
+	// Get map info
+	info, err := m.Info()
+	if err != nil {
+		bc.log.Error("failed to get map info", "ID", id, "error", err)
+		return
+	}
+
+	// This snippet is copied from digitalocean-labs/ebpf_exporter
+	// https://github.com/digitalocean-labs/ebpf_exporter/blob/main/collectors/map.go
+	var count uint64
+	throwawayKey := discardEncoding{}
+	throwawayValues := make(sliceDiscardEncoding, 0)
+	iter := m.Iterate()
+	for iter.Next(&throwawayKey, &throwawayValues) {
+		count++
+	}
+	if err := iter.Err(); err != nil {
+		// Best effort: no metric is emitted for a map that cannot be iterated.
+		bc.log.Debug("failed to iterate map", "ID", id, "error", err)
+		return
+	}
+
+	// Create the map metric.
+	// NOTE(review): the entry count can also decrease, so GaugeValue and a name
+	// without the "_total" suffix look more appropriate; confirm before release.
+	ch <- prometheus.MustNewConstMetric(
+		bc.mapSizeDesc,
+		prometheus.CounterValue,
+		float64(count),
+		strconv.FormatUint(uint64(id), 10),
+		info.Name,
+		info.Type.String(),
+		strconv.FormatUint(uint64(info.MaxEntries), 10),
+	)
+}
+
+// updateBuckets adds the executions seen since the previous collection to the
+// histogram buckets. Only the totals are known, so all the new executions are
+// attributed to their average latency. The counts are cumulative: every bucket
+// whose bound is not below the average is incremented.
+func (bp *BPFProgram) updateBuckets() {
+	if bp.buckets == nil {
+		// Start with every bound at zero so the whole bucket set is exported.
+		bp.buckets = make(map[float64]uint64, len(bucketKeysSeconds))
+		for _, bucket := range bucketKeysSeconds {
+			bp.buckets[bucket] = 0
+		}
+	}
+
+	// Nothing ran, or the counters went backwards: leave the buckets untouched
+	// rather than letting the unsigned subtraction wrap around.
+	if bp.runCount <= bp.prevRunCount || bp.runTime < bp.prevRunTime {
+		return
+	}
+
+	// Calculate the average latency of the new executions
+	deltaTime := bp.runTime - bp.prevRunTime
+	deltaCount := bp.runCount - bp.prevRunCount
+	avgLatency := deltaTime.Seconds() / float64(deltaCount)
+
+	// Update the buckets
+	for _, bucket := range bucketKeysSeconds {
+		if avgLatency <= bucket {
+			bp.buckets[bucket] += deltaCount
+		}
+	}
+}
+
+// Assert that discardEncoding implements the correct interfaces for map iterators.
+var (
+	_ encoding.BinaryUnmarshaler = (*discardEncoding)(nil)
+	_ encoding.BinaryUnmarshaler = (*sliceDiscardEncoding)(nil)
+)
+
+// discardEncoding implements encoding.BinaryUnmarshaler for eBPF map values such that everything is discarded.
+type discardEncoding struct {
+}
+
+func (de *discardEncoding) UnmarshalBinary(_ []byte) error {
+	return nil
+}
+
+// sliceDiscardEncoding implements encoding.BinaryUnmarshaler for eBPF per-cpu map values such that everything is discarded.
+type sliceDiscardEncoding []discardEncoding
+
+func (sde *sliceDiscardEncoding) UnmarshalBinary(_ []byte) error {
+	return nil
+}
diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go
index 23edc03e1..8a76bf83c 100644
--- a/pkg/internal/pipe/instrumenter.go
+++ b/pkg/internal/pipe/instrumenter.go
@@ -39,6 +39,7 @@ type nodesMap struct {
 	Metrics    pipe.Final[[]request.Span]
 	Traces     pipe.Final[[]request.Span]
 	Prometheus pipe.Final[[]request.Span]
+	BpfMetrics pipe.Final[[]request.Span]
 	Printer    pipe.Final[[]request.Span]
 
 	ProcessReport pipe.Final[[]request.Span]
@@ -66,7 +67,9 @@ func otelMetrics(n *nodesMap) *pipe.Final[[]request.Span] { re
 func otelTraces(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Traces }
 func printer(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Printer }
 func prometheus(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Prometheus }
-func processReport(n *nodesMap) *pipe.Final[[]request.Span] { return &n.ProcessReport }
+func bpfMetrics(n *nodesMap) *pipe.Final[[]request.Span] { return &n.BpfMetrics }
+
+func processReport(n *nodesMap) *pipe.Final[[]request.Span] { return &n.ProcessReport }
 
 // builder with injectable instantiators for unit testing
 type graphFunctions struct {
@@ -115,6 +118,7 @@ func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global.
 	config.Traces.Grafana = &gb.config.Grafana.OTLP
 	pipe.AddFinalProvider(gnb, otelTraces, otel.TracesReceiver(ctx, config.Traces, gb.ctxInfo, config.Attributes.Select))
 	pipe.AddFinalProvider(gnb, prometheus, prom.PrometheusEndpoint(ctx, gb.ctxInfo, &config.Prometheus, config.Attributes.Select))
+	pipe.AddFinalProvider(gnb, bpfMetrics, prom.BPFMetrics(ctx, gb.ctxInfo, &config.Prometheus))
 	pipe.AddFinalProvider(gnb, alloyTraces, alloy.TracesReceiver(ctx, gb.ctxInfo, &config.TracesReceiver, config.Attributes.Select))
 
 	pipe.AddFinalProvider(gnb, printer, debug.PrinterNode(config.TracePrinter))
diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml
index 450a034a3..201296f4f 100644
--- a/test/integration/docker-compose.yml
+++ b/test/integration/docker-compose.yml
@@ -36,8 +36,8 @@ services:
       GOCOVERDIR: "/coverage"
       BEYLA_TRACE_PRINTER: "json"
       BEYLA_OPEN_PORT: "${BEYLA_OPEN_PORT}"
-      BEYLA_OTEL_METRICS_FEATURES: "application,application_span,application_process,application_service_graph"
-      BEYLA_PROMETHEUS_FEATURES: "application,application_span,application_process,application_service_graph"
+      BEYLA_OTEL_METRICS_FEATURES: "application,application_span,application_process,application_service_graph,ebpf"
+      BEYLA_PROMETHEUS_FEATURES: "application,application_span,application_process,application_service_graph,ebpf"
       BEYLA_DISCOVERY_POLL_INTERVAL: 500ms
       BEYLA_EXECUTABLE_NAME: "${BEYLA_EXECUTABLE_NAME}"
       BEYLA_SKIP_GO_SPECIFIC_TRACERS: "${BEYLA_SKIP_GO_SPECIFIC_TRACERS}"
diff --git a/test/integration/red_test.go b/test/integration/red_test.go
index 0c3adc965..d50d730d6 100644
--- a/test/integration/red_test.go
+++ b/test/integration/red_test.go
@@ -726,6 +726,28 @@ func testPrometheusBeylaBuildInfo(t *testing.T) {
 	})
 }
 
+// testPrometheusBPFMetrics waits until Prometheus returns both the probe latency and the map entries metrics.
+func testPrometheusBPFMetrics(t *testing.T) {
+	t.Skip("BPF metrics are not available in the test environment")
+	pq := prom.Client{HostPort: prometheusHostPort}
+	var results []prom.Result
+	test.Eventually(t, testTimeout, func(t require.TestingT) {
+		var err error
+		results, err = pq.Query(`bpf_probe_latency_seconds_count{probe_name=~"uprobe_.*"}`)
+		require.NoError(t, err)
+		require.NotEmpty(t, results)
+	})
+
+	// NOTE(review): "ongoing_server_" looks like a truncated map name, presumably
+	// cut by the kernel limit on object names; confirm against the map definition.
+	test.Eventually(t, testTimeout, func(t require.TestingT) {
+		var err error
+		results, err = pq.Query(`bpf_map_entries_total{map_name="ongoing_server_"}`)
+		require.NoError(t, err)
+		require.NotEmpty(t, results)
+	})
+}
+
 func testPrometheusNoBeylaEvents(t *testing.T) {
 	pq := prom.Client{HostPort: prometheusHostPort}
 	var results []prom.Result
diff --git a/test/integration/suites_test.go b/test/integration/suites_test.go
index 27fe66156..cf150f3de 100644
--- a/test/integration/suites_test.go
+++ b/test/integration/suites_test.go
@@ -220,6 +220,7 @@ func TestSuite_PrometheusScrape(t *testing.T) {
 	t.Run("Internal Prometheus metrics", testInternalPrometheusExport)
 	t.Run("Testing Beyla Build Info metric", testPrometheusBeylaBuildInfo)
 	t.Run("Testing for no Beyla self metrics", testPrometheusNoBeylaEvents)
+	t.Run("Testing BPF metrics", testPrometheusBPFMetrics)
 	require.NoError(t, compose.Close())
 }
 