Skip to content

Commit

Permalink
fixes #175 Use reference counting on messages to reduce copying
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 21, 2019
1 parent 4d04312 commit fc73dac
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 51 deletions.
54 changes: 44 additions & 10 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mangos

import (
"sync"
"sync/atomic"
)

// Message encapsulates the messages that we exchange back and forth. The
Expand All @@ -39,9 +40,10 @@ type Message struct {
// informational purposes.
Pipe Pipe

bbuf []byte
hbuf []byte
bsize int
bbuf []byte
hbuf []byte
bsize int
refcnt int32
}

type msgCacheInfo struct {
Expand Down Expand Up @@ -108,18 +110,49 @@ var messageCache = []msgCacheInfo{
// rather substantial benefits for performance.
func (m *Message) Free() {
if m != nil {
for i := range messageCache {
if m.bsize == messageCache[i].maxbody {
messageCache[i].pool.Put(m)
return
if atomic.AddInt32(&m.refcnt, -1) == 0 {
for i := range messageCache {
if m.bsize == messageCache[i].maxbody {
messageCache[i].pool.Put(m)
return
}
}
}
}
}

// Dup creates a "duplicate" message.
// Reference counting was found to be error prone, so we have elected
// to simply make a full copy of the message for now.
// Clone bumps the reference count on the message, allowing it to be
// shared. Callers of this MUST ensure that the message is never modified.
// If a read-only copy needs to be made "unique", callers can do so by
// using the Uniq function.
func (m *Message) Clone() {
atomic.AddInt32(&m.refcnt, 1)
}

// MakeUnique ensures that the message is not shared. If the reference
// count on the message is one, then the message is returned as is.
// Otherwise a new copy of hte message is made, and the reference count
// on the original is dropped. Note that it is an error for the caller
// to use the original message after this function; the caller should
// always do `m = m.MakeUnique()`. This function should be called whenever
// the message is leaving the control of the caller, such as when passing
// it to a user program.
//
// Note that transports always should call this on their transmit path
// if they are going to modify the message. (Most do not.)
func (m *Message) MakeUnique() *Message {
if atomic.LoadInt32(&m.refcnt) == 1 {
return m
}
d := m.Dup()
m.Free()
return d
}

//

// Dup creates a "duplicate" message. The message is made as a
// deep copy, so the resulting message is safe to modify.
func (m *Message) Dup() *Message {
dup := NewMessage(len(m.Body))
dup.Body = append(dup.Body, m.Body...)
Expand All @@ -144,5 +177,6 @@ func NewMessage(sz int) *Message {

m.Body = m.bbuf
m.Header = m.hbuf
atomic.StoreInt32(&m.refcnt, 1)
return m
}
7 changes: 4 additions & 3 deletions protocol/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
s.Unlock()
continue
case m := <-recvQ:
m = m.MakeUnique()
return m, nil
}
}
Expand Down Expand Up @@ -130,9 +131,9 @@ func (p *pipe) receiver() {
// As we are passing this to the user,
// we need to ensure that the message
// may be modified.
dm := m.Dup()
m.Clone()
select {
case c.recvQ <- dm:
case c.recvQ <- m:
default:
select {
case m2 := <-c.recvQ:
Expand All @@ -147,7 +148,7 @@ func (p *pipe) receiver() {
// NB: If we ever do work to break
// up the locking, we will need to
// revisit this.
c.recvQ <- dm
c.recvQ <- m
}
}
}
Expand Down
26 changes: 21 additions & 5 deletions protocol/sub/sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ func TestSubMultiContexts(t *testing.T) {

MustSucceed(t, c1.SetOption(mangos.OptionSubscribe, "1"))
MustSucceed(t, c2.SetOption(mangos.OptionSubscribe, "2"))
MustSucceed(t, c1.SetOption(mangos.OptionSubscribe, "*"))
MustSucceed(t, c2.SetOption(mangos.OptionSubscribe, "*"))

p, e := pub.NewSocket()
MustSucceed(t, e)
Expand All @@ -289,15 +291,23 @@ func TestSubMultiContexts(t *testing.T) {

sent := []int{0, 0}
recv := []int{0, 0}
wildrecv := []int{0, 0}
wildsent := 0
mesg := []string{"1", "2"}
var wg sync.WaitGroup
wg.Add(2)
fn := func(c mangos.Context, index int) {
for {
m, e := c.RecvMsg()
if e == nil {
MustBeTrue(t, string(m.Body) == mesg[index])
recv[index]++
switch string(m.Body) {
case mesg[index]:
recv[index]++
case "*":
wildrecv[index]++
default:
MustBeTrue(t, false)
}
continue
}
MustBeError(t, e, mangos.ErrClosed)
Expand All @@ -316,8 +326,13 @@ func TestSubMultiContexts(t *testing.T) {
// fixed seed above, it works out to 41 & 60.
for i := 0; i < 101; i++ {
index := int(rng.Int63() & 1)
MustSucceed(t, p.Send([]byte(mesg[index])))
sent[index]++
if rng.Int63()&128 < 8 {
MustSucceed(t, p.Send([]byte{'*'}))
wildsent++
} else {
MustSucceed(t, p.Send([]byte(mesg[index])))
sent[index]++
}
}

// Give time for everything to be delivered.
Expand All @@ -326,9 +341,10 @@ func TestSubMultiContexts(t *testing.T) {
MustSucceed(t, c2.Close())
wg.Wait()

MustBeTrue(t, sent[0] != sent[1])
MustBeTrue(t, sent[0] == recv[0])
MustBeTrue(t, sent[1] == recv[1])
MustBeTrue(t, wildsent == wildrecv[0])
MustBeTrue(t, wildsent == wildrecv[1])

MustSucceed(t, s.Close())
MustSucceed(t, p.Close())
Expand Down
23 changes: 5 additions & 18 deletions protocol/surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (c *context) SendMsg(m *protocol.Message) error {
sock: s,
}

m.MakeUnique()
m.Header = make([]byte, 4)
binary.BigEndian.PutUint32(m.Header, newsurv.id)

Expand All @@ -144,30 +145,16 @@ func (c *context) SendMsg(m *protocol.Message) error {
}
s.Unlock()

var last *pipe
reused := false
if len(pipes) > 0 {
last = pipes[len(pipes)-1]
}

// Best-effort broadcast on all pipes
for _, p := range pipes {
var dm *protocol.Message
if p == last {
dm = m
reused = true
} else {
dm = m.Dup()
}
m.Clone()
select {
case p.sendQ <- dm:
case p.sendQ <- m:
default:
dm.Free()
m.Free()
}
}
if !reused {
m.Free()
}
m.Free()
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions protocol/xbus/xbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (s *socket) SendMsg(m *protocol.Message) error {
var id uint32

if len(m.Header) == 4 {
m = m.MakeUnique()
// This is coming back to us - its a forwarded message
// from an earlier pipe. Note that we could also have
// used the m.Pipe but this is how mangos v1 and nanomsg
Expand All @@ -76,20 +77,19 @@ func (s *socket) SendMsg(m *protocol.Message) error {
m.Header = m.Header[:0]
}

// This could benefit from optimization to avoid useless duplicates.
for _, p := range s.pipes {

// Don't deliver the message back up to the same pipe it
// arrived from.
if p.p.ID() == id {
continue
}
pm := m.Dup()
m.Clone()
select {
case p.sendQ <- pm:
case p.sendQ <- m:
default:
// back-pressure, but we do not exert
pm.Free()
m.Free()
}
}
s.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions protocol/xpub/xpub.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (s *socket) SendMsg(m *protocol.Message) error {
}
// This could benefit from optimization to avoid useless duplicates.
for _, p := range s.pipes {
pm := m.Dup()
m.Clone()
select {
case p.sendq <- pm:
case p.sendq <- m:
default:
// back-pressure, but we do not exert
pm.Free()
m.Free()
}
}
s.Unlock()
Expand Down
9 changes: 4 additions & 5 deletions protocol/xstar/xstar.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ func (s *socket) SendMsg(m *protocol.Message) error {
return nil
}

// This could benefit from optimization to avoid useless duplicates.
for _, p := range s.pipes {

pm := m.Dup()
m.Clone()
select {
case p.sendq <- pm:
case p.sendq <- m:
default:
// backpressure, but we do not exert
pm.Free()
// back-pressure, but we do not exert
m.Free()
}
}
s.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions protocol/xsurveyor/xsurveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ func (s *socket) SendMsg(m *protocol.Message) error {

// This could benefit from optimization to avoid useless duplicates.
for _, p := range s.pipes {
pm := m.Dup()
m.Clone()
select {
case p.sendQ <- pm:
case p.sendQ <- m:
default:
pm.Free()
m.Free()
}
}
s.Unlock()
Expand Down

0 comments on commit fc73dac

Please sign in to comment.