Skip to content

Commit

Permalink
Attempt publication with circuit breaker
Browse files Browse the repository at this point in the history
GPBFT silently ignores broadcast and rebroadcast requests as it is
beyond its boundaries of responsibility to do something about. In
practice such failures may be a sign that pubsub is overwhelmed with
messages. Therefore, ideally the system should avoid aggravating the
situation by requesting further broadcasts. This is specially important
in re-broadcast requests because it often involves batch message
publication.

The changes here wrap the pubsub publication calls with a circuit
breaker that will open on consecutive errors (set to `5`), and will not
attempt to actually publish messages until a reset timeout (set to `3s`)
has passed.

Fixes #632
  • Loading branch information
masih committed Oct 9, 2024
1 parent 25c3ade commit f003b1f
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 2 deletions.
16 changes: 14 additions & 2 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/go-f3/certstore"
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/circuitbreaker"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/internal/writeaheadlog"
Expand Down Expand Up @@ -41,6 +42,7 @@ type gpbftRunner struct {

participant *gpbft.Participant
topic *pubsub.Topic
cb *circuitbreaker.CircuitBreaker

alertTimer *clock.Timer

Expand Down Expand Up @@ -86,6 +88,7 @@ func newRunner(
ctxCancel: ctxCancel,
equivFilter: newEquivocationFilter(pID),
selfMessages: make(map[uint64]map[roundPhase][]*gpbft.GMessage),
cb: circuitbreaker.New(5, 3*time.Second),
}

// create a stopped timer to facilitate alerts requested from gpbft
Expand Down Expand Up @@ -444,7 +447,7 @@ func (h *gpbftRunner) BroadcastMessage(ctx context.Context, msg *gpbft.GMessage)
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
}

err = h.topic.Publish(ctx, bw.Bytes())
err = h.publishWithCircuitBreaker(ctx, bw.Bytes())
if err != nil {
return fmt.Errorf("publishing message: %w", err)
}
Expand All @@ -463,12 +466,21 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error {
if err := msg.MarshalCBOR(&bw); err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
}
if err := h.topic.Publish(h.runningCtx, bw.Bytes()); err != nil {
if err := h.publishWithCircuitBreaker(h.runningCtx, bw.Bytes()); err != nil {
return fmt.Errorf("publishing message: %w", err)
}
return nil
}

func (h *gpbftRunner) publishWithCircuitBreaker(ctx context.Context, msg []byte) error {
if h.topic == nil {
return nil

Check warning on line 477 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L477

Added line #L477 was not covered by tests
}
return h.cb.Run(func() error {
return h.topic.Publish(ctx, msg)
})
}

var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage

func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg *pubsub.Message) (_result pubsub.ValidationResult) {
Expand Down
109 changes: 109 additions & 0 deletions internal/circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package circuitbreaker

import (
"errors"
"fmt"
"sync"
"time"
)

const (
Closed Status = iota
Open
HalfOpen
)

// ErrOpen signals that the circuit is open. See CircuitBreaker.Run.
var ErrOpen = errors.New("circuit breaker is open")

type Status int

type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration

// mu guards access to status, lastFailure and failures.
mu sync.Mutex
failures int
lastFailure time.Time
status Status
}

// New creates a new CircuitBreaker instance with the specified maximum number
// of failures and a reset timeout duration.
//
// See CircuitBreaker.Run.
func New(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
}
}

// Run attempts to execute the provided function within the context of the
// circuit breaker. It handles state transitions, Closed, Open, or HalfOpen,
// based on the outcome of the attempt.
//
// If the circuit is in the Open state, and not enough time has passed since the
// last failure, the circuit remains open, and the function returns
// ErrOpen without attempting the provided function. If enough time
// has passed, the circuit transitions to HalfOpen, and one attempt is allowed.
//
// In HalfOpen state if the function is executed and returns an error, the
// circuit breaker will transition back to Open status. Otherwise, if the
// function executes successfully, the circuit resets to the Closed state, and
// the failure count is reset to zero.
//
// Example:
//
// cb := NewCircuitBreaker(3, time.Second)
// switch err := cb.Run(func() error {
// // Your attempt logic here
// return nil
// }); {
// case errors.Is(err, ErrCircuitBreakerOpen):
// // No execution attempt was made since the circuit is open.
// case err != nil:
// // Execution attempt failed.
// default:
// // Execution attempt succeeded.
// }
func (cb *CircuitBreaker) Run(attempt func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.status {
case Open:
if time.Since(cb.lastFailure) < cb.resetTimeout {
// Not enough time has passed since the circuit opened. Do not make any further
// attempts.
return ErrOpen
}
// Enough time has passed since last failure. Proceed to allow one attempt by
// half-opening the circuit.
cb.status = HalfOpen
fallthrough
case HalfOpen, Closed:
if err := attempt(); err != nil {
cb.failures++
if cb.failures >= cb.maxFailures {
// Trip the circuit as we are at or above the max failure threshold.
cb.status = Open
cb.lastFailure = time.Now()
}
return err
}
// Reset the circuit since the attempt succeeded.
cb.status = Closed
cb.failures = 0
return nil
default:
return fmt.Errorf("unknown status: %d", cb.status)

Check warning on line 100 in internal/circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

internal/circuitbreaker/circuitbreaker.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}
}

// GetStatus returns the current status of the CircuitBreaker.
func (cb *CircuitBreaker) GetStatus() Status {
cb.mu.Lock()
defer cb.mu.Unlock()
return cb.status
}
102 changes: 102 additions & 0 deletions internal/circuitbreaker/circuitbreaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package circuitbreaker_test

import (
"errors"
"sync"
"testing"
"time"

"github.com/filecoin-project/go-f3/internal/circuitbreaker"
"github.com/stretchr/testify/require"
)

func TestCircuitBreaker(t *testing.T) {
t.Parallel()

const (
maxFailures = 3
restTimeout = 10 * time.Millisecond

eventualTimeout = restTimeout * 2
eventualTick = restTimeout / 5
)

var (
failure = errors.New("fish out of water")

succeed = func() error { return nil }
fail = func() error { return failure }
trip = func(t *testing.T, subject *circuitbreaker.CircuitBreaker) {
for range maxFailures {
require.ErrorContains(t, subject.Run(fail), "fish")
}
require.Equal(t, circuitbreaker.Open, subject.GetStatus())
}
)

t.Run("closed on no error", func(t *testing.T) {
t.Parallel()
subject := circuitbreaker.New(maxFailures, restTimeout)
require.NoError(t, subject.Run(succeed))
require.Equal(t, circuitbreaker.Closed, subject.GetStatus())
})

t.Run("opens after max failures and stays open", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
trip(t, subject)

// Assert that immediate runs fail, without being attempted, even if they would
// be successful until restTimeout has elapsed.
err := subject.Run(succeed)
require.ErrorIs(t, err, circuitbreaker.ErrOpen)
require.Equal(t, circuitbreaker.Open, subject.GetStatus())
})

t.Run("half-opens eventually", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
trip(t, subject)
require.ErrorIs(t, subject.Run(fail), circuitbreaker.ErrOpen)
// Assert that given function is eventually run after circuit is tripped at
// half-open status by checking error type.
require.Eventually(t, func() bool { return errors.Is(subject.Run(fail), failure) }, eventualTimeout, eventualTick)
})

t.Run("closes after rest timeout and success", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
trip(t, subject)

require.Eventually(t, func() bool { return subject.Run(succeed) == nil }, eventualTimeout, eventualTick)
require.Equal(t, circuitbreaker.Closed, subject.GetStatus())
})

t.Run("usable concurrently", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
const (
wantSuccesses = 7
totalAttempts = 1_000
)
var (
successes, failures int
wg sync.WaitGroup
)
for range totalAttempts {
wg.Add(1)
go func() {
defer wg.Done()
_ = subject.Run(func() error {
// Unsafely increment/decrement counters so that if Run is not synchronised
// properly the test creates a race condition.
if successes < wantSuccesses {
successes++
return nil
}
failures++
return errors.New("error")
})
}()
}
wg.Wait()
require.Equal(t, wantSuccesses, successes)
require.Equal(t, maxFailures, failures)
})
}

0 comments on commit f003b1f

Please sign in to comment.