From c2fe7a842c642a03ff3d28e13e85951fd28fe45e Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Tue, 27 Aug 2024 09:52:18 -0400 Subject: [PATCH] use the web api instead of mqtt - I think this will break tests --- .../actions/leader/experiment_profile.py | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pioreactor/actions/leader/experiment_profile.py b/pioreactor/actions/leader/experiment_profile.py index 9418494f..5af066cc 100644 --- a/pioreactor/actions/leader/experiment_profile.py +++ b/pioreactor/actions/leader/experiment_profile.py @@ -20,7 +20,7 @@ from pioreactor.logging import create_logger from pioreactor.logging import CustomLogger from pioreactor.pubsub import Client -from pioreactor.pubsub import put_into_leader +from pioreactor.pubsub import patch_into_leader from pioreactor.utils import ClusterJobManager from pioreactor.utils import managed_lifecycle from pioreactor.utils.timing import catchtime @@ -535,9 +535,9 @@ def _callable() -> None: if dry_run: logger.info(f"Dry-run: Starting {job_name} on {unit} with options {options} and args {args}.") else: - client.publish( - f"pioreactor/{unit}/{experiment}/run/{job_name}", - encode( + patch_into_leader( + f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/run", + body=encode( { "options": evaluate_options(options, env) | {"job_source": "experiment_profile"}, "args": args, @@ -573,7 +573,10 @@ def _callable() -> None: if dry_run: logger.info(f"Dry-run: Pausing {job_name} on {unit}.") else: - client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/$state/set", "sleeping") + patch_into_leader( + f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", + body=encode({"settings": {"state": "sleeping"}}), + ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -603,7 +606,10 @@ def _callable() -> None: if dry_run: logger.info(f"Dry-run: Resuming {job_name} on {unit}.") else: - client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/$state/set", "ready") + patch_into_leader( + f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", + body=encode({"settings": {"state": "ready"}}), + ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -633,7 +639,10 @@ def _callable() -> None: if dry_run: logger.info(f"Dry-run: Stopping {job_name} on {unit}.") else: - client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/$state/set", "disconnected") + patch_into_leader( + f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", + body=encode({"settings": {"state": "disconnected"}}), + ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -667,7 +676,10 @@ def _callable() -> None: else: for setting, value in evaluate_options(options, env).items(): - client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/{setting}/set", value) + patch_into_leader( + f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update", + body=encode({"settings": {setting: value}}), + ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -727,7 +739,7 @@ def load_and_verify_profile(profile_filename: str) -> struct.Profile: def push_labels_to_ui(experiment, labels_map: dict[str, str]) -> None: try: for unit_name, label in labels_map.items(): - put_into_leader( + patch_into_leader( f"/api/experiments/{experiment}/unit_labels", json={"unit": unit_name, "label": label} ) except Exception: