Skip to content

Commit

Permalink
Merge branch 'main' into issue-2050
Browse files Browse the repository at this point in the history
  • Loading branch information
ygorhiroshi authored Sep 16, 2024
2 parents 0350ea5 + b725b84 commit 4201d0f
Show file tree
Hide file tree
Showing 19 changed files with 549 additions and 12 deletions.
17 changes: 17 additions & 0 deletions apps/agent/pkg/batch/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package batch

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// droppedMessages counts the messages that were discarded because a
// BatchProcessor's buffer was full. Every BatchProcessor reports under its
// own value of the "name" label, so drops can be attributed to a specific
// processor.
var droppedMessages = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "agent",
		Subsystem: "batch",
		Name:      "dropped_messages",
	},
	[]string{"name"},
)
19 changes: 17 additions & 2 deletions apps/agent/pkg/batch/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import (
)

// BatchProcessor collects items of type T from a buffered channel and hands
// them to a flush callback in batches.
type BatchProcessor[T any] struct {
	// name is the label value used when reporting dropped messages.
	name string
	// drop selects the behaviour of Buffer when the buffer is full: when true
	// the item is discarded and counted, when false Buffer blocks.
	drop bool
	// buffer is the channel that Buffer writes incoming items to.
	buffer chan T
	// batch holds the items of the batch currently being assembled.
	batch []T
	// config is the configuration this processor was created with.
	config Config[T]
	// flush is the callback that receives a batch of items.
	flush func(ctx context.Context, batch []T)
}

type Config[T any] struct {
// drop events if the buffer is full
Drop bool
Name string
BatchSize int
BufferSize int
FlushInterval time.Duration
Expand All @@ -28,6 +33,8 @@ func New[T any](config Config[T]) *BatchProcessor[T] {
}

bp := &BatchProcessor[T]{
name: config.Name,
drop: config.Drop,
buffer: make(chan T, config.BufferSize),
batch: make([]T, 0, config.BatchSize),
flush: config.Flush,
Expand Down Expand Up @@ -71,15 +78,23 @@ func (bp *BatchProcessor[T]) process() {
flushAndReset()
}
}

}

// Size reports how many items are currently waiting in the buffer channel.
func (bp *BatchProcessor[T]) Size() int {
	queued := len(bp.buffer)
	return queued
}

// Buffer enqueues t for batching.
//
// When the processor was configured to drop, the send is non-blocking: if the
// buffer is full the item is discarded and the droppedMessages counter for
// this processor's name is incremented. Otherwise Buffer blocks until the
// buffer has room.
//
// The item is sent exactly once; there must be no unconditional send ahead of
// the drop check, or drop mode would still block on a full buffer and every
// item would be enqueued twice.
func (bp *BatchProcessor[T]) Buffer(t T) {
	if !bp.drop {
		// Blocking mode: wait for free capacity.
		bp.buffer <- t
		return
	}

	select {
	case bp.buffer <- t:
	default:
		// Buffer is full: drop the item and record it.
		droppedMessages.WithLabelValues(bp.name).Inc()
	}
}

func (bp *BatchProcessor[T]) Close() {
Expand Down
29 changes: 29 additions & 0 deletions apps/agent/pkg/circuitbreaker/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package circuitbreaker

import (
"context"
"errors"
)

// State is the operating mode of a circuit breaker.
type State string

// The possible states are declared as constants so that no code can reassign
// them at runtime.
const (
	// Open state means the circuit breaker is open and requests are not allowed
	// to pass through
	Open State = "open"
	// HalfOpen state means the circuit breaker is in a state of testing the
	// upstream service to see if it has recovered
	HalfOpen State = "halfopen"
	// Closed state means the circuit breaker is allowing requests to pass
	// through to the upstream service
	Closed State = "closed"
)

// ErrTripped is returned when a request is rejected because the circuit
// breaker is open.
var ErrTripped = errors.New("circuit breaker is open")

// ErrTooManyRequests is returned when a request is rejected because the
// half-open circuit breaker has already let through its maximum number of
// requests.
var ErrTooManyRequests = errors.New("too many requests during half open state")

// CircuitBreaker guards calls that produce a value of type Res.
type CircuitBreaker[Res any] interface {
	// Do is given a context and the function to guard, and returns a Res and
	// an error. Implementations are expected to decide whether fn may run at
	// all and to return an error of their own (such as ErrTripped or
	// ErrTooManyRequests) when it may not.
	Do(ctx context.Context, fn func(context.Context) (Res, error)) (Res, error)
}
228 changes: 228 additions & 0 deletions apps/agent/pkg/circuitbreaker/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package circuitbreaker

import (
"context"
"fmt"
"sync"
"time"

"github.com/unkeyed/unkey/apps/agent/pkg/clock"
"github.com/unkeyed/unkey/apps/agent/pkg/logging"
"github.com/unkeyed/unkey/apps/agent/pkg/tracing"
)

// CB is a circuit breaker for calls returning a value of type Res. It embeds
// a mutex that is held while the state and counters below are read or
// modified.
type CB[Res any] struct {
	sync.Mutex
	// This is a pointer to the configuration of the circuit breaker because we
	// need to modify the clock for testing
	config *config

	// logger receives the circuit breaker's log messages
	logger logging.Logger

	// State of the circuit
	state State

	// reset the counters every cyclic period
	resetCountersAt time.Time

	// reset the state every recoveryTimeout
	// (the duration used for this deadline is config.timeout)
	resetStateAt time.Time

	// counters are protected by the mutex and are reset every cyclic period
	requests int
	successes int
	failures int
	consecutiveSuccesses int
	consecutiveFailures int
}

// config holds the settings of a circuit breaker. It is filled with defaults
// by New and adjusted through applyConfig options.
type config struct {
	// name identifies the circuit breaker in span names and metric labels
	name string
	// Max requests that may pass through the circuit breaker in its half-open state
	// If all requests are successful, the circuit will close
	// If any request fails, the circuit will remain half open until the next cycle
	maxRequests int

	// Interval to clear counts while the circuit is closed
	cyclicPeriod time.Duration

	// How long the circuit will stay open before transitioning to half-open
	timeout time.Duration

	// Determine whether the error is a downstream error or not
	// If the error is a downstream error, the circuit will count it
	// If the error is not a downstream error, the circuit will not count it
	isDownstreamError func(error) bool

	// How many downstream errors within a cyclic period are allowed before the
	// circuit trips and opens
	tripThreshold int

	// Clock to use for timing, defaults to the system clock but can be overridden for testing
	clock clock.Clock

	// logger is handed to the circuit breaker for its log output
	logger logging.Logger
}

// WithMaxRequests returns an option that sets how many requests may pass
// through the circuit breaker while it is half-open.
func WithMaxRequests(maxRequests int) applyConfig {
	set := func(cfg *config) {
		cfg.maxRequests = maxRequests
	}
	return set
}

// WithCyclicPeriod returns an option that sets the interval after which the
// circuit breaker's counters are cleared.
func WithCyclicPeriod(cyclicPeriod time.Duration) applyConfig {
	set := func(cfg *config) {
		cfg.cyclicPeriod = cyclicPeriod
	}
	return set
}
// WithIsDownstreamError returns an option that replaces the predicate used to
// decide whether an error returned by the guarded function counts as a
// failure.
func WithIsDownstreamError(isDownstreamError func(error) bool) applyConfig {
	set := func(cfg *config) {
		cfg.isDownstreamError = isDownstreamError
	}
	return set
}
// WithTripThreshold returns an option that sets how many counted failures
// cause a closed circuit to open.
func WithTripThreshold(tripThreshold int) applyConfig {
	set := func(cfg *config) {
		cfg.tripThreshold = tripThreshold
	}
	return set
}

// WithTimeout returns an option that sets how long the circuit stays open
// before it may become half-open.
func WithTimeout(timeout time.Duration) applyConfig {
	set := func(cfg *config) {
		cfg.timeout = timeout
	}
	return set
}

// WithClock returns an option that replaces the clock the circuit breaker
// reads the current time from. It exists for testing.
func WithClock(clock clock.Clock) applyConfig {
	set := func(cfg *config) {
		cfg.clock = clock
	}
	return set
}

// WithLogger returns an option that sets the logger the circuit breaker
// writes to.
func WithLogger(logger logging.Logger) applyConfig {
	set := func(cfg *config) {
		cfg.logger = logger
	}
	return set
}

// applyConfig is a functional option: a function that modifies one setting
// of a circuit breaker's config. New calls every option it is given, in
// order, on top of the default configuration.
type applyConfig func(*config)

// New creates a circuit breaker called name that starts in the Closed state.
//
// Defaults: 10 requests in half-open state, a 5 second cyclic period, a one
// minute timeout, a trip threshold of 5, every non-nil error counted as a
// downstream error, and a clock and logger built by clock.New and
// logging.New(nil). The given options are applied on top of these defaults
// in the order they are passed.
func New[Res any](name string, applyConfigs ...applyConfig) *CB[Res] {
	cfg := new(config)
	cfg.name = name
	cfg.maxRequests = 10
	cfg.cyclicPeriod = 5 * time.Second
	cfg.timeout = time.Minute
	cfg.isDownstreamError = func(err error) bool { return err != nil }
	cfg.tripThreshold = 5
	cfg.clock = clock.New()
	cfg.logger = logging.New(nil)

	for i := range applyConfigs {
		applyConfigs[i](cfg)
	}

	breaker := new(CB[Res])
	breaker.config = cfg
	breaker.logger = cfg.logger
	breaker.state = Closed
	// Both deadlines are computed from the (possibly overridden) clock.
	breaker.resetCountersAt = cfg.clock.Now().Add(cfg.cyclicPeriod)
	breaker.resetStateAt = cfg.clock.Now().Add(cfg.timeout)
	return breaker
}

// Compile-time check that *CB implements CircuitBreaker.
var _ CircuitBreaker[any] = (*CB[any])(nil)

// Do runs fn under the protection of the circuit breaker.
//
// It first calls preflight; if that rejects the request, fn is not called and
// the rejection error is returned together with the zero value of Res.
// Otherwise fn is invoked, its outcome is reported to postflight so the
// breaker can update its counters and state, and fn's result and error are
// returned unchanged.
func (cb *CB[Res]) Do(ctx context.Context, fn func(context.Context) (Res, error)) (res Res, err error) {
	spanPrefix := fmt.Sprintf("circuitbreaker.%s", cb.config.name)

	ctx, span := tracing.Start(ctx, tracing.NewSpanName(spanPrefix, "Do"))
	defer span.End()

	if err = cb.preflight(ctx); err != nil {
		return res, err
	}

	// The context returned for the "fn" span is kept in its own variable so
	// that postflight is started from the "Do" context and not from the
	// context of the already ended "fn" span.
	fnCtx, fnSpan := tracing.Start(ctx, tracing.NewSpanName(spanPrefix, "fn"))
	res, err = fn(fnCtx)
	fnSpan.End()

	cb.postflight(ctx, err)

	return res, err
}

// preflight decides, under the breaker's lock, whether a request may proceed.
//
// It clears all counters once the counter deadline has passed, moves an open
// circuit to half-open once the state deadline has passed, and records the
// request in the requests metric labelled with the breaker's name and current
// state. It returns ErrTripped while the circuit is open, ErrTooManyRequests
// when a half-open circuit has already seen maxRequests requests, and nil
// otherwise.
func (cb *CB[Res]) preflight(ctx context.Context) error {
	spanName := tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "preflight")
	_, span := tracing.Start(ctx, spanName)
	defer span.End()

	cb.Lock()
	defer cb.Unlock()

	now := cb.config.clock.Now()

	// Start a fresh counting window when the current one has expired.
	if countersExpired := now.After(cb.resetCountersAt); countersExpired {
		cb.logger.Info().Msg("resetting circuit breaker")
		cb.requests, cb.successes, cb.failures = 0, 0, 0
		cb.consecutiveSuccesses, cb.consecutiveFailures = 0, 0
		cb.resetCountersAt = now.Add(cb.config.cyclicPeriod)
	}

	// An open circuit becomes half-open once its deadline has passed.
	if cb.state == Open && now.After(cb.resetStateAt) {
		cb.state = HalfOpen
		cb.resetStateAt = now.Add(cb.config.timeout)
	}

	requests.WithLabelValues(cb.config.name, string(cb.state)).Inc()

	if cb.state == Open {
		return ErrTripped
	}

	cb.logger.Info().
		Str("state", string(cb.state)).
		Int("requests", cb.requests).
		Int("maxRequests", cb.config.maxRequests).
		Msg("circuit breaker state")

	if cb.state != HalfOpen || cb.requests < cb.config.maxRequests {
		return nil
	}
	return ErrTooManyRequests
}

// postflight records the outcome of a request and updates the circuit state,
// all under the breaker's lock.
//
// err is classified with the configured isDownstreamError predicate: a
// downstream error counts as a failure, anything else as a success. A closed
// circuit opens once the failures in the current counting window reach
// tripThreshold; a half-open circuit closes once maxRequests consecutive
// successes have been observed.
func (cb *CB[Res]) postflight(ctx context.Context, err error) {
	_, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "postflight"))
	defer span.End()
	cb.Lock()
	defer cb.Unlock()

	cb.requests++
	if cb.config.isDownstreamError(err) {
		cb.failures++
		cb.consecutiveFailures++
		cb.consecutiveSuccesses = 0
	} else {
		cb.successes++
		cb.consecutiveSuccesses++
		cb.consecutiveFailures = 0
	}

	switch cb.state {
	case Closed:
		if cb.failures >= cb.config.tripThreshold {
			cb.state = Open
			// Start the open period now. Without this the deadline set at
			// construction (or at the last half-open transition) would still
			// apply, and once it has passed the very next preflight would
			// move the circuit to half-open immediately instead of keeping
			// it open for the configured timeout.
			cb.resetStateAt = cb.config.clock.Now().Add(cb.config.timeout)
		}

	case HalfOpen:
		if cb.consecutiveSuccesses >= cb.config.maxRequests {
			cb.state = Closed
		}
	}
}
Loading

0 comments on commit 4201d0f

Please sign in to comment.