Skip to content

Commit

Permalink
remove psutil; adding inputs to experiment profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Aug 13, 2024
1 parent 2573768 commit 2241fd3
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 85 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

- more resilience to "UI state" diverging from "bioreactor state". Often, this occurred when two jobs stared almost immediately (often a networking issue), and the last job would halt since it couldn't get the required resources, however any MQTT data would be overwritten by the last job. Now, multiple places in the request pipeline will reduce duplication and prevent two jobs from starting too close to each other.
- Improved stirring clean up when stopped in quick succession after starting.
- if a network isn't found, the `monitor` job will not stall, but warning an continue.
- if a network isn't found, the `monitor` job will not stall, but warn and continue.

#### Breaking changes

- removed `psutil` package from new images. We replaced its functionality with built-in routines.
- in config.ini, the section `od_config` renamed to `od_reading.config`, and `stirring` is `stirring.config`. When you update, a script will run to automatically update these names in your config.inis.

### 24.7.18
Expand Down
41 changes: 31 additions & 10 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections import defaultdict
from pathlib import Path
from sched import scheduler
from typing import Any
from typing import Callable
from typing import Optional

Expand All @@ -29,6 +30,7 @@
from pioreactor.whoami import is_testing_env

bool_expression = str | bool
Env = dict[str, Any]


def wrap_in_try_except(func, logger: CustomLogger) -> Callable:
Expand Down Expand Up @@ -169,6 +171,7 @@ def get_simple_priority(action: struct.Action):
def wrapped_execute_action(
unit: str,
experiment: str,
global_env: Env,
job_name: str,
logger: CustomLogger,
schedule: scheduler,
Expand All @@ -188,7 +191,7 @@ def wrapped_execute_action(
)
job_name = job_name.replace("control", "automation")

env = {"unit": unit, "experiment": experiment, "job_name": job_name}
env = global_env | {"unit": unit, "experiment": experiment, "job_name": job_name}

match action:
case struct.Start(_, if_, options, args):
Expand Down Expand Up @@ -291,6 +294,7 @@ def combined_function() -> None:
def common_wrapped_execute_action(
experiment: str,
job_name: str,
global_env: Env,
logger: CustomLogger,
schedule: scheduler,
elapsed_seconds_func: Callable[[], float],
Expand All @@ -302,7 +306,16 @@ def common_wrapped_execute_action(
for worker in get_active_workers_in_experiment(experiment):
actions_to_execute.append(
wrapped_execute_action(
worker, experiment, job_name, logger, schedule, elapsed_seconds_func, client, action, dry_run
worker,
experiment,
global_env,
job_name,
logger,
schedule,
elapsed_seconds_func,
client,
action,
dry_run,
)
)

Expand All @@ -329,7 +342,7 @@ def _callable() -> None:
if (get_assigned_experiment_name(unit) != experiment) and not is_testing_env():
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
try:
Expand All @@ -344,6 +357,7 @@ def _callable() -> None:
action=wrapped_execute_action(
unit,
experiment,
env,
job_name,
logger,
schedule,
Expand All @@ -362,6 +376,7 @@ def _callable() -> None:
action=wrapped_execute_action(
unit,
experiment,
env,
job_name,
logger,
schedule,
Expand Down Expand Up @@ -400,7 +415,7 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if ((if_ is None) or evaluate_bool_expression(if_, env)) and (
((while_ is None) or evaluate_bool_expression(while_, env))
Expand All @@ -419,6 +434,7 @@ def _callable() -> None:
action=wrapped_execute_action(
unit,
experiment,
env,
job_name,
logger,
schedule,
Expand All @@ -442,6 +458,7 @@ def _callable() -> None:
action=wrapped_execute_action(
unit,
experiment,
env,
job_name,
logger,
schedule,
Expand Down Expand Up @@ -479,7 +496,7 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
level = options.level.lower()
Expand Down Expand Up @@ -507,7 +524,7 @@ def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
if get_assigned_experiment_name(unit) != experiment:
return
env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
Expand Down Expand Up @@ -544,7 +561,7 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
Expand Down Expand Up @@ -573,7 +590,7 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
Expand Down Expand Up @@ -602,7 +619,7 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
Expand Down Expand Up @@ -632,7 +649,7 @@ def _callable() -> None:
if get_assigned_experiment_name(unit) != experiment:
return

env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func())
env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())}

if (if_ is None) or evaluate_bool_expression(if_, env):
if dry_run:
Expand Down Expand Up @@ -787,6 +804,8 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
logger.error(e)
raise e

global_env = profile.inputs

sched = scheduler()

with catchtime() as elapsed_seconds_func:
Expand All @@ -799,6 +818,7 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
action=common_wrapped_execute_action(
experiment,
job_name,
global_env,
logger,
sched,
elapsed_seconds_func,
Expand All @@ -823,6 +843,7 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
action=wrapped_execute_action(
unit_,
experiment,
global_env,
job_name,
logger,
sched,
Expand Down
33 changes: 9 additions & 24 deletions pioreactor/background_jobs/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,34 +559,21 @@ def check_for_power_problems(self) -> None:
self.logger.debug("Power status okay.")

def check_and_publish_self_statistics(self) -> None:
import psutil # type: ignore
import os

# Disk usage percentage
statvfs = os.statvfs("/")
total_disk_space = statvfs.f_frsize * statvfs.f_blocks
available_disk_space = statvfs.f_frsize * statvfs.f_bavail
disk_usage_percent = round((1 - available_disk_space / total_disk_space) * 100)

disk_usage_percent = round(psutil.disk_usage("/").percent)
if disk_usage_percent <= 80:
self.logger.debug(f"Disk space at {disk_usage_percent}%.")
else:
# TODO: add documentation to clear disk space.
self.logger.warning(f"Disk space at {disk_usage_percent}%.")
self.flicker_led_with_error_code(error_codes.DISK_IS_ALMOST_FULL)

cpu_usage_percent = round(
(psutil.cpu_percent() + psutil.cpu_percent() + psutil.cpu_percent()) / 3
) # this is a noisy process, and we average it over a small window.
if cpu_usage_percent <= 85:
self.logger.debug(f"CPU usage at {cpu_usage_percent}%.")
else:
# TODO: add documentation
self.logger.warning(f"CPU usage at {cpu_usage_percent}%.")

memory_usage_percent = 100 - round(
100 * psutil.virtual_memory().available / psutil.virtual_memory().total
)
if memory_usage_percent <= 75:
self.logger.debug(f"Memory usage at {memory_usage_percent}%.")
else:
# TODO: add documentation
self.logger.warning(f"Memory usage at {memory_usage_percent}%.")

cpu_temperature_celcius = round(utils.get_cpu_temperature())
if cpu_temperature_celcius <= 70:
self.logger.debug(f"CPU temperature at {cpu_temperature_celcius} ℃.")
Expand All @@ -597,8 +584,6 @@ def check_and_publish_self_statistics(self) -> None:

self.computer_statistics = {
"disk_usage_percent": disk_usage_percent,
"cpu_usage_percent": cpu_usage_percent,
"memory_usage_percent": memory_usage_percent,
"cpu_temperature_celcius": cpu_temperature_celcius,
"timestamp": current_utc_timestamp(),
}
Expand Down Expand Up @@ -724,8 +709,8 @@ def _run_job_on_machine(self, topic: str, raw_payload: MQTTMessagePayload) -> No
options.pop("job_source", "") # techdebt, led_intensity doesn't use job_source
Thread(
target=utils.boolean_retry,
args=(led_intensity, (state,), options),
kwargs={"sleep_for": 0.4, "retries": 5},
args=(led_intensity,),
kwargs={"sleep_for": 0.4, "retries": 5, "args": (state,), "kwargs": options},
).start()

elif job_name in {
Expand Down
2 changes: 1 addition & 1 deletion pioreactor/background_jobs/stirring.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def set_target_rpm(self, value: float) -> None:
# probably use_rpm=0 is in config.ini
raise ValueError("Can't set target RPM when no RPM measurement is being made")

self.target_rpm = clamp(0.0, value, 5_000.0)
self.target_rpm = clamp(0.0, float(value), 5_000.0)
self.set_duty_cycle(self.rpm_to_dc_lookup(self.target_rpm))
self.pid.set_setpoint(self.target_rpm)

Expand Down
20 changes: 10 additions & 10 deletions pioreactor/cluster_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pioreactor.pubsub import delete_from_leader
from pioreactor.pubsub import get_from_leader
from pioreactor.pubsub import put_into_leader
from pioreactor.pubsub import subscribe
from pioreactor.utils import networking
from pioreactor.utils.timing import catchtime

Expand Down Expand Up @@ -110,13 +111,13 @@ def add_worker(hostname: str, password: str, version: str, model: str) -> None:
r.raise_for_status()
except HTTPErrorStatus:
if r.status_code >= 500:
click.echo("Server error. Could not complete.")
logger.error("Server error. Could not complete. See UI logs in /var/log/pioreactorui.log")
else:
logger.error("Did not add worker to backend")
raise HTTPException("Did not add worker to backend")
logger.error(f"Did not add worker {hostname} to backend.")
raise HTTPException(f"Did not add worker {hostname} to backend.")
except HTTPException:
logger.error("Could not connect to leader's webserver")
raise HTTPException("Could not connect to leader's webserver")
logger.error(f"Not able to connect to leader's backend at {leader_address}.")
raise HTTPException(f"Not able to connect to leader's backend at {leader_address}.")

logger.notice(f"New pioreactor {hostname} successfully added to cluster.") # type: ignore

Expand All @@ -129,12 +130,12 @@ def remove_worker(hostname: str) -> None:
r.raise_for_status()
except HTTPErrorStatus:
if r.status_code >= 500:
click.echo("Server error. Could not complete.")
click.echo("Server error. Could not complete. See UI logs in /var/log/pioreactorui.log")
else:
click.echo(f"Worker {hostname} not present to be removed. Check hostname.")
click.Abort()
except HTTPException:
click.echo("Not able to connect to leader's backend.")
click.echo(f"Not able to connect to leader's backend at {leader_address}.")
click.Abort()
else:
click.echo(f"Removed {hostname} from cluster.") # this needs to shutdown the worker too???
Expand Down Expand Up @@ -222,7 +223,6 @@ def cluster_status() -> None:
Note that this only looks at the current cluster as defined in config.ini.
"""
import socket
from pioreactor import pubsub

def get_metadata(hostname):
# get ip
Expand All @@ -235,7 +235,7 @@ def get_metadata(hostname):
ip = "unknown"

# get state
result = pubsub.subscribe(
result = subscribe(
f"pioreactor/{hostname}/{whoami.UNIVERSAL_EXPERIMENT}/monitor/$state",
timeout=1,
name="CLI",
Expand All @@ -246,7 +246,7 @@ def get_metadata(hostname):
state = "unknown"

# get version
result = pubsub.subscribe(
result = subscribe(
f"pioreactor/{hostname}/{whoami.UNIVERSAL_EXPERIMENT}/monitor/versions",
timeout=1,
name="CLI",
Expand Down
2 changes: 2 additions & 0 deletions pioreactor/experiment_profiles/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ def expr(self, p):
return True
elif p.NAME.lower() == "false":
return False
elif p.NAME in self.ENV:
return self.ENV[p.NAME]
else:
return p.NAME

Expand Down
1 change: 1 addition & 0 deletions pioreactor/experiment_profiles/profile_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,4 @@ class Profile(Struct, forbid_unknown_fields=True):
default_factory=CommonBlock
) # later this might expand to include other fields
pioreactors: dict[PioreactorUnitName, PioreactorSpecificBlock] = {}
inputs: dict[str, t.Any] = {}
Loading

0 comments on commit 2241fd3

Please sign in to comment.