processor/ratelimitprocessor/documentation.md (6 changes: 3 additions & 3 deletions)
@@ -10,9 +10,9 @@ The following telemetry is emitted by this component.

Number of in-flight requests at any given time [Development]

| Unit | Metric Type | Value Type | Stability |
| ---- | ----------- | ---------- | --------- |
| {requests} | Gauge | Int | Development |
| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| {requests} | Sum | Int | false | Development |
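
The change above turns the concurrent-requests metric from a gauge into a non-monotonic sum (an up-down counter): rather than recording a value read from a separately maintained counter, the instrument is incremented when a request enters the processor and decremented when it leaves. Below is a minimal sketch of that pattern, assuming the standard go.opentelemetry.io/otel metric API; the instrument name and the `handle` helper are illustrative, not the processor's actual identifiers.

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// handle wraps one request: the sum goes up while the request is in flight
// and back down when it finishes, so the reported value is the current
// number of concurrent requests.
func handle(ctx context.Context, inflight metric.Int64UpDownCounter, work func(context.Context) error) error {
	inflight.Add(ctx, 1)        // request enters
	defer inflight.Add(ctx, -1) // request leaves
	return work(ctx)
}

func main() {
	meter := otel.GetMeterProvider().Meter("example")
	// Non-monotonic int sum, matching the Sum / Int / false row above.
	inflight, err := meter.Int64UpDownCounter(
		"example.concurrent_requests", // illustrative name
		metric.WithUnit("{requests}"),
	)
	if err != nil {
		panic(err)
	}
	_ = handle(context.Background(), inflight, func(context.Context) error { return nil })
}
```

This is also why the processor.go changes further down replace `Record(ctx, current, ...)` with paired `Add(ctx, 1, ...)` / `Add(ctx, -1, ...)` calls.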

### otelcol_ratelimit.dynamic.escalations

processor/ratelimitprocessor/factory.go (8 changes: 0 additions & 8 deletions)
@@ -82,7 +82,6 @@ func createLogsProcessor(
if err != nil {
return nil, err
}
var inflight int64
return NewLogsRateLimiterProcessor(
rateLimiter,
set.TelemetrySettings.Logger,
@@ -92,7 +91,6 @@ func createLogsProcessor(
func(ctx context.Context, ld plog.Logs) error {
return nextConsumer.ConsumeLogs(ctx, ld)
},
&inflight,
config.MetadataKeys,
)
}
@@ -112,7 +110,6 @@ func createMetricsProcessor(
if err != nil {
return nil, err
}
var inflight int64
return NewMetricsRateLimiterProcessor(
rateLimiter,
set.TelemetrySettings.Logger,
@@ -122,7 +119,6 @@ func createMetricsProcessor(
func(ctx context.Context, md pmetric.Metrics) error {
return nextConsumer.ConsumeMetrics(ctx, md)
},
&inflight,
config.MetadataKeys,
)
}
@@ -142,7 +138,6 @@ func createTracesProcessor(
if err != nil {
return nil, err
}
var inflight int64
return NewTracesRateLimiterProcessor(
rateLimiter,
set.TelemetrySettings.Logger,
@@ -152,7 +147,6 @@ func createTracesProcessor(
func(ctx context.Context, td ptrace.Traces) error {
return nextConsumer.ConsumeTraces(ctx, td)
},
&inflight,
config.MetadataKeys,
)
}
@@ -172,7 +166,6 @@ func createProfilesProcessor(
if err != nil {
return nil, err
}
var inflight int64
return NewProfilesRateLimiterProcessor(
rateLimiter,
set.TelemetrySettings.Logger,
@@ -182,7 +175,6 @@ func createProfilesProcessor(
func(ctx context.Context, td pprofile.Profiles) error {
return nextConsumer.ConsumeProfiles(ctx, td)
},
&inflight,
config.MetadataKeys,
)
}

Some generated files are not rendered by default.

processor/ratelimitprocessor/metadata.yaml (4 changes: 2 additions & 2 deletions)
@@ -19,9 +19,9 @@ telemetry:
unit: "{requests}"
stability:
level: development
gauge:
sum:
value_type: int
monotonic: true
monotonic: false
ratelimit.dynamic.escalations:
enabled: true
description: Total number of dynamic rate escalations (dynamic > static)
processor/ratelimitprocessor/processor.go (83 changes: 30 additions & 53 deletions)
@@ -19,7 +19,6 @@ package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector

import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/component"
@@ -47,7 +46,6 @@ type rateLimiterProcessor struct {
telemetryBuilder *metadata.TelemetryBuilder
tracerProvider trace.TracerProvider
logger *zap.Logger
inflight *int64
strategy Strategy
}

@@ -82,7 +80,6 @@ func NewLogsRateLimiterProcessor(
tracerProvider trace.TracerProvider,
strategy Strategy,
next func(ctx context.Context, logs plog.Logs) error,
inflight *int64,
metadataKeys []string,
) (*LogsRateLimiterProcessor, error) {
return &LogsRateLimiterProcessor{
@@ -92,7 +89,6 @@ func NewLogsRateLimiterProcessor(
telemetryBuilder: telemetryBuilder,
tracerProvider: tracerProvider,
logger: logger,
inflight: inflight,
metadataKeys: metadataKeys,
strategy: strategy,
},
@@ -108,7 +104,6 @@ func NewMetricsRateLimiterProcessor(
tracerProvider trace.TracerProvider,
strategy Strategy,
next func(ctx context.Context, metrics pmetric.Metrics) error,
inflight *int64, // used to calculate concurrent requests
metadataKeys []string,
) (*MetricsRateLimiterProcessor, error) {
return &MetricsRateLimiterProcessor{
@@ -118,7 +113,6 @@ func NewMetricsRateLimiterProcessor(
telemetryBuilder: telemetryBuilder,
tracerProvider: tracerProvider,
logger: logger,
inflight: inflight,
metadataKeys: metadataKeys,
strategy: strategy,
},
@@ -134,7 +128,6 @@ func NewTracesRateLimiterProcessor(
tracerProvider trace.TracerProvider,
strategy Strategy,
next func(ctx context.Context, traces ptrace.Traces) error,
inflight *int64,
metadataKeys []string,
) (*TracesRateLimiterProcessor, error) {
return &TracesRateLimiterProcessor{
@@ -144,7 +137,6 @@ func NewTracesRateLimiterProcessor(
telemetryBuilder: telemetryBuilder,
tracerProvider: tracerProvider,
logger: logger,
inflight: inflight,
metadataKeys: metadataKeys,
strategy: strategy,
},
@@ -160,7 +152,6 @@ func NewProfilesRateLimiterProcessor(
tracerProvider trace.TracerProvider,
strategy Strategy,
next func(ctx context.Context, profiles pprofile.Profiles) error,
inflight *int64,
metadataKeys []string,
) (*ProfilesRateLimiterProcessor, error) {
return &ProfilesRateLimiterProcessor{
@@ -170,7 +161,6 @@ func NewProfilesRateLimiterProcessor(
telemetryBuilder: telemetryBuilder,
tracerProvider: tracerProvider,
logger: logger,
inflight: inflight,
metadataKeys: metadataKeys,
strategy: strategy,
},
@@ -216,34 +206,31 @@ func getTelemetryAttrs(attrsCommon []attribute.KeyValue, err error) (attrs []att
return attrs
}

func rateLimit(ctx context.Context,
func withRateLimit[T any](ctx context.Context,
hits int,
rateLimit func(ctx context.Context, n int) error,
metadataKeys []string,
tb *metadata.TelemetryBuilder,
logger *zap.Logger,
inflight *int64,
next func(ctx context.Context, data T) error,
data T,
) error {
current := atomic.AddInt64(inflight, 1)
attrsCommon := getAttrsFromContext(ctx, metadataKeys)
attrsSet := attribute.NewSet(attrsCommon...)
tb.RatelimitConcurrentRequests.Record(ctx, current,
tb.RatelimitConcurrentRequests.Add(ctx, 1, metric.WithAttributeSet(attrsSet))
defer tb.RatelimitConcurrentRequests.Add(ctx, -1, metric.WithAttributeSet(attrsSet))

start := time.Now()
err := rateLimit(ctx, hits)
tb.RatelimitRequestDuration.Record(ctx,
time.Since(start).Seconds(),
metric.WithAttributeSet(attrsSet),
)

defer func(start time.Time) {
atomic.AddInt64(inflight, -1)
tb.RatelimitRequestDuration.Record(ctx, time.Since(start).Seconds(),
metric.WithAttributeSet(attrsSet),
)
}(time.Now())

err := rateLimit(ctx, hits)
attrRequests := getTelemetryAttrs(attrsCommon, err)
attrRequestsSet := attribute.NewSet(attrRequests...)
tb.RatelimitRequestSize.Record(ctx, int64(hits),
metric.WithAttributeSet(attrRequestsSet),
)
tb.RatelimitRequestSize.Record(ctx, int64(hits), metric.WithAttributeSet(attrRequestsSet))
tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet))
if err != nil {
// enhance error logging with metadata keys
fields := make([]zap.Field, 0, len(attrsCommon)+1)
@@ -256,71 +243,61 @@ func rateLimit(ctx context.Context,
fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString()))
}
}
logger.Error("request is over the limits defined by the rate limiter", append(fields, zap.Error(err))...)
logger.Error(
"request is over the limits defined by the rate limiter",
append(fields, zap.Error(err))...,
)
return err
}

tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet))
return err
return next(ctx, data)
}

func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if err := rateLimit(
return withRateLimit(
ctx,
r.count(ld),
r.rl.RateLimit,
r.metadataKeys,
r.telemetryBuilder,
r.logger,
r.inflight,
); err != nil {
return err
}
return r.next(ctx, ld)
r.next, ld,
)
}

func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if err := rateLimit(
return withRateLimit(
ctx,
r.count(md),
r.rl.RateLimit,
r.metadataKeys,
r.telemetryBuilder,
r.logger,
r.inflight,
); err != nil {
return err
}
return r.next(ctx, md)
r.next, md,
)
}

func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if err := rateLimit(
return withRateLimit(
ctx,
r.count(td),
r.rl.RateLimit,
r.metadataKeys,
r.telemetryBuilder,
r.logger,
r.inflight,
); err != nil {
return err
}
return r.next(ctx, td)
r.next, td,
)
}

func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) error {
if err := rateLimit(
return withRateLimit(
ctx,
r.count(pd),
r.rl.RateLimit,
r.metadataKeys,
r.telemetryBuilder,
r.logger,
r.inflight,
); err != nil {
return err
}
return r.next(ctx, pd)
r.next, pd,
)
}

func getLogsCountFunc(strategy Strategy) func(ld plog.Logs) int {
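
The processor.go refactor above collapses four near-identical wrappers (logs, metrics, traces, profiles) into one generic helper that applies the rate limit first and forwards the payload to the next consumer only when the request is allowed. Below is a minimal, self-contained sketch of that control flow; the names are toy stand-ins and the telemetry and metadata handling are omitted, so this illustrates the pattern rather than reproducing the processor's actual code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// withLimit mirrors the shape of withRateLimit[T any]: rate-limit the request,
// then forward the payload only if it was admitted.
func withLimit[T any](
	ctx context.Context,
	hits int,
	limit func(ctx context.Context, n int) error,
	next func(ctx context.Context, data T) error,
	data T,
) error {
	if err := limit(ctx, hits); err != nil {
		return err // over the limit: the payload is never forwarded
	}
	return next(ctx, data)
}

func main() {
	limit := func(_ context.Context, n int) error {
		if n > 2 {
			return errors.New("rate limit exceeded")
		}
		return nil
	}
	next := func(_ context.Context, s string) error {
		fmt.Println("forwarded:", s)
		return nil
	}
	fmt.Println(withLimit(context.Background(), 1, limit, next, "small batch")) // forwarded, then <nil>
	fmt.Println(withLimit(context.Background(), 5, limit, next, "big batch"))   // rate limit exceeded
}
```

Because the payload type is a type parameter, the same body can serve plog.Logs, pmetric.Metrics, ptrace.Traces, and pprofile.Profiles, which is what lets each Consume* method shrink to a single withRateLimit call.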