From dd2b53ac4a24d4433dc32dd9253ec488582336f2 Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Sun, 14 Jul 2024 12:51:54 -0400 Subject: [PATCH] moving automation code around --- CHANGELOG.md | 1 + pioreactor/automations/base.py | 46 +++++++++++++- pioreactor/background_jobs/base.py | 2 +- .../background_jobs/dosing_automation.py | 63 ++----------------- pioreactor/background_jobs/led_automation.py | 38 ----------- .../background_jobs/temperature_automation.py | 49 ++------------- pioreactor/structs.py | 4 -- pioreactor/tests/test_automation_struct.py | 29 --------- pioreactor/tests/test_dosing_automation.py | 4 +- pioreactor/utils/timing.py | 1 + 10 files changed, 59 insertions(+), 178 deletions(-) delete mode 100644 pioreactor/tests/test_automation_struct.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ad92b13..6a4270b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ The benefits of removing this abstraction is much less code, less overhead, easier developer experience, and overall simplification. Later, we may create a new abstraction, but now we are moving abstractions back the level 0. - `log` in experiment profiles now uses expressions instead of Python string formatting. For example: `The unit {unit} is running {job} in experiment {experiment}` should be replaced by expressions in the string: `The unit ${{unit()}} is running ${{job_name()}} in the experiment ${{experiment}}`. Note: `{job}` is not `${{job_name()}}`. + - This is also a bug fix, as this behaviour was not intended: when pausing temperature automations, the heater now turns off and stays off until unpaused. ### 24.7.5 & 24.7.6 & 24.7.7 diff --git a/pioreactor/automations/base.py b/pioreactor/automations/base.py index e002f142..676fa1c6 100644 --- a/pioreactor/automations/base.py +++ b/pioreactor/automations/base.py @@ -3,8 +3,13 @@ from typing import Optional +from msgspec.json import encode + +from pioreactor import structs from pioreactor.automations import events from pioreactor.background_jobs.base import BackgroundJob +from pioreactor.pubsub import QOS +from pioreactor.utils.timing import current_utc_datetime DISALLOWED_AUTOMATION_NAMES = { @@ -14,19 +19,21 @@ class AutomationJob(BackgroundJob): automation_name = "automation_job" + _latest_settings_ended_at = None def __init__(self, unit: str, experiment: str) -> None: - super(AutomationJob, self).__init__(unit, experiment) - + super().__init__(unit, experiment) if self.automation_name in DISALLOWED_AUTOMATION_NAMES: raise NameError(f"{self.automation_name} is not allowed.") + self.logger.info(f"Starting {self.automation_name}.") + self.add_to_published_settings( "automation_name", { "datatype": "string", "settable": False, - } + }, ) self.add_to_published_settings( "latest_event", @@ -37,6 +44,7 @@ def __init__(self, unit: str, experiment: str) -> None: ) self._publish_attr("automation_name") + self._latest_settings_started_at = current_utc_datetime() def on_init_to_ready(self) -> None: self.start_passive_listeners() @@ -46,3 +54,35 @@ def execute(self) -> Optional[events.AutomationEvent]: Overwrite in subclass """ return events.NoEvent() + + def _send_details_to_mqtt(self) -> None: + self.publish( + f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/{self.job_name}_settings", + encode( + structs.AutomationSettings( + pioreactor_unit=self.unit, + experiment=self.experiment, + started_at=self._latest_settings_started_at, + ended_at=self._latest_settings_ended_at, + automation_name=self.automation_name, + settings=encode( + { + attr: getattr(self, attr, None) + for attr, metadata in self.published_settings.items() + if metadata["settable"] + } + ), + ) + ), + qos=QOS.EXACTLY_ONCE, + ) + + def __setattr__(self, name, value) -> None: + super().__setattr__(name, value) + if name in self.published_settings and name != "state" and self.published_settings[name]["settable"]: + self._latest_settings_ended_at = current_utc_datetime() + self._send_details_to_mqtt() + self._latest_settings_started_at, self._latest_settings_ended_at = ( + current_utc_datetime(), + None, + ) diff --git a/pioreactor/background_jobs/base.py b/pioreactor/background_jobs/base.py index 6db33fe9..a8d87caf 100644 --- a/pioreactor/background_jobs/base.py +++ b/pioreactor/background_jobs/base.py @@ -601,7 +601,7 @@ def _create_sub_client(self) -> Client: # was exhausted), so we reset the last will in the pre_connect callback. def set_last_will(client: Client, userdata) -> None: # we can only set last wills _before_ connecting, so we put this here. - client.will_set(**last_will) + client.will_set(**last_will) # type: ignore def reconnect_protocol(client: Client, userdata, flags, rc: int, properties=None) -> None: self.logger.info("Reconnected to the MQTT broker on leader.") diff --git a/pioreactor/background_jobs/dosing_automation.py b/pioreactor/background_jobs/dosing_automation.py index 58d0821f..fec1957f 100644 --- a/pioreactor/background_jobs/dosing_automation.py +++ b/pioreactor/background_jobs/dosing_automation.py @@ -6,13 +6,11 @@ from datetime import datetime from functools import partial from threading import Thread -from typing import Any from typing import cast from typing import Optional import click from msgspec.json import decode -from msgspec.json import encode from pioreactor import exc from pioreactor import structs @@ -23,7 +21,6 @@ from pioreactor.automations import events from pioreactor.automations.base import AutomationJob from pioreactor.config import config -from pioreactor.pubsub import QOS from pioreactor.utils import is_pio_job_running from pioreactor.utils import local_persistant_storage from pioreactor.utils import SummableDict @@ -183,7 +180,6 @@ class DosingAutomationJob(AutomationJob): _latest_od: Optional[dict[pt.PdChannel, float]] = None latest_event: Optional[events.AutomationEvent] = None - _latest_settings_ended_at: Optional[datetime] = None _latest_run_at: Optional[datetime] = None run_thread: RepeatedTimer | Thread duration: float | None @@ -237,7 +233,6 @@ def __init__( self.skip_first_run = skip_first_run - self._latest_settings_started_at = current_utc_datetime() self.latest_normalized_od_at = current_utc_datetime() self.latest_growth_rate_at = current_utc_datetime() self.latest_od_at = current_utc_datetime() @@ -335,9 +330,9 @@ def block_until_not_sleeping(self) -> bool: def execute_io_action( self, - waste_ml: float = 0, - media_ml: float = 0, - alt_media_ml: float = 0, + waste_ml: float = 0.0, + media_ml: float = 0.0, + alt_media_ml: float = 0.0, **other_pumps_ml: float, ) -> SummableDict: """ @@ -521,9 +516,6 @@ def latest_od(self) -> dict[pt.PdChannel, float]: ########## Private & internal methods def on_disconnected(self) -> None: - self._latest_settings_ended_at = current_utc_datetime() - self._send_details_to_mqtt() - with suppress(AttributeError): self.run_thread.join( timeout=10 @@ -531,21 +523,6 @@ def on_disconnected(self) -> None: if self.run_thread.is_alive(): self.logger.debug("run_thread still alive!") - def __setattr__(self, name: str, value: Any) -> None: - super(DosingAutomationJob, self).__setattr__(name, value) - if name in self.published_settings and name not in ( - "state", - "alt_media_fraction", - "media_throughput", - "alt_media_throughput", - "latest_event", - "vial_volume", - ): - self._latest_settings_ended_at = current_utc_datetime() - self._send_details_to_mqtt() - self._latest_settings_started_at = current_utc_datetime() - self._latest_settings_ended_at = None - def _set_growth_rate(self, message: pt.MQTTMessage) -> None: if not message.payload: return @@ -573,36 +550,6 @@ def _set_ods(self, message: pt.MQTTMessage) -> None: self._latest_od: dict[pt.PdChannel, float] = {c: payload.ods[c].od for c in payload.ods} self.latest_od_at = payload.timestamp - def _send_details_to_mqtt(self) -> None: - self.publish( - f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/dosing_automation_settings", - encode( - structs.AutomationSettings( - pioreactor_unit=self.unit, - experiment=self.experiment, - started_at=self._latest_settings_started_at, - ended_at=self._latest_settings_ended_at, - automation_name=self.automation_name, - settings=encode( - { - attr: getattr(self, attr, None) - for attr in self.published_settings - if attr - not in ( - "state", - "alt_media_fraction", - "media_throughput", - "vial_volume", - "alt_media_throughput", - "latest_event", - ) - } - ), - ) - ), - qos=QOS.EXACTLY_ONCE, - ) - def _update_dosing_metrics(self, message: pt.MQTTMessage) -> None: dosing_event = decode(message.payload, type=structs.DosingEvent) self._update_alt_media_fraction(dosing_event) @@ -684,7 +631,7 @@ def _init_volume_throughput(self) -> None: "alt_media_throughput", { "datatype": "float", - "settable": True, # settable because in the future, the UI may "reset" these values to 0. + "settable": False, "unit": "mL", }, ) @@ -692,7 +639,7 @@ def _init_volume_throughput(self) -> None: "media_throughput", { "datatype": "float", - "settable": True, + "settable": False, "unit": "mL", }, ) diff --git a/pioreactor/background_jobs/led_automation.py b/pioreactor/background_jobs/led_automation.py index 6e892036..97268aec 100644 --- a/pioreactor/background_jobs/led_automation.py +++ b/pioreactor/background_jobs/led_automation.py @@ -10,7 +10,6 @@ import click from msgspec.json import decode -from msgspec.json import encode from pioreactor import exc from pioreactor import structs @@ -19,7 +18,6 @@ from pioreactor.automations import events from pioreactor.automations.base import AutomationJob from pioreactor.config import config -from pioreactor.pubsub import QOS from pioreactor.utils import is_pio_job_running from pioreactor.utils import whoami from pioreactor.utils.timing import current_utc_datetime @@ -49,7 +47,6 @@ class LEDAutomationJob(AutomationJob): published_settings: dict[str, pt.PublishableSetting] = { "duration": {"datatype": "float", "settable": True}, - "automation_name": {"datatype": "string", "settable": False}, } _latest_growth_rate: Optional[float] = None @@ -57,7 +54,6 @@ class LEDAutomationJob(AutomationJob): previous_normalized_od: Optional[float] = None previous_growth_rate: Optional[float] = None - _latest_settings_ended_at: Optional[datetime] = None _latest_run_at: Optional[datetime] = None latest_event: Optional[events.AutomationEvent] = None @@ -80,7 +76,6 @@ def __init__( super(LEDAutomationJob, self).__init__(unit, experiment) self.skip_first_run = skip_first_run - self._latest_settings_started_at: datetime = current_utc_datetime() self.latest_normalized_od_at: datetime = current_utc_datetime() self.latest_growth_rate_at: datetime = current_utc_datetime() self.edited_channels: set[pt.LedChannel] = set() @@ -237,9 +232,6 @@ def latest_normalized_od(self) -> float: ########## Private & internal methods def on_disconnected(self) -> None: - self._latest_settings_ended_at = current_utc_datetime() - self._send_details_to_mqtt() - with suppress(AttributeError): self.run_thread.join( timeout=10 @@ -255,14 +247,6 @@ def on_disconnected(self) -> None: source_of_event=f"{self.job_name}:{self.automation_name}", ) - def __setattr__(self, name, value) -> None: - super(LEDAutomationJob, self).__setattr__(name, value) - if name in self.published_settings and name not in ("state", "latest_event"): - self._latest_settings_ended_at = current_utc_datetime() - self._send_details_to_mqtt() - self._latest_settings_started_at = current_utc_datetime() - self._latest_settings_ended_at = None - def _set_growth_rate(self, message: pt.MQTTMessage) -> None: if not message.payload: return @@ -279,28 +263,6 @@ def _set_OD(self, message: pt.MQTTMessage) -> None: self._latest_normalized_od = payload.od_filtered self.latest_normalized_od_at = payload.timestamp - def _send_details_to_mqtt(self) -> None: - self.publish( - f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/led_automation_settings", - encode( - structs.AutomationSettings( - pioreactor_unit=self.unit, - experiment=self.experiment, - started_at=self._latest_settings_started_at, - ended_at=self._latest_settings_ended_at, - automation_name=self.automation_name, - settings=encode( - { - attr: getattr(self, attr, None) - for attr in self.published_settings - if attr not in ("state", "latest_event") - } - ), - ) - ), - qos=QOS.EXACTLY_ONCE, - ) - def start_passive_listeners(self) -> None: self.subscribe_and_callback( self._set_OD, diff --git a/pioreactor/background_jobs/temperature_automation.py b/pioreactor/background_jobs/temperature_automation.py index 4861e2bd..0dbe987a 100644 --- a/pioreactor/background_jobs/temperature_automation.py +++ b/pioreactor/background_jobs/temperature_automation.py @@ -10,7 +10,6 @@ import click from msgspec.json import decode -from msgspec.json import encode from pioreactor import error_codes from pioreactor import exc @@ -19,7 +18,6 @@ from pioreactor import types as pt from pioreactor.automations.base import AutomationJob from pioreactor.config import config -from pioreactor.pubsub import QOS from pioreactor.structs import Temperature from pioreactor.utils import clamp from pioreactor.utils import is_pio_job_running @@ -76,7 +74,6 @@ class TemperatureAutomationJob(AutomationJob): latest_temperature = None previous_temperature = None - _latest_settings_ended_at = None automation_name = "temperature_automation_base" # is overwritten in subclasses job_name = "temperature_automation" @@ -102,7 +99,6 @@ def __init__( **kwargs, ) -> None: super(TemperatureAutomationJob, self).__init__(unit, experiment) - self._latest_settings_started_at = current_utc_datetime() self.add_to_published_settings( "temperature", {"datatype": "Temperature", "settable": False, "unit": "℃"} @@ -328,8 +324,12 @@ def on_disconnected(self) -> None: with suppress(AttributeError): self.turn_off_heater() - self._latest_settings_ended_at = current_utc_datetime() - self._send_details_to_mqtt() + def on_sleeping(self) -> None: + self.publish_temperature_timer.pause() + self._update_heater(0) + + def on_sleeping_to_ready(self) -> None: + self.publish_temperature_timer.unpause() def setup_pwm(self) -> PWM: hertz = 16 # technically this doesn't need to be high: it could even be 1hz. However, we want to smooth it's @@ -581,21 +581,6 @@ def dot_product(vec1: list, vec2: list) -> float: return dot_product(coefs, X) + intercept - def __setattr__(self, name, value) -> None: - super(TemperatureAutomationJob, self).__setattr__(name, value) - if name in self.published_settings and name not in ( - "state", - "latest_event", - "heater_duty_cycle", - "temperature", - ): - self._latest_settings_ended_at = current_utc_datetime() - self._send_details_to_mqtt() - self._latest_settings_started_at, self._latest_settings_ended_at = ( - current_utc_datetime(), - None, - ) - def _set_growth_rate(self, message: pt.MQTTMessage) -> None: if not message.payload: return @@ -624,28 +609,6 @@ def _set_OD(self, message: pt.MQTTMessage) -> None: self._latest_normalized_od = payload.od_filtered self.latest_normalized_od_at = payload.timestamp - def _send_details_to_mqtt(self) -> None: - self.publish( - f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/temperature_automation_settings", - encode( - structs.AutomationSettings( - pioreactor_unit=self.unit, - experiment=self.experiment, - started_at=self._latest_settings_started_at, - ended_at=self._latest_settings_ended_at, - automation_name=self.automation_name, - settings=encode( - { - attr: getattr(self, attr, None) - for attr in self.published_settings - if attr not in ("state", "latest_event", "heater_duty_cycle", "temperature") - } - ), - ) - ), - qos=QOS.EXACTLY_ONCE, - ) - def start_passive_listeners(self) -> None: self.subscribe_and_callback( self._set_growth_rate, diff --git a/pioreactor/structs.py b/pioreactor/structs.py index 032ad6ff..b25d396c 100644 --- a/pioreactor/structs.py +++ b/pioreactor/structs.py @@ -221,7 +221,3 @@ class KalmanFilterOutput(Struct): state: t.Annotated[list[float], Meta(max_length=3)] covariance_matrix: list[list[float]] timestamp: t.Annotated[datetime, Meta(tz=True)] - - -class ExperimentMetadata(Struct): - experiment: str diff --git a/pioreactor/tests/test_automation_struct.py b/pioreactor/tests/test_automation_struct.py deleted file mode 100644 index f10c1c61..00000000 --- a/pioreactor/tests/test_automation_struct.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import annotations - -from pioreactor.structs import Automation - - -def test_str_representation() -> None: - a = Automation( - automation_name="test", - args={"growth": 0.1, "intensity": "high", "value": True}, - ) - - assert str(a) == "test(growth=0.1, intensity=high, value=True)" - - -def test_str_representation_of_skip_first_run() -> None: - a = Automation( - automation_name="test", - args={"skip_first_run": 0, "intensity": "high", "value": True}, - ) - - assert str(a) == "test(skip_first_run=False, intensity=high, value=True)" - - b = Automation( - automation_name="test", - args={"skip_first_run": 1, "intensity": "high", "value": True}, - ) - - assert str(b) == "test(skip_first_run=True, intensity=high, value=True)" diff --git a/pioreactor/tests/test_dosing_automation.py b/pioreactor/tests/test_dosing_automation.py index 85e464ca..d461da55 100644 --- a/pioreactor/tests/test_dosing_automation.py +++ b/pioreactor/tests/test_dosing_automation.py @@ -999,8 +999,8 @@ def test_strings_are_okay_for_chemostat() -> None: unit = get_unit_name() experiment = "test_strings_are_okay_for_chemostat" - with start_dosing_automation("chemostat", "20", False, unit, experiment, volume="0.7") as chemostat: - assert chemostat.volume == 0.7 + with start_dosing_automation("chemostat", "20", False, unit, experiment, volume="0.7") as chemostat: # type: ignore + assert chemostat.volume == 0.7 # type: ignore pause(n=35) assert chemostat.media_throughput == 0.7 diff --git a/pioreactor/utils/timing.py b/pioreactor/utils/timing.py index 8aef8ad3..a670cb3d 100644 --- a/pioreactor/utils/timing.py +++ b/pioreactor/utils/timing.py @@ -189,6 +189,7 @@ def start(self) -> RepeatedTimer: return self def join(self, timeout: t.Optional[float] = None) -> None: + # alias for .cancel() self.cancel(timeout=timeout) def is_alive(self) -> bool: