Skip to content

Commit

Permalink
Revert "Revert "add config flag and more unit tests""
Browse files Browse the repository at this point in the history
This reverts commit 7b73b33.
  • Loading branch information
marctc committed Jul 11, 2024
1 parent 4d09646 commit 9281742
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 47 deletions.
11 changes: 6 additions & 5 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ var DefaultConfig = Config{
TTL: defaultMetricsTTL,
},
Traces: otel.TracesConfig{
Protocol: otel.ProtocolUnset,
TracesProtocol: otel.ProtocolUnset,
MaxQueueSize: 4096,
MaxExportBatchSize: 4096,
ReportersCacheLen: ReporterLRUSize,
Protocol: otel.ProtocolUnset,
TracesProtocol: otel.ProtocolUnset,
MaxQueueSize: 4096,
MaxExportBatchSize: 4096,
ReportersCacheLen: ReporterLRUSize,
ReportExceptionEvents: false,
Instrumentations: []string{
instrumentations.InstrumentationALL,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/discover/typer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (t *typer) inspectOffsets(execElf *exec.FileInfo) (*goexec.Offsets, bool, e
t.log.Debug("skipping inspection for Go functions", "pid", execElf.Pid, "comm", execElf.CmdExePath)
} else {
t.log.Debug("inspecting", "pid", execElf.Pid, "comm", execElf.CmdExePath)
offsets, err := goexec.InspectOffsets(execElf, t.allGoFunctions)
offsets, err := goexec.InspectOffsets(&t.cfg.Traces, execElf, t.allGoFunctions)
if err != nil {
t.log.Debug("couldn't find go specific tracers", "error", err)
return nil, false, err
Expand Down
10 changes: 5 additions & 5 deletions pkg/internal/export/alloy/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ import (
)

// TracesReceiver creates a terminal node that consumes request.Spans and sends OpenTelemetry traces to the configured consumers.
func TracesReceiver(ctx context.Context, cfg *beyla.TracesReceiverConfig, userAttribSelection attributes.Selection) pipe.FinalProvider[[]request.Span] {
func TracesReceiver(ctx context.Context, cfg *beyla.Config, userAttribSelection attributes.Selection) pipe.FinalProvider[[]request.Span] {
return (&tracesReceiver{ctx: ctx, cfg: cfg, attributes: userAttribSelection}).provideLoop
}

type tracesReceiver struct {
ctx context.Context
cfg *beyla.TracesReceiverConfig
cfg *beyla.Config
attributes attributes.Selection
}

func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) {
if !tr.cfg.Enabled() {
if !tr.cfg.TracesReceiver.Enabled() {
return pipe.IgnoreFinal[[]request.Span](), nil
}
return func(in <-chan []request.Span) {
Expand All @@ -41,8 +41,8 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error)
continue
}

for _, tc := range tr.cfg.Traces {
traces := otel.GenerateTraces(span, traceAttrs)
for _, tc := range tr.cfg.TracesReceiver.Traces {
traces := otel.GenerateTraces(tr.cfg.Traces, span, traceAttrs)
err := tc.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
Expand Down
26 changes: 14 additions & 12 deletions pkg/internal/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type TracesConfig struct {
// BackOffMaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
BackOffMaxElapsedTime time.Duration `yaml:"backoff_max_elapsed_time" env:"BEYLA_BACKOFF_MAX_ELAPSED_TIME"`

// ReportExceptionEvents enables the reporting of exception events.
ReportExceptionEvents bool `yaml:"report_exception_events" env:"BEYLA_TRACES_REPORT_EXCEPTION_EVENTS"`

ReportersCacheLen int `yaml:"reporters_cache_len" env:"BEYLA_TRACES_REPORT_CACHE_LEN"`

// SDKLogLevel works independently from the global LogLevel because it prints GBs of logs in Debug mode
Expand Down Expand Up @@ -196,7 +199,7 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) {
continue
}
traces := GenerateTraces(span, traceAttrs)
traces := GenerateTraces(tr.cfg, span, traceAttrs)
err := exp.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
Expand Down Expand Up @@ -363,7 +366,7 @@ func getRetrySettings(cfg TracesConfig) configretry.BackOffConfig {
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, userAttrs map[attr.Name]struct{}) ptrace.Traces {
func GenerateTraces(cfg TracesConfig, span *request.Span, userAttrs map[attr.Name]struct{}) ptrace.Traces {
t := span.Timings()
start := spanStartTime(t)
hasSubSpans := t.Start.After(start)
Expand Down Expand Up @@ -404,6 +407,15 @@ func GenerateTraces(span *request.Span, userAttrs map[attr.Name]struct{}) ptrace
m := attrsToMap(attrs)
m.CopyTo(s.Attributes())

// Set error message and stacktrace
if cfg.ReportExceptionEvents && span.ErrorMessage != "" {
e := s.Events().AppendEmpty()
e.SetName(semconv.ExceptionEventName)
e.Attributes().PutStr(string(semconv.ExceptionMessageKey), span.ErrorMessage)
e.Attributes().PutStr(string(semconv.ExceptionTypeKey), "error")
e.Attributes().PutStr(string(semconv.ExceptionStacktraceKey), span.ErrorStacktrace)
}

// Set status code
statusCode := codeToStatusCode(request.SpanStatusCode(span))
s.Status().SetCode(statusCode)
Expand Down Expand Up @@ -793,13 +805,3 @@ func setTracesProtocol(cfg *TracesConfig) {
// unset. Guessing it
os.Setenv(envTracesProtocol, string(cfg.guessProtocol()))
}

// func recordError(span *HTTPRequestSpan, s trace2.Span) {
// var opts []trace2.EventOption
// opts = append(opts, trace2.WithAttributes(
// semconv.ExceptionType("errors.(*errorString).Error"), // TODO(marctc): make this dynamic
// semconv.ExceptionMessage(span.ErrorMessage),
// semconv.ExceptionStacktrace(span.ErrorStacktrace),
// ))
// s.AddEvent(semconv.ExceptionEventName, opts...)
// }
64 changes: 43 additions & 21 deletions pkg/internal/export/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,23 @@ func TestGenerateTraces(t *testing.T) {
spanID, _ := trace.SpanIDFromHex("89cbc1f60aab3b01")
traceID, _ := trace.TraceIDFromHex("eae56fbbec9505c102e8aabfc6b5c481")
span := &request.Span{
Type: request.EventTypeHTTP,
RequestStart: start.UnixNano(),
Start: start.Add(time.Second).UnixNano(),
End: start.Add(3 * time.Second).UnixNano(),
Method: "GET",
Route: "/test",
Status: 200,
ParentSpanID: parentSpanID,
TraceID: traceID,
SpanID: spanID,
Type: request.EventTypeHTTP,
RequestStart: start.UnixNano(),
Start: start.Add(time.Second).UnixNano(),
End: start.Add(3 * time.Second).UnixNano(),
Method: "GET",
Route: "/test",
Status: 200,
ParentSpanID: parentSpanID,
TraceID: traceID,
SpanID: spanID,
ErrorMessage: "crash",
ErrorStacktrace: "function\nline",
}
traces := GenerateTraces(span, map[attr.Name]struct{}{})
cfg := TracesConfig{
ReportExceptionEvents: true,
}
traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -357,6 +362,13 @@ func TestGenerateTraces(t *testing.T) {

assert.NotEqual(t, spans.At(0).SpanID().String(), spans.At(1).SpanID().String())
assert.NotEqual(t, spans.At(1).SpanID().String(), spans.At(2).SpanID().String())

e := spans.At(2).Events().At(0)
val, _ := e.Attributes().Get(string(semconv.ExceptionMessageKey))
assert.Equal(t, "crash", val.AsString())
val, _ = e.Attributes().Get(string(semconv.ExceptionStacktraceKey))
assert.Equal(t, "function\nline", val.AsString())

})

t.Run("test with subtraces - ids set bpf layer", func(t *testing.T) {
Expand All @@ -374,7 +386,8 @@ func TestGenerateTraces(t *testing.T) {
SpanID: spanID,
TraceID: traceID,
}
traces := GenerateTraces(span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -410,7 +423,8 @@ func TestGenerateTraces(t *testing.T) {
Route: "/test",
Status: 200,
}
traces := GenerateTraces(span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -447,7 +461,8 @@ func TestGenerateTraces(t *testing.T) {
SpanID: spanID,
TraceID: traceID,
}
traces := GenerateTraces(span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -472,7 +487,8 @@ func TestGenerateTraces(t *testing.T) {
ParentSpanID: parentSpanID,
TraceID: traceID,
}
traces := GenerateTraces(span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -492,7 +508,8 @@ func TestGenerateTraces(t *testing.T) {
Method: "GET",
Route: "/test",
}
traces := GenerateTraces(span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -508,7 +525,8 @@ func TestGenerateTraces(t *testing.T) {
func TestGenerateTracesAttributes(t *testing.T) {
t.Run("test SQL trace generation, no statement", func(t *testing.T) {
span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"")
traces := GenerateTraces(&span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -529,7 +547,8 @@ func TestGenerateTracesAttributes(t *testing.T) {

t.Run("test SQL trace generation, unknown attribute", func(t *testing.T) {
span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"")
traces := GenerateTraces(&span, map[attr.Name]struct{}{"db.operation.name": {}})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{"db.operation.name": {}})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -550,7 +569,8 @@ func TestGenerateTracesAttributes(t *testing.T) {

t.Run("test SQL trace generation, unknown attribute", func(t *testing.T) {
span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"")
traces := GenerateTraces(&span, map[attr.Name]struct{}{attr.DBQueryText: {}})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{attr.DBQueryText: {}})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -570,7 +590,8 @@ func TestGenerateTracesAttributes(t *testing.T) {
})
t.Run("test Kafka trace generation", func(t *testing.T) {
span := request.Span{Type: request.EventTypeKafkaClient, Method: "process", Path: "important-topic", OtherNamespace: "test"}
traces := GenerateTraces(&span, map[attr.Name]struct{}{})
cfg := TracesConfig{}
traces := GenerateTraces(cfg, &span, map[attr.Name]struct{}{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -1143,7 +1164,8 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques
if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) {
continue
}
res = append(res, GenerateTraces(span, traceAttrs))
cfg := TracesConfig{}
res = append(res, GenerateTraces(cfg, span, traceAttrs))
}

return res
Expand Down
9 changes: 8 additions & 1 deletion pkg/internal/goexec/offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/grafana/beyla/pkg/internal/exec"
"github.com/grafana/beyla/pkg/internal/export/otel"
)

type Offsets struct {
Expand All @@ -24,7 +25,7 @@ type FieldOffsets map[string]any

// InspectOffsets gets the memory addresses/offsets of the instrumenting function, as well as the required
// parameters fields to be read from the eBPF code
func InspectOffsets(execElf *exec.FileInfo, funcs []string) (*Offsets, error) {
func InspectOffsets(cfg *otel.TracesConfig, execElf *exec.FileInfo, funcs []string) (*Offsets, error) {
if execElf == nil {
return nil, fmt.Errorf("executable not found")
}
Expand All @@ -34,6 +35,12 @@ func InspectOffsets(execElf *exec.FileInfo, funcs []string) (*Offsets, error) {
if err != nil {
return nil, fmt.Errorf("finding instrumentation points: %w", err)
}
// symTab would be used to find the function name from the address when
// captuing Go errors. If the option is disabled, whe don't need to keep
// the symbol table in memory.
if !cfg.ReportExceptionEvents {
symTab = nil
}
if len(found) == 0 {
return nil, fmt.Errorf("couldn't find any instrumentation point in %s", execElf.CmdExePath)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/internal/goexec/offsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/beyla/pkg/internal/export/otel"
"github.com/grafana/beyla/pkg/internal/testutil"
)

Expand All @@ -18,7 +19,8 @@ func TestProcessNotFound(t *testing.T) {
finish := make(chan struct{})
go func() {
defer close(finish)
_, err := InspectOffsets(nil, nil)
cfg := &otel.TracesConfig{}
_, err := InspectOffsets(cfg, nil, nil)
require.Error(t, err)
}()
testutil.ReadChannel(t, finish, 5*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/pipe/instrumenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global.
config.Traces.Grafana = &gb.config.Grafana.OTLP
pipe.AddFinalProvider(gnb, otelTraces, otel.TracesReceiver(ctx, config.Traces, gb.ctxInfo, config.Attributes.Select))
pipe.AddFinalProvider(gnb, prometheus, prom.PrometheusEndpoint(ctx, gb.ctxInfo, &config.Prometheus, config.Attributes.Select))
pipe.AddFinalProvider(gnb, alloyTraces, alloy.TracesReceiver(ctx, &config.TracesReceiver, config.Attributes.Select))
pipe.AddFinalProvider(gnb, alloyTraces, alloy.TracesReceiver(ctx, config, config.Attributes.Select))

pipe.AddFinalProvider(gnb, noop, debug.NoopNode(config.Noop))
pipe.AddFinalProvider(gnb, printer, debug.PrinterNode(config.Printer))
Expand Down

0 comments on commit 9281742

Please sign in to comment.