@@ -35,6 +35,7 @@ def __init__(
35
35
minimum_sink_cost ,
36
36
max_buffered_packets ,
37
37
max_delay_without_publish ,
38
+ stop_stack = False
38
39
):
39
40
"""
40
41
Thread monitoring the connection with the MQTT broker.
@@ -53,6 +54,7 @@ def __init__(
53
54
rising the sink costs
54
55
max_delay_without_publish: the maximum delay without any successful publish (with
55
56
something in the queue before rising the sink costs
57
+ stop_stack: stop the stack instead of increasing the sink cost in case of black hole
56
58
"""
57
59
Thread .__init__ (self )
58
60
@@ -71,11 +73,20 @@ def __init__(
71
73
self .minimum_sink_cost = minimum_sink_cost
72
74
self .max_buffered_packets = max_buffered_packets
73
75
self .max_delay_without_publish = max_delay_without_publish
76
+ self .stop_stack = stop_stack
74
77
75
78
def _set_sinks_cost (self , cost ):
76
79
for sink in self .sink_manager .get_sinks ():
77
80
sink .cost = cost
78
81
82
+ def _stop_sinks (self ):
83
+ for sink in self .sink_manager .get_sinks ():
84
+ sink .write_config ({"started" : False })
85
+
86
+ def _start_sinks (self ):
87
+ for sink in self .sink_manager .get_sinks ():
88
+ sink .write_config ({"started" : True })
89
+
79
90
def _set_sinks_cost_high (self ):
80
91
self ._set_sinks_cost (self .SINK_COST_HIGH )
81
92
@@ -113,22 +124,34 @@ def run(self):
113
124
if not self .disconnected :
114
125
# Check if a condition to declare "back hole" is met
115
126
if self ._is_publish_delay_over () or self ._is_buffer_threshold_reached ():
116
- logging .info ("Increasing sink cost of all sinks" )
117
- logging .debug (
127
+ if self .stop_stack :
128
+ logging .info ("Black hole detected, stop all stacks" )
129
+ self ._stop_sinks ()
130
+ else :
131
+ logging .info ("Increasing sink cost of all sinks" )
132
+ self ._set_sinks_cost_high ()
133
+
134
+ logging .info (
118
135
"Last publish: %s Queue Size %s" ,
119
136
self .mqtt_wrapper .publish_waiting_time_s ,
120
137
self .mqtt_wrapper .publish_queue_size ,
121
138
)
122
139
123
- self ._set_sinks_cost_high ()
124
140
self .disconnected = True
125
141
else :
126
142
if self .mqtt_wrapper .publish_queue_size == 0 :
127
143
# Network is back, put the connection back
128
144
logging .info (
129
- "Connection is back, decreasing sink cost of all sinks "
145
+ "Connection is back, black hole is finished "
130
146
)
131
- self ._set_sinks_cost_low ()
147
+
148
+ if self .stop_stack :
149
+ logging .info ("Restart all sinks" )
150
+ self ._start_sinks ()
151
+ else :
152
+ logging .info ("Decreasing sink cost" )
153
+ self ._set_sinks_cost_low ()
154
+
132
155
self .disconnected = False
133
156
134
157
# Wait for period
@@ -146,14 +169,16 @@ def initialize_sink(self, name):
146
169
Args:
147
170
name: name of sink to initialize
148
171
"""
149
- sink = self .sink_manager .get_sink (name )
172
+ # It is only required if black hole is managed by sink cost
173
+ if not self .stop_stack :
174
+ sink = self .sink_manager .get_sink (name )
150
175
151
- logging .info ("Initialize sinkCost of sink %s" , name )
152
- if sink is not None :
153
- if self .disconnected :
154
- sink .cost = self .SINK_COST_HIGH
155
- else :
156
- sink .cost = self .minimum_sink_cost
176
+ logging .info ("Initialize sinkCost of sink %s" , name )
177
+ if sink is not None :
178
+ if self .disconnected :
179
+ sink .cost = self .SINK_COST_HIGH
180
+ else :
181
+ sink .cost = self .minimum_sink_cost
157
182
158
183
159
184
class TransportService (BusClient ):
@@ -209,9 +234,10 @@ def __init__(self, settings, **kwargs):
209
234
210
235
if settings .buffering_max_buffered_packets > 0 or settings .buffering_max_delay_without_publish > 0 :
211
236
logging .info (
212
- " Black hole detection enabled: max_packets=%s packets, max_delay=%s" ,
237
+ " Black hole detection enabled: max_packets=%s packets, max_delay=%s, stop_stack=%s " ,
213
238
settings .buffering_max_buffered_packets ,
214
239
settings .buffering_max_delay_without_publish ,
240
+ settings .buffering_stop_stack
215
241
)
216
242
# Create and start a monitoring thread for black hole issue
217
243
self .monitoring_thread = ConnectionToBackendMonitorThread (
@@ -221,6 +247,7 @@ def __init__(self, settings, **kwargs):
221
247
settings .buffering_minimal_sink_cost ,
222
248
settings .buffering_max_buffered_packets ,
223
249
settings .buffering_max_delay_without_publish ,
250
+ settings .buffering_stop_stack
224
251
)
225
252
self .monitoring_thread .start ()
226
253
0 commit comments