Skip to content

Commit 7dc6f2b

Browse files
Merge pull request #17 from utilitywarehouse/retry-backoff
Add a backoff retry mechanism for starting runners
2 parents 8b495d7 + 59e77a2 commit 7dc6f2b

File tree

7 files changed

+202
-16
lines changed

7 files changed

+202
-16
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ ENV CGO_ENABLED=0
55
RUN \
66
apk --no-cache add git upx \
77
&& go get -t ./... \
8-
&& go test -v \
8+
&& go test -v ./... \
99
&& go build -ldflags='-s -w' -o /semaphore-wireguard . \
1010
&& upx /semaphore-wireguard
1111

backoff/backoff.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Package backoff includes this backoff function copied from: https://github.com/jpillora/backoff/blob/d80867952dff4e2fbfb4280ded4ff94d67790457/backoff.go
2+
package backoff
3+
4+
import (
5+
"math"
6+
"math/rand"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
// Backoff is a time.Duration counter, starting at Min. After every call to
12+
// the Duration method the current timing is multiplied by Factor, but it
13+
// never exceeds Max.
14+
//
15+
// Backoff is not generally concurrent-safe, but the ForAttempt method can
16+
// be used concurrently.
17+
type Backoff struct {
18+
attempt uint64
19+
// Factor is the multiplying factor for each increment step
20+
Factor float64
21+
// Jitter eases contention by randomizing backoff steps
22+
Jitter bool
23+
// Min and Max are the minimum and maximum values of the counter
24+
Min, Max time.Duration
25+
}
26+
27+
// Duration returns the duration for the current attempt before incrementing
28+
// the attempt counter. See ForAttempt.
29+
func (b *Backoff) Duration() time.Duration {
30+
d := b.ForAttempt(float64(atomic.AddUint64(&b.attempt, 1) - 1))
31+
return d
32+
}
33+
34+
const maxInt64 = float64(math.MaxInt64 - 512)
35+
36+
// ForAttempt returns the duration for a specific attempt. This is useful if
37+
// you have a large number of independent Backoffs, but don't want use
38+
// unnecessary memory storing the Backoff parameters per Backoff. The first
39+
// attempt should be 0.
40+
//
41+
// ForAttempt is concurrent-safe.
42+
func (b *Backoff) ForAttempt(attempt float64) time.Duration {
43+
// Zero-values are nonsensical, so we use
44+
// them to apply defaults
45+
min := b.Min
46+
if min <= 0 {
47+
min = 100 * time.Millisecond
48+
}
49+
max := b.Max
50+
if max <= 0 {
51+
max = 10 * time.Second
52+
}
53+
if min >= max {
54+
// short-circuit
55+
return max
56+
}
57+
factor := b.Factor
58+
if factor <= 0 {
59+
factor = 2
60+
}
61+
//calculate this duration
62+
minf := float64(min)
63+
durf := minf * math.Pow(factor, attempt)
64+
if b.Jitter {
65+
durf = rand.Float64()*(durf-minf) + minf
66+
}
67+
//ensure float64 wont overflow int64
68+
if durf > maxInt64 {
69+
return max
70+
}
71+
dur := time.Duration(durf)
72+
//keep within bounds
73+
if dur < min {
74+
return min
75+
}
76+
if dur > max {
77+
return max
78+
}
79+
return dur
80+
}
81+
82+
// Reset restarts the current attempt counter at zero.
83+
func (b *Backoff) Reset() {
84+
atomic.StoreUint64(&b.attempt, 0)
85+
}
86+
87+
// Attempt returns the current attempt counter value.
88+
func (b *Backoff) Attempt() float64 {
89+
return float64(atomic.LoadUint64(&b.attempt))
90+
}
91+
92+
// Copy returns a backoff with equals constraints as the original
93+
func (b *Backoff) Copy() *Backoff {
94+
return &Backoff{
95+
Factor: b.Factor,
96+
Jitter: b.Jitter,
97+
Min: b.Min,
98+
Max: b.Max,
99+
}
100+
}

backoff/retry.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package backoff
2+
3+
import (
4+
"time"
5+
6+
"github.com/utilitywarehouse/semaphore-wireguard/log"
7+
)
8+
9+
type operation func() error
10+
11+
const (
12+
defaultBackoffJitter = true
13+
defaultBackoffMin = 2 * time.Second
14+
defaultBackoffMax = 1 * time.Minute
15+
)
16+
17+
// Retry will use the default backoff values to retry the passed operation
18+
func Retry(op operation, description string) {
19+
b := &Backoff{
20+
Jitter: defaultBackoffJitter,
21+
Min: defaultBackoffMin,
22+
Max: defaultBackoffMax,
23+
}
24+
RetryWithBackoff(op, b, description)
25+
}
26+
27+
// RetryWithBackoff will retry the passed function (operation) using the given
28+
// backoff
29+
func RetryWithBackoff(op operation, b *Backoff, description string) {
30+
b.Reset()
31+
for {
32+
err := op()
33+
if err == nil {
34+
return
35+
}
36+
d := b.Duration()
37+
log.Logger.Error("Retry failed",
38+
"description", description,
39+
"error", err,
40+
"backoff", d,
41+
)
42+
time.Sleep(d)
43+
}
44+
}

backoff/retry_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package backoff
2+
3+
import (
4+
"errors"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/utilitywarehouse/semaphore-wireguard/log"
10+
)
11+
12+
var testFuncCallCounter int
13+
var successThreshold int
14+
15+
func testFunc() error {
16+
testFuncCallCounter++
17+
18+
if testFuncCallCounter >= successThreshold {
19+
return nil
20+
}
21+
return errors.New("error")
22+
23+
}
24+
25+
func TestRetryWithBackoff(t *testing.T) {
26+
log.InitLogger("retry-test", "info")
27+
b := &Backoff{
28+
Jitter: false,
29+
Min: 10 * time.Millisecond,
30+
Max: 1 * time.Second,
31+
}
32+
successThreshold = 3
33+
34+
// Retrying testFunc should fail 2 times before hitting the success
35+
// threshold
36+
RetryWithBackoff(testFunc, b, "test func")
37+
assert.Equal(t, testFuncCallCounter, 3) // should be 3 after 2 consecutive fails
38+
assert.Equal(t, b.Duration(), 40*time.Millisecond) // should be 40 millisec after failing for 10 and 20 and without a jitter
39+
}

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@ const (
1111
defaultWGListenPort = 51820
1212
)
1313

14+
// Duration is a helper to unmarshal time.Duration from json
1415
// https://stackoverflow.com/questions/48050945/how-to-unmarshal-json-into-durations/54571600#54571600
1516
type Duration struct {
1617
time.Duration
1718
}
1819

20+
// MarshalJSON calls json Marshall on Duration
1921
func (d Duration) MarshalJSON() ([]byte, error) {
2022
return json.Marshal(d.String())
2123
}
2224

25+
// UnmarshalJSON provides handling of time.Duration when unmarshalling
2326
func (d *Duration) UnmarshalJSON(b []byte) error {
2427
var v interface{}
2528
if err := json.Unmarshal(b, &v); err != nil {
@@ -58,6 +61,7 @@ type remoteClusterConfig struct {
5861
ResyncPeriod Duration `json:"resyncPeriod"`
5962
}
6063

64+
// Config holds the application configuration
6165
type Config struct {
6266
Local localClusterConfig `json:"local"`
6367
Remotes []*remoteClusterConfig `json:"remotes"`

main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/prometheus/client_golang/prometheus/promhttp"
1313
"golang.zx2c4.com/wireguard/wgctrl"
1414

15+
"github.com/utilitywarehouse/semaphore-wireguard/backoff"
1516
"github.com/utilitywarehouse/semaphore-wireguard/kube"
1617
"github.com/utilitywarehouse/semaphore-wireguard/log"
1718
"k8s.io/client-go/kubernetes"
@@ -89,9 +90,7 @@ func main() {
8990
wgDeviceNames = append(wgDeviceNames, wgDeviceName)
9091
runners = append(runners, r)
9192
go func() {
92-
if err := r.Run(); err != nil {
93-
log.Logger.Error("Failed to start runner", "err", err)
94-
}
93+
backoff.Retry(r.Run, "start runner")
9594
}()
9695
}
9796

wireguard/wireguard_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,28 @@ var (
1111

1212
func TestNewPeerConfig(t *testing.T) {
1313
var err error
14-
_, err = newPeerConfig("", "", "", nil)
14+
_, err = NewPeerConfig("", "", "", nil)
1515
if err == nil {
16-
t.Errorf("newPeerConfig: empty publicKey should generate an error")
16+
t.Errorf("NewPeerConfig: empty publicKey should generate an error")
1717
}
18-
_, err = newPeerConfig("foobar", "", "", nil)
18+
_, err = NewPeerConfig("foobar", "", "", nil)
1919
if err == nil {
20-
t.Errorf("newPeerConfig: invalid publicKey should generate an error")
20+
t.Errorf("NewPeerConfig: invalid publicKey should generate an error")
2121
}
22-
_, err = newPeerConfig(validPublicKey, "", "", []string{""})
22+
_, err = NewPeerConfig(validPublicKey, "", "", []string{""})
2323
if err == nil {
24-
t.Errorf("newPeerConfig: invalid allowedIPs should generate an error")
24+
t.Errorf("NewPeerConfig: invalid allowedIPs should generate an error")
2525
}
26-
_, err = newPeerConfig(validPublicKey, "foo", "", validAllowedIPs)
26+
_, err = NewPeerConfig(validPublicKey, "foo", "", validAllowedIPs)
2727
if err == nil {
28-
t.Errorf("newPeerConfig: invalid presharedKey should generate an error")
28+
t.Errorf("NewPeerConfig: invalid presharedKey should generate an error")
2929
}
30-
_, err = newPeerConfig(validPublicKey, validPublicKey, "foo", validAllowedIPs)
30+
_, err = NewPeerConfig(validPublicKey, validPublicKey, "foo", validAllowedIPs)
3131
if err == nil {
32-
t.Errorf("newPeerConfig: invalid endpoint should generate an error")
32+
t.Errorf("NewPeerConfig: invalid endpoint should generate an error")
3333
}
34-
_, err = newPeerConfig(validPublicKey, validPublicKey, "1.1.1.1:1111", validAllowedIPs)
34+
_, err = NewPeerConfig(validPublicKey, validPublicKey, "1.1.1.1:1111", validAllowedIPs)
3535
if err != nil {
36-
t.Errorf("newPeerConfig: unexpected error: %v", err)
36+
t.Errorf("NewPeerConfig: unexpected error: %v", err)
3737
}
3838
}

0 commit comments

Comments
 (0)