Skip to content

Commit

Permalink
fixes #38 Applications should not have to call socket.AddTransport()
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Oct 31, 2018
1 parent 7e4c6eb commit edd3a10
Show file tree
Hide file tree
Showing 45 changed files with 198 additions and 246 deletions.
8 changes: 8 additions & 0 deletions CHANGES-v2.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,11 @@ in a queue for a long time.
The ability to lookup protocol names by their protocol number is removed.
Each protocol instead has their identities (including name and string)
as constants (`Self`, `Peer`, `SelfName`, and `PeerName`) in the package.

== Simplified Transport registration

To register a transport, just import the transport package. (You can
use an anonymous import (i.e. an underscore import) to bring transport
packages in. The `AddTransport()` method on sockets, and `NewTransport()`
method for transport packages have been removed. (Transport implementations
can register themselves with transport.RegisterTransport().)
7 changes: 3 additions & 4 deletions examples/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/bus"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

func die(format string, v ...interface{}) {
Expand All @@ -55,8 +56,6 @@ func node(args []string) {
if sock, err = bus.NewSocket(); err != nil {
die("bus.NewSocket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(args[2]); err != nil {
die("sock.Listen: %s", err.Error())
}
Expand Down
9 changes: 3 additions & 6 deletions examples/pair/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pair"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

func die(format string, v ...interface{}) {
Expand Down Expand Up @@ -72,8 +73,6 @@ func node0(url string) {
if sock, err = pair.NewSocket(); err != nil {
die("can't get new pair socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on pair socket: %s", err.Error())
}
Expand All @@ -87,8 +86,6 @@ func node1(url string) {
if sock, err = pair.NewSocket(); err != nil {
die("can't get new pair socket: %s", err.Error())
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on pair socket: %s", err.Error())
}
Expand Down
9 changes: 3 additions & 6 deletions examples/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pull"
"nanomsg.org/go/mangos/v2/protocol/push"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

func die(format string, v ...interface{}) {
Expand All @@ -49,8 +50,6 @@ func node0(url string) {
if sock, err = pull.NewSocket(); err != nil {
die("can't get new pull socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on pull socket: %s", err.Error())
}
Expand All @@ -68,8 +67,6 @@ func node1(url string, msg string) {
if sock, err = push.NewSocket(); err != nil {
die("can't get new push socket: %s", err.Error())
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on push socket: %s", err.Error())
}
Expand Down
9 changes: 3 additions & 6 deletions examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pub"
"nanomsg.org/go/mangos/v2/protocol/sub"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

func die(format string, v ...interface{}) {
Expand All @@ -55,8 +56,6 @@ func server(url string) {
if sock, err = pub.NewSocket(); err != nil {
die("can't get new pub socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on pub socket: %s", err.Error())
}
Expand All @@ -79,8 +78,6 @@ func client(url string, name string) {
if sock, err = sub.NewSocket(); err != nil {
die("can't get new sub socket: %s", err.Error())
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on sub socket: %s", err.Error())
}
Expand Down
7 changes: 3 additions & 4 deletions examples/raw/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/req"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

// synchronize our output messaging so we don't overlap
Expand All @@ -39,8 +40,6 @@ func clientWorker(url string, id int) {

// Leave this in Cooked mode!

sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on req socket: %s", err.Error())
}
Expand Down
7 changes: 3 additions & 4 deletions examples/raw/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/xrep"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

// Our protocol is simple. Request packet is empty. The reply
Expand Down Expand Up @@ -64,8 +65,6 @@ func server(url string, nworkers int) {
if sock, err = xrep.NewSocket(); err != nil {
die("can't get new rep socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on rep socket: %s", err.Error())
}
Expand Down
9 changes: 3 additions & 6 deletions examples/reqrep/reqrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/rep"
"nanomsg.org/go/mangos/v2/protocol/req"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

func die(format string, v ...interface{}) {
Expand All @@ -53,8 +54,6 @@ func node0(url string) {
if sock, err = rep.NewSocket(); err != nil {
die("can't get new rep socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on rep socket: %s", err.Error())
}
Expand All @@ -81,8 +80,6 @@ func node1(url string) {
if sock, err = req.NewSocket(); err != nil {
die("can't get new req socket: %s", err.Error())
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on req socket: %s", err.Error())
}
Expand Down
9 changes: 3 additions & 6 deletions examples/survey/survey.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/respondent"
"nanomsg.org/go/mangos/v2/protocol/surveyor"
"nanomsg.org/go/mangos/v2/transport/ipc"
"nanomsg.org/go/mangos/v2/transport/tcp"

// register transports
_ "nanomsg.org/go/mangos/v2/transport/all"
)

func die(format string, v ...interface{}) {
Expand All @@ -56,8 +57,6 @@ func server(url string) {
if sock, err = surveyor.NewSocket(); err != nil {
die("can't get new surveyor socket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
die("can't listen on surveyor socket: %s", err.Error())
}
Expand Down Expand Up @@ -90,8 +89,6 @@ func client(url string, name string) {
if sock, err = respondent.NewSocket(); err != nil {
die("can't get new respondent socket: %s", err.Error())
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Dial(url); err != nil {
die("can't dial on respondent socket: %s", err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions examples/websocket/reqclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"time"

"nanomsg.org/go/mangos/v2/protocol/req"
"nanomsg.org/go/mangos/v2/transport/ws"

// register ws transport
_ "nanomsg.org/go/mangos/v2/transport/ws"
)

// reqClient implements the client for REQ.
Expand All @@ -28,7 +30,6 @@ func reqClient(port int) {
if e != nil {
die("cannot make req socket: %v", e)
}
sock.AddTransport(ws.NewTransport())
url := fmt.Sprintf("ws://127.0.0.1:%d/req", port)
if e = sock.Dial(url); e != nil {
die("cannot dial req url: %v", e)
Expand Down
4 changes: 2 additions & 2 deletions examples/websocket/reqhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/rep"

// register ws transport
"nanomsg.org/go/mangos/v2/transport/ws"
)

Expand All @@ -46,8 +48,6 @@ func reqHandler(sock mangos.Socket) {
func addReqHandler(mux *http.ServeMux, port int) {
sock, _ := rep.NewSocket()

sock.AddTransport(ws.NewTransport())

url := fmt.Sprintf("ws://127.0.0.1:%d/req", port)

if l, e := sock.NewListener(url, nil); e != nil {
Expand Down
5 changes: 3 additions & 2 deletions examples/websocket/subclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/sub"
"nanomsg.org/go/mangos/v2/transport/ws"

// register ws transport
_ "nanomsg.org/go/mangos/v2/transport/ws"
)

// subClient implements the client for SUB.
Expand All @@ -28,7 +30,6 @@ func subClient(port int) {
if err != nil {
die("cannot make req socket: %v", err)
}
sock.AddTransport(ws.NewTransport())
if err = sock.SetOption(mangos.OptionSubscribe, []byte{}); err != nil {
die("cannot set subscription: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/websocket/subhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pub"

// register ws transport
"nanomsg.org/go/mangos/v2/transport/ws"
)

Expand All @@ -42,8 +44,6 @@ func subHandler(sock mangos.Socket) {
func addSubHandler(mux *http.ServeMux, port int) {
sock, _ := pub.NewSocket()

sock.AddTransport(ws.NewTransport())

url := fmt.Sprintf("ws://127.0.0.1:%d/sub", port)

if l, e := sock.NewListener(url, nil); e != nil {
Expand Down
22 changes: 2 additions & 20 deletions internal/core/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ type socket struct {
listeners []*listener
dialers []*dialer
pipes map[*pipe]struct{}

translk sync.RWMutex
trans map[string]transport.Transport

pipehook mangos.PipeEventHook
pipehook mangos.PipeEventHook
}

type context struct {
Expand Down Expand Up @@ -113,7 +109,6 @@ func newSocket(proto mangos.ProtocolBase) *socket {
reconnMinTime: defaultReconnMinTime,
reconnMaxTime: defaultReconnMaxTime,
maxRxSize: defaultMaxRxSize,
trans: make(map[string]transport.Transport),
pipes: make(map[*pipe]struct{}),
}
return s
Expand Down Expand Up @@ -220,20 +215,7 @@ func (s *socket) getTransport(addr string) transport.Transport {
}
scheme := addr[:i]

s.translk.RLock()
t, ok := s.trans[scheme]
s.translk.RUnlock()

if t != nil && ok {
return t
}
return nil
}

func (s *socket) AddTransport(t transport.Transport) {
s.translk.Lock()
s.trans[t.Scheme()] = t
s.translk.Unlock()
return transport.GetTransport(scheme)
}

func (s *socket) DialOptions(addr string, opts map[string]interface{}) error {
Expand Down
4 changes: 0 additions & 4 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ type Socket interface {
// support separate contexts, this will return an error.
OpenContext() (Context, error)

// AddTransport adds a new Transport to the socket. Transport specific
// options may have been configured on the Transport prior to this.
AddTransport(Transport)

// SetPipeEventHook sets a PipeEventHook function to be called when a
// Pipe is added or removed from this socket (connect/disconnect).
// The previous hook is returned (nil if none.) (Only one hook can
Expand Down
7 changes: 3 additions & 4 deletions test/besteffort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pair"
"nanomsg.org/go/mangos/v2/transport/tcp"
_ "nanomsg.org/go/mangos/v2/transport/tcp"

. "github.com/smartystreets/goconvey/convey"
)

func testBestEffort(addr string, tran mangos.Transport) {
func testBestEffort(addr string) {
timeout := time.Millisecond * 10
msg := []byte{'A', 'B', 'C'}

Expand All @@ -35,7 +35,6 @@ func testBestEffort(addr string, tran mangos.Transport) {
So(rp, ShouldNotBeNil)

defer rp.Close()
rp.AddTransport(tran)

err = rp.SetOption(mangos.OptionWriteQLen, 0)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -70,6 +69,6 @@ func testBestEffort(addr string, tran mangos.Transport) {

func TestBestEffortTCP(t *testing.T) {
Convey("Testing TCP Best Effort", t, func() {
testBestEffort(AddrTestTCP(), tcp.NewTransport())
testBestEffort(AddrTestTCP())
})
}
Loading

0 comments on commit edd3a10

Please sign in to comment.