Skip to content

Commit

Permalink
use the web api instead of mqtt - I think this will break tests
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Aug 27, 2024
1 parent 200c454 commit c2fe7a8
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pioreactor.logging import create_logger
from pioreactor.logging import CustomLogger
from pioreactor.pubsub import Client
from pioreactor.pubsub import put_into_leader
from pioreactor.pubsub import patch_into_leader
from pioreactor.utils import ClusterJobManager
from pioreactor.utils import managed_lifecycle
from pioreactor.utils.timing import catchtime
Expand Down Expand Up @@ -535,9 +535,9 @@ def _callable() -> None:
if dry_run:
logger.info(f"Dry-run: Starting {job_name} on {unit} with options {options} and args {args}.")
else:
client.publish(
f"pioreactor/{unit}/{experiment}/run/{job_name}",
encode(
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/run",
body=encode(
{
"options": evaluate_options(options, env) | {"job_source": "experiment_profile"},
"args": args,
Expand Down Expand Up @@ -573,7 +573,10 @@ def _callable() -> None:
if dry_run:
logger.info(f"Dry-run: Pausing {job_name} on {unit}.")
else:
client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/$state/set", "sleeping")
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {"state": "sleeping"}}),
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down Expand Up @@ -603,7 +606,10 @@ def _callable() -> None:
if dry_run:
logger.info(f"Dry-run: Resuming {job_name} on {unit}.")
else:
client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/$state/set", "ready")
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {"state": "ready"}}),
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down Expand Up @@ -633,7 +639,10 @@ def _callable() -> None:
if dry_run:
logger.info(f"Dry-run: Stopping {job_name} on {unit}.")
else:
client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/$state/set", "disconnected")
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {"state": "disconnected"}}),
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down Expand Up @@ -667,7 +676,10 @@ def _callable() -> None:

else:
for setting, value in evaluate_options(options, env).items():
client.publish(f"pioreactor/{unit}/{experiment}/{job_name}/{setting}/set", value)
patch_into_leader(
f"/api/workers/{unit}/experiment/{experiment}/jobs/{job_name}/update",
body=encode({"settings": {setting: value}}),
)
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down Expand Up @@ -727,7 +739,7 @@ def load_and_verify_profile(profile_filename: str) -> struct.Profile:
def push_labels_to_ui(experiment, labels_map: dict[str, str]) -> None:
try:
for unit_name, label in labels_map.items():
put_into_leader(
patch_into_leader(
f"/api/experiments/{experiment}/unit_labels", json={"unit": unit_name, "label": label}
)
except Exception:
Expand Down

0 comments on commit c2fe7a8

Please sign in to comment.