From b44a42ecb07506c0031e28b7fb80b4dd89a7a108 Mon Sep 17 00:00:00 2001 From: chronark Date: Fri, 28 Jun 2024 10:35:17 +0200 Subject: [PATCH] chore(workflows): update axiom-deployment-annotation action to use latest version chore(ratelimit): change method signatures to accept context parameter chore(ratelimit): remove unused tracing and logging statements --- .../job_deploy_agent_production.yaml | 8 +- apps/agent/pkg/ratelimit/fixed_window.go | 24 ++- apps/agent/pkg/ratelimit/interface.go | 6 +- apps/agent/pkg/ratelimit/token_bucket.go | 138 ------------------ apps/agent/services/ratelimit/pushpull.go | 5 +- apps/agent/services/ratelimit/ratelimit.go | 7 +- apps/agent/services/ratelimit/sync.go | 2 +- 7 files changed, 30 insertions(+), 160 deletions(-) delete mode 100644 apps/agent/pkg/ratelimit/token_bucket.go diff --git a/.github/workflows/job_deploy_agent_production.yaml b/.github/workflows/job_deploy_agent_production.yaml index b0212acf2f..8dd4848f48 100644 --- a/.github/workflows/job_deploy_agent_production.yaml +++ b/.github/workflows/job_deploy_agent_production.yaml @@ -40,13 +40,13 @@ jobs: - name: Annotate axiom id: axiom-deployment-annotation - uses: axiomhq/annotation-action@v0.1.0 + uses: axiomhq/annotation-action with: - type: "production deployment" + type: "production-release" datasets: agent axiomToken: ${{ secrets.AXIOM_TOKEN }} - title: 'Production deployment ${{ env.VERSION }}' - description: 'Commit ${{ github.event.head_commit.message }}' # optional + # title: 'Production deployment ${{ env.VERSION }}' + # description: 'Commit ${{ github.event.head_commit.message }}' - name: Deploy prod diff --git a/apps/agent/pkg/ratelimit/fixed_window.go b/apps/agent/pkg/ratelimit/fixed_window.go index 6ab3059441..aea2695f64 100644 --- a/apps/agent/pkg/ratelimit/fixed_window.go +++ b/apps/agent/pkg/ratelimit/fixed_window.go @@ -1,11 +1,14 @@ package ratelimit import ( + "context" "fmt" "sync" "time" "github.com/unkeyed/unkey/apps/agent/pkg/logging" + "github.com/unkeyed/unkey/apps/agent/pkg/tracing" + "go.opentelemetry.io/otel/attribute" ) type identifierWindow struct { @@ -53,17 +56,20 @@ func buildKey(identifier string, limit int64, duration int64) string { return fmt.Sprintf("ratelimit:%s:%d:%d", identifier, limit, window) } -func (r *fixedWindow) Take(req RatelimitRequest) RatelimitResponse { - start := time.Now() +func (r *fixedWindow) Take(ctx context.Context, req RatelimitRequest) RatelimitResponse { + ctx, span := tracing.Start(ctx, "fixedWindow.Take") + defer span.End() + key := buildKey(req.Identifier, req.Max, req.RefillInterval) - defer func() { - r.logger.Info().Str("key", key).Int64("latency", time.Since(start).Milliseconds()).Msg("fixedWindow.Take") - }() + span.SetAttributes(attribute.String("key", key)) + _, lockSpan := tracing.Start(ctx, "fixedWindow.Take.lock") r.identifiersLock.Lock() + lockSpan.End() defer r.identifiersLock.Unlock() id, ok := r.identifiers[key] + span.SetAttributes(attribute.Bool("identifierExisted", ok)) if !ok { id = &identifierWindow{id: key, current: 0, reset: time.Now().Add(time.Duration(req.RefillInterval) * time.Millisecond)} r.identifiers[key] = id @@ -77,13 +83,17 @@ func (r *fixedWindow) Take(req RatelimitRequest) RatelimitResponse { return RatelimitResponse{Pass: true, Remaining: req.Max - id.current, Reset: id.reset.UnixMilli(), Limit: req.Max, Current: id.current} } -func (r *fixedWindow) SetCurrent(req SetCurrentRequest) error { +func (r *fixedWindow) SetCurrent(ctx context.Context, req SetCurrentRequest) error { + ctx, span := tracing.Start(ctx, "fixedWindow.SetCurrent") + defer span.End() key := buildKey(req.Identifier, req.Max, req.RefillInterval) + _, lockSpan := tracing.Start(ctx, "fixedWindow.SetCurrent.lock") r.identifiersLock.Lock() + lockSpan.End() defer r.identifiersLock.Unlock() - id, ok := r.identifiers[req.Identifier] + span.SetAttributes(attribute.Bool("identifierExisted", ok)) if !ok { id = &identifierWindow{id: key, current: 0, reset: time.Now().Add(time.Duration(req.RefillInterval) * time.Millisecond)} r.identifiers[req.Identifier] = id diff --git a/apps/agent/pkg/ratelimit/interface.go b/apps/agent/pkg/ratelimit/interface.go index 89c18031dc..2bad7245c3 100644 --- a/apps/agent/pkg/ratelimit/interface.go +++ b/apps/agent/pkg/ratelimit/interface.go @@ -1,8 +1,10 @@ package ratelimit +import "context" + type Ratelimiter interface { - Take(req RatelimitRequest) RatelimitResponse - SetCurrent(req SetCurrentRequest) error + Take(ctx context.Context, req RatelimitRequest) RatelimitResponse + SetCurrent(ctx context.Context, req SetCurrentRequest) error } type RatelimitRequest struct { diff --git a/apps/agent/pkg/ratelimit/token_bucket.go b/apps/agent/pkg/ratelimit/token_bucket.go deleted file mode 100644 index ae1492a900..0000000000 --- a/apps/agent/pkg/ratelimit/token_bucket.go +++ /dev/null @@ -1,138 +0,0 @@ -package ratelimit - -import ( - "sync" - "time" -) - -type bucket struct { - sync.RWMutex - - // unix milli of when this bucket was created - startTime int64 - // Currently remaining tokens - remaining int64 - - // how many tokens at maximum fill - max int64 - - // how many tokens to refill per interval - refillRate int64 - // in milliseconds - refillInterval int64 - - // the window where the last refill happened - lastTick int64 -} - -func newTokenBucket(refillRate int64, refillInterval int64, max int64) *bucket { - now := time.Now().UnixMilli() - return &bucket{ - startTime: now, - remaining: max, - max: max, - refillRate: refillRate, - refillInterval: refillInterval, - lastTick: 0, - } -} - -func (b *bucket) take(tokens int64) RatelimitResponse { - now := time.Now().UnixMilli() - - // The number of the window since bucket creation - tick := (now - b.startTime) / int64(b.refillInterval) - - reset := b.startTime + ((tick + 1) * int64(b.refillInterval)) - - b.Lock() - defer b.Unlock() - - if b.lastTick < tick { - b.remaining += int64((tick - b.lastTick) * int64(b.refillRate)) - if b.remaining > b.max { - b.remaining = b.max - } - b.lastTick = tick - } - - if b.remaining-tokens < 0 { - return RatelimitResponse{ - Pass: false, - Limit: b.max, - Remaining: b.remaining, - Reset: reset, - } - } - b.remaining -= tokens - - return RatelimitResponse{ - Pass: true, - Limit: b.max, - Remaining: b.remaining, - Reset: reset, - } - -} - -type tokenBucket struct { - stateLock sync.RWMutex - state map[string]*bucket -} - -func NewTokenBucket() *tokenBucket { - - r := &tokenBucket{ - stateLock: sync.RWMutex{}, - state: make(map[string]*bucket), - } - - go func() { - for range time.NewTicker(time.Minute).C { - now := time.Now().UnixMilli() - r.stateLock.Lock() - - for id, b := range r.state { - b.Lock() - currentTick := (now - b.startTime) / int64(b.refillInterval) - requiredTicksToRefill := (b.max - b.remaining) / b.refillRate - - if int64(requiredTicksToRefill) > currentTick-b.lastTick { - delete(r.state, id) - } - b.Unlock() - } - r.stateLock.Unlock() - - } - }() - - return r - -} - -func (r *tokenBucket) Take(req RatelimitRequest) RatelimitResponse { - - r.stateLock.RLock() - - b, ok := r.state[req.Identifier] - r.stateLock.RUnlock() - if ok { - return b.take(req.Cost) - } - - r.stateLock.Lock() - // Check again since we are in a new lock and another goroutine could have created it now - b, ok = r.state[req.Identifier] - if ok { - r.stateLock.Unlock() - return b.take(req.Cost) - } - - b = newTokenBucket(req.RefillRate, req.RefillInterval, req.Max) - r.state[req.Identifier] = b - r.stateLock.Unlock() - - return b.take(req.Cost) - -} diff --git a/apps/agent/services/ratelimit/pushpull.go b/apps/agent/services/ratelimit/pushpull.go index 6c3256be44..f0db05d5f1 100644 --- a/apps/agent/services/ratelimit/pushpull.go +++ b/apps/agent/services/ratelimit/pushpull.go @@ -6,7 +6,6 @@ import ( ratelimitv1 "github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1" "github.com/unkeyed/unkey/apps/agent/pkg/ratelimit" - "github.com/unkeyed/unkey/apps/agent/pkg/tracing" ) func (s *service) PushPull(ctx context.Context, req *ratelimitv1.PushPullRequest) (*ratelimitv1.PushPullResponse, error) { @@ -16,9 +15,7 @@ func (s *service) PushPull(ctx context.Context, req *ratelimitv1.PushPullRequest Int64("latency", time.Since(start).Milliseconds()). Msg("service.PushPull") }() - ctx, span := tracing.Start(ctx, "PushPull") - defer span.End() - res := s.ratelimiter.Take(ratelimit.RatelimitRequest{ + res := s.ratelimiter.Take(ctx, ratelimit.RatelimitRequest{ Identifier: req.Identifier, Max: req.Limit, RefillRate: req.Limit, diff --git a/apps/agent/services/ratelimit/ratelimit.go b/apps/agent/services/ratelimit/ratelimit.go index 51abfd916a..baf4c617e4 100644 --- a/apps/agent/services/ratelimit/ratelimit.go +++ b/apps/agent/services/ratelimit/ratelimit.go @@ -16,9 +16,7 @@ func (s *service) Ratelimit(ctx context.Context, req *ratelimitv1.RatelimitReque Int64("latency", time.Since(start).Milliseconds()). Msg("service.Ratelimit") }() - _, span := tracing.Start(ctx, "Ratelimit") - defer span.End() - res := s.ratelimiter.Take(ratelimit.RatelimitRequest{ + res := s.ratelimiter.Take(ctx, ratelimit.RatelimitRequest{ Identifier: req.Identifier, Max: req.Limit, RefillRate: req.Limit, @@ -28,14 +26,15 @@ func (s *service) Ratelimit(ctx context.Context, req *ratelimitv1.RatelimitReque s.logger.Info().Interface("req", req).Interface("res", res).Msg("ratelimit") if s.pushPullC != nil { + _, span := tracing.Start(ctx, "emitting pushPull event") e := pushPullEvent{ identifier: req.Identifier, limit: req.Limit, duration: req.Duration, cost: req.Cost, } - s.logger.Info().Interface("event", e).Msg("queueing pushPull with origin") s.pushPullC <- e + span.End() } diff --git a/apps/agent/services/ratelimit/sync.go b/apps/agent/services/ratelimit/sync.go index 366034f472..18b2a6d2a7 100644 --- a/apps/agent/services/ratelimit/sync.go +++ b/apps/agent/services/ratelimit/sync.go @@ -54,7 +54,7 @@ func (s *service) createWorker(id int) { } logger.Info().Str("peerId", peer.Id).Str("key", key).Interface("res", res).Msg("push pull came back") - err = s.ratelimiter.SetCurrent(ratelimit.SetCurrentRequest{ + err = s.ratelimiter.SetCurrent(context.Background(), ratelimit.SetCurrentRequest{ Identifier: e.identifier, Max: e.limit, Current: res.Msg.Current,