diff --git a/pkg/export/otel/traces.go b/pkg/export/otel/traces.go index 4782422a8..636e9fbe7 100644 --- a/pkg/export/otel/traces.go +++ b/pkg/export/otel/traces.go @@ -169,6 +169,38 @@ func (tr *tracesOTELReceiver) spanDiscarded(span *request.Span) bool { return span.IgnoreTraces() || span.ServiceID.ExportsOTelTraces() || !tr.acceptSpan(span) } +func (tr *tracesOTELReceiver) processSpans(exp exporter.Traces, spans []request.Span, traceAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue, sampler trace.Sampler) { + for i := range spans { + span := &spans[i] + if span.InternalSignal() { + continue + } + if tr.spanDiscarded(span) { + continue + } + + finalAttrs := traceAttributes(span, traceAttrs) + + sr := sampler.ShouldSample(trace.SamplingParameters{ + ParentContext: tr.ctx, + Name: span.TraceName(), + TraceID: span.TraceID, + Kind: spanKind(span), + Attributes: finalAttrs, + }) + + if sr.Decision == trace.Drop { + continue + } + + traces := GenerateTracesWithAttributes(span, tr.ctxInfo.HostID, finalAttrs, envResourceAttrs) + err := exp.ConsumeTraces(tr.ctx, traces) + if err != nil { + slog.Error("error sending trace to consumer", "error", err) + } + } +} + func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) { if !tr.cfg.Enabled() { return pipe.IgnoreFinal[[]request.Span](), nil @@ -199,22 +231,10 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err } envResourceAttrs := ResourceAttrsFromEnv() + sampler := tr.cfg.Sampler.Implementation() for spans := range in { - for i := range spans { - span := &spans[i] - if span.InternalSignal() { - continue - } - if tr.spanDiscarded(span) { - continue - } - traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs, envResourceAttrs) - err := exp.ConsumeTraces(tr.ctx, traces) - if err != nil { - slog.Error("error sending trace to consumer", "error", err) - } - } + tr.processSpans(exp, spans, traceAttrs, envResourceAttrs, sampler) } }, nil } @@ -385,8 +405,7 @@ func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue return attrs } -// GenerateTraces creates a ptrace.Traces from a request.Span -func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces { +func GenerateTracesWithAttributes(span *request.Span, hostID string, attrs []attribute.KeyValue, envResourceAttrs []attribute.KeyValue) ptrace.Traces { t := span.Timings() start := spanStartTime(t) hasSubSpans := t.Start.After(start) @@ -425,7 +444,6 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s } // Set span attributes - attrs := traceAttributes(span, userAttrs) m := attrsToMap(attrs) m.CopyTo(s.Attributes()) @@ -436,6 +454,11 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s return traces } +// GenerateTraces creates a ptrace.Traces from a request.Span +func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces { + return GenerateTracesWithAttributes(span, hostID, traceAttributes(span, userAttrs), envResourceAttrs) +} + // createSubSpans creates the internal spans for a request.Span func createSubSpans(span *request.Span, parentSpanID pcommon.SpanID, traceID pcommon.TraceID, ss *ptrace.ScopeSpans, t request.Timings) { // Create a child span showing the queue time diff --git a/pkg/export/otel/traces_test.go b/pkg/export/otel/traces_test.go index 877ce4aba..845c95835 100644 --- a/pkg/export/otel/traces_test.go +++ b/pkg/export/otel/traces_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "os" + "strconv" "sync" "sync/atomic" "testing" @@ -15,10 +16,13 @@ import ( "github.com/mariomac/pipes/pipe" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.25.0" "go.opentelemetry.io/otel/trace" @@ -600,6 +604,77 @@ func TestGenerateTracesAttributes(t *testing.T) { }) } +func TestTraceSampling(t *testing.T) { + spans := []request.Span{} + start := time.Now() + for i := 0; i < 10; i++ { + span := request.Span{Type: request.EventTypeHTTP, + RequestStart: start.UnixNano(), + Start: start.Add(time.Second).UnixNano(), + End: start.Add(3 * time.Second).UnixNano(), + Method: "GET", + Route: "/test" + strconv.Itoa(i), + Status: 200, + ServiceID: svc.ID{}, + TraceID: randomTraceID(), + } + spans = append(spans, span) + } + + receiver := makeTracesTestReceiver([]string{"http"}) + envResourceAttrs := []attribute.KeyValue{} + + t.Run("test sample all", func(t *testing.T) { + sampler := sdktrace.AlwaysSample() + attrs := make(map[attr.Name]struct{}) + + tr := []ptrace.Traces{} + + exporter := TestExporter{ + collector: func(td ptrace.Traces) { + tr = append(tr, td) + }, + } + + receiver.processSpans(exporter, spans, attrs, envResourceAttrs, sampler) + assert.Equal(t, 10, len(tr)) + }) + + t.Run("test sample nothing", func(t *testing.T) { + sampler := sdktrace.NeverSample() + attrs := make(map[attr.Name]struct{}) + + tr := []ptrace.Traces{} + + exporter := TestExporter{ + collector: func(td ptrace.Traces) { + tr = append(tr, td) + }, + } + + receiver.processSpans(exporter, spans, attrs, envResourceAttrs, sampler) + assert.Equal(t, 0, len(tr)) + }) + + t.Run("test sample 1/10th", func(t *testing.T) { + sampler := sdktrace.TraceIDRatioBased(0.1) + attrs := make(map[attr.Name]struct{}) + + tr := []ptrace.Traces{} + + exporter := TestExporter{ + collector: func(td ptrace.Traces) { + tr = append(tr, td) + }, + } + + receiver.processSpans(exporter, spans, attrs, envResourceAttrs, sampler) + // The result is likely 0,1,2 with 1/10th, but we don't want + // to maybe fail if it accidentally it randomly becomes 3 + assert.GreaterOrEqual(t, 3, len(tr)) + }) +} + func TestAttrsToMap(t *testing.T) { t.Run("test with string attribute", func(t *testing.T) { attrs := []attribute.KeyValue{ @@ -1232,3 +1307,24 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques return res } + +type TestExporter struct { + collector func(td ptrace.Traces) +} + +func (e TestExporter) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (e TestExporter) Shutdown(_ context.Context) error { + return nil +} + +func (e TestExporter) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + e.collector(td) + return nil +} + +func (e TestExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +}