diff --git a/Solutions/AbnormalSecurity/Data Connectors/AbnormalSecurityConn.zip b/Solutions/AbnormalSecurity/Data Connectors/AbnormalSecurityConn.zip index 1b9bdb6e51d..b8e6d85a4b2 100644 Binary files a/Solutions/AbnormalSecurity/Data Connectors/AbnormalSecurityConn.zip and b/Solutions/AbnormalSecurity/Data Connectors/AbnormalSecurityConn.zip differ diff --git a/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async.py b/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async.py index 723155e75a6..f087f3c3ac1 100644 --- a/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async.py +++ b/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async.py @@ -48,7 +48,7 @@ def _get_header(self): return { "Authorization": f"Bearer {self.api_key}", "Soar-Integration-Origin": "AZURE SENTINEL", - "Azure-Sentinel-Version": "2024-10-03" + "Azure-Sentinel-Version": "2024-11-29" } def _get_filter_query(self, filter_param, gte_datetime=None, lte_datetime=None): diff --git a/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async_v2.py b/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async_v2.py index 22981bd74c1..b21f8b5b8b0 100644 --- a/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async_v2.py +++ b/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/soar_connector_async_v2.py @@ -36,7 +36,7 @@ def get_headers(ctx: Context) -> Dict[str, str]: "X-Abnormal-Trace-Id": str(ctx.TRACE_ID), "Authorization": f"Bearer {ctx.API_TOKEN}", "Soar-Integration-Origin": "AZURE SENTINEL", - "Azure-Sentinel-Version": "2024-10-03 V2", + "Azure-Sentinel-Version": "2024-11-29 V2", } @@ -97,9 +97,9 @@ async def call_threat_campaigns_endpoint( threat_campaigns = set() - nextPageNumber = 1 - while nextPageNumber: - params["pageNumber"] = nextPageNumber + pageNumber = 1 + while pageNumber: + params["pageNumber"] = pageNumber endpoint = compute_url(ctx.BASE_URL, "/v1/threats", params) headers = get_headers(ctx) @@ -112,9 +112,10 @@ async def call_threat_campaigns_endpoint( ) nextPageNumber = response.get("nextPageNumber") - assert nextPageNumber is None or nextPageNumber > 0 + assert nextPageNumber is None or nextPageNumber == pageNumber + 1 + pageNumber = nextPageNumber - if nextPageNumber is None or nextPageNumber > ctx.MAX_PAGE_NUMBER: + if pageNumber is None or pageNumber > ctx.MAX_PAGE_NUMBER: break return list(threat_campaigns) @@ -130,9 +131,9 @@ async def call_cases_endpoint( case_ids = set() - nextPageNumber = 1 - while nextPageNumber: - params["pageNumber"] = nextPageNumber + pageNumber = 1 + while pageNumber: + params["pageNumber"] = pageNumber endpoint = compute_url(ctx.BASE_URL, "/v1/cases", params) headers = get_headers(ctx) @@ -143,9 +144,10 @@ async def call_cases_endpoint( case_ids.update([case["caseId"] for case in response.get("cases", [])]) nextPageNumber = response.get("nextPageNumber") - assert nextPageNumber is None or nextPageNumber > 0 + assert nextPageNumber is None or nextPageNumber == pageNumber + 1 + pageNumber = nextPageNumber - if nextPageNumber is None or nextPageNumber > ctx.MAX_PAGE_NUMBER: + if pageNumber is None or pageNumber > ctx.MAX_PAGE_NUMBER: break return list(case_ids) @@ -155,27 +157,43 @@ async def call_single_threat_endpoint( ctx: Context, threat_id: str, semaphore: asyncio.Semaphore ) -> List[str]: async with semaphore: - endpoint = compute_url(ctx.BASE_URL, f"/v1/threats/{threat_id}", params={}) - headers = get_headers(ctx) + filtered_messages = [] - response = await fetch_with_retries(url=endpoint, headers=headers) + pageNumber = 1 + params = {"pageSize": ctx.SINGLE_THREAT_PAGE_SIZE} + while pageNumber: + params["pageNumber"] = pageNumber + print("Single Threat Params:", params) + endpoint = compute_url(ctx.BASE_URL, f"/v1/threats/{threat_id}", params=params) + headers = get_headers(ctx) - filtered_messages = [] - for message in response["messages"]: - message_id = message["abxMessageId"] - remediation_time_str = message["remediationTimestamp"] - - remediation_time = try_str_to_datetime(remediation_time_str) - if ( - remediation_time >= ctx.CLIENT_FILTER_TIME_RANGE.start - and remediation_time < ctx.CLIENT_FILTER_TIME_RANGE.end - ): - filtered_messages.append(json.dumps(message, sort_keys=True)) - logging.info(f"Successfully processed v2 threat message: {message_id}") - else: - logging.warning(f"Skipped processing v2 threat message: {message_id}") - - return filtered_messages + response = await fetch_with_retries(url=endpoint, headers=headers) + + for message in response["messages"]: + message_id = message["abxMessageId"] + remediation_time_str = message["remediationTimestamp"] + + remediation_time = try_str_to_datetime(remediation_time_str) + if ( + remediation_time >= ctx.CLIENT_FILTER_TIME_RANGE.start + and remediation_time < ctx.CLIENT_FILTER_TIME_RANGE.end + ): + filtered_messages.append(json.dumps(message, sort_keys=True)) + logging.info(f"Successfully processed v2 threat message: {message_id}") + elif remediation_time < ctx.CLIENT_FILTER_TIME_RANGE.start: + logging.info(f"Skipping further messages as remediationTime {remediation_time} of {message_id} < {ctx.CLIENT_FILTER_TIME_RANGE.start}") + return list(set(filtered_messages)) + else: + logging.warning(f"Skipped processing v2 threat message: {message_id}") + + nextPageNumber = response.get("nextPageNumber") + assert nextPageNumber is None or nextPageNumber == pageNumber + 1 + pageNumber = nextPageNumber + + if pageNumber is None or pageNumber > ctx.MAX_PAGE_NUMBER: + break + + return list(set(filtered_messages)) async def call_single_case_endpoint( diff --git a/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/utils.py b/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/utils.py index 42308eb6aae..bf75d43502d 100644 --- a/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/utils.py +++ b/Solutions/AbnormalSecurity/Data Connectors/SentinelFunctionsOrchestrator/utils.py @@ -55,6 +55,7 @@ class Context(BaseModel): LIMIT: timedelta NUM_CONCURRENCY: int MAX_PAGE_NUMBER: int + SINGLE_THREAT_PAGE_SIZE: int BASE_URL: str API_TOKEN: str TIME_RANGE: TimeRange @@ -140,15 +141,16 @@ def get_context(stored_date_time: str) -> Context: BASE_URL = os.environ.get("API_HOST", "https://api.abnormalplatform.com/v1") API_TOKEN = os.environ["ABNORMAL_SECURITY_REST_API_TOKEN"] OUTAGE_TIME = timedelta( - minutes=int(os.environ.get("ABNORMAL_OUTAGE_TIME_MIN", "15")) + minutes=int(os.environ.get("ABNORMAL_OUTAGE_TIME_MIN", "45")) ) LAG_ON_BACKEND = timedelta( seconds=int(os.environ.get("ABNORMAL_LAG_ON_BACKEND_SEC", "30")) ) FREQUENCY = timedelta(minutes=int(os.environ.get("ABNORMAL_FREQUENCY_MIN", "5"))) LIMIT = timedelta(minutes=int(os.environ.get("ABNORMAL_LIMIT_MIN", "6"))) - NUM_CONCURRENCY = int(os.environ.get("ABNORMAL_NUM_CONCURRENCY", "5")) - MAX_PAGE_NUMBER = int(os.environ.get("ABNORMAL_MAX_PAGE_NUMBER", "3")) + NUM_CONCURRENCY = int(os.environ.get("ABNORMAL_NUM_CONCURRENCY", "2")) + MAX_PAGE_NUMBER = int(os.environ.get("ABNORMAL_MAX_PAGE_NUMBER", "6")) + SINGLE_THREAT_PAGE_SIZE = int(os.environ.get("ABNORMAL_SINGLE_THREAT_PAGE_SIZE", "40")) STORED_TIME = try_str_to_datetime(stored_date_time) CURRENT_TIME = try_str_to_datetime(datetime.now().strftime(TIME_FORMAT)) @@ -171,7 +173,8 @@ def get_context(stored_date_time: str) -> Context: CURRENT_TIME=CURRENT_TIME, LIMIT=LIMIT, TRACE_ID=uuid4(), - PYTHON_VERSION=sys.version + PYTHON_VERSION=sys.version, + SINGLE_THREAT_PAGE_SIZE=SINGLE_THREAT_PAGE_SIZE ) diff --git a/Solutions/AbnormalSecurity/Data Connectors/Tests/soar_connector_async_v2_test.py b/Solutions/AbnormalSecurity/Data Connectors/Tests/soar_connector_async_v2_test.py index e3a71fffeea..b9f11e7fed0 100644 --- a/Solutions/AbnormalSecurity/Data Connectors/Tests/soar_connector_async_v2_test.py +++ b/Solutions/AbnormalSecurity/Data Connectors/Tests/soar_connector_async_v2_test.py @@ -99,18 +99,19 @@ def setUp(self): STORED_TIME=datetime(2024, 10, 1, 12, 55), CURRENT_TIME=datetime(2024, 10, 1, 13, 0), TRACE_ID=self.trace_id, - PYTHON_VERSION="3.11" + PYTHON_VERSION="3.11", + SINGLE_THREAT_PAGE_SIZE=40 ) def test_valid_headers(self): # Test case for valid headers headers = get_headers(self.ctx) expected_headers = { - "X-Sentinel-Context": "eyJMQUdfT05fQkFDS0VORCI6IDMwLjAsICJPVVRBR0VfVElNRSI6IDkwMC4wLCAiRlJFUVVFTkNZIjogMzAwLjAsICJMSU1JVCI6IDM2MC4wLCAiTlVNX0NPTkNVUlJFTkNZIjogNSwgIk1BWF9QQUdFX05VTUJFUiI6IDEwMCwgIkJBU0VfVVJMIjogImh0dHA6Ly9leGFtcGxlLmNvbSIsICJUSU1FX1JBTkdFIjogeyJzdGFydCI6ICIyMDI0LTEwLTAxVDEyOjU1OjAwIiwgImVuZCI6ICIyMDI0LTEwLTAxVDEzOjAwOjAwIn0sICJDTElFTlRfRklMVEVSX1RJTUVfUkFOR0UiOiB7InN0YXJ0IjogIjIwMjQtMTAtMDFUMTI6NTQ6MzAiLCAiZW5kIjogIjIwMjQtMTAtMDFUMTI6NTk6MzAifSwgIlNUT1JFRF9USU1FIjogIjIwMjQtMTAtMDFUMTI6NTU6MDAiLCAiQ1VSUkVOVF9USU1FIjogIjIwMjQtMTAtMDFUMTM6MDA6MDAiLCAiVFJBQ0VfSUQiOiAiYmRiMmExMjctZWQzZC00NjRhLWIyMDUtMzgyMGNjZjZkM2YyIiwgIlBZVEhPTl9WRVJTSU9OIjogIjMuMTEifQ==", + "X-Sentinel-Context": "eyJMQUdfT05fQkFDS0VORCI6IDMwLjAsICJPVVRBR0VfVElNRSI6IDkwMC4wLCAiRlJFUVVFTkNZIjogMzAwLjAsICJMSU1JVCI6IDM2MC4wLCAiTlVNX0NPTkNVUlJFTkNZIjogNSwgIk1BWF9QQUdFX05VTUJFUiI6IDEwMCwgIlNJTkdMRV9USFJFQVRfUEFHRV9TSVpFIjogNDAsICJCQVNFX1VSTCI6ICJodHRwOi8vZXhhbXBsZS5jb20iLCAiVElNRV9SQU5HRSI6IHsic3RhcnQiOiAiMjAyNC0xMC0wMVQxMjo1NTowMCIsICJlbmQiOiAiMjAyNC0xMC0wMVQxMzowMDowMCJ9LCAiQ0xJRU5UX0ZJTFRFUl9USU1FX1JBTkdFIjogeyJzdGFydCI6ICIyMDI0LTEwLTAxVDEyOjU0OjMwIiwgImVuZCI6ICIyMDI0LTEwLTAxVDEyOjU5OjMwIn0sICJTVE9SRURfVElNRSI6ICIyMDI0LTEwLTAxVDEyOjU1OjAwIiwgIkNVUlJFTlRfVElNRSI6ICIyMDI0LTEwLTAxVDEzOjAwOjAwIiwgIlRSQUNFX0lEIjogImJkYjJhMTI3LWVkM2QtNDY0YS1iMjA1LTM4MjBjY2Y2ZDNmMiIsICJQWVRIT05fVkVSU0lPTiI6ICIzLjExIn0=", "X-Abnormal-Trace-Id": str(self.trace_id), "Authorization": f"Bearer {self.api_token}", "Soar-Integration-Origin": "AZURE SENTINEL", - "Azure-Sentinel-Version": "2024-10-03 V2", + "Azure-Sentinel-Version": "2024-11-29 V2", } self.maxDiff = None self.assertEqual(headers, expected_headers) @@ -261,7 +262,7 @@ def test_compute_url_with_port_in_base_url(self): "SentinelFunctionsOrchestrator.soar_connector_async_v2.fetch_with_retries", new_callable=AsyncMock, ) -async def test_get_threats(mock_fetch): +async def test_get_threats(mock_fetch: AsyncMock): mock_intervals = [ MagicMock(start="2024-10-01T13:00:00Z", end=None), ] @@ -285,6 +286,7 @@ async def test_get_threats(mock_fetch): ctx.NUM_CONCURRENCY = 2 ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z") ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z") + ctx.SINGLE_THREAT_PAGE_SIZE = 40 output_queue = asyncio.Queue() @@ -308,6 +310,8 @@ async def test_get_threats(mock_fetch): await get_threats(ctx, output_queue) # Ensure fetch_with_retries was called with expected values + mock_fetch.assert_any_call(url='http://example.com/v1/threats', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/threats/threat1?pageSize=40&pageNumber=1', headers={'Authorization': 'Bearer token'}) assert mock_fetch.call_count == 2 # Ensure the messages were put into the output queue @@ -336,6 +340,7 @@ async def test_get_cases(mock_fetch): ctx.NUM_CONCURRENCY = 2 ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z") ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z") + ctx.SINGLE_THREAT_PAGE_SIZE = 40 output_queue = asyncio.Queue() @@ -371,6 +376,8 @@ async def test_get_cases(mock_fetch): await get_cases(ctx, output_queue) # Ensure fetch_with_retries was called with expected values + mock_fetch.assert_any_call(url='http://example.com/v1/cases', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/cases/case1', headers={'Authorization': 'Bearer token'}) assert mock_fetch.call_count == 2 # Ensure the cases were put into the output queue @@ -385,6 +392,343 @@ async def test_get_cases(mock_fetch): assert output_message == expected_record +@pytest.mark.asyncio +@patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.fetch_with_retries", + new_callable=AsyncMock, +) +async def test_get_threats_paginated(mock_fetch: AsyncMock): + mock_intervals = [ + MagicMock(start="2024-10-01T13:00:00Z", end=None), + ] + + mock_threat_campaign_response = { + "total": 1, + "threats": [{"threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95"}], + "nextPageNumber": None, + } + + mock_single_threat_response_1 = { + "threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95", + "messages": [ + { + "abxMessageId": 8340091768378090492, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": -7487512360242110741, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": -1453682119958233571, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": -2738917250488486006, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": 482233753373918965, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": 9119659315270197918, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": 3546172484236699227, + "remediationTimestamp": "2024-10-01T12:30:40Z" + } + ], + "pageNumber": 1, + "total": 16, + "nextPageNumber": 2 + } + mock_single_threat_response_2 = { + "threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95", + "messages": [ + { + "abxMessageId": -68647174525282065, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": 1025490956646620319, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": 8353208793487178298, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": -7300418853454868601, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": -559214588526485457, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": 4447975809254795357, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": -374721447240777722, + "remediationTimestamp": "2024-10-01T12:30:40Z", + } + ], + "pageNumber": 2, + "total": 16, + "nextPageNumber": 3 + } + mock_single_threat_response_3 = { + "threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95", + "messages": [ + { + "abxMessageId": 3333927803157276490, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": -1777029380775902847, + "remediationTimestamp": "2024-10-01T12:30:40Z", + } + ], + "pageNumber": 3, + "total": 16 + } + # Mock the context and output queue + ctx = MagicMock() + ctx.BASE_URL = "http://example.com" + ctx.MAX_PAGE_NUMBER = 10 + ctx.NUM_CONCURRENCY = 2 + ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z") + ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z") + ctx.SINGLE_THREAT_PAGE_SIZE = 7 + + output_queue = asyncio.Queue() + + # Mock the functions and methods used in get_threats + mock_fetch.side_effect = [ + mock_threat_campaign_response, + mock_single_threat_response_1, + mock_single_threat_response_2, + mock_single_threat_response_3, + ] + + with patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.compute_intervals", + return_value=mock_intervals, + ): + with patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.get_query_params" + ) as mock_get_query_params: + with patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.get_headers", + return_value={"Authorization": "Bearer token"}, + ): + await get_threats(ctx, output_queue) + + # Ensure fetch_with_retries was called with expected values + mock_fetch.assert_any_call(url='http://example.com/v1/threats', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc95?pageSize=7&pageNumber=1', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc95?pageSize=7&pageNumber=2', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc95?pageSize=7&pageNumber=3', headers={'Authorization': 'Bearer token'}) + assert mock_fetch.call_count == 4 + + # Ensure the messages were put into the output queue + assert output_queue.qsize() == 16 + + # Validate the content of the output queue + + expected_records = [ + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 8340091768378090492, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -7487512360242110741, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -1453682119958233571, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -2738917250488486006, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 482233753373918965, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 9119659315270197918, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 3546172484236699227, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -68647174525282065, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 1025490956646620319, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 8353208793487178298, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -7300418853454868601, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -559214588526485457, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 4447975809254795357, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -374721447240777722, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 3333927803157276490, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -1777029380775902847, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ] + + actual_records = [] + + while not output_queue.empty(): + output_message = await output_queue.get() + actual_records.append(output_message) + + assert sorted(map(lambda x: str(x), expected_records)) == sorted(map(lambda x: str(x), actual_records)) + assert len(expected_records) == len(actual_records) + assert output_queue.empty() + + + + + +@pytest.mark.asyncio +@patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.fetch_with_retries", + new_callable=AsyncMock, +) +async def test_get_threats_paginated_early_exit(mock_fetch: AsyncMock): + mock_intervals = [ + MagicMock(start="2024-10-01T13:00:00Z", end=None), + ] + + mock_threat_campaign_response = { + "total": 1, + "threats": [{"threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc88"}], + "nextPageNumber": None, + } + + mock_single_threat_response_1 = { + "threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc88", + "messages": [ + { + "abxMessageId": 8340091768378090492, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": -7487512360242110741, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": -1453682119958233571, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": -2738917250488486006, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": 482233753373918965, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": 9119659315270197918, + "remediationTimestamp": "2024-10-01T12:30:40Z" + }, + { + "abxMessageId": 3546172484236699227, + "remediationTimestamp": "2024-10-01T12:30:40Z" + } + ], + "pageNumber": 1, + "total": 16, + "nextPageNumber": 2 + } + mock_single_threat_response_2 = { + "threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc88", + "messages": [ + { + "abxMessageId": -68647174525282065, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": 1025490956646620319, + "remediationTimestamp": "2024-10-01T12:30:40Z", + }, + { + "abxMessageId": 8353208793487178298, + "remediationTimestamp": "2024-10-01T11:59:40Z", + }, + { + "abxMessageId": -7300418853454868601, + "remediationTimestamp": "2024-10-01T11:59:40Z", + }, + { + "abxMessageId": -559214588526485457, + "remediationTimestamp": "2024-10-01T11:59:40Z", + }, + { + "abxMessageId": 4447975809254795357, + "remediationTimestamp": "2024-10-01T11:59:40Z", + }, + { + "abxMessageId": -374721447240777722, + "remediationTimestamp": "2024-10-01T11:59:40Z", + } + ], + "pageNumber": 2, + "total": 16, + "nextPageNumber": 3 + } + + # Mock the context and output queue + ctx = MagicMock() + ctx.BASE_URL = "http://example.com" + ctx.MAX_PAGE_NUMBER = 10 + ctx.NUM_CONCURRENCY = 2 + ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z") + ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z") + ctx.SINGLE_THREAT_PAGE_SIZE = 7 + + output_queue = asyncio.Queue() + + # Mock the functions and methods used in get_threats + mock_fetch.side_effect = [ + mock_threat_campaign_response, + mock_single_threat_response_1, + mock_single_threat_response_2, + ] + + with patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.compute_intervals", + return_value=mock_intervals, + ): + with patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.get_query_params" + ) as mock_get_query_params: + with patch( + "SentinelFunctionsOrchestrator.soar_connector_async_v2.get_headers", + return_value={"Authorization": "Bearer token"}, + ): + await get_threats(ctx, output_queue) + + # Ensure fetch_with_retries was called with expected values + mock_fetch.assert_any_call(url='http://example.com/v1/threats', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc88?pageSize=7&pageNumber=1', headers={'Authorization': 'Bearer token'}) + mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc88?pageSize=7&pageNumber=2', headers={'Authorization': 'Bearer token'}) + assert mock_fetch.call_count == 3 + + # Ensure the messages were put into the output queue + assert output_queue.qsize() == 9 + + # Validate the content of the output queue + + expected_records = [ + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 8340091768378090492, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -7487512360242110741, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -1453682119958233571, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -2738917250488486006, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 482233753373918965, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 9119659315270197918, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 3546172484236699227, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -68647174525282065, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 1025490956646620319, "remediationTimestamp": "2024-10-01T12:30:40Z"}), + ] + + actual_records = [] + + while not output_queue.empty(): + output_message = await output_queue.get() + actual_records.append(output_message) + + assert sorted(map(lambda x: str(x), expected_records)) == sorted(map(lambda x: str(x), actual_records)) + assert len(expected_records) == len(actual_records) + assert output_queue.empty() + + + + + if __name__ == "__main__": unittest.main() pytest.main() diff --git a/Solutions/AbnormalSecurity/Data Connectors/Tests/utils_test.py b/Solutions/AbnormalSecurity/Data Connectors/Tests/utils_test.py index 2f4185567d5..eacce054674 100644 --- a/Solutions/AbnormalSecurity/Data Connectors/Tests/utils_test.py +++ b/Solutions/AbnormalSecurity/Data Connectors/Tests/utils_test.py @@ -144,7 +144,8 @@ def setUp(self): STORED_TIME=datetime(2024, 10, 1, 12, 55), CURRENT_TIME=datetime(2024, 10, 1, 13, 0), TRACE_ID=uuid4(), - PYTHON_VERSION="3.11" + PYTHON_VERSION="3.11", + SINGLE_THREAT_PAGE_SIZE=40 ) def test_valid_intervals(self):