Ratelimiter polish / fix: improve zero -> nonzero filling behavior for new ratelimiters (#6280)

# Motivation:
The global ratelimiter system was exhibiting some weird request-rejection at very low RPS usage.
On our dashboards it looks like this:
<img width="927" alt="Screenshot 2024-09-11 at 18 55 09" src="https://github.com/user-attachments/assets/8236d945-0f8f-45f9-9a9c-5c908f386ae4">

Previously I thought this was just due to undesirably-low weights, and #6238 addressed that (and is still a useful addition).

After that was rolled out, behavior improved, but a small number of rejections still occurred... which should not have happened, because the "boosting" logic should have made the global limits *at least* identical to the local ones, and likely larger.

That drove me to re-read the details and think harder, and then I found the issue this PR fixes.

# Issue and fix

What was happening is that the initial `rate.NewLimiter(0, 0)` state was "leaking" into the limits after the first update, so a request that occurred immediately afterward would likely be rejected, regardless of the configured limit.

This happens because `(0, 0)` creates a zero-burst limit on the "primary" limiter, and the shadowed `.Allow()` calls were advancing the limiter's internal "now" value...
... and then when the limit and burst were increased, the limiter would have to fill from zero.

This put it in a worse position than local / fallback limiters, which start from `(local, local)` with a zero "now" value, and then the next `.Allow()` is basically guaranteed to fill the token bucket due to many years "elapsing".
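
To make the failure mode concrete, here is a minimal sketch against the raw `golang.org/x/time/rate` limiter (an approximation: the real code goes through the `clock.Ratelimiter` wrapper, and the numbers are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	now := time.Now()

	// Pre-fix "primary", roughly: created as (0, 0), then the first update raises
	// limit and burst at the present time. The interval since creation refills at the
	// *old* limit (0), so the bucket stays empty and only refills from "now" onward.
	primary := rate.NewLimiter(0, 0)
	primary.SetLimitAt(now, 10)
	primary.SetBurstAt(now, 10)
	fmt.Println(primary.AllowN(now, 1)) // false: zero tokens despite the generous new limit

	// A local / fallback limiter is created directly at its target rate with a zero "now",
	// so its first call sees years of "elapsed" time and a full bucket.
	fallback := rate.NewLimiter(10, 10)
	fmt.Println(fallback.AllowN(now, 1)) // true
}
```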

So the fix has two parts:

1: Avoid advancing the zero-valued limiter's internal time until a reasonable limit/burst has been set.
This is done by simply not calling the primary limiter at all while in startup mode.

2: Avoid advancing limiters' time when setting limit and burst.
This means that after an idle period -> `Update()` -> `Allow()`, tokens will fill as if the new values were set all along, and the setters can be called in any order.

The underlying `rate.Limiter` does *not* do the second: it advances time when setting these... but that seems undesirable.
It means old values are preferred (which is reasonable, as they were the values in effect while that time passed), *and* it means the order in which you set burst and limit has a significant impact on the outcome, even with the same values and the same timing: time passes only on the first call, so the second sees essentially zero elapsed time and has no immediate effect at all (unless it lowers burst).  I can only see that latter part as surprising, and it is definitely worth avoiding.
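
To illustrate the difference with concrete numbers (raw `rate.Limiter` calls standing in for the wrapper; the rates and burst values are made up):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	t0 := time.Now()
	t1 := t0.Add(10 * time.Second) // an idle period passes before the update

	// Underlying rate.Limiter behavior: the setters advance time, so the idle period
	// fills at the *old* limit (1/s), capped at the *old* burst (1).
	old := rate.NewLimiter(1, 1)
	old.AllowN(t0, 1) // serve one request, draining the bucket at t0
	old.SetLimitAt(t1, 100)
	old.SetBurstAt(t1, 5)
	fmt.Println(old.AllowN(t1, 5)) // false: only ~1 token recovered during the idle period

	// Fixed behavior (approximated): apply the setters at the old "now", and let the next
	// time-advancing call fill the idle period at the *new* limit and burst.
	fixed := rate.NewLimiter(1, 1)
	fixed.AllowN(t0, 1)
	fixed.SetLimitAt(t0, 100)
	fixed.SetBurstAt(t0, 5)
	fmt.Println(fixed.AllowN(t1, 5)) // true: fills as if the new values were set all along
}
```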

# Alternative approach

Part 2 seems worth keeping, but part 1 has a relatively clear alternative:
Don't create the "primary" limiter until the first `Update()`.

Because the current implementation is atomic-oriented, this can't be done safely without adding atomics or locks everywhere... so I didn't do that.
If I were to do it, I would just switch to a mutex; `rate.Limiter` already uses one internally, so it should be near zero cost.
I'm happy to build that if someone prefers, I just didn't bother this time.
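
A rough, hypothetical sketch of that alternative (made-up types and field names, covering only part 1, not the real `FallbackLimiter` internals):

```go
package sketch

import (
	"sync"

	"golang.org/x/time/rate"
)

// lazyPrimary is an illustration only: the primary limiter stays nil until the
// first Update(), so nothing can advance its internal time before real limits exist.
type lazyPrimary struct {
	mu       sync.Mutex
	primary  *rate.Limiter // nil until the first Update()
	fallback *rate.Limiter
}

func (l *lazyPrimary) Update(limit rate.Limit, burst int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.primary == nil {
		// First update: create the limiter directly at its target values. Its internal
		// "now" starts at zero, so the first real request fills the bucket completely,
		// just like any other freshly-created limiter.
		l.primary = rate.NewLimiter(limit, burst)
		return
	}
	l.primary.SetLimit(limit)
	l.primary.SetBurst(burst)
}

func (l *lazyPrimary) Allow() bool {
	l.mu.Lock()
	p := l.primary
	l.mu.Unlock()
	if p == nil {
		return l.fallback.Allow() // still starting up: don't touch the primary at all
	}
	return p.Allow()
}
```
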
Groxx authored Sep 12, 2024
1 parent e5bd91e commit 04add2d
Showing 4 changed files with 129 additions and 21 deletions.
34 changes: 28 additions & 6 deletions common/clock/ratelimiter.go
@@ -198,13 +198,17 @@ func NewRatelimiter(lim rate.Limit, burst int) Ratelimiter {
return &ratelimiter{
timesource: NewRealTimeSource(),
limiter: rate.NewLimiter(lim, burst),
// intentionally zero, matches rate.Limiter and helps fill the bucket if it is changed before any use.
latestNow: time.Time{},
}
}

func NewMockRatelimiter(ts TimeSource, lim rate.Limit, burst int) Ratelimiter {
return &ratelimiter{
timesource: ts,
limiter: rate.NewLimiter(lim, burst),
// intentionally zero, matches rate.Limiter and helps fill the bucket if it is changed before any use.
latestNow: time.Time{},
}
}

@@ -303,15 +307,33 @@ func (r *ratelimiter) Limit() rate.Limit {
}

func (r *ratelimiter) SetBurst(newBurst int) {
now, unlock := r.lockNow()
defer unlock()
r.limiter.SetBurstAt(now, newBurst)
r.mut.Lock()
defer r.mut.Unlock()
// setting burst/limit does not advance time, unlike the underlying limiter.
//
// this allows calling them in any order, and the next request
// will fill the token bucket to match elapsed time.
//
// this prefers new burst/limit values over past values,
// as they are assumed to be "better", and in particular ensures the first
// time-advancing call fills with the full values (starting from 0 time, like
// the underlying limiter does).
r.limiter.SetBurstAt(r.latestNow, newBurst)
}

func (r *ratelimiter) SetLimit(newLimit rate.Limit) {
now, unlock := r.lockNow()
defer unlock()
r.limiter.SetLimitAt(now, newLimit)
r.mut.Lock()
defer r.mut.Unlock()
// setting burst/limit does not advance time, unlike the underlying limiter.
//
// this allows calling them in any order, and the next request
// will fill the token bucket to match elapsed time.
//
// this prefers new burst/limit values over past values,
// as they are assumed to be "better", and in particular ensures the first
// time-advancing call fills with the full values (starting from 0 time, like
// the underlying limiter does).
r.limiter.SetLimitAt(r.latestNow, newLimit)
}

func (r *ratelimiter) Tokens() float64 {
26 changes: 16 additions & 10 deletions common/quotas/global/collection/collection_test.go
@@ -207,14 +207,17 @@ func TestCollectionSubmitsDataAndUpdates(t *testing.T) {
defer cancel()
require.NoError(t, c.OnStart(ctx))

// generate some data
// generate some data.
// these start with the collections' limits, i.e. 1 token, so only one request is allowed.
someLimiter := c.For("something")
res := someLimiter.Reserve()
assert.True(t, res.Allow(), "first request should have been allowed")
res.Used(true)
assert.False(t, someLimiter.Allow(), "second request on the same domain should have been rejected")
assert.NoError(t, c.For("other").Wait(ctx), "request on a different domain should be allowed")

// all limiters are now drained, and will take ~1s to recover normally.

// prep for the calls
called := make(chan struct{}, 1)
aggs.EXPECT().Update(gomock.Any(), gomock.Any(), map[shared.GlobalKey]rpc.Calls{
@@ -231,31 +234,34 @@ func TestCollectionSubmitsDataAndUpdates(t *testing.T) {
return rpc.UpdateResult{
Weights: map[shared.GlobalKey]rpc.UpdateEntry{
"test:something": {Weight: 1, UsedRPS: 2}, // should recover a token in 100ms
// "test:other": // not returned, should not change weight
// "test:other": // not returned, should not change weight/rps and stay at 1s
},
Err: nil,
}
})

mts.BlockUntil(1) // need to have created timer in background updater
mts.Advance(time.Second) // trigger the update
mts.Advance(time.Second) // trigger the update. also fills all ratelimiters.

// wait until the calls occur
select {
case <-called:
case <-time.After(time.Second):
t.Fatal("did not make an rpc call after 1s")
case <-time.After(time.Second / 2):
// keep total wait shorter than 1s to avoid refilling the slow token, just in case
t.Fatal("did not make an rpc call after 1/2s")
}
// panic if more calls occur
// crash if more calls occur
close(called)

// wait for the updates to be sent to the ratelimiters, and for "something"'s 100ms token to recover
// wait for the updates to be sent to the ratelimiters, and for at least one of "something"'s 100ms tokens to recover
time.Sleep(150 * time.Millisecond)

// and make sure updates occurred
assert.False(t, c.For("other").Allow(), "should be no recovered tokens yet on the slow limit")
assert.True(t, c.For("something").Allow(), "should have allowed one request on the fast limit") // should use weight, not target rps
assert.False(t, c.For("something").Allow(), "should not have allowed as second request on the fast limit") // should use weight, not target rps
assert.False(t, c.For("other").Allow(), "should be no recovered tokens yet on the slow limit") // less than 1 second == no tokens
assert.True(t, c.For("something").Allow(), "should have allowed one request on the fast limit") // over 100ms (got updated rate) == at least one token
// specifically: because this is the first update to this limiter, it should now allow 10 requests, because the token bucket should be full.
// just check once though, no need to be precise here.
assert.True(t, c.For("something").Allow(), "after the initial update, the fast limiter should have extra tokens available")

assert.NoError(t, c.OnStop(ctx))

20 changes: 18 additions & 2 deletions common/quotas/global/collection/internal/fallback.go
@@ -119,7 +119,14 @@ const (
// between the two regardless of which is being used.
func NewFallbackLimiter(fallback quotas.Limiter) *FallbackLimiter {
l := &FallbackLimiter{
// 0 allows no requests, will be unused until we receive an update.
// start from 0 as a default, the limiter is unused until it is updated.
//
// caution: it's important to not call any time-advancing methods on this until
// after the first update, so the token bucket will fill properly.
//
// this (partially) mimics how new ratelimiters start with a full token bucket,
// but there does not seem to be any way to perfectly mimic it without using locks,
// and hopefully precision is not needed.
primary: clock.NewRatelimiter(0, 0),
fallback: fallback,
}
@@ -221,7 +228,16 @@ func (b *FallbackLimiter) FallbackLimit() rate.Limit {
}

func (b *FallbackLimiter) both() quotas.Limiter {
if b.useFallback() {
starting, failing := b.mode()
if starting {
// don't touch the primary until an update occurs,
// to allow the token bucket to fill properly.
return b.fallback
}
if failing {
// keep shadowing calls, so the token buckets are similar.
// this prevents allowing a full burst when recovering, which seems
// reasonable as things were apparently unhealthy.
return NewShadowedLimiter(b.fallback, b.primary)
}
return NewShadowedLimiter(b.primary, b.fallback)
70 changes: 67 additions & 3 deletions common/quotas/global/collection/internal/fallback_test.go
@@ -37,6 +37,71 @@ import (
"github.com/uber/cadence/common/quotas"
)

func TestFallbackRegression(t *testing.T) {
t.Run("primary should start with fallback limit", func(t *testing.T) {
// checks for an issue in earlier versions, where a newly-enabled primary limit would start with an empty token bucket,
// unfairly rejecting requests that other limiters would allow (as they are created with a full bucket at their target rate).
//
// this can be spotted by doing this sequence:
// - create a new limiter, fallback of N per second
// - this starts the "primary" limiter with limit=0, burst=0.
// - Allow() a request
// - the limiters here may disagree. this is fine, primary is not used yet.
// - however: this advanced the primary's internal "now" to now. (this now happens only after update)
// - update the limit to match the fallback
// - primary now has limit==N, burst==N, but tokens=0 and now=now.
// - Allow() a request
// - fallback allows due to having N-1 tokens
// - primary rejects due to 0 tokens
// ^ this is the problem.
// ^ this could also happen if calling `ratelimiter.Limit()` sets "now" in the future, though this seems unlikely.
//
// this uses real time because sleeping is not necessary, and it ensures
// that any time-advancing calls that occur too early will lead to ~zero tokens.

limit := rate.Limit(10) // enough to accept all requests
rl := NewFallbackLimiter(clock.NewRatelimiter(limit, int(limit)))

// sanity check: this may be fine to change, but it will mean the test needs to be rewritten because the token bucket may not be empty.
orig := rl.primary
assert.Zero(t, orig.Burst(), "sanity check: default primary ratelimiter's burst is not zero, this test likely needs a rewrite")

// simulate: call while still starting up, it should not touch the primary limiter.
allowed := rl.Allow()
before, starting, failing := rl.Collect()
assert.True(t, allowed, "first request should have been allowed, on the fallback limiter")
assert.True(t, starting, "should be true == in starting mode")
assert.False(t, failing, "should be false == not failing")
assert.Equal(t, UsageMetrics{
Allowed: 1,
Rejected: 0,
Idle: 0,
}, before)

// update: this should set limits, and either fill the bucket or cause it to be filled on the next request
rl.Update(limit)

// call again: should be allowed, as this is the first time-touching request since it was created,
// and the token bucket should have filled to match the first-update value.
allowed = rl.Allow()
after, starting, failing := rl.Collect()
assert.True(t, allowed, "second request should have been allowed, on the primary limiter")
assert.False(t, starting, "should be false == not in starting mode (using global)")
assert.False(t, failing, "should be false == not failing")
assert.Equal(t, UsageMetrics{
Allowed: 1,
Rejected: 0,
Idle: 0,
}, after)
assert.InDeltaf(t,
// Tokens() advances time, so this will not be precise.
rl.primary.Tokens(), int(limit)-1, 0.1,
"should have ~%v tokens: %v from the initial fill, minus 1 for the allow call",
int(limit)-1, int(limit),
)
})
}

func TestLimiter(t *testing.T) {
t.Run("uses fallback initially", func(t *testing.T) {
m := quotas.NewMockLimiter(gomock.NewController(t))
@@ -93,9 +158,8 @@ func TestLimiter(t *testing.T) {
require.False(t, failing, "should not be using fallback")
require.False(t, startup, "should not be starting up, has had an update")

// bucket starts out empty / with whatever contents it had before (zero).
// this is possibly surprising, so it's asserted.
require.False(t, lim.Allow(), "rate.Limiter should reject requests until filled")
// the bucket will fill from time 0 on the first update, ensuring the first request is allowed
require.True(t, lim.Allow(), "rate.Limiter should start with a full bucket")

// fail enough times to trigger a fallback
for i := 0; i < maxFailedUpdates; i++ {
