Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cache value for configs #266

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python_transport/wirepas_gateway/configure_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def list_sinks(self):
print("List of sinks:")
for sink in sinks:
print("============== [%s] ===============" % sink.sink_id)
config = sink.read_config()
config, _ = sink.read_config()
for key in config.keys():
if key == "node_role":
print("[%s]: %s" % (key, NodeRole.from_dualmcu_value(config[key])))
Expand Down
14 changes: 12 additions & 2 deletions python_transport/wirepas_gateway/dbus/sink_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, bus, proxy, sink_id, unique_name, on_stack_started, on_stack_
self.unique_name = unique_name
self._on_started_handle = None
self._on_stopped_handle = None
self._last_config_dict = None

def register_for_stack_started(self):
# Use the subscribe directly to be able to specify the sender
Expand Down Expand Up @@ -136,14 +137,14 @@ def _get_pair_params(self, dic, key1, att1, key2, att2):
def read_config(self):
config = {}
config["sink_id"] = self.sink_id
partial = False

# Should always be available
try:
config["started"] = (self.proxy.StackStatus & 0x01) == 0
except GLib.Error as e:
error = ReturnCode.error_from_dbus_exception(str(e))
logging.error("Cannot get Stack state: %s", error)
return None

self._get_param(config, "node_address", "NodeAddress")
self._get_param(config, "node_role", "NodeRole")
Expand Down Expand Up @@ -181,7 +182,16 @@ def read_config(self):
# Add scratchpad related info
self.get_scratchpad_status(config)

return config
if self._last_config_dict is not None:
for key, value in self._last_config_dict.items():
if key not in config:
logging.warning("Add %s from cache in config", key)
config[key] = value
partial = True

self._last_config_dict = config.copy()

return config, partial

def _set_param(self, dic, key, attribute):
try:
Expand Down
3 changes: 2 additions & 1 deletion python_transport/wirepas_gateway/dbus_print_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def on_sink_connected(self, name):

if sink is not None:
# Read Stack status of sink on connection
logging.info("Sink connected with config: %s", sink.read_config())
config, _ = sink.read_config()
logging.info("Sink connected with config: %s", config)


def main():
Expand Down
49 changes: 41 additions & 8 deletions python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ def __init__(self, settings, **kwargs):
self._is_update_status_scheduled = False
self._status_lock = Lock()

self._last_status_config = None

def _on_mqtt_wrapper_termination_cb(self):
"""
Callback used to be informed when the MQTT wrapper has exited
Expand All @@ -244,13 +246,23 @@ def _on_mqtt_wrapper_termination_cb(self):

def _set_status(self):
# Create a list of different sink configs
partial_status = False
configs = []
for sink in self.sink_manager.get_sinks():
config = sink.read_config()
config, partial = sink.read_config()
partial_status |= partial
if config is not None:
configs.append(config)

topic = TopicGenerator.make_get_configs_response_topic(self.gw_id)
if partial_status:
# Some part of the status were read from cache value
logging.warning("Some value were not up to date")

# Publish only if something has changed
if self._last_status_config is not None and \
self._last_status_config == configs:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuation line with same indent as next logical line

logging.info("No new status to publish")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

over-indented

return

event_online = wmm.StatusEvent(
self.gw_id,
Expand All @@ -264,6 +276,10 @@ def _set_status(self):

self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)

self._last_status_config = configs.copy()

return partial_status

def _update_status(self):
# First check if an update is about to be sent to avoid
# multiple updates in a short period (0.5s)
Expand All @@ -279,12 +295,27 @@ def _update_status(self):

def set_status_after_delay(delay_s):
sleep(delay_s)
attempt = 0
logging.debug("Time to update the status")
if self._set_status():
# Status is partial, retry few times until we have valid one
delay_s = 2
while attempt < 5:
sleep(delay_s)
logging.debug("New attempt to publish status")
if not self._set_status():
# Successful update
break

attempt += 1
delay_s *= 2

if attempt == 5:
logging.error("Not able to read a full status")
self._is_update_status_scheduled = False
self._set_status()

# We are here as there was no update scheduled yet.
# Wait a bit
# Wait a bit in case something else is changing
thread = Thread(target=set_status_after_delay, args=[0.5])
thread.start()

Expand Down Expand Up @@ -426,12 +457,14 @@ def _send_asynchronous_set_config_response(self, name):
if sink is None:
logging.error("Sink %s error: unknown sink", name)
return

config, _ = sink.read_config()
response = wmm.SetConfigResponse(
0,
self.gw_id,
wmm.GatewayResultCode.GW_RES_OK,
sink.sink_id,
sink.read_config(),
config,
)
topic = TopicGenerator.make_set_config_response_topic(self.gw_id, sink.sink_id)
self.mqtt_wrapper.publish(topic, response.payload, qos=2)
Expand All @@ -440,7 +473,7 @@ def _send_asynchronous_get_configs_response(self):
# Create a list of different sink configs
configs = []
for sink in self.sink_manager.get_sinks():
config = sink.read_config()
config, _ = sink.read_config()
if config is not None:
configs.append(config)

Expand Down Expand Up @@ -547,7 +580,7 @@ def _on_get_configs_cmd_received(self, client, userdata, message):
# Create a list of different sink configs
configs = []
for sink in self.sink_manager.get_sinks():
config = sink.read_config()
config, _ = sink.read_config()
if config is not None:
configs.append(config)

Expand Down Expand Up @@ -599,7 +632,7 @@ def _on_set_config_cmd_received(self, client, userdata, message):
sink = self.sink_manager.get_sink(request.sink_id)
if sink is not None:
res = sink.write_config(request.new_config)
new_config = sink.read_config()
new_config, _ = sink.read_config()
else:
res = wmm.GatewayResultCode.GW_RES_INVALID_SINK_ID
new_config = None
Expand Down
2 changes: 2 additions & 0 deletions sink_service/source/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ static const sd_bus_vtable config_vtable[] = {

/* Event generated when stack starts */
SD_BUS_SIGNAL("StackStarted", "", 0),
/* Event generated when stack is stopped */
SD_BUS_SIGNAL("StackStopped", "", 0),

SD_BUS_VTABLE_END};

Expand Down
Loading