From d382b582a23ede32f509d3d99596c4ee2777373e Mon Sep 17 00:00:00 2001 From: J Pham <94262131+ie-pham@users.noreply.github.com> Date: Thu, 2 Jan 2025 16:31:49 -0600 Subject: [PATCH] Enforce max span attribute size (#4335) * enforce max span attribute size * changelog, lint, docs * updated test and response * update to truncate instead of discard * update test * add metrics to capture truncated attributes count * remove test logs * removing appending "_truncated" * lint * fix comments * manifest.md * fix test * remove log line --- CHANGELOG.md | 1 + docs/sources/tempo/configuration/_index.md | 5 ++ docs/sources/tempo/configuration/manifest.md | 1 + modules/distributor/config.go | 4 ++ modules/distributor/distributor.go | 52 +++++++++++++-- modules/distributor/distributor_test.go | 67 ++++++++++++++++++-- modules/distributor/forwarder_test.go | 4 +- pkg/util/test/req.go | 11 ++++ 8 files changed, 134 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e918d9b5f5..1fa2263434f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ querier: * [ENHANCEMENT] Update golang.org/x/crypto [#4474](https://github.com/grafana/tempo/pull/4474) (@javiermolinar) * [ENHANCEMENT] Distributor shim: add test verifying receiver works (including metrics) [#4477](https://github.com/grafana/tempo/pull/4477) (@yvrhdn) * [ENHANCEMENT] Reduce goroutines in all non-querier components. [#4484](https://github.com/grafana/tempo/pull/4484) (@joe-elliott) +* [ENHANCEMENT] Add option to enforce max span attribute size [#4335](https://github.com/grafana/tempo/pull/4335) (@ie-pham) * [BUGFIX] Handle invalid TraceQL query filter in tag values v2 disk cache [#4392](https://github.com/grafana/tempo/pull/4392) (@electron0zero) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 5e551405883..eb125fd9d8d 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -232,6 +232,11 @@ distributor: # instruct the client how to retry. [retry_after_on_resource_exhausted: | default = '0' ] + # Optional + # Configures the max size an attribute can be. Any key or value that exceeds this limit will be truncated before storing + # Setting this parameter to '0' would disable this check against attribute size + [max_span_attr_byte: | default = '2048'] + # Optional. # Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable # attributes exposed on /usage_metrics. diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index 49692dbb783..ed0e82bbb57 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -185,6 +185,7 @@ distributor: stale_duration: 15m0s extend_writes: true retry_after_on_resource_exhausted: 0s + max_span_attr_byte: 2048 ingester_client: pool_config: checkinterval: 15s diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 14bb02fa41b..ed8964cf20d 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -51,6 +51,8 @@ type Config struct { // For testing. factory ring_client.PoolAddrFunc `yaml:"-"` + + MaxSpanAttrByte int `yaml:"max_span_attr_byte"` } type LogSpansConfig struct { @@ -74,6 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = distributorRingKey cfg.ExtendWrites = true + cfg.MaxSpanAttrByte = 2048 // 2KB + f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.") f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.") diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index d98dbe9797d..ce18d1240c0 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/usagestats" tempo_util "github.com/grafana/tempo/pkg/util" @@ -112,6 +113,11 @@ var ( Name: "distributor_metrics_generator_clients", Help: "The current number of metrics-generator clients.", }) + metricAttributesTruncated = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "distributor_attributes_truncated_total", + Help: "The total number of attribute keys or values truncated per tenant", + }, []string{"tenant"}) statBytesReceived = usagestats.NewCounter("distributor_bytes_received") statSpansReceived = usagestats.NewCounter("distributor_spans_received") @@ -378,13 +384,17 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te d.usage.Observe(userID, batches) } - keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount) + keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte) if err != nil { overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID) logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger) return nil, err } + if truncatedAttributeCount > 0 { + metricAttributesTruncated.WithLabelValues(userID).Add(float64(truncatedAttributeCount)) + } + err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys) if err != nil { return nil, err @@ -525,18 +535,29 @@ func (d *Distributor) UsageTrackerHandler() http.Handler { // requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring // and traces to pass onto the ingesters. -func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) { +func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, int, error) { const tracesPerBatch = 20 // p50 of internal env tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch) + truncatedAttributeCount := 0 for _, b := range batches { spansByILS := make(map[uint32]*v1.ScopeSpans) + // check for large resources for large attributes + if maxSpanAttrSize > 0 && b.Resource != nil { + resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize) + truncatedAttributeCount += resourceAttrTruncatedCount + } for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { + // check large spans for large attributes + if maxSpanAttrSize > 0 { + spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize) + truncatedAttributeCount += spanAttrTruncatedCount + } traceID := span.TraceId if !validation.ValidTraceID(traceID) { - return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8) + return nil, nil, 0, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8) } traceKey := tempo_util.TokenFor(userID, traceID) @@ -602,7 +623,30 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int traces = append(traces, r) } - return keys, traces, nil + return keys, traces, truncatedAttributeCount, nil +} + +// find and truncate the span attributes that are too large +func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int) int { + count := 0 + for _, attr := range attributes { + if len(attr.Key) > maxAttrSize { + attr.Key = attr.Key[:maxAttrSize] + count++ + } + + switch value := attr.GetValue().Value.(type) { + case *v1_common.AnyValue_StringValue: + if len(value.StringValue) > maxAttrSize { + value.StringValue = value.StringValue[:maxAttrSize] + count++ + } + default: + continue + } + } + + return count } // discardedPredicate determines if a trace is discarded based on the number of successful replications. diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index ac2f8183316..11babaf84b0 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -713,7 +713,7 @@ func TestRequestsByTraceID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - keys, rebatchedTraces, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1) + keys, rebatchedTraces, _, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1, 1000) require.Equal(t, len(keys), len(rebatchedTraces)) for i, expectedKey := range tt.expectedKeys { @@ -739,9 +739,64 @@ func TestRequestsByTraceID(t *testing.T) { } } +func TestProcessAttributes(t *testing.T) { + spanCount := 10 + batchCount := 3 + trace := test.MakeTraceWithSpanCount(batchCount, spanCount, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}) + + maxAttrByte := 1000 + longString := strings.Repeat("t", 1100) + + // add long attributes to the resource level + trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes, + test.MakeAttribute("long value", longString), + ) + trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes, + test.MakeAttribute(longString, "long key"), + ) + + // add long attributes to the span level + trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes, + test.MakeAttribute("long value", longString), + ) + trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes, + test.MakeAttribute(longString, "long key"), + ) + + _, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte) + assert.Equal(t, 4, truncatedCount) + for _, rT := range rebatchedTrace { + for _, resource := range rT.trace.ResourceSpans { + // find large resource attributes + for _, attr := range resource.Resource.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:maxAttrByte], attr.Key) + } + } + // find large span attributes + for _, scope := range resource.ScopeSpans { + for _, span := range scope.Spans { + for _, attr := range span.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:maxAttrByte], attr.Key) + } + } + } + } + + } + } +} + func BenchmarkTestsByRequestID(b *testing.B) { - spansPer := 100 - batches := 10 + spansPer := 5000 + batches := 100 traces := []*tempopb.Trace{ test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), @@ -757,14 +812,15 @@ func BenchmarkTestsByRequestID(b *testing.B) { } b.ResetTimer() + b.ReportAllocs() for i := 0; i < b.N; i++ { for _, blerg := range ils { - _, _, err := requestsByTraceID([]*v1.ResourceSpans{ + _, _, _, err := requestsByTraceID([]*v1.ResourceSpans{ { ScopeSpans: blerg, }, - }, "test", spansPer*len(traces)) + }, "test", spansPer*len(traces), 5) require.NoError(b, err) } } @@ -1631,6 +1687,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist }) } + distributorConfig.MaxSpanAttrByte = 1000 distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) distributorConfig.DistributorRing.KVStore.Mock = nil diff --git a/modules/distributor/forwarder_test.go b/modules/distributor/forwarder_test.go index 61807712a5f..d5dfbff868d 100644 --- a/modules/distributor/forwarder_test.go +++ b/modules/distributor/forwarder_test.go @@ -28,7 +28,7 @@ func TestForwarder(t *testing.T) { require.NoError(t, err) b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) + keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000) require.NoError(t, err) o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer) @@ -69,7 +69,7 @@ func TestForwarder_shutdown(t *testing.T) { require.NoError(t, err) b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) + keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000) require.NoError(t, err) o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer) diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index b33370581d1..969179b1016 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -17,6 +17,17 @@ import ( "github.com/stretchr/testify/require" ) +func MakeAttribute(key, value string) *v1_common.KeyValue { + return &v1_common.KeyValue{ + Key: key, + Value: &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{ + StringValue: value, + }, + }, + } +} + func MakeSpan(traceID []byte) *v1_trace.Span { return MakeSpanWithAttributeCount(traceID, rand.Int()%10+1) }