Skip to content

Commit

Permalink
--json
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Sep 18, 2024
1 parent c8ffaa8 commit 17fa844
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 42 deletions.
103 changes: 67 additions & 36 deletions pioreactor/cli/pios.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from subprocess import run as run_ssh

import click
from msgspec.json import encode as dumps

from pioreactor.cluster_management import get_active_workers_in_inventory
from pioreactor.cluster_management import get_workers_in_inventory
Expand Down Expand Up @@ -64,6 +65,7 @@ def pios(ctx) -> None:
)

confirmation = click.option("-y", is_flag=True, help="Skip asking for confirmation.")
json_output = click.option("--json", is_flag=True, help="output as json")

def parse_click_arguments(input_list: list[str]) -> dict:
args: list[str] = []
Expand Down Expand Up @@ -249,23 +251,24 @@ def _thread_function(unit: str) -> bool:
@click.option("-s", "--source", help="use a release-***.zip already on the workers")
@which_units
@confirmation
@json_output
@click.pass_context
def update(
ctx,
source: str | None,
units: tuple[str, ...],
y: bool,
json: bool,
) -> None:
if ctx.invoked_subcommand is None:
logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)

units = universal_identifier_to_all_workers(units)

if not y:
confirm = input(f"Confirm updating app and ui on {units}? Y/n: ").strip()
if confirm != "Y":
raise click.Abort()

logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)
options: dict[str, str | None] = {}

if source is not None:
Expand Down Expand Up @@ -321,10 +324,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
with ThreadPoolExecutor(max_workers=len(units)) as executor:
results = executor.map(_thread_function, units)

for result, api_result in results:
click.secho(api_result, fg="green" if result else "red")
if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all((r for r in results)):
if not all(success for (success, _) in results):
click.Abort()

@update.command(name="app", short_help="update Pioreactor app on workers")
Expand All @@ -338,18 +343,19 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
@click.option("-s", "--source", help="install from a source, whl or release archive")
@which_units
@confirmation
@json_output
def update_app(
branch: str | None,
repo: str | None,
version: str | None,
source: str | None,
units: tuple[str, ...],
y: bool,
json: bool,
) -> None:
"""
Pulls and installs a Pioreactor software version across the cluster
"""
logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)

units = universal_identifier_to_all_workers(units)

Expand All @@ -358,6 +364,7 @@ def update_app(
if confirm != "Y":
raise click.Abort()

logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)
options: dict[str, str | None] = {}

# only one of these three is possible, mutually exclusive
Expand Down Expand Up @@ -385,10 +392,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
with ThreadPoolExecutor(max_workers=len(units)) as executor:
results = executor.map(_thread_function, units)

for result, api_result in results:
click.secho(api_result, fg="green" if result else "red")
if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all((r for r in results)):
if not all(success for (success, _) in results):
click.Abort()

@update.command(name="ui", short_help="update Pioreactor ui on workers")
Expand All @@ -402,18 +411,19 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
@click.option("-s", "--source", help="install from a source")
@which_units
@confirmation
@json_output
def update_ui(
branch: str | None,
repo: str | None,
version: str | None,
source: str | None,
units: tuple[str, ...],
y: bool,
json: bool,
) -> None:
"""
Pulls and installs a Pioreactor software version across the cluster
"""
logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)

units = universal_identifier_to_all_workers(units)

Expand All @@ -422,6 +432,7 @@ def update_ui(
if confirm != "Y":
raise click.Abort()

logger = create_logger("update", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)
options: dict[str, str | None] = {}

# only one of these three is possible, mutually exclusive
Expand Down Expand Up @@ -449,10 +460,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
with ThreadPoolExecutor(max_workers=len(units)) as executor:
results = executor.map(_thread_function, units)

for result, api_result in results:
click.secho(api_result, fg="green" if result else "red")
if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all((r for r in results)):
if not all(success for (success, _) in results):
click.Abort()

@pios.group()
Expand All @@ -468,11 +481,11 @@ def plugins():
)
@which_units
@confirmation
def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: bool) -> None:
@json_output
def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: bool, json: bool) -> None:
"""
Installs a plugin to worker and leader
"""
logger = create_logger("install_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)

units = add_leader(universal_identifier_to_all_workers(units))

Expand All @@ -481,7 +494,9 @@ def install_plugin(plugin: str, source: str | None, units: tuple[str, ...], y: b
if confirm != "Y":
raise click.Abort()

logger = create_logger("install_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)
commands = {"args": [plugin], "options": dict()}

if source:
commands["options"] = {"source": source}

Expand All @@ -499,31 +514,34 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
with ThreadPoolExecutor(max_workers=len(units)) as executor:
results = executor.map(_thread_function, units)

for result, api_result in results:
click.secho(api_result, fg="green" if result else "red")
if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all((r for r in results)):
if not all(success for (success, _) in results):
click.Abort()

@plugins.command("uninstall", short_help="uninstall a plugin on workers")
@click.argument("plugin")
@which_units
@confirmation
def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool) -> None:
@json_output
def uninstall_plugin(plugin: str, units: tuple[str, ...], y: bool, json: bool) -> None:
"""
Uninstalls a plugin from worker and leader
"""

logger = create_logger("uninstall_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)

units = add_leader(universal_identifier_to_all_workers(units))
commands = {"args": [plugin]}

if not y:
confirm = input(f"Confirm uninstalling {plugin} on {units}? Y/n: ").strip()
if confirm != "Y":
raise click.Abort()

logger = create_logger("uninstall_plugin", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)
commands = {"args": [plugin]}

def _thread_function(unit: str) -> tuple[bool, dict]:
try:
r = post_into(
Expand All @@ -539,13 +557,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
with ThreadPoolExecutor(max_workers=len(units)) as executor:
results = executor.map(_thread_function, units)

for result, api_result in results:
click.secho(
api_result,
fg="green" if result else "red",
)
if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all((r for r in results)):
if not all(success for (success, _) in results):
click.Abort()

@pios.command(name="sync-configs", short_help="sync config")
Expand All @@ -572,14 +589,15 @@ def sync_configs(shared: bool, specific: bool, skip_save: bool, units: tuple[str
If neither `--shared` nor `--specific` is specified, both are set to true.
"""
logger = create_logger("sync_configs", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)
units = add_leader(
universal_identifier_to_all_workers(units, filter_out_non_workers=False)
) # TODO: why is leader being added if I only specify a subset of units?

if not shared and not specific:
shared = specific = True

logger = create_logger("sync_configs", unit=get_unit_name(), experiment=UNIVERSAL_EXPERIMENT)

def _thread_function(unit: str) -> bool:
logger.debug(f"Syncing configs on {unit}...")
try:
Expand Down Expand Up @@ -613,6 +631,7 @@ def _thread_function(unit: str) -> bool:
@click.option("--name", type=click.STRING)
@which_units
@confirmation
@json_output
def kill(
job: str | None,
all_jobs: bool,
Expand All @@ -621,6 +640,7 @@ def kill(
name: str | None,
units: tuple[str, ...],
y: bool,
json: bool,
) -> None:
"""
Send a SIGTERM signal to JOB. JOB can be any Pioreactor job name, like "stirring".
Expand Down Expand Up @@ -648,8 +668,15 @@ def kill(
raise click.Abort()

with ClusterJobManager(units) as cm:
if not cm.kill_jobs(all_jobs=all_jobs, experiment=experiment, job_source=job_source, name=name):
raise click.Abort("Could not be completed. Check connections to workers?")
results = cm.kill_jobs(all_jobs=all_jobs, experiment=experiment, job_source=job_source, name=name)

if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all(success for (success, _) in results):
click.Abort()

@pios.command(
name="run",
Expand All @@ -659,8 +686,9 @@ def kill(
@click.argument("job", type=click.STRING)
@which_units
@confirmation
@json_output
@click.pass_context
def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None:
def run(ctx, job: str, units: tuple[str, ...], y: bool, json: bool) -> None:
"""
Run a job on all, or specific, workers. Ex:
Expand All @@ -683,7 +711,6 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None:
click.echo("Did you mean to use 'units' instead of 'unit'? Exiting.", err=True)
raise click.Abort()

data = parse_click_arguments(extra_args)
units = universal_identifier_to_all_active_workers(units)
assert len(units) > 0, "Empty units!"

Expand All @@ -692,6 +719,8 @@ def run(ctx, job: str, units: tuple[str, ...], y: bool) -> None:
if confirm != "Y":
raise click.Abort()

data = parse_click_arguments(extra_args)

def _thread_function(unit: str) -> tuple[bool, dict]:
try:
r = post_into(resolve_to_address(unit), f"/unit_api/jobs/run/job_name/{job}", json=data)
Expand All @@ -704,10 +733,12 @@ def _thread_function(unit: str) -> tuple[bool, dict]:
with ThreadPoolExecutor(max_workers=len(units)) as executor:
results = executor.map(_thread_function, units)

for result, api_result in results:
click.secho(api_result, fg="green" if result else "red")
if json:
for success, api_result in results:
api_result["status"] = "success" if success else "error"
click.echo(dumps(api_result))

if not all((r for r in results)):
if not all(success for (success, _) in results):
click.Abort()

@pios.command(
Expand Down
12 changes: 6 additions & 6 deletions pioreactor/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,9 +658,9 @@ def kill_jobs(
experiment: str | None = None,
name: str | None = None,
job_source: str | None = None,
) -> bool:
) -> list[tuple[bool, dict]]:
if len(self.units) == 0:
return True
return []

if experiment:
endpoint = f"/unit_api/jobs/stop/experiment/{experiment}"
Expand All @@ -671,19 +671,19 @@ def kill_jobs(
if all_jobs:
endpoint = "/unit_api/jobs/stop/all"

def _thread_function(unit: str) -> bool:
def _thread_function(unit: str) -> tuple[bool, dict]:
try:
r = patch_into(resolve_to_address(unit), endpoint)
r.raise_for_status()
return True
return True, r.json()
except Exception as e:
print(f"Failed to send kill command to {unit}: {e}")
return False
return False, {"unit": unit}

with ThreadPoolExecutor(max_workers=len(self.units)) as executor:
results = executor.map(_thread_function, self.units)

return all(results)
return list(results)

def __enter__(self) -> ClusterJobManager:
return self
Expand Down

0 comments on commit 17fa844

Please sign in to comment.