@@ -7,7 +7,6 @@ package mpx
7
7
8
8
import (
9
9
"errors"
10
- "fmt"
11
10
"github.com/RedisMPX/go-mpx/internal"
12
11
"github.com/RedisMPX/go-mpx/internal/list"
13
12
"github.com/gomodule/redigo/redis"
@@ -57,15 +56,17 @@ type request struct {
57
56
type Multiplexer struct {
58
57
// Options
59
58
pingTimeout time.Duration
60
- minBackOff time.Duration
61
- maxBackOff time.Duration
59
+ minBackoff time.Duration
60
+ maxBackoff time.Duration
62
61
pipeliningSlots []request
63
62
64
63
// state
65
64
createConn func () (redis.Conn , error )
66
65
pubsub redis.PubSubConn
67
66
channels map [string ]* list.List
68
67
patterns map [string ]* list.List
68
+ active_channels map [string ]struct {}
69
+ active_patterns map [string ]struct {}
69
70
reqCh chan request
70
71
stop chan struct {}
71
72
exit chan struct {}
@@ -100,20 +101,22 @@ func New(createConn func() (redis.Conn, error)) *Multiplexer {
100
101
func NewWithOpts (
101
102
createConn func () (redis.Conn , error ),
102
103
pingTimeout time.Duration ,
103
- minBackOff time.Duration ,
104
- maxBackOff time.Duration ,
104
+ minBackoff time.Duration ,
105
+ maxBackoff time.Duration ,
105
106
messagesBufSize uint ,
106
107
pipeliningBufSize uint ,
107
108
) * Multiplexer {
108
109
mpx := Multiplexer {
109
110
pingTimeout : pingTimeout ,
110
- minBackoff : minBackOff ,
111
+ minBackoff : minBackoff ,
111
112
maxBackoff : maxBackoff ,
112
113
pipeliningSlots : make ([]request , pipeliningBufSize ),
113
114
pubsub : redis.PubSubConn {Conn : nil },
114
115
createConn : createConn ,
115
116
channels : make (map [string ]* list.List ),
116
117
patterns : make (map [string ]* list.List ),
118
+ active_channels : make (map [string ]struct {}),
119
+ active_patterns : make (map [string ]struct {}),
117
120
reqCh : make (chan request , 100 ),
118
121
stop : make (chan struct {}),
119
122
exit : make (chan struct {}),
@@ -267,7 +270,7 @@ func (mpx *Multiplexer) openNewConnection() {
267
270
break
268
271
}
269
272
if errorCount > 0 {
270
- time .Sleep (internal .RetryBackoff (errorCount , mpx .minBackOff , mpx .maxBackOff ))
273
+ time .Sleep (internal .RetryBackoff (errorCount , mpx .minBackoff , mpx .maxBackoff ))
271
274
}
272
275
errorCount ++
273
276
}
@@ -301,6 +304,10 @@ func (mpx *Multiplexer) connectionGoroutine() {
301
304
return
302
305
}
303
306
}
307
+
308
+ // Reset the active channel / pattern sets.
309
+ mpx .active_channels = make (map [string ]struct {})
310
+ mpx .active_patterns = make (map [string ]struct {})
304
311
}
305
312
306
313
// Start an "emergency" goroutine that keeps processing
@@ -458,27 +465,21 @@ func (mpx *Multiplexer) processRequests(requests []request, networkIO bool) erro
458
465
// Subscribe in Redis
459
466
sub = append (sub , req .name )
460
467
}
461
- } else {
462
- // We were already subscribed to that channel
463
- if networkIO {
464
- // If we are not in offline-mode, we immediately send confirmation
465
- // that the subscription is active. If we are in the case where
466
- // we just lost connectivity and processRequestGoroutine has not yet
467
- // noticed, we will send a wrong notification, but we will also soon
468
- // send a onDisconnect notification, after this goroutine exits, thus
469
- // rectifying our wrong communication.
470
- if onActivation := req .node .Value .(* ChannelSubscription ).onActivation ; onActivation != nil {
471
- onActivation (req .name )
472
- }
468
+ }
469
+
470
+ // Trigger onActivation if the subscription is already active.
471
+ if _ , active := mpx .active_channels [req .name ]; active {
472
+ if onActivation := req .node .Value .(* ChannelSubscription ).onActivation ; onActivation != nil {
473
+ onActivation (req .name )
473
474
}
474
475
}
476
+
475
477
case subscriptionRemove :
476
478
if listeners := req .node .DetachFromList (); listeners != nil {
477
479
if listeners .Len () > 0 {
478
- fmt .Printf ("[ws] unsubbed but more remaining (%v)\n " , listeners .Len ())
479
480
} else {
480
- fmt .Printf ("[ws] unsubbed also from Redis\n " )
481
481
delete (mpx .channels , req .name )
482
+ delete (mpx .active_channels , req .name )
482
483
if networkIO {
483
484
unsub = append (unsub , req .name )
484
485
}
@@ -509,18 +510,12 @@ func (mpx *Multiplexer) processRequests(requests []request, networkIO bool) erro
509
510
// PSubscribe in Redis
510
511
psub = append (psub , req .name )
511
512
}
512
- } else {
513
- // We were already subscribed to that channel
514
- if networkIO {
515
- // If we are not in offline-mode, we immediately send confirmation
516
- // that the subscription is active. If we are in the case where
517
- // we just lost connectivity and processRequestGoroutine has not yet
518
- // noticed, we will send a wrong notification, but we will also soon
519
- // send a onDisconnect notification, after this goroutine exits, thus
520
- // rectifying our wrong communication.
521
- if onActivation := req .node .Value .(* PatternSubscription ).onActivation ; onActivation != nil {
522
- onActivation (req .name )
523
- }
513
+ }
514
+
515
+ // Trigger onActivation if the subscription is already active.
516
+ if _ , active := mpx .active_patterns [req .name ]; active {
517
+ if onActivation := req .node .Value .(* PatternSubscription ).onActivation ; onActivation != nil {
518
+ onActivation (req .name )
524
519
}
525
520
}
526
521
case patternClose :
@@ -531,10 +526,9 @@ func (mpx *Multiplexer) processRequests(requests []request, networkIO bool) erro
531
526
onMessageNode := req .node .Value .(* PatternSubscription ).onMessageNode
532
527
if listeners := onMessageNode .DetachFromList (); listeners != nil {
533
528
if listeners .Len () > 0 {
534
- fmt .Printf ("[ws] unsubbed but more remaining (%v)\n " , listeners .Len ())
535
529
} else {
536
- fmt .Printf ("[ws] unsubbed also from Redis\n " )
537
530
delete (mpx .patterns , req .name )
531
+ delete (mpx .active_patterns , req .name )
538
532
if networkIO {
539
533
punsub = append (punsub , req .name )
540
534
}
@@ -711,7 +705,6 @@ func (mpx *Multiplexer) messageReadingGoroutine(size int) {
711
705
// TODO: ask redigo do accept pings also from the pubsub
712
706
// interface.
713
707
if ! strings .HasSuffix (msg .Error (), "got type string" ) {
714
- fmt .Printf ("error in Pubsub: %v" , msg )
715
708
mpx .triggerReconnect (msg )
716
709
return
717
710
}
0 commit comments