Skip to content

Commit

Permalink
moving automation code around
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Jul 14, 2024
1 parent 9c230e1 commit dd2b53a
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 178 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
The benefits of removing this abstraction is much less code, less overhead, easier developer experience, and overall simplification. Later, we may create a new abstraction, but now we are moving abstractions back the level 0.

- `log` in experiment profiles now uses expressions instead of Python string formatting. For example: `The unit {unit} is running {job} in experiment {experiment}` should be replaced by expressions in the string: `The unit ${{unit()}} is running ${{job_name()}} in the experiment ${{experiment}}`. Note: `{job}` is not `${{job_name()}}`.
- This is also a bug fix, as this behaviour was not intended: when pausing temperature automations, the heater now turns off and stays off until unpaused.

### 24.7.5 & 24.7.6 & 24.7.7

Expand Down
46 changes: 43 additions & 3 deletions pioreactor/automations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@

from typing import Optional

from msgspec.json import encode

from pioreactor import structs
from pioreactor.automations import events
from pioreactor.background_jobs.base import BackgroundJob
from pioreactor.pubsub import QOS
from pioreactor.utils.timing import current_utc_datetime


DISALLOWED_AUTOMATION_NAMES = {
Expand All @@ -14,19 +19,21 @@

class AutomationJob(BackgroundJob):
automation_name = "automation_job"
_latest_settings_ended_at = None

def __init__(self, unit: str, experiment: str) -> None:
super(AutomationJob, self).__init__(unit, experiment)

super().__init__(unit, experiment)
if self.automation_name in DISALLOWED_AUTOMATION_NAMES:
raise NameError(f"{self.automation_name} is not allowed.")

self.logger.info(f"Starting {self.automation_name}.")

self.add_to_published_settings(
"automation_name",
{
"datatype": "string",
"settable": False,
}
},
)
self.add_to_published_settings(
"latest_event",
Expand All @@ -37,6 +44,7 @@ def __init__(self, unit: str, experiment: str) -> None:
)
self._publish_attr("automation_name")

self._latest_settings_started_at = current_utc_datetime()

def on_init_to_ready(self) -> None:
self.start_passive_listeners()
Expand All @@ -46,3 +54,35 @@ def execute(self) -> Optional[events.AutomationEvent]:
Overwrite in subclass
"""
return events.NoEvent()

def _send_details_to_mqtt(self) -> None:
self.publish(
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/{self.job_name}_settings",
encode(
structs.AutomationSettings(
pioreactor_unit=self.unit,
experiment=self.experiment,
started_at=self._latest_settings_started_at,
ended_at=self._latest_settings_ended_at,
automation_name=self.automation_name,
settings=encode(
{
attr: getattr(self, attr, None)
for attr, metadata in self.published_settings.items()
if metadata["settable"]
}
),
)
),
qos=QOS.EXACTLY_ONCE,
)

def __setattr__(self, name, value) -> None:
super().__setattr__(name, value)
if name in self.published_settings and name != "state" and self.published_settings[name]["settable"]:
self._latest_settings_ended_at = current_utc_datetime()
self._send_details_to_mqtt()
self._latest_settings_started_at, self._latest_settings_ended_at = (
current_utc_datetime(),
None,
)
2 changes: 1 addition & 1 deletion pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def _create_sub_client(self) -> Client:
# was exhausted), so we reset the last will in the pre_connect callback.
def set_last_will(client: Client, userdata) -> None:
# we can only set last wills _before_ connecting, so we put this here.
client.will_set(**last_will)
client.will_set(**last_will) # type: ignore

def reconnect_protocol(client: Client, userdata, flags, rc: int, properties=None) -> None:
self.logger.info("Reconnected to the MQTT broker on leader.")
Expand Down
63 changes: 5 additions & 58 deletions pioreactor/background_jobs/dosing_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
from datetime import datetime
from functools import partial
from threading import Thread
from typing import Any
from typing import cast
from typing import Optional

import click
from msgspec.json import decode
from msgspec.json import encode

from pioreactor import exc
from pioreactor import structs
Expand All @@ -23,7 +21,6 @@
from pioreactor.automations import events
from pioreactor.automations.base import AutomationJob
from pioreactor.config import config
from pioreactor.pubsub import QOS
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import SummableDict
Expand Down Expand Up @@ -183,7 +180,6 @@ class DosingAutomationJob(AutomationJob):
_latest_od: Optional[dict[pt.PdChannel, float]] = None

latest_event: Optional[events.AutomationEvent] = None
_latest_settings_ended_at: Optional[datetime] = None
_latest_run_at: Optional[datetime] = None
run_thread: RepeatedTimer | Thread
duration: float | None
Expand Down Expand Up @@ -237,7 +233,6 @@ def __init__(

self.skip_first_run = skip_first_run

self._latest_settings_started_at = current_utc_datetime()
self.latest_normalized_od_at = current_utc_datetime()
self.latest_growth_rate_at = current_utc_datetime()
self.latest_od_at = current_utc_datetime()
Expand Down Expand Up @@ -335,9 +330,9 @@ def block_until_not_sleeping(self) -> bool:

def execute_io_action(
self,
waste_ml: float = 0,
media_ml: float = 0,
alt_media_ml: float = 0,
waste_ml: float = 0.0,
media_ml: float = 0.0,
alt_media_ml: float = 0.0,
**other_pumps_ml: float,
) -> SummableDict:
"""
Expand Down Expand Up @@ -521,31 +516,13 @@ def latest_od(self) -> dict[pt.PdChannel, float]:
########## Private & internal methods

def on_disconnected(self) -> None:
self._latest_settings_ended_at = current_utc_datetime()
self._send_details_to_mqtt()

with suppress(AttributeError):
self.run_thread.join(
timeout=10
) # thread has N seconds to end. If not, something is wrong, like a while loop in execute that isn't stopping.
if self.run_thread.is_alive():
self.logger.debug("run_thread still alive!")

def __setattr__(self, name: str, value: Any) -> None:
super(DosingAutomationJob, self).__setattr__(name, value)
if name in self.published_settings and name not in (
"state",
"alt_media_fraction",
"media_throughput",
"alt_media_throughput",
"latest_event",
"vial_volume",
):
self._latest_settings_ended_at = current_utc_datetime()
self._send_details_to_mqtt()
self._latest_settings_started_at = current_utc_datetime()
self._latest_settings_ended_at = None

def _set_growth_rate(self, message: pt.MQTTMessage) -> None:
if not message.payload:
return
Expand Down Expand Up @@ -573,36 +550,6 @@ def _set_ods(self, message: pt.MQTTMessage) -> None:
self._latest_od: dict[pt.PdChannel, float] = {c: payload.ods[c].od for c in payload.ods}
self.latest_od_at = payload.timestamp

def _send_details_to_mqtt(self) -> None:
self.publish(
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/dosing_automation_settings",
encode(
structs.AutomationSettings(
pioreactor_unit=self.unit,
experiment=self.experiment,
started_at=self._latest_settings_started_at,
ended_at=self._latest_settings_ended_at,
automation_name=self.automation_name,
settings=encode(
{
attr: getattr(self, attr, None)
for attr in self.published_settings
if attr
not in (
"state",
"alt_media_fraction",
"media_throughput",
"vial_volume",
"alt_media_throughput",
"latest_event",
)
}
),
)
),
qos=QOS.EXACTLY_ONCE,
)

def _update_dosing_metrics(self, message: pt.MQTTMessage) -> None:
dosing_event = decode(message.payload, type=structs.DosingEvent)
self._update_alt_media_fraction(dosing_event)
Expand Down Expand Up @@ -684,15 +631,15 @@ def _init_volume_throughput(self) -> None:
"alt_media_throughput",
{
"datatype": "float",
"settable": True, # settable because in the future, the UI may "reset" these values to 0.
"settable": False,
"unit": "mL",
},
)
self.add_to_published_settings(
"media_throughput",
{
"datatype": "float",
"settable": True,
"settable": False,
"unit": "mL",
},
)
Expand Down
38 changes: 0 additions & 38 deletions pioreactor/background_jobs/led_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import click
from msgspec.json import decode
from msgspec.json import encode

from pioreactor import exc
from pioreactor import structs
Expand All @@ -19,7 +18,6 @@
from pioreactor.automations import events
from pioreactor.automations.base import AutomationJob
from pioreactor.config import config
from pioreactor.pubsub import QOS
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import whoami
from pioreactor.utils.timing import current_utc_datetime
Expand Down Expand Up @@ -49,15 +47,13 @@ class LEDAutomationJob(AutomationJob):

published_settings: dict[str, pt.PublishableSetting] = {
"duration": {"datatype": "float", "settable": True},
"automation_name": {"datatype": "string", "settable": False},
}

_latest_growth_rate: Optional[float] = None
_latest_normalized_od: Optional[float] = None
previous_normalized_od: Optional[float] = None
previous_growth_rate: Optional[float] = None

_latest_settings_ended_at: Optional[datetime] = None
_latest_run_at: Optional[datetime] = None

latest_event: Optional[events.AutomationEvent] = None
Expand All @@ -80,7 +76,6 @@ def __init__(
super(LEDAutomationJob, self).__init__(unit, experiment)

self.skip_first_run = skip_first_run
self._latest_settings_started_at: datetime = current_utc_datetime()
self.latest_normalized_od_at: datetime = current_utc_datetime()
self.latest_growth_rate_at: datetime = current_utc_datetime()
self.edited_channels: set[pt.LedChannel] = set()
Expand Down Expand Up @@ -237,9 +232,6 @@ def latest_normalized_od(self) -> float:
########## Private & internal methods

def on_disconnected(self) -> None:
self._latest_settings_ended_at = current_utc_datetime()
self._send_details_to_mqtt()

with suppress(AttributeError):
self.run_thread.join(
timeout=10
Expand All @@ -255,14 +247,6 @@ def on_disconnected(self) -> None:
source_of_event=f"{self.job_name}:{self.automation_name}",
)

def __setattr__(self, name, value) -> None:
super(LEDAutomationJob, self).__setattr__(name, value)
if name in self.published_settings and name not in ("state", "latest_event"):
self._latest_settings_ended_at = current_utc_datetime()
self._send_details_to_mqtt()
self._latest_settings_started_at = current_utc_datetime()
self._latest_settings_ended_at = None

def _set_growth_rate(self, message: pt.MQTTMessage) -> None:
if not message.payload:
return
Expand All @@ -279,28 +263,6 @@ def _set_OD(self, message: pt.MQTTMessage) -> None:
self._latest_normalized_od = payload.od_filtered
self.latest_normalized_od_at = payload.timestamp

def _send_details_to_mqtt(self) -> None:
self.publish(
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/led_automation_settings",
encode(
structs.AutomationSettings(
pioreactor_unit=self.unit,
experiment=self.experiment,
started_at=self._latest_settings_started_at,
ended_at=self._latest_settings_ended_at,
automation_name=self.automation_name,
settings=encode(
{
attr: getattr(self, attr, None)
for attr in self.published_settings
if attr not in ("state", "latest_event")
}
),
)
),
qos=QOS.EXACTLY_ONCE,
)

def start_passive_listeners(self) -> None:
self.subscribe_and_callback(
self._set_OD,
Expand Down
Loading

0 comments on commit dd2b53a

Please sign in to comment.