Skip to content

Commit

Permalink
feat: add in new metadata-based heuristic to pypi malware analyzer (#944
Browse files Browse the repository at this point in the history
)

new heuristic with a dependency on a single-release to investigate the version number and determine if it is anomalous, defined as the major and/or epoch being above a threshold value.
  • Loading branch information
art1f1c3R authored Jan 14, 2025
1 parent beae6d0 commit 65f9325
Show file tree
Hide file tree
Showing 5 changed files with 640 additions and 3 deletions.
10 changes: 9 additions & 1 deletion src/macaron/config/defaults.ini
Original file line number Diff line number Diff line change
Expand Up @@ -584,5 +584,13 @@ include = *
[heuristic.pypi]
releases_frequency_threshold = 2
# The gap threshold.
# The timedelta indicate the gap between the date maintainer registers their pypi's account and the date of latest release.
# The timedelta represents the gap between the date the maintainer registered their PyPI account and the
# date of the latest release.
timedelta_threshold_of_join_release = 5

# Any major version above this value is detected as anomalous and marked as suspicious.
major_threshold = 20
# Any epoch number above this value is detected as anomalous and marked as suspicious.
epoch_threshold = 3
# The maximum number of days (+/-) by which the day in a calendar version may differ from the day of publish.
day_publish_error = 4
7 changes: 5 additions & 2 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Define the heuristic enum."""
Expand Down Expand Up @@ -31,9 +31,12 @@ class Heuristics(str, Enum):
#: Indicates that the setup.py file contains suspicious imports, such as base64 and requests.
SUSPICIOUS_SETUP = "suspicious_setup"

#: Indicates that the package does not include a .whl file
#: Indicates that the package does not include a .whl file.
WHEEL_ABSENCE = "wheel_absence"

#: Indicates that the package has an unusually large version number for a single release.
ANOMALOUS_VERSION = "anomalous_version"


class HeuristicResult(str, Enum):
"""Result type indicating the outcome of a heuristic."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The heuristic analyzer to check for an anomalous package version."""

import logging
from enum import Enum

from packaging.version import InvalidVersion, parse

from macaron.config.defaults import defaults
from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.datetime_parser import parse_datetime
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class AnomalousVersionAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the version number of a single-release package to detect whether it is anomalous.

    A version number is anomalous if its epoch is greater than the epoch threshold, or if its major value
    is greater than the major threshold. If the version does not adhere to PyPI standards (PEP 440, as per
    the 'packaging' module), this heuristic cannot analyze it and the result is SKIP.

    Calendar versioning is detected as version numbers with the year, month and day present in the
    following combinations (using the example 11th October 2016):

    - YYYY.MM.DD, e.g. 2016.10.11
    - YYYY.DD.MM, e.g. 2016.11.10
    - YY.DD.MM, e.g. 16.11.10
    - YY.MM.DD, e.g. 16.10.11
    - MM.DD.YYYY, e.g. 10.11.2016
    - DD.MM.YYYY, e.g. 11.10.2016
    - DD.MM.YY, e.g. 11.10.16
    - MM.DD.YY, e.g. 10.11.16
    - YYYYMMDD, e.g. 20161011
    - YYYYDDMM, e.g. 20161110
    - YYDDMM, e.g. 161110
    - YYMMDD, e.g. 161011
    - MMDDYYYY, e.g. 10112016
    - DDMMYYYY, e.g. 11102016
    - DDMMYY, e.g. 111016
    - MMDDYY, e.g. 101116

    This may be followed by further versioning (e.g. 2016.10.11.5.6.2). This type of versioning is detected
    based on the date of the upload time for the release, within a threshold of a number of days (in the
    defaults file). For calendar versioning, only the epoch is compared against its threshold.

    Calendar-semantic versioning is detected as version numbers with the major value as the year (either
    yyyy or yy), and any other series of numbers following it:

    - 2016.7.1 would be version 7.1 of 2016
    - 16.1.4 would be version 1.4 of 2016

    This type of versioning is detected based on the exact year of the upload time for the release. The
    minor value takes the role of the major value when comparing against the major threshold.

    All other versionings are detected as semantic versioning.
    """

    DETAIL_INFO_KEY: str = "versioning"
    DIGIT_DATE_FORMATS: list[str] = ["%Y%m%d", "%Y%d%m", "%d%m%Y", "%m%d%Y", "%y%m%d", "%y%d%m", "%d%m%y", "%m%d%y"]

    # Values used when the defaults file does not provide the section or an individual option.
    DEFAULT_MAJOR_THRESHOLD: int = 20
    DEFAULT_EPOCH_THRESHOLD: int = 3
    DEFAULT_DAY_PUBLISH_ERROR: int = 4

    def __init__(self) -> None:
        super().__init__(
            name="anomalous_version_analyzer",
            heuristic=Heuristics.ANOMALOUS_VERSION,
            depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.FAIL)],
        )
        self.major_threshold, self.epoch_threshold, self.day_publish_error = self._load_defaults()

    def _load_defaults(self) -> tuple[int, int, int]:
        """Load default settings from defaults.ini.

        Each option falls back to its built-in default individually, so a section that exists but omits
        an option never yields ``None`` for a threshold.

        Returns
        -------
        tuple[int, int, int]:
            The Major threshold, Epoch threshold, and Day published error.
        """
        section_name = "heuristic.pypi"
        if defaults.has_section(section_name):
            section = defaults[section_name]
            return (
                section.getint("major_threshold", fallback=self.DEFAULT_MAJOR_THRESHOLD),
                section.getint("epoch_threshold", fallback=self.DEFAULT_EPOCH_THRESHOLD),
                section.getint("day_publish_error", fallback=self.DEFAULT_DAY_PUBLISH_ERROR),
            )
        return self.DEFAULT_MAJOR_THRESHOLD, self.DEFAULT_EPOCH_THRESHOLD, self.DEFAULT_DAY_PUBLISH_ERROR

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.

        Raises
        ------
        HeuristicAnalyzerValueError
            If there is no release information available, the package does not have exactly one release,
            the latest release cannot be resolved, the release has no distribution files, or an upload
            time is missing or cannot be parsed.
        """
        releases = pypi_package_json.get_releases()
        if releases is None:  # no release information
            error_msg = "There is no information for any release of this package."
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        if len(releases) != 1:
            error_msg = (
                "This heuristic depends on a single release, but somehow there are multiple when the one release"
                + " heuristic failed."
            )
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        # Since there is only one release, the latest version should be that release.
        release = pypi_package_json.get_latest_version()
        if release is None:
            error_msg = "No latest version information available"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        try:
            release_metadata = releases[release]
        except KeyError as release_error:
            error_msg = "The latest release is not available in the list of releases"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg) from release_error

        try:
            version = parse(release)
        except InvalidVersion:
            return HeuristicResult.SKIP, {self.DETAIL_INFO_KEY: Versioning.INVALID.value}

        years: list[int] = []
        months: list[int] = []
        publish_days: list[int] = []

        for distribution in release_metadata:
            upload_time = json_extract(distribution, ["upload_time"], str)
            if upload_time is None:
                error_msg = "Missing upload time from release information"
                logger.debug(error_msg)
                raise HeuristicAnalyzerValueError(error_msg)

            parsed_time = parse_datetime(upload_time)
            if parsed_time is None:
                error_msg = "Upload time is not of the expected PyPI format"
                logger.debug(error_msg)
                raise HeuristicAnalyzerValueError(error_msg)

            years.append(parsed_time.year)
            years.append(parsed_time.year % 100)  # last 2 digits
            months.append(parsed_time.month)
            publish_days.append(parsed_time.day)

        if not publish_days:
            # Without any distribution there is no upload date to compare against; min()/max() below
            # would otherwise raise a bare ValueError.
            error_msg = "There are no distribution files available for the latest release"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        # Accepted day-of-month values: every publish day, widened by the configured error on both sides.
        # NOTE: the window is built from day-of-month numbers only, so it does not wrap across month
        # boundaries.
        days = list(range(min(publish_days) - self.day_publish_error, max(publish_days) + self.day_publish_error + 1))

        calendar = False
        calendar_semantic = False

        # check for year YY[YY]...
        if version.major in years:
            # calendar versioning: YY[YY].(M[M].D[D])(D[D].M[M])...
            if (version.minor in months and version.micro in days) or (
                version.minor in days and version.micro in months
            ):
                calendar = True
            else:
                calendar_semantic = True
        # check for calendar versioning: M[M].D[D].YY[YY]... or D[D].M[M].YY[YY]... or the whole digit
        # representing a datetime
        elif (
            ((version.major in months and version.minor in days) or (version.major in days and version.minor in months))
            and version.micro in years
        ) or self._integer_date(version.major, years, months, days):
            # must include day and year for this to be calendar
            calendar = True

        # Select the versioning scheme and which component (if any) plays the role of the major value.
        major_component: int | None
        if calendar:  # just check epoch
            versioning = Versioning.CALENDAR
            major_component = None
        elif calendar_semantic:  # check minor (as major) and epoch
            versioning = Versioning.CALENDAR_SEMANTIC
            major_component = version.minor
        else:  # semantic versioning
            versioning = Versioning.SEMANTIC
            major_component = version.major

        detail_info: dict[str, JsonType] = {self.DETAIL_INFO_KEY: versioning.value}

        if version.epoch > self.epoch_threshold:
            return HeuristicResult.FAIL, detail_info
        if major_component is not None and major_component > self.major_threshold:
            return HeuristicResult.FAIL, detail_info

        return HeuristicResult.PASS, detail_info

    def _integer_date(self, value: int, years: list[int], months: list[int], days: list[int]) -> bool:
        """Check whether the provided integer represents a date.

        Valid representations are:

        - YYYYMMDD
        - YYYYDDMM
        - YYDDMM
        - YYMMDD
        - MMDDYYYY
        - DDMMYYYY
        - DDMMYY
        - MMDDYY

        Parameters
        ----------
        value: int
            The integer to check.
        years: list[int]
            A list of integers representing valid years for components of value to represent.
        months: list[int]
            A list of integers representing valid months for components of value to represent.
        days: list[int]
            A list of integers representing valid days for components of value to represent.

        Returns
        -------
        bool:
            True if the integer may represent a date present in the list of valid years, months and days.
            False otherwise.
        """
        digits = str(value)
        for date_format in self.DIGIT_DATE_FORMATS:
            date = parse_datetime(digits, date_format)
            if date is None:
                # The digits do not form a date under this format; try the next one.
                continue

            if date.year in years and date.month in months and date.day in days:
                return True

        return False


class Versioning(Enum):
    """Versioning schemes that a release's version number can be classified as."""

    #: The version string could not be parsed.
    INVALID = "invalid"

    #: The version number encodes a calendar date (year, month and day).
    CALENDAR = "calendar"

    #: The version number starts with a year, followed by further version numbers.
    CALENDAR_SEMANTIC = "calendar_semantic"

    #: Any other version number.
    SEMANTIC = "semantic"
Loading

0 comments on commit 65f9325

Please sign in to comment.