Skip to content

Commit f6aaa02

Browse files
committed
Avoid interrupt inflight requests after a new socket connect failed
1 parent 3f83fa5 commit f6aaa02

File tree

4 files changed

+120
-19
lines changed

4 files changed

+120
-19
lines changed

cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ func (cluster *mongoCluster) removeServer(server *mongoServer) {
122122
other := cluster.servers.Remove(server)
123123
cluster.Unlock()
124124
if other != nil {
125-
other.Close()
125+
other.CloseIdle()
126126
log("Removed server ", server.Addr, " from cluster.")
127127
}
128-
server.Close()
128+
server.CloseIdle()
129129
}
130130

131131
type isMasterResult struct {

cluster_test.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
package mgo_test
2828

2929
import (
30+
"errors"
3031
"fmt"
3132
"io"
3233
"net"
3334
"strings"
3435
"sync"
36+
"sync/atomic"
3537
"time"
3638

3739
. "gopkg.in/check.v1"
@@ -1477,7 +1479,6 @@ func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
14771479
c.Assert(result.A, Equals, 1)
14781480
}
14791481

1480-
14811482
func (s *S) TestRemovalOfClusterMember(c *C) {
14821483
if *fast {
14831484
c.Skip("-fast")
@@ -2088,3 +2089,72 @@ func (s *S) TestDoNotFallbackToMonotonic(c *C) {
20882089
c.Assert(q13b, Equals, q13a)
20892090
}
20902091
}
2092+
2093+
func (s *S) TestConnectServerFailed(c *C) {
2094+
dials := int32(0)
2095+
maxDials := 50
2096+
info := &mgo.DialInfo{
2097+
Addrs: []string{"localhost:40001"},
2098+
DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) {
2099+
n := atomic.AddInt32(&dials, 1)
2100+
if n == int32(maxDials/2) {
2101+
return nil, errors.New("expected dial failed")
2102+
}
2103+
return net.Dial("tcp", addr.String())
2104+
},
2105+
}
2106+
2107+
session, err := mgo.DialWithInfo(info)
2108+
c.Assert(err, IsNil)
2109+
defer session.Close()
2110+
2111+
mgo.ResetStats()
2112+
2113+
errs := make(chan error, 1)
2114+
var done int32
2115+
var finished sync.WaitGroup
2116+
var starting sync.WaitGroup
2117+
defer func() {
2118+
atomic.StoreInt32(&done, 1)
2119+
finished.Wait()
2120+
}()
2121+
for i := 0; i < maxDials; i++ {
2122+
finished.Add(1)
2123+
starting.Add(1)
2124+
go func(s0 *mgo.Session) {
2125+
defer finished.Done()
2126+
for i := 0; ; i++ {
2127+
if atomic.LoadInt32(&done) == 1 {
2128+
break
2129+
}
2130+
err := func(s0 *mgo.Session) error {
2131+
s := s0.Copy()
2132+
defer s.Close()
2133+
coll := s.DB("mydb").C("mycoll")
2134+
2135+
var ret []interface{}
2136+
return coll.Find(nil).All(&ret)
2137+
}(s0)
2138+
if err != nil {
2139+
select {
2140+
case errs <- err:
2141+
default:
2142+
}
2143+
}
2144+
if i == 0 {
2145+
starting.Done()
2146+
}
2147+
}
2148+
}(session)
2149+
time.Sleep(10 * time.Millisecond)
2150+
}
2151+
starting.Wait()
2152+
2153+
// no errors expect.
2154+
var opErr error
2155+
select {
2156+
case opErr = <-errs:
2157+
default:
2158+
}
2159+
c.Assert(opErr, IsNil)
2160+
}

server.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,16 @@ func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error)
187187
// Close forces closing all sockets that are alive, whether
188188
// they're currently in use or not.
189189
func (server *mongoServer) Close() {
190+
server.close(false)
191+
}
192+
193+
// CloseIdle closing all sockets that are idle,
194+
// sockets currently in use will be closed after idle.
195+
func (server *mongoServer) CloseIdle() {
196+
server.close(true)
197+
}
198+
199+
func (server *mongoServer) close(waitForIdle bool) {
190200
server.Lock()
191201
server.closed = true
192202
liveSockets := server.liveSockets
@@ -196,7 +206,11 @@ func (server *mongoServer) Close() {
196206
server.Unlock()
197207
logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
198208
for i, s := range liveSockets {
199-
s.Close()
209+
if waitForIdle {
210+
s.CloseAfterIdle()
211+
} else {
212+
s.Close()
213+
}
200214
liveSockets[i] = nil
201215
}
202216
for i := range unusedSockets {

socket.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,20 @@ type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)
4040

4141
type mongoSocket struct {
4242
sync.Mutex
43-
server *mongoServer // nil when cached
44-
conn net.Conn
45-
timeout time.Duration
46-
addr string // For debugging only.
47-
nextRequestId uint32
48-
replyFuncs map[uint32]replyFunc
49-
references int
50-
creds []Credential
51-
logout []Credential
52-
cachedNonce string
53-
gotNonce sync.Cond
54-
dead error
55-
serverInfo *mongoServerInfo
43+
server *mongoServer // nil when cached
44+
conn net.Conn
45+
timeout time.Duration
46+
addr string // For debugging only.
47+
nextRequestId uint32
48+
replyFuncs map[uint32]replyFunc
49+
references int
50+
creds []Credential
51+
logout []Credential
52+
cachedNonce string
53+
gotNonce sync.Cond
54+
dead error
55+
serverInfo *mongoServerInfo
56+
closeAfterIdle bool
5657
}
5758

5859
type queryOpFlags uint32
@@ -264,10 +265,13 @@ func (socket *mongoSocket) Release() {
264265
if socket.references == 0 {
265266
stats.socketsInUse(-1)
266267
server := socket.server
268+
closeAfterIdle := socket.closeAfterIdle
267269
socket.Unlock()
268270
socket.LogoutAll()
269-
// If the socket is dead server is nil.
270-
if server != nil {
271+
if closeAfterIdle {
272+
socket.Close()
273+
} else if server != nil {
274+
// If the socket is dead server is nil.
271275
server.RecycleSocket(socket)
272276
}
273277
} else {
@@ -316,6 +320,19 @@ func (socket *mongoSocket) Close() {
316320
socket.kill(errors.New("Closed explicitly"), false)
317321
}
318322

323+
func (socket *mongoSocket) CloseAfterIdle() {
324+
socket.Lock()
325+
if socket.references == 0 {
326+
socket.Unlock()
327+
socket.Close()
328+
logf("Socket %p to %s: idle and close.", socket, socket.addr)
329+
return
330+
}
331+
socket.closeAfterIdle = true
332+
socket.Unlock()
333+
logf("Socket %p to %s: close after idle.", socket, socket.addr)
334+
}
335+
319336
func (socket *mongoSocket) kill(err error, abend bool) {
320337
socket.Lock()
321338
if socket.dead != nil {

0 commit comments

Comments
 (0)