Skip to content

Commit

Permalink
feat: include inspector package urls as part of the malicious metadat…
Browse files Browse the repository at this point in the history
…a facts for pypi packages (#935)

As part of its information gathering, the wheel absence heuristic now returns links to the corresponding PyPI Inspector pages instead of filenames.
  • Loading branch information
art1f1c3R authored Dec 6, 2024
1 parent d4294d5 commit fd17eaa
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,10 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer):
"""

WHEEL: str = "bdist_wheel"
# as per https://github.com/pypi/inspector/blob/main/inspector/main.py line 125
INSPECTOR_TEMPLATE = (
"https://inspector.pypi.io/project/{name}/{version}/packages/{first}/{second}/{rest}/{filename}"
)

def __init__(self) -> None:
super().__init__(
Expand All @@ -47,7 +52,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
Raises
------
HeuristicAnalyzerValueError
If there is no release information, or has no most recent version (if queried).
If there is no release information, or has other missing package information.
"""
releases = pypi_package_json.get_releases()
if releases is None: # no release information
Expand All @@ -64,21 +69,64 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

release_files: list[JsonType] = []
inspector_links: list[JsonType] = []
wheel_present: bool = False

try:
for release_metadata in releases[version]:
if release_metadata["packagetype"] == self.WHEEL:
wheel_present = True

release_files.append(release_metadata["filename"])
except KeyError as error:
release_distributions = json_extract(releases, [version], list)
if release_distributions is None:
error_msg = f"The version {version} is not available as a release."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg) from error
raise HeuristicAnalyzerValueError(error_msg)

for distribution in release_distributions:
# validate data
package_type = json_extract(distribution, ["packagetype"], str)
if package_type is None:
error_msg = f"The version {version} has no 'package type' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

name = json_extract(pypi_package_json.package_json, ["info", "name"], str)
if name is None:
error_msg = f"The version {version} has no 'name' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

blake2b_256 = json_extract(distribution, ["digests", "blake2b_256"], str)
if blake2b_256 is None:
error_msg = f"The version {version} has no 'blake2b_256' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

filename = json_extract(distribution, ["filename"], str)
if filename is None:
error_msg = f"The version {version} has no 'filename' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if package_type == self.WHEEL:
wheel_present = True

inspector_link = self.INSPECTOR_TEMPLATE.format(
name=name,
version=version,
first=blake2b_256[0:2],
second=blake2b_256[2:4],
rest=blake2b_256[4:],
filename=filename,
)

# use a head request because we don't care about the response contents
if send_head_http_raw(inspector_link) is None:
inspector_links.append(None)
else:
inspector_links.append(inspector_link)

detail_info: dict[str, JsonType] = {
"inspector_links": inspector_links,
}

if wheel_present:
return HeuristicResult.PASS, {version: release_files}
return HeuristicResult.PASS, detail_info

return HeuristicResult.FAIL, {version: release_files}
return HeuristicResult.FAIL, detail_info
66 changes: 66 additions & 0 deletions src/macaron/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,72 @@ def send_get_http(url: str, headers: dict) -> dict:
return dict(response.json())


def send_head_http_raw(
    url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
    """Send a HEAD HTTP request to the given url with the given headers.

    This function also handles logging when the server returns an error status code.
    A response with status code 403 is passed to ``check_rate_limit`` and the request
    is retried, up to the ``error_retries`` value in the ``requests`` section of the
    defaults (fallback: 5). Any other unexpected status code is treated as a failure
    without retrying.

    Parameters
    ----------
    url : str
        The url of the request.
    headers : dict | None
        The dict that describes the headers of the request.
    timeout : int | None
        The request timeout (optional). If it is not provided (or is 0), the ``timeout``
        value in the ``requests`` section of the defaults is used (fallback: 10).
    allow_redirects : bool
        Whether to allow redirects. Default: True.

    Returns
    -------
    Response | None
        If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of
        200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the
        request has failed and ``None`` will be returned.
    """
    logger.debug("HEAD - %s", url)
    if not timeout:
        timeout = defaults.getint("requests", "timeout", fallback=10)
    error_retries = defaults.getint("requests", "error_retries", fallback=5)
    retry_counter = error_retries

    while True:
        # Every attempt (the first one and each retry) is guarded, so that a network
        # failure always results in ``None`` instead of an exception reaching the caller.
        try:
            response = requests.head(
                url=url,
                headers=headers,
                timeout=timeout,
                allow_redirects=allow_redirects,
            )
        except requests.exceptions.RequestException as error:
            logger.debug(error)
            return None

        if response.status_code == 200:
            return response

        if not allow_redirects and response.status_code == 302:
            # Found, most likely because a redirect is about to happen.
            return response

        logger.debug(
            "Receiving error code %s from server.",
            response.status_code,
        )

        if retry_counter <= 0:
            logger.debug("Maximum retries reached: %s", error_retries)
            return None

        # Only a 403 is retried, after handing the response to the rate-limit check.
        if response.status_code != 403:
            return None

        check_rate_limit(response)
        retry_counter = retry_counter - 1


def send_get_http_raw(
url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
Expand Down
88 changes: 70 additions & 18 deletions tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for heuristic detecting wheel (.whl) file absence from PyPI packages"""
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand All @@ -21,11 +21,23 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None:
analyzer.analyze(pypi_package_json)


def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
# Note: to patch a function, the way it is imported matters.
# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir
# if it is imported like this: from os import listdir; listdir() then you patch <module>.listdir
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .tar.gz is present, so failed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.tar.gz"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -46,8 +58,7 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -57,18 +68,34 @@ def test_analyze_tar_present(pypi_package_json: MagicMock) -> None:
pypi_package_json.get_releases.return_value = release
pypi_package_json.get_latest_version.return_value = version
pypi_package_json.component.version = None
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [inspector_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .whl is present, so pass"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
filename = "ttttttttest_nester.py-0.1.0.whl"
url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)
inspector_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{filename}"
)

release = {
version: [
Expand All @@ -89,8 +116,7 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{filename}",
"url": url,
"yanked": False,
"yanked_reason": None,
}
Expand All @@ -99,18 +125,42 @@ def test_analyze_whl_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [filename]})
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [inspector_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

assert actual_result == expected_result


def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when both .tar.gz and .whl are present, so passed"""
analyzer = WheelAbsenceAnalyzer()
version = "0.1.0"
file_prefix = "ttttttttest_nester.py-0.1.0"
wheel_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_url = (
"https://files.pythonhosted.org/packages/de/fa/"
+ f"2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)
wheel_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl"
)
tar_link_expected = (
"https://inspector.pypi.io/project/ttttttttest_nester/0.1.0/packages/"
+ f"de/fa/2fbcebaeeb909511139ce28dac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz"
)

release = {
version: [
Expand All @@ -131,8 +181,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.whl",
"url": wheel_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -153,8 +202,7 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:
"size": 546,
"upload_time": "2016-10-13T05:42:27",
"upload_time_iso_8601": "2016-10-13T05:42:27.073842Z",
"url": f"https://files.pythonhosted.org/packages/de/fa/2fbcebaeeb909511139ce28d \
ac4a77ab2452ba72b49a22b12981b2f375b3/{file_prefix}.tar.gz",
"url": tar_url,
"yanked": False,
"yanked_reason": None,
},
Expand All @@ -163,10 +211,14 @@ def test_analyze_both_present(pypi_package_json: MagicMock) -> None:

pypi_package_json.get_releases.return_value = release
pypi_package_json.component.version = version
expected_result: tuple[HeuristicResult, dict] = (
HeuristicResult.PASS,
{version: [f"{file_prefix}.whl", f"{file_prefix}.tar.gz"]},
)
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes

expected_detail_info = {
"inspector_links": [wheel_link_expected, tar_link_expected],
}

expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, expected_detail_info)

actual_result = analyzer.analyze(pypi_package_json)

Expand Down

0 comments on commit fd17eaa

Please sign in to comment.