Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include inspector package urls as part of the malicious metadata facts for pypi packages #935

Merged
merged 10 commits into from
Dec 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,8 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer):
"""

WHEEL: str = "bdist_wheel"
INSPECTOR_PREFIX = "https://inspector.pypi.io/project/"
PYPI_PREFIX = "https://files.pythonhosted.org/"

def __init__(self) -> None:
super().__init__(
Expand Down Expand Up @@ -72,7 +75,24 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
if release_metadata["packagetype"] == self.WHEEL:
wheel_present = True

release_files.append(release_metadata["filename"])
name = json_extract(pypi_package_json.package_json, ["info", "name"], str)
if name is None:
error_msg = "There is no 'name' field for this package."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

# include the pypi inspector link, which uses the same suffix of
# packages/{blake2b_256}/file_name
inspector_prefix = f"{self.INSPECTOR_PREFIX}{name.lower()}/{version}/"
inspector_link = release_metadata["url"].replace(self.PYPI_PREFIX, inspector_prefix)
behnazh-w marked this conversation as resolved.
Show resolved Hide resolved
art1f1c3R marked this conversation as resolved.
Show resolved Hide resolved

# use a head request because we don't care about the response contents
if send_head_http_raw(inspector_link) is None:
inspector_link = ""

release_files.append(release_metadata["url"])
art1f1c3R marked this conversation as resolved.
Show resolved Hide resolved
release_files.append(inspector_link)

except KeyError as error:
error_msg = f"The version {version} is not available as a release."
logger.debug(error_msg)
Expand Down
66 changes: 66 additions & 0 deletions src/macaron/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,72 @@ def send_get_http(url: str, headers: dict) -> dict:
return dict(response.json())


def send_head_http_raw(
    url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
    """Send the HEAD HTTP request with the given url and headers.

    This method also handles logging when the API server returns an error status code.

    Parameters
    ----------
    url : str
        The url of the request.
    headers : dict | None
        The dict that describes the headers of the request.
    timeout: int | None
        The request timeout (optional). When it is ``None`` (or otherwise falsy), the ``timeout`` value of the
        ``requests`` section in the defaults is used, falling back to 10.
    allow_redirects: bool
        Whether to allow redirects. Default: True.

    Returns
    -------
    Response | None
        If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of
        200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the
        request has failed and ``None`` will be returned. A request error raised by any attempt, including a retry,
        is logged and also results in ``None``.
    """
    logger.debug("HEAD - %s", url)
    if not timeout:
        timeout = defaults.getint("requests", "timeout", fallback=10)
    error_retries = defaults.getint("requests", "error_retries", fallback=5)
    retry_counter = error_retries

    def _head_once() -> Response | None:
        """Perform a single HEAD request, returning ``None`` if the request itself raises."""
        try:
            return requests.head(
                url=url,
                headers=headers,
                timeout=timeout,
                allow_redirects=allow_redirects,
            )
        except requests.exceptions.RequestException as error:
            logger.debug(error)
            return None

    response = _head_once()
    if response is None:
        return None
    if not allow_redirects and response.status_code == 302:
        # Found, most likely because a redirect is about to happen.
        return response
    while response.status_code != 200:
        logger.debug(
            "Receiving error code %s from server.",
            response.status_code,
        )
        if retry_counter <= 0:
            logger.debug("Maximum retries reached: %s", error_retries)
            return None
        if response.status_code != 403:
            # Only a 403 is retried; any other non-200 status is a failure.
            return None
        # The 403 response is handed to check_rate_limit before the request is retried.
        check_rate_limit(response)
        retry_counter = retry_counter - 1
        # Retries go through the same guarded helper as the first attempt so that a
        # connection error or timeout here yields None instead of propagating to the caller.
        response = _head_once()
        if response is None:
            return None

    return response


def send_get_http_raw(
url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
Expand Down
73 changes: 58 additions & 15 deletions tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for heuristic detecting wheel (.whl) file absence from PyPI packages"""
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand All @@ -21,11 +21,23 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None:
analyzer.analyze(pypi_package_json)


def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
# Note: to patch a function, the way it is imported matters.
# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir
# if it is imported like this: from os import listdir; listdir() then you patch <module>.listdir
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .tar.gz is present, so failed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.tar.gz"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -46,8 +58,7 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -57,18 +68,30 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
pypi_package_json.get_releases.return_value = release
pypi_package_json.get_latest_version.return_value = version
pypi_package_json.component.version = None
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [url, inspector_link_expected]})

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .whl is present, so pass"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.whl"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -89,8 +112,7 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -99,18 +121,38 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [url, inspector_link_expected]})

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when both .tar.gz and .whl are present, so passed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
file_prefix = "ttttttttest_nester.py-0.1.0"
wheel_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)
wheel_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)

release = {
version: [
Expand All @@ -131,8 +173,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl",
"url": wheel_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -153,8 +194,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz",
"url": tar_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -163,9 +203,12 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_result: tuple[HeuristicResult, dict] = (
HeuristicResult.PASS,
{version: [f"{file_prefix}.whl", f"{file_prefix}.tar.gz"]},
{version: [wheel_url, wheel_link_expected, tar_url, tar_link_expected]},
)

actual_result = analyzer.analyze(pypi_package_json)
Expand Down
Loading