From ca3978269cae18536b30214c0e9f81d5495f514c Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Wed, 14 Aug 2024 22:01:56 -0400 Subject: [PATCH] remove zeroconf; rely more on linux utils and Popen for pio cmds --- CHANGELOG.md | 3 +- pioreactor/background_jobs/monitor.py | 4 +- pioreactor/cli/pio.py | 72 ++++++++++------------- pioreactor/utils/networking.py | 84 ++++++++++++++------------- requirements/requirements_dev.txt | 1 + requirements/requirements_leader.txt | 1 - setup.py | 1 - 7 files changed, 82 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bac26e64..c9a6600b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ #### Enhancements + - `pio logs` now includes the UI logs (if run on leader). - introduce a new od_reading config,`turn_off_leds_during_reading`, which enables / disables turning off the other LEDS during an OD snapshot. By default, it is set to 1 (enables). - leader-only Pioreactors also have a `config_hostname.local` file now. - a new top-level section in experiment profiles, `inputs`, allows you to define variables that can be used in expressions. This is useful if you are copy the same constant over an over again, and want a quick way to change it once. Example: @@ -38,7 +39,7 @@ #### Breaking changes - - removed `psutil` package from new images. We replaced its functionality with built-in routines. + - removed `psutil` and `zeroconf` packages from new images. We replaced their functionality with built-in routines. - in config.ini, the section `od_config` renamed to `od_reading.config`, and `stirring` is `stirring.config`. When you update, a script will run to automatically update these names in your config.inis. ### 24.7.18 diff --git a/pioreactor/background_jobs/monitor.py b/pioreactor/background_jobs/monitor.py index dbe0b79e..002902a4 100644 --- a/pioreactor/background_jobs/monitor.py +++ b/pioreactor/background_jobs/monitor.py @@ -244,9 +244,9 @@ def did_find_network() -> bool: return True if utils.boolean_retry(did_find_network, retries=3, sleep_for=2): - ipv4: str = "" + ipv4: str = get_ip() or "" else: - ipv4 = get_ip() or "" + ipv4 = "" self.ipv4 = ipv4 diff --git a/pioreactor/cli/pio.py b/pioreactor/cli/pio.py index e10d5adb..52c35028 100644 --- a/pioreactor/cli/pio.py +++ b/pioreactor/cli/pio.py @@ -7,9 +7,9 @@ """ from __future__ import annotations +import subprocess from os import geteuid from shlex import quote -from time import sleep from typing import Optional import click @@ -83,39 +83,20 @@ def pio(ctx) -> None: def logs(n: int) -> None: """ Tail & stream the logs from this unit to the terminal. CTRL-C to exit. - TODO: this consumes a full CPU core! """ + log_file = config.config.get("logging", "log_file", fallback="/var/log/pioreactor.log") + ui_log_file = ( + config.config.get("logging", "ui_log_file", fallback="/var/log/pioreactorui.log") + if am_I_leader() + else "" + ) - def file_len(filename) -> int: - count = 0 - with open(filename) as f: - for _ in f: - count += 1 - return count - - def follow(filename, sleep_sec=0.2): - """Yield each line from a file as they are written. - `sleep_sec` is the time to sleep after empty reads.""" - - # count the number of lines - n_lines = file_len(filename) - - with open(filename) as file: - line = "" - count = 1 - while True: - tmp = file.readline() - count += 1 - if tmp is not None: - line += tmp - if line.endswith("\n") and count > (n_lines - n): - yield line - line = "" - else: - sleep(sleep_sec) - - for line in follow(config.config["logging"]["log_file"]): - click.echo(line, nl=False) + with subprocess.Popen( + ["tail", "-fqn", str(n), log_file, ui_log_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) as process: + assert process.stdout is not None + for line in process.stdout: + print(line.decode("utf8").rstrip("\n")) @pio.command(name="log", short_help="logs a message from the CLI") @@ -393,8 +374,6 @@ def update_app( """ Update the Pioreactor core software """ - import subprocess - logger = create_logger("update_app", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT) commands_and_priority: list[tuple[str, float]] = [] @@ -557,7 +536,6 @@ def update_firmware(version: Optional[str]) -> None: # TODO: this needs accept a --source arg """ - import subprocess logger = create_logger( "update_firmware", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT @@ -616,9 +594,25 @@ def db() -> None: @pio.command(short_help="tail MQTT") @click.option("--topic", "-t", default="pioreactor/#") def mqtt(topic: str) -> None: - import os - - os.system(f"""mosquitto_sub -v -t '{topic}' -F "%19.19I | %t %p" -u pioreactor -P raspberry""") + with subprocess.Popen( + [ + "mosquitto_sub", + "-v", + "-t", + "#", + "-F", + "%19.19I | %t %p", + "-u", + "pioreactor", + "-P", + "raspberry", + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as process: + assert process.stdout is not None + for line in process.stdout: + print(line.decode("utf8").rstrip("\n")) @update.command(name="ui") @click.option("-b", "--branch", help="install from a branch on github") @@ -637,8 +631,6 @@ def update_ui(branch: Optional[str], repo: str, source: Optional[str], version: Source, if provided, should be a .tar.gz with a top-level dir like pioreactorui-{version}/ This is what is provided from Github releases. """ - import subprocess - logger = create_logger( "update_ui", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT ) diff --git a/pioreactor/utils/networking.py b/pioreactor/utils/networking.py index bc0f881d..72d1fbcc 100644 --- a/pioreactor/utils/networking.py +++ b/pioreactor/utils/networking.py @@ -3,6 +3,9 @@ import os import subprocess +from queue import Empty +from queue import Queue +from threading import Thread from typing import Generator from typing import Optional @@ -73,56 +76,59 @@ def get_ip() -> Optional[str]: def discover_workers_on_network(terminate: bool = False) -> Generator[str, None, None]: """ + Discover workers on the network using avahi-browse. + Parameters ---------- terminate: bool - terminate after dumping a more or less complete list + If True, terminate after dumping a more or less complete list (wait 3 seconds for any new arrivals, exit if none). + + Yields + ------ + str + Hostnames of discovered workers. Example -------- > for worker in discover_workers_on_network(): > print(worker) - Notes - ------ - - This is very similar to `avahi-browse _pio-worker._tcp -t` - + ----- + This is very similar to `avahi-browse _pio-worker._tcp -tpr` """ - from zeroconf import ServiceBrowser, ServiceListener, Zeroconf - from queue import Queue, Empty - - class Listener(ServiceListener): - def __init__(self) -> None: - self.hostnames: Queue[str] = Queue() - - def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: - info = zc.get_service_info(type_, name) - try: - self.hostnames.put(info.server.removesuffix(".local.")) # type: ignore - except AttributeError: - # sometimes, we've seen info.server not exist, often when there is a problem with mdns reflections / duplications - pass - - def remove_service(self, *args, **kwargs): - pass - - def update_service(self, *args, **kwargs): - pass - - def __next__(self) -> str: - try: - return self.hostnames.get(timeout=3 if terminate else None) - except Empty: - raise StopIteration - - def __iter__(self) -> Listener: - return self - - listener = Listener() - ServiceBrowser(Zeroconf(), "_pio-worker._tcp.local.", listener) - yield from listener + + def worker_hostnames(queue: Queue) -> None: + with subprocess.Popen( + ["avahi-browse", "_pio-worker._tcp", "-rp"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) as process: + if process.stdout is None: + return + + assert process.stdout is not None + for line in process.stdout: + result = line.decode("utf8").rstrip("\n") + parsed = result.split(";") + if parsed[0] != "=" or parsed[1] == "lo" or parsed[2] != "IPv4": + continue + hostname = parsed[6].removesuffix(".local") + queue.put(hostname) + return + + hostnames_queue: Queue[str] = Queue() + worker_thread = Thread(target=worker_hostnames, args=(hostnames_queue,)) + worker_thread.daemon = True + worker_thread.start() + + while True: + try: + # Wait for the next hostname, with a timeout if terminate is True + hostname = hostnames_queue.get(timeout=3 if terminate else None) + yield hostname + except Empty: + # If the queue is empty and we're in terminate mode, stop the iteration + if terminate: + break def add_local(hostname: str) -> str: diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index 093b0f40..dd8c1e64 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -4,3 +4,4 @@ pytest-timeout pytest-random-order numpy pytest-mock +zeroconf diff --git a/requirements/requirements_leader.txt b/requirements/requirements_leader.txt index dbe9cf2f..5e49049b 100644 --- a/requirements/requirements_leader.txt +++ b/requirements/requirements_leader.txt @@ -9,4 +9,3 @@ Jinja2==3.1.4 MarkupSafe==2.1.5 python-dotenv==1.0.1 Werkzeug==3.0.3 -zeroconf==0.115.2 diff --git a/setup.py b/setup.py index 0f4f67e4..711fc68f 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,6 @@ "MarkupSafe==2.1.5", "python-dotenv==1.0.1", "Werkzeug==3.0.3", - "zeroconf==0.115.2", ]