diff --git a/configs/configurations/memory_usage/iocs.xml b/configs/configurations/memory_usage/iocs.xml
index c57654c..6ee3a53 100644
--- a/configs/configurations/memory_usage/iocs.xml
+++ b/configs/configurations/memory_usage/iocs.xml
@@ -1,3 +1,4 @@
+
@@ -99,4 +100,4 @@
-
\ No newline at end of file
+
diff --git a/configs/configurations/memory_usage/meta.xml b/configs/configurations/memory_usage/meta.xml
index f22bab5..f8f0add 100644
--- a/configs/configurations/memory_usage/meta.xml
+++ b/configs/configurations/memory_usage/meta.xml
@@ -6,5 +6,6 @@
2019-02-04 12:15:24
false
+ false
false
diff --git a/test_memory_usage.py b/test_memory_usage.py
index d2d09ec..a587b2c 100644
--- a/test_memory_usage.py
+++ b/test_memory_usage.py
@@ -30,7 +30,7 @@ def setUp(self) -> None:
# all tests that interact with anything but genie should try to load
# a config to ensure that the configurations
# in the tests are not broken, e.g. by a schema update
- load_config_if_not_already_loaded(TYPICAL_CONFIG_NAME)
+ load_config_if_not_already_loaded(TYPICAL_CONFIG_NAME, timeout=360)
def get_current_memory_usage(self) -> float:
"""
diff --git a/utilities/utilities.py b/utilities/utilities.py
index 49398ef..3414ac4 100644
--- a/utilities/utilities.py
+++ b/utilities/utilities.py
@@ -7,7 +7,7 @@
import timeit
import unittest
from time import sleep, time
-from typing import Callable
+from typing import Any, Callable, ContextManager, ParamSpec, TypeVar
import six
@@ -27,9 +27,12 @@
except ImportError:
from genie_python.utilities import compress_and_hex, dehex_and_decompress
+P = ParamSpec("P")
+T = TypeVar("T")
+
WAIT_FOR_SERVER_TIMEOUT = 90
-"""Number of seconds to wait for a pv to become available in the config server e.g. when it starts or
-when it changed config"""
+"""Number of seconds to wait for a pv to become available in the
+ config server e.g. when it starts or when it changed config"""
# Number of seconds to wait for the DAE settings to update
DAE_MODE_TIMEOUT = 120
@@ -41,7 +44,7 @@
BASE_MEMORY_USAGE = "BASE_MEMORY_USAGE"
-def parameterized_list(cases):
+def parameterized_list(cases: list[Any]) -> list[tuple[str, Any]]:
"""
Creates a list of cases for parameterized to use to run tests.
@@ -68,7 +71,9 @@ def parameterized_list(cases):
return return_list
-def load_config_if_not_already_loaded(config_name):
+def load_config_if_not_already_loaded(
+ config_name: str, timeout: int = WAIT_FOR_SERVER_TIMEOUT
+) -> None:
"""
Load a config by name if it has not already been loaded.
@@ -86,7 +91,7 @@ def load_config_if_not_already_loaded(config_name):
g.set_pv("CS:BLOCKSERVER:LOAD_CONFIG", value=compress_and_hex(config_name), is_local=True)
status_was_busy = False
- for i in range(WAIT_FOR_SERVER_TIMEOUT):
+ for i in range(timeout):
status = get_server_status()
if status_was_busy and status == "":
break
@@ -103,9 +108,10 @@ def load_config_if_not_already_loaded(config_name):
)
-def _get_config_name():
+def _get_config_name() -> str:
"""
- Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds for it to be readable
+ Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds
+ for it to be readable
Returns: the current configs name
Raises: AssertionError if the cv can not be read
@@ -113,9 +119,10 @@ def _get_config_name():
return get_config_details()["name"]
-def get_config_details():
+def get_config_details() -> dict:
"""
- Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds for it to be readable
+ Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds
+ for it to be readable
Returns: the current configs name
Raises: AssertionError if the cv can not be read
@@ -136,7 +143,7 @@ def get_config_details():
raise final_exception
-def get_server_status():
+def get_server_status() -> str | None:
"""
Get the servers current status
@@ -155,7 +162,7 @@ def get_server_status():
return None
-def set_genie_python_raises_exceptions(does_throw):
+def set_genie_python_raises_exceptions(does_throw: bool) -> None:
"""
Set that genie python api raises exceptions instead of just logging a message
Args:
@@ -167,7 +174,7 @@ def set_genie_python_raises_exceptions(does_throw):
genie_api_setup._exceptions_raised = does_throw
-def setup_simulated_wiring_tables(event_data=False):
+def setup_simulated_wiring_tables(event_data: bool = False) -> None:
"""
Configures the DAE's wiring tables and sets the DAE to simulation mode
@@ -206,7 +213,7 @@ def setup_simulated_wiring_tables(event_data=False):
set_genie_python_raises_exceptions(False)
-def _wait_for_and_assert_dae_simulation_mode(mode):
+def _wait_for_and_assert_dae_simulation_mode(mode: bool) -> None:
"""
Waits for specified DAE simulation mode in the DAE
@@ -232,7 +239,7 @@ def _wait_for_and_assert_dae_simulation_mode(mode):
)
-def set_wait_for_complete_callback_dae_settings(wait):
+def set_wait_for_complete_callback_dae_settings(wait: bool) -> None:
"""Sets the wait for completion callback attribute of the DAE
@param wait: Boolean value, True if you want the DAE to wait for the operation
@@ -241,13 +248,13 @@ def set_wait_for_complete_callback_dae_settings(wait):
genie_api_setup.__api.dae.wait_for_completion_callback_dae_settings = wait
-def temporarily_kill_icp():
+def temporarily_kill_icp() -> ContextManager[None]:
# Temporarily kills the ISIS ICP (ISIS DAE)
return genie_api_setup.__api.dae.temporarily_kill_icp()
-def as_seconds(time):
+def as_seconds(time: str) -> int:
"""
Convert a up time to seconds
Args:
@@ -265,7 +272,7 @@ def as_seconds(time):
return seconds
-def _start_stop_ioc_is_a_start(is_a_start, ioc_name):
+def _start_stop_ioc_is_a_start(is_a_start: bool, ioc_name: str) -> None:
"""
Start or stop and ioc dependent on whether it "is_a_start"
Args:
@@ -285,12 +292,13 @@ def _start_stop_ioc_is_a_start(is_a_start, ioc_name):
wait_for_ioc_start_stop(timeout=IOCS_START_STOP_TIMEOUT, is_start=is_a_start, ioc_name=ioc_name)
-def bulk_start_ioc(ioc_list):
+def bulk_start_ioc(ioc_list: list[str]) -> tuple[list[str], list[str]]:
"""
start a list of IOCs in bulk
:param ioc_list: a list of the names of the IOCs to start
:return: a list of IOCs that failed to start after IOCS_START_STOP_TIMEOUT seconds
- and a list of any IOCs that were not present in proc serv (this should be a very rare case)
+ and a list of any IOCs that were not present in proc serv
+ (this should be a very rare case)
"""
failed_to_start = []
not_in_proc_serv = []
@@ -301,7 +309,8 @@ def bulk_start_ioc(ioc_list):
except UnableToConnectToPVException:
not_in_proc_serv.append(ioc_name)
print(
- f"{ioc_name} not found in proc serv, should this be added to the list of iocs to skip?"
+ f"{ioc_name} not found in proc serv, should this be added "
+ "to the list of iocs to skip?"
)
ioc_list = [ioc for ioc in ioc_list if ioc not in not_in_proc_serv]
for ioc_name in ioc_list:
@@ -314,7 +323,7 @@ def bulk_start_ioc(ioc_list):
return failed_to_start, not_in_proc_serv
-def bulk_stop_ioc(ioc_list):
+def bulk_stop_ioc(ioc_list: list[str]) -> list[str]:
"""
Stops a list of IOCs in bulk
:param ioc_list: a list of the names of the IOCs to stop
@@ -334,7 +343,7 @@ def bulk_stop_ioc(ioc_list):
return failed_to_stop
-def start_ioc(ioc_name):
+def start_ioc(ioc_name: str) -> None:
"""
Start the ioc
Args:
@@ -346,7 +355,7 @@ def start_ioc(ioc_name):
_start_stop_ioc_is_a_start(True, ioc_name)
-def stop_ioc(ioc_name):
+def stop_ioc(ioc_name: str) -> None:
"""
Stop the ioc
Args:
@@ -358,7 +367,7 @@ def stop_ioc(ioc_name):
_start_stop_ioc_is_a_start(False, ioc_name)
-def wait_for_ioc_start_stop(timeout, is_start, ioc_name):
+def wait_for_ioc_start_stop(timeout: int, is_start: bool, ioc_name: str) -> None:
"""
Wait for an ioc to start or stop, if timeout raise a timeout error
Args:
@@ -382,9 +391,10 @@ def wait_for_ioc_start_stop(timeout, is_start, ioc_name):
raise IOError(f"IOC {ioc_name} is not {'started' if is_start else 'stopped'}")
-def quick_is_ioc_down(ioc_name):
+def quick_is_ioc_down(ioc_name: str) -> bool:
"""
- Determine if IOC is up by checking proc serv, cannot be used to make sure a PV has been started, but is
+ Determine if IOC is up by checking proc serv, cannot be used to make sure a PV
+ has been started, but is
good enough for checks before attempting to start/stop
:param ioc_name: The IOC to check
:return: True if IOC is up; False otherwise
@@ -393,7 +403,7 @@ def quick_is_ioc_down(ioc_name):
return running == "Shutdown"
-def is_ioc_up(ioc_name):
+def is_ioc_up(ioc_name: str) -> bool:
"""
Determine if IOC is up by checking for the existence of its heartbeat PV
Args:
@@ -411,7 +421,7 @@ def is_ioc_up(ioc_name):
return heartbeat is not None
-def wait_for_iocs_to_be_up(ioc_names, seconds_to_wait):
+def wait_for_iocs_to_be_up(ioc_names: list[str], seconds_to_wait: int) -> None:
"""
Wait for a number of iocs to be up by checking for existence of heartbeat PVs for each ioc.
@@ -432,11 +442,14 @@ def wait_for_iocs_to_be_up(ioc_names, seconds_to_wait):
sleep(1)
else:
raise AssertionError(
- f"IOCs: {[ioc_name for ioc_name in ioc_names if not is_ioc_up(ioc_name)]} could not be started."
+ f"IOCs: {[ioc_name for ioc_name in ioc_names if not is_ioc_up(ioc_name)]} "
+ "could not be started."
)
-def wait_for_string_pvs_to_not_be_empty(pvs, seconds_to_wait, is_local=True):
+def wait_for_string_pvs_to_not_be_empty(
+ pvs: list[str], seconds_to_wait: int, is_local: bool = True
+) -> dict:
"""
Wait for a number of string pvs to be non-empty and return their values.
Raises an assertion error if at least one is not found.
@@ -469,16 +482,16 @@ def wait_for_string_pvs_to_not_be_empty(pvs, seconds_to_wait, is_local=True):
return pv_values
-def retry_on_failure(max_times):
+def retry_on_failure(max_times: int) -> Callable[[Callable[P, T]], Callable[P, None]]:
"""
Decorator that will retry running a test if it failed.
:param max_times: Maximum number of times to retry running the test
:return: the decorator
"""
- def decorator(func):
+ def decorator(func: Callable[P, T]) -> Callable[P, None]:
@six.wraps(func)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
err = None
for attempt in range(max_times):
try:
@@ -497,7 +510,7 @@ def wrapper(*args, **kwargs):
return decorator
-def check_block_exists(block_name):
+def check_block_exists(block_name: str) -> bool:
"""
Check that the given block name is in the current blocks.
@@ -511,10 +524,12 @@ def check_block_exists(block_name):
return block_name in blocks
-def retry_assert(retry_limit: int, func: Callable[[], None], retry_time: float = 1.0):
+def retry_assert(retry_limit: int, func: Callable[[], None], retry_time: float = 1.0) -> None:
"""
- Take a function (func) that makes assertions. Try to call the function and catch any AssertionErrors if raised.
- Repeat this until either the function does not raise an AssertionError or the retry_limit is reached.
+ Take a function (func) that makes assertions. Try to call the function and
+ catch any AssertionErrors if raised.
+ Repeat this until either the function does not raise an AssertionError
+ or the retry_limit is reached.
If the retry limit is reach reraise the last error.
Args:
@@ -537,7 +552,7 @@ def retry_assert(retry_limit: int, func: Callable[[], None], retry_time: float =
raise error
-def get_execution_time(method):
+def get_execution_time(method: Callable[[], None]) -> float:
"""
Takes a method and calculates its execution time.
Useful for tests that are time sensitive
@@ -556,7 +571,7 @@ def get_execution_time(method):
return execution_time
-def assert_with_timeout(assertion: Callable, timeout: int):
+def assert_with_timeout(assertion: Callable[[], None], timeout: int) -> None:
err = None
for _ in range(timeout):
try: