Skip to content

Commit 1ed1d17

Browse files
authored
ratelimitprocessor: Wrap error with code for 429 (#656)
Wraps the error returned by the rate limiter with the grpc code library to ensure that the code is propagated to the client. For non 5xx errors, the code is set to `codes.ResourceExhausted`, which is the appropriate code for rate limiting errors. Part of elastic/hosted-otel-collector#991 Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 0601088 commit 1ed1d17

File tree

6 files changed

+17
-13
lines changed

6 files changed

+17
-13
lines changed

processor/ratelimitprocessor/gubernator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ import (
2929

3030
"github.com/sirupsen/logrus"
3131
"google.golang.org/grpc"
32+
"google.golang.org/grpc/codes"
3233
"google.golang.org/grpc/credentials/insecure"
34+
"google.golang.org/grpc/status"
3335

3436
"github.com/gubernator-io/gubernator/v2"
3537
"go.opentelemetry.io/collector/component"
@@ -165,7 +167,7 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
165167
zap.String("processor_id", r.set.ID.String()),
166168
zap.Strings("metadata_keys", r.cfg.MetadataKeys),
167169
)
168-
return errTooManyRequests
170+
return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
169171
case ThrottleBehaviorDelay:
170172
delay := time.Duration(resp.GetResetTime()-createdAt) * time.Millisecond
171173
timer := time.NewTimer(delay)

processor/ratelimitprocessor/gubernator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestGubernatorRateLimiter_RateLimit(t *testing.T) {
100100
err = rateLimiter.RateLimit(context.Background(), 1)
101101
switch behavior {
102102
case ThrottleBehaviorError:
103-
assert.EqualError(t, err, "too many requests")
103+
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
104104
case ThrottleBehaviorDelay:
105105
assert.NoError(t, err)
106106
}

processor/ratelimitprocessor/local.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"go.opentelemetry.io/collector/component"
2626
"go.opentelemetry.io/collector/processor"
2727
"golang.org/x/time/rate"
28+
"google.golang.org/grpc/codes"
29+
"google.golang.org/grpc/status"
2830
)
2931

3032
var _ RateLimiter = (*localRateLimiter)(nil)
@@ -61,12 +63,12 @@ func (r *localRateLimiter) RateLimit(ctx context.Context, hits int) error {
6163
switch cfg.ThrottleBehavior {
6264
case ThrottleBehaviorError:
6365
if ok := limiter.AllowN(time.Now(), hits); !ok {
64-
return errTooManyRequests
66+
return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
6567
}
6668
case ThrottleBehaviorDelay:
6769
lr := limiter.ReserveN(time.Now(), hits)
6870
if !lr.OK() {
69-
return errTooManyRequests
71+
return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
7072
}
7173
timer := time.NewTimer(lr.Delay())
7274
defer timer.Stop()

processor/ratelimitprocessor/local_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestLocalRateLimiter_RateLimit(t *testing.T) {
8080
err = rateLimiter.RateLimit(context.Background(), 1) // should fail
8181
switch behavior {
8282
case ThrottleBehaviorError:
83-
assert.EqualError(t, err, "too many requests")
83+
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
8484
// retry every 20ms to ensure that RateLimit will recover from error when bucket refills after 1 second
8585
assert.Eventually(t, func() bool {
8686
return rateLimiter.RateLimit(context.Background(), 1) == nil

processor/ratelimitprocessor/processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector
1919

2020
import (
2121
"context"
22-
"errors"
2322
"fmt"
2423
"sync/atomic"
2524
"time"
@@ -32,6 +31,8 @@ import (
3231
"go.opentelemetry.io/collector/pdata/ptrace"
3332
"go.opentelemetry.io/otel/attribute"
3433
"go.opentelemetry.io/otel/metric"
34+
"google.golang.org/grpc/codes"
35+
"google.golang.org/grpc/status"
3536

3637
"github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent"
3738
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
@@ -197,7 +198,7 @@ func getTelemetryAttrs(attrsCommon []attribute.KeyValue, err error) (attrs []att
197198
telemetry.WithReason(telemetry.StatusUnderLimit),
198199
telemetry.WithDecision("accepted"),
199200
)
200-
case errors.Is(err, errTooManyRequests):
201+
case status.Code(err) == codes.ResourceExhausted:
201202
attrs = append(attrsCommon,
202203
telemetry.WithDecision("throttled"),
203204
)
@@ -211,8 +212,7 @@ func getTelemetryAttrs(attrsCommon []attribute.KeyValue, err error) (attrs []att
211212
return attrs
212213
}
213214

214-
func rateLimit(
215-
ctx context.Context,
215+
func rateLimit(ctx context.Context,
216216
hits int,
217217
rateLimit func(ctx context.Context, n int) error,
218218
metadataKeys []string,

processor/ratelimitprocessor/processor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func TestConsume_Logs(t *testing.T) {
164164
consumed = false
165165
err = processor.ConsumeLogs(clientContext, logs)
166166
assert.False(t, consumed)
167-
assert.EqualError(t, err, "too many requests")
167+
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
168168

169169
testRateLimitTelemetry(t, tt)
170170
}
@@ -211,7 +211,7 @@ func TestConsume_Metrics(t *testing.T) {
211211
consumed = false
212212
err = processor.ConsumeMetrics(clientContext, metrics)
213213
assert.False(t, consumed)
214-
assert.EqualError(t, err, "too many requests")
214+
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
215215

216216
testRateLimitTelemetry(t, tt)
217217
}
@@ -258,7 +258,7 @@ func TestConsume_Traces(t *testing.T) {
258258
consumed = false
259259
err = processor.ConsumeTraces(clientContext, traces)
260260
assert.False(t, consumed)
261-
assert.EqualError(t, err, "too many requests")
261+
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
262262

263263
testRateLimitTelemetry(t, tt)
264264
}
@@ -306,7 +306,7 @@ func TestConsume_Profiles(t *testing.T) {
306306
consumed = false
307307
err = processor.ConsumeProfiles(clientContext, profiles)
308308
assert.False(t, consumed)
309-
assert.EqualError(t, err, "too many requests")
309+
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
310310

311311
testRateLimitTelemetry(t, tt)
312312
}

0 commit comments

Comments
 (0)