diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index 422227dea..3e541d633 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -19,6 +19,7 @@ from macaron.errors import ConfigurationError from macaron.output_reporter.reporter import HTMLReporter, JSONReporter, PolicyReporter from macaron.policy_engine.policy_engine import run_policy_engine, show_prelude +from macaron.repo_finder import repo_finder from macaron.slsa_analyzer.analyzer import Analyzer from macaron.slsa_analyzer.git_service import GIT_SERVICES from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES @@ -212,6 +213,14 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int: return os.EX_USAGE +def find_source(find_args: argparse.Namespace) -> int: + """Perform repo and commit finding for a passed PURL, or commit finding for a passed PURL and repo.""" + if repo_finder.find_source(find_args.package_url, find_args.repo_path or None): + return os.EX_OK + + return os.EX_DATAERR + + def perform_action(action_args: argparse.Namespace) -> None: """Perform the indicated action of Macaron.""" match action_args.action: @@ -239,6 +248,17 @@ def perform_action(action_args: argparse.Namespace) -> None: sys.exit(os.EX_USAGE) analyze_slsa_levels_single(action_args) + + case "find-source": + try: + for git_service in GIT_SERVICES: + git_service.load_defaults() + except ConfigurationError as error: + logger.error(error) + sys.exit(os.EX_USAGE) + + sys.exit(find_source(action_args)) + case _: logger.error("Macaron does not support command option %s.", action_args.action) sys.exit(os.EX_USAGE) @@ -444,6 +464,28 @@ def main(argv: list[str] | None = None) -> None: vp_group.add_argument("-f", "--file", type=str, help="Path to the Datalog policy.") vp_group.add_argument("-s", "--show-prelude", action="store_true", help="Show policy prelude.") + # Find the repo and commit of a passed PURL, or the commit of a passed PURL and repo. 
+ find_parser = sub_parser.add_parser(name="find-source") + + find_parser.add_argument( + "-purl", + "--package-url", + required=True, + type=str, + help=("The PURL string to perform repository and commit finding for."), + ) + + find_parser.add_argument( + "-rp", + "--repo-path", + required=False, + type=str, + help=( + "The path to a repository that matches the provided PURL, can be local or remote. " + "This argument is only required in cases where the repository cannot be discovered automatically." + ), + ) + args = main_parser.parse_args(argv) if not args.action: diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini index dea07dd6e..8d7b2b1cd 100644 --- a/src/macaron/config/defaults.ini +++ b/src/macaron/config/defaults.ini @@ -59,6 +59,7 @@ use_open_source_insights = True redirect_urls = gitbox.apache.org git-wip-us.apache.org +find_source_should_clone = False [repofinder.java] # The list of maven-like repositories to attempt to retrieve artifact POMs from. diff --git a/src/macaron/repo_finder/__init__.py b/src/macaron/repo_finder/__init__.py index c406a64cc..dfccaa6a9 100644 --- a/src/macaron/repo_finder/__init__.py +++ b/src/macaron/repo_finder/__init__.py @@ -1,4 +1,26 @@ -# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This package contains the dependency resolvers for Java projects.""" + + +def to_domain_from_known_purl_types(purl_type: str) -> str | None: + """Return the git service domain from a known web-based purl type. + + This method is used to handle cases where the purl type value is not the git domain but a pre-defined + repo-based type in https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst. 
+ + Note that this method will be updated when there are new pre-defined types as per the PURL specification. + + Parameters + ---------- + purl_type : str + The type field of the PURL. + + Returns + ------- + str | None + The git service domain corresponding to the purl type or None if the purl type is unknown. + """ + known_types = {"github": "github.com", "bitbucket": "bitbucket.org"} + return known_types.get(purl_type, None) diff --git a/src/macaron/repo_finder/commit_finder.py b/src/macaron/repo_finder/commit_finder.py index 7851b481b..a637c2aaf 100644 --- a/src/macaron/repo_finder/commit_finder.py +++ b/src/macaron/repo_finder/commit_finder.py @@ -12,8 +12,7 @@ from packageurl import PackageURL from pydriller import Commit, Git -from macaron.repo_finder import repo_finder_deps_dev -from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types +from macaron.repo_finder import repo_finder_deps_dev, to_domain_from_known_purl_types from macaron.slsa_analyzer.git_service import GIT_SERVICES logger: logging.Logger = logging.getLogger(__name__) diff --git a/src/macaron/repo_finder/provenance_extractor.py b/src/macaron/repo_finder/provenance_extractor.py index 5c3307c58..42a8819d0 100644 --- a/src/macaron/repo_finder/provenance_extractor.py +++ b/src/macaron/repo_finder/provenance_extractor.py @@ -10,12 +10,12 @@ from macaron.errors import ProvenanceError from macaron.json_tools import JsonType, json_extract +from macaron.repo_finder import to_domain_from_known_purl_types from macaron.repo_finder.commit_finder import ( AbstractPurlType, determine_abstract_purl_type, extract_commit_from_version, ) -from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload logger: logging.Logger = logging.getLogger(__name__) diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index 5bd983bb7..1a58fc575 100644 --- 
a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -36,12 +36,18 @@ import os from urllib.parse import ParseResult, urlunparse +import git from packageurl import PackageURL from macaron.config.defaults import defaults +from macaron.config.global_config import global_config +from macaron.repo_finder import to_domain_from_known_purl_types +from macaron.repo_finder.commit_finder import match_tags from macaron.repo_finder.repo_finder_base import BaseRepoFinder from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.repo_finder.repo_finder_java import JavaRepoFinder +from macaron.repo_finder.repo_utils import generate_report, prepare_repo +from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR logger: logging.Logger = logging.getLogger(__name__) @@ -78,28 +84,6 @@ def find_repo(purl: PackageURL) -> str: return repo_finder.find_repo(purl) -def to_domain_from_known_purl_types(purl_type: str) -> str | None: - """Return the git service domain from a known web-based purl type. - - This method is used to handle cases where the purl type value is not the git domain but a pre-defined - repo-based type in https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst. - - Note that this method will be updated when there are new pre-defined types as per the PURL specification. - - Parameters - ---------- - purl_type : str - The type field of the PURL. - - Returns - ------- - str | None - The git service domain corresponding to the purl type or None if the purl type is unknown. - """ - known_types = {"github": "github.com", "bitbucket": "bitbucket.org"} - return known_types.get(purl_type, None) - - def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None: """Return the repository path from the PURL string. 
@@ -148,3 +132,134 @@ def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None: fragment="", ) ) + + +def find_source(purl_string: str, input_repo: str | None) -> bool: + """Perform repo and commit finding for a passed PURL, or commit finding for a passed PURL and repo. + + Parameters + ---------- + purl_string: str + The PURL string of the target. + input_repo: str | None + The repository path optionally provided by the user. + + Returns + ------- + bool + True if the source was found. + """ + try: + purl = PackageURL.from_string(purl_string) + except ValueError as error: + logger.error("Could not parse PURL: %s", error) + return False + + if not purl.version: + logger.debug("PURL is missing version.") + return False + + found_repo = input_repo + if not input_repo: + logger.debug("Searching for repo of PURL: %s", purl) + found_repo = find_repo(purl) + + if not found_repo: + logger.error("Could not find repo for PURL: %s", purl) + return False + + # Disable other loggers for cleaner output. + logging.getLogger("macaron.slsa_analyzer.analyzer").disabled = True + logging.getLogger("macaron.slsa_analyzer.git_url").disabled = True + + if defaults.getboolean("repofinder", "find_source_should_clone"): + logger.debug("Preparing repo: %s", found_repo) + repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR) + git_obj = prepare_repo( + repo_dir, + found_repo, + purl=purl, + ) + + if not git_obj: + # TODO expand this message to cover cases where the obj was not created due to lack of correct tag. + logger.error("Could not resolve repository: %s", found_repo) + return False + + try: + digest = git_obj.get_head().hash + except ValueError: + logger.debug("Could not retrieve commit hash from repository.") + return False + else: + # Retrieve the tags. 
+ tags = get_tags_via_git_remote(found_repo) + if not tags: + return False + + matches = match_tags(list(tags.keys()), purl.name, purl.version) + + if not matches: + return False + + matched_tag = matches[0] + digest = tags[matched_tag] + + if not digest: + logger.error("Could not find commit for purl / repository: %s / %s", purl, found_repo) + return False + + if not input_repo: + logger.info("Found repository for PURL: %s", found_repo) + + logger.info("Found commit for PURL: %s", digest) + + if not generate_report(purl_string, digest, found_repo, os.path.join(global_config.output_path, "reports")): + return False + + return True + + +def get_tags_via_git_remote(repo: str) -> dict[str, str] | None: + """Retrieve all tags from a given repository using ls-remote. + + Parameters + ---------- + repo: str + The repository to perform the operation on. + + Returns + ------- + dict[str, str] | None + A dictionary of tags mapped to their commits, or None if the operation failed. + """ + tags = {} + try: + tag_data = git.cmd.Git().ls_remote("--tags", repo) + except git.exc.GitCommandError as error: + logger.debug("Failed to retrieve tags: %s", error) + return None + + for tag_line in tag_data.splitlines(): + tag_line = tag_line.strip() + if not tag_line: + continue + split = tag_line.split("\t") + if len(split) != 2: + continue + possible_tag = split[1] + if possible_tag.endswith("^{}"): + possible_tag = possible_tag[:-3] + elif possible_tag.replace("refs/tags/", "") in tags: + # If the tag already exists (keys are stored without the "refs/tags/" prefix), its peeled "^{}" entry was + # seen first, so this line is the annotated tag object. Skip it as it does not point to the source commit. + # Note that this should only happen if the tags are received out of standard order. 
+ continue + possible_tag = possible_tag.replace("refs/tags/", "") + if not possible_tag: + continue + tags[possible_tag] = split[0] + + logger.debug("Found %s tags via ls-remote of %s", len(tags), repo) + + return tags diff --git a/src/macaron/repo_finder/repo_finder_java.py b/src/macaron/repo_finder/repo_finder_java.py index 148c03e1b..77e1705f8 100644 --- a/src/macaron/repo_finder/repo_finder_java.py +++ b/src/macaron/repo_finder/repo_finder_java.py @@ -50,7 +50,7 @@ def find_repo(self, purl: PackageURL) -> str: limit = defaults.getint("repofinder.java", "parent_limit", fallback=10) if not version: - logger.debug("Version missing for maven artifact: %s:%s", group, artifact) + logger.info("Version missing for maven artifact: %s:%s", group, artifact) # TODO add support for Java artifacts without a version return "" diff --git a/src/macaron/repo_finder/repo_utils.py b/src/macaron/repo_finder/repo_utils.py new file mode 100644 index 000000000..c3dffc8c5 --- /dev/null +++ b/src/macaron/repo_finder/repo_utils.py @@ -0,0 +1,280 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +"""This module contains the utility functions for repo and commit finder operations.""" +import json +import logging +import os +import string +from urllib.parse import urlparse + +from git import InvalidGitRepositoryError +from packageurl import PackageURL +from pydriller import Git + +from macaron.config.global_config import global_config +from macaron.errors import CloneError, RepoCheckOutError +from macaron.repo_finder.commit_finder import find_commit +from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService +from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService +from macaron.slsa_analyzer.git_url import ( + GIT_REPOS_DIR, + check_out_repo_target, + get_remote_origin_of_local_repo, + get_remote_vcs_url, + get_repo_dir_name, + is_empty_repo, + is_remote_repo, + resolve_local_path, +) + +logger: logging.Logger = logging.getLogger(__name__) + + +def create_filename(purl: PackageURL) -> str: + """Create the filename of the report based on the PURL. + + Parameters + ---------- + purl: PackageURL + The PackageURL of the artifact. + + Returns + ------- + str + The filename to save the report under. + """ + + def convert_to_path(text: str) -> str: + """Convert a PackageURL component to a path safe form.""" + allowed_chars = string.ascii_letters + string.digits + "-" + return "".join(c if c in allowed_chars else "_" for c in text) + + filename = f"{convert_to_path(purl.type)}" + if purl.namespace: + filename = filename + f"/{convert_to_path(purl.namespace)}" + filename = filename + f"/{convert_to_path(purl.name)}/{convert_to_path(purl.name)}.source.json" + return filename + + +def generate_report(purl: str, commit: str, repo: str, target_dir: str) -> bool: + """Create the report and save it to the passed directory. + + Parameters + ---------- + purl: str + The PackageURL of the target artifact, as a string. + commit: str + The commit hash to report. + repo: str + The repository to report. 
+ target_dir: str + The path of the directory where the report will be saved. + + Returns + ------- + bool + True if the report was created. False otherwise. + """ + try: + purl_object = PackageURL.from_string(purl) + except ValueError as error: + logger.debug("Failed to parse purl string as PURL: %s", error) + return False + + report_json = create_report(purl, commit, repo) + + filename = create_filename(purl_object) + fullpath = f"{target_dir}/{filename}" + + logger.info("Writing report to: %s", fullpath) + + try: + os.makedirs(os.path.dirname(fullpath), exist_ok=True) + with open(fullpath, "w", encoding="utf-8") as file: + file.write(report_json) + except OSError as error: + logger.debug("Failed to write report to file: %s", error) + return False + + logger.info("Report written to: %s", fullpath) + + return True + + +def create_report(purl: str, commit: str, repo: str) -> str: + """Generate report for standalone uses of the repo / commit finder. + + Parameters + ---------- + purl: str + The PackageURL of the target artifact, as a string. + commit: str + The commit hash to report. + repo: str + The repository to report. + + Returns + ------- + str + The report as a JSON string. + """ + data = {"purl": purl, "commit": commit, "repo": repo, "repo_validated": False, "commit_validated": False, "url": ""} + if urlparse(repo).hostname == "github.com": + data["url"] = f"{repo}/commit/{commit}" + return json.dumps(data, indent=4) + + +def prepare_repo( + target_dir: str, + repo_path: str, + branch_name: str = "", + digest: str = "", + purl: PackageURL | None = None, +) -> Git | None: + """Prepare the target repository for analysis. + + If ``repo_path`` is a remote path, the target repo is cloned to ``{target_dir}/{unique_path}``. + The ``unique_path`` of a repository will depend on its remote url. + For example, if given the ``repo_path`` https://github.com/org/name.git, it will + be cloned to ``{target_dir}/github_com/org/name``. 
+ + If ``repo_path`` is a local path, this method will check if ``repo_path`` resolves to a directory inside + ``local_repos_path`` and to a valid git repository. + + Parameters + ---------- + target_dir : str + The directory where all remote repository will be cloned. + repo_path : str + The path to the repository, can be either local or remote. + branch_name : str + The name of the branch we want to checkout. + digest : str + The hash of the commit that we want to checkout in the branch. + purl : PackageURL | None + The PURL of the analysis target. + + Returns + ------- + Git | None + The pydriller.Git object of the repository or None if error. + """ + # TODO: separate the logic for handling remote and local repos instead of putting them into this method. + logger.info( + "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)", + repo_path, + branch_name, + digest, + ) + + resolved_local_path = "" + is_remote = is_remote_repo(repo_path) + + if is_remote: + logger.info("The path to repo %s is a remote path.", repo_path) + resolved_remote_path = get_remote_vcs_url(repo_path) + if not resolved_remote_path: + logger.error("The provided path to repo %s is not a valid remote path.", repo_path) + return None + + git_service = get_git_service(resolved_remote_path) + repo_unique_path = get_repo_dir_name(resolved_remote_path) + resolved_local_path = os.path.join(target_dir, repo_unique_path) + logger.info("Cloning the repository.") + try: + git_service.clone_repo(resolved_local_path, resolved_remote_path) + except CloneError as error: + logger.error("Cannot clone %s: %s", resolved_remote_path, str(error)) + return None + else: + logger.info("Checking if the path to repo %s is a local path.", repo_path) + resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path) + + if resolved_local_path: + try: + git_obj = Git(resolved_local_path) + except InvalidGitRepositoryError: + logger.error("No git repo exists at %s.", resolved_local_path) + 
return None + else: + logger.error("Error happened while preparing the repo.") + return None + + if is_empty_repo(git_obj): + logger.error("The target repository does not have any commit.") + return None + + # Find the digest and branch if a version has been specified + if not digest and purl and purl.version: + found_digest = find_commit(git_obj, purl) + if not found_digest: + logger.error("Could not map the input purl string to a specific commit in the corresponding repository.") + return None + digest = found_digest + + # Checking out the specific branch or commit. This operation varies depending on the git service that the + # repository uses. + if not is_remote: + # If the repo path provided by the user is a local path, we need to get the actual origin remote URL of + # the repo to decide on the suitable git service. + origin_remote_url = get_remote_origin_of_local_repo(git_obj) + if is_remote_repo(origin_remote_url): + # The local repo's origin remote url is a remote URL (e.g https://host.com/a/b): In this case, we obtain + # the corresponding git service using ``get_git_service``. + git_service = get_git_service(origin_remote_url) + else: + # The local repo's origin remote url is a local path (e.g /path/to/local/...). This happens when the + # target repository is a clone from another local repo or is a clone from a git archive - + # https://git-scm.com/docs/git-archive: In this case, we fall-back to the generic function + # ``git_url.check_out_repo_target``. 
+ if not check_out_repo_target(git_obj, branch_name, digest, not is_remote): + logger.error("Cannot checkout the specific branch or commit of the target repo.") + return None + + return git_obj + + try: + git_service.check_out_repo(git_obj, branch_name, digest, not is_remote) + except RepoCheckOutError as error: + logger.error("Failed to check out repository at %s", resolved_local_path) + logger.error(error) + return None + + return git_obj + + +def get_local_repos_path() -> str: + """Get the local repos path from global config or use default. + + If the directory does not exist, it is created. + """ + local_repos_path = ( + global_config.local_repos_path + if global_config.local_repos_path + else os.path.join(global_config.output_path, GIT_REPOS_DIR, "local_repos") + ) + if not os.path.exists(local_repos_path): + os.makedirs(local_repos_path, exist_ok=True) + return local_repos_path + + +def get_git_service(remote_path: str | None) -> BaseGitService: + """Return the git service used from the remote path. + + Parameters + ---------- + remote_path : str | None + The remote path of the repo. + + Returns + ------- + BaseGitService + The git service derived from the remote path. 
+ """ + if remote_path: + for git_service in GIT_SERVICES: + if git_service.is_detected(remote_path): + return git_service + + return NoneGitService() diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 5c2b29368..6f809894a 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -11,7 +11,6 @@ from typing import Any, NamedTuple import sqlalchemy.exc -from git import InvalidGitRepositoryError from packageurl import PackageURL from pydriller.git import Git from sqlalchemy.orm import Session @@ -24,24 +23,22 @@ from macaron.database.table_definitions import Analysis, Component, ProvenanceSubject, Repository from macaron.dependency_analyzer.cyclonedx import DependencyAnalyzer, DependencyInfo from macaron.errors import ( - CloneError, DuplicateError, InvalidAnalysisTargetError, InvalidPURLError, ProvenanceError, PURLNotFoundError, - RepoCheckOutError, ) from macaron.output_reporter.reporter import FileReporter from macaron.output_reporter.results import Record, Report, SCMStatus from macaron.repo_finder import repo_finder -from macaron.repo_finder.commit_finder import find_commit from macaron.repo_finder.provenance_extractor import ( check_if_input_purl_provenance_conflict, check_if_input_repo_provenance_conflict, extract_repo_and_commit_from_provenance, ) from macaron.repo_finder.provenance_finder import ProvenanceFinder, find_provenance_from_ci +from macaron.repo_finder.repo_utils import get_git_service, prepare_repo from macaron.repo_verifier.repo_verifier import verify_repo from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.analyze_context import AnalyzeContext @@ -54,6 +51,7 @@ from macaron.slsa_analyzer.database_store import store_analyze_context_to_db from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService +from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR 
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES from macaron.slsa_analyzer.provenance.expectations.expectation_registry import ExpectationRegistry from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV01Payload @@ -69,9 +67,6 @@ class Analyzer: """This class is used to analyze SLSA levels of a Git repo.""" - GIT_REPOS_DIR = "git_repos" - """The directory in the output dir to store all cloned repositories.""" - def __init__(self, output_path: str, build_log_path: str) -> None: """Initialize instance. @@ -104,17 +99,6 @@ def __init__(self, output_path: str, build_log_path: str) -> None: if not os.path.isdir(self.build_log_path): os.makedirs(self.build_log_path) - # If provided with local_repos_path, we resolve the path of the target repo - # to the path within local_repos_path. - # If not, we use the default value /git_repos/local_repos. - self.local_repos_path = ( - global_config.local_repos_path - if global_config.local_repos_path - else os.path.join(global_config.output_path, Analyzer.GIT_REPOS_DIR, "local_repos") - ) - if not os.path.exists(self.local_repos_path): - os.makedirs(self.local_repos_path, exist_ok=True) - # Load the expectations from global config. self.expectations = ExpectationRegistry(global_config.expectation_paths) @@ -384,8 +368,8 @@ def run_single( # Prepare the repo. git_obj = None if analysis_target.repo_path: - git_obj = self._prepare_repo( - os.path.join(self.output_path, self.GIT_REPOS_DIR), + git_obj = prepare_repo( + os.path.join(self.output_path, GIT_REPOS_DIR), analysis_target.repo_path, analysis_target.branch, analysis_target.digest, @@ -859,186 +843,10 @@ def get_analyze_ctx(self, component: Component) -> AnalyzeContext: return analyze_ctx - def _prepare_repo( - self, - target_dir: str, - repo_path: str, - branch_name: str = "", - digest: str = "", - purl: PackageURL | None = None, - ) -> Git | None: - """Prepare the target repository for analysis. 
- - If ``repo_path`` is a remote path, the target repo is cloned to ``{target_dir}/{unique_path}``. - The ``unique_path`` of a repository will depend on its remote url. - For example, if given the ``repo_path`` https://github.com/org/name.git, it will - be cloned to ``{target_dir}/github_com/org/name``. - - If ``repo_path`` is a local path, this method will check if ``repo_path`` resolves to a directory inside - ``Analyzer.local_repos_path`` and to a valid git repository. - - Parameters - ---------- - target_dir : str - The directory where all remote repository will be cloned. - repo_path : str - The path to the repository, can be either local or remote. - branch_name : str - The name of the branch we want to checkout. - digest : str - The hash of the commit that we want to checkout in the branch. - purl : PackageURL | None - The PURL of the analysis target. - - Returns - ------- - Git | None - The pydriller.Git object of the repository or None if error. - """ - # TODO: separate the logic for handling remote and local repos instead of putting them into this method. 
- logger.info( - "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)", - repo_path, - branch_name, - digest, - ) - - resolved_local_path = "" - is_remote = git_url.is_remote_repo(repo_path) - - if is_remote: - logger.info("The path to repo %s is a remote path.", repo_path) - resolved_remote_path = git_url.get_remote_vcs_url(repo_path) - if not resolved_remote_path: - logger.error("The provided path to repo %s is not a valid remote path.", repo_path) - return None - - git_service = self.get_git_service(resolved_remote_path) - repo_unique_path = git_url.get_repo_dir_name(resolved_remote_path) - resolved_local_path = os.path.join(target_dir, repo_unique_path) - logger.info("Cloning the repository.") - try: - git_service.clone_repo(resolved_local_path, resolved_remote_path) - except CloneError as error: - logger.error("Cannot clone %s: %s", resolved_remote_path, str(error)) - return None - else: - logger.info("Checking if the path to repo %s is a local path.", repo_path) - resolved_local_path = self._resolve_local_path(self.local_repos_path, repo_path) - - if resolved_local_path: - try: - git_obj = Git(resolved_local_path) - except InvalidGitRepositoryError: - logger.error("No git repo exists at %s.", resolved_local_path) - return None - else: - logger.error("Error happened while preparing the repo.") - return None - - if git_url.is_empty_repo(git_obj): - logger.error("The target repository does not have any commit.") - return None - - # Find the digest and branch if a version has been specified - if not digest and purl and purl.version: - found_digest = find_commit(git_obj, purl) - if not found_digest: - logger.error( - "Could not map the input purl string to a specific commit in the corresponding repository." - ) - return None - digest = found_digest - - # Checking out the specific branch or commit. This operation varies depends on the git service that the - # repository uses. 
- if not is_remote: - # If the repo path provided by the user is a local path, we need to get the actual origin remote URL of - # the repo to decide on the suitable git service. - origin_remote_url = git_url.get_remote_origin_of_local_repo(git_obj) - if git_url.is_remote_repo(origin_remote_url): - # The local repo's origin remote url is a remote URL (e.g https://host.com/a/b): In this case, we obtain - # the corresponding git service using ``self.get_git_service``. - git_service = self.get_git_service(origin_remote_url) - else: - # The local repo's origin remote url is a local path (e.g /path/to/local/...). This happens when the - # target repository is a clone from another local repo or is a clone from a git archive - - # https://git-scm.com/docs/git-archive: In this case, we fall-back to the generic function - # ``git_url.check_out_repo_target``. - if not git_url.check_out_repo_target(git_obj, branch_name, digest, not is_remote): - logger.error("Cannot checkout the specific branch or commit of the target repo.") - return None - - return git_obj - - try: - git_service.check_out_repo(git_obj, branch_name, digest, not is_remote) - except RepoCheckOutError as error: - logger.error("Failed to check out repository at %s", resolved_local_path) - logger.error(error) - return None - - return git_obj - - @staticmethod - def get_git_service(remote_path: str | None) -> BaseGitService: - """Return the git service used from the remote path. - - Parameters - ---------- - remote_path : str | None - The remote path of the repo. - - Returns - ------- - BaseGitService - The git service derived from the remote path. - """ - if remote_path: - for git_service in GIT_SERVICES: - if git_service.is_detected(remote_path): - return git_service - - return NoneGitService() - - @staticmethod - def _resolve_local_path(start_dir: str, local_path: str) -> str: - """Resolve the local path and check if it's within a directory. 
- - This method returns an empty string if there are errors with resolving ``local_path`` - (e.g. non-existed dir, broken symlinks, etc.) or ``start_dir`` does not exist. - - Parameters - ---------- - start_dir : str - The directory to look for the existence of path. - local_path: str - The local path to resolve within start_dir. - - Returns - ------- - str - The resolved path in canonical form or an empty string if errors. - """ - # Resolve the path by joining dir and path. - # Because strict mode is enabled, if a path doesn't exist or a symlink loop - # is encountered, OSError is raised. - # ValueError is raised if we use both relative and absolute paths in os.path.commonpath. - try: - dir_real = os.path.realpath(start_dir, strict=True) - resolve_path = os.path.realpath(os.path.join(start_dir, local_path), strict=True) - if os.path.commonpath([resolve_path, dir_real]) != dir_real: - return "" - - return resolve_path - except (OSError, ValueError) as error: - logger.error(error) - return "" - def _determine_git_service(self, analyze_ctx: AnalyzeContext) -> BaseGitService: """Determine the Git service used by the software component.""" remote_path = analyze_ctx.component.repository.remote_path if analyze_ctx.component.repository else None - git_service = self.get_git_service(remote_path) + git_service = get_git_service(remote_path) if isinstance(git_service, NoneGitService): logger.info("Unable to find repository or unsupported git service for %s", analyze_ctx.component.purl) diff --git a/src/macaron/slsa_analyzer/git_url.py b/src/macaron/slsa_analyzer/git_url.py index 4b8d96813..e34c113da 100644 --- a/src/macaron/slsa_analyzer/git_url.py +++ b/src/macaron/slsa_analyzer/git_url.py @@ -25,6 +25,10 @@ logger: logging.Logger = logging.getLogger(__name__) +GIT_REPOS_DIR = "git_repos" +"""The directory in the output dir to store all cloned repositories.""" + + def parse_git_branch_output(content: str) -> list[str]: """Return the list of branch names from a string that 
has a format similar to the output of ``git branch --list``. @@ -372,6 +376,40 @@ def clone_remote_repo(clone_dir: str, url: str) -> Repo | None: return Repo(path=clone_dir) +def resolve_local_path(start_dir: str, local_path: str) -> str: + """Resolve the local path and check if it's within a directory. + + This method returns an empty string if there are errors with resolving ``local_path`` + (e.g. non-existed dir, broken symlinks, etc.) or ``start_dir`` does not exist. + + Parameters + ---------- + start_dir : str + The directory to look for the existence of path. + local_path: str + The local path to resolve within start_dir. + + Returns + ------- + str + The resolved path in canonical form or an empty string if errors. + """ + # Resolve the path by joining dir and path. + # Because strict mode is enabled, if a path doesn't exist or a symlink loop + # is encountered, OSError is raised. + # ValueError is raised if we use both relative and absolute paths in os.path.commonpath. + try: + dir_real = os.path.realpath(start_dir, strict=True) + resolve_path = os.path.realpath(os.path.join(start_dir, local_path), strict=True) + if os.path.commonpath([resolve_path, dir_real]) != dir_real: + return "" + + return resolve_path + except (OSError, ValueError) as error: + logger.error(error) + return "" + + def get_repo_name_from_url(url: str) -> str: """Extract the repo name of the repository from the remote url. diff --git a/tests/integration/cases/find_source_avaje/config.ini b/tests/integration/cases/find_source_avaje/config.ini new file mode 100644 index 000000000..0f4361f51 --- /dev/null +++ b/tests/integration/cases/find_source_avaje/config.ini @@ -0,0 +1,5 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +[repofinder] +find_source_should_clone = True diff --git a/tests/integration/cases/find_source_avaje/test.yaml b/tests/integration/cases/find_source_avaje/test.yaml new file mode 100644 index 000000000..116171722 --- /dev/null +++ b/tests/integration/cases/find_source_avaje/test.yaml @@ -0,0 +1,39 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +description: | + Analyzing the find source command on the avaje artifact. + +tags: +- macaron-python-package +- macaron-docker-image + +steps: +- name: Run macaron find source + kind: find-source + options: + command_args: + - -purl + - pkg:maven/io.avaje/avaje-prisms@1.1 +- name: Validate the produced report + kind: validate_schema + options: + kind: json_schema + schema: find_source_json_report + result: output/reports/maven/io_avaje/avaje-prisms/avaje-prisms.source.json +- name: Check that the repository was not cloned + kind: shell + options: + cmd: ls output/git_repos/github_com/avaje/avaje-prisms/ + expect_fail: true +- name: Run macaron find source with cloning enabled + kind: find-source + options: + ini: config.ini + command_args: + - -purl + - pkg:maven/io.avaje/avaje-prisms@1.1 +- name: Check that the repository was cloned + kind: shell + options: + cmd: ls output/git_repos/github_com/avaje/avaje-prisms/ diff --git a/tests/integration/run.py b/tests/integration/run.py index 4ad35d04b..e11b64783 100644 --- a/tests/integration/run.py +++ b/tests/integration/run.py @@ -87,6 +87,7 @@ def configure_logging(verbose: bool) -> None: DEFAULT_SCHEMAS: dict[str, Sequence[str]] = { "output_json_report": ["tests", "schema_validation", "report_schema.json"], + "find_source_json_report": ["tests", "repo_finder", "resources", "find_source_report_schema.json"], } @@ -528,6 +529,55 @@ def cmd(self, macaron_cmd: str) -> list[str]: return args +class FindSourceStepOptions(TypedDict): + 
"""The configuration options of a find source step.""" + + main_args: Sequence[str] + command_args: Sequence[str] + ini: str | None + + +@dataclass +class FindSourceStep(Step[FindSourceStepOptions]): + """A step running the ``macaron find-source`` command.""" + + @staticmethod + def options_schema(cwd: str) -> cfgv.Map: + """Generate the schema of a find-source step.""" + return cfgv.Map( + "find source options", + None, + *[ + cfgv.Optional( + key="main_args", + check_fn=cfgv.check_array(cfgv.check_string), + default=[], + ), + cfgv.Optional( + key="command_args", + check_fn=cfgv.check_array(cfgv.check_string), + default=[], + ), + cfgv.Optional( + key="ini", + check_fn=check_required_file(cwd), + default=None, + ), + ], + ) + + def cmd(self, macaron_cmd: str) -> list[str]: + """Generate the command of the step.""" + args = [macaron_cmd] + args.extend(self.options["main_args"]) + ini_file = self.options.get("ini", None) + if ini_file is not None: + args.extend(["--defaults-path", ini_file]) + args.append("find-source") + args.extend(self.options["command_args"]) + return args + + def gen_step_schema(cwd: str, check_expected_result_files: bool) -> cfgv.Map: """Generate schema for a step.""" return cfgv.Map( @@ -547,6 +597,7 @@ def gen_step_schema(cwd: str, check_expected_result_files: bool) -> cfgv.Map: "analyze", "verify", "validate_schema", + "find-source", ), ), ), @@ -586,6 +637,12 @@ def gen_step_schema(cwd: str, check_expected_result_files: bool) -> cfgv.Map: key="options", schema=VerifyStep.options_schema(cwd=cwd), ), + cfgv.ConditionalRecurse( + condition_key="kind", + condition_value="find-source", + key="options", + schema=FindSourceStep.options_schema(cwd=cwd), + ), cfgv.Optional( key="env", check_fn=check_env, @@ -783,6 +840,7 @@ def parse_step_config(step_id: int, step_config: Mapping) -> Step: "shell": ShellStep, "compare": CompareStep, "validate_schema": ValidateSchemaStep, + "find-source": FindSourceStep, }[kind] return step_cls( # type: ignore # 
https://github.com/python/mypy/issues/3115 step_id=step_id, @@ -890,13 +948,12 @@ def do_run( macaron_cmd: str, include_tags: list[str], exclude_tags: list[str], - interactive: bool, - dry: bool, + run_options: RunOptions, ) -> int: """Execute the run command.""" test_cases = load_test_cases( test_case_dirs, - check_expected_result_files=not interactive, + check_expected_result_files=not run_options.interactive, include_tags=include_tags, exclude_tags=exclude_tags, ) @@ -914,8 +971,8 @@ def do_run( for test_case in test_cases: case_exit = test_case.run( macaron_cmd=macaron_cmd, - interactive=interactive, - dry=dry, + interactive=run_options.interactive, + dry=run_options.dry, ) if case_exit != 0: # Do not exit here, but let all test cases run and aggregate the result. @@ -1109,13 +1166,13 @@ def main(argv: Sequence[str] | None = None) -> int: macaron_cmd = os.path.abspath(path) if args.command == "run": + run_options = RunOptions(args.interactive, args.dry) return do_run( test_case_dirs=test_case_dirs, macaron_cmd=macaron_cmd, include_tags=args.include_tag, exclude_tags=args.exclude_tag, - interactive=args.interactive, - dry=args.dry, + run_options=run_options, ) if args.command == "update": return do_update( @@ -1128,5 +1185,13 @@ def main(argv: Sequence[str] | None = None) -> int: return 0 +@dataclass +class RunOptions: + """A class that exists to reduce the argument count of the run function.""" + + interactive: bool + dry: bool + + if __name__ == "__main__": raise SystemExit(main()) diff --git a/tests/repo_finder/resources/find_source_report_schema.json b/tests/repo_finder/resources/find_source_report_schema.json new file mode 100644 index 000000000..e36b7fa7f --- /dev/null +++ b/tests/repo_finder/resources/find_source_report_schema.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "macaron-json-report-schema", + "title": "Macaron JSON Report", + "$comment": "For any details about the schema specification and 
validation documentation, see https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-00 and https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00.", + "type": "object", + "properties": { + "purl": { + "type": "string" + }, + "commit": { + "type": "string" + }, + "repo": { + "type": "string" + }, + "repo_validated": { + "type": "boolean" + }, + "commit_validated": { + "type": "boolean" + }, + "url": { + "type": "string" + } + } +} diff --git a/tests/repo_finder/test_report_schema.py b/tests/repo_finder/test_report_schema.py new file mode 100644 index 000000000..5ec1cf9a9 --- /dev/null +++ b/tests/repo_finder/test_report_schema.py @@ -0,0 +1,33 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module tests the report schema of the repo finder.""" +import json +from pathlib import Path +from typing import Any + +import jsonschema +import pytest + +from macaron.repo_finder.repo_utils import create_report + + +@pytest.fixture(name="json_schema") +def json_schema_() -> Any: + """Load and return the JSON schema.""" + with open(Path(__file__).parent.joinpath("resources", "find_source_report_schema.json"), encoding="utf-8") as file: + return json.load(file) + + +@pytest.mark.parametrize( + ("purl", "commit", "repo"), [("pkg:pypi/macaron@1.0", "commit_digest", "https://github.com/oracle/macaron")] +) +def test_report(purl: str, commit: str, repo: str, json_schema: Any) -> None: + """Test creation of reports for standalone repo / commit finder.""" + json_report_str = create_report(purl, commit, repo) + json_report = json.loads(json_report_str) + + jsonschema.validate( + schema=json_schema, + instance=json_report, + ) diff --git a/tests/slsa_analyzer/test_analyzer.py b/tests/slsa_analyzer/test_analyzer.py index f4e68f321..d2b754cba 100644 --- a/tests/slsa_analyzer/test_analyzer.py +++ 
b/tests/slsa_analyzer/test_analyzer.py @@ -3,8 +3,6 @@ """This module tests the slsa_analyzer.Gh module.""" -from pathlib import Path - import hypothesis.provisional as st_pr import hypothesis.strategies as st import pytest @@ -15,35 +13,6 @@ from macaron.errors import InvalidAnalysisTargetError, InvalidPURLError from macaron.slsa_analyzer.analyzer import Analyzer -from ..macaron_testcase import MacaronTestCase - - -class TestAnalyzer(MacaronTestCase): - """ - This class contains all the tests for the Analyzer - """ - - # Using the parent dir of this module as a valid start dir. - PARENT_DIR = str(Path(__file__).parent) - - # pylint: disable=protected-access - def test_resolve_local_path(self) -> None: - """Test the resolve local path method.""" - # Test resolving a path outside of the start_dir - assert not Analyzer._resolve_local_path(self.PARENT_DIR, "../") - assert not Analyzer._resolve_local_path(self.PARENT_DIR, "./../") - assert not Analyzer._resolve_local_path(self.PARENT_DIR, "../../../../../") - - # Test resolving a non-existing dir - assert not Analyzer._resolve_local_path(self.PARENT_DIR, "./this-should-not-exist") - - # Test with invalid start_dir - assert not Analyzer._resolve_local_path("non-existing-dir", "./") - - # Test resolve successfully - assert Analyzer._resolve_local_path(self.PARENT_DIR, "./") == self.PARENT_DIR - assert Analyzer._resolve_local_path(self.PARENT_DIR, "././././") == self.PARENT_DIR - @pytest.mark.parametrize( ("config", "available_domains", "expect"), diff --git a/tests/slsa_analyzer/test_git_url.py b/tests/slsa_analyzer/test_git_url.py index 6b4fd44f2..006a92608 100644 --- a/tests/slsa_analyzer/test_git_url.py +++ b/tests/slsa_analyzer/test_git_url.py @@ -13,6 +13,7 @@ from macaron.config.defaults import defaults, load_defaults from macaron.slsa_analyzer import git_url +from macaron.slsa_analyzer.git_url import resolve_local_path @pytest.mark.parametrize( @@ -313,3 +314,42 @@ def test_clean_url_valid_input(url: str, 
expected: str) -> None: def test_clean_url_invalid_input(url: str) -> None: """Test that the clean_url function correctly returns None for invalid input.""" assert git_url.clean_url(url) is None + + +@pytest.fixture(name="parent_dir") +def parent_dir_() -> str: + """Return the parent dir.""" + return str(Path(__file__).parent) + + +@pytest.mark.parametrize( + "target", + [ + # Paths outside of parent dir. + "../", + "./../", + "../../../../../", + # Non-existent path. + "./this-should-not-exist", + ], +) +def test_resolve_invalid_local_path(parent_dir: str, target: str) -> None: + """Test the resolve local path method with invalid local paths.""" + assert not resolve_local_path(parent_dir, target) + + +def test_resolve_invalid_parent_path() -> None: + """Test the resolve local path method with an invalid parent directory.""" + assert not resolve_local_path("non-existing-dir", "./") + + +@pytest.mark.parametrize( + "target", + [ + "./", + "././././", + ], +) +def test_resolve_valid_local_path(parent_dir: str, target: str) -> None: + """Test the resolve local path method with valid local paths.""" + assert resolve_local_path(parent_dir, target) == parent_dir