diff --git a/pioreactor/actions/__init__.py b/pioreactor/actions/__init__.py index 54f18d56..403b7528 100644 --- a/pioreactor/actions/__init__.py +++ b/pioreactor/actions/__init__.py @@ -4,6 +4,7 @@ from pioreactor.actions import led_intensity from pioreactor.actions import od_blank from pioreactor.actions import od_calibration +from pioreactor.actions import od_calibration_from_standards from pioreactor.actions import pump from pioreactor.actions import pump_calibration from pioreactor.actions import self_test diff --git a/pioreactor/actions/od_calibration.py b/pioreactor/actions/od_calibration.py index 3ce43b84..452e81b3 100644 --- a/pioreactor/actions/od_calibration.py +++ b/pioreactor/actions/od_calibration.py @@ -17,6 +17,7 @@ from pioreactor import types as pt from pioreactor.background_jobs.od_reading import start_od_reading from pioreactor.background_jobs.stirring import start_stirring as stirring +from pioreactor.background_jobs.stirring import Stirrer from pioreactor.config import config from pioreactor.config import leader_address from pioreactor.mureq import patch @@ -135,7 +136,13 @@ def start_stirring(): def plot_data( - x, y, title, x_min=None, x_max=None, interpolation_curve=None, highlight_recent_point=True + x, + y, + title, + x_min=None, + x_max=None, + interpolation_curve=None, + highlight_recent_point=True, ): import plotext as plt # type: ignore @@ -159,8 +166,13 @@ def plot_data( def start_recording_and_diluting( - initial_od600: float, minimum_od600: float, dilution_amount: float, signal_channel + st: Stirrer, + initial_od600: float, + minimum_od600: float, + dilution_amount: float, + signal_channel, ): + target_rpm = st.target_rpm inferred_od600 = initial_od600 voltages = [] inferred_od600s = [] @@ -181,7 +193,10 @@ def start_recording_and_diluting( def get_voltage_from_adc() -> float: od_readings1 = od_reader.record_from_adc() od_readings2 = od_reader.record_from_adc() - return 0.5 * (od_readings1.ods[signal_channel].od + od_readings2.ods[signal_channel].od) + return 0.5 * ( + od_readings1.ods[signal_channel].od + + od_readings2.ods[signal_channel].od + ) for _ in range(4): # warm up @@ -246,15 +261,22 @@ def get_voltage_from_adc() -> float: x_min=minimum_od600, x_max=initial_od600, ) + st.set_target_rpm(0) click.echo() click.echo(click.style("Stop❗", fg="red")) click.echo("Carefully remove vial.") - click.echo("(Optional: take new OD600 reading with external instrument.)") + click.echo( + "(Optional: take new OD600 reading with external instrument.)" + ) click.echo("Reduce volume in vial back to 10ml.") - click.echo("Confirm vial outside is dry and clean. Place back into Pioreactor.") + click.echo( + "Confirm vial outside is dry and clean. Place back into Pioreactor." + ) while not click.confirm("Continue?", default=True): pass current_volume_in_vial = initial_volume_in_vial + st.set_target_rpm(target_rpm) + st.block_until_rpm_is_close_to_target(abs_tolerance=120) sleep(1.0) click.clear() @@ -299,7 +321,10 @@ def calculate_curve_of_best_fit( def show_results_and_confirm_with_user( - curve_data: list[float], curve_type: str, voltages: list[float], inferred_od600s: list[float] + curve_data: list[float], + curve_type: str, + voltages: list[float], + inferred_od600s: list[float], ) -> tuple[bool, int]: click.clear() @@ -404,14 +429,16 @@ def od_calibration() -> None: ) = get_metadata_from_user() setup_HDC_instructions() - with start_stirring(): + with start_stirring() as st: inferred_od600s, voltages = start_recording_and_diluting( - initial_od600, minimum_od600, dilution_amount, signal_channel + st, initial_od600, minimum_od600, dilution_amount, signal_channel ) degree = 4 while True: - curve_data_, curve_type = calculate_curve_of_best_fit(voltages, inferred_od600s, degree) + curve_data_, curve_type = calculate_curve_of_best_fit( + voltages, inferred_od600s, degree + ) okay_with_result, degree = show_results_and_confirm_with_user( curve_data_, curve_type, voltages, inferred_od600s ) @@ -433,7 +460,9 @@ def od_calibration() -> None: click.echo(click.style(f"Data for {name}", underline=True, bold=True)) click.echo(data_blob) click.echo() - click.echo(click.style(f"Calibration curve for `{name}`", underline=True, bold=True)) + click.echo( + click.style(f"Calibration curve for `{name}`", underline=True, bold=True) + ) click.echo(curve_to_functional_form(curve_type, curve_data_)) click.echo() click.echo(f"Finished calibration of {name} ✅") @@ -494,8 +523,12 @@ def display_from_calibration_blob(data_blob) -> None: ), ) click.echo() - click.echo(click.style(f"Calibration curve for `{name}`", underline=True, bold=True)) - click.echo(curve_to_functional_form(data_blob["curve_type"], data_blob["curve_data_"])) + click.echo( + click.style(f"Calibration curve for `{name}`", underline=True, bold=True) + ) + click.echo( + curve_to_functional_form(data_blob["curve_type"], data_blob["curve_data_"]) + ) click.echo() click.echo(click.style(f"Data for `{name}`", underline=True, bold=True)) pprint(data_blob) @@ -542,14 +575,16 @@ def change_current(name: str) -> None: try: with local_persistant_storage("od_calibrations") as all_calibrations: new_calibration = decode( - all_calibrations[name], type=structs.subclass_union(structs.ODCalibration) + all_calibrations[name], + type=structs.subclass_union(structs.ODCalibration), ) angle = new_calibration.angle with local_persistant_storage("current_od_calibration") as current_calibrations: if angle in current_calibrations: old_calibration = decode( - current_calibrations[angle], type=structs.subclass_union(structs.ODCalibration) + current_calibrations[angle], + type=structs.subclass_union(structs.ODCalibration), ) else: old_calibration = None @@ -564,7 +599,9 @@ def change_current(name: str) -> None: click.echo("Could not update in database on leader ❌") if old_calibration: - click.echo(f"Replaced {old_calibration.name} with {new_calibration.name} ✅") + click.echo( + f"Replaced {old_calibration.name} with {new_calibration.name} ✅" + ) else: click.echo(f"Set {new_calibration.name} to current calibration ✅") @@ -588,7 +625,9 @@ def list_() -> None: with local_persistant_storage("od_calibrations") as c: for name in c.iterkeys(): try: - cal = decode(c[name], type=structs.subclass_union(structs.ODCalibration)) + cal = decode( + c[name], type=structs.subclass_union(structs.ODCalibration) + ) click.secho( f"{cal.name:15s} {cal.created_at:%d %b, %Y} {cal.angle:12s} {'✅' if cal.name in current else ''}", ) diff --git a/pioreactor/actions/od_calibration_from_standards.py b/pioreactor/actions/od_calibration_from_standards.py new file mode 100644 index 00000000..e9d16294 --- /dev/null +++ b/pioreactor/actions/od_calibration_from_standards.py @@ -0,0 +1,578 @@ +# -*- coding: utf-8 -*- +""" +https://docs.pioreactor.com/developer-guide/adding-calibration-type +""" +from __future__ import annotations + +from time import sleep +from typing import Callable +from typing import cast +from typing import Type + +import click +from msgspec.json import decode +from msgspec.json import encode + +from pioreactor import structs +from pioreactor import types as pt +from pioreactor.background_jobs.od_reading import start_od_reading +from pioreactor.background_jobs.stirring import start_stirring as stirring +from pioreactor.background_jobs.stirring import Stirrer +from pioreactor.config import config +from pioreactor.config import leader_address +from pioreactor.mureq import patch +from pioreactor.mureq import put +from pioreactor.utils import is_pio_job_running +from pioreactor.utils import local_persistant_storage +from pioreactor.utils import publish_ready_to_disconnected_state +from pioreactor.utils.timing import current_utc_datetime +from pioreactor.whoami import get_latest_testing_experiment_name +from pioreactor.whoami import get_unit_name +from pioreactor.whoami import is_testing_env + + +def introduction() -> None: + import logging + + logging.disable(logging.WARNING) + + click.clear() + click.echo( + """This routine will calibrate the current Pioreactor to (offline) OD600 readings using a set of standards. You'll need: + 1. A Pioreactor + 2. A set of OD600 standards in Pioreactor vials (at least 10 mL in each vial) +""" + ) + + +def get_metadata_from_user(): + from math import log2 + + with local_persistant_storage("od_calibrations") as cache: + while True: + name = click.prompt("Provide a name for this calibration", type=str).strip() + if name == "": + click.echo("Name cannot be empty") + continue + elif name in cache: + if click.confirm("❗️ Name already exists. Do you wish to overwrite?"): + break + elif name == "current": + click.echo("Name cannot be `current`.") + continue + else: + break + + if "REF" not in config["od_config.photodiode_channel_reverse"]: + raise ValueError("REF required for OD calibration.") + # technically it's not required? we just need a specific PD channel to calibrate from. + + ref_channel = config["od_config.photodiode_channel_reverse"]["REF"] + signal_channel = "1" if ref_channel == "2" else "2" + + click.confirm( + f"Confirm using channel {signal_channel} with angle {config['od_config.photodiode_channel'][signal_channel]}° position in the Pioreactor", + abort=True, + default=True, + ) + angle = str(config["od_config.photodiode_channel"][signal_channel]) + return name, angle, signal_channel + + +def setup_HDC_instructions() -> None: + click.clear() + click.echo( + """ Setting up: + 1. Place first standard into Pioreactor, with a stir bar. +""" + ) + + +def start_stirring(): + while not click.confirm("Reading to start stirring?", default=True): + pass + + click.echo("Starting stirring.") + + st = stirring( + target_rpm=config.getfloat("stirring", "target_rpm"), + unit=get_unit_name(), + experiment=get_latest_testing_experiment_name(), + ) + st.block_until_rpm_is_close_to_target(abs_tolerance=120) + return st + + +def plot_data( + x, + y, + title, + x_min=None, + x_max=None, + interpolation_curve=None, + highlight_recent_point=True, +): + import plotext as plt # type: ignore + + plt.clf() + + plt.scatter(x, y, marker="hd") + + if highlight_recent_point: + plt.scatter([x[-1]], [y[-1]], color=204, marker="hd") + + plt.theme("pro") + plt.title(title) + plt.plot_size(105, 22) + + if interpolation_curve: + plt.plot(x, [interpolation_curve(x_) for x_ in x], color=204) + plt.plot_size(145, 42) + + plt.xlim(x_min, x_max) + plt.show() + + +def start_recording_standards(st: Stirrer, signal_channel): + voltages = [] + od600_values = [] + click.echo("Starting OD recordings.") + + with start_od_reading( + cast(pt.PdAngleOrREF, config.get("od_config.photodiode_channel", "1")), + cast(pt.PdAngleOrREF, config.get("od_config.photodiode_channel", "2")), + interval=None, + unit=get_unit_name(), + fake_data=is_testing_env(), + experiment=get_latest_testing_experiment_name(), + use_calibration=False, + ) as od_reader: + + def get_voltage_from_adc() -> float: + od_readings1 = od_reader.record_from_adc() + od_readings2 = od_reader.record_from_adc() + return 0.5 * ( + od_readings1.ods[signal_channel].od + + od_readings2.ods[signal_channel].od + ) + + for _ in range(4): + # warm up + od_reader.record_from_adc() + + while True: + click.echo("Recording next standard.") + standard_od = click.prompt("Enter OD600 measurement", type=float) + for i in range(4): + click.echo(".", nl=False) + sleep(0.5) + + click.echo(".", nl=False) + voltage = get_voltage_from_adc() + click.echo(".", nl=False) + + od600_values.append(standard_od) + voltages.append(voltage) + + for i in range(len(od600_values)): + click.clear() + plot_data( + od600_values, + voltages, + title="OD Calibration (ongoing)", + x_min=0, + x_max=max(od600_values), + ) + click.echo() + + if not click.confirm("Record another OD600 standard?", default=True): + break + + click.echo() + click.echo(click.style("Stop❗", fg="red")) + click.echo("Carefully remove vial and replace with next standard.") + click.echo("Confirm vial outside is dry and clean.") + while not click.confirm("Continue?", default=True): + pass + sleep(1.0) + + click.clear() + plot_data( + od600_values, + voltages, + title="OD Calibration (ongoing)", + x_min=0, + x_max=max(od600_values), + ) + click.echo("Add media blank standard.") + od600_blank = click.prompt("What is the OD600 of your blank?", type=float) + click.echo("Confirm vial outside is dry and clean. Place into Pioreactor.") + while not click.confirm("Continue?", default=True): + pass + + voltages.append(get_voltage_from_adc()) + od600_values.append(od600_blank) + + return od600_values, voltages + + +def calculate_curve_of_best_fit( + voltages: list[float], od600_values: list[float], degree: int +) -> tuple[list[float], str]: + import numpy as np + + # weigh the last point, the "blank measurement", more. + # 1. It's far away from the other points + # 2. We have prior knowledge that OD~0 when V~0. + n = len(voltages) + weights = np.ones_like(voltages) + weights[-1] = n / 2 + + try: + coefs = np.polyfit(od600_values, voltages, deg=degree, w=weights).tolist() + except Exception: + click.echo("Unable to fit.") + coefs = np.zeros(degree).tolist() + + return coefs, "poly" + + +def show_results_and_confirm_with_user( + curve_data: list[float], + curve_type: str, + voltages: list[float], + od600_values: list[float], +) -> tuple[bool, int]: + click.clear() + + curve_callable = curve_to_callable(curve_type, curve_data) + + plot_data( + od600_values, + voltages, + title="OD Calibration with curve of best fit", + interpolation_curve=curve_callable, + highlight_recent_point=False, + ) + click.echo() + click.echo(f"Calibration curve: {curve_to_functional_form(curve_type, curve_data)}") + r = click.prompt( + """ +What next? + +Y: confirm and save to disk +n: abort completely +d: choose a new degree for polynomial fit + +""", + type=click.Choice(["Y", "n", "d"]), + ) + if r == "Y": + return True, -1 + elif r == "n": + raise click.Abort() + elif r == "d": + d = click.prompt("Enter new degree", type=click.IntRange(1, 5, clamp=True)) + return False, d + else: + raise click.Abort() + + +def save_results( + curve_data_: list[float], + curve_type: str, + voltages: list[float], + od600_values: list[float], + angle, + name: str, + signal_channel: pt.PdChannel, + unit: str, +) -> structs.ODCalibration: + if angle == "45": + struct: Type[structs.ODCalibration] = structs.OD45Calibration + elif angle == "90": + struct = structs.OD90Calibration + elif angle == "135": + struct = structs.OD135Calibration + elif angle == "180": + struct = structs.OD180Calibration + else: + raise ValueError() + + data_blob = struct( + created_at=current_utc_datetime(), + pioreactor_unit=unit, + name=name, + angle=angle, + maximum_od600=max(od600_values), + minimum_od600=0, + minimum_voltage=min(voltages), + maximum_voltage=max(voltages), + curve_data_=curve_data_, + curve_type=curve_type, + voltages=voltages, + od600_values=od600_values, + ir_led_intensity=float(config["od_config"]["ir_led_intensity"]), + pd_channel=signal_channel, + ) + + with local_persistant_storage("od_calibrations") as cache: + cache[name] = encode(data_blob) + + publish_to_leader(name) + change_current(name) + + return data_blob + + +def od_calibration_from_standards() -> None: + unit = get_unit_name() + experiment = get_latest_testing_experiment_name() + + if any(is_pio_job_running(["stirring", "od_reading"])): + raise ValueError("Stirring and OD reading should be turned off.") + + with publish_ready_to_disconnected_state( + unit, experiment, "od_calibration_from_standards" + ): + introduction() + ( + name, + angle, + signal_channel, + ) = get_metadata_from_user() + setup_HDC_instructions() + + with start_stirring() as st: + od600_values, voltages = start_recording_standards(st, signal_channel) + + degree = 4 + while True: + curve_data_, curve_type = calculate_curve_of_best_fit( + voltages, od600_values, degree + ) + okay_with_result, degree = show_results_and_confirm_with_user( + curve_data_, curve_type, voltages, od600_values + ) + if okay_with_result: + break + + data_blob = save_results( + curve_data_, + curve_type, + voltages, + od600_values, + angle, + name, + signal_channel, + unit, + ) + click.echo(click.style(f"Data for {name}", underline=True, bold=True)) + click.echo(data_blob) + click.echo() + click.echo( + click.style(f"Calibration curve for `{name}`", underline=True, bold=True) + ) + click.echo(curve_to_functional_form(curve_type, curve_data_)) + click.echo() + click.echo(f"Finished calibration of {name} ✅") + + if not config.getboolean("od_config", "use_calibration", fallback=False): + click.echo() + click.echo( + click.style( + "Currently [od_config][use_calibration] is set to 0 in your config.ini. This should be set to 1 to use calibrations.", + bold=True, + ) + ) + return + + +def curve_to_functional_form(curve_type: str, curve_data) -> str: + if curve_type == "poly": + d = len(curve_data) + return " + ".join( + [ + (f"{c:0.3f}x^{d - i - 1}" if (i < d - 1) else f"{c:0.3f}") + for i, c in enumerate(curve_data) + ] + ) + else: + raise ValueError() + + +def curve_to_callable(curve_type: str, curve_data) -> Callable: + if curve_type == "poly": + import numpy as np + + def curve_callable(x): + return np.polyval(curve_data, x) + + return curve_callable + + else: + raise NotImplementedError + + +def display(name: str | None) -> None: + from pprint import pprint + + def display_from_calibration_blob(data_blob) -> None: + voltages = data_blob["voltages"] + ods = data_blob["od600_values"] + name, angle = data_blob["name"], data_blob["angle"] + click.echo() + click.echo(click.style(f"Calibration `{name}`", underline=True, bold=True)) + plot_data( + ods, + voltages, + title=f"`{name}`, calibration of {angle}°", + highlight_recent_point=False, + interpolation_curve=curve_to_callable( + data_blob["curve_type"], data_blob["curve_data_"] + ), + ) + click.echo() + click.echo( + click.style(f"Calibration curve for `{name}`", underline=True, bold=True) + ) + click.echo( + curve_to_functional_form(data_blob["curve_type"], data_blob["curve_data_"]) + ) + click.echo() + click.echo(click.style(f"Data for `{name}`", underline=True, bold=True)) + pprint(data_blob) + + if name is not None: + with local_persistant_storage("od_calibrations") as c: + display_from_calibration_blob(decode(c[name])) + else: + with local_persistant_storage("current_od_calibration") as c: + for angle in c.iterkeys(): + display_from_calibration_blob(decode(c[angle])) + click.echo() + click.echo() + click.echo() + + +def publish_to_leader(name: str) -> bool: + success = True + + with local_persistant_storage("od_calibrations") as all_calibrations: + calibration_result = decode( + all_calibrations[name], type=structs.subclass_union(structs.ODCalibration) + ) + + try: + res = put( + f"http://{leader_address}/api/calibrations", + encode(calibration_result), + headers={"Content-Type": "application/json"}, + ) + if not res.ok: + success = False + except Exception as e: + print(e) + success = False + if not success: + click.echo( + f"Could not update in database on leader at http://{leader_address}/api/calibrations ❌" + ) + return success + + +def change_current(name: str) -> None: + try: + with local_persistant_storage("od_calibrations") as all_calibrations: + new_calibration = decode( + all_calibrations[name], + type=structs.subclass_union(structs.ODCalibration), + ) + + angle = new_calibration.angle + with local_persistant_storage("current_od_calibration") as current_calibrations: + if angle in current_calibrations: + old_calibration = decode( + current_calibrations[angle], + type=structs.subclass_union(structs.ODCalibration), + ) + else: + old_calibration = None + + current_calibrations[angle] = encode(new_calibration) + + res = patch( + f"http://{leader_address}/api/calibrations/{get_unit_name()}/{new_calibration.type}/{new_calibration.name}", + json={"current": 1}, + ) + if not res.ok: + click.echo("Could not update in database on leader ❌") + + if old_calibration: + click.echo( + f"Replaced {old_calibration.name} with {new_calibration.name} ✅" + ) + else: + click.echo(f"Set {new_calibration.name} to current calibration ✅") + + except Exception: + click.echo("Failed to swap.") + raise click.Abort() + + +def list_() -> None: + # get current calibrations + current = [] + with local_persistant_storage("current_od_calibration") as c: + for _ in c.iterkeys(): + cal = decode(c[_], type=structs.subclass_union(structs.ODCalibration)) + current.append(cal.name) + + click.secho( + f"{'Name':15s} {'Date':18s} {'Angle':12s} {'Currently in use?':20s}", + bold=True, + ) + with local_persistant_storage("od_calibrations") as c: + for name in c.iterkeys(): + try: + cal = decode( + c[name], type=structs.subclass_union(structs.ODCalibration) + ) + click.secho( + f"{cal.name:15s} {cal.created_at:%d %b, %Y} {cal.angle:12s} {'✅' if cal.name in current else ''}", + ) + except Exception: + pass + + +@click.group(invoke_without_command=True, name="od_calibration_from_standards") +@click.pass_context +def click_od_calibration_from_standards(ctx): + """ + Calibrate OD600 to voltages + """ + if ctx.invoked_subcommand is None: + od_calibration_from_standards() + + +@click_od_calibration_from_standards.command(name="display") +@click.option("-n", "--name", type=click.STRING) +def click_display(name: str): + display(name) + + +@click_od_calibration_from_standards.command(name="change_current") +@click.argument("name", type=click.STRING) +def click_change_current(name: str): + change_current(name) + + +@click_od_calibration_from_standards.command(name="list") +def click_list(): + list_() + + +@click_od_calibration_from_standards.command(name="publish") +@click.argument("name", type=click.STRING) +def click_publish(name: str): + publish_to_leader(name) diff --git a/pioreactor/cli/pio.py b/pioreactor/cli/pio.py index 9b9edcbb..b8a93e83 100644 --- a/pioreactor/cli/pio.py +++ b/pioreactor/cli/pio.py @@ -116,12 +116,16 @@ def follow(filename, sleep_sec=0.2): @pio.command(name="log", short_help="logs a message from the CLI") -@click.option("-m", "--message", required=True, type=str, help="the message to append to the log") +@click.option( + "-m", "--message", required=True, type=str, help="the message to append to the log" +) @click.option( "-l", "--level", default="debug", - type=click.Choice(["debug", "info", "notice", "warning", "critical"], case_sensitive=False), + type=click.Choice( + ["debug", "info", "notice", "warning", "critical"], case_sensitive=False + ), ) @click.option( "-n", @@ -129,7 +133,9 @@ def follow(filename, sleep_sec=0.2): default="CLI", type=str, ) -@click.option("--local-only", is_flag=True, help="don't send to MQTT; write only to local disk") +@click.option( + "--local-only", is_flag=True, help="don't send to MQTT; write only to local disk" +) def log(message: str, level: str, name: str, local_only: bool): try: logger = create_logger( @@ -358,7 +364,9 @@ def update_settings(ctx, job: str) -> None: for setting, value in extra_args.items(): setting = setting.replace("-", "_") pubsub.publish( - f"pioreactor/{unit}/{exp}/{job}/{setting}/set", value, qos=pubsub.QOS.EXACTLY_ONCE + f"pioreactor/{unit}/{exp}/{job}/{setting}/set", + value, + qos=pubsub.QOS.EXACTLY_ONCE, ) @@ -379,7 +387,9 @@ def get_non_prerelease_tags_of_pioreactor(): response = get(url, headers=headers) if not response.ok: - raise Exception(f"Failed to retrieve releases (status code: {response.status_code})") + raise Exception( + f"Failed to retrieve releases (status code: {response.status_code})" + ) releases = response.json() non_prerelease_tags = [] @@ -419,7 +429,9 @@ def get_tag_to_install(version_desired: Optional[str]) -> str: ix = version_history.index(software_version) if ix >= 1: - tag = f"tags/{version_history[ix-1]}" # update to the succeeding version. + tag = ( + f"tags/{version_history[ix-1]}" # update to the succeeding version. + ) elif ix == 0: tag = "latest" # essentially a re-install? @@ -438,20 +450,26 @@ def get_tag_to_install(version_desired: Optional[str]) -> str: @click.option("-b", "--branch", help="install from a branch on github") @click.option("--source", help="use a URL or whl file") @click.option("-v", "--version", help="install a specific version, default is latest") -def update_app(branch: Optional[str], source: Optional[str], version: Optional[str]) -> None: +def update_app( + branch: Optional[str], source: Optional[str], version: Optional[str] +) -> None: """ Update the Pioreactor core software """ logger = create_logger( - "update-app", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT + "update-app", + unit=whoami.get_unit_name(), + experiment=whoami.UNIVERSAL_EXPERIMENT, ) commands_and_priority: list[tuple[str, int]] = [] if source is not None: version_installed = source - commands_and_priority.append((f"sudo pip3 install -U --force-reinstall {source}", 1)) + commands_and_priority.append( + (f"sudo pip3 install -U --force-reinstall {source}", 1) + ) elif branch is not None: version_installed = quote(branch) @@ -464,7 +482,9 @@ def update_app(branch: Optional[str], source: Optional[str], version: Optional[s else: tag = get_tag_to_install(version) - response = get(f"https://api.github.com/repos/pioreactor/pioreactor/releases/{tag}") + response = get( + f"https://api.github.com/repos/pioreactor/pioreactor/releases/{tag}" + ) if response.raise_for_status(): logger.error(f"Version {version} not found") raise click.Abort() @@ -495,7 +515,9 @@ def update_app(branch: Optional[str], source: Optional[str], version: Optional[s assert ( version_installed in url ), f"Hm, pip installing {url} but this doesn't match version specified for installing: {version_installed}" - commands_and_priority.extend([(f'sudo pip3 install "pioreactor @ {url}"', 2)]) + commands_and_priority.extend( + [(f'sudo pip3 install "pioreactor @ {url}"', 2)] + ) elif asset_name == "update.sh": commands_and_priority.extend( [ @@ -507,7 +529,10 @@ def update_app(branch: Optional[str], source: Optional[str], version: Optional[s commands_and_priority.extend( [ (f"wget -O /tmp/update.sql {url}", 5), - (f'sudo sqlite3 {config["storage"]["database"]} < /tmp/update.sql', 6), + ( + f'sudo sqlite3 {config["storage"]["database"]} < /tmp/update.sql', + 6, + ), ] ) elif asset_name == "post_update.sh": @@ -545,7 +570,9 @@ def update_firmware(version: Optional[str]) -> None: Update the RP2040 firmware """ logger = create_logger( - "update-app", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT + "update-app", + unit=whoami.get_unit_name(), + experiment=whoami.UNIVERSAL_EXPERIMENT, ) commands_and_priority: list[tuple[str, int]] = [] @@ -555,7 +582,9 @@ def update_firmware(version: Optional[str]) -> None: version = f"tags/{version}" release_metadata = loads( - get(f"https://api.github.com/repos/pioreactor/pico-build/releases/{version}").body + get( + f"https://api.github.com/repos/pioreactor/pico-build/releases/{version}" + ).body ) version_installed = release_metadata["tag_name"] @@ -614,6 +643,7 @@ def update_firmware(version: Optional[str]) -> None: run.add_command(actions.stirring_calibration.click_stirring_calibration) run.add_command(actions.pump_calibration.click_pump_calibration) run.add_command(actions.od_calibration.click_od_calibration) + run.add_command(actions.click_od_calibration_from_standards.click_od_calibration) # TODO: this only adds to `pio run` - what if users want to add a high level command? Examples? for plugin in pioreactor.plugin_management.get_plugins().values(): @@ -682,7 +712,11 @@ def add_pioreactor(hostname: str) -> None: raise click.Abort() res = subprocess.run( - ["bash", "/usr/local/bin/add_new_pioreactor_worker_from_leader.sh", hostname], + [ + "bash", + "/usr/local/bin/add_new_pioreactor_worker_from_leader.sh", + hostname, + ], capture_output=True, text=True, ) @@ -708,7 +742,9 @@ def discover_workers(terminate: bool) -> None: for hostname in discover_workers_on_network(terminate): click.echo(hostname) - @pio.command(name="cluster-status", short_help="report information on the pioreactor cluster") + @pio.command( + name="cluster-status", short_help="report information on the pioreactor cluster" + ) def cluster_status() -> None: """ Note that this only looks at the current cluster as defined in config.ini. @@ -746,7 +782,9 @@ def display_data_for(hostname_status: tuple[str, str]) -> bool: ip, state, reachable = get_network_metadata(hostname) - statef = click.style(f"{state:15s}", fg="green" if state == "ready" else "red") + statef = click.style( + f"{state:15s}", fg="green" if state == "ready" else "red" + ) ipf = f"{ip if (ip is not None) else 'unknown':20s}" is_leaderf = f"{('Y' if hostname==get_leader_hostname() else 'N'):15s}" @@ -754,7 +792,9 @@ def display_data_for(hostname_status: tuple[str, str]) -> bool: reachablef = f"{(click.style('Y', fg='green') if reachable else click.style('N', fg='red')):23s}" statusf = f"{(click.style('Y', fg='green') if (status == '1') else click.style('N', fg='red')):14s}" - click.echo(f"{hostnamef} {is_leaderf} {ipf} {statef} {reachablef} {statusf}") + click.echo( + f"{hostnamef} {is_leaderf} {ipf} {statef} {reachablef} {statusf}" + ) return reachable & (state == "ready") worker_statuses = list(config["cluster.inventory"].items()) @@ -777,7 +817,9 @@ def display_data_for(hostname_status: tuple[str, str]) -> bool: @click.option("-b", "--branch", help="install from a branch on github") @click.option("--source", help="use a tar.gz file") @click.option("-v", "--version", help="install a specific version") - def update_ui(branch: Optional[str], source: Optional[str], version: Optional[str]) -> None: + def update_ui( + branch: Optional[str], source: Optional[str], version: Optional[str] + ) -> None: """ Update the PioreactorUI @@ -785,7 +827,9 @@ def update_ui(branch: Optional[str], source: Optional[str], version: Optional[st This is what is provided from Github releases. """ logger = create_logger( - "update-ui", unit=whoami.get_unit_name(), experiment=whoami.UNIVERSAL_EXPERIMENT + "update-ui", + unit=whoami.get_unit_name(), + experiment=whoami.UNIVERSAL_EXPERIMENT, ) commands = [] @@ -806,7 +850,9 @@ def update_ui(branch: Optional[str], source: Optional[str], version: Optional[st else: latest_release_metadata = loads( - get(f"https://api.github.com/repos/pioreactor/pioreactorui/releases/{version}").body + get( + f"https://api.github.com/repos/pioreactor/pioreactorui/releases/{version}" + ).body ) version_installed = latest_release_metadata["tag_name"] url = f"https://github.com/Pioreactor/pioreactorui/archive/refs/tags/{version_installed}.tar.gz" @@ -815,7 +861,9 @@ def update_ui(branch: Optional[str], source: Optional[str], version: Optional[st assert source is not None assert version_installed is not None - commands.append(["bash", "/usr/local/bin/update_ui.sh", source, version_installed]) + commands.append( + ["bash", "/usr/local/bin/update_ui.sh", source, version_installed] + ) for command in commands: logger.debug(" ".join(command))