Skip to content

Commit

Permalink
fix: changed to using a macaron.util function for head request
Browse files Browse the repository at this point in the history
  • Loading branch information
art1f1c3R committed Dec 4, 2024
1 parent 2ebd8e3 commit f9626a7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

import logging

import requests

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,7 +86,8 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
inspector_prefix = f"{self.INSPECTOR_PREFIX}{name.lower()}/{version}/"
inspector_link = release_metadata["url"].replace(self.PYPI_PREFIX, inspector_prefix)

if not self._valid_url(inspector_link, pypi_package_json.pypi_registry.request_timeout):
# use a head request because we don't care about the response contents
if send_head_http_raw(inspector_link) is None:
inspector_link = ""

release_files.append(release_metadata["url"])
Expand All @@ -102,10 +102,3 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
return HeuristicResult.PASS, {version: release_files}

return HeuristicResult.FAIL, {version: release_files}

def _valid_url(self, url: str, timeout: int) -> bool:
    """Check whether a HEAD request to the URL ends with a 200 (OK) status.

    Parameters
    ----------
    url : str
        The URL to probe.
    timeout : int
        The timeout passed to the HEAD request.

    Returns
    -------
    bool
        ``True`` if the request (following redirects) returns status code 200,
        ``False`` for any other status code or if the request raises a
        ``requests.exceptions.RequestException``.
    """
    try:
        head_response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.exceptions.RequestException:
        # Any request-level failure is treated as an invalid URL.
        return False
    return head_response.status_code == 200
66 changes: 66 additions & 0 deletions src/macaron/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,72 @@ def send_get_http(url: str, headers: dict) -> dict:
return dict(response.json())


def send_head_http_raw(
    url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
    """Send the HEAD HTTP request with the given url and headers.

    This method also handles logging when the API server returns an error status code.

    Parameters
    ----------
    url : str
        The url of the request.
    headers : dict | None
        The dict that describes the headers of the request.
    timeout: int | None
        The request timeout (optional). When not provided (or falsy), the ``timeout`` value
        from the ``requests`` section of the defaults is used, falling back to 10.
    allow_redirects: bool
        Whether to allow redirects. Default: True.

    Returns
    -------
    Response | None
        If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of
        200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the
        request has failed and ``None`` will be returned. This includes the case where the request raises a
        ``requests.exceptions.RequestException``, on the first attempt or on any retry.
    """
    logger.debug("HEAD - %s", url)
    if not timeout:
        timeout = defaults.getint("requests", "timeout", fallback=10)
    error_retries = defaults.getint("requests", "error_retries", fallback=5)
    retry_counter = error_retries

    def _send_head() -> Response | None:
        # Single guarded request, shared by the first attempt and every retry so that a
        # connection error during a retry yields None instead of escaping to the caller.
        try:
            return requests.head(
                url=url,
                headers=headers,
                timeout=timeout,
                allow_redirects=allow_redirects,
            )
        except requests.exceptions.RequestException as error:
            logger.debug(error)
            return None

    response = _send_head()
    if response is None:
        return None

    if not allow_redirects and response.status_code == 302:
        # Found, most likely because a redirect is about to happen.
        return response

    while response.status_code != 200:
        logger.debug(
            "Receiving error code %s from server.",
            response.status_code,
        )
        if retry_counter <= 0:
            logger.debug("Maximum retries reached: %s", error_retries)
            return None
        if response.status_code != 403:
            # Only 403 responses are retried; any other error status is a failure.
            return None
        # Let the rate-limit handler inspect the 403 response before trying again.
        check_rate_limit(response)
        retry_counter = retry_counter - 1
        response = _send_head()
        if response is None:
            return None

    return response


def send_get_http_raw(
url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
Expand Down
33 changes: 12 additions & 21 deletions tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None:
analyzer.analyze(pypi_package_json)


@patch("requests.head")
def test_analyze_tar_present(mock_head: MagicMock, pypi_package_json: MagicMock) -> None:
# Note: to patch a function, the way it is imported matters.
# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir
# if it is imported like this: from os import listdir; listdir() then you patch <module>.listdir
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .tar.gz is present, so failed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
Expand Down Expand Up @@ -66,11 +69,7 @@ def test_analyze_tar_present(mock_head: MagicMock, pypi_package_json: MagicMock)
pypi_package_json.get_latest_version.return_value = version
pypi_package_json.component.version = None
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
pypi_package_json.pypi_registry.request_timeout = 100

inspector_link_mock = MagicMock()
inspector_link_mock.status_code = 200
mock_head.return_value = inspector_link_mock
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [url, inspector_link_expected]})

Expand All @@ -79,8 +78,8 @@ def test_analyze_tar_present(mock_head: MagicMock, pypi_package_json: MagicMock)
assert actual_result == expected_result


@patch("requests.head")
def test_analyze_whl_present(mock_head: MagicMock, pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .whl is present, so pass"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
Expand Down Expand Up @@ -123,11 +122,7 @@ def test_analyze_whl_present(mock_head: MagicMock, pypi_package_json: MagicMock)
pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
pypi_package_json.pypi_registry.request_timeout = 100

inspector_link_mock = MagicMock()
inspector_link_mock.status_code = 200
mock_head.return_value = inspector_link_mock
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [url, inspector_link_expected]})

Expand All @@ -136,8 +131,8 @@ def test_analyze_whl_present(mock_head: MagicMock, pypi_package_json: MagicMock)
assert actual_result == expected_result


@patch("requests.head")
def test_analyze_both_present(mock_head: MagicMock, pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when both .tar.gz and .whl are present, so passed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
Expand Down Expand Up @@ -209,11 +204,7 @@ def test_analyze_both_present(mock_head: MagicMock, pypi_package_json: MagicMock
pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
pypi_package_json.pypi_registry.request_timeout = 100

inspector_link_mock = MagicMock()
inspector_link_mock.status_code = 200
mock_head.return_value = inspector_link_mock
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_result: tuple[HeuristicResult, dict] = (
HeuristicResult.PASS,
Expand Down

0 comments on commit f9626a7

Please sign in to comment.