Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Sep 12, 2024
2 parents 144d32b + 2e958a6 commit 59f2d24
Show file tree
Hide file tree
Showing 27 changed files with 811 additions and 470 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ jobs:
with:
version: '1.6'

- name: Install any required linux software
run: |
sudo apt-get install -y avahi avahi-utils
- name: Create dot_pioreactor folder
run: |
mkdir -p .pioreactor/storage
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repos:
- id: mixed-line-ending
- id: trailing-whitespace
- id: check-added-large-files
args: ['--maxkb=3000']


- repo: https://github.com/ambv/black
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
### Upcoming

- Workers now have a webserver on them. This is one of the largest architectural changes to Pioreactor, and lays the foundation for better plugin and calibration management, plus future features.
- APIs that initiate a background task either return with the result, or return a task id that be be looked up at `/api/task_status/`.
- previous actions that would involve SSHing from leader to a worker are replaced by web requests.
- We no longer recommend the Raspberry Pi Zero (the original Zero, not the Zero 2.) since supporting a web server + pioreactor functions is too much for a single core.
- Better MQTT re-connection logic.


### 24.8.22

#### Enhancements
Expand Down
9 changes: 5 additions & 4 deletions pioreactor/actions/leader/backup_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from pioreactor.cluster_management import get_active_workers_in_inventory
from pioreactor.config import config
from pioreactor.exc import RsyncError
from pioreactor.logging import create_logger
from pioreactor.pubsub import subscribe
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils.networking import add_local
from pioreactor.utils.networking import resolve_to_address
from pioreactor.utils.networking import rsync
from pioreactor.utils.timing import current_utc_timestamp
from pioreactor.whoami import get_unit_name
from pioreactor.whoami import UNIVERSAL_EXPERIMENT
Expand Down Expand Up @@ -43,7 +45,6 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in
"""

import sqlite3
from sh import ErrorReturnCode, rsync # type: ignore

unit = get_unit_name()
experiment = UNIVERSAL_EXPERIMENT
Expand Down Expand Up @@ -96,9 +97,9 @@ def backup_database(output_file: str, force: bool = False, backup_to_workers: in
"--partial",
"--inplace",
output_file,
f"{add_local(backup_unit)}:{output_file}",
f"{resolve_to_address(backup_unit)}:{output_file}",
)
except ErrorReturnCode:
except RsyncError:
logger.debug(
f"Unable to backup database to {backup_unit}. Is it online?",
exc_info=True,
Expand Down
29 changes: 13 additions & 16 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from typing import Optional

import click
from msgspec.json import encode
from msgspec.yaml import decode

from pioreactor.cluster_management import get_active_workers_in_experiment
Expand Down Expand Up @@ -536,13 +535,11 @@ def _callable() -> None:
logger.info(f"Dry-run: Starting {job_name} on {unit} with options {options} and args {args}.")
else:
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/run",
body=encode(
{
"options": evaluate_options(options, env) | {"job_source": "experiment_profile"},
"args": args,
}
),
f"/api/workers/{unit}/jobs/run/job_name/{job_name}/experiments/{experiment}",
json={
"options": evaluate_options(options, env) | {"job_source": "experiment_profile"},
"args": args,
},
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")
Expand Down Expand Up @@ -574,8 +571,8 @@ def _callable() -> None:
logger.info(f"Dry-run: Pausing {job_name} on {unit}.")
else:
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {"state": "sleeping"}}),
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "sleeping"}},
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")
Expand Down Expand Up @@ -607,8 +604,8 @@ def _callable() -> None:
logger.info(f"Dry-run: Resuming {job_name} on {unit}.")
else:
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {"state": "ready"}}),
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "ready"}},
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")
Expand Down Expand Up @@ -640,8 +637,8 @@ def _callable() -> None:
logger.info(f"Dry-run: Stopping {job_name} on {unit}.")
else:
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {"state": "disconnected"}}),
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "disconnected"}},
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")
Expand Down Expand Up @@ -677,8 +674,8 @@ def _callable() -> None:
else:
for setting, value in evaluate_options(options, env).items():
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {setting: value}}),
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {setting: value}},
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")
Expand Down
18 changes: 2 additions & 16 deletions pioreactor/background_jobs/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from pioreactor.hardware import PCB_BUTTON_PIN as BUTTON_PIN
from pioreactor.hardware import PCB_LED_PIN as LED_PIN
from pioreactor.hardware import TEMP
from pioreactor.pubsub import get_from_leader
from pioreactor.pubsub import QOS
from pioreactor.structs import Voltage
from pioreactor.types import MQTTMessage
Expand Down Expand Up @@ -268,12 +267,12 @@ def self_checks(self) -> None:

# report on CPU usage, memory, disk space
self.check_and_publish_self_statistics()
sleep(0 if whoami.is_testing_env() else 5) # wait for other processes to catch up
self.check_for_webserver()

if whoami.am_I_leader():
self.check_for_last_backup()
sleep(0 if whoami.is_testing_env() else 5) # wait for other processes to catch up
self.check_for_correct_permissions()
self.check_for_webserver()
self.check_for_required_jobs_running()

try:
Expand Down Expand Up @@ -393,19 +392,6 @@ def check_for_webserver(self) -> None:
self.logger.debug(f"Error checking huey status: {e}", exc_info=True)
self.logger.error(f"Error checking huey status: {e}")

attempt = 0
retries = 5
while attempt < retries:
attempt += 1
res = get_from_leader("/api/experiments/latest")
if res.ok:
break
sleep(1.0)
else:
self.logger.debug(f"Error pinging UI: {res.status_code}")
self.logger.error(f"Error pinging UI: {res.status_code}")
self.flicker_led_with_error_code(error_codes.WEBSERVER_OFFLINE)

def check_for_required_jobs_running(self) -> None:
if not all(utils.is_pio_job_running(["watchdog", "mqtt_to_db_streaming"])):
self.logger.warning(
Expand Down
152 changes: 74 additions & 78 deletions pioreactor/cli/pio.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pioreactor.logging import create_logger
from pioreactor.mureq import get
from pioreactor.mureq import HTTPException
from pioreactor.pubsub import get_from_leader
from pioreactor.pubsub import get_from
from pioreactor.utils import JobManager
from pioreactor.utils import local_intermittent_storage
from pioreactor.utils import local_persistant_storage
Expand Down Expand Up @@ -172,23 +172,22 @@ def version(verbose: bool) -> None:
from pioreactor.version import rpi_version_info
from pioreactor.whoami import get_pioreactor_model_and_version

click.echo(f"Pioreactor software: {tuple_to_text(software_version_info)}")
click.echo(f"Pioreactor app: {tuple_to_text(software_version_info)}")
click.echo(f"Pioreactor HAT: {tuple_to_text(hardware_version_info)}")
click.echo(f"Pioreactor firmware: {tuple_to_text(get_firmware_version())}")
click.echo(f"Model name: {get_pioreactor_model_and_version()}")
click.echo(f"HAT serial number: {serial_number}")
click.echo(f"Operating system: {platform.platform()}")
click.echo(f"Raspberry Pi: {rpi_version_info}")
click.echo(f"Image version: {whoami.get_image_git_hash()}")
if whoami.am_I_leader():
try:
result = get_from_leader("api/versions/ui")
result.raise_for_status()
ui_version = result.body.decode()
except Exception:
ui_version = "<Failed to fetch>"

click.echo(f"Pioreactor UI: {ui_version}")
try:
result = get_from("localhost", "/unit_api/versions/ui")
result.raise_for_status()
ui_version = result.body.decode()
except Exception:
ui_version = "<Failed to fetch>"

click.echo(f"Pioreactor UI: {ui_version}")
else:
click.echo(pioreactor.__version__)

Expand Down Expand Up @@ -399,6 +398,7 @@ def update_app(
(f"sudo bash {tmp_rls_dir}/pre_update.sh", 2),
(f"sudo bash {tmp_rls_dir}/update.sh", 4),
(f"sudo bash {tmp_rls_dir}/post_update.sh", 20),
(f"mv {tmp_rls_dir}/pioreactorui_*.tar.gz {tmp_dir}/pioreactorui_archive", 98), # move ui folder to be accessed by a `pio update ui`
(f"rm -rf {tmp_rls_dir}", 99),
]
)
Expand All @@ -407,7 +407,6 @@ def update_app(
commands_and_priority.extend([
(f"sudo pip install --no-index --find-links={tmp_rls_dir}/wheels/ {tmp_rls_dir}/pioreactor-{version_installed}-py3-none-any.whl[leader,worker]", 3),
(f'sudo sqlite3 {config.config["storage"]["database"]} < {tmp_rls_dir}/update.sql', 10),
(f"mv {tmp_rls_dir}/pioreactorui_*.tar.gz {tmp_dir}/pioreactorui_archive", 98), # move ui folder to be accessed by a `pio update ui`
])
else:
commands_and_priority.extend([
Expand Down Expand Up @@ -593,6 +592,68 @@ def update_firmware(version: Optional[str]) -> None:
logger.notice(f"Updated Pioreactor firmware to version {version_installed}.") # type: ignore


@update.command(name="ui")
@click.option("-b", "--branch", help="install from a branch on github")
@click.option(
"-r",
"--repo",
help="install from a repo on github. Format: username/project",
default="pioreactor/pioreactorui",
)
@click.option("--source", help="use a tar.gz file")
@click.option("-v", "--version", help="install a specific version")
def update_ui(branch: Optional[str], repo: str, source: Optional[str], version: Optional[str]) -> None:
"""
Update the PioreactorUI
Source, if provided, should be a .tar.gz with a top-level dir like pioreactorui-{version}/
This is what is provided from Github releases.
"""
logger = create_logger("update_ui", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT)
commands = []

if version is None:
version = "latest"
else:
version = f"tags/{version}"

if source is not None:
source = quote(source)
version_installed = source

elif branch is not None:
cleaned_branch = quote(branch)
cleaned_repo = quote(repo)
version_installed = cleaned_branch
url = f"https://github.com/{cleaned_repo}/archive/{cleaned_branch}.tar.gz"
source = "/tmp/pioreactorui.tar.gz"
commands.append(["wget", url, "-O", source])

else:
latest_release_metadata = loads(get(f"https://api.github.com/repos/{repo}/releases/{version}").body)
version_installed = latest_release_metadata["tag_name"]
url = f"https://github.com/{repo}/archive/refs/tags/{version_installed}.tar.gz"
source = "/tmp/pioreactorui.tar.gz"
commands.append(["wget", url, "-O", source])

assert source is not None
commands.append(["bash", "/usr/local/bin/update_ui.sh", source])

for command in commands:
logger.debug(" ".join(command))
p = subprocess.run(
command,
universal_newlines=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
if p.returncode != 0:
logger.error(p.stderr)
raise exc.BashScriptError(p.stderr)

logger.notice(f"Updated PioreactorUI to version {version_installed}.") # type: ignore


if whoami.am_I_leader():

@pio.command(short_help="access the db CLI")
Expand All @@ -609,7 +670,7 @@ def mqtt(topic: str) -> None:
"mosquitto_sub",
"-v",
"-t",
"#",
topic,
"-F",
"%19.19I||%t||%p",
"-u",
Expand All @@ -630,68 +691,3 @@ def mqtt(topic: str) -> None:
+ " | "
+ click.style(value, fg="bright_yellow")
)

@update.command(name="ui")
@click.option("-b", "--branch", help="install from a branch on github")
@click.option(
"-r",
"--repo",
help="install from a repo on github. Format: username/project",
default="pioreactor/pioreactorui",
)
@click.option("--source", help="use a tar.gz file")
@click.option("-v", "--version", help="install a specific version")
def update_ui(branch: Optional[str], repo: str, source: Optional[str], version: Optional[str]) -> None:
"""
Update the PioreactorUI
Source, if provided, should be a .tar.gz with a top-level dir like pioreactorui-{version}/
This is what is provided from Github releases.
"""
logger = create_logger(
"update_ui", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT
)
commands = []

if version is None:
version = "latest"
else:
version = f"tags/{version}"

if source is not None:
source = quote(source)
version_installed = source

elif branch is not None:
cleaned_branch = quote(branch)
cleaned_repo = quote(repo)
version_installed = cleaned_branch
url = f"https://github.com/{cleaned_repo}/archive/{cleaned_branch}.tar.gz"
source = "/tmp/pioreactorui.tar.gz"
commands.append(["wget", url, "-O", source])

else:
latest_release_metadata = loads(
get(f"https://api.github.com/repos/{repo}/releases/{version}").body
)
version_installed = latest_release_metadata["tag_name"]
url = f"https://github.com/{repo}/archive/refs/tags/{version_installed}.tar.gz"
source = "/tmp/pioreactorui.tar.gz"
commands.append(["wget", url, "-O", source])

assert source is not None
commands.append(["bash", "/usr/local/bin/update_ui.sh", source])

for command in commands:
logger.debug(" ".join(command))
p = subprocess.run(
command,
universal_newlines=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
if p.returncode != 0:
logger.error(p.stderr)
raise exc.BashScriptError(p.stderr)

logger.notice(f"Updated PioreactorUI to version {version_installed}.") # type: ignore
Loading

0 comments on commit 59f2d24

Please sign in to comment.