diff --git a/pioreactor/actions/leader/experiment_profile.py b/pioreactor/actions/leader/experiment_profile.py index 8f7f9ebc..725248f3 100644 --- a/pioreactor/actions/leader/experiment_profile.py +++ b/pioreactor/actions/leader/experiment_profile.py @@ -349,6 +349,7 @@ def _callable() -> None: condition_met = evaluate_bool_expression(condition, env) except MQTTValueError: condition_met = False + if condition_met: for action in actions: schedule.enter( @@ -637,8 +638,7 @@ def _callable() -> None: logger.info(f"Dry-run: Stopping {job_name} on {unit}.") else: patch_into_leader( - f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", - json={"settings": {"$state": "disconnected"}}, + f"/api/workers/{unit}/jobs/stop/job_name/{job_name}/experiments/{experiment}", ) else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") diff --git a/pioreactor/tests/conftest.py b/pioreactor/tests/conftest.py index a068cd00..d6b69674 100644 --- a/pioreactor/tests/conftest.py +++ b/pioreactor/tests/conftest.py @@ -5,10 +5,12 @@ import re from unittest.mock import MagicMock from unittest.mock import patch +from urllib.parse import urlparse import pytest from pioreactor.mureq import Response +from pioreactor.pubsub import publish @pytest.fixture(autouse=True) @@ -57,26 +59,23 @@ def active_workers_in_cluster(): @pytest.fixture(autouse=True) def mock_external_leader_webserver_apis(mocker, active_workers_in_cluster): - # used mostly in pioreactor.config.py def mock_get_response(endpoint): + mm = MagicMock() if endpoint.endswith("/api/workers"): - mm = MagicMock() mm.json.return_value = [ {"pioreactor_unit": unit, "is_active": 1} for unit in active_workers_in_cluster ] + [{"pioreactor_unit": "notactiveworker", "is_active": 0}] return mm elif re.search("/api/experiments/.*/workers", endpoint): - mm = MagicMock() mm.json.return_value = [ {"pioreactor_unit": unit, "is_active": 1} for unit in active_workers_in_cluster ] return mm elif re.search("/api/workers/.*/experiment", endpoint): - mm = MagicMock() mm.json.return_value = {"experiment": "_testing_experiment"} return mm else: - raise ValueError(f"{endpoint} not mocked") + raise ValueError(f"TODO: {endpoint} not mocked") mock_get = mocker.patch( "pioreactor.cluster_management.get_from_leader", autospec=True, side_effect=mock_get_response @@ -86,11 +85,16 @@ def mock_get_response(endpoint): class CapturedRequest: - def __init__(self, method, url, headers, body): + def __init__(self, method, url, headers, body, json): self.method = method self.url = url self.headers = headers self.body = body + self.json = json + + r = urlparse(url) + + self.path = r.path @contextlib.contextmanager @@ -101,7 +105,14 @@ def mock_request(method, url, **kwargs): # Capture the request details headers = kwargs.get("headers") body = kwargs.get("body", None) - bucket.append(CapturedRequest(method, url, headers, body)) + json = kwargs.get("json", None) + bucket.append(CapturedRequest(method, url, headers, body, json)) + + if re.search("/api/workers/.*/jobs/update/job_name/.*/experiments/.*", url): + # fire a mqtt too + r = re.search("/api/workers/(.*)/jobs/update/job_name/(.*)/experiments/(.*)", url) + for setting, v in json["settings"].items(): + publish(f"pioreactor/{r.groups()[0]}/{r.groups()[2]}/{r.groups()[1]}/{setting}/set", v) # Return a mock response object return Response(url, 200, {}, b'{"mocked": "response"}') diff --git a/pioreactor/tests/test_execute_experiment_profile.py b/pioreactor/tests/test_execute_experiment_profile.py index 88d11aa1..0b332efe 100644 --- a/pioreactor/tests/test_execute_experiment_profile.py +++ b/pioreactor/tests/test_execute_experiment_profile.py @@ -29,6 +29,7 @@ from pioreactor.pubsub import publish from pioreactor.pubsub import subscribe_and_callback from pioreactor.structs import ODReading +from pioreactor.tests.conftest import capture_requests from pioreactor.utils.timing import current_utc_datetime @@ -66,24 +67,15 @@ def test_execute_experiment_profile_order( mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.topic) - - subscribe_and_callback( - collection_actions, - [f"pioreactor/unit1/{experiment}/#"], - allow_retained=False, - ) - - execute_experiment_profile("profile.yaml", experiment) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - assert actions == [ - f"pioreactor/unit1/{experiment}/run/job1", - f"pioreactor/unit1/{experiment}/run/job2", - f"pioreactor/unit1/{experiment}/job2/$state/set", - ] + assert bucket[0].path == "/api/experiments/_testing_experiment/unit_labels" + assert bucket[0].json == {"label": "label1", "unit": "unit1"} + assert bucket[1].path == "/api/workers/unit1/jobs/run/job_name/job1/experiments/_testing_experiment" + assert bucket[2].path == "/api/workers/unit2/jobs/run/job_name/job1/experiments/_testing_experiment" + assert bucket[3].path == "/api/workers/unit1/jobs/run/job_name/job2/experiments/_testing_experiment" + assert bucket[4].path == "/api/workers/unit1/jobs/stop/job_name/job2/experiments/_testing_experiment" @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") @@ -103,33 +95,26 @@ def test_execute_experiment_profile_hack_for_led_intensity(mock__load_experiment mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append((msg.topic, msg.payload.decode())) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - subscribe_and_callback( - collection_actions, - [f"pioreactor/unit1/{experiment}/#"], - allow_retained=False, + assert ( + bucket[0].path == "/api/workers/unit1/jobs/run/job_name/led_intensity/experiments/_testing_experiment" ) + assert bucket[0].json == {"options": {"A": 50, "job_source": "experiment_profile"}, "args": []} - execute_experiment_profile("profile.yaml", experiment) + assert ( + bucket[1].path == "/api/workers/unit1/jobs/run/job_name/led_intensity/experiments/_testing_experiment" + ) + assert bucket[1].json == {"options": {"A": 40, "B": 22.5, "job_source": "experiment_profile"}, "args": []} - assert actions == [ - ( - f"pioreactor/unit1/{experiment}/run/led_intensity", - '{"options":{"A":50,"job_source":"experiment_profile"},"args":[]}', - ), - ( - f"pioreactor/unit1/{experiment}/run/led_intensity", - '{"options":{"A":40,"B":22.5,"job_source":"experiment_profile"},"args":[]}', - ), - ( - f"pioreactor/unit1/{experiment}/run/led_intensity", - '{"options":{"A":0,"B":0,"C":0,"D":0,"job_source":"experiment_profile"},"args":[]}', - ), - ] + assert ( + bucket[2].path == "/api/workers/unit1/jobs/run/job_name/led_intensity/experiments/_testing_experiment" + ) + assert bucket[2].json == { + "options": {"A": 0, "B": 0, "C": 0, "D": 0, "job_source": "experiment_profile"}, + "args": [], + } @pytest.mark.skipif(os.getenv("GITHUB_ACTIONS") == "true", reason="flakey test in CI???") @@ -279,22 +264,11 @@ def test_execute_experiment_profile_simple_if2(mock__load_experiment_profile) -> mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.topic) - - subscribe_and_callback( - collection_actions, - [f"pioreactor/unit1/{experiment}/#"], - allow_retained=False, - ) - - execute_experiment_profile("profile.yaml", experiment) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - assert actions == [ - f"pioreactor/unit1/{experiment}/run/jobbing", - ] + assert len(bucket) == 1 + assert bucket[0].path == "/api/workers/unit1/jobs/run/job_name/jobbing/experiments/_testing_experiment" @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") @@ -317,22 +291,11 @@ def test_execute_experiment_profile_with_unit_function(mock__load_experiment_pro mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.topic) - - subscribe_and_callback( - collection_actions, - [f"pioreactor/unit1/{experiment}/#"], - allow_retained=False, - ) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - execute_experiment_profile("profile.yaml", experiment) - - assert actions == [ - f"pioreactor/unit1/{experiment}/run/jobbing", - ] + assert len(bucket) == 1 + assert bucket[0].path == "/api/workers/unit1/jobs/run/job_name/jobbing/experiments/_testing_experiment" action_true = Start(hours_elapsed=0, if_="${{ unit() == unit2 }}") @@ -351,11 +314,10 @@ def collection_actions(msg): mock__load_experiment_profile.return_value = profile - actions = [] - - execute_experiment_profile("profile.yaml", experiment) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - assert actions == [] + assert len(bucket) == 0 @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") @@ -382,24 +344,16 @@ def test_execute_experiment_profile_simple_if(mock__load_experiment_profile) -> mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.topic) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - subscribe_and_callback( - collection_actions, - [f"pioreactor/unit1/{experiment}/#"], - allow_retained=False, + assert len(bucket) == 2 + assert bucket[0].path == "/api/workers/unit1/jobs/run/job_name/jobbing/experiments/_testing_experiment" + assert ( + bucket[1].path + == "/api/workers/unit1/jobs/run/job_name/conditional_jobbing/experiments/_testing_experiment" ) - execute_experiment_profile("profile.yaml", experiment) - - assert actions == [ - f"pioreactor/unit1/{experiment}/run/jobbing", - f"pioreactor/unit1/{experiment}/run/conditional_jobbing", - ] - @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") def test_execute_experiment_profile_expression(mock__load_experiment_profile) -> None: @@ -427,22 +381,13 @@ def test_execute_experiment_profile_expression(mock__load_experiment_profile) -> mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.payload.decode()) - - subscribe_and_callback( - collection_actions, - [f"pioreactor/unit1/{experiment}/run/jobbing"], - allow_retained=False, - ) - - execute_experiment_profile("profile.yaml", experiment) + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", experiment) - assert actions == [ - '{"options":{"target":11.0,"dont_eval":"1.0 + 1.0","job_source":"experiment_profile"},"args":[]}' - ] + assert bucket[0].json == { + "options": {"target": 11.0, "dont_eval": "1.0 + 1.0", "job_source": "experiment_profile"}, + "args": [], + } @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") @@ -540,22 +485,18 @@ def test_execute_experiment_profile_expression_in_common( mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.payload.decode()) - - subscribe_and_callback( - collection_actions, - [f"pioreactor/{worker}/_testing_experiment/run/jobbing" for worker in active_workers_in_cluster], - allow_retained=False, - ) - - execute_experiment_profile("profile.yaml", "_testing_experiment") + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", "_testing_experiment") - assert actions == ['{"options":{"target":11.0,"job_source":"experiment_profile"},"args":[]}'] * len( - active_workers_in_cluster - ) + assert len(bucket) == len(active_workers_in_cluster) + for item in bucket: + assert item.json == { + "args": [], + "options": { + "job_source": "experiment_profile", + "target": 11.0, + }, + } @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") @@ -584,22 +525,18 @@ def test_execute_experiment_profile_expression_in_common_also_works_with_unit_fu mock__load_experiment_profile.return_value = profile - actions = [] - - def collection_actions(msg): - actions.append(msg.payload.decode()) - - subscribe_and_callback( - collection_actions, - [f"pioreactor/{worker}/_testing_experiment/run/jobbing" for worker in active_workers_in_cluster], - allow_retained=False, - ) - - execute_experiment_profile("profile.yaml", "_testing_experiment") + with capture_requests() as bucket: + execute_experiment_profile("profile.yaml", "_testing_experiment") - assert actions == ['{"options":{"target":11.0,"job_source":"experiment_profile"},"args":[]}'] * len( - active_workers_in_cluster - ) + assert len(bucket) == len(active_workers_in_cluster) + for item in bucket: + assert item.json == { + "args": [], + "options": { + "job_source": "experiment_profile", + "target": 11.0, + }, + } @patch("pioreactor.actions.leader.experiment_profile._load_experiment_profile") @@ -853,26 +790,12 @@ def test_api_requests_are_made( mock__load_experiment_profile.return_value = profile - from pioreactor.tests.conftest import capture_requests - with capture_requests() as bucket: execute_experiment_profile("profile.yaml", experiment) assert len(bucket) == 5 - assert bucket[0].url == f"http://localhost:4999/api/experiments/{experiment}/unit_labels" - assert ( - bucket[1].url - == f"http://localhost:4999/api/workers/unit1/jobs/run/job_name/job1/experiments/{experiment}" - ) - assert ( - bucket[2].url - == f"http://localhost:4999/api/workers/unit2/jobs/run/job_name/job1/experiments/{experiment}" - ) - assert ( - bucket[3].url - == f"http://localhost:4999/api/workers/unit1/jobs/run/job_name/job2/experiments/{experiment}" - ) - assert ( - bucket[4].url - == f"http://localhost:4999/api/workers/unit1/jobs/update/job_name/job2/experiments/{experiment}" - ) + assert bucket[0].path == f"/api/experiments/{experiment}/unit_labels" + assert bucket[1].path == f"/api/workers/unit1/jobs/run/job_name/job1/experiments/{experiment}" + assert bucket[2].path == f"/api/workers/unit2/jobs/run/job_name/job1/experiments/{experiment}" + assert bucket[3].path == f"/api/workers/unit1/jobs/run/job_name/job2/experiments/{experiment}" + assert bucket[4].path == f"/api/workers/unit1/jobs/stop/job_name/job2/experiments/{experiment}" diff --git a/pioreactor/tests/test_update_scripts.py b/pioreactor/tests/test_update_scripts.py index 3ffd0ff1..f1a54421 100644 --- a/pioreactor/tests/test_update_scripts.py +++ b/pioreactor/tests/test_update_scripts.py @@ -21,8 +21,11 @@ def test_pio_commands(): for script in scripts: with open(script, "r") as file: for line_number, line in enumerate(file, start=1): + if line.lstrip().startswith("#"): # comment + continue + # Checking for 'pio' not preceded by 'su -u pioreactor' - if (" pio " in line or line.strip().startswith('pio')) and "sudo -u pioreactor" not in line: + if (" pio " in line or line.strip().startswith("pio")) and "sudo -u pioreactor" not in line: error_msgs.append( f"Error in {script} at line {line_number}: 'pio' command must be prefixed with 'su -u pioreactor'." )