From ef126eaca8740b95feba57b47818f280438013c0 Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Mon, 17 Jun 2024 16:32:36 +0200 Subject: [PATCH] Black hole: add an option to stop stack in case of black hole Until now (and still the default) sink cost set to the 254 is the way to prevent the black hole case. But with this option, it allows to stop the stack instead. It allows testing on large scale networks. Also add the cost in configure node for easy diagnostic. --- .../wirepas_gateway/configure_node.py | 1 + .../wirepas_gateway/transport_service.py | 53 ++++++++++++++----- .../wirepas_gateway/utils/argument_tools.py | 10 ++++ 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/python_transport/wirepas_gateway/configure_node.py b/python_transport/wirepas_gateway/configure_node.py index 8ad32d17..2c73c420 100755 --- a/python_transport/wirepas_gateway/configure_node.py +++ b/python_transport/wirepas_gateway/configure_node.py @@ -183,6 +183,7 @@ def list_sinks(self): print("[%s]: %s" % (key, binascii.hexlify(config[key]))) else: print("[%s]: %s" % (key, config[key])) + print("[cost]: %d" % (sink.cost)) print("===================================") diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 1f481b8c..05b7ed39 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -35,6 +35,7 @@ def __init__( minimum_sink_cost, max_buffered_packets, max_delay_without_publish, + stop_stack = False ): """ Thread monitoring the connection with the MQTT broker. @@ -53,6 +54,7 @@ def __init__( rising the sink costs max_delay_without_publish: the maximum delay without any successful publish (with something in the queue before rising the sink costs + stop_stack: stop the stack instead of increasing the sink cost in case of black hole """ Thread.__init__(self) @@ -71,11 +73,20 @@ def __init__( self.minimum_sink_cost = minimum_sink_cost self.max_buffered_packets = max_buffered_packets self.max_delay_without_publish = max_delay_without_publish + self.stop_stack = stop_stack def _set_sinks_cost(self, cost): for sink in self.sink_manager.get_sinks(): sink.cost = cost + def _stop_sinks(self): + for sink in self.sink_manager.get_sinks(): + sink.write_config({"started": False}) + + def _start_sinks(self): + for sink in self.sink_manager.get_sinks(): + sink.write_config({"started": True}) + def _set_sinks_cost_high(self): self._set_sinks_cost(self.SINK_COST_HIGH) @@ -113,22 +124,34 @@ def run(self): if not self.disconnected: # Check if a condition to declare "back hole" is met if self._is_publish_delay_over() or self._is_buffer_threshold_reached(): - logging.info("Increasing sink cost of all sinks") - logging.debug( + if self.stop_stack: + logging.info("Black hole detected, stop all stacks") + self._stop_sinks() + else: + logging.info("Increasing sink cost of all sinks") + self._set_sinks_cost_high() + + logging.info( "Last publish: %s Queue Size %s", self.mqtt_wrapper.publish_waiting_time_s, self.mqtt_wrapper.publish_queue_size, ) - self._set_sinks_cost_high() self.disconnected = True else: if self.mqtt_wrapper.publish_queue_size == 0: # Network is back, put the connection back logging.info( - "Connection is back, decreasing sink cost of all sinks" + "Connection is back, black hole is finished" ) - self._set_sinks_cost_low() + + if self.stop_stack: + logging.info("Restart all sinks") + self._start_sinks() + else: + logging.info("Decreasing sink cost") + self._set_sinks_cost_low() + self.disconnected = False # Wait for period @@ -146,14 +169,16 @@ def initialize_sink(self, name): Args: name: name of sink to initialize """ - sink = self.sink_manager.get_sink(name) + # It is only required if black hole is managed by sink cost + if not self.stop_stack: + sink = self.sink_manager.get_sink(name) - logging.info("Initialize sinkCost of sink %s", name) - if sink is not None: - if self.disconnected: - sink.cost = self.SINK_COST_HIGH - else: - sink.cost = self.minimum_sink_cost + logging.info("Initialize sinkCost of sink %s", name) + if sink is not None: + if self.disconnected: + sink.cost = self.SINK_COST_HIGH + else: + sink.cost = self.minimum_sink_cost class TransportService(BusClient): @@ -209,9 +234,10 @@ def __init__(self, settings, **kwargs): if settings.buffering_max_buffered_packets > 0 or settings.buffering_max_delay_without_publish > 0: logging.info( - " Black hole detection enabled: max_packets=%s packets, max_delay=%s", + " Black hole detection enabled: max_packets=%s packets, max_delay=%s, stop_stack=%s", settings.buffering_max_buffered_packets, settings.buffering_max_delay_without_publish, + settings.buffering_stop_stack ) # Create and start a monitoring thread for black hole issue self.monitoring_thread = ConnectionToBackendMonitorThread( @@ -221,6 +247,7 @@ def __init__(self, settings, **kwargs): settings.buffering_minimal_sink_cost, settings.buffering_max_buffered_packets, settings.buffering_max_delay_without_publish, + settings.buffering_stop_stack ) self.monitoring_thread.start() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 2f45b1ec..a4150f78 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -388,6 +388,16 @@ def add_buffering_settings(self): ), ) + self.buffering.add_argument( + "--buffering_stop_stack", + default=os.environ.get("WM_GW_BUFFERING_STOP_STACK", False), + type=self.str2bool, + help=( + "When true, when a black hole is detected, stack is stopped instead of " + " increasing the sink cost" + ), + ) + # This minimal sink cost could be moved somewhere as it can be used even # buffering limitation is not in use self.buffering.add_argument(