diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 22842b74..c5f21e99 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -38,6 +38,10 @@ jobs: with: version: '1.6' + - name: Install any required linux software + run: | + sudo apt-get install -y avahi avahi-utils + - name: Create dot_pioreactor folder run: | mkdir -p .pioreactor/storage diff --git a/CHANGELOG.md b/CHANGELOG.md index ab85fc61..de6a47e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ - Workers now have a webserver on them. This is one of the largest architectural changes to Pioreactor, and lays the foundation for better plugin and calibration management, plus future features. - APIs that initiate a background task either return with the result, or return a task id that be be looked up at `/api/task_status/`. - previous actions that would involve SSHing from leader to a worker are replaced by web requests. - - We no longer recommend the Raspberry Pi Zero (the original Zero, not the Zero 2.). + - We no longer recommend the Raspberry Pi Zero (the original Zero, not the Zero 2.) since supporting a web server + pioreactor functions is too much for a single core. + - Better MQTT re-connection logic. ### 24.8.22 diff --git a/pioreactor/cli/pios.py b/pioreactor/cli/pios.py index 384d9bfc..eb64093f 100644 --- a/pioreactor/cli/pios.py +++ b/pioreactor/cli/pios.py @@ -326,6 +326,7 @@ def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: b commands["options"] = {"source": source} def _thread_function(unit: str) -> bool: + print(f"Installing {plugin} on {unit}") try: r = post_into( resolve_to_address(unit), "/unit_api/plugins/install", json=commands, timeout=60 @@ -362,6 +363,7 @@ def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool) -> None: raise click.Abort() def _thread_function(unit: str) -> bool: + print(f"Uninstalling {plugin} on {unit}") try: r = post_into( resolve_to_address(unit), "/unit_api/plugins/uninstall", json=commands, timeout=60 @@ -525,7 +527,7 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None: def _thread_function(unit: str) -> bool: click.echo(f"Executing run {job} on {unit}.") try: - r = post_into(resolve_to_address(unit), f"/unit_api/jobs/{job}/run", json=data) + r = post_into(resolve_to_address(unit), f"/unit_api/jobs/run/job_name/{job}", json=data) r.raise_for_status() return True except HTTPException as e: diff --git a/pioreactor/config.py b/pioreactor/config.py index ce75c339..f6d50114 100644 --- a/pioreactor/config.py +++ b/pioreactor/config.py @@ -163,19 +163,19 @@ def get_config() -> ConfigParserMod: return config +config = get_config() + + @cache def get_leader_hostname() -> str: - return get_config().get("cluster.topology", "leader_hostname", fallback="localhost") + return config.get("cluster.topology", "leader_hostname", fallback="localhost") @cache def get_leader_address() -> str: - return get_config().get("cluster.topology", "leader_address", fallback="localhost") + return config.get("cluster.topology", "leader_address", fallback="localhost") @cache def get_mqtt_address() -> str: - return get_config().get("mqtt", "broker_address", fallback=get_leader_address()) - - -config = get_config() + return config.get("mqtt", "broker_address", fallback=get_leader_address()) diff --git a/pioreactor/pubsub.py b/pioreactor/pubsub.py index 43c0318e..9823366f 100644 --- a/pioreactor/pubsub.py +++ b/pioreactor/pubsub.py @@ -384,7 +384,7 @@ def put_into( def put_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: - return put_into(leader_address, endpoint, **kwargs) + return put_into(leader_address, endpoint, body=body, json=json, **kwargs) def patch_into( @@ -397,7 +397,7 @@ def patch_into( def patch_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: - return patch_into(leader_address, endpoint, **kwargs) + return patch_into(leader_address, endpoint, body=body, json=json, **kwargs) def post_into( @@ -410,7 +410,7 @@ def post_into( def post_into_leader( endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs ) -> mureq.Response: - return post_into(leader_address, endpoint, **kwargs) + return post_into(leader_address, endpoint, body=body, json=json, **kwargs) def delete_from(address: str, endpoint: str, **kwargs) -> mureq.Response: diff --git a/pioreactor/tests/test_watchdog.py b/pioreactor/tests/test_watchdog.py index c3da3473..55d308bc 100644 --- a/pioreactor/tests/test_watchdog.py +++ b/pioreactor/tests/test_watchdog.py @@ -4,7 +4,6 @@ import time -import pytest import zeroconf from pioreactor.background_jobs.leader.watchdog import WatchDog @@ -12,7 +11,6 @@ from pioreactor.whoami import get_unit_name -@pytest.mark.xfail() def test_watchdog_alerts_on_found_worker() -> None: experiment = "test_watchdog_alerts_on_found_worker" diff --git a/pioreactor/utils/__init__.py b/pioreactor/utils/__init__.py index add9cab3..4e845601 100644 --- a/pioreactor/utils/__init__.py +++ b/pioreactor/utils/__init__.py @@ -29,7 +29,7 @@ from pioreactor.pubsub import create_client from pioreactor.pubsub import post_into from pioreactor.pubsub import subscribe_and_callback -from pioreactor.utils.networking import add_local +from pioreactor.utils.networking import resolve_to_address from pioreactor.utils.timing import current_utc_timestamp if TYPE_CHECKING: @@ -659,7 +659,7 @@ def kill_jobs( name: str | None = None, job_source: str | None = None, ) -> bool: - if len(self.units) == 0 or whoami.is_testing_env(): + if len(self.units) == 0: return True if experiment: @@ -673,7 +673,7 @@ def kill_jobs( def _thread_function(unit: str) -> bool: try: - r = post_into(add_local(unit), endpoint) + r = post_into(resolve_to_address(unit), endpoint) r.raise_for_status() return True except Exception as e: diff --git a/pioreactor/utils/networking.py b/pioreactor/utils/networking.py index 6e8678ad..8dd7a211 100644 --- a/pioreactor/utils/networking.py +++ b/pioreactor/utils/networking.py @@ -2,8 +2,8 @@ from __future__ import annotations import ipaddress -import os import subprocess +from pathlib import Path from queue import Empty from queue import Queue from threading import Thread @@ -18,8 +18,8 @@ def rsync(*args): try: check_call(("rsync",) + args) - except CalledProcessError: - raise RsyncError + except CalledProcessError as e: + raise RsyncError from e def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout: int = 5) -> None: @@ -27,7 +27,7 @@ def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout: rsync( "-z", "--timeout", - timeout, + f"{timeout}", "--inplace", "-e", "ssh", @@ -39,7 +39,7 @@ def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout: def is_using_local_access_point() -> bool: - return os.path.isfile("/boot/firmware/local_access_point") + return Path("/boot/firmware/local_access_point").is_file() def is_hostname_on_network(hostname: str, timeout: float = 10.0) -> bool: