diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 22842b74..c5f21e99 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -38,6 +38,10 @@ jobs: with: version: '1.6' + - name: Install any required linux software + run: | + sudo apt-get install -y avahi avahi-utils + - name: Create dot_pioreactor folder run: | mkdir -p .pioreactor/storage diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b225c045..fd43404c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,6 +10,7 @@ repos: - id: mixed-line-ending - id: trailing-whitespace - id: check-added-large-files + args: ['--maxkb=3000'] - repo: https://github.com/ambv/black diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a4b4ce2..de6a47e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +### Upcoming + + - Workers now have a webserver on them. This is one of the largest architectural changes to Pioreactor, and lays the foundation for better plugin and calibration management, plus future features. + - APIs that initiate a background task either return with the result, or return a task id that can be looked up at `/api/task_status/`. + - previous actions that would involve SSHing from leader to a worker are replaced by web requests. + - We no longer recommend the Raspberry Pi Zero (the original Zero, not the Zero 2.) since supporting a web server + pioreactor functions is too much for a single core. + - Better MQTT re-connection logic. 
+ + ### 24.8.22 #### Enhancements diff --git a/pioreactor/actions/leader/backup_database.py b/pioreactor/actions/leader/backup_database.py index 81a350db..a91c62da 100644 --- a/pioreactor/actions/leader/backup_database.py +++ b/pioreactor/actions/leader/backup_database.py @@ -5,11 +5,13 @@ from pioreactor.cluster_management import get_active_workers_in_inventory from pioreactor.config import config +from pioreactor.exc import RsyncError from pioreactor.logging import create_logger from pioreactor.pubsub import subscribe from pioreactor.utils import local_persistant_storage from pioreactor.utils import managed_lifecycle -from pioreactor.utils.networking import add_local +from pioreactor.utils.networking import resolve_to_address +from pioreactor.utils.networking import rsync from pioreactor.utils.timing import current_utc_timestamp from pioreactor.whoami import get_unit_name from pioreactor.whoami import UNIVERSAL_EXPERIMENT @@ -43,7 +45,6 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in """ import sqlite3 - from sh import ErrorReturnCode, rsync # type: ignore unit = get_unit_name() experiment = UNIVERSAL_EXPERIMENT @@ -96,9 +97,9 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in "--partial", "--inplace", output_file, - f"{add_local(backup_unit)}:{output_file}", + f"{resolve_to_address(backup_unit)}:{output_file}", ) - except ErrorReturnCode: + except RsyncError: logger.debug( f"Unable to backup database to {backup_unit}. 
Is it online?", exc_info=True, diff --git a/pioreactor/actions/leader/experiment_profile.py b/pioreactor/actions/leader/experiment_profile.py index 5af066cc..8f7f9ebc 100644 --- a/pioreactor/actions/leader/experiment_profile.py +++ b/pioreactor/actions/leader/experiment_profile.py @@ -11,7 +11,6 @@ from typing import Optional import click -from msgspec.json import encode from msgspec.yaml import decode from pioreactor.cluster_management import get_active_workers_in_experiment @@ -536,13 +535,11 @@ def _callable() -> None: logger.info(f"Dry-run: Starting {job_name} on {unit} with options {options} and args {args}.") else: patch_into_leader( - f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/run", - body=encode( - { - "options": evaluate_options(options, env) | {"job_source": "experiment_profile"}, - "args": args, - } - ), + f"/api/workers/{unit}/jobs/run/job_name/{job_name}/experiments/{experiment}", + json={ + "options": evaluate_options(options, env) | {"job_source": "experiment_profile"}, + "args": args, + }, ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -574,8 +571,8 @@ def _callable() -> None: logger.info(f"Dry-run: Pausing {job_name} on {unit}.") else: patch_into_leader( - f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", - body=encode({"settings": {"state": "sleeping"}}), + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {"$state": "sleeping"}}, ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. 
Skipping action.") @@ -607,8 +604,8 @@ def _callable() -> None: logger.info(f"Dry-run: Resuming {job_name} on {unit}.") else: patch_into_leader( - f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", - body=encode({"settings": {"state": "ready"}}), + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {"$state": "ready"}}, ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -640,8 +637,8 @@ def _callable() -> None: logger.info(f"Dry-run: Stopping {job_name} on {unit}.") else: patch_into_leader( - f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", - body=encode({"settings": {"state": "disconnected"}}), + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {"$state": "disconnected"}}, ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -677,8 +674,8 @@ def _callable() -> None: else: for setting, value in evaluate_options(options, env).items(): patch_into_leader( - f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", - body=encode({"settings": {setting: value}}), + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {setting: value}}, ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. 
Skipping action.") diff --git a/pioreactor/background_jobs/monitor.py b/pioreactor/background_jobs/monitor.py index 0cca02be..9f6822c4 100644 --- a/pioreactor/background_jobs/monitor.py +++ b/pioreactor/background_jobs/monitor.py @@ -29,7 +29,6 @@ from pioreactor.hardware import PCB_BUTTON_PIN as BUTTON_PIN from pioreactor.hardware import PCB_LED_PIN as LED_PIN from pioreactor.hardware import TEMP -from pioreactor.pubsub import get_from_leader from pioreactor.pubsub import QOS from pioreactor.structs import Voltage from pioreactor.types import MQTTMessage @@ -268,12 +267,12 @@ def self_checks(self) -> None: # report on CPU usage, memory, disk space self.check_and_publish_self_statistics() + sleep(0 if whoami.is_testing_env() else 5) # wait for other processes to catch up + self.check_for_webserver() if whoami.am_I_leader(): self.check_for_last_backup() - sleep(0 if whoami.is_testing_env() else 5) # wait for other processes to catch up self.check_for_correct_permissions() - self.check_for_webserver() self.check_for_required_jobs_running() try: @@ -393,19 +392,6 @@ def check_for_webserver(self) -> None: self.logger.debug(f"Error checking huey status: {e}", exc_info=True) self.logger.error(f"Error checking huey status: {e}") - attempt = 0 - retries = 5 - while attempt < retries: - attempt += 1 - res = get_from_leader("/api/experiments/latest") - if res.ok: - break - sleep(1.0) - else: - self.logger.debug(f"Error pinging UI: {res.status_code}") - self.logger.error(f"Error pinging UI: {res.status_code}") - self.flicker_led_with_error_code(error_codes.WEBSERVER_OFFLINE) - def check_for_required_jobs_running(self) -> None: if not all(utils.is_pio_job_running(["watchdog", "mqtt_to_db_streaming"])): self.logger.warning( diff --git a/pioreactor/cli/pio.py b/pioreactor/cli/pio.py index 99bf76d3..67f96ba3 100644 --- a/pioreactor/cli/pio.py +++ b/pioreactor/cli/pio.py @@ -24,7 +24,7 @@ from pioreactor.logging import create_logger from pioreactor.mureq import get from 
pioreactor.mureq import HTTPException -from pioreactor.pubsub import get_from_leader +from pioreactor.pubsub import get_from from pioreactor.utils import JobManager from pioreactor.utils import local_intermittent_storage from pioreactor.utils import local_persistant_storage @@ -172,7 +172,7 @@ def version(verbose: bool) -> None: from pioreactor.version import rpi_version_info from pioreactor.whoami import get_pioreactor_model_and_version - click.echo(f"Pioreactor software: {tuple_to_text(software_version_info)}") + click.echo(f"Pioreactor app: {tuple_to_text(software_version_info)}") click.echo(f"Pioreactor HAT: {tuple_to_text(hardware_version_info)}") click.echo(f"Pioreactor firmware: {tuple_to_text(get_firmware_version())}") click.echo(f"Model name: {get_pioreactor_model_and_version()}") @@ -180,15 +180,14 @@ def version(verbose: bool) -> None: click.echo(f"Operating system: {platform.platform()}") click.echo(f"Raspberry Pi: {rpi_version_info}") click.echo(f"Image version: {whoami.get_image_git_hash()}") - if whoami.am_I_leader(): - try: - result = get_from_leader("api/versions/ui") - result.raise_for_status() - ui_version = result.body.decode() - except Exception: - ui_version = "" - - click.echo(f"Pioreactor UI: {ui_version}") + try: + result = get_from("localhost", "/unit_api/versions/ui") + result.raise_for_status() + ui_version = result.body.decode() + except Exception: + ui_version = "" + + click.echo(f"Pioreactor UI: {ui_version}") else: click.echo(pioreactor.__version__) @@ -399,6 +398,7 @@ def update_app( (f"sudo bash {tmp_rls_dir}/pre_update.sh", 2), (f"sudo bash {tmp_rls_dir}/update.sh", 4), (f"sudo bash {tmp_rls_dir}/post_update.sh", 20), + (f"mv {tmp_rls_dir}/pioreactorui_*.tar.gz {tmp_dir}/pioreactorui_archive", 98), # move ui folder to be accessed by a `pio update ui` (f"rm -rf {tmp_rls_dir}", 99), ] ) @@ -407,7 +407,6 @@ def update_app( commands_and_priority.extend([ (f"sudo pip install --no-index --find-links={tmp_rls_dir}/wheels/ 
{tmp_rls_dir}/pioreactor-{version_installed}-py3-none-any.whl[leader,worker]", 3), (f'sudo sqlite3 {config.config["storage"]["database"]} < {tmp_rls_dir}/update.sql', 10), - (f"mv {tmp_rls_dir}/pioreactorui_*.tar.gz {tmp_dir}/pioreactorui_archive", 98), # move ui folder to be accessed by a `pio update ui` ]) else: commands_and_priority.extend([ @@ -593,6 +592,68 @@ def update_firmware(version: Optional[str]) -> None: logger.notice(f"Updated Pioreactor firmware to version {version_installed}.") # type: ignore +@update.command(name="ui") +@click.option("-b", "--branch", help="install from a branch on github") +@click.option( + "-r", + "--repo", + help="install from a repo on github. Format: username/project", + default="pioreactor/pioreactorui", +) +@click.option("--source", help="use a tar.gz file") +@click.option("-v", "--version", help="install a specific version") +def update_ui(branch: Optional[str], repo: str, source: Optional[str], version: Optional[str]) -> None: + """ + Update the PioreactorUI + + Source, if provided, should be a .tar.gz with a top-level dir like pioreactorui-{version}/ + This is what is provided from Github releases. 
+ """ + logger = create_logger("update_ui", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT) + commands = [] + + if version is None: + version = "latest" + else: + version = f"tags/{version}" + + if source is not None: + source = quote(source) + version_installed = source + + elif branch is not None: + cleaned_branch = quote(branch) + cleaned_repo = quote(repo) + version_installed = cleaned_branch + url = f"https://github.com/{cleaned_repo}/archive/{cleaned_branch}.tar.gz" + source = "/tmp/pioreactorui.tar.gz" + commands.append(["wget", url, "-O", source]) + + else: + latest_release_metadata = loads(get(f"https://api.github.com/repos/{repo}/releases/{version}").body) + version_installed = latest_release_metadata["tag_name"] + url = f"https://github.com/{repo}/archive/refs/tags/{version_installed}.tar.gz" + source = "/tmp/pioreactorui.tar.gz" + commands.append(["wget", url, "-O", source]) + + assert source is not None + commands.append(["bash", "/usr/local/bin/update_ui.sh", source]) + + for command in commands: + logger.debug(" ".join(command)) + p = subprocess.run( + command, + universal_newlines=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + ) + if p.returncode != 0: + logger.error(p.stderr) + raise exc.BashScriptError(p.stderr) + + logger.notice(f"Updated PioreactorUI to version {version_installed}.") # type: ignore + + if whoami.am_I_leader(): @pio.command(short_help="access the db CLI") @@ -609,7 +670,7 @@ def mqtt(topic: str) -> None: "mosquitto_sub", "-v", "-t", - "#", + topic, "-F", "%19.19I||%t||%p", "-u", @@ -630,68 +691,3 @@ def mqtt(topic: str) -> None: + " | " + click.style(value, fg="bright_yellow") ) - - @update.command(name="ui") - @click.option("-b", "--branch", help="install from a branch on github") - @click.option( - "-r", - "--repo", - help="install from a repo on github. 
Format: username/project", - default="pioreactor/pioreactorui", - ) - @click.option("--source", help="use a tar.gz file") - @click.option("-v", "--version", help="install a specific version") - def update_ui(branch: Optional[str], repo: str, source: Optional[str], version: Optional[str]) -> None: - """ - Update the PioreactorUI - - Source, if provided, should be a .tar.gz with a top-level dir like pioreactorui-{version}/ - This is what is provided from Github releases. - """ - logger = create_logger( - "update_ui", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT - ) - commands = [] - - if version is None: - version = "latest" - else: - version = f"tags/{version}" - - if source is not None: - source = quote(source) - version_installed = source - - elif branch is not None: - cleaned_branch = quote(branch) - cleaned_repo = quote(repo) - version_installed = cleaned_branch - url = f"https://github.com/{cleaned_repo}/archive/{cleaned_branch}.tar.gz" - source = "/tmp/pioreactorui.tar.gz" - commands.append(["wget", url, "-O", source]) - - else: - latest_release_metadata = loads( - get(f"https://api.github.com/repos/{repo}/releases/{version}").body - ) - version_installed = latest_release_metadata["tag_name"] - url = f"https://github.com/{repo}/archive/refs/tags/{version_installed}.tar.gz" - source = "/tmp/pioreactorui.tar.gz" - commands.append(["wget", url, "-O", source]) - - assert source is not None - commands.append(["bash", "/usr/local/bin/update_ui.sh", source]) - - for command in commands: - logger.debug(" ".join(command)) - p = subprocess.run( - command, - universal_newlines=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.PIPE, - ) - if p.returncode != 0: - logger.error(p.stderr) - raise exc.BashScriptError(p.stderr) - - logger.notice(f"Updated PioreactorUI to version {version_installed}.") # type: ignore diff --git a/pioreactor/cli/pios.py b/pioreactor/cli/pios.py index c576dc67..a725626a 100644 --- a/pioreactor/cli/pios.py +++ 
b/pioreactor/cli/pios.py @@ -17,14 +17,18 @@ from pioreactor.cluster_management import get_workers_in_inventory from pioreactor.config import config from pioreactor.config import get_leader_hostname +from pioreactor.exc import RsyncError from pioreactor.logging import create_logger +from pioreactor.mureq import HTTPException +from pioreactor.pubsub import post_into from pioreactor.utils import ClusterJobManager -from pioreactor.utils.networking import add_local from pioreactor.utils.networking import cp_file_across_cluster +from pioreactor.utils.networking import resolve_to_address from pioreactor.utils.timing import current_utc_timestamp from pioreactor.whoami import am_I_leader from pioreactor.whoami import get_assigned_experiment_name from pioreactor.whoami import get_unit_name +from pioreactor.whoami import is_testing_env from pioreactor.whoami import UNIVERSAL_EXPERIMENT from pioreactor.whoami import UNIVERSAL_IDENTIFIER @@ -49,17 +53,64 @@ def pios(ctx) -> None: raise click.Abort() -if am_I_leader(): +if am_I_leader() or is_testing_env(): + which_units = click.option( + "--units", + multiple=True, + default=(UNIVERSAL_IDENTIFIER,), + type=click.STRING, + help="specify a hostname, default is all active units", + ) - def universal_identifier_to_all_active_workers(units: tuple[str, ...]) -> tuple[str, ...]: - if units == (UNIVERSAL_IDENTIFIER,): - units = get_active_workers_in_inventory() - return units + confirmation = click.option("-y", is_flag=True, help="Skip asking for confirmation.") + + def parse_click_arguments(input_list: list[str]) -> dict: + args: list[str] = [] + opts: dict[str, str | None] = {} + + i = 0 + while i < len(input_list): + item = input_list[i] + + if item.startswith("--"): + # Option detected + option_name = item.lstrip("--") + if i + 1 < len(input_list) and not input_list[i + 1].startswith("--"): + # Next item is the option's value + opts[option_name] = input_list[i + 1] + i += 1 # Skip the value + else: + # No value provided for this 
option + opts[option_name] = None + else: + # Argument detected + args.append(item) + + i += 1 + + return {"args": args, "options": opts} + + def universal_identifier_to_all_active_workers(workers: tuple[str, ...]) -> tuple[str, ...]: + active_workers = get_active_workers_in_inventory() + if workers == (UNIVERSAL_IDENTIFIER,): + return active_workers + else: + return tuple(u for u in set(workers) if u in active_workers) - def universal_identifier_to_all_workers(units: tuple[str, ...]) -> tuple[str, ...]: - if units == (UNIVERSAL_IDENTIFIER,): - units = get_workers_in_inventory() - return units + def universal_identifier_to_all_workers( + workers: tuple[str, ...], filter_out_non_workers=True + ) -> tuple[str, ...]: + all_workers = get_workers_in_inventory() + + if filter_out_non_workers: + include = lambda u: u in all_workers # noqa: E731 + else: + include = lambda u: True # noqa: E731 + + if workers == (UNIVERSAL_IDENTIFIER,): + return all_workers + else: + return tuple(u for u in set(workers) if include(u)) def add_leader(units: tuple[str, ...]) -> tuple[str, ...]: leader = get_leader_hostname() @@ -129,14 +180,8 @@ def sync_config_files(unit: str, shared: bool, specific: bool) -> None: @pios.command("cp", short_help="cp a file across the cluster") @click.argument("filepath", type=click.Path(exists=True, resolve_path=True)) - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a Pioreactor name, default is all active non-leader units", - ) - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @which_units + @confirmation def cp( filepath: str, units: tuple[str, ...], @@ -166,14 +211,8 @@ def _thread_function(unit: str) -> bool: @pios.command("rm", short_help="rm a file across the cluster") @click.argument("filepath", type=click.Path(exists=True, resolve_path=True)) - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - 
help="specify a Pioreactor name, default is all active non-leader units", - ) - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @which_units + @confirmation def rm( filepath: str, units: tuple[str, ...], @@ -182,43 +221,29 @@ def rm( logger = create_logger("rm", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) units = remove_leader(universal_identifier_to_all_workers(units)) - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 - from shlex import join # https://docs.python.org/3/library/shlex.html#shlex.quote - - command = join(["rm", filepath]) - if not y: - confirm = input(f"Confirm running `{command}` on {units}? Y/n: ").strip() + confirm = input(f"Confirm deleting {filepath} on {units}? Y/n: ").strip() if confirm != "Y": raise click.Abort() def _thread_function(unit: str) -> bool: - logger.debug(f"Removing {unit}:{filepath}...") try: - ssh(add_local(unit), command) + logger.debug(f"deleting {unit}:{filepath}...") + r = post_into( + resolve_to_address(unit), "/unit_api/system/remove_file", json={"filepath": filepath} + ) + r.raise_for_status() return True - except ErrorReturnCode_255 as e: - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - logger.debug(e, exc_info=True) - return False - except ErrorReturnCode_1 as e: - logger.error(f"Error occurred rm-ing from {unit}. 
See logs for more.") - logger.debug(e, exc_info=True) + + except HTTPException as e: + logger.error(f"Unable to remove file on {unit} due to server error: {e}.") return False for unit in units: _thread_function(unit) @pios.command("update", short_help="update PioreactorApp on workers") - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a Pioreactor name, default is all active units", - ) + @click.argument("target", default="app") @click.option("-b", "--branch", help="update to the github branch") @click.option( "-r", @@ -226,58 +251,53 @@ def _thread_function(unit: str) -> bool: help="install from a repo on github. Format: username/project", ) @click.option("-v", "--version", help="install a specific version, default is latest") - @click.option("--source", help="install from a source, whl or release archive") - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @click.option("-s", "--source", help="install from a source, whl or release archive") + @which_units + @confirmation def update( - units: tuple[str, ...], + target: str, branch: str | None, repo: str | None, version: str | None, source: str | None, + units: tuple[str, ...], y: bool, ) -> None: """ Pulls and installs a Pioreactor software version across the cluster """ - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 - from shlex import join - logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - if version is not None: - commands = ["pio", "update", "app", "-v", version] - elif branch is not None: - commands = ["pio", "update", "app", "-b", branch] - elif source is not None: - commands = ["pio", "update", "app", "--source", source] - else: - commands = ["pio", "update", "app"] - - if repo is not None: - commands.extend(["-r", repo]) - - command = join(commands) units = universal_identifier_to_all_workers(units) 
if not y: - confirm = input(f"Confirm running `{command}` on {units}? Y/n: ").strip() + confirm = input(f"Confirm updating {target} on {units}? Y/n: ").strip() if confirm != "Y": raise click.Abort() + options: dict[str, str | None] = {} + + # only one of these three is possible, mutually exclusive + if version is not None: + options["version"] = version + elif branch is not None: + options["branch"] = branch + elif source is not None: + options["source"] = source + + if repo is not None: + options["repo"] = repo + def _thread_function(unit: str): - logger.debug(f"Executing `{command}` on {unit}...") + logger.debug(f"Executing update {target} command {unit}...") try: - ssh(add_local(unit), command) + r = post_into( + resolve_to_address(unit), f"/unit_api/system/update/{target}", json={"options": options} + ) + r.raise_for_status() return True - except ErrorReturnCode_255 as e: - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - logger.debug(e, exc_info=True) - return False - except ErrorReturnCode_1 as e: - logger.error(f"Error occurred updating {unit}. 
See logs for more.") - logger.debug(e.stderr, exc_info=True) + except HTTPException as e: + logger.error(f"Unable to update {target} on {unit} due to server error: {e}.") return False with ThreadPoolExecutor(max_workers=len(units)) as executor: @@ -293,44 +313,39 @@ def plugins(): @plugins.command("install", short_help="install a plugin on workers") @click.argument("plugin") @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a Pioreactor name, default is all active units", + "--source", + type=str, + help="Install from a url, ex: https://github.com/user/repository/archive/branch.zip, or wheel file", ) - @click.option("-y", is_flag=True, help="skip asking for confirmation") - def install_plugin(plugin: str, units: tuple[str, ...], y: bool) -> None: + @which_units + @confirmation + def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: bool) -> None: """ Installs a plugin to worker and leader """ - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 # type: ignore - from shlex import quote - logger = create_logger("install_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - command = f"pio plugins install {quote(plugin)}" units = add_leader(universal_identifier_to_all_workers(units)) if not y: - confirm = input(f"Confirm installing {quote(plugin)} on {units}? Y/n: ").strip() + confirm = input(f"Confirm installing {plugin} on {units}? 
Y/n: ").strip() if confirm != "Y": raise click.Abort() - def _thread_function(unit: str): - logger.debug(f"Executing `{command}` on {unit}...") + commands = {"args": [plugin], "options": dict()} + if source: + commands["options"] = {"source": source} + + def _thread_function(unit: str) -> bool: + print(f"Installing {plugin} on {unit}") try: - ssh(add_local(unit), command) + r = post_into( + resolve_to_address(unit), "/unit_api/plugins/install", json=commands, timeout=60 + ) + r.raise_for_status() return True - except ErrorReturnCode_255 as e: - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - logger.debug(e, exc_info=True) - return False - except ErrorReturnCode_1 as e: - logger.error(f"Error occurred installing plugin on {unit}. See logs for more.") - logger.debug(e.stderr, exc_info=True) + except HTTPException as e: + logger.error(f"Unable to install plugin on {unit} due to server error: {e}.") return False with ThreadPoolExecutor(max_workers=len(units)) as executor: @@ -341,46 +356,34 @@ def _thread_function(unit: str): @plugins.command("uninstall", short_help="uninstall a plugin on workers") @click.argument("plugin") - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a Pioreactor name, default is all active units", - ) - @click.option("-y", is_flag=True, help="skip asking for confirmation") + @which_units + @confirmation def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool) -> None: """ Uninstalls a plugin from worker and leader """ - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 # type: ignore - from shlex import quote - logger = create_logger("uninstall_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - command = f"pio plugins uninstall {quote(plugin)}" units = add_leader(universal_identifier_to_all_workers(units)) + commands = {"args": [plugin]} if not y: - confirm = 
input(f"Confirm uninstalling {quote(plugin)} on {units}? Y/n: ").strip() + confirm = input(f"Confirm uninstalling {plugin} on {units}? Y/n: ").strip() if confirm != "Y": raise click.Abort() - def _thread_function(unit: str): - logger.debug(f"Executing `{command}` on {unit}...") + def _thread_function(unit: str) -> bool: + print(f"Uninstalling {plugin} on {unit}") try: - ssh(add_local(unit), command) + r = post_into( + resolve_to_address(unit), "/unit_api/plugins/uninstall", json=commands, timeout=60 + ) + r.raise_for_status() return True - except ErrorReturnCode_255 as e: - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - logger.debug(e, exc_info=True) - return False - except ErrorReturnCode_1 as e: - logger.error(f"Error occurred uninstalling plugin on {unit}. See logs for more.") - logger.debug(e.stderr, exc_info=True) + + except HTTPException as e: + logger.error(f"Unable to install plugin on {unit} due to server error: {e}.") return False with ThreadPoolExecutor(max_workers=len(units)) as executor: @@ -390,13 +393,6 @@ def _thread_function(unit: str): raise click.Abort() @pios.command(name="sync-configs", short_help="sync config") - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a hostname, default is all units", - ) @click.option( "--shared", is_flag=True, @@ -412,17 +408,18 @@ def _thread_function(unit: str): is_flag=True, help="don't save to db", ) - @click.option("-y", is_flag=True, help="(does nothing currently)") - def sync_configs(units: tuple[str, ...], shared: bool, specific: bool, skip_save: bool, y: bool) -> None: + @which_units + @confirmation + def sync_configs(shared: bool, specific: bool, skip_save: bool, units: tuple[str, ...], y: bool) -> None: """ Deploys the shared config.ini and specific config.inis to the pioreactor units. If neither `--shared` not `--specific` are specified, both are set to true. 
""" - from sh import ErrorReturnCode_12 # type: ignore - logger = create_logger("sync_configs", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - units = add_leader(universal_identifier_to_all_workers(units)) + units = add_leader( + universal_identifier_to_all_workers(units, filter_out_non_workers=False) + ) # TODO: why is leader being added if I only specify a subset of units? if not shared and not specific: shared = specific = True @@ -432,8 +429,8 @@ def _thread_function(unit: str) -> bool: try: sync_config_files(unit, shared, specific) return True - except ErrorReturnCode_12 as e: - logger.warning(f"Could not resolve hostname {unit}. Name not known.") + except RsyncError as e: + logger.warning(str(e)) logger.debug(e, exc_info=True) return False except Exception as e: @@ -454,25 +451,19 @@ def _thread_function(unit: str) -> bool: @pios.command("kill", short_help="kill a job(s) on workers") @click.option("--job") - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a hostname, default is all active units", - ) @click.option("--all-jobs", is_flag=True, help="kill all worker jobs") @click.option("--experiment", type=click.STRING) @click.option("--job-source", type=click.STRING) @click.option("--name", type=click.STRING) - @click.option("-y", is_flag=True, help="skip asking for confirmation") + @which_units + @confirmation def kill( job: str | None, - units: tuple[str, ...], all_jobs: bool, experiment: str | None, job_source: str | None, name: str | None, + units: tuple[str, ...], y: bool, ) -> None: """ @@ -495,7 +486,6 @@ def kill( """ units = universal_identifier_to_all_active_workers(units) - if not y: confirm = input(f"Confirm killing jobs on {units}? 
Y/n: ").strip() if confirm != "Y": @@ -511,14 +501,8 @@ def kill( short_help="run a job on workers", ) @click.argument("job", type=click.STRING) - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a hostname, default is all active units", - ) - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @which_units + @confirmation @click.pass_context def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None: """ @@ -529,51 +513,37 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None: Will start stirring on all workers, after asking for confirmation. Each job has their own unique options: - > pios run stirring --duty-cycle 10 - > pios run od_reading --od-angle-channel 135,0 + > pios run stirring --target-rpm 100 + > pios run od_reading To specify specific units, use the `--units` keyword multiple times, ex: - > pios run stirring --units pioreactor2 --units pioreactor3 + > pios run stirring --units pio01 --units pio03 """ - from sh import ssh - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 # type: ignore - from shlex import quote # https://docs.python.org/3/library/shlex.html#shlex.quote - extra_args = list(ctx.args) if "unit" in extra_args: click.echo("Did you mean to use 'units' instead of 'unit'? Exiting.", err=True) raise click.Abort() - core_command = " ".join(["pio", "run", quote(job), *extra_args]) - - # pipe all output to null - command = " ".join(["nohup", core_command, ">/dev/null", "2>&1", "&"]) - + data = parse_click_arguments(extra_args) units = universal_identifier_to_all_active_workers(units) + assert len(units) > 0, "Empty units!" if not y: - confirm = input(f"Confirm running `{core_command}` on {units}? Y/n: ").strip() + confirm = input(f"Confirm running {job} on {units}? 
Y/n: ").strip() if confirm != "Y": raise click.Abort() def _thread_function(unit: str) -> bool: - click.echo(f"Executing `{core_command}` on {unit}.") + click.echo(f"Executing run {job} on {unit}.") try: - ssh(add_local(unit), command) + r = post_into(resolve_to_address(unit), f"/unit_api/jobs/run/job_name/{job}", json=data) + r.raise_for_status() return True - except ErrorReturnCode_255 as e: - logger = create_logger("CLI", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - logger.debug(e, exc_info=True) - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - return False - except ErrorReturnCode_1 as e: - logger = create_logger("CLI", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - logger.error(f"Error occurred running job on {unit}. See logs for more.") - logger.debug(e.stderr, exc_info=True) + except HTTPException as e: + click.echo(f"Unable to execute run command on {unit} due to server error: {e}.") return False with ThreadPoolExecutor(max_workers=len(units)) as executor: @@ -586,45 +556,28 @@ def _thread_function(unit: str) -> bool: name="shutdown", short_help="shutdown Pioreactors", ) - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a hostname, default is all active units", - ) - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @which_units + @confirmation def shutdown(units: tuple[str, ...], y: bool) -> None: """ Shutdown Pioreactor / Raspberry Pi """ - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 # type: ignore - command = "sudo shutdown -h now" units = universal_identifier_to_all_workers(units) also_shutdown_leader = get_leader_hostname() in units units_san_leader = remove_leader(units) if not y: - confirm = input(f"Confirm running `{command}` on {units}? Y/n: ").strip() + confirm = input(f"Confirm shutting down on {units}? 
Y/n: ").strip() if confirm != "Y": raise click.Abort() def _thread_function(unit: str) -> bool: - click.echo(f"Executing `{command}` on {unit}.") try: - ssh(add_local(unit), command) + post_into(resolve_to_address(unit), "/unit_api/system/shutdown", timeout=60) return True - except ErrorReturnCode_255 as e: - logger = create_logger("CLI", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - logger.debug(e, exc_info=True) - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - return False - except ErrorReturnCode_1 as e: - logger.error(f"Error occurred shutting down {unit}. See logs for more.") - logger.debug(e.stderr, exc_info=True) + except HTTPException as e: + click.echo(f"Unable to install plugin on {unit} due to server error: {e}.") return False if len(units_san_leader) > 0: @@ -634,53 +587,30 @@ def _thread_function(unit: str) -> bool: # we delay shutdown leader (if asked), since it would prevent # executing the shutdown cmd on other workers if also_shutdown_leader: - import os + post_into(resolve_to_address(get_leader_hostname()), "/unit_api/shutdown", timeout=60) - os.system(command) - - @pios.command( - name="reboot", - short_help="reboot Pioreactors", - ) - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a hostname, default is all active units", - ) - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @pios.command(name="reboot", short_help="reboot Pioreactors") + @which_units + @confirmation def reboot(units: tuple[str, ...], y: bool) -> None: """ Reboot Pioreactor / Raspberry Pi """ - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 # type: ignore - - command = "sudo reboot" units = universal_identifier_to_all_workers(units) also_reboot_leader = get_leader_hostname() in units units_san_leader = remove_leader(units) if not y: - confirm = input(f"Confirm running 
`{command}` on {units}? Y/n: ").strip() + confirm = input(f"Confirm rebooting on {units}? Y/n: ").strip() if confirm != "Y": raise click.Abort() def _thread_function(unit: str) -> bool: - click.echo(f"Executing `{command}` on {unit}.") try: - ssh(add_local(unit), command) + post_into(resolve_to_address(unit), "/unit_api/system/reboot", timeout=60) return True - except ErrorReturnCode_255 as e: - logger = create_logger("CLI", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - logger.debug(e, exc_info=True) - logger.error(f"Unable to connect to unit {unit}. {e.stderr.decode()}") - return False - except ErrorReturnCode_1 as e: - logger.error(f"Error occurred rebooting {unit}. See logs for more.") - logger.debug(e.stderr, exc_info=True) + except HTTPException as e: + click.echo(f"Unable to install plugin on {unit} due to server error: {e}.") return False if len(units_san_leader) > 0: @@ -690,9 +620,7 @@ def _thread_function(unit: str) -> bool: # we delay rebooting leader (if asked), since it would prevent # executing the reboot cmd on other workers if also_reboot_leader: - import os - - os.system(command) + post_into(resolve_to_address(get_leader_hostname()), "/unit_api/reboot", timeout=60) @pios.command( name="update-settings", @@ -700,14 +628,8 @@ def _thread_function(unit: str) -> bool: short_help="update settings on a job on workers", ) @click.argument("job", type=click.STRING) - @click.option( - "--units", - multiple=True, - default=(UNIVERSAL_IDENTIFIER,), - type=click.STRING, - help="specify a hostname, default is all active units", - ) - @click.option("-y", is_flag=True, help="Skip asking for confirmation.") + @which_units + @confirmation @click.pass_context def update_settings(ctx, job: str, units: tuple[str, ...], y: bool) -> None: """ diff --git a/pioreactor/cluster_management/__init__.py b/pioreactor/cluster_management/__init__.py index 240df3b6..2bd918e1 100644 --- a/pioreactor/cluster_management/__init__.py +++ 
b/pioreactor/cluster_management/__init__.py @@ -53,7 +53,7 @@ def add_worker(hostname: str, password: str, version: str, model: str) -> None: """ Add a new pioreactor worker to the cluster. The pioreactor should already have the worker image installed and is turned on. """ - + # TODO: this needs to be more robust, i.e. remove the add_local parts import socket logger = create_logger( @@ -64,7 +64,7 @@ def add_worker(hostname: str, password: str, version: str, model: str) -> None: logger.info(f"Adding new pioreactor {hostname} to cluster.") hostname = hostname.removesuffix(".local") - hostname_dot_local = hostname + ".local" + hostname_dot_local = networking.add_local(hostname) assert model == "pioreactor_20ml" @@ -123,32 +123,32 @@ def add_worker(hostname: str, password: str, version: str, model: str) -> None: @click.command(name="remove", short_help="remove a pioreactor worker") -@click.argument("hostname") -def remove_worker(hostname: str) -> None: +@click.argument("worker") +def remove_worker(worker: str) -> None: try: - r = delete_from_leader(f"/api/workers/{hostname}") + r = delete_from_leader(f"/api/workers/{worker}") r.raise_for_status() except HTTPErrorStatus: if r.status_code >= 500: click.echo("Server error. Could not complete. See UI logs.") else: - click.echo(f"Worker {hostname} not present to be removed. Check hostname.") + click.echo(f"Worker {worker} not present to be removed. Check hostname.") click.Abort() except HTTPException: click.echo(f"Not able to connect to leader's backend at {leader_address}.") click.Abort() else: - click.echo(f"Removed {hostname} from cluster.") # this needs to shutdown the worker too??? + click.echo(f"Removed {worker} from cluster.") # this needs to shutdown the worker too??? 
@click.command(name="assign", short_help="assign a pioreactor worker") -@click.argument("hostname") +@click.argument("worker") @click.argument("experiment") -def assign_worker_to_experiment(hostname: str, experiment: str) -> None: +def assign_worker_to_experiment(worker: str, experiment: str) -> None: try: r = put_into_leader( f"/api/experiments/{experiment}/workers", - json={"pioreactor_unit": hostname}, + json={"pioreactor_unit": worker}, ) r.raise_for_status() except HTTPErrorStatus: @@ -161,16 +161,16 @@ def assign_worker_to_experiment(hostname: str, experiment: str) -> None: click.echo("Not able to connect to leader's backend.") click.Abort() else: - click.echo(f"Assigned {hostname} to {experiment}") + click.echo(f"Assigned {worker} to {experiment}") @click.command(name="unassign", short_help="unassign a pioreactor worker") -@click.argument("hostname") +@click.argument("worker") @click.argument("experiment") -def unassign_worker_from_experiment(hostname: str, experiment: str) -> None: +def unassign_worker_from_experiment(worker: str, experiment: str) -> None: try: r = delete_from_leader( - f"/api/experiments/{experiment}/workers/{hostname}", + f"/api/experiments/{experiment}/workers/{worker}", ) r.raise_for_status() except HTTPErrorStatus: @@ -180,16 +180,16 @@ def unassign_worker_from_experiment(hostname: str, experiment: str) -> None: click.echo("Not able to connect to leader's backend.") click.Abort() else: - click.echo(f"Unassigned {hostname} from {experiment}") + click.echo(f"Unassigned {worker} from {experiment}") @click.command(name="update-active", short_help="change active of worker") @click.argument("hostname") @click.argument("active", type=click.IntRange(0, 1)) -def update_active(hostname: str, active: int) -> None: +def update_active(worker: str, active: int) -> None: try: r = put_into_leader( - f"/api/workers/{hostname}/is_active", + f"/api/workers/{worker}/is_active", json={"is_active": active}, ) r.raise_for_status() @@ -197,7 +197,7 @@ def 
update_active(hostname: str, active: int) -> None: click.echo("Not able to connect to leader's backend.") click.Abort() else: - click.echo(f"Updated {hostname}'s active to {bool(active)}") + click.echo(f"Updated {worker}'s active to {bool(active)}") @click.command( @@ -246,6 +246,7 @@ def get_metadata(hostname): state = "unknown" # get version + # TODO: change to ping webserver result = subscribe( f"pioreactor/{hostname}/{whoami.UNIVERSAL_EXPERIMENT}/monitor/versions", timeout=1, @@ -256,7 +257,7 @@ def get_metadata(hostname): else: app_version = "unknown" - # is reachable? + # is reachable? # TODO: change to webserver reachable = networking.is_reachable(networking.add_local(hostname)) # get experiment @@ -268,7 +269,7 @@ def get_metadata(hostname): return ip, state, reachable, app_version, experiment - def display_data_for(worker: dict[str, str]) -> bool: + def display_data_for(worker: dict[str, str]) -> None: hostname, is_active = worker["pioreactor_unit"], worker["is_active"] ip, state, reachable, version, experiment = get_metadata(hostname) @@ -286,13 +287,13 @@ def display_data_for(worker: dict[str, str]) -> bool: click.echo( f"{hostnamef} {is_leaderf} {ipf} {statef} {is_activef} {reachablef} {versionf} {experimentf}" ) - return reachable & (state == "ready") + return workers = get_from_leader("/api/workers").json() n_workers = len(workers) click.secho( - f"{'Unit / hostname':20s} {'Is leader?':15s} {'IP address':20s} {'State':15s} {'Active?':15s} {'Reachable?':14s} {'Version':15s} {'Experiment':15s}", + f"{'Name':20s} {'Is leader?':15s} {'IP address':20s} {'State':15s} {'Active?':15s} {'Reachable?':14s} {'Version':15s} {'Experiment':15s}", bold=True, ) if n_workers == 0: @@ -301,5 +302,8 @@ def display_data_for(worker: dict[str, str]) -> bool: with ThreadPoolExecutor(max_workers=n_workers) as executor: results = executor.map(display_data_for, workers) - if not all(results): - raise click.Abort() + # Iterating over the results ensures that all tasks complete + 
for result in results: + pass + + return diff --git a/pioreactor/config.py b/pioreactor/config.py index ce75c339..f6d50114 100644 --- a/pioreactor/config.py +++ b/pioreactor/config.py @@ -163,19 +163,19 @@ def get_config() -> ConfigParserMod: return config +config = get_config() + + @cache def get_leader_hostname() -> str: - return get_config().get("cluster.topology", "leader_hostname", fallback="localhost") + return config.get("cluster.topology", "leader_hostname", fallback="localhost") @cache def get_leader_address() -> str: - return get_config().get("cluster.topology", "leader_address", fallback="localhost") + return config.get("cluster.topology", "leader_address", fallback="localhost") @cache def get_mqtt_address() -> str: - return get_config().get("mqtt", "broker_address", fallback=get_leader_address()) - - -config = get_config() + return config.get("mqtt", "broker_address", fallback=get_leader_address()) diff --git a/pioreactor/exc.py b/pioreactor/exc.py index a03051ac..c8665ce6 100644 --- a/pioreactor/exc.py +++ b/pioreactor/exc.py @@ -60,3 +60,9 @@ class MQTTValueError(ValueError): """ Data from MQTT is not expected / correct """ + + +class RsyncError(OSError): + """ + Syncing files failed + """ diff --git a/pioreactor/pubsub.py b/pioreactor/pubsub.py index b54121df..9823366f 100644 --- a/pioreactor/pubsub.py +++ b/pioreactor/pubsub.py @@ -350,45 +350,73 @@ def __exit__(self, *args): self.client.disconnect() -def create_leader_webserver_path(endpoint): +def conform_and_validate_api_endpoint(endpoint: str) -> str: + endpoint = endpoint.removeprefix("/") + if not (endpoint.startswith("api/") or endpoint.startswith("unit_api/")): + raise ValueError(f"/{endpoint} is not a valid Pioreactor API.") + + return endpoint + + +def create_webserver_path(address: str, endpoint: str) -> str: + # Most commonly, address can be an mdns name (test.local), or an IP address. 
port = config.getint("ui", "port", fallback=80) proto = config.get("ui", "proto", fallback="http") - return f"{proto}://{leader_address}:{port}/{endpoint}" + return f"{proto}://{address}:{port}/{endpoint}" + + +def get_from(address: str, endpoint: str, **kwargs) -> mureq.Response: + endpoint = conform_and_validate_api_endpoint(endpoint) + return mureq.get(create_webserver_path(address, endpoint), **kwargs) def get_from_leader(endpoint: str, **kwargs) -> mureq.Response: - assert endpoint.startswith("/api/") or endpoint.startswith("api/") - endpoint = endpoint.removeprefix("/") - return mureq.get(create_leader_webserver_path(endpoint), **kwargs) + return get_from(leader_address, endpoint, **kwargs) + + +def put_into( + address: str, endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs +) -> mureq.Response: + endpoint = conform_and_validate_api_endpoint(endpoint) + return mureq.put(create_webserver_path(address, endpoint), body=body, json=json, **kwargs) def put_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: - assert endpoint.startswith("/api/") or endpoint.startswith("api/") + return put_into(leader_address, endpoint, body=body, json=json, **kwargs) - endpoint = endpoint.removeprefix("/") - return mureq.put(create_leader_webserver_path(endpoint), body=body, json=json, **kwargs) + +def patch_into( + address: str, endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs +) -> mureq.Response: + endpoint = conform_and_validate_api_endpoint(endpoint) + return mureq.patch(create_webserver_path(address, endpoint), body=body, json=json, **kwargs) def patch_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: - assert endpoint.startswith("/api/") or endpoint.startswith("api/") + return patch_into(leader_address, endpoint, body=body, json=json, **kwargs) - endpoint = 
endpoint.removeprefix("/") - return mureq.patch(create_leader_webserver_path(endpoint), body=body, json=json, **kwargs) + +def post_into( + address: str, endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs +) -> mureq.Response: + endpoint = conform_and_validate_api_endpoint(endpoint) + return mureq.post(create_webserver_path(address, endpoint), body=body, json=json, **kwargs) def post_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: - assert endpoint.startswith("/api/") or endpoint.startswith("api/") - endpoint = endpoint.removeprefix("/") - return mureq.post(create_leader_webserver_path(endpoint), body=body, json=json, **kwargs) + return post_into(leader_address, endpoint, body=body, json=json, **kwargs) + + +def delete_from(address: str, endpoint: str, **kwargs) -> mureq.Response: + endpoint = conform_and_validate_api_endpoint(endpoint) + return mureq.delete(create_webserver_path(address, endpoint), **kwargs) def delete_from_leader(endpoint: str, **kwargs) -> mureq.Response: - assert endpoint.startswith("/api/") or endpoint.startswith("api/") - endpoint = endpoint.removeprefix("/") - return mureq.delete(create_leader_webserver_path(endpoint), **kwargs) + return delete_from(leader_address, endpoint, **kwargs) diff --git a/pioreactor/tests/conftest.py b/pioreactor/tests/conftest.py index 418c0720..a068cd00 100644 --- a/pioreactor/tests/conftest.py +++ b/pioreactor/tests/conftest.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- from __future__ import annotations +import contextlib import re from unittest.mock import MagicMock +from unittest.mock import patch import pytest -from pioreactor.cli import run +from pioreactor.mureq import Response @pytest.fixture(autouse=True) @@ -81,3 +83,29 @@ def mock_get_response(endpoint): ) return mock_get + + +class CapturedRequest: + def __init__(self, method, url, headers, body): + self.method = method + self.url = url + 
self.headers = headers + self.body = body + + +@contextlib.contextmanager +def capture_requests(): + bucket = [] + + def mock_request(method, url, **kwargs): + # Capture the request details + headers = kwargs.get("headers") + body = kwargs.get("body", None) + bucket.append(CapturedRequest(method, url, headers, body)) + + # Return a mock response object + return Response(url, 200, {}, b'{"mocked": "response"}') + + # Patch the mureq.request method + with patch("pioreactor.mureq.request", side_effect=mock_request): + yield bucket diff --git a/pioreactor/tests/test_cli.py b/pioreactor/tests/test_cli.py index f38fc406..c6f87ba8 100644 --- a/pioreactor/tests/test_cli.py +++ b/pioreactor/tests/test_cli.py @@ -4,15 +4,20 @@ import time +import click import pytest from click.testing import CliRunner from pioreactor import whoami from pioreactor.background_jobs.dosing_automation import start_dosing_automation from pioreactor.cli.pio import pio +from pioreactor.cli.pios import kill from pioreactor.cli.pios import pios +from pioreactor.cli.pios import reboot +from pioreactor.cli.pios import run from pioreactor.pubsub import collect_all_logs_of_level from pioreactor.pubsub import subscribe_and_callback +from pioreactor.tests.conftest import capture_requests from pioreactor.utils import is_pio_job_running from pioreactor.utils import local_intermittent_storage @@ -112,3 +117,42 @@ def test_pio_kill_cleans_up_automations_correctly() -> None: pause() assert not is_pio_job_running("dosing_automation") + + +def test_pios_run_requests(): + with capture_requests() as bucket: + ctx = click.Context(run, allow_extra_args=True) + ctx.forward(run, job="stirring", y=True) + + assert len(bucket) == 2 + assert bucket[0].url == "http://unit1.local:4999/unit_api/jobs/run/job_name/stirring" + + +def test_pios_run_requests_dedup_and_filter_units(): + units = ("unit1", "unit1", "notaunitincluster") + + with capture_requests() as bucket: + ctx = click.Context(run, allow_extra_args=True) + 
ctx.forward(run, job="stirring", y=True, units=units) + + assert len(bucket) == 1 + assert bucket[0].url == "http://unit1.local:4999/unit_api/jobs/run/job_name/stirring" + + +def test_pios_kill_requests(): + with capture_requests() as bucket: + ctx = click.Context(kill, allow_extra_args=True) + ctx.forward(kill, experiment="demo", y=True) + + assert len(bucket) == 2 + assert bucket[0].url == "http://unit1.local:4999/unit_api/jobs/stop/experiment/demo" + assert bucket[1].url == "http://unit2.local:4999/unit_api/jobs/stop/experiment/demo" + + +def test_pios_reboot_requests(): + with capture_requests() as bucket: + ctx = click.Context(reboot, allow_extra_args=True) + ctx.forward(reboot, y=True, units=("unit1",)) + + assert len(bucket) == 1 + assert bucket[0].url == "http://unit1.local:4999/unit_api/system/reboot" diff --git a/pioreactor/tests/test_execute_experiment_profile.py b/pioreactor/tests/test_execute_experiment_profile.py index a4339913..88d11aa1 100644 --- a/pioreactor/tests/test_execute_experiment_profile.py +++ b/pioreactor/tests/test_execute_experiment_profile.py @@ -829,3 +829,50 @@ def test_profiles_in_github_repo() -> None: content = get(file["download_url"]).content profile = decode(content, type=Profile) assert _verify_experiment_profile(profile) + + +@patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") +def test_api_requests_are_made( + mock__load_experiment_profile, +) -> None: + experiment = "_testing_experiment" + + action1 = Start(hours_elapsed=0 / 60 / 60) + action2 = Start(hours_elapsed=2 / 60 / 60) + action3 = Stop(hours_elapsed=4 / 60 / 60) + + profile = Profile( + experiment_profile_name="test_profile", + plugins=[], + common=CommonBlock(jobs={"job1": Job(actions=[action1])}), + pioreactors={ + "unit1": PioreactorSpecificBlock(jobs={"job2": Job(actions=[action2, action3])}, label="label1"), + }, + metadata=Metadata(author="test_author"), + ) + + mock__load_experiment_profile.return_value = profile + + from 
pioreactor.tests.conftest import capture_requests + + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) + + assert len(bucket) == 5 + assert bucket[0].url == f"http://localhost:4999/api/experiments/{experiment}/unit_labels" + assert ( + bucket[1].url + == f"http://localhost:4999/api/workers/unit1/jobs/run/job_name/job1/experiments/{experiment}" + ) + assert ( + bucket[2].url + == f"http://localhost:4999/api/workers/unit2/jobs/run/job_name/job1/experiments/{experiment}" + ) + assert ( + bucket[3].url + == f"http://localhost:4999/api/workers/unit1/jobs/run/job_name/job2/experiments/{experiment}" + ) + assert ( + bucket[4].url + == f"http://localhost:4999/api/workers/unit1/jobs/update/job_name/job2/experiments/{experiment}" + ) diff --git a/pioreactor/tests/test_pubsub.py b/pioreactor/tests/test_pubsub.py index 8346dc12..3d1a890c 100644 --- a/pioreactor/tests/test_pubsub.py +++ b/pioreactor/tests/test_pubsub.py @@ -11,6 +11,9 @@ from pioreactor.pubsub import add_hash_suffix from pioreactor.pubsub import create_client +from pioreactor.pubsub import post_into +from pioreactor.pubsub import post_into_leader +from pioreactor.tests.conftest import capture_requests def test_add_hash_suffix() -> None: @@ -83,3 +86,35 @@ def test_create_client_max_connection_attempts(mock_client) -> None: create_client(hostname=hostname, max_connection_attempts=max_connection_attempts) assert client_instance.connect.call_count == max_connection_attempts + + +def test_post_into(): + data = b'{"key": "value"}' + + with capture_requests() as bucket: + post_into("pio01.local", "/api/my_endpoint", body=data) + + # Check that the request was made + assert len(bucket) == 1 + captured_request = bucket[0] + + # Assert request details + assert captured_request.method == "POST" + assert captured_request.url == "http://pio01.local:4999/api/my_endpoint" + assert captured_request.body == data + + +def test_post_into_leader(): + data = b'{"key": "value"}' + + with 
capture_requests() as bucket: + post_into_leader("/api/my_endpoint", body=data) + + # Check that the request was made + assert len(bucket) == 1 + captured_request = bucket[0] + + # Assert request details + assert captured_request.method == "POST" + assert captured_request.url == "http://localhost:4999/api/my_endpoint" + assert captured_request.body == data diff --git a/pioreactor/tests/test_temperature_approximation_1_0.py b/pioreactor/tests/test_temperature_approximation_1_0.py index 616b53db..2baf80fd 100644 --- a/pioreactor/tests/test_temperature_approximation_1_0.py +++ b/pioreactor/tests/test_temperature_approximation_1_0.py @@ -984,3 +984,83 @@ def test_temperature_approximation19(self) -> None: } assert 55.5 <= self.t.approximate_temperature_1_0(features) <= 56.5 + + def test_temperature_approximation50(self) -> None: + # this was real data from a bheit + + features = { + "previous_heater_dc": 1.3, + "room_temp": 22.0, + "time_series_of_temp": [ + 27.21875, + 26.23958333, + 25.52083333, + 24.94791667, + 24.45833333, + 24.0625, + 23.73958333, + 23.4375, + 23.1875, + 23.0, + 22.8125, + 22.63541667, + 22.5, + 22.41666667, + 22.3125, + 22.23958333, + 22.13541667, + 22.0625, + 22.0, + 21.98958333, + 21.9375, + 21.875, + 21.85416667, + 21.8125, + 21.80208333, + 21.75, + 21.75, + 21.72916667, + 21.6875, + ], + } + + with pytest.raises(ValueError): + assert 20 <= self.t.approximate_temperature_1_0(features) <= 30 + + features = { + "previous_heater_dc": 1.3, + "room_temp": 22.0 - 3.0, # here. 
+ "time_series_of_temp": [ + 27.21875, + 26.23958333, + 25.52083333, + 24.94791667, + 24.45833333, + 24.0625, + 23.73958333, + 23.4375, + 23.1875, + 23.0, + 22.8125, + 22.63541667, + 22.5, + 22.41666667, + 22.3125, + 22.23958333, + 22.13541667, + 22.0625, + 22.0, + 21.98958333, + 21.9375, + 21.875, + 21.85416667, + 21.8125, + 21.80208333, + 21.75, + 21.75, + 21.72916667, + 21.6875, + ], + } + + assert 20 <= self.t.approximate_temperature_1_0(features) <= 30 diff --git a/pioreactor/tests/test_utils.py b/pioreactor/tests/test_utils.py index b5fc06df..59612ee8 100644 --- a/pioreactor/tests/test_utils.py +++ b/pioreactor/tests/test_utils.py @@ -10,7 +10,9 @@ from pioreactor.background_jobs.stirring import start_stirring from pioreactor.pubsub import subscribe_and_callback +from pioreactor.tests.conftest import capture_requests from pioreactor.utils import callable_stack +from pioreactor.utils import ClusterJobManager from pioreactor.utils import is_pio_job_running from pioreactor.utils import JobManager from pioreactor.utils import JobMetadataKey @@ -262,3 +264,26 @@ def collect(msg): job_manager.set_not_running(job_key1) job_manager.set_not_running(job_key2) + + +def test_ClusterJobManager_sends_requests(): + workers = ["pio01", "pio02", "pio03"] + with capture_requests() as bucket: + with ClusterJobManager(workers) as cm: + cm.kill_jobs(name="stirring") + + assert len(bucket) == len(workers) + assert bucket[0].body is None + assert bucket[0].method == "PATCH" + + for request, worker in zip(bucket, workers): + assert request.url == f"http://{worker}.local:4999/unit_api/jobs/stop/job_name/stirring" + + +def test_empty_ClusterJobManager(): + workers = [] + with capture_requests() as bucket: + with ClusterJobManager(workers) as cm: + cm.kill_jobs(name="stirring") + + assert len(bucket) == len(workers) diff --git a/pioreactor/tests/test_watchdog.py b/pioreactor/tests/test_watchdog.py index c3da3473..55d308bc 100644 --- a/pioreactor/tests/test_watchdog.py +++ 
b/pioreactor/tests/test_watchdog.py @@ -4,7 +4,6 @@ import time -import pytest import zeroconf from pioreactor.background_jobs.leader.watchdog import WatchDog @@ -12,7 +11,6 @@ from pioreactor.whoami import get_unit_name -@pytest.mark.xfail() def test_watchdog_alerts_on_found_worker() -> None: experiment = "test_watchdog_alerts_on_found_worker" diff --git a/pioreactor/utils/__init__.py b/pioreactor/utils/__init__.py index 12e0a098..fc09ceae 100644 --- a/pioreactor/utils/__init__.py +++ b/pioreactor/utils/__init__.py @@ -26,7 +26,10 @@ from pioreactor import whoami from pioreactor.exc import NotActiveWorkerError from pioreactor.exc import RoleError -from pioreactor.utils.networking import add_local +from pioreactor.pubsub import create_client +from pioreactor.pubsub import patch_into +from pioreactor.pubsub import subscribe_and_callback +from pioreactor.utils.networking import resolve_to_address from pioreactor.utils.timing import current_utc_timestamp if TYPE_CHECKING: @@ -145,8 +148,6 @@ def __init__( source: str = "app", job_source: str | None = None, ) -> None: - from pioreactor.pubsub import create_client - if not ignore_is_active_state and not whoami.is_active(unit): raise NotActiveWorkerError(f"{unit} is not active.") @@ -235,8 +236,6 @@ def exit_from_mqtt(self, message: pt.MQTTMessage) -> None: self._exit() def start_passive_listeners(self) -> None: - from pioreactor.pubsub import subscribe_and_callback - subscribe_and_callback( self.exit_from_mqtt, [ @@ -475,11 +474,11 @@ def exception_retry(func: Callable, retries: int = 3, sleep_for: float = 0.5, ar time.sleep(sleep_for) -def safe_kill(*args: int) -> None: - from sh import kill # type: ignore +def safe_kill(*args: str) -> None: + from subprocess import run try: - kill("-2", *args) + run(("kill", "-2") + args) except Exception: pass @@ -495,7 +494,7 @@ def kill_jobs(self) -> int: if len(self.list_of_pids) == 0: return 0 - safe_kill(*self.list_of_pids) + safe_kill(*(str(pid) for pid in self.list_of_pids)) 
return len(self.list_of_pids) @@ -512,8 +511,6 @@ def kill_jobs(self) -> int: if len(self.job_names_to_kill) == 0: return count - from pioreactor.pubsub import create_client - with create_client() as client: for i, name in enumerate(self.job_names_to_kill): count += 1 @@ -541,7 +538,7 @@ class JobManager: LONG_RUNNING_JOBS = ("monitor", "mqtt_to_db_streaming", "watchdog") def __init__(self) -> None: - self.db_path = f"{tempfile.gettempdir()}/pio_jobs_metadata.db" + self.db_path = f"{tempfile.gettempdir()}/local_intermittent_pioreactor_metadata.sqlite" self.conn = sqlite3.connect(self.db_path) self.cursor = self.conn.cursor() self._create_table() @@ -662,32 +659,25 @@ def kill_jobs( name: str | None = None, job_source: str | None = None, ) -> bool: - if len(self.units) == 0 or whoami.is_testing_env(): + if len(self.units) == 0: return True - from shlex import join - from sh import ssh # type: ignore - from sh import ErrorReturnCode_255 # type: ignore - from sh import ErrorReturnCode_1 # type: ignore - - command_pieces = ["pio", "kill"] if experiment: - command_pieces.extend(["--experiment", experiment]) + endpoint = f"/unit_api/jobs/stop/experiment/{experiment}" if name: - command_pieces.extend(["--name", name]) + endpoint = f"/unit_api/jobs/stop/job_name/{name}" if job_source: - command_pieces.extend(["--job-source", job_source]) + endpoint = f"/unit_api/jobs/stop/job_source/{job_source}" if all_jobs: - command_pieces.append("--all-jobs") - - command = join(command_pieces) + endpoint = "/unit_api/jobs/stop/all" def _thread_function(unit: str) -> bool: try: - ssh(add_local(unit), command) + r = patch_into(resolve_to_address(unit), endpoint) + r.raise_for_status() return True - - except (ErrorReturnCode_255, ErrorReturnCode_1): + except Exception as e: + print(f"Failed to send kill command to {unit}: {e}") return False with ThreadPoolExecutor(max_workers=len(self.units)) as executor: diff --git a/pioreactor/utils/networking.py b/pioreactor/utils/networking.py index 
f034482b..8dd7a211 100644 --- a/pioreactor/utils/networking.py +++ b/pioreactor/utils/networking.py @@ -1,35 +1,45 @@ # -*- coding: utf-8 -*- from __future__ import annotations -import os +import ipaddress import subprocess +from pathlib import Path from queue import Empty from queue import Queue from threading import Thread from typing import Generator +from pioreactor.exc import RsyncError -def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout: int = 5) -> None: - from sh import rsync # type: ignore - from sh import ErrorReturnCode_30 # type: ignore + +def rsync(*args): + from subprocess import check_call + from subprocess import CalledProcessError + try: + check_call(("rsync",) + args) + except CalledProcessError as e: + raise RsyncError from e + + +def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout: int = 5) -> None: try: rsync( "-z", "--timeout", - timeout, + f"{timeout}", "--inplace", "-e", "ssh", localpath, - f"{add_local(unit)}:{remotepath}", + f"{resolve_to_address(unit)}:{remotepath}", ) - except ErrorReturnCode_30: - raise ConnectionRefusedError(f"Error connecting to {unit}.") + except RsyncError: + raise RsyncError(f"Error moving file {localpath} to {unit}:{remotepath}.") def is_using_local_access_point() -> bool: - return os.path.isfile("/boot/firmware/local_access_point") + return Path("/boot/firmware/local_access_point").is_file() def is_hostname_on_network(hostname: str, timeout: float = 10.0) -> bool: @@ -130,7 +140,19 @@ def worker_hostnames(queue: Queue) -> None: break +def resolve_to_address(hostname: str) -> str: + # TODO: make this more fleshed out: resolve to IP, etc. + # add_local assumes a working mDNS. 
+ return add_local(hostname) + + def add_local(hostname: str) -> str: + try: + # if it looks like an IP, don't continue + ipaddress.ip_address(hostname) + return hostname + except ValueError: + pass if not hostname.endswith(".local"): return hostname + ".local" return hostname diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 9d6d7cae..d497b402 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,6 +1,5 @@ click==8.1.7 paho-mqtt==2.1.0 -sh==2.0.6 JSON-log-formatter==0.5.1 colorlog==6.7.0 msgspec==0.18.5 diff --git a/requirements/requirements_leader.txt b/requirements/requirements_leader.txt index 5e49049b..1fefe78e 100644 --- a/requirements/requirements_leader.txt +++ b/requirements/requirements_leader.txt @@ -1,11 +1,2 @@ -r requirements.txt -blinker==1.8.2 -Flask==3.0.2 -flup6==1.1.1 -huey==2.5.0 -ifaddr==0.2.0 -itsdangerous==2.2.0 -Jinja2==3.1.4 -MarkupSafe==2.1.5 -python-dotenv==1.0.1 -Werkzeug==3.0.3 +# pyyaml diff --git a/requirements/requirements_worker.txt b/requirements/requirements_worker.txt index 8a4b51fe..ac48662e 100644 --- a/requirements/requirements_worker.txt +++ b/requirements/requirements_worker.txt @@ -14,3 +14,13 @@ pyserial==3.5 pyusb==1.2.1 rpi_hardware_pwm==0.2.1 typing_extensions==4.12.2 +blinker==1.8.2 +Flask==3.0.2 +flup6==1.1.1 +huey==2.5.0 +ifaddr==0.2.0 # what is this a dependency of? 
+itsdangerous==2.2.0 +Jinja2==3.1.4 +MarkupSafe==2.1.5 +python-dotenv==1.0.1 +Werkzeug==3.0.3 diff --git a/setup.py b/setup.py index 711fc68f..4e28eca3 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,6 @@ CORE_REQUIREMENTS = [ "click==8.1.7", "paho-mqtt==2.1.0", - "sh==2.0.6", "JSON-log-formatter==0.5.1", "colorlog==6.7.0", "msgspec==0.18.5", @@ -18,11 +17,6 @@ "crudini==0.9.5", "iniparse==0.5", "six==1.16.0", - # "lgpio; platform_machine!='armv7l' and platform_machine!='armv6l'", # primarily available with base image, or via apt-get install python3-lgpio -] - - -LEADER_REQUIREMENTS = [ "blinker==1.8.2", "flask==3.0.2", "flup6==1.1.1", @@ -36,6 +30,9 @@ ] +LEADER_REQUIREMENTS: list[str] = [] + + WORKER_REQUIREMENTS = [ "Adafruit-Blinka==8.43.0", "adafruit-circuitpython-ads1x15==2.2.23", diff --git a/update_scripts/upcoming/pre_update.sh b/update_scripts/upcoming/pre_update.sh new file mode 100644 index 00000000..e284b7e6 --- /dev/null +++ b/update_scripts/upcoming/pre_update.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +set -xeu + + +export LC_ALL=C + +# Lower bound version +min_version="24.8.21" + +# Get the current version of pio +current_version=$(sudo -u pioreactor pio version) + +# Use sorting to determine if the current version is less than the minimum version +is_valid=$(printf "%s\n%s" "$current_version" "$min_version" | sort -V | head -n1) + +# If the smallest version isn't the minimum version, then current version is too low +if [ "$is_valid" != "$min_version" ]; then + sudo -u pioreactor pio log -l ERROR -m "Version error: installed version $current_version is lower than the minimum required version $min_version." 
+ exit 1 +fi + +echo "Version check passed: $current_version" diff --git a/update_scripts/upcoming/update.sh b/update_scripts/upcoming/update.sh new file mode 100644 index 00000000..42f66b13 --- /dev/null +++ b/update_scripts/upcoming/update.sh @@ -0,0 +1,98 @@ +#!/bin/bash + +set -xeu + + +export LC_ALL=C + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +UI_FOLDER=/var/www/pioreactorui +SYSTEMD_DIR=/lib/systemd/system/ +UI_TAG="TODO" # TODO +PIO_DIR="/home/pioreactor/.pioreactor" + +HOSTNAME=$(hostname) + +# Get the leader address +LEADER_HOSTNAME=$(crudini --get "$PIO_DIR"/config.ini cluster.topology leader_hostname) + +if [ "$HOSTNAME" != "$LEADER_HOSTNAME" ]; then + # worker updates + + # install pioreactorui + rm -rf $UI_FOLDER + mkdir -p /var/www + + + tar -xzf "$SCRIPT_DIR"/pioreactorui_"$UI_TAG".tar.gz + mv pioreactorui-"$UI_TAG" /var/www + mv /var/www/pioreactorui-"$UI_TAG" $UI_FOLDER + + # init .env + mv $UI_FOLDER/.env.example $UI_FOLDER/.env + + # init sqlite db + touch $UI_FOLDER/huey.db + touch $UI_FOLDER/huey.db-shm + touch $UI_FOLDER/huey.db-wal + + + # make correct permissions in new www folders and files + # https://superuser.com/questions/19318/how-can-i-give-write-access-of-a-folder-to-all-users-in-linux + chown -R pioreactor:www-data /var/www + chmod -R g+w /var/www + find /var/www -type d -exec chmod 2775 {} \; + find /var/www -type f -exec chmod ug+rw {} \; + chmod +x $UI_FOLDER/main.fcgi + + # install lighttp and set up mods + unzip lighttpd_packages.zip -d lighttpd_packages + dpkg -i lighttpd_packages/*.deb + + # install our own lighttpd service + cp -u "$SCRIPT_DIR"/lighttpd.service $SYSTEMD_DIR + + cp -u "$SCRIPT_DIR"/lighttpd.conf /etc/lighttpd/ + cp -u "$SCRIPT_DIR"/50-pioreactorui.conf /etc/lighttpd/conf-available/ + cp -u "$SCRIPT_DIR"/52-api-only.conf /etc/lighttpd/conf-available/ + + /usr/sbin/lighttpd-enable-mod fastcgi + /usr/sbin/lighttpd-enable-mod rewrite + /usr/sbin/lighttpd-enable-mod pioreactorui + # workers 
only have an API; they do not serve static files. + /usr/sbin/lighttpd-enable-mod api-only + + + cp -u "$SCRIPT_DIR"/create_diskcache.sh /usr/local/bin/ + cp -u "$SCRIPT_DIR"/update_ui.sh /usr/local/bin/ + + cp -u "$SCRIPT_DIR"/huey.service $SYSTEMD_DIR + cp -u "$SCRIPT_DIR"/create_diskcache.service $SYSTEMD_DIR + + systemctl enable huey.service + systemctl enable create_diskcache.service + systemctl enable lighttpd.service + + + # don't need to start these services, as we do a pio update ui which should handle this. +else + + CONFIG_FILE=/etc/lighttpd/conf-available/50-pioreactorui.conf + # add new unit_api to rewrite + # Check if the unit_api rule is already present + if grep -q 'unit_api' "$CONFIG_FILE"; then + echo "unit_api rewrite rule already exists." + else + # Add the new rewrite rule for /unit_api + sed -i '/^url.rewrite-once = (/a\ "^(/unit_api/.*)$" => "/main.fcgi$1",' "$CONFIG_FILE" + fi + +fi + + + +# test services +huey_consumer -h +lighttpd -h +flask --help