Skip to content

Commit

Permalink
Backport fixes for trace sampling (#1241)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Oct 9, 2024
1 parent 3339917 commit 6eccfa3
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 17 deletions.
57 changes: 40 additions & 17 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,38 @@ func (tr *tracesOTELReceiver) spanDiscarded(span *request.Span) bool {
return span.IgnoreTraces() || span.ServiceID.ExportsOTelTraces() || !tr.acceptSpan(span)
}

func (tr *tracesOTELReceiver) processSpans(exp exporter.Traces, spans []request.Span, traceAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue, sampler trace.Sampler) {
for i := range spans {
span := &spans[i]
if span.InternalSignal() {
continue
}
if tr.spanDiscarded(span) {
continue
}

finalAttrs := traceAttributes(span, traceAttrs)

sr := sampler.ShouldSample(trace.SamplingParameters{
ParentContext: tr.ctx,
Name: span.TraceName(),
TraceID: span.TraceID,
Kind: spanKind(span),
Attributes: finalAttrs,
})

if sr.Decision == trace.Drop {
continue
}

traces := GenerateTracesWithAttributes(span, tr.ctxInfo.HostID, finalAttrs, envResourceAttrs)
err := exp.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
}
}
}

func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) {
if !tr.cfg.Enabled() {
return pipe.IgnoreFinal[[]request.Span](), nil
Expand Down Expand Up @@ -199,22 +231,10 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
}

envResourceAttrs := ResourceAttrsFromEnv()
sampler := tr.cfg.Sampler.Implementation()

for spans := range in {
for i := range spans {
span := &spans[i]
if span.InternalSignal() {
continue
}
if tr.spanDiscarded(span) {
continue
}
traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs, envResourceAttrs)
err := exp.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
}
}
tr.processSpans(exp, spans, traceAttrs, envResourceAttrs, sampler)
}
}, nil
}
Expand Down Expand Up @@ -385,8 +405,7 @@ func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue
return attrs
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
func GenerateTracesWithAttributes(span *request.Span, hostID string, attrs []attribute.KeyValue, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
t := span.Timings()
start := spanStartTime(t)
hasSubSpans := t.Start.After(start)
Expand Down Expand Up @@ -425,7 +444,6 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s
}

// Set span attributes
attrs := traceAttributes(span, userAttrs)
m := attrsToMap(attrs)
m.CopyTo(s.Attributes())

Expand All @@ -436,6 +454,11 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s
return traces
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
return GenerateTracesWithAttributes(span, hostID, traceAttributes(span, userAttrs), envResourceAttrs)
}

// createSubSpans creates the internal spans for a request.Span
func createSubSpans(span *request.Span, parentSpanID pcommon.SpanID, traceID pcommon.TraceID, ss *ptrace.ScopeSpans, t request.Timings) {
// Create a child span showing the queue time
Expand Down
96 changes: 96 additions & 0 deletions pkg/export/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -15,10 +16,13 @@ import (
"github.com/mariomac/pipes/pipe"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -600,6 +604,77 @@ func TestGenerateTracesAttributes(t *testing.T) {
})
}

func TestTraceSampling(t *testing.T) {
spans := []request.Span{}
start := time.Now()
for i := 0; i < 10; i++ {
span := request.Span{Type: request.EventTypeHTTP,
RequestStart: start.UnixNano(),
Start: start.Add(time.Second).UnixNano(),
End: start.Add(3 * time.Second).UnixNano(),
Method: "GET",
Route: "/test" + strconv.Itoa(i),
Status: 200,
ServiceID: svc.ID{},
TraceID: randomTraceID(),
}
spans = append(spans, span)
}

receiver := makeTracesTestReceiver([]string{"http"})
envResourceAttrs := []attribute.KeyValue{}

t.Run("test sample all", func(t *testing.T) {
sampler := sdktrace.AlwaysSample()
attrs := make(map[attr.Name]struct{})

tr := []ptrace.Traces{}

exporter := TestExporter{
collector: func(td ptrace.Traces) {
tr = append(tr, td)
},
}

receiver.processSpans(exporter, spans, attrs, envResourceAttrs, sampler)
assert.Equal(t, 10, len(tr))
})

t.Run("test sample nothing", func(t *testing.T) {
sampler := sdktrace.NeverSample()
attrs := make(map[attr.Name]struct{})

tr := []ptrace.Traces{}

exporter := TestExporter{
collector: func(td ptrace.Traces) {
tr = append(tr, td)
},
}

receiver.processSpans(exporter, spans, attrs, envResourceAttrs, sampler)
assert.Equal(t, 0, len(tr))
})

t.Run("test sample 1/10th", func(t *testing.T) {
sampler := sdktrace.TraceIDRatioBased(0.1)
attrs := make(map[attr.Name]struct{})

tr := []ptrace.Traces{}

exporter := TestExporter{
collector: func(td ptrace.Traces) {
tr = append(tr, td)
},
}

receiver.processSpans(exporter, spans, attrs, envResourceAttrs, sampler)
// The result is likely 0,1,2 with 1/10th, but we don't want
// to maybe fail if it accidentally it randomly becomes 3
assert.GreaterOrEqual(t, 3, len(tr))
})
}

func TestAttrsToMap(t *testing.T) {
t.Run("test with string attribute", func(t *testing.T) {
attrs := []attribute.KeyValue{
Expand Down Expand Up @@ -1232,3 +1307,24 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques

return res
}

type TestExporter struct {
collector func(td ptrace.Traces)
}

func (e TestExporter) Start(_ context.Context, _ component.Host) error {
return nil
}

func (e TestExporter) Shutdown(_ context.Context) error {
return nil
}

func (e TestExporter) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
e.collector(td)
return nil
}

func (e TestExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

0 comments on commit 6eccfa3

Please sign in to comment.