From 17fa844b00cd7630fe1b605356318f184e5fb5a1 Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Tue, 17 Sep 2024 20:01:26 -0400 Subject: [PATCH] --json --- pioreactor/cli/pios.py | 103 +++++++++++++++++++++++------------ pioreactor/utils/__init__.py | 12 ++-- 2 files changed, 73 insertions(+), 42 deletions(-) diff --git a/pioreactor/cli/pios.py b/pioreactor/cli/pios.py index a6705ab0..eb2485ba 100644 --- a/pioreactor/cli/pios.py +++ b/pioreactor/cli/pios.py @@ -13,6 +13,7 @@ from subprocess import run as run_ssh import click +from msgspec.json import encode as dumps from pioreactor.cluster_management import get_active_workers_in_inventory from pioreactor.cluster_management import get_workers_in_inventory @@ -64,6 +65,7 @@ def pios(ctx) -> None: ) confirmation = click.option("-y", is_flag=True, help="Skip asking for confirmation.") + json_output = click.option("--json", is_flag=True, help="output as json") def parse_click_arguments(input_list: list[str]) -> dict: args: list[str] = [] @@ -249,16 +251,16 @@ def _thread_function(unit: str) -> bool: @click.option("-s", "--source", help="use a release-***.zip already on the workers") @which_units @confirmation + @json_output @click.pass_context def update( ctx, source: str | None, units: tuple[str, ...], y: bool, + json: bool, ) -> None: if ctx.invoked_subcommand is None: - logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - units = universal_identifier_to_all_workers(units) if not y: @@ -266,6 +268,7 @@ def update( if confirm != "Y": raise click.Abort() + logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) options: dict[str, str | None] = {} if source is not None: @@ -321,10 +324,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]: with ThreadPoolExecutor(max_workers=len(units)) as executor: results = executor.map(_thread_function, units) - for result, api_result in results: - click.secho(api_result, fg="green" if result else "red") + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) - if not all((r for r in results)): + if not all(success for (success, _) in results): click.Abort() @update.command(name="app", short_help="update Pioreactor app on workers") @@ -338,6 +343,7 @@ def _thread_function(unit: str) -> tuple[bool, dict]: @click.option("-s", "--source", help="install from a source, whl or release archive") @which_units @confirmation + @json_output def update_app( branch: str | None, repo: str | None, @@ -345,11 +351,11 @@ def update_app( source: str | None, units: tuple[str, ...], y: bool, + json: bool, ) -> None: """ Pulls and installs a Pioreactor software version across the cluster """ - logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) units = universal_identifier_to_all_workers(units) @@ -358,6 +364,7 @@ def update_app( if confirm != "Y": raise click.Abort() + logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) options: dict[str, str | None] = {} # only one of these three is possible, mutually exclusive @@ -385,10 +392,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]: with ThreadPoolExecutor(max_workers=len(units)) as executor: results = executor.map(_thread_function, units) - for result, api_result in results: - click.secho(api_result, fg="green" if result else "red") + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) - if not all((r for r in results)): + if not all(success for (success, _) in results): click.Abort() @update.command(name="ui", short_help="update Pioreactor ui on workers") @@ -402,6 +411,7 @@ def _thread_function(unit: str) -> tuple[bool, dict]: @click.option("-s", "--source", help="install from a source") @which_units @confirmation + @json_output def update_ui( branch: str | None, repo: str | None, @@ -409,11 +419,11 @@ def update_ui( source: str | None, units: tuple[str, ...], y: bool, + json: bool, ) -> None: """ Pulls and installs a Pioreactor software version across the cluster """ - logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) units = universal_identifier_to_all_workers(units) @@ -422,6 +432,7 @@ def update_ui( if confirm != "Y": raise click.Abort() + logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) options: dict[str, str | None] = {} # only one of these three is possible, mutually exclusive @@ -449,10 +460,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]: with ThreadPoolExecutor(max_workers=len(units)) as executor: results = executor.map(_thread_function, units) - for result, api_result in results: - click.secho(api_result, fg="green" if result else "red") + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) - if not all((r for r in results)): + if not all(success for (success, _) in results): click.Abort() @pios.group() @@ -468,11 +481,11 @@ def plugins(): ) @which_units @confirmation - def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: bool) -> None: + @json_output + def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: bool, json: bool) -> None: """ Installs a plugin to worker and leader """ - logger = create_logger("install_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) units = add_leader(universal_identifier_to_all_workers(units)) @@ -481,7 +494,9 @@ def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: b if confirm != "Y": raise click.Abort() + logger = create_logger("install_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) commands = {"args": [plugin], "options": dict()} + if source: commands["options"] = {"source": source} @@ -499,31 +514,34 @@ def _thread_function(unit: str) -> tuple[bool, dict]: with ThreadPoolExecutor(max_workers=len(units)) as executor: results = executor.map(_thread_function, units) - for result, api_result in results: - click.secho(api_result, fg="green" if result else "red") + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) - if not all((r for r in results)): + if not all(success for (success, _) in results): click.Abort() @plugins.command("uninstall", short_help="uninstall a plugin on workers") @click.argument("plugin") @which_units @confirmation - def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool) -> None: + @json_output + def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool, json: bool) -> None: """ Uninstalls a plugin from worker and leader """ - logger = create_logger("uninstall_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) - units = add_leader(universal_identifier_to_all_workers(units)) - commands = {"args": [plugin]} if not y: confirm = input(f"Confirm uninstalling {plugin} on {units}? Y/n: ").strip() if confirm != "Y": raise click.Abort() + logger = create_logger("uninstall_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) + commands = {"args": [plugin]} + def _thread_function(unit: str) -> tuple[bool, dict]: try: r = post_into( @@ -539,13 +557,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]: with ThreadPoolExecutor(max_workers=len(units)) as executor: results = executor.map(_thread_function, units) - for result, api_result in results: - click.secho( - api_result, - fg="green" if result else "red", - ) + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) - if not all((r for r in results)): + if not all(success for (success, _) in results): click.Abort() @pios.command(name="sync-configs", short_help="sync config") @@ -572,7 +589,6 @@ def sync_configs(shared: bool, specific: bool, skip_save: bool, units: tuple[str If neither `--shared` not `--specific` are specified, both are set to true. """ - logger = create_logger("sync_configs", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) units = add_leader( universal_identifier_to_all_workers(units, filter_out_non_workers=False) ) # TODO: why is leader being added if I only specify a subset of units? @@ -580,6 +596,8 @@ def sync_configs(shared: bool, specific: bool, skip_save: bool, units: tuple[str if not shared and not specific: shared = specific = True + logger = create_logger("sync_configs", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT) + def _thread_function(unit: str) -> bool: logger.debug(f"Syncing configs on {unit}...") try: @@ -613,6 +631,7 @@ def _thread_function(unit: str) -> bool: @click.option("--name", type=click.STRING) @which_units @confirmation + @json_output def kill( job: str | None, all_jobs: bool, @@ -621,6 +640,7 @@ def kill( name: str | None, units: tuple[str, ...], y: bool, + json: bool, ) -> None: """ Send a SIGTERM signal to JOB. JOB can be any Pioreactor job name, like "stirring". @@ -648,8 +668,15 @@ def kill( raise click.Abort() with ClusterJobManager(units) as cm: - if not cm.kill_jobs(all_jobs=all_jobs, experiment=experiment, job_source=job_source, name=name): - raise click.Abort("Could not be completed. Check connections to workers?") + results = cm.kill_jobs(all_jobs=all_jobs, experiment=experiment, job_source=job_source, name=name) + + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) + + if not all(success for (success, _) in results): + click.Abort() @pios.command( name="run", @@ -659,8 +686,9 @@ def kill( @click.argument("job", type=click.STRING) @which_units @confirmation + @json_output @click.pass_context - def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None: + def run(ctx, job: str, units: tuple[str, ...], y: bool, json: bool) -> None: """ Run a job on all, or specific, workers. Ex: @@ -683,7 +711,6 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None: click.echo("Did you mean to use 'units' instead of 'unit'? Exiting.", err=True) raise click.Abort() - data = parse_click_arguments(extra_args) units = universal_identifier_to_all_active_workers(units) assert len(units) > 0, "Empty units!" @@ -692,6 +719,8 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None: if confirm != "Y": raise click.Abort() + data = parse_click_arguments(extra_args) + def _thread_function(unit: str) -> tuple[bool, dict]: try: r = post_into(resolve_to_address(unit), f"/unit_api/jobs/run/job_name/{job}", json=data) @@ -704,10 +733,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]: with ThreadPoolExecutor(max_workers=len(units)) as executor: results = executor.map(_thread_function, units) - for result, api_result in results: - click.secho(api_result, fg="green" if result else "red") + if json: + for success, api_result in results: + api_result["status"] = "success" if success else "error" + click.echo(dumps(api_result)) - if not all((r for r in results)): + if not all(success for (success, _) in results): click.Abort() @pios.command( diff --git a/pioreactor/utils/__init__.py b/pioreactor/utils/__init__.py index fc09ceae..f5275295 100644 --- a/pioreactor/utils/__init__.py +++ b/pioreactor/utils/__init__.py @@ -658,9 +658,9 @@ def kill_jobs( experiment: str | None = None, name: str | None = None, job_source: str | None = None, - ) -> bool: + ) -> list[tuple[bool, dict]]: if len(self.units) == 0: - return True + return [] if experiment: endpoint = f"/unit_api/jobs/stop/experiment/{experiment}" @@ -671,19 +671,19 @@ def kill_jobs( if all_jobs: endpoint = "/unit_api/jobs/stop/all" - def _thread_function(unit: str) -> bool: + def _thread_function(unit: str) -> tuple[bool, dict]: try: r = patch_into(resolve_to_address(unit), endpoint) r.raise_for_status() - return True + return True, r.json() except Exception as e: print(f"Failed to send kill command to {unit}: {e}") - return False + return False, {"unit": unit} with ThreadPoolExecutor(max_workers=len(self.units)) as executor: results = executor.map(_thread_function, self.units) - return all(results) + return list(results) def __enter__(self) -> ClusterJobManager: return self