Skip to content

Commit

Permalink
finished emulation_util test w/mypy checked
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuhu-kth committed Jul 23, 2024
1 parent 3e40f84 commit bae1e0a
Showing 1 changed file with 210 additions and 1 deletion.
211 changes: 210 additions & 1 deletion simulation-system/libs/csle-common/tests/test_emulation_util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from unittest.mock import patch, MagicMock
from csle_common.util.emulation_util import EmulationUtil
import csle_common.constants.constants as constants
from csle_common.dao.emulation_observation.common.emulation_connection_observation_state import (
EmulationConnectionObservationState,
)
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_type import EmulationAttackerActionType


class TestEmulationUtilSuite:
Expand Down Expand Up @@ -117,3 +117,212 @@ def test_is_connection_active(self) -> None:
assert result
mock_conn.get_transport.assert_called()
mock_transport.is_active.assert_called()

@patch("paramiko.SSHClient")
def test_setup_custom_connection(self, mock_SSHClient) -> None:
"""
Test utility function for setting up a custom SSH connection given credentials and a proxy connection
:param mock_SSHClient: mock_SSHClient
:return: None
"""
user = "user"
pw = "password"
source_ip = "192.168.1.1"
port = 22
target_ip = "192.168.1.2"
root = True

mock_proxy_conn = MagicMock()
mock_transport = MagicMock()
mock_proxy_conn.conn.get_transport.return_value = mock_transport

mock_ssh_client = MagicMock()
mock_SSHClient.return_value = mock_ssh_client
mock_relay_channel = MagicMock()
mock_transport.open_channel.return_value = mock_relay_channel
mock_transport.is_active.return_value = True

result = EmulationUtil.setup_custom_connection(
user=user, pw=pw, source_ip=source_ip, port=port, target_ip=target_ip, proxy_conn=mock_proxy_conn, root=root
)
mock_SSHClient.assert_called()
assert result

def test_write_remote_file(self) -> None:
"""
Test utility function for writing contents to a file
:return: None
"""
mock_conn = MagicMock()
mock_sftp_client = MagicMock()
mock_remote_file = MagicMock()

mock_conn.open_sftp.return_value = mock_sftp_client
mock_sftp_client.file.return_value = mock_remote_file

file_name = "filename.txt"
contents = "content"
write_mode = "w"

EmulationUtil.write_remote_file(conn=mock_conn, file_name=file_name, contents=contents, write_mode=write_mode)
mock_conn.open_sftp.assert_called()
mock_sftp_client.file.assert_called()
mock_remote_file.write.assert_called()
mock_remote_file.close.assert_called()

def test_connect_admin(self) -> None:
"""
Test the method that connects the admin agent
:return: None
"""
emulation_env_config = MagicMock()
ip = "192.168.1.100"
create_producer = True
EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip, create_producer=create_producer)
assert emulation_env_config.agent_ip == ip
emulation_env_config.connect.assert_called()

def test_disconnect_admin(self) -> None:
"""
Test the method that disconnects the admin agent
:return: None
"""
emulation_env_config = MagicMock()
EmulationUtil.disconnect_admin(emulation_env_config=emulation_env_config)
emulation_env_config.close_all_connections.assert_called()

def test_execute_cmd_interactive_channel(self) -> None:
"""
Test method that executes an action on the emulation using an interactive shell (non synchronous)
:return: None
"""
mock_channel = MagicMock()
cmd = "ls -la"
EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=mock_channel)
mock_channel.send.assert_called()

@patch("csle_common.util.emulation_util.EmulationUtil.read_result_interactive_channel")
def test_read_result_interactive(self, mock_read_result_interactive_channel) -> None:
"""
Test the method that reads the result of an action executed in interactive mode
:param mock_read_result_interactive_channel: mock_read_result_interactive_channel
:return: None
"""
mock_read_result_interactive_channel.return_value = "result"
emulation_env_config = MagicMock()
mock_channel = MagicMock()
result = EmulationUtil.read_result_interactive(emulation_env_config=emulation_env_config, channel=mock_channel)
assert result == "result"

def test_read_result_interactive_channel(self) -> None:
"""
Test the method that reads the result of an action executed in interactive mode
:return: None
"""
emulation_env_config = MagicMock()
mock_channel = MagicMock()
mock_channel.recv_ready.side_effect = [False, True]
mock_channel.recv.return_value = b"Mocked result"
mock_shell_escape = MagicMock()
mock_shell_escape.sub.return_value = "result"
result = EmulationUtil.read_result_interactive_channel(
emulation_env_config=emulation_env_config, channel=mock_channel
)
mock_channel.recv_ready.assert_called()
mock_channel.recv.assert_called()
assert result

def test_is_emulation_defense_action_legal(self) -> None:
"""
Test the method that checks if a given defense action is legal in the current state of the environment
:return: None
"""
defense_action_id = 1
emulation_env_config = MagicMock()
emulation_env_state = MagicMock()
result = EmulationUtil.is_emulation_defense_action_legal(
defense_action_id=defense_action_id, env_config=emulation_env_config, env_state=emulation_env_state
)
assert result

@patch("csle_common.util.env_dynamics_util.EnvDynamicsUtil.logged_in_ips_str")
def test_is_emulation_attack_action_legal(self, mock_logged_in_ips_str) -> None:
"""
Test the method that checks if a given attack action is legal in the current state of the environment
:param mock_logged_in_ips_str: mock_logged_in_ips_str
:return: None
"""
mock_env_config = MagicMock()
mock_env_state = MagicMock()

mock_env_state.attacker_action_config.actions = [MagicMock(id=0), MagicMock(id=1)]
mock_env_state.attacker_obs_state = MagicMock()
mock_env_state.attacker_obs_state.get_action_ips.return_value = "192.168.1.1"
mock_env_state.attacker_obs_state.actions_tried = set()
mock_env_state.attacker_obs_state.machines = [
MagicMock(
ips="192.168.1.1",
logged_in=True,
root=False,
filesystem_searched=False,
tools_installed=False,
backdoor_installed=False,
untried_credentials=True,
)
]

action_id = 1
mock_action = MagicMock()
mock_action.id = action_id
mock_action.index = 0
mock_action.type = EmulationAttackerActionType.RECON
mock_env_state.attacker_action_config.actions[action_id] = mock_action

result = EmulationUtil.is_emulation_attack_action_legal(action_id, mock_env_config, mock_env_state)
assert not result

def test_check_pid(self) -> None:
"""
Test the method that checks if a given pid is running on the host
:return: None
"""
result = EmulationUtil.check_pid(1)
assert result

def test_physical_ip_match(self) -> None:
"""
Test utility method for checking if a container ip matches a physical ip
:return: None
"""
emulation_env_config = MagicMock()
emulation_env_config.kafka_config.container.docker_gw_bridge_ip = "192.168.0.1"
emulation_env_config.kafka_config.container.physical_host_ip = "10.0.0.1"
emulation_env_config.elk_config.container.docker_gw_bridge_ip = "192.168.0.2"
emulation_env_config.elk_config.container.physical_host_ip = "10.0.0.2"
emulation_env_config.sdn_controller_config = None

mock_container = MagicMock()
mock_container.physical_host_ip = "10.0.0.3"
emulation_env_config.containers_config.get_container_from_ip.return_value = mock_container

ip = "192.168.0.1"
physical_host_ip = "10.0.0.1"

result = EmulationUtil.physical_ip_match(
emulation_env_config=emulation_env_config, ip=ip, physical_host_ip=physical_host_ip
)
assert result

0 comments on commit bae1e0a

Please sign in to comment.