Skip to content

Commit

Permalink
revert: fixed window ratelimiting (#2116)
Browse files Browse the repository at this point in the history
* chore: switch file names

* revert: fixed window ratelimit

* chore(workflows): update job_test_agent_integration.yaml to use pkg and services
feat(workflows): add job_test_agent_local.yaml for testing agent locally
feat(workflows): add test_agent_local job to pr.yaml for local agent testing

* fix: use defer

* fix(ratelimit): add timeout to mitigate request to prevent hanging connections
test(ratelimit): fix loop condition in TestAccuracy_fixed_time to iterate correctly

* chore(workflows): rename workflow from 'Test Agent Local' to 'Test Agent Integration'
feat(workflows): update test job to run on integration tests directory
feat(workflows): add environment variables for cluster test and agent base URL

* chore(workflows): remove unnecessary inputs from workflow_call event in job_test_agent_local.yaml

* refactor(ratelimit_test.go): simplify calculation of lower limit in test
fix(ratelimit_test.go): fix calculation of upper limit in test

* chore(workflows): rename test_agent_integration job to test_agent_local

* test(ratelimit_mitigation_test.go): update test data for cluster sizes to include only 1, 3, and 5 nodes

* chore: mute logs

* fix(circuitbreaker): remove unnecessary log message in preflight function
fix(ratelimit_replication_test): correct index to call Ratelimit on correct node
  • Loading branch information
chronark committed Sep 20, 2024
1 parent 2d99e07 commit ee07672
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 231 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/job_test_agent_local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Test Agent Local
on:
workflow_call:



jobs:
test_agent_local:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- name: Install
uses: ./.github/actions/install
with:
go: true


- name: Build
run: task build
working-directory: apps/agent

- name: Test
run: go test -cover -json -timeout=60m -failfast ./pkg/... ./services/... | tparse -all -progress
working-directory: apps/agent
4 changes: 3 additions & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ jobs:
name: Test API
uses: ./.github/workflows/job_test_api_local.yaml


test_agent_local:
name: Test Agent Local
uses: ./.github/workflows/job_test_agent_local.yaml
# test_agent_integration:
# name: Test Agent Integration
# runs-on: ubuntu-latest
Expand Down
1 change: 0 additions & 1 deletion apps/agent/pkg/circuitbreaker/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (cb *CB[Res]) preflight(ctx context.Context) error {
now := cb.config.clock.Now()

if now.After(cb.resetCountersAt) {
cb.logger.Info().Msg("resetting circuit breaker")
cb.requests = 0
cb.successes = 0
cb.failures = 0
Expand Down
28 changes: 6 additions & 22 deletions apps/agent/pkg/clock/real_clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,15 @@ package clock

import "time"

type TestClock struct {
now time.Time
type RealClock struct {
}

func NewTestClock(now ...time.Time) *TestClock {
if len(now) == 0 {
now = append(now, time.Now())
}
return &TestClock{now: now[0]}
func New() *RealClock {
return &RealClock{}
}

var _ Clock = &TestClock{}
var _ Clock = &RealClock{}

func (c *TestClock) Now() time.Time {
return c.now
}

// Tick advances the clock by the given duration and returns the new time.
func (c *TestClock) Tick(d time.Duration) time.Time {
c.now = c.now.Add(d)
return c.now
}

// Set sets the clock to the given time and returns the new time.
func (c *TestClock) Set(t time.Time) time.Time {
c.now = t
return c.now
func (c *RealClock) Now() time.Time {
return time.Now()
}
28 changes: 22 additions & 6 deletions apps/agent/pkg/clock/test_clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,31 @@ package clock

import "time"

type RealClock struct {
type TestClock struct {
now time.Time
}

func New() *RealClock {
return &RealClock{}
func NewTestClock(now ...time.Time) *TestClock {
if len(now) == 0 {
now = append(now, time.Now())
}
return &TestClock{now: now[0]}
}

var _ Clock = &RealClock{}
var _ Clock = &TestClock{}

func (c *RealClock) Now() time.Time {
return time.Now()
func (c *TestClock) Now() time.Time {
return c.now
}

// Tick advances the clock by the given duration and returns the new time.
func (c *TestClock) Tick(d time.Duration) time.Time {
c.now = c.now.Add(d)
return c.now
}

// Set sets the clock to the given time and returns the new time.
func (c *TestClock) Set(t time.Time) time.Time {
c.now = t
return c.now
}
19 changes: 11 additions & 8 deletions apps/agent/services/ratelimit/mitigate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func (s *service) Mitigate(ctx context.Context, req *ratelimitv1.MitigateRequest
bucket, _ := s.getBucket(bucketKey{req.Identifier, req.Limit, duration})
bucket.Lock()
defer bucket.Unlock()

bucket.windows[req.Window.GetSequence()] = req.Window

return &ratelimitv1.MitigateResponse{}, nil
Expand Down Expand Up @@ -51,16 +50,20 @@ func (s *service) broadcastMitigation(req mitigateWindowRequest) {
return
}
for _, peer := range peers {
_, err := peer.client.Mitigate(ctx, connect.NewRequest(&ratelimitv1.MitigateRequest{
Identifier: req.identifier,
Limit: req.limit,
Duration: req.duration.Milliseconds(),
Window: req.window,
}))
_, err := s.mitigateCircuitBreaker.Do(ctx, func(innerCtx context.Context) (*connect.Response[ratelimitv1.MitigateResponse], error) {
innerCtx, cancel := context.WithTimeout(innerCtx, 10*time.Second)
defer cancel()
return peer.client.Mitigate(innerCtx, connect.NewRequest(&ratelimitv1.MitigateRequest{
Identifier: req.identifier,
Limit: req.limit,
Duration: req.duration.Milliseconds(),
Window: req.window,
}))
})
if err != nil {
s.logger.Err(err).Msg("failed to call mitigate")
} else {
s.logger.Info().Str("peerId", peer.id).Msg("broadcasted mitigation")
s.logger.Debug().Str("peerId", peer.id).Msg("broadcasted mitigation")
}
}
}
8 changes: 5 additions & 3 deletions apps/agent/services/ratelimit/ratelimit_mitigation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func TestExceedingTheLimitShouldNotifyAllNodes(t *testing.T) {
t.Skip()

for _, clusterSize := range []int{1, 3, 5} {
t.Run(fmt.Sprintf("Cluster Size %d", clusterSize), func(t *testing.T) {
logger := logging.New(nil)
Expand Down Expand Up @@ -94,23 +94,25 @@ func TestExceedingTheLimitShouldNotifyAllNodes(t *testing.T) {
ctx := context.Background()

// Saturate the window
for i := int64(0); i <= limit; i++ {
for i := int64(0); i < limit; i++ {
rl := util.RandomElement(ratelimiters)
res, err := rl.Ratelimit(ctx, req)
require.NoError(t, err)
t.Logf("saturate res: %+v", res)
require.True(t, res.Success)

}

time.Sleep(time.Second * 5)

// Let's hit everry node again
// They should all be mitigated
for i, rl := range ratelimiters {

res, err := rl.Ratelimit(ctx, req)
require.NoError(t, err)
t.Logf("res from %d: %+v", i, res)
// require.False(t, res.Success)
require.False(t, res.Success)
}

})
Expand Down
6 changes: 2 additions & 4 deletions apps/agent/services/ratelimit/ratelimit_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
"github.com/unkeyed/unkey/apps/agent/pkg/util"
)

func TestReplication(t *testing.T) {
t.Skip()
func TestSync(t *testing.T) {
type Node struct {
srv *service
cluster cluster.Cluster
Expand Down Expand Up @@ -106,7 +105,7 @@ func TestReplication(t *testing.T) {
}

// Figure out who is the origin
_, err := nodes[1].srv.Ratelimit(ctx, req)
_, err := nodes[0].srv.Ratelimit(ctx, req)
require.NoError(t, err)

time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -138,7 +137,6 @@ func TestReplication(t *testing.T) {
require.True(t, ok)
bucket.RLock()
window := bucket.getCurrentWindow(now)
t.Logf("window on origin: %+v", window)
counter := window.Counter
bucket.RUnlock()

Expand Down
Loading

0 comments on commit ee07672

Please sign in to comment.