Skip to content

Commit

Permalink
wip for dynamic log expressions, and more
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Jul 5, 2024
1 parent 014f3ec commit 834ee8d
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 21 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### Upcoming

- improvements to the real-time profile editor
- `hours_elapsed()` is a function in profile expressions
- `log` in experiment profiles now uses expressions.

### 24.7.3

#### Enhancements
Expand Down
68 changes: 60 additions & 8 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import random
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from sched import scheduler
from typing import Callable
Expand All @@ -30,6 +31,16 @@
bool_expression = str | bool


class CustomScheduler(scheduler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_time = datetime.now()

def get_elapsed_hours(self) -> float:
elapsed_time = datetime.now() - self.start_time
return elapsed_time.total_seconds() / 3600


def wrap_in_try_except(func, logger: CustomLogger) -> Callable:
def inner_function(*args, **kwargs) -> None:
try:
Expand Down Expand Up @@ -73,6 +84,21 @@ def evaluate_options(options: dict, env: dict) -> dict:
return options_expressed


def evaluate_log_message(message: str, env: dict) -> str:
import re
from pioreactor.experiment_profiles.parser import parse_profile_expression

pattern = r"\${{(.*?)}}"

matches = re.findall(pattern, message)

modified_matches = [parse_profile_expression(match, env) for match in matches]

# Replace each ${{...}} in the original string with the modified match
result_string = re.sub(pattern, lambda m: str(modified_matches.pop(0)), message)
return result_string


def evaluate_bool_expression(bool_expression: bool_expression, env: dict) -> bool:
from pioreactor.experiment_profiles.parser import parse_profile_expression_to_bool

Expand Down Expand Up @@ -168,22 +194,26 @@ def wrapped_execute_action(

match action:
case struct.Start(_, if_, options, args):
return start_job(unit, experiment, client, job_name, options, args, dry_run, if_, env, logger)
return start_job(
unit, experiment, client, job_name, options, args, dry_run, if_, env, logger, schedule
)

case struct.Pause(_, if_):
return pause_job(unit, experiment, client, job_name, dry_run, if_, env, logger)
return pause_job(unit, experiment, client, job_name, dry_run, if_, env, logger, schedule)

case struct.Resume(_, if_):
return resume_job(unit, experiment, client, job_name, dry_run, if_, env, logger)
return resume_job(unit, experiment, client, job_name, dry_run, if_, env, logger, schedule)

case struct.Stop(_, if_):
return stop_job(unit, experiment, client, job_name, dry_run, if_, env, logger)
return stop_job(unit, experiment, client, job_name, dry_run, if_, env, logger, schedule)

case struct.Update(_, if_, options):
return update_job(unit, experiment, client, job_name, options, dry_run, if_, env, logger)
return update_job(
unit, experiment, client, job_name, options, dry_run, if_, env, logger, schedule
)

case struct.Log(_, options, if_):
return log(unit, experiment, client, job_name, options, dry_run, if_, env, logger)
return log(unit, experiment, client, job_name, options, dry_run, if_, env, logger, schedule)

case struct.Repeat(_, if_, repeat_every_hours, while_, max_hours, actions):
return repeat(
Expand Down Expand Up @@ -268,6 +298,8 @@ def _callable() -> None:
if (get_assigned_experiment_name(unit) != experiment) and not is_testing_env():
return

env["hours_elapsed"] = schedule.get_elapsed_hours()

if (if_ is None) or evaluate_bool_expression(if_, env):
try:
condition_met = evaluate_bool_expression(condition, env)
Expand Down Expand Up @@ -320,6 +352,8 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = schedule.get_elapsed_hours()

if ((if_ is None) or evaluate_bool_expression(if_, env)) and (
((while_ is None) or evaluate_bool_expression(while_, env))
):
Expand Down Expand Up @@ -374,14 +408,18 @@ def log(
if_: Optional[str | bool],
env: dict,
logger: CustomLogger,
schedule: scheduler,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = schedule.get_elapsed_hours()

if (if_ is None) or evaluate_bool_expression(if_, env):
level = options.level.lower()
getattr(logger, level)(options.message.format(unit=unit, job=job_name, experiment=experiment))
getattr(logger, level)(evaluate_log_message(options.message, env))
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand All @@ -399,11 +437,13 @@ def start_job(
if_: Optional[str | bool],
env: dict,
logger: CustomLogger,
schedule: scheduler,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
if get_assigned_experiment_name(unit) != experiment:
return
env["hours_elapsed"] = schedule.get_elapsed_hours()

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
Expand Down Expand Up @@ -433,12 +473,15 @@ def pause_job(
if_: Optional[str | bool],
env: dict,
logger: CustomLogger,
schedule: scheduler,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = schedule.get_elapsed_hours()

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
logger.info(f"Dry-run: Pausing {job_name} on {unit}.")
Expand All @@ -459,11 +502,15 @@ def resume_job(
if_: Optional[str | bool],
env: dict,
logger: CustomLogger,
schedule: scheduler,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = schedule.get_elapsed_hours()

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
logger.info(f"Dry-run: Resuming {job_name} on {unit}.")
Expand All @@ -484,6 +531,7 @@ def stop_job(
if_: Optional[str | bool],
env: dict,
logger: CustomLogger,
schedule: scheduler,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
Expand All @@ -510,11 +558,15 @@ def update_job(
if_: Optional[str | bool],
env: dict,
logger: CustomLogger,
schedule: scheduler,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = schedule.get_elapsed_hours()

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
for setting, value in options.items():
Expand Down Expand Up @@ -695,7 +747,7 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
logger.error(e)
raise e

sched = scheduler()
sched = CustomScheduler()

# process common
for job_name, job in profile.common.jobs.items():
Expand Down
2 changes: 1 addition & 1 deletion pioreactor/background_jobs/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ def run_job_on_machine(self, msg: MQTTMessage) -> None:

from pioreactor.actions.led_intensity import led_intensity, ALL_LED_CHANNELS

state = {ch: options.pop(ch) for ch in ALL_LED_CHANNELS if ch in options}
state = {ch: float(options.pop(ch)) for ch in ALL_LED_CHANNELS if ch in options}
options["pubsub_client"] = self.pub_client
options["unit"] = self.unit
options["experiment"] = experiment # techdebt
Expand Down
7 changes: 4 additions & 3 deletions pioreactor/experiment_profiles/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pioreactor.whoami import is_active


def convert_string(input_str: str) -> bool | float | str:
def convert_string(input_str: str) -> int | bool | float | str:
# Try to convert to float
try:
return float(input_str)
Expand Down Expand Up @@ -179,10 +179,10 @@ def expr(self, p):
return random()
elif p.FUNCTION == "unit()":
return self.ENV["unit"]

elif p.FUNCTION == "hours_elapsed()":
return self.ENV["hours_elapsed"]
elif p.FUNCTION == "experiment()":
return self.ENV["experiment"]

elif p.FUNCTION == "job_name()":
return self.ENV["job_name"]
else:
Expand Down Expand Up @@ -221,6 +221,7 @@ def expr(self, p) -> bool | float | str:
raise NotActiveWorkerError(f"Worker {unit} is not active.")

result = subscribe(f"pioreactor/{unit}/{experiment}/{job}/{setting}", timeout=1)

if result:
# error handling here
try:
Expand Down
30 changes: 22 additions & 8 deletions pioreactor/tests/test_execute_experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,33 @@ def collection_actions(msg):
def test_execute_experiment_log_actions(mock__load_experiment_profile, active_workers_in_cluster) -> None:
experiment = "_testing_experiment"

action1 = Log(hours_elapsed=0 / 60 / 60, options=_LogOptions(message="test {unit}"))
action1 = Log(hours_elapsed=0 / 60 / 60, options=_LogOptions(message=r"test ${{unit()}}"))
action2 = Log(
hours_elapsed=2 / 60 / 60, options=_LogOptions(message="test {job} on {unit}", level="INFO")
hours_elapsed=2 / 60 / 60,
options=_LogOptions(message=r"test ${{job_name()}} on ${{unit()}}", level="INFO"),
)
action3 = Log(
hours_elapsed=4 / 60 / 60, options=_LogOptions(message="test experiment={experiment}", level="DEBUG")
hours_elapsed=4 / 60 / 60,
options=_LogOptions(message=r"test experiment=${{experiment()}}", level="DEBUG"),
)

unit = "unit1"
job_name = "job2"
publish(f"pioreactor/{unit}/{experiment}/{job_name}/target", 10.5, retain=True)

action4 = Log(
hours_elapsed=4 / 60 / 60,
options=_LogOptions(message=r"dynamic data looks like ${{unit1:job2:target}}", level="DEBUG"),
)

profile = Profile(
experiment_profile_name="test_profile",
plugins=[],
common=CommonBlock(jobs={"job1": Job(actions=[action1])}),
pioreactors={
"unit1": PioreactorSpecificBlock(jobs={"job2": Job(actions=[action2, action3])}, label="label1")
"unit1": PioreactorSpecificBlock(
jobs={"job2": Job(actions=[action2, action3, action4])}, label="label1"
)
},
metadata=Metadata(author="test_author"),
)
Expand All @@ -158,14 +171,15 @@ def test_execute_experiment_log_actions(mock__load_experiment_profile, active_wo
"DEBUG", "testing_unit", experiment
) as debug_bucket:
execute_experiment_profile("profile.yaml", experiment)
assert [log["message"] for log in notice_bucket[1:-1]] == [
assert [log["message"] for log in notice_bucket[1:]] == [
f"test {unit}" for unit in active_workers_in_cluster
]
assert [log["message"] for log in info_bucket] == [
assert [log["message"] for log in info_bucket[:1]] == [
"test job2 on unit1",
]
assert [log["message"] for log in debug_bucket[1:]] == [
assert [log["message"] for log in debug_bucket[1:3]] == [
f"test experiment={experiment}",
"dynamic data looks like 10.5",
]


Expand Down Expand Up @@ -591,7 +605,7 @@ def collection_actions(msg):


@patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile")
def test_execute_experiment_profile_when_action(mock__load_experiment_profile) -> None:
def test_execute_experiment_profile_when_action_simple(mock__load_experiment_profile) -> None:
experiment = "_testing_experiment"
action = When(
hours_elapsed=0.0005,
Expand Down
2 changes: 1 addition & 1 deletion pioreactor/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Append ".dev0" if a dev version
# Append "rc0" if a rc version
# No zero padding!
__version__ = "24.7.3"
__version__ = "24.7.3.dev0"


def get_hardware_version() -> tuple[int, int] | tuple[int, int, str]:
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements_worker.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-r requirements.txt
numpy<2.0.0
adafruit-circuitpython-ads1x15==2.2.23
DAC43608==0.2.7
TMP1075==0.2.1
Expand Down

0 comments on commit 834ee8d

Please sign in to comment.