Skip to content

Commit

Permalink
Merge pull request #675 from zltl/patch-2
Browse files Browse the repository at this point in the history
Fix connection leak when write failed but dial success.
  • Loading branch information
smallnest authored Jan 26, 2022
2 parents 49d6e7c + d91707b commit 5465428
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
22 changes: 18 additions & 4 deletions client/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ var (

// ConsecCircuitBreaker is window sliding CircuitBreaker with failure threshold.
type ConsecCircuitBreaker struct {
lastFailureTime time.Time
// time.Time is a compund type, split into second and nano for using atomic.
lastFailureTimeSecond int64
lastFailureTimeNano int32

failures uint64
failureThreshold uint64
window time.Duration
Expand Down Expand Up @@ -64,7 +67,8 @@ func (cb *ConsecCircuitBreaker) Call(fn func() error, d time.Duration) error {
}

func (cb *ConsecCircuitBreaker) ready() bool {
if time.Since(cb.lastFailureTime) > cb.window {
lastFailureTime := cb.loadLastFailureTime()
if time.Since(lastFailureTime) > cb.window {
cb.reset()
return true
}
Expand All @@ -78,7 +82,7 @@ func (cb *ConsecCircuitBreaker) success() {
}
func (cb *ConsecCircuitBreaker) fail() {
atomic.AddUint64(&cb.failures, 1)
cb.lastFailureTime = time.Now()
cb.updateLastFailureTime(time.Now())
}

func (cb *ConsecCircuitBreaker) Success() {
Expand All @@ -94,5 +98,15 @@ func (cb *ConsecCircuitBreaker) Ready() bool {

func (cb *ConsecCircuitBreaker) reset() {
atomic.StoreUint64(&cb.failures, 0)
cb.lastFailureTime = time.Now()
cb.updateLastFailureTime(time.Now())
}

func (cb *ConsecCircuitBreaker) updateLastFailureTime(cur time.Time) {
atomic.StoreInt64(&cb.lastFailureTimeSecond, cur.Unix())
atomic.StoreInt32(&cb.lastFailureTimeNano, int32(cur.UnixNano()))
}
func (cb *ConsecCircuitBreaker) loadLastFailureTime() time.Time {
nano := atomic.LoadInt32(&cb.lastFailureTimeNano)
second := atomic.LoadInt64(&cb.lastFailureTimeSecond)
return time.Unix(second, int64(nano))
}
22 changes: 22 additions & 0 deletions client/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"errors"
"math/rand"
"testing"
"time"
)
Expand Down Expand Up @@ -51,3 +52,24 @@ func TestConsecCircuitBreaker(t *testing.T) {
}

}

func TestCircuitBreakerRace(t *testing.T) {
cb := NewConsecCircuitBreaker(2, 50*time.Millisecond)
routines := 100
loop := 100000

fn := func() error {
if rand.Intn(2) == 1 {
return nil
}
return errors.New("test error")
}

for r := 0; r < routines; r++ {
go func() {
for i := 0; i < loop; i++ {
cb.Call(fn, 100*time.Millisecond)
}
}()
}
}
4 changes: 4 additions & 0 deletions client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func newDirectHTTPConn(c *Client, network, address string) (net.Conn, error) {

_, err = io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
if err != nil {
// Dial() success but Write() failed here, close the successfully
// created conn before return.
conn.Close()

log.Errorf("failed to make CONNECT: %v", err)
return nil, err
}
Expand Down

0 comments on commit 5465428

Please sign in to comment.