From 0c068165cb0a6876d862369e9af41bf7d05b3e66 Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Thu, 8 Feb 2024 10:46:34 +0100 Subject: [PATCH 1/2] Sink Service: add stack stopped signal in the list It was used but not declared. --- sink_service/source/config.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sink_service/source/config.c b/sink_service/source/config.c index 78fb5c60..60241016 100644 --- a/sink_service/source/config.c +++ b/sink_service/source/config.c @@ -578,6 +578,8 @@ static const sd_bus_vtable config_vtable[] = { /* Event generated when stack starts */ SD_BUS_SIGNAL("StackStarted", "", 0), + /* Event generated when stack is stopped */ + SD_BUS_SIGNAL("StackStopped", "", 0), SD_BUS_VTABLE_END}; From a6e672307d74f83f38a97f27cff341fd15d94a03 Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Tue, 13 Feb 2024 16:54:46 +0100 Subject: [PATCH 2/2] Transport: keep a cache of each sink config It ensure that if we have difficulties to read it from sink (during scratchpad exchange), config is still valid (missing values are from the cache). --- .../wirepas_gateway/configure_node.py | 2 +- .../wirepas_gateway/dbus/sink_manager.py | 14 +++++- .../wirepas_gateway/dbus_print_client.py | 3 +- .../wirepas_gateway/transport_service.py | 49 ++++++++++++++++--- 4 files changed, 56 insertions(+), 12 deletions(-) diff --git a/python_transport/wirepas_gateway/configure_node.py b/python_transport/wirepas_gateway/configure_node.py index 8fb8e61e..8ad32d17 100755 --- a/python_transport/wirepas_gateway/configure_node.py +++ b/python_transport/wirepas_gateway/configure_node.py @@ -175,7 +175,7 @@ def list_sinks(self): print("List of sinks:") for sink in sinks: print("============== [%s] ===============" % sink.sink_id) - config = sink.read_config() + config, _ = sink.read_config() for key in config.keys(): if key == "node_role": print("[%s]: %s" % (key, NodeRole.from_dualmcu_value(config[key]))) diff --git a/python_transport/wirepas_gateway/dbus/sink_manager.py b/python_transport/wirepas_gateway/dbus/sink_manager.py index b813a068..e77dd7dc 100644 --- a/python_transport/wirepas_gateway/dbus/sink_manager.py +++ b/python_transport/wirepas_gateway/dbus/sink_manager.py @@ -24,6 +24,7 @@ def __init__(self, bus, proxy, sink_id, unique_name, on_stack_started, on_stack_ self.unique_name = unique_name self._on_started_handle = None self._on_stopped_handle = None + self._last_config_dict = None def register_for_stack_started(self): # Use the subscribe directly to be able to specify the sender @@ -136,6 +137,7 @@ def _get_pair_params(self, dic, key1, att1, key2, att2): def read_config(self): config = {} config["sink_id"] = self.sink_id + partial = False # Should always be available try: @@ -143,7 +145,6 @@ def read_config(self): except GLib.Error as e: error = ReturnCode.error_from_dbus_exception(str(e)) logging.error("Cannot get Stack state: %s", error) - return None self._get_param(config, "node_address", "NodeAddress") self._get_param(config, "node_role", "NodeRole") @@ -181,7 +182,16 @@ def read_config(self): # Add scratchpad related info self.get_scratchpad_status(config) - return config + if self._last_config_dict is not None: + for key, value in self._last_config_dict.items(): + if key not in config: + logging.warning("Add %s from cache in config", key) + config[key] = value + partial = True + + self._last_config_dict = config.copy() + + return config, partial def _set_param(self, dic, key, attribute): try: diff --git a/python_transport/wirepas_gateway/dbus_print_client.py b/python_transport/wirepas_gateway/dbus_print_client.py index d519fb2b..a5082f98 100644 --- a/python_transport/wirepas_gateway/dbus_print_client.py +++ b/python_transport/wirepas_gateway/dbus_print_client.py @@ -46,7 +46,8 @@ def on_sink_connected(self, name): if sink is not None: # Read Stack status of sink on connection - logging.info("Sink connected with config: %s", sink.read_config()) + config, _ = sink.read_config() + logging.info("Sink connected with config: %s", config) def main(): diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index cd99a02d..df7db9e0 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -233,6 +233,8 @@ def __init__(self, settings, **kwargs): self._is_update_status_scheduled = False self._status_lock = Lock() + self._last_status_config = None + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -244,13 +246,23 @@ def _on_mqtt_wrapper_termination_cb(self): def _set_status(self): # Create a list of different sink configs + partial_status = False configs = [] for sink in self.sink_manager.get_sinks(): - config = sink.read_config() + config, partial = sink.read_config() + partial_status |= partial if config is not None: configs.append(config) - topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) + if partial_status: + # Some part of the status were read from cache value + logging.warning("Some value were not up to date") + + # Publish only if something has changed + if self._last_status_config is not None and \ + self._last_status_config == configs: + logging.info("No new status to publish") + return event_online = wmm.StatusEvent( self.gw_id, @@ -264,6 +276,10 @@ def _set_status(self): self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self._last_status_config = configs.copy() + + return partial_status + def _update_status(self): # First check if an update is about to be sent to avoid # multiple updates in a short period (0.5s) @@ -279,12 +295,27 @@ def _update_status(self): def set_status_after_delay(delay_s): sleep(delay_s) + attempt = 0 logging.debug("Time to update the status") + if self._set_status(): + # Status is partial, retry few times until we have valid one + delay_s = 2 + while attempt < 5: + sleep(delay_s) + logging.debug("New attempt to publish status") + if not self._set_status(): + # Successful update + break + + attempt += 1 + delay_s *= 2 + + if attempt == 5: + logging.error("Not able to read a full status") self._is_update_status_scheduled = False - self._set_status() # We are here as there was no update scheduled yet. - # Wait a bit + # Wait a bit in case something else is changing thread = Thread(target=set_status_after_delay, args=[0.5]) thread.start() @@ -426,12 +457,14 @@ def _send_asynchronous_set_config_response(self, name): if sink is None: logging.error("Sink %s error: unknown sink", name) return + + config, _ = sink.read_config() response = wmm.SetConfigResponse( 0, self.gw_id, wmm.GatewayResultCode.GW_RES_OK, sink.sink_id, - sink.read_config(), + config, ) topic = TopicGenerator.make_set_config_response_topic(self.gw_id, sink.sink_id) self.mqtt_wrapper.publish(topic, response.payload, qos=2) @@ -440,7 +473,7 @@ def _send_asynchronous_get_configs_response(self): # Create a list of different sink configs configs = [] for sink in self.sink_manager.get_sinks(): - config = sink.read_config() + config, _ = sink.read_config() if config is not None: configs.append(config) @@ -547,7 +580,7 @@ def _on_get_configs_cmd_received(self, client, userdata, message): # Create a list of different sink configs configs = [] for sink in self.sink_manager.get_sinks(): - config = sink.read_config() + config, _ = sink.read_config() if config is not None: configs.append(config) @@ -599,7 +632,7 @@ def _on_set_config_cmd_received(self, client, userdata, message): sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: res = sink.write_config(request.new_config) - new_config = sink.read_config() + new_config, _ = sink.read_config() else: res = wmm.GatewayResultCode.GW_RES_INVALID_SINK_ID new_config = None