@@ -51,6 +51,7 @@ type Listener struct {
51
51
52
52
listenerTimeout time.Duration
53
53
cancel context.CancelFunc
54
+ closed chan struct {}
54
55
}
55
56
56
57
func NewListener (
@@ -99,6 +100,7 @@ func (cl *Listener) Start(context.Context) error {
99
100
100
101
ctx , cancel := context .WithCancel (context .Background ())
101
102
cl .cancel = cancel
103
+ cl .closed = make (chan struct {})
102
104
103
105
sub , err := cl .fetcher .SubscribeNewBlockEvent (ctx )
104
106
if err != nil {
@@ -116,13 +118,25 @@ func (cl *Listener) Stop(ctx context.Context) error {
116
118
}
117
119
118
120
cl .cancel ()
119
- cl .cancel = nil
120
- return cl .metrics .Close ()
121
+ select {
122
+ case <- cl .closed :
123
+ cl .cancel = nil
124
+ cl .closed = nil
125
+ case <- ctx .Done ():
126
+ return ctx .Err ()
127
+ }
128
+
129
+ err = cl .metrics .Close ()
130
+ if err != nil {
131
+ log .Warnw ("listener: closing metrics" , "err" , err )
132
+ }
133
+ return nil
121
134
}
122
135
123
136
// runSubscriber runs a subscriber to receive event data of new signed blocks. It will attempt to
124
137
// resubscribe in case error happens during listening of subscription
125
138
func (cl * Listener ) runSubscriber (ctx context.Context , sub <- chan types.EventDataSignedBlock ) {
139
+ defer close (cl .closed )
126
140
for {
127
141
err := cl .listen (ctx , sub )
128
142
if ctx .Err () != nil {
@@ -131,7 +145,7 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat
131
145
}
132
146
if errors .Is (err , errInvalidSubscription ) {
133
147
// stop node if there is a critical issue with the block subscription
134
- log .Fatalf ("listener: %v" , err )
148
+ log .Fatalf ("listener: %v" , err ) //nolint:gocritic
135
149
}
136
150
137
151
log .Warnw ("listener: subscriber error, resubscribing..." , "err" , err )
0 commit comments