From 2241fd3a6f3123017253a9ad94de488c2767c34f Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Mon, 12 Aug 2024 21:43:06 -0400 Subject: [PATCH] remove psutil; adding inputs to experiment profiles --- CHANGELOG.md | 3 +- .../actions/leader/experiment_profile.py | 41 ++++++++++++++----- pioreactor/background_jobs/monitor.py | 33 ++++----------- pioreactor/background_jobs/stirring.py | 2 +- pioreactor/cluster_management/__init__.py | 20 ++++----- pioreactor/experiment_profiles/parser.py | 2 + .../experiment_profiles/profile_struct.py | 1 + pioreactor/pubsub.py | 27 +++++------- .../tests/test_experiment_profile_structs.py | 5 +++ pioreactor/tests/test_parser.py | 6 +++ pioreactor/utils/__init__.py | 6 +-- pioreactor/utils/networking.py | 21 ++-------- requirements/requirements.txt | 1 - setup.py | 1 - 14 files changed, 84 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4ae9c9f..fd48b2fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,11 @@ - more resilience to "UI state" diverging from "bioreactor state". Often, this occurred when two jobs stared almost immediately (often a networking issue), and the last job would halt since it couldn't get the required resources, however any MQTT data would be overwritten by the last job. Now, multiple places in the request pipeline will reduce duplication and prevent two jobs from starting too close to each other. - Improved stirring clean up when stopped in quick succession after starting. - - if a network isn't found, the `monitor` job will not stall, but warning an continue. + - if a network isn't found, the `monitor` job will not stall, but warn and continue. #### Breaking changes + - removed `psutil` package from new images. We replaced its functionality with built-in routines. - in config.ini, the section `od_config` renamed to `od_reading.config`, and `stirring` is `stirring.config`. When you update, a script will run to automatically update these names in your config.inis. ### 24.7.18 diff --git a/pioreactor/actions/leader/experiment_profile.py b/pioreactor/actions/leader/experiment_profile.py index 7f62eb8f..af10ef1b 100644 --- a/pioreactor/actions/leader/experiment_profile.py +++ b/pioreactor/actions/leader/experiment_profile.py @@ -6,6 +6,7 @@ from collections import defaultdict from pathlib import Path from sched import scheduler +from typing import Any from typing import Callable from typing import Optional @@ -29,6 +30,7 @@ from pioreactor.whoami import is_testing_env bool_expression = str | bool +Env = dict[str, Any] def wrap_in_try_except(func, logger: CustomLogger) -> Callable: @@ -169,6 +171,7 @@ def get_simple_priority(action: struct.Action): def wrapped_execute_action( unit: str, experiment: str, + global_env: Env, job_name: str, logger: CustomLogger, schedule: scheduler, @@ -188,7 +191,7 @@ def wrapped_execute_action( ) job_name = job_name.replace("control", "automation") - env = {"unit": unit, "experiment": experiment, "job_name": job_name} + env = global_env | {"unit": unit, "experiment": experiment, "job_name": job_name} match action: case struct.Start(_, if_, options, args): @@ -291,6 +294,7 @@ def combined_function() -> None: def common_wrapped_execute_action( experiment: str, job_name: str, + global_env: Env, logger: CustomLogger, schedule: scheduler, elapsed_seconds_func: Callable[[], float], @@ -302,7 +306,16 @@ def common_wrapped_execute_action( for worker in get_active_workers_in_experiment(experiment): actions_to_execute.append( wrapped_execute_action( - worker, experiment, job_name, logger, schedule, elapsed_seconds_func, client, action, dry_run + worker, + experiment, + global_env, + job_name, + logger, + schedule, + elapsed_seconds_func, + client, + action, + dry_run, ) ) @@ -329,7 +342,7 @@ def _callable() -> None: if (get_assigned_experiment_name(unit) != experiment) and not is_testing_env(): return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): try: @@ -344,6 +357,7 @@ def _callable() -> None: action=wrapped_execute_action( unit, experiment, + env, job_name, logger, schedule, @@ -362,6 +376,7 @@ def _callable() -> None: action=wrapped_execute_action( unit, experiment, + env, job_name, logger, schedule, @@ -400,7 +415,7 @@ def _callable() -> None: if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if ((if_ is None) or evaluate_bool_expression(if_, env)) and ( ((while_ is None) or evaluate_bool_expression(while_, env)) @@ -419,6 +434,7 @@ def _callable() -> None: action=wrapped_execute_action( unit, experiment, + env, job_name, logger, schedule, @@ -442,6 +458,7 @@ def _callable() -> None: action=wrapped_execute_action( unit, experiment, + env, job_name, logger, schedule, @@ -479,7 +496,7 @@ def _callable() -> None: if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): level = options.level.lower() @@ -507,7 +524,7 @@ def _callable() -> None: # first check if the Pioreactor is still part of the experiment. if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): if dry_run: @@ -544,7 +561,7 @@ def _callable() -> None: if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): if dry_run: @@ -573,7 +590,7 @@ def _callable() -> None: if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): if dry_run: @@ -602,7 +619,7 @@ def _callable() -> None: if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): if dry_run: @@ -632,7 +649,7 @@ def _callable() -> None: if get_assigned_experiment_name(unit) != experiment: return - env["hours_elapsed"] = seconds_to_hours(elapsed_seconds_func()) + env = env | {"hours_elapsed": seconds_to_hours(elapsed_seconds_func())} if (if_ is None) or evaluate_bool_expression(if_, env): if dry_run: @@ -787,6 +804,8 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: logger.error(e) raise e + global_env = profile.inputs + sched = scheduler() with catchtime() as elapsed_seconds_func: @@ -799,6 +818,7 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: action=common_wrapped_execute_action( experiment, job_name, + global_env, logger, sched, elapsed_seconds_func, @@ -823,6 +843,7 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: action=wrapped_execute_action( unit_, experiment, + global_env, job_name, logger, sched, diff --git a/pioreactor/background_jobs/monitor.py b/pioreactor/background_jobs/monitor.py index ebf60a91..dbe0b79e 100644 --- a/pioreactor/background_jobs/monitor.py +++ b/pioreactor/background_jobs/monitor.py @@ -559,9 +559,14 @@ def check_for_power_problems(self) -> None: self.logger.debug("Power status okay.") def check_and_publish_self_statistics(self) -> None: - import psutil # type: ignore + import os + + # Disk usage percentage + statvfs = os.statvfs("/") + total_disk_space = statvfs.f_frsize * statvfs.f_blocks + available_disk_space = statvfs.f_frsize * statvfs.f_bavail + disk_usage_percent = round((1 - available_disk_space / total_disk_space) * 100) - disk_usage_percent = round(psutil.disk_usage("/").percent) if disk_usage_percent <= 80: self.logger.debug(f"Disk space at {disk_usage_percent}%.") else: @@ -569,24 +574,6 @@ def check_and_publish_self_statistics(self) -> None: self.logger.warning(f"Disk space at {disk_usage_percent}%.") self.flicker_led_with_error_code(error_codes.DISK_IS_ALMOST_FULL) - cpu_usage_percent = round( - (psutil.cpu_percent() + psutil.cpu_percent() + psutil.cpu_percent()) / 3 - ) # this is a noisy process, and we average it over a small window. - if cpu_usage_percent <= 85: - self.logger.debug(f"CPU usage at {cpu_usage_percent}%.") - else: - # TODO: add documentation - self.logger.warning(f"CPU usage at {cpu_usage_percent}%.") - - memory_usage_percent = 100 - round( - 100 * psutil.virtual_memory().available / psutil.virtual_memory().total - ) - if memory_usage_percent <= 75: - self.logger.debug(f"Memory usage at {memory_usage_percent}%.") - else: - # TODO: add documentation - self.logger.warning(f"Memory usage at {memory_usage_percent}%.") - cpu_temperature_celcius = round(utils.get_cpu_temperature()) if cpu_temperature_celcius <= 70: self.logger.debug(f"CPU temperature at {cpu_temperature_celcius} ℃.") @@ -597,8 +584,6 @@ def check_and_publish_self_statistics(self) -> None: self.computer_statistics = { "disk_usage_percent": disk_usage_percent, - "cpu_usage_percent": cpu_usage_percent, - "memory_usage_percent": memory_usage_percent, "cpu_temperature_celcius": cpu_temperature_celcius, "timestamp": current_utc_timestamp(), } @@ -724,8 +709,8 @@ def _run_job_on_machine(self, topic: str, raw_payload: MQTTMessagePayload) -> No options.pop("job_source", "") # techdebt, led_intensity doesn't use job_source Thread( target=utils.boolean_retry, - args=(led_intensity, (state,), options), - kwargs={"sleep_for": 0.4, "retries": 5}, + args=(led_intensity,), + kwargs={"sleep_for": 0.4, "retries": 5, "args": (state,), "kwargs": options}, ).start() elif job_name in { diff --git a/pioreactor/background_jobs/stirring.py b/pioreactor/background_jobs/stirring.py index 4ddd6aec..a609feb1 100644 --- a/pioreactor/background_jobs/stirring.py +++ b/pioreactor/background_jobs/stirring.py @@ -414,7 +414,7 @@ def set_target_rpm(self, value: float) -> None: # probably use_rpm=0 is in config.ini raise ValueError("Can't set target RPM when no RPM measurement is being made") - self.target_rpm = clamp(0.0, value, 5_000.0) + self.target_rpm = clamp(0.0, float(value), 5_000.0) self.set_duty_cycle(self.rpm_to_dc_lookup(self.target_rpm)) self.pid.set_setpoint(self.target_rpm) diff --git a/pioreactor/cluster_management/__init__.py b/pioreactor/cluster_management/__init__.py index 98404bed..36f6d857 100644 --- a/pioreactor/cluster_management/__init__.py +++ b/pioreactor/cluster_management/__init__.py @@ -19,6 +19,7 @@ from pioreactor.pubsub import delete_from_leader from pioreactor.pubsub import get_from_leader from pioreactor.pubsub import put_into_leader +from pioreactor.pubsub import subscribe from pioreactor.utils import networking from pioreactor.utils.timing import catchtime @@ -110,13 +111,13 @@ def add_worker(hostname: str, password: str, version: str, model: str) -> None: r.raise_for_status() except HTTPErrorStatus: if r.status_code >= 500: - click.echo("Server error. Could not complete.") + logger.error("Server error. Could not complete. See UI logs in /var/log/pioreactorui.log") else: - logger.error("Did not add worker to backend") - raise HTTPException("Did not add worker to backend") + logger.error(f"Did not add worker {hostname} to backend.") + raise HTTPException(f"Did not add worker {hostname} to backend.") except HTTPException: - logger.error("Could not connect to leader's webserver") - raise HTTPException("Could not connect to leader's webserver") + logger.error(f"Not able to connect to leader's backend at {leader_address}.") + raise HTTPException(f"Not able to connect to leader's backend at {leader_address}.") logger.notice(f"New pioreactor {hostname} successfully added to cluster.") # type: ignore @@ -129,12 +130,12 @@ def remove_worker(hostname: str) -> None: r.raise_for_status() except HTTPErrorStatus: if r.status_code >= 500: - click.echo("Server error. Could not complete.") + click.echo("Server error. Could not complete. See UI logs in /var/log/pioreactorui.log") else: click.echo(f"Worker {hostname} not present to be removed. Check hostname.") click.Abort() except HTTPException: - click.echo("Not able to connect to leader's backend.") + click.echo(f"Not able to connect to leader's backend at {leader_address}.") click.Abort() else: click.echo(f"Removed {hostname} from cluster.") # this needs to shutdown the worker too??? @@ -222,7 +223,6 @@ def cluster_status() -> None: Note that this only looks at the current cluster as defined in config.ini. """ import socket - from pioreactor import pubsub def get_metadata(hostname): # get ip @@ -235,7 +235,7 @@ def get_metadata(hostname): ip = "unknown" # get state - result = pubsub.subscribe( + result = subscribe( f"pioreactor/{hostname}/{whoami.UNIVERSAL_EXPERIMENT}/monitor/$state", timeout=1, name="CLI", @@ -246,7 +246,7 @@ def get_metadata(hostname): state = "unknown" # get version - result = pubsub.subscribe( + result = subscribe( f"pioreactor/{hostname}/{whoami.UNIVERSAL_EXPERIMENT}/monitor/versions", timeout=1, name="CLI", diff --git a/pioreactor/experiment_profiles/parser.py b/pioreactor/experiment_profiles/parser.py index dd913d9c..324bac4a 100644 --- a/pioreactor/experiment_profiles/parser.py +++ b/pioreactor/experiment_profiles/parser.py @@ -196,6 +196,8 @@ def expr(self, p): return True elif p.NAME.lower() == "false": return False + elif p.NAME in self.ENV: + return self.ENV[p.NAME] else: return p.NAME diff --git a/pioreactor/experiment_profiles/profile_struct.py b/pioreactor/experiment_profiles/profile_struct.py index 30f9acdc..ff162958 100644 --- a/pioreactor/experiment_profiles/profile_struct.py +++ b/pioreactor/experiment_profiles/profile_struct.py @@ -125,3 +125,4 @@ class Profile(Struct, forbid_unknown_fields=True): default_factory=CommonBlock ) # later this might expand to include other fields pioreactors: dict[PioreactorUnitName, PioreactorSpecificBlock] = {} + inputs: dict[str, t.Any] = {} diff --git a/pioreactor/pubsub.py b/pioreactor/pubsub.py index 70bb4dba..b54121df 100644 --- a/pioreactor/pubsub.py +++ b/pioreactor/pubsub.py @@ -350,13 +350,16 @@ def __exit__(self, *args): self.client.disconnect() -def get_from_leader(endpoint: str, **kwargs) -> mureq.Response: - assert endpoint.startswith("/api/") or endpoint.startswith("api/") - +def create_leader_webserver_path(endpoint): port = config.getint("ui", "port", fallback=80) proto = config.get("ui", "proto", fallback="http") + return f"{proto}://{leader_address}:{port}/{endpoint}" + + +def get_from_leader(endpoint: str, **kwargs) -> mureq.Response: + assert endpoint.startswith("/api/") or endpoint.startswith("api/") endpoint = endpoint.removeprefix("/") - return mureq.get(f"{proto}://{leader_address}:{port}/{endpoint}", **kwargs) + return mureq.get(create_leader_webserver_path(endpoint), **kwargs) def put_into_leader( @@ -364,10 +367,8 @@ def put_into_leader( ) -> mureq.Response: assert endpoint.startswith("/api/") or endpoint.startswith("api/") - port = config.getint("ui", "port", fallback=80) - proto = config.get("ui", "proto", fallback="http") endpoint = endpoint.removeprefix("/") - return mureq.put(f"{proto}://{leader_address}:{port}/{endpoint}", body=body, json=json, **kwargs) + return mureq.put(create_leader_webserver_path(endpoint), body=body, json=json, **kwargs) def patch_into_leader( @@ -375,25 +376,19 @@ def patch_into_leader( ) -> mureq.Response: assert endpoint.startswith("/api/") or endpoint.startswith("api/") - port = config.getint("ui", "port", fallback=80) - proto = config.get("ui", "proto", fallback="http") endpoint = endpoint.removeprefix("/") - return mureq.patch(f"{proto}://{leader_address}:{port}/{endpoint}", body=body, json=json, **kwargs) + return mureq.patch(create_leader_webserver_path(endpoint), body=body, json=json, **kwargs) def post_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: assert endpoint.startswith("/api/") or endpoint.startswith("api/") - port = config.getint("ui", "port", fallback=80) - proto = config.get("ui", "proto", fallback="http") endpoint = endpoint.removeprefix("/") - return mureq.post(f"{proto}://{leader_address}:{port}/{endpoint}", body=body, json=json, **kwargs) + return mureq.post(create_leader_webserver_path(endpoint), body=body, json=json, **kwargs) def delete_from_leader(endpoint: str, **kwargs) -> mureq.Response: assert endpoint.startswith("/api/") or endpoint.startswith("api/") - port = config.getint("ui", "port", fallback=80) - proto = config.get("ui", "proto", fallback="http") endpoint = endpoint.removeprefix("/") - return mureq.delete(f"{proto}://{leader_address}:{port}/{endpoint}", **kwargs) + return mureq.delete(create_leader_webserver_path(endpoint), **kwargs) diff --git a/pioreactor/tests/test_experiment_profile_structs.py b/pioreactor/tests/test_experiment_profile_structs.py index 3e85fbc7..3645681e 100644 --- a/pioreactor/tests/test_experiment_profile_structs.py +++ b/pioreactor/tests/test_experiment_profile_structs.py @@ -210,11 +210,16 @@ def test_complex3() -> None: author: Alex Doe description: Very complex experiment with multiple jobs and bioreactors, different jobs on different bioreactors +inputs: + dummy: 1 + dummy_truth: 2 + common: jobs: stirring: actions: - type: start + if: ${{dummy > 0}} hours_elapsed: 0.0 options: target_rpm: 200.0 diff --git a/pioreactor/tests/test_parser.py b/pioreactor/tests/test_parser.py index fb8ad3b3..85b137b3 100644 --- a/pioreactor/tests/test_parser.py +++ b/pioreactor/tests/test_parser.py @@ -163,6 +163,12 @@ def test_env_and_functions(): parse_profile_expression("unit()", env={}) +def test_env(): + assert parse_profile_expression("rpm + 5.0", env={"rpm": 100}) == 105.0 + assert parse_profile_expression("rpm_start * other", env={"rpm_start": 10, "other": 6.6}) == 10 * 6.6 + assert parse_profile_expression("b", env={"b": True}) + + def test_mqtt_fetches_with_calculations(): experiment = "_testing_experiment" publish( diff --git a/pioreactor/utils/__init__.py b/pioreactor/utils/__init__.py index 2056cbf3..12e0a098 100644 --- a/pioreactor/utils/__init__.py +++ b/pioreactor/utils/__init__.py @@ -420,17 +420,17 @@ def __getitem__(self, key: str) -> float: def boolean_retry( func: Callable[..., bool], - f_args: tuple = tuple(), - f_kwargs: dict = dict(), retries: int = 3, sleep_for: float = 0.25, + args: tuple = (), + kwargs: dict = {}, ) -> bool: """ Retries a function upon encountering an False return until it succeeds or the maximum number of retries is exhausted. """ for _ in range(retries): - res = func(*f_args, **f_kwargs) + res = func(*args, **kwargs) if res: return res time.sleep(sleep_for) diff --git a/pioreactor/utils/networking.py b/pioreactor/utils/networking.py index 5bd78253..bc0f881d 100644 --- a/pioreactor/utils/networking.py +++ b/pioreactor/utils/networking.py @@ -2,6 +2,7 @@ from __future__ import annotations import os +import subprocess from typing import Generator from typing import Optional @@ -47,7 +48,6 @@ def is_reachable(address: str) -> bool: Can we ping the computer at `address`? """ # TODO: why not use sh.ping? Ex: ping("leader7.local", "-c1", "-W50") - import subprocess std_out_from_ping = subprocess.Popen( ["ping", "-c1", "-W50", address], @@ -63,23 +63,8 @@ def is_reachable(address: str) -> bool: def get_ip() -> Optional[str]: # returns all ipv4s as comma-separated string - from psutil import net_if_addrs - - ipv4_addresses = [] - - interfaces = net_if_addrs() - - for interface in interfaces: - if interface == "lo": - continue - - try: - ipv4_addresses.extend( - [addr.address for addr in interfaces[interface] if addr.family == 2] - ) # AddressFamily.AF_INET == 2 - except Exception: - continue - + result = subprocess.run(["hostname", "-I"], stdout=subprocess.PIPE, text=True) + ipv4_addresses = result.stdout.strip().split() if ipv4_addresses: return ",".join(ipv4_addresses) else: diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 9bac789e..9d6d7cae 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,6 +1,5 @@ click==8.1.7 paho-mqtt==2.1.0 -psutil==5.9.5 sh==2.0.6 JSON-log-formatter==0.5.1 colorlog==6.7.0 diff --git a/setup.py b/setup.py index da835f53..0f4f67e4 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,6 @@ CORE_REQUIREMENTS = [ "click==8.1.7", "paho-mqtt==2.1.0", - "psutil==5.9.5", "sh==2.0.6", "JSON-log-formatter==0.5.1", "colorlog==6.7.0",