Skip to content

Commit

Permalink
fixes #1 Dial should be synchronous by default
Browse files Browse the repository at this point in the history
This makes Dial synchronous, and adds a new option, OptionDialAsynch,
which can be set on a socket or a Dialer to restore the mangos v1
behavior.

We have also fixed bugs in the REQ/REP code, and made it self-heal
quickly from a broken socket.  (Basically, if our response pipe
disconnects, we can immediately retry the request somewhere else.)
  • Loading branch information
gdamore committed Oct 28, 2018
1 parent 8eb9b98 commit d02df92
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 26 deletions.
15 changes: 15 additions & 0 deletions CHANGES-v2.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,18 @@ as Options and a `pipe.GetOpt()` API is used to access them.
The xstar protocol implicitly retransmits / forwards received messages
just like the cooked protocol. The fact that v1 did not do this was
a bug.

== Dial Now Synchronous

When using vanilla Dialer.Dial(), the caller will normally be blocked
until either a connection is established, or an error occurs on this
first attempt. If that first attempt fails, the error is returned and
there will be no further retries. However, the self-healing (automatic
redial) mode is still used for subsequent connection attempts.

This mode is intended to help users detect and fix the most common
connection setup errors.

An option, OptionDialAsynch, can be set on sockets or dialers to restore
the old behavior, where a dialer will just run in the background
from the beginning.
103 changes: 100 additions & 3 deletions impl/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"
"time"

"nanomsg.org/go/mangos/v2/errors"

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/transport"
)
Expand All @@ -31,6 +33,7 @@ type dialer struct {
closed bool
active bool
dialing bool
asynch bool
redialer *time.Timer
reconnTime time.Duration
reconnMinTime time.Duration
Expand All @@ -40,18 +43,24 @@ type dialer struct {

func (d *dialer) Dial() error {
d.Lock()
defer d.Unlock()
if d.active {
d.Unlock()
return mangos.ErrAddrInUse
}
if d.closed {
d.Unlock()
return mangos.ErrClosed
}
d.closeq = make(chan struct{})
d.active = true
d.reconnTime = d.reconnMinTime
go d.redial()
return nil
if d.asynch {
go d.redial()
d.Unlock()
return nil
}
d.Unlock()
return d.dial(false)
}

func (d *dialer) Close() error {
Expand All @@ -76,6 +85,11 @@ func (d *dialer) GetOption(n string) (interface{}, error) {
v := d.reconnMaxTime
d.Unlock()
return v, nil
case mangos.OptionDialAsynch:
d.Lock()
v := d.asynch
d.Unlock()
return v, nil
}
if val, err := d.d.GetOption(n); err != mangos.ErrBadOption {
return val, err
Expand All @@ -102,6 +116,13 @@ func (d *dialer) SetOption(n string, v interface{}) error {
return nil
}
return mangos.ErrBadValue
case mangos.OptionDialAsynch:
if v, ok := v.(bool); ok {
d.Lock()
d.asynch = v
d.Unlock()
return nil
}
}
// Transport specific options passed down.
return d.d.SetOption(n, v)
Expand Down Expand Up @@ -132,7 +153,83 @@ func (d *dialer) pipeClosed() {
d.Unlock()
}

// dial makes a single connection attempt through the underlying
// transport dialer. If redial is true, or the dialer's asynch flag is
// set, a failed attempt arms a timer that will call d.redial later;
// otherwise the failure is just returned to the caller.
//
// It returns nil once the new pipe has been passed to the socket,
// errors.ErrAddrInUse if an attempt is already in progress or the
// dialer is closed, and otherwise the error from the transport dial.
func (d *dialer) dial(redial bool) error {
	d.Lock()
	// Asynchronous dialers always retry, whatever the caller asked for.
	redial = redial || d.asynch
	if d.closed || d.dialing {
		// Only one attempt may be outstanding at a time. This should
		// not normally happen, but several concurrent attempts could
		// create several pipes, so refuse defensively.
		d.Unlock()
		return errors.ErrAddrInUse
	}
	if pending := d.redialer; pending != nil {
		// This attempt supersedes any retry that is still scheduled.
		pending.Stop()
	}
	d.dialing = true
	d.Unlock()

	// The transport dial itself runs without the dialer lock held.
	pipe, dialErr := d.d.Dial()
	if dialErr != nil {
		d.Lock()
		defer d.Unlock()

		// The attempt is over, so a later one may be started. We
		// expect to get here only from the initial Dial(), after an
		// established pipe was closed, or from the retry timer; those
		// cases should be mutually exclusive, which makes the dialing
		// flag mostly a safety net.
		d.dialing = false

		// No retry when the caller did not ask for one, or when the
		// transport reports that it has been closed.
		if !redial || dialErr == mangos.ErrClosed {
			return dialErr
		}

		// Exponential backoff with jitter: the interval grows by a
		// random factor between 1.1 and 1.5 (about 1.3x on average),
		// so a failed connection is not penalized too heavily.
		lo, hi := float64(1.1), float64(1.5)
		factor := rand.Float64()*(hi-lo) + lo

		// This retry waits for the current interval; the grown value
		// applies to the following one. The interval only grows when
		// a maximum reconnect time is configured, and is capped at it.
		delay := d.reconnTime
		if limit := d.reconnMaxTime; limit != 0 {
			next := time.Duration(factor * float64(d.reconnTime))
			if next > limit {
				next = limit
			}
			d.reconnTime = next
		}
		d.redialer = time.AfterFunc(delay, d.redial)
		return dialErr
	}

	// Success: hand the pipe to the socket, then clear the in-progress
	// flag under the lock.
	d.s.addPipe(pipe, d, nil)

	d.Lock()
	d.dialing = false
	d.Unlock()
	return nil
}

// redial runs one dial attempt with retries enabled. It takes no
// arguments and returns nothing so that it can be started as a
// goroutine or passed to time.AfterFunc; the result of the attempt is
// deliberately discarded because there is no caller to report it to.
func (d *dialer) redial() {
	_ = d.dial(true)
}

func (d *dialer) xredial() {
d.Lock()
if d.dialing || d.closed {
// If we already have a dial in progress, then stop.
Expand Down
37 changes: 27 additions & 10 deletions impl/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type socket struct {
reconnMinTime time.Duration // reconnect time after error or disconnect
reconnMaxTime time.Duration // max reconnect interval
maxRxSize int // max recv size
dialAsynch bool // asynchronous dialing?

listeners []*listener
dialers []*dialer
Expand Down Expand Up @@ -255,9 +256,27 @@ func (s *socket) NewDialer(addr string, options map[string]interface{}) (mangos.
if err != nil {
return nil, err
}
d := &dialer{
d: td,
s: s,
reconnMinTime: s.reconnMinTime,
reconnMaxTime: s.reconnMaxTime,
addr: addr,
}
for n, v := range options {
if err = td.SetOption(n, v); err != nil {
return nil, err
switch n {
case mangos.OptionReconnectTime:
fallthrough
case mangos.OptionMaxReconnectTime:
fallthrough
case mangos.OptionDialAsynch:
if err := d.SetOption(n, v); err != nil {
return nil, err
}
default:
if err = td.SetOption(n, v); err != nil {
return nil, err
}
}
}
if _, ok := options[mangos.OptionMaxRecvSize]; !ok {
Expand All @@ -267,14 +286,6 @@ func (s *socket) NewDialer(addr string, options map[string]interface{}) (mangos.
}
}

d := &dialer{
d: td,
s: s,
reconnMinTime: s.reconnMinTime,
reconnMaxTime: s.reconnMaxTime,
addr: addr,
}

s.Lock()
if s.closed {
s.Unlock()
Expand Down Expand Up @@ -371,6 +382,12 @@ func (s *socket) SetOption(name string, value interface{}) error {
} else {
return mangos.ErrBadValue
}
case mangos.OptionDialAsynch:
if v, ok := value.(bool); ok {
s.dialAsynch = v
} else {
return mangos.ErrBadValue
}
default:
return mangos.ErrBadOption
}
Expand Down
9 changes: 9 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,13 @@ const (
// OptionHTTPRequest conveys an *http.Request. This read-only option
// only exists for Pipes using websocket connections.
OptionHTTPRequest = "HTTP-REQUEST"

// OptionDialAsynch (used on a Dialer) causes the Dial() operation
// to run in the background. Further, the Dialer will always redial,
// even if the first attempt fails. (Normally dialing is performed
// synchronously, so that if the remote peer is unavailable at first
// the caller can learn of the error and handle or report it.)
// Note that mangos v1 behavior is the same as if this option is
// set to true.
OptionDialAsynch = "DIAL-ASYNCH"
)
8 changes: 7 additions & 1 deletion protocol/req/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *context) unscheduleSend() {
c.wantw = false
for i, c2 := range s.sendq {
if c2 == c {
s.sendq = append(s.sendq[0:i-1],
s.sendq = append(s.sendq[:i],
s.sendq[i+1:]...)
return
}
Expand Down Expand Up @@ -525,6 +525,11 @@ func (s *socket) RemovePipe(ep protocol.Pipe) {
p.closed = true
ep.Close()
delete(s.pipes, ep.GetID())
for i, rp := range s.readyq {
if p == rp {
s.readyq = append(s.readyq[:i], s.readyq[i+1:]...)
}
}
for c := range s.ctxs {
if c.lastPipe == p {
// We are closing this pipe, so we need to
Expand Down Expand Up @@ -565,6 +570,7 @@ func NewProtocol() protocol.Protocol {
cond: sync.NewCond(s),
resendTime: time.Minute,
}
s.ctxs[s.defCtx] = struct{}{}
return s
}

Expand Down
1 change: 0 additions & 1 deletion test/busdevice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestBusDevice(t *testing.T) {
So(s1, ShouldNotBeNil)
defer s1.Close()
s1.AddTransport(inproc.NewTransport())
So(mangos.Device(s1, s1), ShouldBeNil)
So(s1.Listen("inproc://busdevicetest"), ShouldBeNil)
// Create the device
So(mangos.Device(s1, s1), ShouldBeNil)
Expand Down
1 change: 1 addition & 0 deletions test/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (c *T) Dial() bool {
options[mangos.OptionTLSConfig] = cliCfg
}

options[mangos.OptionDialAsynch] = true
err := c.Sock.DialOptions(c.addr, options)
if err != nil {
c.Errorf("Dial (%s) failed: %v", c.addr, err)
Expand Down
16 changes: 8 additions & 8 deletions test/device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,22 +276,22 @@ func testDevChain(t *testing.T, addr1 string, addr2 string, addr3 string) {
t.Errorf("s[0] Listen: %v", err)
return
}
if err = s[1].Dial(addr2); err != nil {
t.Errorf("s[1] Dial: %v", err)
return
}
if err = s[2].Listen(addr2); err != nil {
t.Errorf("s[2] Listen: %v", err)
return
}
if err = s[3].Dial(addr3); err != nil {
t.Errorf("s[3] Dial: %v", err)
return
}
if err = s[4].Listen(addr3); err != nil {
t.Errorf("s[4] Listen: %v", err)
return
}
if err = s[1].Dial(addr2); err != nil {
t.Errorf("s[1] Dial: %v", err)
return
}
if err = s[3].Dial(addr3); err != nil {
t.Errorf("s[3] Dial: %v", err)
return
}
if err = mangos.Device(s[0], s[1]); err != nil {
t.Errorf("s[0],s[1] Device: %v", err)
return
Expand Down
6 changes: 3 additions & 3 deletions test/reqretry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func TestReqRetry(t *testing.T) {

err = sockreq.SetOption(mangos.OptionReconnectTime, time.Millisecond*100)
So(err, ShouldBeNil)
err = sockreq.SetOption(mangos.OptionDialAsynch, true)
So(err, ShouldBeNil)

l, err := sockrep.NewListener(addr, nil)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -87,9 +89,7 @@ func TestReqRetry(t *testing.T) {
m.Free()
})

// Following is skipped for now because of the backout
// of e5e6478f44cda1eb8427b590755270e2704a990d
SkipConvey("A request is reissued on server re-connect", func() {
Convey("A request is reissued on server re-connect", func() {

rep2, err := rep.NewSocket()
So(err, ShouldBeNil)
Expand Down

0 comments on commit d02df92

Please sign in to comment.