diff --git a/pioreactor/automations/led/base.py b/pioreactor/automations/led/base.py index 17d89662..969b07f4 100644 --- a/pioreactor/automations/led/base.py +++ b/pioreactor/automations/led/base.py @@ -264,12 +264,16 @@ def __setattr__(self, name, value) -> None: self._latest_settings_ended_at = None def _set_growth_rate(self, message: pt.MQTTMessage) -> None: + if not message.payload: + return self.previous_growth_rate = self._latest_growth_rate payload = decode(message.payload, type=structs.GrowthRate) self._latest_growth_rate = payload.growth_rate self.latest_growth_rate_at = payload.timestamp def _set_OD(self, message: pt.MQTTMessage) -> None: + if not message.payload: + return self.previous_normalized_od = self._latest_normalized_od payload = decode(message.payload, type=structs.ODFiltered) self._latest_normalized_od = payload.od_filtered diff --git a/pioreactor/automations/temperature/base.py b/pioreactor/automations/temperature/base.py index a6f59a10..aa232a08 100644 --- a/pioreactor/automations/temperature/base.py +++ b/pioreactor/automations/temperature/base.py @@ -116,9 +116,7 @@ def latest_growth_rate(self) -> float: # this should really only happen on the initialization. self.logger.debug("Waiting for OD and growth rate data to arrive") if not all(is_pio_job_running(["od_reading", "growth_rate_calculating"])): - raise exc.JobRequiredError( - "`od_reading` and `growth_rate_calculating` should be Ready." - ) + raise exc.JobRequiredError("`od_reading` and `growth_rate_calculating` should be Ready.") # check most stale time if (current_utc_datetime() - self.most_stale_time).seconds > 5 * 60: @@ -135,9 +133,7 @@ def latest_normalized_od(self) -> float: # this should really only happen on the initialization. self.logger.debug("Waiting for OD and growth rate data to arrive") if not all(is_pio_job_running(["od_reading", "growth_rate_calculating"])): - raise exc.JobRequiredError( - "`od_reading` and `growth_rate_calculating` should be running." - ) + raise exc.JobRequiredError("`od_reading` and `growth_rate_calculating` should be running.") # check most stale time if (current_utc_datetime() - self.most_stale_time).seconds > 5 * 60: @@ -164,6 +160,9 @@ def __setattr__(self, name, value) -> None: ) def _set_growth_rate(self, message: pt.MQTTMessage) -> None: + if not message.payload: + return + self.previous_growth_rate = self._latest_growth_rate payload = decode(message.payload, type=structs.GrowthRate) self._latest_growth_rate = payload.growth_rate diff --git a/pioreactor/pubsub.py b/pioreactor/pubsub.py index 533f0d0e..a3c93b0e 100644 --- a/pioreactor/pubsub.py +++ b/pioreactor/pubsub.py @@ -295,14 +295,14 @@ def on_connect(client: Client, userdata: dict, *args): return client -def prune_retained_messages(topics_to_prune: str = "#"): +def prune_retained_messages(topics_to_prune: str = "#") -> None: topics = [] def on_message(message): topics.append(message.topic) - client = subscribe_and_callback(on_message, topics_to_prune, timeout=1) - + client = subscribe_and_callback(on_message, topics_to_prune) + sleep(1) for topic in topics.copy(): publish(topic, None, retain=True)