diff --git a/router.go b/router.go index bd05a0c..e7101b5 100644 --- a/router.go +++ b/router.go @@ -171,12 +171,12 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order } go func() { // Main go routine handling inbound messages + var handlers []MessageHandler for message := range messages { // DEBUG.Println(ROU, "matchAndDispatch received message") sent := false r.RLock() m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message)) - var handlers []MessageHandler for e := r.routes.Front(); e != nil; e = e.Next() { if e.Value.(*route).match(message.TopicName) { if order { @@ -214,11 +214,14 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order } } r.RUnlock() - for _, handler := range handlers { - handler(client, m) - if !client.options.AutoAckDisabled { - m.Ack() + if order { + for _, handler := range handlers { + handler(client, m) + if !client.options.AutoAckDisabled { + m.Ack() + } } + handlers = handlers[:0] } // DEBUG.Println(ROU, "matchAndDispatch handled message") } diff --git a/unit_router_test.go b/unit_router_test.go index daef761..6b4ecf4 100644 --- a/unit_router_test.go +++ b/unit_router_test.go @@ -19,6 +19,7 @@ package mqtt import ( + "sync" "testing" "time" @@ -354,3 +355,47 @@ func Test_SharedSubscription_MatchAndDispatch(t *testing.T) { } } + +func Benchmark_MatchAndDispatch(b *testing.B) { + calledback := make(chan bool, 1) + + cb := func(c Client, m Message) { + calledback <- true + } + + pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) + pub.TopicName = "a" + pub.Payload = []byte("foo") + + msgs := make(chan *packets.PublishPacket, 1) + + router := newRouter() + router.addRoute("a", cb) + + var wg sync.WaitGroup + wg.Add(1) + + stopped := make(chan bool) + go func() { + wg.Done() // started + <-router.matchAndDispatch(msgs, true, &client{oboundP: make(chan *PacketAndToken, 100)}) + stopped <- true + }() + + wg.Wait() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + msgs <- pub + <-calledback + } + + close(msgs) + + select { + case <-stopped: + break + case <-time.After(time.Second): + b.Errorf("matchAndDispatch should have exited") + } +}