Skip to content

Commit

Permalink
feat: prometheus in docker
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark committed Sep 16, 2024
1 parent 782c42c commit 875698a
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 13 deletions.
14 changes: 14 additions & 0 deletions apps/agent/pkg/batch/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package batch

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
droppedMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "agent",
Subsystem: "batch",
Name: "dropped_messages",
}, []string{"name"})
)
18 changes: 16 additions & 2 deletions apps/agent/pkg/batch/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import (
)

type BatchProcessor[T any] struct {
name string
drop bool
buffer chan T
batch []T
config Config[T]
flush func(ctx context.Context, batch []T)
}

type Config[T any] struct {
// drop events if the buffer is full
Drop bool
Name string
BatchSize int
BufferSize int
FlushInterval time.Duration
Expand All @@ -28,6 +33,7 @@ func New[T any](config Config[T]) *BatchProcessor[T] {
}

bp := &BatchProcessor[T]{
name: config.Name,
buffer: make(chan T, config.BufferSize),
batch: make([]T, 0, config.BatchSize),
flush: config.Flush,
Expand Down Expand Up @@ -71,15 +77,23 @@ func (bp *BatchProcessor[T]) process() {
flushAndReset()
}
}

}

func (bp *BatchProcessor[T]) Size() int {
return len(bp.buffer)
}

func (bp *BatchProcessor[T]) Buffer(t T) {
bp.buffer <- t
if bp.drop {

select {
case bp.buffer <- t:
default:
droppedMessages.WithLabelValues(bp.name).Inc()
}
} else {
bp.buffer <- t
}
}

func (bp *BatchProcessor[T]) Close() {
Expand Down
1 change: 0 additions & 1 deletion apps/agent/pkg/circuitbreaker/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ var (
var (
ErrTripped = errors.New("circuit breaker is open")
ErrTooManyRequests = errors.New("too many requests during half open state")
ErrTimeout = errors.New("circuit breaker timeout")
)

type CircuitBreaker[Res any] interface {
Expand Down
21 changes: 17 additions & 4 deletions apps/agent/pkg/circuitbreaker/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package circuitbreaker

import (
"context"
"fmt"
"sync"
"time"

"github.com/unkeyed/unkey/apps/agent/pkg/clock"
"github.com/unkeyed/unkey/apps/agent/pkg/logging"
"github.com/unkeyed/unkey/apps/agent/pkg/tracing"
)

type CB[Res any] struct {
Expand Down Expand Up @@ -139,22 +141,28 @@ func New[Res any](name string, applyConfigs ...applyConfig) *CB[Res] {
var _ CircuitBreaker[any] = &CB[any]{}

func (cb *CB[Res]) Do(ctx context.Context, fn func(context.Context) (Res, error)) (res Res, err error) {
ctx, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "Do"))
defer span.End()

err = cb.preflight()
err = cb.preflight(ctx)
if err != nil {
return res, err
}

ctx, fnSpan := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "fn"))
res, err = fn(ctx)
fnSpan.End()

cb.postflight(err)
cb.postflight(ctx, err)

return res, err

}

// preflight checks if the circuit is ready to accept a request
func (cb *CB[Res]) preflight() error {
func (cb *CB[Res]) preflight(ctx context.Context) error {
ctx, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "preflight"))
defer span.End()
cb.Lock()
defer cb.Unlock()

Expand All @@ -174,9 +182,12 @@ func (cb *CB[Res]) preflight() error {
cb.resetStateAt = now.Add(cb.config.timeout)
}

requests.WithLabelValues(cb.config.name, string(cb.state)).Inc()

if cb.state == Open {
return ErrTripped
}

cb.logger.Info().Str("state", string(cb.state)).Int("requests", cb.requests).Int("maxRequests", cb.config.maxRequests).Msg("circuit breaker state")
if cb.state == HalfOpen && cb.requests >= cb.config.maxRequests {
return ErrTooManyRequests
Expand All @@ -185,7 +196,9 @@ func (cb *CB[Res]) preflight() error {
}

// postflight updates the circuit breaker state based on the result of the request
func (cb *CB[Res]) postflight(err error) {
func (cb *CB[Res]) postflight(ctx context.Context, err error) {
ctx, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "postflight"))
defer span.End()
cb.Lock()
defer cb.Unlock()
cb.requests++
Expand Down
14 changes: 14 additions & 0 deletions apps/agent/pkg/circuitbreaker/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package circuitbreaker

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
requests = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "agent",
Subsystem: "circuitbreaker",
Name: "requests",
}, []string{"name", "state"})
)
22 changes: 18 additions & 4 deletions apps/agent/services/ratelimit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import (
"sync"
"time"

"connectrpc.com/connect"

ratelimitv1 "github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1"
"github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1/ratelimitv1connect"
"github.com/unkeyed/unkey/apps/agent/pkg/circuitbreaker"
"github.com/unkeyed/unkey/apps/agent/pkg/cluster"
"github.com/unkeyed/unkey/apps/agent/pkg/logging"
"github.com/unkeyed/unkey/apps/agent/pkg/metrics"
Expand Down Expand Up @@ -33,6 +37,8 @@ type service struct {
leaseIdToKeyMapLock sync.RWMutex
// Store a reference leaseId -> window key
leaseIdToKeyMap map[string]string

syncCircuitBreaker circuitbreaker.CircuitBreaker[*connect.Response[ratelimitv1.PushPullResponse]]
}

type Config struct {
Expand All @@ -58,17 +64,25 @@ func New(cfg Config) (*service, error) {
buckets: make(map[string]*bucket),
leaseIdToKeyMapLock: sync.RWMutex{},
leaseIdToKeyMap: make(map[string]string),
syncCircuitBreaker: circuitbreaker.New[*connect.Response[ratelimitv1.PushPullResponse]](
"ratelimit.syncWithOrigin",
circuitbreaker.WithLogger(cfg.Logger),
circuitbreaker.WithCyclicPeriod(10*time.Second),
circuitbreaker.WithTimeout(time.Minute),
circuitbreaker.WithMaxRequests(100),
circuitbreaker.WithTripThreshold(50),
),
}

repeat.Every(time.Minute, s.removeExpiredIdentifiers)

if cfg.Cluster != nil {
s.mitigateBuffer = make(chan mitigateWindowRequest, 10000)
s.syncBuffer = make(chan syncWithOriginRequest, 10000)
s.mitigateBuffer = make(chan mitigateWindowRequest, 100000)
s.syncBuffer = make(chan syncWithOriginRequest, 100000)
// Process the individual requests to the origin and update local state
// We're using 32 goroutines to parallelise the network requests'
// We're using 128 goroutines to parallelise the network requests'
s.logger.Info().Msg("starting background jobs")
for range 32 {
for range 128 {
go func() {
for {
select {
Expand Down
6 changes: 5 additions & 1 deletion apps/agent/services/ratelimit/sync_with_origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ func (s *service) syncWithOrigin(req syncWithOriginRequest) {
return
}

res, err := client.PushPull(ctx, connect.NewRequest(req.req))
res, err := s.syncCircuitBreaker.Do(ctx, func(innerCtx context.Context) (*connect.Response[ratelimitv1.PushPullResponse], error) {
innerCtx, cancel = context.WithTimeout(innerCtx, 10*time.Second)
defer cancel()
return client.PushPull(innerCtx, connect.NewRequest(req.req))
})
if err != nil {
s.peersMu.Lock()
s.logger.Warn().Err(err).Msg("resetting peer client due to error")
Expand Down
1 change: 0 additions & 1 deletion apps/api/src/routes/v1_keys_verifyKey.multilimit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ describe("without identities", () => {
},
});

console.info(res);
expect(res.status, `expected 200, received: ${JSON.stringify(res, null, 2)}`).toBe(200);
expect(res.body.valid).toBe(true);
expect(res.body.code).toBe("VALID");
Expand Down
24 changes: 24 additions & 0 deletions deployment/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,34 @@ services:
- planetscale
- agent_lb

prometheus:
image: prom/prometheus
container_name: prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yaml'
ports:
- 9090:9090
restart: unless-stopped
volumes:
- ./prometheus:/etc/prometheus
- prometheus:/prometheus


grafana:
image: grafana/grafana
container_name: grafana
ports:
- 4000:3000
restart: unless-stopped
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=grafana
volumes:
- ./grafana:/etc/grafana/provisioning/datasources
volumes:
mysql:
grafana:
clickhouse:
clickhouse-keeper:
s3:
prometheus:
16 changes: 16 additions & 0 deletions deployment/grafana/grafana.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: 1

auth:
basic:
enabled: false
anonymous:
enabled: true
org_name: Local
org_role: Admin
datasources:
- name: Prometheus
type: prometheus
url: http://prometheus:9090
isDefault: true
access: proxy
editable: true
20 changes: 20 additions & 0 deletions deployment/prometheus/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
global:
scrape_interval: 15s
scrape_timeout: 10s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: []
scheme: http
timeout: 10s
api_version: v1
scrape_configs:
- job_name: prometheus
metrics_path: /metrics
dns_sd_configs:
- names:
- agent
type: A
port: 2112
refresh_interval: "30s"

0 comments on commit 875698a

Please sign in to comment.