-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enhencement for remove_state_hops_from_sm and remove_state_from_sm (#559
) * Enhencement for remove_state_hops_from_sm and remove_state_from_sm * style * d * helpers * no more Python 3.7 on CI * f * d * style * test
- Loading branch information
1 parent
24608a7
commit cb7cd6b
Showing
3 changed files
with
844 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ | |
""" | ||
|
||
__author__ = 'Marcin Usielski' | ||
__copyright__ = 'Copyright (C) 2024, Nokia' | ||
__copyright__ = 'Copyright (C) 2024-2025, Nokia' | ||
__email__ = '[email protected]' | ||
import six | ||
import abc | ||
|
@@ -45,6 +45,27 @@ def __init__(self, sm_params, name=None, io_connection=None, io_type=None, varia | |
lazy_cmds_events=lazy_cmds_events) | ||
self._log(level=logging.WARNING, msg="Experimental device. May be deleted at any moment. Please don't use it in your scripts.") | ||
|
||
def _get_forbidden_states_no_proxy_pc(self): | ||
""" | ||
Get forbidden states when deleted states - no proxy pc. | ||
:return: dict with forbidden states or None if no forbidden states. | ||
""" | ||
return None | ||
|
||
def _get_additional_state_hops_no_proxy_pc(self): | ||
""" | ||
Get additional state hops if states are removed. None if no additional states are required. | ||
:return: dict with additional states or None if no additional states. | ||
""" | ||
return None | ||
|
||
def _get_forbidden_hops_no_proxy_pc(self): | ||
""" | ||
Get forbidden state hops when deleted states - no proxy pc. | ||
:return: dict with forbidden hops or None if no forbidden hops. | ||
""" | ||
return None | ||
|
||
def _prepare_sm_data(self, sm_params): | ||
self._prepare_dicts_for_sm(sm_params=sm_params) | ||
|
||
|
@@ -102,9 +123,12 @@ def _trim_config_dicts(self, default_sm_configurations, transitions, state_hops) | |
source_sm=default_sm_configurations[ProxyPc3.connection_hops], | ||
source_transitions=transitions, | ||
state_to_remove=ProxyPc3.proxy_pc, | ||
forbidden=self._get_forbidden_states_no_proxy_pc() | ||
) | ||
state_hops = remove_state_hops_from_sm( | ||
source_hops=state_hops, state_to_remove=ProxyPc3.proxy_pc | ||
source_hops=state_hops, state_to_remove=ProxyPc3.proxy_pc, | ||
additional_hops=self._get_additional_state_hops_no_proxy_pc(), | ||
forbidden_hops=self._get_forbidden_hops_no_proxy_pc(), | ||
) | ||
default_sm_configurations[ProxyPc3.connection_hops] = connection_hops | ||
return (default_sm_configurations, transitions, state_hops) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
""" | ||
|
||
__author__ = "Grzegorz Latuszek, Michal Ernst, Marcin Usielski" | ||
__copyright__ = "Copyright (C) 2018-2024, Nokia" | ||
__copyright__ = "Copyright (C) 2018-2025, Nokia" | ||
__email__ = ( | ||
"[email protected], [email protected], [email protected]" | ||
) | ||
|
@@ -577,7 +577,7 @@ def regexp_without_anchors(regexp): | |
return re.compile(regexp_str) | ||
|
||
|
||
def remove_state_from_sm(source_sm: dict, source_transitions: dict, state_to_remove: str) -> tuple: | ||
def remove_state_from_sm(source_sm: dict, source_transitions: dict, state_to_remove: str, forbidden: dict = None) -> tuple: | ||
""" | ||
Remove a state from a state machine dict. | ||
:param source_sm: a dict with a state machine description | ||
|
@@ -594,29 +594,31 @@ def remove_state_from_sm(source_sm: dict, source_transitions: dict, state_to_rem | |
if to_state == state_to_remove: | ||
states_from_state_to_remove.append(from_state) | ||
|
||
for to_state in source_sm[state_to_remove].keys(): | ||
for to_state in states_from_state_to_remove: | ||
if to_state == state_to_remove: | ||
continue | ||
for new_from in states_from_state_to_remove: | ||
if new_from != to_state: | ||
break | ||
if new_from not in new_sm: | ||
new_sm[new_from] = {} | ||
if new_from not in new_transitions: | ||
new_transitions[new_from] = {} | ||
|
||
new_sm[new_from][to_state] = copy.deepcopy(source_sm[state_to_remove][to_state]) | ||
if 'execute_command' in source_sm[new_from][state_to_remove]: | ||
new_sm[new_from][to_state]['execute_command'] = source_sm[new_from][state_to_remove]['execute_command'] | ||
|
||
if state_to_remove in source_transitions and to_state in source_transitions[state_to_remove]: | ||
new_transitions[new_from][to_state] = copy.deepcopy(source_transitions[state_to_remove][to_state]) | ||
else: | ||
new_transitions[new_from][to_state] = { | ||
"action": [ | ||
"_execute_command_to_change_state" | ||
], | ||
} | ||
if new_from == to_state: | ||
continue | ||
if forbidden and new_from in forbidden and forbidden[new_from] == to_state: | ||
continue | ||
if new_from not in new_sm: | ||
new_sm[new_from] = {} | ||
if new_from not in new_transitions: | ||
new_transitions[new_from] = {} | ||
|
||
new_sm[new_from][to_state] = copy.deepcopy(source_sm[state_to_remove][to_state]) | ||
if 'execute_command' in source_sm[new_from][state_to_remove]: | ||
new_sm[new_from][to_state]['execute_command'] = source_sm[new_from][state_to_remove]['execute_command'] | ||
|
||
if state_to_remove in source_transitions and to_state in source_transitions[state_to_remove]: | ||
new_transitions[new_from][to_state] = copy.deepcopy(source_transitions[state_to_remove][to_state]) | ||
else: | ||
new_transitions[new_from][to_state] = { | ||
"action": [ | ||
"_execute_command_to_change_state" | ||
], | ||
} | ||
|
||
_delete_state(sm=new_sm, state_to_remove=state_to_remove) | ||
_delete_state(sm=new_transitions, state_to_remove=state_to_remove) | ||
|
@@ -652,34 +654,43 @@ def _delete_empty_states(sm: dict) -> None: | |
del sm[state] | ||
|
||
|
||
def remove_state_hops_from_sm(source_hops: dict, state_to_remove: str) -> dict: | ||
def remove_state_hops_from_sm(source_hops: dict, state_to_remove: str, additional_hops: dict = None, forbidden_hops: dict = None) -> dict: | ||
""" | ||
Remove a state from a state machine dict. | ||
:param source_sm: a dict with state machine description | ||
:param state_to_remove: name of state to remove | ||
:param forbidden_hops: dict with forbidden transitions after remove, key is source, value is destination | ||
:return: a new state machine hops dict without state_to_remove | ||
""" | ||
new_hops = copy.deepcopy(source_hops) | ||
|
||
for from_state in source_hops.keys(): | ||
item = source_hops[from_state] | ||
for dest_state in item.keys(): | ||
direct_state = item[dest_state] | ||
if direct_state == state_to_remove: | ||
if state_to_remove in source_hops and dest_state in source_hops[state_to_remove]: | ||
if source_hops[state_to_remove][dest_state] == from_state: | ||
msg = f"Found cycle from '{from_state}' to '{dest_state}' via '{source_hops[state_to_remove][dest_state]}'. Please verify state hops: {source_hops}" | ||
for old_from_state in source_hops.keys(): | ||
item = source_hops[old_from_state] | ||
for old_dest_state in item.keys(): | ||
old_via_state = item[old_dest_state] | ||
if old_via_state == state_to_remove: | ||
if state_to_remove in source_hops and old_dest_state in source_hops[state_to_remove]: | ||
if source_hops[state_to_remove][old_dest_state] == old_from_state: | ||
msg = f"Found cycle from '{old_from_state}' to '{old_dest_state}' via '{source_hops[state_to_remove][old_dest_state]}'. Please verify state hops: {source_hops}" | ||
raise MolerException(msg) | ||
new_hops[from_state][dest_state] = source_hops[state_to_remove][dest_state] | ||
new_via_state = source_hops[old_via_state][old_dest_state] | ||
if forbidden_hops and old_from_state in forbidden_hops and old_dest_state in forbidden_hops[old_from_state] and forbidden_hops[old_from_state][old_dest_state] == new_via_state: | ||
if old_from_state in new_hops and old_dest_state in new_hops[old_from_state]: | ||
del new_hops[old_from_state][old_dest_state] | ||
else: | ||
new_hops[old_from_state][old_dest_state] = new_via_state | ||
else: | ||
del new_hops[from_state][dest_state] | ||
del new_hops[old_from_state][old_dest_state] | ||
|
||
for from_state in source_hops.keys(): | ||
if from_state in new_hops and state_to_remove in new_hops[from_state]: | ||
del new_hops[from_state][state_to_remove] | ||
for old_from_state in source_hops.keys(): | ||
if old_from_state in new_hops and state_to_remove in new_hops[old_from_state]: | ||
del new_hops[old_from_state][state_to_remove] | ||
|
||
if state_to_remove in new_hops: | ||
del new_hops[state_to_remove] | ||
|
||
_delete_empty_states(new_hops) | ||
if additional_hops: | ||
update_dict(new_hops, additional_hops) | ||
|
||
return new_hops |
Oops, something went wrong.