diff --git a/tests/core/test_module.py b/tests/core/test_module.py new file mode 100644 index 000000000..23d817d52 --- /dev/null +++ b/tests/core/test_module.py @@ -0,0 +1,381 @@ +import json +from unittest.mock import patch, MagicMock + +import pytest + +from nettacker.core.module import Module + + +class DummyOptions: + def __init__(self): + self.modules_extra_args = {"foo": "bar"} + self.skip_service_discovery = False + self.thread_per_host = 2 + self.time_sleep_between_requests = 0 + + +@pytest.fixture +def options(): + return DummyOptions() + + +@pytest.fixture +def module_args(): + return { + "process_number": 1, + "scan_id": "scan123", + "target": "127.0.0.1", + "thread_number": 1, + "total_number_threads": 1, + } + + +@patch("nettacker.core.module.TemplateLoader") +def test_init_and_service_discovery_signature(mock_loader, options, module_args): + mock_instance = MagicMock() + mock_instance.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + mock_loader.return_value = mock_instance + + module = Module("port_scan", options, **module_args) + assert "http" in module.service_discovery_signatures + + +@patch("os.listdir", return_value=["http.py"]) +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_load_with_service_discovery( + mock_loader, mock_find_events, mock_listdir, options, module_args +): + mock_loader_inst = MagicMock() + mock_loader_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [{"response": {"conditions": {"service": {"http": {}}}}}], + } + ] + } + mock_loader.return_value = mock_loader_inst + + mock_find_events.return_value = [ + MagicMock(json_event='{"port": 80, "response": {"conditions_results": {"http": {}}}}') + ] + + module = Module("test_module", options, **module_args) + module.load() + + assert module.discovered_services == {"http": [80]} + assert len(module.module_content["payloads"]) == 1 + + +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.importlib.import_module") +@patch("nettacker.core.module.wait_for_threads_to_finish") +@patch("nettacker.core.module.time.sleep", return_value=None) +@patch("nettacker.core.module.Thread") +@patch("nettacker.core.module.TemplateLoader") +def test_start_all_conditions( + mock_loader, + mock_thread, + mock_sleep, + mock_wait, + mock_import, + mock_find_events, + options, + module_args, +): + engine_mock = MagicMock() + mock_import.return_value = MagicMock(HttpEngine=MagicMock(return_value=engine_mock)) + + mock_loader_inst = MagicMock() + mock_loader_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}], + } + ] + } + mock_loader.return_value = mock_loader_inst + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 80, "response": {"conditions_results": {"http": True}}} + ) + mock_find_events.return_value = [mock_event] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.load() + module.start() + + mock_wait.assert_called() + + +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_start_unsupported_library(mock_loader, mock_find_events, options, module_args): + mock_loader_inst = MagicMock() + mock_loader_inst.load.return_value = { + "payloads": [ + { + "library": "unsupported_lib", + "steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}], + } + ] + } + mock_loader.return_value = mock_loader_inst + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 1234, "response": {"conditions_results": {"unsupported_lib": True}}} + ) + mock_find_events.return_value = [mock_event] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.service_discovery_signatures.append("unsupported_lib") + + module.load() + result = module.start() + + assert result is None + + +def template_loader_side_effect(name, inputs): + # NOT A TEST CASE + mock_instance = MagicMock() + + # as in inside Module.__init__ + if name == "port_scan": + mock_instance.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + # as in module.load() + elif name == "test_module": + mock_instance.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [ + [{"response": {"conditions": {"service": {}}}}], + [ + { + "response": { + "conditions": {}, + "dependent_on_temp_event": True, + "save_to_temp_events_only": True, + } + } + ], + [{"response": {"conditions": {}, "dependent_on_temp_event": True}}], + ], + } + ] + } + else: + raise ValueError(f"Unexpected module name: {name}") + + return mock_instance + + +@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step) +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_sort_loops_behavior(mock_loader_cls, mock_find_events, mock_parse, options, module_args): + # This one is painful + mock_loader_cls.side_effect = template_loader_side_effect + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 80, "response": {"conditions_results": {"http": True}}} + ) + mock_find_events.return_value = [mock_event] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.load() + module.sort_loops() + + steps = module.module_content["payloads"][0]["steps"] + + assert steps[0][0]["response"]["conditions"] == {"service": {}} + assert steps[1][0]["response"]["dependent_on_temp_event"] + assert steps[1][0]["response"]["save_to_temp_events_only"] + assert steps[2][0]["response"]["dependent_on_temp_event"] + assert "save_to_temp_events_only" not in steps[2][0]["response"] + + +def loader_side_effect(name, inputs): + # HELPER + mock_inst = MagicMock() + + if name == "port_scan": + mock_inst.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + elif name == "test_module": + mock_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [ + [{"id": 1}], + [{"id": 2}], + ], + } + ] + } + else: + raise ValueError(f"Unexpected module name: {name}") + + return mock_inst + + +@patch("nettacker.core.module.find_events", return_value=[]) +@patch("nettacker.core.module.Thread") +@patch("nettacker.core.module.importlib.import_module") +@patch("nettacker.core.module.time.sleep", return_value=None) +@patch("nettacker.core.module.wait_for_threads_to_finish") +def test_start_creates_threads_minimal( + mock_wait, + mock_sleep, + mock_import_module, + mock_thread_cls, + mock_find_events, + options, + module_args, +): + # Mock HttpEngine from the imported module + fake_engine = MagicMock() + mock_import_module.return_value = MagicMock(HttpEngine=MagicMock(return_value=fake_engine)) + + # Mock thread instances + mock_thread_instance = MagicMock() + mock_thread_cls.return_value = mock_thread_instance + + # Create module with mocked attributes + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.discovered_services = {"http": [80]} + module.service_discovery_signatures = ["http"] + + module.module_content = { + "payloads": [ + { + "library": "http", + "steps": [ + [{"response": {}, "id": 1}], + [{"response": {}, "id": 2}], + ], + } + ] + } + + # Run + module.start() + + # Assert threads created twice + assert mock_thread_cls.call_count == 2 + + # Collect actual IDs passed to Thread + expected_ids = {1, 2} + actual_ids = {kwargs["args"][0]["id"] for _, kwargs in mock_thread_cls.call_args_list} + + assert actual_ids == expected_ids + assert mock_thread_instance.start.call_count == 2 + + +@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda x, _: x) +@patch("nettacker.core.module.log") +@patch("nettacker.core.module.TemplateLoader") +@patch("nettacker.core.module.find_events") +def test_start_library_not_supported( + mock_find_events, + mock_loader_cls, + mock_log, + mock_parse, + module_args, +): + def loader_side_effect_specific(name, inputs): + mock_inst = MagicMock() + if name == "port_scan": + mock_inst.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + elif name == "test_module": + mock_inst.load.return_value = { + "payloads": [ + { + "library": "unsupported_lib", + "steps": [ + [{"id": 1}], + ], + } + ] + } + return mock_inst + + mock_loader_cls.side_effect = loader_side_effect_specific + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 80, "response": {"conditions_results": {"http": True}}} + ) + mock_find_events.return_value = [mock_event] + + options.modules_extra_args = {} + options.skip_service_discovery = True + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.load() + + result = module.start() + + assert result is None + mock_log.warn.assert_called_once() + assert "unsupported_lib" in mock_log.warn.call_args[0][0] + + +@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step) +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_load_appends_port_to_existing_protocol( + mock_loader_cls, + mock_find_events, + mock_parse, + options, + module_args, +): + def loader_side_effect_specific(name, inputs): + mock_inst = MagicMock() + mock_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [ + {"response": {"conditions": {"service": {}}}} # .load() requires no [] + ], + } + ] + } + return mock_inst + + mock_loader_cls.side_effect = loader_side_effect_specific + mock_find_events.return_value = [ + MagicMock( + json_event=json.dumps({"port": 80, "response": {"conditions_results": {"http": {}}}}) + ), + MagicMock( + json_event=json.dumps({"port": 443, "response": {"conditions_results": {"http": {}}}}) + ), + ] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.service_discovery_signatures = ["http"] + module.load() + assert module.discovered_services == {"http": [80, 443]}