From bae1e0a4c98081aecf944ffe7a37834d11c3783d Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Tue, 23 Jul 2024 20:52:04 +0200 Subject: [PATCH] finished emulation_util test w/mypy checked --- .../csle-common/tests/test_emulation_util.py | 211 +++++++++++++++++- 1 file changed, 210 insertions(+), 1 deletion(-) diff --git a/simulation-system/libs/csle-common/tests/test_emulation_util.py b/simulation-system/libs/csle-common/tests/test_emulation_util.py index cb09aea43..d82c40075 100644 --- a/simulation-system/libs/csle-common/tests/test_emulation_util.py +++ b/simulation-system/libs/csle-common/tests/test_emulation_util.py @@ -1,9 +1,9 @@ from unittest.mock import patch, MagicMock from csle_common.util.emulation_util import EmulationUtil -import csle_common.constants.constants as constants from csle_common.dao.emulation_observation.common.emulation_connection_observation_state import ( EmulationConnectionObservationState, ) +from csle_common.dao.emulation_action.attacker.emulation_attacker_action_type import EmulationAttackerActionType class TestEmulationUtilSuite: @@ -117,3 +117,212 @@ def test_is_connection_active(self) -> None: assert result mock_conn.get_transport.assert_called() mock_transport.is_active.assert_called() + + @patch("paramiko.SSHClient") + def test_setup_custom_connection(self, mock_SSHClient) -> None: + """ + Test utility function for setting up a custom SSH connection given credentials and a proxy connection + + :param mock_SSHClient: mock_SSHClient + + :return: None + """ + user = "user" + pw = "password" + source_ip = "192.168.1.1" + port = 22 + target_ip = "192.168.1.2" + root = True + + mock_proxy_conn = MagicMock() + mock_transport = MagicMock() + mock_proxy_conn.conn.get_transport.return_value = mock_transport + + mock_ssh_client = MagicMock() + mock_SSHClient.return_value = mock_ssh_client + mock_relay_channel = MagicMock() + mock_transport.open_channel.return_value = mock_relay_channel + mock_transport.is_active.return_value = True + + result = EmulationUtil.setup_custom_connection( + user=user, pw=pw, source_ip=source_ip, port=port, target_ip=target_ip, proxy_conn=mock_proxy_conn, root=root + ) + mock_SSHClient.assert_called() + assert result + + def test_write_remote_file(self) -> None: + """ + Test utility function for writing contents to a file + + :return: None + """ + mock_conn = MagicMock() + mock_sftp_client = MagicMock() + mock_remote_file = MagicMock() + + mock_conn.open_sftp.return_value = mock_sftp_client + mock_sftp_client.file.return_value = mock_remote_file + + file_name = "filename.txt" + contents = "content" + write_mode = "w" + + EmulationUtil.write_remote_file(conn=mock_conn, file_name=file_name, contents=contents, write_mode=write_mode) + mock_conn.open_sftp.assert_called() + mock_sftp_client.file.assert_called() + mock_remote_file.write.assert_called() + mock_remote_file.close.assert_called() + + def test_connect_admin(self) -> None: + """ + Test the method that connects the admin agent + + :return: None + """ + emulation_env_config = MagicMock() + ip = "192.168.1.100" + create_producer = True + EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip, create_producer=create_producer) + assert emulation_env_config.agent_ip == ip + emulation_env_config.connect.assert_called() + + def test_disconnect_admin(self) -> None: + """ + Test the method that disconnects the admin agent + + :return: None + """ + emulation_env_config = MagicMock() + EmulationUtil.disconnect_admin(emulation_env_config=emulation_env_config) + emulation_env_config.close_all_connections.assert_called() + + def test_execute_cmd_interactive_channel(self) -> None: + """ + Test method that executes an action on the emulation using an interactive shell (non synchronous) + + :return: None + """ + mock_channel = MagicMock() + cmd = "ls -la" + EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=mock_channel) + mock_channel.send.assert_called() + + @patch("csle_common.util.emulation_util.EmulationUtil.read_result_interactive_channel") + def test_read_result_interactive(self, mock_read_result_interactive_channel) -> None: + """ + Test the method that reads the result of an action executed in interactive mode + + :param mock_read_result_interactive_channel: mock_read_result_interactive_channel + + :return: None + """ + mock_read_result_interactive_channel.return_value = "result" + emulation_env_config = MagicMock() + mock_channel = MagicMock() + result = EmulationUtil.read_result_interactive(emulation_env_config=emulation_env_config, channel=mock_channel) + assert result == "result" + + def test_read_result_interactive_channel(self) -> None: + """ + Test the method that reads the result of an action executed in interactive mode + + :return: None + """ + emulation_env_config = MagicMock() + mock_channel = MagicMock() + mock_channel.recv_ready.side_effect = [False, True] + mock_channel.recv.return_value = b"Mocked result" + mock_shell_escape = MagicMock() + mock_shell_escape.sub.return_value = "result" + result = EmulationUtil.read_result_interactive_channel( + emulation_env_config=emulation_env_config, channel=mock_channel + ) + mock_channel.recv_ready.assert_called() + mock_channel.recv.assert_called() + assert result + + def test_is_emulation_defense_action_legal(self) -> None: + """ + Test the method that checks if a given defense action is legal in the current state of the environment + + :return: None + """ + defense_action_id = 1 + emulation_env_config = MagicMock() + emulation_env_state = MagicMock() + result = EmulationUtil.is_emulation_defense_action_legal( + defense_action_id=defense_action_id, env_config=emulation_env_config, env_state=emulation_env_state + ) + assert result + + @patch("csle_common.util.env_dynamics_util.EnvDynamicsUtil.logged_in_ips_str") + def test_is_emulation_attack_action_legal(self, mock_logged_in_ips_str) -> None: + """ + Test the method that checks if a given attack action is legal in the current state of the environment + + :param mock_logged_in_ips_str: mock_logged_in_ips_str + + :return: None + """ + mock_env_config = MagicMock() + mock_env_state = MagicMock() + + mock_env_state.attacker_action_config.actions = [MagicMock(id=0), MagicMock(id=1)] + mock_env_state.attacker_obs_state = MagicMock() + mock_env_state.attacker_obs_state.get_action_ips.return_value = "192.168.1.1" + mock_env_state.attacker_obs_state.actions_tried = set() + mock_env_state.attacker_obs_state.machines = [ + MagicMock( + ips="192.168.1.1", + logged_in=True, + root=False, + filesystem_searched=False, + tools_installed=False, + backdoor_installed=False, + untried_credentials=True, + ) + ] + + action_id = 1 + mock_action = MagicMock() + mock_action.id = action_id + mock_action.index = 0 + mock_action.type = EmulationAttackerActionType.RECON + mock_env_state.attacker_action_config.actions[action_id] = mock_action + + result = EmulationUtil.is_emulation_attack_action_legal(action_id, mock_env_config, mock_env_state) + assert not result + + def test_check_pid(self) -> None: + """ + Test the method that checks if a given pid is running on the host + + :return: None + """ + result = EmulationUtil.check_pid(1) + assert result + + def test_physical_ip_match(self) -> None: + """ + Test utility method for checking if a container ip matches a physical ip + + :return: None + """ + emulation_env_config = MagicMock() + emulation_env_config.kafka_config.container.docker_gw_bridge_ip = "192.168.0.1" + emulation_env_config.kafka_config.container.physical_host_ip = "10.0.0.1" + emulation_env_config.elk_config.container.docker_gw_bridge_ip = "192.168.0.2" + emulation_env_config.elk_config.container.physical_host_ip = "10.0.0.2" + emulation_env_config.sdn_controller_config = None + + mock_container = MagicMock() + mock_container.physical_host_ip = "10.0.0.3" + emulation_env_config.containers_config.get_container_from_ip.return_value = mock_container + + ip = "192.168.0.1" + physical_host_ip = "10.0.0.1" + + result = EmulationUtil.physical_ip_match( + emulation_env_config=emulation_env_config, ip=ip, physical_host_ip=physical_host_ip + ) + assert result