From 9281742ef14b56ba4581f664577d85e1a5ce0e74 Mon Sep 17 00:00:00 2001 From: Marc Tuduri Date: Thu, 11 Jul 2024 16:07:30 +0200 Subject: [PATCH] Revert "Revert "add config flag and more unit tests"" This reverts commit 7b73b331074efa85fa311306e36f1d9fafc7b933. --- pkg/beyla/config.go | 11 +++-- pkg/internal/discover/typer.go | 2 +- pkg/internal/export/alloy/traces.go | 10 ++-- pkg/internal/export/otel/traces.go | 26 +++++----- pkg/internal/export/otel/traces_test.go | 64 +++++++++++++++++-------- pkg/internal/goexec/offsets.go | 9 +++- pkg/internal/goexec/offsets_test.go | 4 +- pkg/internal/pipe/instrumenter.go | 2 +- 8 files changed, 81 insertions(+), 47 deletions(-) diff --git a/pkg/beyla/config.go b/pkg/beyla/config.go index e60cb206a..3730c1438 100644 --- a/pkg/beyla/config.go +++ b/pkg/beyla/config.go @@ -73,11 +73,12 @@ var DefaultConfig = Config{ TTL: defaultMetricsTTL, }, Traces: otel.TracesConfig{ - Protocol: otel.ProtocolUnset, - TracesProtocol: otel.ProtocolUnset, - MaxQueueSize: 4096, - MaxExportBatchSize: 4096, - ReportersCacheLen: ReporterLRUSize, + Protocol: otel.ProtocolUnset, + TracesProtocol: otel.ProtocolUnset, + MaxQueueSize: 4096, + MaxExportBatchSize: 4096, + ReportersCacheLen: ReporterLRUSize, + ReportExceptionEvents: false, Instrumentations: []string{ instrumentations.InstrumentationALL, }, diff --git a/pkg/internal/discover/typer.go b/pkg/internal/discover/typer.go index fe586023e..af96bc377 100644 --- a/pkg/internal/discover/typer.go +++ b/pkg/internal/discover/typer.go @@ -166,7 +166,7 @@ func (t *typer) inspectOffsets(execElf *exec.FileInfo) (*goexec.Offsets, bool, e t.log.Debug("skipping inspection for Go functions", "pid", execElf.Pid, "comm", execElf.CmdExePath) } else { t.log.Debug("inspecting", "pid", execElf.Pid, "comm", execElf.CmdExePath) - offsets, err := goexec.InspectOffsets(execElf, t.allGoFunctions) + offsets, err := goexec.InspectOffsets(&t.cfg.Traces, execElf, t.allGoFunctions) if err != nil { t.log.Debug("couldn't find go specific tracers", "error", err) return nil, false, err diff --git a/pkg/internal/export/alloy/traces.go b/pkg/internal/export/alloy/traces.go index c29358fad..168db59a4 100644 --- a/pkg/internal/export/alloy/traces.go +++ b/pkg/internal/export/alloy/traces.go @@ -13,18 +13,18 @@ import ( ) // TracesReceiver creates a terminal node that consumes request.Spans and sends OpenTelemetry traces to the configured consumers. -func TracesReceiver(ctx context.Context, cfg *beyla.TracesReceiverConfig, userAttribSelection attributes.Selection) pipe.FinalProvider[[]request.Span] { +func TracesReceiver(ctx context.Context, cfg *beyla.Config, userAttribSelection attributes.Selection) pipe.FinalProvider[[]request.Span] { return (&tracesReceiver{ctx: ctx, cfg: cfg, attributes: userAttribSelection}).provideLoop } type tracesReceiver struct { ctx context.Context - cfg *beyla.TracesReceiverConfig + cfg *beyla.Config attributes attributes.Selection } func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) { - if !tr.cfg.Enabled() { + if !tr.cfg.TracesReceiver.Enabled() { return pipe.IgnoreFinal[[]request.Span](), nil } return func(in <-chan []request.Span) { @@ -41,8 +41,8 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) continue } - for _, tc := range tr.cfg.Traces { - traces := otel.GenerateTraces(span, traceAttrs) + for _, tc := range tr.cfg.TracesReceiver.Traces { + traces := otel.GenerateTraces(tr.cfg.Traces, span, traceAttrs) err := tc.ConsumeTraces(tr.ctx, traces) if err != nil { slog.Error("error sending trace to consumer", "error", err) diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go index a7651db09..addcf5d33 100644 --- a/pkg/internal/export/otel/traces.go +++ b/pkg/internal/export/otel/traces.go @@ -80,6 +80,9 @@ type TracesConfig struct { // BackOffMaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. BackOffMaxElapsedTime time.Duration `yaml:"backoff_max_elapsed_time" env:"BEYLA_BACKOFF_MAX_ELAPSED_TIME"` + // ReportExceptionEvents enables the reporting of exception events. + ReportExceptionEvents bool `yaml:"report_exception_events" env:"BEYLA_TRACES_REPORT_EXCEPTION_EVENTS"` + ReportersCacheLen int `yaml:"reporters_cache_len" env:"BEYLA_TRACES_REPORT_CACHE_LEN"` // SDKLogLevel works independently from the global LogLevel because it prints GBs of logs in Debug mode @@ -196,7 +199,7 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) { continue } - traces := GenerateTraces(span, traceAttrs) + traces := GenerateTraces(tr.cfg, span, traceAttrs) err := exp.ConsumeTraces(tr.ctx, traces) if err != nil { slog.Error("error sending trace to consumer", "error", err) @@ -363,7 +366,7 @@ func getRetrySettings(cfg TracesConfig) configretry.BackOffConfig { } // GenerateTraces creates a ptrace.Traces from a request.Span -func GenerateTraces(span *request.Span, userAttrs map[attr.Name]struct{}) ptrace.Traces { +func GenerateTraces(cfg TracesConfig, span *request.Span, userAttrs map[attr.Name]struct{}) ptrace.Traces { t := span.Timings() start := spanStartTime(t) hasSubSpans := t.Start.After(start) @@ -404,6 +407,15 @@ func GenerateTraces(span *request.Span, userAttrs map[attr.Name]struct{}) ptrace m := attrsToMap(attrs) m.CopyTo(s.Attributes()) + // Set error message and stacktrace + if cfg.ReportExceptionEvents && span.ErrorMessage != "" { + e := s.Events().AppendEmpty() + e.SetName(semconv.ExceptionEventName) + e.Attributes().PutStr(string(semconv.ExceptionMessageKey), span.ErrorMessage) + e.Attributes().PutStr(string(semconv.ExceptionTypeKey), "error") + e.Attributes().PutStr(string(semconv.ExceptionStacktraceKey), span.ErrorStacktrace) + } + // Set status code statusCode := codeToStatusCode(request.SpanStatusCode(span)) s.Status().SetCode(statusCode) @@ -793,13 +805,3 @@ func setTracesProtocol(cfg *TracesConfig) { // unset. Guessing it os.Setenv(envTracesProtocol, string(cfg.guessProtocol())) } - -// func recordError(span *HTTPRequestSpan, s trace2.Span) { -// var opts []trace2.EventOption -// opts = append(opts, trace2.WithAttributes( -// semconv.ExceptionType("errors.(*errorString).Error"), // TODO(marctc): make this dynamic -// semconv.ExceptionMessage(span.ErrorMessage), -// semconv.ExceptionStacktrace(span.ErrorStacktrace), -// )) -// s.AddEvent(semconv.ExceptionEventName, opts...) -// } diff --git a/pkg/internal/export/otel/traces_test.go b/pkg/internal/export/otel/traces_test.go index 3d6dfeb99..8a87f994b 100644 --- a/pkg/internal/export/otel/traces_test.go +++ b/pkg/internal/export/otel/traces_test.go @@ -318,18 +318,23 @@ func TestGenerateTraces(t *testing.T) { spanID, _ := trace.SpanIDFromHex("89cbc1f60aab3b01") traceID, _ := trace.TraceIDFromHex("eae56fbbec9505c102e8aabfc6b5c481") span := &request.Span{ - Type: request.EventTypeHTTP, - RequestStart: start.UnixNano(), - Start: start.Add(time.Second).UnixNano(), - End: start.Add(3 * time.Second).UnixNano(), - Method: "GET", - Route: "/test", - Status: 200, - ParentSpanID: parentSpanID, - TraceID: traceID, - SpanID: spanID, + Type: request.EventTypeHTTP, + RequestStart: start.UnixNano(), + Start: start.Add(time.Second).UnixNano(), + End: start.Add(3 * time.Second).UnixNano(), + Method: "GET", + Route: "/test", + Status: 200, + ParentSpanID: parentSpanID, + TraceID: traceID, + SpanID: spanID, + ErrorMessage: "crash", + ErrorStacktrace: "function\nline", } - traces := GenerateTraces(span, map[attr.Name]struct{}{}) + cfg := TracesConfig{ + ReportExceptionEvents: true, + } + traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -357,6 +362,13 @@ func TestGenerateTraces(t *testing.T) { assert.NotEqual(t, spans.At(0).SpanID().String(), spans.At(1).SpanID().String()) assert.NotEqual(t, spans.At(1).SpanID().String(), spans.At(2).SpanID().String()) + + e := spans.At(2).Events().At(0) + val, _ := e.Attributes().Get(string(semconv.ExceptionMessageKey)) + assert.Equal(t, "crash", val.AsString()) + val, _ = e.Attributes().Get(string(semconv.ExceptionStacktraceKey)) + assert.Equal(t, "function\nline", val.AsString()) + }) t.Run("test with subtraces - ids set bpf layer", func(t *testing.T) { @@ -374,7 +386,8 @@ func TestGenerateTraces(t *testing.T) { SpanID: spanID, TraceID: traceID, } - traces := GenerateTraces(span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -410,7 +423,8 @@ func TestGenerateTraces(t *testing.T) { Route: "/test", Status: 200, } - traces := GenerateTraces(span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -447,7 +461,8 @@ func TestGenerateTraces(t *testing.T) { SpanID: spanID, TraceID: traceID, } - traces := GenerateTraces(span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -472,7 +487,8 @@ func TestGenerateTraces(t *testing.T) { ParentSpanID: parentSpanID, TraceID: traceID, } - traces := GenerateTraces(span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -492,7 +508,8 @@ func TestGenerateTraces(t *testing.T) { Method: "GET", Route: "/test", } - traces := GenerateTraces(span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -508,7 +525,8 @@ func TestGenerateTraces(t *testing.T) { func TestGenerateTracesAttributes(t *testing.T) { t.Run("test SQL trace generation, no statement", func(t *testing.T) { span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"") - traces := GenerateTraces(&span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -529,7 +547,8 @@ func TestGenerateTracesAttributes(t *testing.T) { t.Run("test SQL trace generation, unknown attribute", func(t *testing.T) { span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"") - traces := GenerateTraces(&span, map[attr.Name]struct{}{"db.operation.name": {}}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{"db.operation.name": {}}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -550,7 +569,8 @@ func TestGenerateTracesAttributes(t *testing.T) { t.Run("test SQL trace generation, unknown attribute", func(t *testing.T) { span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"") - traces := GenerateTraces(&span, map[attr.Name]struct{}{attr.DBQueryText: {}}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{attr.DBQueryText: {}}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -570,7 +590,8 @@ func TestGenerateTracesAttributes(t *testing.T) { }) t.Run("test Kafka trace generation", func(t *testing.T) { span := request.Span{Type: request.EventTypeKafkaClient, Method: "process", Path: "important-topic", OtherNamespace: "test"} - traces := GenerateTraces(&span, map[attr.Name]struct{}{}) + cfg := TracesConfig{} + traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{}) assert.Equal(t, 1, traces.ResourceSpans().Len()) assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) @@ -1143,7 +1164,8 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) { continue } - res = append(res, GenerateTraces(span, traceAttrs)) + cfg := TracesConfig{} + res = append(res, GenerateTraces(cfg, span, traceAttrs)) } return res diff --git a/pkg/internal/goexec/offsets.go b/pkg/internal/goexec/offsets.go index e92625075..cc0834c11 100644 --- a/pkg/internal/goexec/offsets.go +++ b/pkg/internal/goexec/offsets.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/grafana/beyla/pkg/internal/exec" + "github.com/grafana/beyla/pkg/internal/export/otel" ) type Offsets struct { @@ -24,7 +25,7 @@ type FieldOffsets map[string]any // InspectOffsets gets the memory addresses/offsets of the instrumenting function, as well as the required // parameters fields to be read from the eBPF code -func InspectOffsets(execElf *exec.FileInfo, funcs []string) (*Offsets, error) { +func InspectOffsets(cfg *otel.TracesConfig, execElf *exec.FileInfo, funcs []string) (*Offsets, error) { if execElf == nil { return nil, fmt.Errorf("executable not found") } @@ -34,6 +35,12 @@ func InspectOffsets(execElf *exec.FileInfo, funcs []string) (*Offsets, error) { if err != nil { return nil, fmt.Errorf("finding instrumentation points: %w", err) } + // symTab would be used to find the function name from the address when + // captuing Go errors. If the option is disabled, whe don't need to keep + // the symbol table in memory. + if !cfg.ReportExceptionEvents { + symTab = nil + } if len(found) == 0 { return nil, fmt.Errorf("couldn't find any instrumentation point in %s", execElf.CmdExePath) } diff --git a/pkg/internal/goexec/offsets_test.go b/pkg/internal/goexec/offsets_test.go index 5bfa88259..f7823c9e9 100644 --- a/pkg/internal/goexec/offsets_test.go +++ b/pkg/internal/goexec/offsets_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/testutil" ) @@ -18,7 +19,8 @@ func TestProcessNotFound(t *testing.T) { finish := make(chan struct{}) go func() { defer close(finish) - _, err := InspectOffsets(nil, nil) + cfg := &otel.TracesConfig{} + _, err := InspectOffsets(cfg, nil, nil) require.Error(t, err) }() testutil.ReadChannel(t, finish, 5*time.Second) diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go index db16ee878..96a9f152a 100644 --- a/pkg/internal/pipe/instrumenter.go +++ b/pkg/internal/pipe/instrumenter.go @@ -117,7 +117,7 @@ func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global. config.Traces.Grafana = &gb.config.Grafana.OTLP pipe.AddFinalProvider(gnb, otelTraces, otel.TracesReceiver(ctx, config.Traces, gb.ctxInfo, config.Attributes.Select)) pipe.AddFinalProvider(gnb, prometheus, prom.PrometheusEndpoint(ctx, gb.ctxInfo, &config.Prometheus, config.Attributes.Select)) - pipe.AddFinalProvider(gnb, alloyTraces, alloy.TracesReceiver(ctx, &config.TracesReceiver, config.Attributes.Select)) + pipe.AddFinalProvider(gnb, alloyTraces, alloy.TracesReceiver(ctx, config, config.Attributes.Select)) pipe.AddFinalProvider(gnb, noop, debug.NoopNode(config.Noop)) pipe.AddFinalProvider(gnb, printer, debug.PrinterNode(config.Printer))