-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Attempt publication with circuit breaker #713
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package circuitbreaker | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
) | ||
|
||
const ( | ||
Closed Status = iota | ||
Open | ||
HalfOpen | ||
) | ||
|
||
// ErrOpen signals that the circuit is open. See CircuitBreaker.Run. | ||
var ErrOpen = errors.New("circuit breaker is open") | ||
|
||
type Status int | ||
|
||
type CircuitBreaker struct { | ||
maxFailures int | ||
resetTimeout time.Duration | ||
|
||
// mu guards access to status, lastFailure and failures. | ||
mu sync.Mutex | ||
failures int | ||
lastFailure time.Time | ||
status Status | ||
} | ||
|
||
// New creates a new CircuitBreaker instance with the specified maximum number | ||
// of failures and a reset timeout duration. | ||
// | ||
// See CircuitBreaker.Run. | ||
func New(maxFailures int, resetTimeout time.Duration) *CircuitBreaker { | ||
return &CircuitBreaker{ | ||
maxFailures: maxFailures, | ||
resetTimeout: resetTimeout, | ||
} | ||
} | ||
|
||
// Run attempts to execute the provided function within the context of the | ||
// circuit breaker. It handles state transitions, Closed, Open, or HalfOpen, | ||
// based on the outcome of the attempt. | ||
// | ||
// If the circuit is in the Open state, and not enough time has passed since the | ||
// last failure, the circuit remains open, and the function returns | ||
// ErrOpen without attempting the provided function. If enough time | ||
// has passed, the circuit transitions to HalfOpen, and one attempt is allowed. | ||
// | ||
// In HalfOpen state if the function is executed and returns an error, the | ||
// circuit breaker will transition back to Open status. Otherwise, if the | ||
// function executes successfully, the circuit resets to the Closed state, and | ||
// the failure count is reset to zero. | ||
// | ||
// Example: | ||
// | ||
// cb := NewCircuitBreaker(3, time.Second) | ||
// switch err := cb.Run(func() error { | ||
// // Your attempt logic here | ||
// return nil | ||
// }); { | ||
// case errors.Is(err, ErrCircuitBreakerOpen): | ||
// // No execution attempt was made since the circuit is open. | ||
// case err != nil: | ||
// // Execution attempt failed. | ||
// default: | ||
// // Execution attempt succeeded. | ||
// } | ||
func (cb *CircuitBreaker) Run(attempt func() error) error { | ||
cb.mu.Lock() | ||
defer cb.mu.Unlock() | ||
switch cb.status { | ||
case Open: | ||
if time.Since(cb.lastFailure) < cb.resetTimeout { | ||
// Not enough time has passed since the circuit opened. Do not make any further | ||
// attempts. | ||
return ErrOpen | ||
} | ||
// Enough time has passed since last failure. Proceed to allow one attempt by | ||
// half-opening the circuit. | ||
cb.status = HalfOpen | ||
fallthrough | ||
case HalfOpen, Closed: | ||
if err := attempt(); err != nil { | ||
cb.failures++ | ||
if cb.failures >= cb.maxFailures { | ||
// Trip the circuit as we are at or above the max failure threshold. | ||
cb.status = Open | ||
cb.lastFailure = time.Now() | ||
} | ||
return err | ||
} | ||
// Reset the circuit since the attempt succeeded. | ||
cb.status = Closed | ||
cb.failures = 0 | ||
return nil | ||
default: | ||
return fmt.Errorf("unknown status: %d", cb.status) | ||
} | ||
} | ||
|
||
// GetStatus returns the current status of the CircuitBreaker. | ||
func (cb *CircuitBreaker) GetStatus() Status { | ||
cb.mu.Lock() | ||
defer cb.mu.Unlock() | ||
return cb.status | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package circuitbreaker_test | ||
|
||
import ( | ||
"errors" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/filecoin-project/go-f3/internal/circuitbreaker" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestCircuitBreaker(t *testing.T) { | ||
t.Parallel() | ||
|
||
const ( | ||
maxFailures = 3 | ||
restTimeout = 10 * time.Millisecond | ||
|
||
eventualTimeout = restTimeout * 2 | ||
eventualTick = restTimeout / 5 | ||
) | ||
|
||
var ( | ||
failure = errors.New("fish out of water") | ||
|
||
succeed = func() error { return nil } | ||
fail = func() error { return failure } | ||
trip = func(t *testing.T, subject *circuitbreaker.CircuitBreaker) { | ||
for range maxFailures { | ||
require.ErrorContains(t, subject.Run(fail), "fish") | ||
} | ||
require.Equal(t, circuitbreaker.Open, subject.GetStatus()) | ||
} | ||
) | ||
|
||
t.Run("closed on no error", func(t *testing.T) { | ||
t.Parallel() | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
require.NoError(t, subject.Run(succeed)) | ||
require.Equal(t, circuitbreaker.Closed, subject.GetStatus()) | ||
}) | ||
|
||
t.Run("opens after max failures and stays open", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
trip(t, subject) | ||
|
||
// Assert that immediate runs fail, without being attempted, even if they would | ||
// be successful until restTimeout has elapsed. | ||
err := subject.Run(succeed) | ||
require.ErrorIs(t, err, circuitbreaker.ErrOpen) | ||
require.Equal(t, circuitbreaker.Open, subject.GetStatus()) | ||
}) | ||
|
||
t.Run("half-opens eventually", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
trip(t, subject) | ||
require.ErrorIs(t, subject.Run(fail), circuitbreaker.ErrOpen) | ||
// Assert that given function is eventually run after circuit is tripped at | ||
// half-open status by checking error type. | ||
require.Eventually(t, func() bool { return errors.Is(subject.Run(fail), failure) }, eventualTimeout, eventualTick) | ||
}) | ||
|
||
t.Run("closes after rest timeout and success", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
trip(t, subject) | ||
|
||
require.Eventually(t, func() bool { return subject.Run(succeed) == nil }, eventualTimeout, eventualTick) | ||
require.Equal(t, circuitbreaker.Closed, subject.GetStatus()) | ||
}) | ||
|
||
t.Run("usable concurrently", func(t *testing.T) { | ||
subject := circuitbreaker.New(maxFailures, restTimeout) | ||
const ( | ||
wantSuccesses = 7 | ||
totalAttempts = 1_000 | ||
) | ||
var ( | ||
successes, failures int | ||
wg sync.WaitGroup | ||
) | ||
for range totalAttempts { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
_ = subject.Run(func() error { | ||
// Unsafely increment/decrement counters so that if Run is not synchronised | ||
// properly the test creates a race condition. | ||
if successes < wantSuccesses { | ||
successes++ | ||
return nil | ||
} | ||
failures++ | ||
return errors.New("error") | ||
}) | ||
}() | ||
} | ||
wg.Wait() | ||
require.Equal(t, wantSuccesses, successes) | ||
require.Equal(t, maxFailures, failures) | ||
}) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on defaults?