Skip to content

Commit

Permalink
Transport: keep a cache of each sink config
Browse files Browse the repository at this point in the history
It ensure that if we have difficulties to read it from sink (during scratchpad exchange),
config is still valid (missing values are from the cache).
  • Loading branch information
GwendalRaoul committed Apr 12, 2024
1 parent bfd5a2d commit 710ba6f
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 12 deletions.
2 changes: 1 addition & 1 deletion python_transport/wirepas_gateway/configure_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def list_sinks(self):
print("List of sinks:")
for sink in sinks:
print("============== [%s] ===============" % sink.sink_id)
config = sink.read_config()
config, _ = sink.read_config()
for key in config.keys():
if key == "node_role":
print("[%s]: %s" % (key, NodeRole.from_dualmcu_value(config[key])))
Expand Down
14 changes: 12 additions & 2 deletions python_transport/wirepas_gateway/dbus/sink_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, bus, proxy, sink_id, unique_name, on_stack_started, on_stack_
self.unique_name = unique_name
self._on_started_handle = None
self._on_stopped_handle = None
self._last_config_dict = None

def register_for_stack_started(self):
# Use the subscribe directly to be able to specify the sender
Expand Down Expand Up @@ -136,14 +137,14 @@ def _get_pair_params(self, dic, key1, att1, key2, att2):
def read_config(self):
config = {}
config["sink_id"] = self.sink_id
partial = False

# Should always be available
try:
config["started"] = (self.proxy.StackStatus & 0x01) == 0
except GLib.Error as e:
error = ReturnCode.error_from_dbus_exception(str(e))
logging.error("Cannot get Stack state: %s", error)
return None

self._get_param(config, "node_address", "NodeAddress")
self._get_param(config, "node_role", "NodeRole")
Expand Down Expand Up @@ -181,7 +182,16 @@ def read_config(self):
# Add scratchpad related info
self.get_scratchpad_status(config)

return config
if self._last_config_dict is not None:
for key, value in self._last_config_dict.items():
if key not in config:
logging.warning("Add %s from cache in config", key)
config[key] = value
partial = True

self._last_config_dict = config.copy()

return config, partial

def _set_param(self, dic, key, attribute):
try:
Expand Down
3 changes: 2 additions & 1 deletion python_transport/wirepas_gateway/dbus_print_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def on_sink_connected(self, name):

if sink is not None:
# Read Stack status of sink on connection
logging.info("Sink connected with config: %s", sink.read_config())
config, _ = sink.read_config()
logging.info("Sink connected with config: %s", config)


def main():
Expand Down
49 changes: 41 additions & 8 deletions python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ def __init__(self, settings, **kwargs):
self._is_update_status_scheduled = False
self._status_lock = Lock()

self._last_status_config = None

def _on_mqtt_wrapper_termination_cb(self):
"""
Callback used to be informed when the MQTT wrapper has exited
Expand All @@ -244,13 +246,23 @@ def _on_mqtt_wrapper_termination_cb(self):

def _set_status(self):
# Create a list of different sink configs
partial_status = False
configs = []
for sink in self.sink_manager.get_sinks():
config = sink.read_config()
config, partial = sink.read_config()
partial_status |= partial
if config is not None:
configs.append(config)

topic = TopicGenerator.make_get_configs_response_topic(self.gw_id)
if partial_status:
# Some part of the status were read from cache value
logging.warning("Some value were not up to date")

# Publish only if something has changed
if self._last_status_config is not None and \
self._last_status_config == configs:
logging.info("No new status to publish")
return

event_online = wmm.StatusEvent(
self.gw_id,
Expand All @@ -264,6 +276,10 @@ def _set_status(self):

self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)

self._last_status_config = configs.copy()

return partial_status

def _update_status(self):
# First check if an update is about to be sent to avoid
# multiple updates in a short period (0.5s)
Expand All @@ -279,12 +295,27 @@ def _update_status(self):

def set_status_after_delay(delay_s):
sleep(delay_s)
attempt = 0
logging.debug("Time to update the status")
if self._set_status():
# Status is partial, retry few times until we have valid one
delay_s = 2
while attempt < 5:
sleep(delay_s)
logging.debug("New attempt to publish status")
if not self._set_status():
# Successful update
break

attempt += 1
delay_s *= 2

if attempt == 5:
logging.error("Not able to read a full status")
self._is_update_status_scheduled = False
self._set_status()

# We are here as there was no update scheduled yet.
# Wait a bit
# Wait a bit in case something else is changing
thread = Thread(target=set_status_after_delay, args=[0.5])
thread.start()

Expand Down Expand Up @@ -426,12 +457,14 @@ def _send_asynchronous_set_config_response(self, name):
if sink is None:
logging.error("Sink %s error: unknown sink", name)
return

config, _ = sink.read_config()
response = wmm.SetConfigResponse(
0,
self.gw_id,
wmm.GatewayResultCode.GW_RES_OK,
sink.sink_id,
sink.read_config(),
config,
)
topic = TopicGenerator.make_set_config_response_topic(self.gw_id, sink.sink_id)
self.mqtt_wrapper.publish(topic, response.payload, qos=2)
Expand All @@ -440,7 +473,7 @@ def _send_asynchronous_get_configs_response(self):
# Create a list of different sink configs
configs = []
for sink in self.sink_manager.get_sinks():
config = sink.read_config()
config, _ = sink.read_config()
if config is not None:
configs.append(config)

Expand Down Expand Up @@ -547,7 +580,7 @@ def _on_get_configs_cmd_received(self, client, userdata, message):
# Create a list of different sink configs
configs = []
for sink in self.sink_manager.get_sinks():
config = sink.read_config()
config, _ = sink.read_config()
if config is not None:
configs.append(config)

Expand Down Expand Up @@ -599,7 +632,7 @@ def _on_set_config_cmd_received(self, client, userdata, message):
sink = self.sink_manager.get_sink(request.sink_id)
if sink is not None:
res = sink.write_config(request.new_config)
new_config = sink.read_config()
new_config, _ = sink.read_config()
else:
res = wmm.GatewayResultCode.GW_RES_INVALID_SINK_ID
new_config = None
Expand Down

0 comments on commit 710ba6f

Please sign in to comment.