Skip to content

Commit

Permalink
fixes #51 Changing queue depths should just discard queue contents
Browse files Browse the repository at this point in the history
fixes #50 Go Report Card has numerous minor issues
  • Loading branch information
gdamore committed Oct 30, 2018
1 parent 106d823 commit 7e4c6eb
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 163 deletions.
5 changes: 1 addition & 4 deletions internal/core/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,7 @@ func (s *socket) ListenOptions(addr string, options map[string]interface{}) erro
if err != nil {
return err
}
if err = l.Listen(); err != nil {
return err
}
return nil
return l.Listen()
}

func (s *socket) Listen(addr string) error {
Expand Down
2 changes: 0 additions & 2 deletions perf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func doLocalReqRepLatency(args []string) {
os.Exit(0)
}


func doRemoteLatency(args []string) {
if len(args) < 3 {
log.Fatalf("Usage: remote_lat <connect-to> <msg-size> <roundtrips>")
Expand All @@ -82,7 +81,6 @@ func doRemoteLatency(args []string) {
os.Exit(0)
}


func doLocalLatency(args []string) {
if len(args) < 3 {
log.Fatalf("Usage: local_lat <connect-to> <msg-size> <roundtrips>")
Expand Down
9 changes: 4 additions & 5 deletions perf/reqlatency.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
"time"

"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/req"
"nanomsg.org/go/mangos/v2/protocol/rep"
"nanomsg.org/go/mangos/v2/protocol/req"
"nanomsg.org/go/mangos/v2/transport/all"
)

// LatencyServer is the server side -- very much equivalent to local_lat in
// nanomsg/perf. It does no measurement at all, just sends packets on the wire.
// ReqRepLatencyServer is the server side for REQ/REP latency testing.
func ReqRepLatencyServer(addr string, msgSize int, roundTrips int) {
s, err := rep.NewSocket()
if err != nil {
Expand Down Expand Up @@ -64,8 +63,8 @@ func ReqRepLatencyServer(addr string, msgSize int, roundTrips int) {
}
}

// LatencyClient is the client side of the latency test. It measures round
// trip times, and is the equivalent to nanomsg/perf/remote_lat.
// ReqRepLatencyClient is the client side of the latency test. It measures
// round trip times using REQ/REP protocol.
func ReqRepLatencyClient(addr string, msgSize int, roundTrips int) {
s, err := req.NewSocket()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package mangos
// channel. There is one of these associated with a given TCP connection,
// for example. This interface is intended for application use.
//
// Note that applicatons cannot send or receive data on a Pipe directly.
// Note that applications cannot send or receive data on a Pipe directly.
type Pipe interface {

// ID returns the numeric ID for this Pipe. This will be a
Expand Down
77 changes: 43 additions & 34 deletions protocol/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,47 @@ func (c *context) matches(m *protocol.Message) bool {

}

func (c *context) subscribe(topic []byte) error {
for _, sub := range c.subs {
if bytes.Equal(sub, topic) {
// Already present
return nil
}
}
c.subs = append(c.subs, topic)
return nil
}

func (c *context) unsubscribe(topic []byte) error {
for i, sub := range c.subs {
if !bytes.Equal(sub, topic) {
continue
}
c.subs = append(c.subs[:i], c.subs[i+1:]...)

// Because we have changed the subscription,
// we may have messages in the channel that
// we don't want any more. Lets prune those.
newchan := make(chan *protocol.Message, c.recvQLen)
oldchan := c.recvq
c.recvq = newchan
for m := range oldchan {
if !c.matches(m) {
m.Free()
continue
}
select {
case newchan <- m:
default:
m.Free()
}
}
return nil
}
// Subscription not present
return protocol.ErrBadValue
}

func (c *context) SetOption(name string, value interface{}) error {
s := c.s

Expand Down Expand Up @@ -246,42 +287,10 @@ func (c *context) SetOption(name string, value interface{}) error {

switch name {
case protocol.OptionSubscribe:
for _, sub := range c.subs {
if bytes.Equal(sub, vb) {
// Already present
return nil
}
}
c.subs = append(c.subs, vb)
return nil
return c.subscribe(vb)

case protocol.OptionUnsubscribe:
for i, sub := range c.subs {
if bytes.Equal(sub, vb) {
c.subs = append(c.subs[:i], c.subs[i+1:]...)

// Because we have changed the subscription,
// we may have messages in the channel that
// we don't want any more. Lets prune those.
newchan := make(chan *protocol.Message, c.recvQLen)
oldchan := c.recvq
c.recvq = newchan
for m := range oldchan {
if !c.matches(m) {
m.Free()
continue
}
select {
case newchan <- m:
default:
m.Free()
}
}
return nil
}
}
// Subscription not present
return protocol.ErrBadValue
return c.unsubscribe(vb)

default:
return protocol.ErrBadOption
Expand Down
17 changes: 0 additions & 17 deletions protocol/xrep/xrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,25 +276,8 @@ func (s *socket) SetOption(name string, value interface{}) error {
newchan := make(chan *protocol.Message, v)
s.Lock()
s.recvQLen = v
oldchan := s.recvq
s.recvq = newchan
s.Unlock()

for {
var m *protocol.Message
select {
case m = <-oldchan:
default:
}
if m == nil {
break
}
select {
case newchan <- m:
default:
m.Free()
}
}
}
// We don't support these
// case OptionLinger:
Expand Down
34 changes: 0 additions & 34 deletions protocol/xreq/xreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,25 +213,8 @@ func (s *socket) SetOption(name string, value interface{}) error {
newchan := make(chan *protocol.Message, v)
s.Lock()
s.sendQLen = v
oldchan := s.sendq
s.sendq = newchan
s.Unlock()

for {
var m *protocol.Message
select {
case m = <-oldchan:
default:
}
if m == nil {
break
}
select {
case newchan <- m:
default:
m.Free()
}
}
}
return protocol.ErrBadValue

Expand All @@ -240,25 +223,8 @@ func (s *socket) SetOption(name string, value interface{}) error {
newchan := make(chan *protocol.Message, v)
s.Lock()
s.recvQLen = v
oldchan := s.recvq
s.recvq = newchan
s.Unlock()

for {
var m *protocol.Message
select {
case m = <-oldchan:
default:
}
if m == nil {
break
}
select {
case newchan <- m:
default:
m.Free()
}
}
}
// We don't support these
// case OptionLinger:
Expand Down
17 changes: 0 additions & 17 deletions protocol/xrespondent/xrespondent.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,25 +276,8 @@ func (s *socket) SetOption(name string, value interface{}) error {
newchan := make(chan *protocol.Message, v)
s.Lock()
s.recvQLen = v
oldchan := s.recvq
s.recvq = newchan
s.Unlock()

for {
var m *protocol.Message
select {
case m = <-oldchan:
default:
}
if m == nil {
break
}
select {
case newchan <- m:
default:
m.Free()
}
}
}
// We don't support these
// case OptionLinger:
Expand Down
35 changes: 6 additions & 29 deletions protocol/xstar/xstar.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,25 +157,9 @@ func (s *socket) SetOption(name string, value interface{}) error {
newchan := make(chan *protocol.Message, v)
s.Lock()
s.recvQLen = v
oldchan := s.recvq
s.recvq = newchan
s.Unlock()

for {
var m *protocol.Message
select {
case m = <-oldchan:
default:
}
if m == nil {
break
}
select {
case newchan <- m:
default:
m.Free()
}
}
return nil
}
return protocol.ErrBadValue
Expand Down Expand Up @@ -310,27 +294,20 @@ outer:
m.Free()
continue
}
m.Header = m.Body[:4]
m.Body = m.Body[4:]
if m.Header[0] != 0 || m.Header[1] != 0 || m.Header[2] != 0 {
// non-zero reserved fields are illegal
m.Free()
continue
}
if int(m.Header[3]) >= s.ttl { // TTL expired?
// XXX: bump a stat
if len(m.Body) < 4 ||
m.Body[0] != 0 || m.Body[1] != 0 || m.Body[2] != 0 ||
int(m.Body[3]) >= s.ttl {
m.Free()
continue
}
m.Header = m.Body[:4]
m.Body = m.Body[4:]
m.Header[3]++

userm := m.Dup()
s.Lock()
for _, p2 := range s.pipes {
if p2 == p {
continue
}
if p2.closed {
if p2 == p || p2.closed {
continue
}

Expand Down
8 changes: 4 additions & 4 deletions test/certs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 The Mangos Authors
// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -108,9 +108,9 @@ var rootTmpl = &x509.Certificate{
CommonName: "root.mangos.example.com",
Organization: []string{"Mangos Root Org"},
},
NotBefore: time.Unix(1000, 0),
NotAfter: time.Now().Add(time.Hour),
IsCA: true,
NotBefore: time.Unix(1000, 0),
NotAfter: time.Now().Add(time.Hour),
IsCA: true,
BasicConstraintsValid: true,
OCSPServer: []string{"ocsp.mangos.example.com"},
DNSNames: []string{"root.mangos.example.com"},
Expand Down
2 changes: 2 additions & 0 deletions test/reqretry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestReqRetry(t *testing.T) {
So(l2, ShouldNotBeNil)

err = l.Listen()
So(err, ShouldBeNil)
time.Sleep(time.Millisecond * 50)

m := mangos.NewMessage(0)
Expand All @@ -121,6 +122,7 @@ func TestReqRetry(t *testing.T) {

// Open the new one on the other socket
err = l2.Listen()
So(err, ShouldBeNil)
m, err = rep2.RecvMsg()
So(m, ShouldNotBeNil)
So(err, ShouldBeNil)
Expand Down
3 changes: 3 additions & 0 deletions test/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func (tt *TranTest) TestSendRecv(t *testing.T) {
// Client side
t.Logf("Connecting REQ on %s", tt.addr)
d, err := tt.tran.NewDialer(tt.addr, tt.sockReq)
if err != nil {
t.Errorf("Failed creating Dialer: %v", err)
}
if tt.cliCfg != nil {
if err = d.SetOption(mangos.OptionTLSConfig, tt.cliCfg); err != nil {
t.Errorf("Failed setting TLS config: %v", err)
Expand Down
4 changes: 0 additions & 4 deletions transport/ipc/ipc_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ func (d *dialer) GetOption(n string) (interface{}, error) {
return d.opts.get(n)
}

func (d *dialer) Address() string {
return "ipc://" + d.addr.String()
}

type listener struct {
addr *net.UnixAddr
proto transport.ProtocolInfo
Expand Down
Loading

0 comments on commit 7e4c6eb

Please sign in to comment.