diff --git a/processor/ratelimitprocessor/documentation.md b/processor/ratelimitprocessor/documentation.md index 1f3becd44..d1cf47411 100644 --- a/processor/ratelimitprocessor/documentation.md +++ b/processor/ratelimitprocessor/documentation.md @@ -10,9 +10,9 @@ The following telemetry is emitted by this component. Number of in-flight requests at any given time [Development] -| Unit | Metric Type | Value Type | Stability | -| ---- | ----------- | ---------- | --------- | -| {requests} | Gauge | Int | Development | +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {requests} | Sum | Int | false | Development | ### otelcol_ratelimit.dynamic.escalations diff --git a/processor/ratelimitprocessor/factory.go b/processor/ratelimitprocessor/factory.go index 5c1c3497d..f6d7e7b1b 100644 --- a/processor/ratelimitprocessor/factory.go +++ b/processor/ratelimitprocessor/factory.go @@ -82,7 +82,6 @@ func createLogsProcessor( if err != nil { return nil, err } - var inflight int64 return NewLogsRateLimiterProcessor( rateLimiter, set.TelemetrySettings.Logger, @@ -92,7 +91,6 @@ func createLogsProcessor( func(ctx context.Context, ld plog.Logs) error { return nextConsumer.ConsumeLogs(ctx, ld) }, - &inflight, config.MetadataKeys, ) } @@ -112,7 +110,6 @@ func createMetricsProcessor( if err != nil { return nil, err } - var inflight int64 return NewMetricsRateLimiterProcessor( rateLimiter, set.TelemetrySettings.Logger, @@ -122,7 +119,6 @@ func createMetricsProcessor( func(ctx context.Context, md pmetric.Metrics) error { return nextConsumer.ConsumeMetrics(ctx, md) }, - &inflight, config.MetadataKeys, ) } @@ -142,7 +138,6 @@ func createTracesProcessor( if err != nil { return nil, err } - var inflight int64 return NewTracesRateLimiterProcessor( rateLimiter, set.TelemetrySettings.Logger, @@ -152,7 +147,6 @@ func createTracesProcessor( func(ctx context.Context, td ptrace.Traces) error { return nextConsumer.ConsumeTraces(ctx, td) }, - &inflight, config.MetadataKeys, ) } @@ -172,7 +166,6 @@ func createProfilesProcessor( if err != nil { return nil, err } - var inflight int64 return NewProfilesRateLimiterProcessor( rateLimiter, set.TelemetrySettings.Logger, @@ -182,7 +175,6 @@ func createProfilesProcessor( func(ctx context.Context, td pprofile.Profiles) error { return nextConsumer.ConsumeProfiles(ctx, td) }, - &inflight, config.MetadataKeys, ) } diff --git a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go index 80c59b0b2..050979648 100644 --- a/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go +++ b/processor/ratelimitprocessor/internal/metadata/generated_telemetry.go @@ -43,7 +43,7 @@ type TelemetryBuilder struct { meter metric.Meter mu sync.Mutex registrations []metric.Registration - RatelimitConcurrentRequests metric.Int64Gauge + RatelimitConcurrentRequests metric.Int64UpDownCounter RatelimitDynamicEscalations metric.Int64Counter RatelimitRequestDuration metric.Float64Histogram RatelimitRequestSize metric.Int64Histogram @@ -80,7 +80,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error - builder.RatelimitConcurrentRequests, err = builder.meter.Int64Gauge( + builder.RatelimitConcurrentRequests, err = builder.meter.Int64UpDownCounter( "otelcol_ratelimit.concurrent_requests", metric.WithDescription("Number of in-flight requests at any given time [Development]"), metric.WithUnit("{requests}"), diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go index 565a8f57e..43086bc28 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go @@ -43,8 +43,10 @@ func AssertEqualRatelimitConcurrentRequests(t *testing.T, tt *componenttest.Tele Name: "otelcol_ratelimit.concurrent_requests", Description: "Number of in-flight requests at any given time [Development]", Unit: "{requests}", - Data: metricdata.Gauge[int64]{ - DataPoints: dps, + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: dps, }, } got, err := tt.GetMetric("otelcol_ratelimit.concurrent_requests") diff --git a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go index 13e11cee3..33fdd373f 100644 --- a/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -36,7 +36,7 @@ func TestSetupTelemetry(t *testing.T) { tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) require.NoError(t, err) defer tb.Shutdown() - tb.RatelimitConcurrentRequests.Record(context.Background(), 1) + tb.RatelimitConcurrentRequests.Add(context.Background(), 1) tb.RatelimitDynamicEscalations.Add(context.Background(), 1) tb.RatelimitRequestDuration.Record(context.Background(), 1) tb.RatelimitRequestSize.Record(context.Background(), 1) diff --git a/processor/ratelimitprocessor/metadata.yaml b/processor/ratelimitprocessor/metadata.yaml index ab029aac4..ca3ee6fb0 100644 --- a/processor/ratelimitprocessor/metadata.yaml +++ b/processor/ratelimitprocessor/metadata.yaml @@ -19,9 +19,9 @@ telemetry: unit: "{requests}" stability: level: development - gauge: + sum: value_type: int - monotonic: true + monotonic: false ratelimit.dynamic.escalations: enabled: true description: Total number of dynamic rate escalations (dynamic > static) diff --git a/processor/ratelimitprocessor/processor.go b/processor/ratelimitprocessor/processor.go index 3394fdf73..31bc1a830 100644 --- a/processor/ratelimitprocessor/processor.go +++ b/processor/ratelimitprocessor/processor.go @@ -19,7 +19,6 @@ package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector import ( "context" - "sync/atomic" "time" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ type rateLimiterProcessor struct { telemetryBuilder *metadata.TelemetryBuilder tracerProvider trace.TracerProvider logger *zap.Logger - inflight *int64 strategy Strategy } @@ -82,7 +80,6 @@ func NewLogsRateLimiterProcessor( tracerProvider trace.TracerProvider, strategy Strategy, next func(ctx context.Context, logs plog.Logs) error, - inflight *int64, metadataKeys []string, ) (*LogsRateLimiterProcessor, error) { return &LogsRateLimiterProcessor{ @@ -92,7 +89,6 @@ func NewLogsRateLimiterProcessor( telemetryBuilder: telemetryBuilder, tracerProvider: tracerProvider, logger: logger, - inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, }, @@ -108,7 +104,6 @@ func NewMetricsRateLimiterProcessor( tracerProvider trace.TracerProvider, strategy Strategy, next func(ctx context.Context, metrics pmetric.Metrics) error, - inflight *int64, // used to calculate concurrent requests metadataKeys []string, ) (*MetricsRateLimiterProcessor, error) { return &MetricsRateLimiterProcessor{ @@ -118,7 +113,6 @@ func NewMetricsRateLimiterProcessor( telemetryBuilder: telemetryBuilder, tracerProvider: tracerProvider, logger: logger, - inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, }, @@ -134,7 +128,6 @@ func NewTracesRateLimiterProcessor( tracerProvider trace.TracerProvider, strategy Strategy, next func(ctx context.Context, traces ptrace.Traces) error, - inflight *int64, metadataKeys []string, ) (*TracesRateLimiterProcessor, error) { return &TracesRateLimiterProcessor{ @@ -144,7 +137,6 @@ func NewTracesRateLimiterProcessor( telemetryBuilder: telemetryBuilder, tracerProvider: tracerProvider, logger: logger, - inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, }, @@ -160,7 +152,6 @@ func NewProfilesRateLimiterProcessor( tracerProvider trace.TracerProvider, strategy Strategy, next func(ctx context.Context, profiles pprofile.Profiles) error, - inflight *int64, metadataKeys []string, ) (*ProfilesRateLimiterProcessor, error) { return &ProfilesRateLimiterProcessor{ @@ -170,7 +161,6 @@ func NewProfilesRateLimiterProcessor( telemetryBuilder: telemetryBuilder, tracerProvider: tracerProvider, logger: logger, - inflight: inflight, metadataKeys: metadataKeys, strategy: strategy, }, @@ -216,34 +206,31 @@ func getTelemetryAttrs(attrsCommon []attribute.KeyValue, err error) (attrs []att return attrs } -func rateLimit(ctx context.Context, +func withRateLimit[T any](ctx context.Context, hits int, rateLimit func(ctx context.Context, n int) error, metadataKeys []string, tb *metadata.TelemetryBuilder, logger *zap.Logger, - inflight *int64, + next func(ctx context.Context, data T) error, + data T, ) error { - current := atomic.AddInt64(inflight, 1) attrsCommon := getAttrsFromContext(ctx, metadataKeys) attrsSet := attribute.NewSet(attrsCommon...) - tb.RatelimitConcurrentRequests.Record(ctx, current, + tb.RatelimitConcurrentRequests.Add(ctx, 1, metric.WithAttributeSet(attrsSet)) + defer tb.RatelimitConcurrentRequests.Add(ctx, -1, metric.WithAttributeSet(attrsSet)) + + start := time.Now() + err := rateLimit(ctx, hits) + tb.RatelimitRequestDuration.Record(ctx, + time.Since(start).Seconds(), metric.WithAttributeSet(attrsSet), ) - defer func(start time.Time) { - atomic.AddInt64(inflight, -1) - tb.RatelimitRequestDuration.Record(ctx, time.Since(start).Seconds(), - metric.WithAttributeSet(attrsSet), - ) - }(time.Now()) - - err := rateLimit(ctx, hits) attrRequests := getTelemetryAttrs(attrsCommon, err) attrRequestsSet := attribute.NewSet(attrRequests...) - tb.RatelimitRequestSize.Record(ctx, int64(hits), - metric.WithAttributeSet(attrRequestsSet), - ) + tb.RatelimitRequestSize.Record(ctx, int64(hits), metric.WithAttributeSet(attrRequestsSet)) + tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet)) if err != nil { // enhance error logging with metadata keys fields := make([]zap.Field, 0, len(attrsCommon)+1) @@ -256,71 +243,61 @@ func rateLimit(ctx context.Context, fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString())) } } - logger.Error("request is over the limits defined by the rate limiter", append(fields, zap.Error(err))...) + logger.Error( + "request is over the limits defined by the rate limiter", + append(fields, zap.Error(err))..., + ) + return err } - - tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet)) - return err + return next(ctx, data) } func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - if err := rateLimit( + return withRateLimit( ctx, r.count(ld), r.rl.RateLimit, r.metadataKeys, r.telemetryBuilder, r.logger, - r.inflight, - ); err != nil { - return err - } - return r.next(ctx, ld) + r.next, ld, + ) } func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - if err := rateLimit( + return withRateLimit( ctx, r.count(md), r.rl.RateLimit, r.metadataKeys, r.telemetryBuilder, r.logger, - r.inflight, - ); err != nil { - return err - } - return r.next(ctx, md) + r.next, md, + ) } func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - if err := rateLimit( + return withRateLimit( ctx, r.count(td), r.rl.RateLimit, r.metadataKeys, r.telemetryBuilder, r.logger, - r.inflight, - ); err != nil { - return err - } - return r.next(ctx, td) + r.next, td, + ) } func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) error { - if err := rateLimit( + return withRateLimit( ctx, r.count(pd), r.rl.RateLimit, r.metadataKeys, r.telemetryBuilder, r.logger, - r.inflight, - ); err != nil { - return err - } - return r.next(ctx, pd) + r.next, pd, + ) } func getLogsCountFunc(strategy Strategy) func(ld plog.Logs) int { diff --git a/processor/ratelimitprocessor/processor_test.go b/processor/ratelimitprocessor/processor_test.go index d164d5745..04029dc41 100644 --- a/processor/ratelimitprocessor/processor_test.go +++ b/processor/ratelimitprocessor/processor_test.go @@ -48,7 +48,6 @@ import ( ) var ( - inflight int64 clientContext = client.NewContext(context.Background(), client.Info{ Metadata: client.NewMetadata(map[string][]string{ "x-tenant-id": {"TestProjectID"}, @@ -153,7 +152,6 @@ func TestConsume_Logs(t *testing.T) { rl: rateLimiter, telemetryBuilder: telemetryBuilder, logger: zap.New(observedZapCore), - inflight: &inflight, metadataKeys: []string{"x-tenant-id"}, strategy: StrategyRateLimitBytes, } @@ -230,7 +228,6 @@ func TestConsume_Metrics(t *testing.T) { rl: rateLimiter, telemetryBuilder: telemetryBuilder, logger: zap.New(observedZapCore), - inflight: &inflight, metadataKeys: []string{"x-tenant-id"}, strategy: StrategyRateLimitBytes, } @@ -307,7 +304,6 @@ func TestConsume_Traces(t *testing.T) { rl: rateLimiter, telemetryBuilder: telemetryBuilder, logger: zap.New(observedZapCore), - inflight: &inflight, metadataKeys: []string{"x-tenant-id"}, strategy: StrategyRateLimitBytes, } @@ -385,7 +381,6 @@ func TestConsume_Profiles(t *testing.T) { rl: rateLimiter, telemetryBuilder: telemetryBuilder, logger: zap.New(observedZapCore), - inflight: &inflight, metadataKeys: []string{"x-tenant-id"}, strategy: StrategyRateLimitBytes, } @@ -470,7 +465,6 @@ func TestConcurrentRequestsTelemetry(t *testing.T) { rl := rateLimiterProcessor{ rl: rateLimiter, telemetryBuilder: telemetryBuilder, - inflight: &inflight, metadataKeys: []string{"x-tenant-id"}, } processor := &MetricsRateLimiterProcessor{ @@ -501,11 +495,9 @@ func TestConcurrentRequestsTelemetry(t *testing.T) { m, err := tt.GetMetric("otelcol_ratelimit.concurrent_requests") require.NoError(t, err, "expected to observe otelcol_ratelimit.concurrent_requests") - for _, dp := range m.Data.(metricdata.Gauge[int64]).DataPoints { - if assert.Equal(t, int64(2), dp.Value, "expected to observe otelcol_ratelimit.concurrent_requests == 2") { - break - } - } + dps := m.Data.(metricdata.Sum[int64]).DataPoints + require.Len(t, dps, 1) + assert.Equal(t, int64(numWorkers), dps[0].Value) // Release both goroutines close(blockCh)