Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include inspector package urls as part of the malicious metadata facts for pypi packages #935

Merged
merged 10 commits into from
Dec 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,10 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer):
"""

WHEEL: str = "bdist_wheel"
# as per https://github.com/pypi/inspector/blob/main/inspector/main.py line 125
INSPECTOR_TEMPLATE = (
"https://inspector.pypi.io/project/{name}/{version}/packages/{first}/{second}/{rest}/{filename}"
)

def __init__(self) -> None:
super().__init__(
Expand Down Expand Up @@ -64,21 +69,46 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

release_files: list[JsonType] = []
inspector_links: list[JsonType] = []
wheel_present: bool = False

try:
for release_metadata in releases[version]:
if release_metadata["packagetype"] == self.WHEEL:
wheel_present = True

release_files.append(release_metadata["filename"])
name = json_extract(pypi_package_json.package_json, ["info", "name"], str)
if name is None:
error_msg = "There is no 'name' field for this package."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

blake2b_256 = release_metadata["digests"]["blake2b_256"]
art1f1c3R marked this conversation as resolved.
Show resolved Hide resolved
inspector_link = self.INSPECTOR_TEMPLATE.format(
name=name,
version=version,
first=blake2b_256[0:2],
second=blake2b_256[2:4],
rest=blake2b_256[4:],
filename=release_metadata["filename"],
)

# use a head request because we don't care about the response contents
if send_head_http_raw(inspector_link) is None:
inspector_links.append(None)
else:
inspector_links.append(inspector_link)

except KeyError as error:
error_msg = f"The version {version} is not available as a release."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg) from error

detail_info: dict[str, JsonType] = {
"inspector_links": inspector_links,
}

if wheel_present:
return HeuristicResult.PASS, {version: release_files}
return HeuristicResult.PASS, detail_info

return HeuristicResult.FAIL, {version: release_files}
return HeuristicResult.FAIL, detail_info
66 changes: 66 additions & 0 deletions src/macaron/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,72 @@ def send_get_http(url: str, headers: dict) -> dict:
return dict(response.json())


def send_head_http_raw(
    url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
    """Send the HEAD HTTP request with the given url and headers.

    This method also handles logging when the API server returns an error status code.

    A 403 response triggers ``check_rate_limit`` followed by a retry, up to the configured
    ``error_retries`` count. Any other non-200 response is treated as a failure immediately.

    Parameters
    ----------
    url : str
        The url of the request.
    headers : dict | None
        The dict that describes the headers of the request.
    timeout: int | None
        The request timeout (optional). When not provided (or falsy), the ``timeout`` value from the
        ``requests`` section of the defaults is used, falling back to 10.
    allow_redirects: bool
        Whether to allow redirects. Default: True.

    Returns
    -------
    Response | None
        If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of
        200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the
        request has failed and ``None`` will be returned.
    """
    logger.debug("HEAD - %s", url)
    if not timeout:
        timeout = defaults.getint("requests", "timeout", fallback=10)
    error_retries = defaults.getint("requests", "error_retries", fallback=5)
    retry_counter = error_retries
    try:
        response = requests.head(
            url=url,
            headers=headers,
            timeout=timeout,
            allow_redirects=allow_redirects,
        )
    except requests.exceptions.RequestException as error:
        logger.debug(error)
        return None
    if not allow_redirects and response.status_code == 302:
        # Found, most likely because a redirect is about to happen.
        return response
    while response.status_code != 200:
        logger.debug(
            "Receiving error code %s from server.",
            response.status_code,
        )
        if retry_counter <= 0:
            logger.debug("Maximum retries reached: %s", error_retries)
            return None
        if response.status_code == 403:
            check_rate_limit(response)
        else:
            # Only 403 responses are retried; every other error code is a failure.
            return None
        retry_counter = retry_counter - 1
        try:
            response = requests.head(
                url=url,
                headers=headers,
                timeout=timeout,
                allow_redirects=allow_redirects,
            )
        except requests.exceptions.RequestException as error:
            # A retry can fail at the transport level just like the first attempt;
            # report it as a failed request instead of letting the exception escape.
            logger.debug(error)
            return None

    return response


def send_get_http_raw(
url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
Expand Down
88 changes: 70 additions & 18 deletions tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for heuristic detecting wheel (.whl) file absence from PyPI packages"""
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand All @@ -21,11 +21,23 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None:
analyzer.analyze(pypi_package_json)


def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
# Note: to patch a function, the way it is imported matters.
# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir
# if it is imported like this: from os import listdir; listdir() then you patch <module>.listdir
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .tar.gz is present, so failed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.tar.gz"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -46,8 +58,7 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -57,18 +68,34 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
pypi_package_json.get_releases.return_value = release
pypi_package_json.get_latest_version.return_value = version
pypi_package_json.component.version = None
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [inspector_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .whl is present, so pass"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.whl"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -89,8 +116,7 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -99,18 +125,42 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [inspector_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when both .tar.gz and .whl are present, so passed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
file_prefix = "ttttttttest_nester.py-0.1.0"
wheel_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)
wheel_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)

release = {
version: [
Expand All @@ -131,8 +181,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl",
"url": wheel_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -153,8 +202,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz",
"url": tar_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -163,10 +211,14 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (
HeuristicResult.PASS,
{version: [f"{file_prefix}.whl", f"{file_prefix}.tar.gz"]},
)
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [wheel_link_expected, tar_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

Expand Down
Loading