Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion configs/configurations/memory_usage/iocs.xml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
<?xml version="1.0" ?>
<iocs xmlns="http://epics.isis.rl.ac.uk/schema/iocs/1.0" xmlns:ioc="http://epics.isis.rl.ac.uk/schema/iocs/1.0" xmlns:xi="http://www.w3.org/2001/XInclude">
<ioc autostart="true" name="COUETTE_01" remotePvPrefix="" restart="true" simlevel="recsim">
<macros />
Expand Down Expand Up @@ -99,4 +100,4 @@
<pvs />
<pvsets />
</ioc>
</iocs>
</iocs>
1 change: 1 addition & 0 deletions configs/configurations/memory_usage/meta.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
<edit>2019-02-04 12:15:24</edit>
</edits>
<isProtected>false</isProtected>
<isDynamic>false</isDynamic>
<configuresBlockGWAndArchiver>false</configuresBlockGWAndArchiver>
</meta>
2 changes: 1 addition & 1 deletion test_memory_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def setUp(self) -> None:
# all tests that interact with anything but genie should try to load
# a config to ensure that the configurations
# in the tests are not broken, e.g. by a schema update
load_config_if_not_already_loaded(TYPICAL_CONFIG_NAME)
load_config_if_not_already_loaded(TYPICAL_CONFIG_NAME, timeout=360)

def get_current_memory_usage(self) -> float:
"""
Expand Down
95 changes: 55 additions & 40 deletions utilities/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import timeit
import unittest
from time import sleep, time
from typing import Callable
from typing import Any, Callable, ContextManager, ParamSpec, TypeVar

import six

Expand All @@ -27,9 +27,12 @@
except ImportError:
from genie_python.utilities import compress_and_hex, dehex_and_decompress

P = ParamSpec("P")
T = TypeVar("T")

WAIT_FOR_SERVER_TIMEOUT = 90
"""Number of seconds to wait for a pv to become available in the config server e.g. when it starts or
when it changed config"""
"""Number of seconds to wait for a pv to become available in the
config server e.g. when it starts or when it changed config"""

# Number of seconds to wait for the DAE settings to update
DAE_MODE_TIMEOUT = 120
Expand All @@ -41,7 +44,7 @@
BASE_MEMORY_USAGE = "BASE_MEMORY_USAGE"


def parameterized_list(cases):
def parameterized_list(cases: list[Any]) -> list[tuple[str, Any]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future I think this could be typed as:

T = TypeVar("T")

def parameterized_list(cases: list[T]) -> list[tuple[str, T]]:

or similar

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if T is a tuple then i think it is slightly different as in

parameterized_list([(1,2)]) -> "(1,2)", 1, 2 as opposed to "(1,2)", (1,2)

so T is unpacked ... so is that potentially tuple[str, Any, ...] ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see... that case can probably be done with typing.overload and typing.Unpack, but happy enough with Any for the moment then (and since this is only really used by tests, maybe not worth going too overboard with it)

"""
Creates a list of cases for parameterized to use to run tests.

Expand All @@ -68,7 +71,9 @@ def parameterized_list(cases):
return return_list


def load_config_if_not_already_loaded(config_name):
def load_config_if_not_already_loaded(
config_name: str, timeout: int = WAIT_FOR_SERVER_TIMEOUT
) -> None:
"""
Load a config by name if it has not already been loaded.

Expand All @@ -86,7 +91,7 @@ def load_config_if_not_already_loaded(config_name):

g.set_pv("CS:BLOCKSERVER:LOAD_CONFIG", value=compress_and_hex(config_name), is_local=True)
status_was_busy = False
for i in range(WAIT_FOR_SERVER_TIMEOUT):
for i in range(timeout):
status = get_server_status()
if status_was_busy and status == "":
break
Expand All @@ -103,19 +108,21 @@ def load_config_if_not_already_loaded(config_name):
)


def _get_config_name():
def _get_config_name() -> str:
"""
Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds for it to be readable
Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds
for it to be readable
Returns: the current configs name
Raises: AssertionError if the cv can not be read

"""
return get_config_details()["name"]


def get_config_details():
def get_config_details() -> dict:
"""
Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds for it to be readable
Returns the current config name after waiting for up to WAIT_FOR_SERVER_TIMEOUT seconds
for it to be readable
Returns: the current configs name
Raises: AssertionError if the cv can not be read

Expand All @@ -136,7 +143,7 @@ def get_config_details():
raise final_exception


def get_server_status():
def get_server_status() -> str | None:
"""
Get the servers current status

Expand All @@ -155,7 +162,7 @@ def get_server_status():
return None


def set_genie_python_raises_exceptions(does_throw):
def set_genie_python_raises_exceptions(does_throw: bool) -> None:
"""
Set that genie python api raises exceptions instead of just logging a message
Args:
Expand All @@ -167,7 +174,7 @@ def set_genie_python_raises_exceptions(does_throw):
genie_api_setup._exceptions_raised = does_throw


def setup_simulated_wiring_tables(event_data=False):
def setup_simulated_wiring_tables(event_data: bool = False) -> None:
"""
Configures the DAE's wiring tables and sets the DAE to simulation mode

Expand Down Expand Up @@ -206,7 +213,7 @@ def setup_simulated_wiring_tables(event_data=False):
set_genie_python_raises_exceptions(False)


def _wait_for_and_assert_dae_simulation_mode(mode):
def _wait_for_and_assert_dae_simulation_mode(mode: bool) -> None:
"""
Waits for specified DAE simulation mode in the DAE

Expand All @@ -232,7 +239,7 @@ def _wait_for_and_assert_dae_simulation_mode(mode):
)


def set_wait_for_complete_callback_dae_settings(wait):
def set_wait_for_complete_callback_dae_settings(wait: bool) -> None:
"""Sets the wait for completion callback attribute of the DAE

@param wait: Boolean value, True if you want the DAE to wait for the operation
Expand All @@ -241,13 +248,13 @@ def set_wait_for_complete_callback_dae_settings(wait):
genie_api_setup.__api.dae.wait_for_completion_callback_dae_settings = wait


def temporarily_kill_icp():
def temporarily_kill_icp() -> ContextManager[None]:
# Temporarily kills the ISIS ICP (ISIS DAE)

return genie_api_setup.__api.dae.temporarily_kill_icp()


def as_seconds(time):
def as_seconds(time: str) -> int:
"""
Convert a up time to seconds
Args:
Expand All @@ -265,7 +272,7 @@ def as_seconds(time):
return seconds


def _start_stop_ioc_is_a_start(is_a_start, ioc_name):
def _start_stop_ioc_is_a_start(is_a_start: bool, ioc_name: str) -> None:
"""
Start or stop and ioc dependent on whether it "is_a_start"
Args:
Expand All @@ -285,12 +292,13 @@ def _start_stop_ioc_is_a_start(is_a_start, ioc_name):
wait_for_ioc_start_stop(timeout=IOCS_START_STOP_TIMEOUT, is_start=is_a_start, ioc_name=ioc_name)


def bulk_start_ioc(ioc_list):
def bulk_start_ioc(ioc_list: list[str]) -> tuple[list[str], list[str]]:
"""
start a list of IOCs in bulk
:param ioc_list: a list of the names of the IOCs to start
:return: a list of IOCs that failed to start after IOCS_START_STOP_TIMEOUT seconds
and a list of any IOCs that were not present in proc serv (this should be a very rare case)
and a list of any IOCs that were not present in proc serv
(this should be a very rare case)
"""
failed_to_start = []
not_in_proc_serv = []
Expand All @@ -301,7 +309,8 @@ def bulk_start_ioc(ioc_list):
except UnableToConnectToPVException:
not_in_proc_serv.append(ioc_name)
print(
f"{ioc_name} not found in proc serv, should this be added to the list of iocs to skip?"
f"{ioc_name} not found in proc serv, should this be added "
"to the list of iocs to skip?"
)
ioc_list = [ioc for ioc in ioc_list if ioc not in not_in_proc_serv]
for ioc_name in ioc_list:
Expand All @@ -314,7 +323,7 @@ def bulk_start_ioc(ioc_list):
return failed_to_start, not_in_proc_serv


def bulk_stop_ioc(ioc_list):
def bulk_stop_ioc(ioc_list: list[str]) -> list[str]:
"""
Stops a list of IOCs in bulk
:param ioc_list: a list of the names of the IOCs to stop
Expand All @@ -334,7 +343,7 @@ def bulk_stop_ioc(ioc_list):
return failed_to_stop


def start_ioc(ioc_name):
def start_ioc(ioc_name: str) -> None:
"""
Start the ioc
Args:
Expand All @@ -346,7 +355,7 @@ def start_ioc(ioc_name):
_start_stop_ioc_is_a_start(True, ioc_name)


def stop_ioc(ioc_name):
def stop_ioc(ioc_name: str) -> None:
"""
Stop the ioc
Args:
Expand All @@ -358,7 +367,7 @@ def stop_ioc(ioc_name):
_start_stop_ioc_is_a_start(False, ioc_name)


def wait_for_ioc_start_stop(timeout, is_start, ioc_name):
def wait_for_ioc_start_stop(timeout: int, is_start: bool, ioc_name: str) -> None:
"""
Wait for an ioc to start or stop, if timeout raise a timeout error
Args:
Expand All @@ -382,9 +391,10 @@ def wait_for_ioc_start_stop(timeout, is_start, ioc_name):
raise IOError(f"IOC {ioc_name} is not {'started' if is_start else 'stopped'}")


def quick_is_ioc_down(ioc_name):
def quick_is_ioc_down(ioc_name: str) -> bool:
"""
Determine if IOC is up by checking proc serv, cannot be used to make sure a PV has been started, but is
Determine if IOC is up by checking proc serv, cannot be used to make sure a PV
has been started, but is
good enough for checks before attempting to start/stop
:param ioc_name: The IOC to check
:return: True if IOC is up; False otherwise
Expand All @@ -393,7 +403,7 @@ def quick_is_ioc_down(ioc_name):
return running == "Shutdown"


def is_ioc_up(ioc_name):
def is_ioc_up(ioc_name: str) -> bool:
"""
Determine if IOC is up by checking for the existence of its heartbeat PV
Args:
Expand All @@ -411,7 +421,7 @@ def is_ioc_up(ioc_name):
return heartbeat is not None


def wait_for_iocs_to_be_up(ioc_names, seconds_to_wait):
def wait_for_iocs_to_be_up(ioc_names: list[str], seconds_to_wait: int) -> None:
"""
Wait for a number of iocs to be up by checking for existence of heartbeat PVs for each ioc.

Expand All @@ -432,11 +442,14 @@ def wait_for_iocs_to_be_up(ioc_names, seconds_to_wait):
sleep(1)
else:
raise AssertionError(
f"IOCs: {[ioc_name for ioc_name in ioc_names if not is_ioc_up(ioc_name)]} could not be started."
f"IOCs: {[ioc_name for ioc_name in ioc_names if not is_ioc_up(ioc_name)]} "
"could not be started."
)


def wait_for_string_pvs_to_not_be_empty(pvs, seconds_to_wait, is_local=True):
def wait_for_string_pvs_to_not_be_empty(
pvs: list[str], seconds_to_wait: int, is_local: bool = True
) -> dict:
"""
Wait for a number of string pvs to be non-empty and return their values.
Raises an assertion error if at least one is not found.
Expand Down Expand Up @@ -469,16 +482,16 @@ def wait_for_string_pvs_to_not_be_empty(pvs, seconds_to_wait, is_local=True):
return pv_values


def retry_on_failure(max_times):
def retry_on_failure(max_times: int) -> Callable[[Callable[P, T]], Callable[P, None]]:
"""
Decorator that will retry running a test if it failed.
:param max_times: Maximum number of times to retry running the test
:return: the decorator
"""

def decorator(func):
def decorator(func: Callable[P, T]) -> Callable[P, None]:
@six.wraps(func)
def wrapper(*args, **kwargs):
def wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
err = None
for attempt in range(max_times):
try:
Expand All @@ -497,7 +510,7 @@ def wrapper(*args, **kwargs):
return decorator


def check_block_exists(block_name):
def check_block_exists(block_name: str) -> bool:
"""
Check that the given block name is in the current blocks.

Expand All @@ -511,10 +524,12 @@ def check_block_exists(block_name):
return block_name in blocks


def retry_assert(retry_limit: int, func: Callable[[], None], retry_time: float = 1.0):
def retry_assert(retry_limit: int, func: Callable[[], None], retry_time: float = 1.0) -> None:
"""
Take a function (func) that makes assertions. Try to call the function and catch any AssertionErrors if raised.
Repeat this until either the function does not raise an AssertionError or the retry_limit is reached.
Take a function (func) that makes assertions. Try to call the function and
catch any AssertionErrors if raised.
Repeat this until either the function does not raise an AssertionError
or the retry_limit is reached.
If the retry limit is reach reraise the last error.

Args:
Expand All @@ -537,7 +552,7 @@ def retry_assert(retry_limit: int, func: Callable[[], None], retry_time: float =
raise error


def get_execution_time(method):
def get_execution_time(method: Callable[[], None]) -> float:
"""
Takes a method and calculates its execution time.
Useful for tests that are time sensitive
Expand All @@ -556,7 +571,7 @@ def get_execution_time(method):
return execution_time


def assert_with_timeout(assertion: Callable, timeout: int):
def assert_with_timeout(assertion: Callable[[], None], timeout: int) -> None:
err = None
for _ in range(timeout):
try:
Expand Down
Loading