Skip to content

Commit

Permalink
maybe fixes watchdog?
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Sep 8, 2024
1 parent 402bf74 commit 2d4aafd
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 21 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ jobs:
with:
version: '1.6'

- name: Install any required linux software
run: |
sudo apt-get install -y avahi avahi-utils
- name: Create dot_pioreactor folder
run: |
mkdir -p .pioreactor/storage
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
- Workers now have a webserver on them. This is one of the largest architectural changes to Pioreactor, and lays the foundation for better plugin and calibration management, plus future features.
- APIs that initiate a background task either return with the result, or return a task id that be be looked up at `/api/task_status/`.
- previous actions that would involve SSHing from leader to a worker are replaced by web requests.
- We no longer recommend the Raspberry Pi Zero (the original Zero, not the Zero 2.).
- We no longer recommend the Raspberry Pi Zero (the original Zero, not the Zero 2.) since supporting a web server + pioreactor functions is too much for a single core.
- Better MQTT re-connection logic.


### 24.8.22
Expand Down
4 changes: 3 additions & 1 deletion pioreactor/cli/pios.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: b
commands["options"] = {"source": source}

def _thread_function(unit: str) -> bool:
print(f"Installing {plugin} on {unit}")
try:
r = post_into(
resolve_to_address(unit), "/unit_api/plugins/install", json=commands, timeout=60
Expand Down Expand Up @@ -362,6 +363,7 @@ def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool) -> None:
raise click.Abort()

def _thread_function(unit: str) -> bool:
print(f"Uninstalling {plugin} on {unit}")
try:
r = post_into(
resolve_to_address(unit), "/unit_api/plugins/uninstall", json=commands, timeout=60
Expand Down Expand Up @@ -525,7 +527,7 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None:
def _thread_function(unit: str) -> bool:
click.echo(f"Executing run {job} on {unit}.")
try:
r = post_into(resolve_to_address(unit), f"/unit_api/jobs/{job}/run", json=data)
r = post_into(resolve_to_address(unit), f"/unit_api/jobs/run/job_name/{job}", json=data)
r.raise_for_status()
return True
except HTTPException as e:
Expand Down
12 changes: 6 additions & 6 deletions pioreactor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,19 @@ def get_config() -> ConfigParserMod:
return config


config = get_config()


@cache
def get_leader_hostname() -> str:
return get_config().get("cluster.topology", "leader_hostname", fallback="localhost")
return config.get("cluster.topology", "leader_hostname", fallback="localhost")


@cache
def get_leader_address() -> str:
return get_config().get("cluster.topology", "leader_address", fallback="localhost")
return config.get("cluster.topology", "leader_address", fallback="localhost")


@cache
def get_mqtt_address() -> str:
return get_config().get("mqtt", "broker_address", fallback=get_leader_address())


config = get_config()
return config.get("mqtt", "broker_address", fallback=get_leader_address())
6 changes: 3 additions & 3 deletions pioreactor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def put_into(
def put_into_leader(
endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs
) -> mureq.Response:
return put_into(leader_address, endpoint, **kwargs)
return put_into(leader_address, endpoint, body=body, json=json, **kwargs)


def patch_into(
Expand All @@ -397,7 +397,7 @@ def patch_into(
def patch_into_leader(
endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs
) -> mureq.Response:
return patch_into(leader_address, endpoint, **kwargs)
return patch_into(leader_address, endpoint, body=body, json=json, **kwargs)


def post_into(
Expand All @@ -410,7 +410,7 @@ def post_into(
def post_into_leader(
endpoint: str, body: bytes | None = None, json: dict | Struct | None = None, **kwargs
) -> mureq.Response:
return post_into(leader_address, endpoint, **kwargs)
return post_into(leader_address, endpoint, body=body, json=json, **kwargs)


def delete_from(address: str, endpoint: str, **kwargs) -> mureq.Response:
Expand Down
2 changes: 0 additions & 2 deletions pioreactor/tests/test_watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@

import time

import pytest
import zeroconf

from pioreactor.background_jobs.leader.watchdog import WatchDog
from pioreactor.pubsub import collect_all_logs_of_level
from pioreactor.whoami import get_unit_name


@pytest.mark.xfail()
def test_watchdog_alerts_on_found_worker() -> None:
experiment = "test_watchdog_alerts_on_found_worker"

Expand Down
6 changes: 3 additions & 3 deletions pioreactor/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pioreactor.pubsub import create_client
from pioreactor.pubsub import post_into
from pioreactor.pubsub import subscribe_and_callback
from pioreactor.utils.networking import add_local
from pioreactor.utils.networking import resolve_to_address
from pioreactor.utils.timing import current_utc_timestamp

if TYPE_CHECKING:
Expand Down Expand Up @@ -659,7 +659,7 @@ def kill_jobs(
name: str | None = None,
job_source: str | None = None,
) -> bool:
if len(self.units) == 0 or whoami.is_testing_env():
if len(self.units) == 0:
return True

if experiment:
Expand All @@ -673,7 +673,7 @@ def kill_jobs(

def _thread_function(unit: str) -> bool:
try:
r = post_into(add_local(unit), endpoint)
r = post_into(resolve_to_address(unit), endpoint)
r.raise_for_status()
return True
except Exception as e:
Expand Down
10 changes: 5 additions & 5 deletions pioreactor/utils/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from __future__ import annotations

import ipaddress
import os
import subprocess
from pathlib import Path
from queue import Empty
from queue import Queue
from threading import Thread
Expand All @@ -18,16 +18,16 @@ def rsync(*args):

try:
check_call(("rsync",) + args)
except CalledProcessError:
raise RsyncError
except CalledProcessError as e:
raise RsyncError from e


def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout: int = 5) -> None:
try:
rsync(
"-z",
"--timeout",
timeout,
f"{timeout}",
"--inplace",
"-e",
"ssh",
Expand All @@ -39,7 +39,7 @@ def cp_file_across_cluster(unit: str, localpath: str, remotepath: str, timeout:


def is_using_local_access_point() -> bool:
return os.path.isfile("/boot/firmware/local_access_point")
return Path("/boot/firmware/local_access_point").is_file()


def is_hostname_on_network(hostname: str, timeout: float = 10.0) -> bool:
Expand Down

0 comments on commit 2d4aafd

Please sign in to comment.