
Commit 1dc8ec4

Fix delay behavior (#660)
* Fix delay behavior
1 parent 0171340 commit 1dc8ec4

File tree

3 files changed: +147 −30 lines changed


processor/ratelimitprocessor/gubernator.go

Lines changed: 48 additions & 30 deletions
@@ -125,49 +125,67 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 	uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys)
 	cfg := resolveRateLimitSettings(r.cfg, uniqueKey)
 
-	createdAt := time.Now().UnixMilli()
-	getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
-		Requests: []*gubernator.RateLimitReq{
-			{
-				Name:      r.set.ID.String(),
-				UniqueKey: uniqueKey,
-				Hits:      int64(hits),
-				Behavior:  r.behavior,
-				Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
-				Limit:     int64(cfg.Rate), // rate is per second
-				Burst:     int64(cfg.Burst),
-				Duration:  cfg.ThrottleInterval.Milliseconds(), // duration is in milliseconds, i.e. 1s
-				CreatedAt: &createdAt,
+	makeRateLimitRequest := func() (*gubernator.RateLimitResp, error) {
+		createdAt := time.Now().UnixMilli()
+		getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
+			Requests: []*gubernator.RateLimitReq{
+				{
+					Name:      r.set.ID.String(),
+					UniqueKey: uniqueKey,
+					Hits:      int64(hits),
+					Behavior:  r.behavior,
+					Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
+					Limit:     int64(cfg.Rate), // rate is per second
+					Burst:     int64(cfg.Burst),
+					Duration:  cfg.ThrottleInterval.Milliseconds(), // duration is in milliseconds, i.e. 1s
+					CreatedAt: &createdAt,
+				},
 			},
-		},
-	})
+		})
+		if err != nil {
+			return nil, err
+		}
+		// Inside the gRPC response, we should have a single-item list of responses.
+		responses := getRateLimitsResp.GetResponses()
+		if n := len(responses); n != 1 {
+			return nil, fmt.Errorf("expected 1 response from gubernator, got %d", n)
+		}
+		resp := responses[0]
+		if resp.GetError() != "" {
+			return nil, errors.New(resp.GetError())
+		}
+		return resp, nil
+	}
+	resp, err := makeRateLimitRequest()
 	if err != nil {
 		return err
 	}
 
-	// Inside the gRPC response, we should have a single-item list of responses.
-	responses := getRateLimitsResp.GetResponses()
-	if n := len(responses); n != 1 {
-		return fmt.Errorf("expected 1 response from gubernator, got %d", n)
-	}
-	resp := responses[0]
-	if resp.GetError() != "" {
-		return errors.New(resp.GetError())
-	}
-
 	if resp.GetStatus() == gubernator.Status_OVER_LIMIT {
 		// Same logic as local
 		switch r.cfg.ThrottleBehavior {
 		case ThrottleBehaviorError:
 			return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
 		case ThrottleBehaviorDelay:
-			delay := time.Duration(resp.GetResetTime()-createdAt) * time.Millisecond
+			delay := time.Duration(resp.GetResetTime()-time.Now().UnixMilli()) * time.Millisecond
 			timer := time.NewTimer(delay)
 			defer timer.Stop()
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case <-timer.C:
+		retry:
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-timer.C:
+					resp, err = makeRateLimitRequest()
+					if err != nil {
+						return err
+					}
+					if resp.GetStatus() == gubernator.Status_UNDER_LIMIT {
+						break retry
+					}
+					delay = time.Duration(resp.GetResetTime()-time.Now().UnixMilli()) * time.Millisecond
+					timer.Reset(delay)
+				}
 			}
 		}
 	}
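
The substance of the fix is visible in the two changed sections: the delay is now computed against the current clock rather than the request's createdAt timestamp, and instead of proceeding after a single timer fire, the limiter re-issues the rate limit request and keeps waiting until it actually gets UNDER_LIMIT. Below is a minimal standalone sketch of that wait-and-retry shape; the result type and the check callback are hypothetical stand-ins for illustration, not this package's API.

```go
package sketch

import (
	"context"
	"time"
)

// result is a hypothetical stand-in for gubernator.RateLimitResp.
type result struct {
	overLimit bool  // true while the bucket has no capacity
	resetTime int64 // Unix milliseconds at which capacity frees up
}

// waitUntilUnderLimit mirrors the retry shape introduced by this commit:
// compute the delay from the current clock, sleep until the reported reset
// time, re-check the limiter, and repeat until it admits the request or the
// context is cancelled.
func waitUntilUnderLimit(ctx context.Context, check func() (result, error)) error {
	res, err := check()
	if err != nil {
		return err
	}
	if !res.overLimit {
		return nil
	}
	// Delay relative to now, not to when the request was first created
	// (the stale-timestamp bug fixed by this commit).
	delay := time.Duration(res.resetTime-time.Now().UnixMilli()) * time.Millisecond
	timer := time.NewTimer(delay) // a negative delay fires immediately
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			if res, err = check(); err != nil {
				return err
			}
			if !res.overLimit {
				return nil // capacity is available again
			}
			// Still over limit (e.g. another waiter took the token):
			// push the deadline out to the new reset time and keep waiting.
			timer.Reset(time.Duration(res.resetTime-time.Now().UnixMilli()) * time.Millisecond)
		}
	}
}
```

Note that time.NewTimer with a negative duration fires immediately, so a reset time already in the past degenerates to an immediate re-check rather than a stall.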

processor/ratelimitprocessor/gubernator_test.go

Lines changed: 49 additions & 0 deletions
@@ -19,6 +19,8 @@ package ratelimitprocessor
 
 import (
 	"context"
+	"slices"
+	"sync"
 	"testing"
 	"time"
 
@@ -142,3 +144,50 @@ func TestGubernatorRateLimiter_RateLimit_MetadataKeys(t *testing.T) {
 	err = rateLimiter.RateLimit(clientContext2, 1)
 	assert.NoError(t, err)
 }
+
+func TestGubernatorRateLimiter_MultipleRequests_Delay(t *testing.T) {
+	throttleInterval := 100 * time.Millisecond
+	rl := newTestGubernatorRateLimiter(t, &Config{
+		RateLimitSettings: RateLimitSettings{
+			Rate:             1, // request per second
+			Burst:            1, // capacity only for one
+			ThrottleBehavior: ThrottleBehaviorDelay,
+			ThrottleInterval: throttleInterval, // add 1 token after 100ms
+		},
+		MetadataKeys: []string{"metadata_key"},
+	})
+
+	// Simulate 4 requests hitting the rate limit simultaneously.
+	// The first request passes, and the next ones hit it simultaneously.
+	requests := 5
+	endingTimes := make([]time.Time, requests)
+	var wg sync.WaitGroup
+	wg.Add(requests)
+
+	for i := 0; i < requests; i++ {
+		go func(i int) {
+			defer wg.Done()
+			err := rl.RateLimit(context.Background(), 1)
+			require.NoError(t, err)
+			endingTimes[i] = time.Now()
+		}(i)
+	}
+	wg.Wait()
+
+	// Make sure all ending times have a difference of at least 100ms, as tokens are
+	// added at that rate. We need to sort them first.
+	slices.SortFunc(endingTimes, func(a, b time.Time) int {
+		if a.Before(b) {
+			return -1
+		}
+		return 1
+	})
+
+	for i := 1; i < requests; i++ {
+		diff := endingTimes[i].Sub(endingTimes[i-1]).Milliseconds()
+		minExpected := throttleInterval - 5*time.Millisecond // allow small tolerance
+		if diff < minExpected.Milliseconds() {
+			t.Fatalf("difference is %dms, requests were sent before tokens were added", diff)
+		}
+	}
+}
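
One small note on the test's comparator: it returns -1 or 1 and never 0, which is sufficient for ordering the timestamps here. Since Go 1.20, time.Time also has a Compare method, so an equivalent and slightly more conventional comparator would be:

```go
slices.SortFunc(endingTimes, func(a, b time.Time) int { return a.Compare(b) })
```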

processor/ratelimitprocessor/local_test.go

Lines changed: 50 additions & 0 deletions
@@ -19,6 +19,8 @@ package ratelimitprocessor
 
 import (
 	"context"
+	"slices"
+	"sync"
 	"testing"
 	"time"
 
@@ -126,3 +128,51 @@ func TestLocalRateLimiter_RateLimit_MetadataKeys(t *testing.T) {
 		assert.NoError(t, err)
 	}
 }
+
+func TestLocalRateLimiter_MultipleRequests_Delay(t *testing.T) {
+	throttleInterval := 100 * time.Millisecond
+	rl := newTestLocalRateLimiter(t, &Config{
+		Type: LocalRateLimiter,
+		RateLimitSettings: RateLimitSettings{
+			Rate:             1, // request per second
+			Burst:            1, // capacity only for one
+			ThrottleBehavior: ThrottleBehaviorDelay,
+			ThrottleInterval: throttleInterval, // add 1 token after 100ms
+		},
+		MetadataKeys: []string{"metadata_key"},
+	})
+
+	// Simulate 4 requests hitting the rate limit simultaneously.
+	// The first request passes, and the next ones hit it simultaneously.
+	requests := 5
+	endingTimes := make([]time.Time, requests)
+	var wg sync.WaitGroup
+	wg.Add(requests)
+
+	for i := 0; i < requests; i++ {
+		go func(i int) {
+			defer wg.Done()
+			err := rl.RateLimit(context.Background(), 1)
+			require.NoError(t, err)
+			endingTimes[i] = time.Now()
+		}(i)
+	}
+	wg.Wait()
+
+	// Make sure all ending times have a difference of at least 100ms, as tokens are
+	// added at that rate. We need to sort them first.
+	slices.SortFunc(endingTimes, func(a, b time.Time) int {
+		if a.Before(b) {
+			return -1
+		}
+		return 1
+	})
+
+	for i := 1; i < requests; i++ {
+		diff := endingTimes[i].Sub(endingTimes[i-1]).Milliseconds()
+		minExpected := throttleInterval - 5*time.Millisecond // allow small tolerance
+		if diff < minExpected.Milliseconds() {
+			t.Fatalf("difference is %dms, requests were sent before tokens were added", diff)
+		}
+	}
+}
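
Both new tests drive concurrent goroutines against a shared limiter, so they are natural candidates for the race detector. Assuming the module layout implied by the file paths above, an invocation like

	go test -race -run 'MultipleRequests_Delay' ./processor/ratelimitprocessor/

should run just the two delay tests.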
