-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Attempt publication with circuit breaker
GPBFT presciently ignores broadcast and rebroadcast requests as it is beyond its boundaries of responsibility to do something about. In practice such failures may be a sign that pubsub is overwhelmed with messages. Therefore, ideally the system should avoid aggravating the situation by requesting further broadcasts. This is specially important in re-broadcast requests because it often involves batch message publication. The changes here wrap the pubsub publication calls with a circuit breaker that will open on consecutive errors (set to `5`), and will not attempt to actually publish messages until a reset timeout (set to `3s`) has passed. Fixes #632
- Loading branch information
Showing
3 changed files
with
225 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package circuitbreaker | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
) | ||
|
||
const ( | ||
Closed Status = iota | ||
Open | ||
HalfOpen | ||
) | ||
|
||
// ErrOpen signals that the circuit is open. See CircuitBreaker.Run. | ||
var ErrOpen = errors.New("circuit breaker is open") | ||
|
||
type Status int | ||
|
||
type CircuitBreaker struct { | ||
maxFailures int | ||
resetTimeout time.Duration | ||
|
||
// mu guards access to status, lastFailure and failures. | ||
mu sync.Mutex | ||
failures int | ||
lastFailure time.Time | ||
status Status | ||
} | ||
|
||
// New creates a new CircuitBreaker instance with the specified maximum number | ||
// of failures and a reset timeout duration. | ||
// | ||
// See CircuitBreaker.Run. | ||
func New(maxFailures int, resetTimeout time.Duration) *CircuitBreaker { | ||
return &CircuitBreaker{ | ||
maxFailures: maxFailures, | ||
resetTimeout: resetTimeout, | ||
} | ||
} | ||
|
||
// Run attempts to execute the provided function within the context of the | ||
// circuit breaker. It handles state transitions, Closed, Open, or HalfOpen, | ||
// based on the outcome of the attempt. | ||
// | ||
// If the circuit is in the Open state, and not enough time has passed since the | ||
// last failure, the circuit remains open, and the function returns | ||
// ErrOpen without attempting the provided function. If enough time | ||
// has passed, the circuit transitions to HalfOpen, and one attempt is allowed. | ||
// | ||
// In HalfOpen state if the function is executed and returns an error, the | ||
// circuit breaker will transition back to Open status. Otherwise, if the | ||
// function executes successfully, the circuit resets to the Closed state, and | ||
// the failure count is reset to zero. | ||
// | ||
// Example: | ||
// | ||
// cb := NewCircuitBreaker(3, time.Second) | ||
// switch err := cb.Run(func() error { | ||
// // Your attempt logic here | ||
// return nil | ||
// }); { | ||
// case errors.Is(err, ErrCircuitBreakerOpen): | ||
// // No execution attempt was made since the circuit is open. | ||
// case err != nil: | ||
// // Execution attempt failed. | ||
// default: | ||
// // Execution attempt succeeded. | ||
// } | ||
func (cb *CircuitBreaker) Run(attempt func() error) error { | ||
cb.mu.Lock() | ||
defer cb.mu.Unlock() | ||
switch cb.status { | ||
case Open: | ||
if time.Since(cb.lastFailure) < cb.resetTimeout { | ||
// Not enough time has passed since the circuit opened. Do not make any further | ||
// attempts. | ||
return ErrOpen | ||
} | ||
// Enough time has passed since last failure. Proceed to allow one attempt by | ||
// half-opening the circuit. | ||
cb.status = HalfOpen | ||
fallthrough | ||
case HalfOpen, Closed: | ||
if err := attempt(); err != nil { | ||
cb.failures++ | ||
if cb.failures >= cb.maxFailures { | ||
// Trip the circuit as we are at or above the max failure threshold. | ||
cb.status = Open | ||
cb.lastFailure = time.Now() | ||
} | ||
return err | ||
} | ||
// Reset the circuit since the attempt succeeded. | ||
cb.status = Closed | ||
cb.failures = 0 | ||
return nil | ||
default: | ||
return fmt.Errorf("unknown status: %d", cb.status) | ||
} | ||
} | ||
|
||
// GetStatus returns the current status of the CircuitBreaker. | ||
func (cb *CircuitBreaker) GetStatus() Status { | ||
cb.mu.Lock() | ||
defer cb.mu.Unlock() | ||
return cb.status | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package circuitbreaker_test | ||
|
||
import ( | ||
"errors" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/filecoin-project/go-f3/internal/circuitbreaker" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestCircuitBreaker(t *testing.T) { | ||
t.Parallel() | ||
|
||
const ( | ||
maxFailures = 3 | ||
restTimeout = 10 * time.Millisecond | ||
|
||
eventualTimeout = restTimeout * 2 | ||
eventualTick = restTimeout / 5 | ||
) | ||
|
||
var ( | ||
failure = errors.New("fish out of water") | ||
|
||
succeed = func() error { return nil } | ||
fail = func() error { return failure } | ||
trip = func(t *testing.T, subject *circuitbreaker.CircuitBreaker) { | ||
for range maxFailures { | ||
require.ErrorContains(t, subject.Run(fail), "fish") | ||
} | ||
require.Equal(t, circuitbreaker.Open, subject.GetStatus()) | ||
} | ||
) | ||
|
||
t.Run("closed on no error", func(t *testing.T) { | ||
t.Parallel() | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
require.NoError(t, subject.Run(succeed)) | ||
require.Equal(t, circuitbreaker.Closed, subject.GetStatus()) | ||
}) | ||
|
||
t.Run("opens after max failures and stays open", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
trip(t, subject) | ||
|
||
// Assert that immediate runs fail, without being attempted, even if they would | ||
// be successful until restTimeout has elapsed. | ||
err := subject.Run(succeed) | ||
require.ErrorIs(t, err, circuitbreaker.ErrOpen) | ||
require.Equal(t, circuitbreaker.Open, subject.GetStatus()) | ||
}) | ||
|
||
t.Run("half-opens eventually", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
trip(t, subject) | ||
require.ErrorIs(t, subject.Run(fail), circuitbreaker.ErrOpen) | ||
// Assert that given function is eventually run after circuit is tripped at | ||
// half-open status by checking error type. | ||
require.Eventually(t, func() bool { return errors.Is(subject.Run(fail), failure) }, eventualTimeout, eventualTick) | ||
}) | ||
|
||
t.Run("closes after rest timeout and success", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
trip(t, subject) | ||
|
||
require.Eventually(t, func() bool { return subject.Run(succeed) == nil }, eventualTimeout, eventualTick) | ||
require.Equal(t, circuitbreaker.Closed, subject.GetStatus()) | ||
}) | ||
|
||
t.Run("usable concurrently", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
const ( | ||
wantSuccesses = 7 | ||
totalAttempts = 1_000 | ||
) | ||
var ( | ||
successes, failures int | ||
wg sync.WaitGroup | ||
) | ||
for range totalAttempts { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
_ = subject.Run(func() error { | ||
// Unsafely increment/decrement counters so that if Run is not synchronised | ||
// properly the test creates a race condition. | ||
if successes < wantSuccesses { | ||
successes++ | ||
return nil | ||
} | ||
failures++ | ||
return errors.New("error") | ||
}) | ||
}() | ||
} | ||
wg.Wait() | ||
require.Equal(t, wantSuccesses, successes) | ||
require.Equal(t, maxFailures, failures) | ||
}) | ||
} |