diff --git a/pyproject.toml b/pyproject.toml index e95fdf9..060b69d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ 'GitPython', 'packaging', 'python-dotenv', - 'socket-sdk-python>=2.0.7' + 'socket-sdk-python>=2.0.8' ] readme = "README.md" description = "Socket Security CLI for CI/CD" @@ -41,6 +41,7 @@ test = [ ] dev = [ "ruff>=0.3.0", + "twine", # for building "pip-tools>=7.4.0", # for pip-compile ] diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py index c2faa62..27b5366 100644 --- a/socketsecurity/__init__.py +++ b/socketsecurity/__init__.py @@ -1,2 +1,2 @@ __author__ = 'socket.dev' -__version__ = '2.0.7' +__version__ = '2.0.8' diff --git a/socketsecurity/core/__init__.py b/socketsecurity/core/__init__.py index 5236a99..4ea9312 100644 --- a/socketsecurity/core/__init__.py +++ b/socketsecurity/core/__init__.py @@ -1,15 +1,14 @@ import logging -import time import sys +import time from dataclasses import asdict from glob import glob from pathlib import PurePath from typing import BinaryIO, Dict, List, Tuple + from socketdev import socketdev -from socketdev.fullscans import ( - FullScanParams, - SocketArtifact -) +from socketdev.exceptions import APIFailure +from socketdev.fullscans import FullScanParams, SocketArtifact from socketdev.org import Organization from socketdev.repos import RepositoryInfo from socketdev.settings import SecurityPolicyRule @@ -23,10 +22,7 @@ Package, Purl, ) -from socketsecurity.core.exceptions import ( - APIResourceNotFound -) -from socketdev.exceptions import APIFailure +from socketsecurity.core.exceptions import APIResourceNotFound from socketsecurity.core.licenses import Licenses from .socket_config import SocketConfig @@ -43,11 +39,11 @@ class Core: """Main class for interacting with Socket Security API and processing scan results.""" - + ALERT_TYPE_TO_CAPABILITY = { "envVars": "Environment Variables", "networkAccess": "Network Access", - "filesystemAccess": "File System Access", + "filesystemAccess": "File System Access", "shellAccess": "Shell Access", "usesEval": "Uses Eval", "unsafe": "Unsafe" @@ -77,7 +73,7 @@ def set_org_vars(self) -> None: def get_org_id_slug(self) -> Tuple[str, str]: """Gets the Org ID and Org Slug for the API Token.""" - response = self.sdk.org.get() + response = self.sdk.org.get(use_types=True) organizations: Dict[str, Organization] = response.get("organizations", {}) if len(organizations) == 1: @@ -87,33 +83,33 @@ def get_org_id_slug(self) -> Tuple[str, str]: def get_sbom_data(self, full_scan_id: str) -> Dict[str, SocketArtifact]: """Returns the list of SBOM artifacts for a full scan.""" - response = self.sdk.fullscans.stream(self.config.org_slug, full_scan_id) + response = self.sdk.fullscans.stream(self.config.org_slug, full_scan_id, use_types=True) if not response.success: log.debug(f"Failed to get SBOM data for full-scan {full_scan_id}") log.debug(response.message) return {} return response.artifacts - + def get_sbom_data_list(self, artifacts_dict: Dict[str, SocketArtifact]) -> list[SocketArtifact]: """Converts artifacts dictionary to a list.""" return list(artifacts_dict.values()) def get_security_policy(self) -> Dict[str, SecurityPolicyRule]: """Gets the organization's security policy.""" - response = self.sdk.settings.get(self.config.org_slug) - + response = self.sdk.settings.get(self.config.org_slug, use_types=True) + if not response.success: log.error(f"Failed to get security policy: {response.status}") log.error(response.message) raise Exception(f"Failed to get security policy: {response.status}, message: {response.message}") - + return response.securityPolicyRules def create_sbom_output(self, diff: Diff) -> dict: """Creates CycloneDX output for a given diff.""" try: - result = self.sdk.export.cdx_bom(self.config.org_slug, diff.id) + result = self.sdk.export.cdx_bom(self.config.org_slug, diff.id, use_types=True) if not result.success: log.error(f"Failed to get CycloneDX Output for full-scan {diff.id}") log.error(result.message) @@ -121,7 +117,7 @@ def create_sbom_output(self, diff: Diff) -> dict: result.pop("success", None) return result - except Exception as error: + except Exception: log.error(f"Unable to get CycloneDX Output for {diff.id}") log.error(result.get("message", "No error message provided")) return {} @@ -130,17 +126,17 @@ def create_sbom_output(self, diff: Diff) -> dict: def find_files(path: str) -> List[str]: """ Finds supported manifest files in the given path. - + Args: path: Path to search for manifest files - + Returns: List of found manifest file paths """ log.debug("Starting Find Files") start_time = time.time() files = set() - + for ecosystem in socket_globs: patterns = socket_globs[ecosystem] for file_name in patterns: @@ -165,18 +161,18 @@ def find_files(path: str) -> List[str]: else: log.debug(f"{len(files_list)} Files found ({total_time:.2f}s): {', '.join(files_list)}") return list(files) - + @staticmethod def to_case_insensitive_regex(input_string: str) -> str: """ Converts a string into a case-insensitive regex pattern. - + Args: input_string: String to convert - + Returns: Case-insensitive regex pattern - + Example: "pipfile" -> "[Pp][Ii][Pp][Ff][Ii][Ll][Ee]" """ @@ -186,52 +182,52 @@ def to_case_insensitive_regex(input_string: str) -> str: def load_files_for_sending(files: List[str], workspace: str) -> List[Tuple[str, Tuple[str, BinaryIO]]]: """ Prepares files for sending to the Socket API. - + Args: files: List of file paths from find_files() workspace: Base directory path to make paths relative to - + Returns: List of tuples formatted for requests multipart upload: [(field_name, (filename, file_object)), ...] """ send_files = [] - + for file_path in files: if "/" in file_path: _, name = file_path.rsplit("/", 1) else: name = file_path - + if file_path.startswith(workspace): key = file_path[len(workspace):] else: key = file_path - + key = key.lstrip("/") key = key.lstrip("./") - + f = open(file_path, 'rb') payload = (key, (name, f)) send_files.append(payload) - + return send_files def create_full_scan(self, files: List[str], params: FullScanParams, has_head_scan: bool = False) -> FullScan: """ Creates a new full scan via the Socket API. - + Args: files: List of files to scan params: Parameters for the full scan - + Returns: FullScan object with scan results """ log.debug("Creating new full scan") create_full_start = time.time() - res = self.sdk.fullscans.post(files, params) + res = self.sdk.fullscans.post(files, params, use_types=True) if not res.success: log.error(f"Error creating full scan: {res.message}, status: {res.status}") raise Exception(f"Error creating full scan: {res.message}, status: {res.status}") @@ -245,20 +241,20 @@ def create_full_scan(self, files: List[str], params: FullScanParams, has_head_sc create_full_end = time.time() total_time = create_full_end - create_full_start log.debug(f"New Full Scan created in {total_time:.2f} seconds") - + return full_scan def get_full_scan(self, full_scan_id: str) -> FullScan: """ Get a FullScan object for an existing full scan including sbom_artifacts and packages. - + Args: full_scan_id: The ID of the full scan to get - + Returns: The FullScan object with populated artifacts and packages """ - full_scan_metadata = self.sdk.fullscans.metadata(self.config.org_slug, full_scan_id) + full_scan_metadata = self.sdk.fullscans.metadata(self.config.org_slug, full_scan_id, use_types=True) full_scan = FullScan(**asdict(full_scan_metadata.data)) full_scan_artifacts_dict = self.get_sbom_data(full_scan_id) full_scan.sbom_artifacts = self.get_sbom_data_list(full_scan_artifacts_dict) @@ -268,10 +264,10 @@ def get_full_scan(self, full_scan_id: str) -> FullScan: def create_packages_dict(self, sbom_artifacts: list[SocketArtifact]) -> dict[str, Package]: """ Creates a dictionary of Package objects from SBOM artifacts. - + Args: sbom_artifacts: List of SBOM artifacts from the scan - + Returns: Dictionary mapping package IDs to Package objects """ @@ -289,51 +285,51 @@ def create_packages_dict(self, sbom_artifacts: list[SocketArtifact]) -> dict[str top_level_count[top_id] = 1 else: top_level_count[top_id] += 1 - + for package_id, package in packages.items(): package.transitives = top_level_count.get(package_id, 0) return packages - + def get_package_license_text(self, package: Package) -> str: """ Gets the license text for a package if available. - + Args: package: Package object to get license text for - + Returns: License text if found, empty string otherwise """ if package.license is None: return "" - + license_raw = package.license all_licenses = Licenses() license_str = Licenses.make_python_safe(license_raw) - + if license_str is not None and hasattr(all_licenses, license_str): license_obj = getattr(all_licenses, license_str) return license_obj.licenseText - + return "" def get_repo_info(self, repo_slug: str, default_branch: str = "socket-default-branch") -> RepositoryInfo: """ Gets repository information from the Socket API. - + Args: repo_slug: Repository slug to get info for default_branch: Default branch string to use if the repo doesn't exist - + Returns: RepositoryInfo object - + Raises: Exception: If API request fails """ try: - response = self.sdk.repos.repo(self.config.org_slug, repo_slug) + response = self.sdk.repos.repo(self.config.org_slug, repo_slug, use_types=True) if not response.success: log.error(f"Failed to get repository: {response.status}") log.error(response.message) @@ -354,10 +350,10 @@ def get_repo_info(self, repo_slug: str, default_branch: str = "socket-default-br def get_head_scan_for_repo(self, repo_slug: str) -> str: """ Gets the head scan ID for a repository. - + Args: repo_slug: Repository slug to get head scan for - + Returns: Head scan ID if it exists, None otherwise """ @@ -377,24 +373,34 @@ def update_package_values(pkg: Package) -> Package: def get_added_and_removed_packages(self, head_full_scan_id: str, new_full_scan: FullScan) -> Tuple[Dict[str, Package], Dict[str, Package]]: """ Get packages that were added and removed between scans. - + Args: head_full_scan: Previous scan (may be None if first scan) head_full_scan_id: New scan just created - + Returns: Tuple of (added_packages, removed_packages) dictionaries """ if head_full_scan_id is None: log.info(f"No head scan found. New scan ID: {new_full_scan.id}") return new_full_scan.packages, {} - + log.info(f"Comparing scans - Head scan ID: {head_full_scan_id}, New scan ID: {new_full_scan.id}") diff_start = time.time() - diff_report = self.sdk.fullscans.stream_diff(self.config.org_slug, head_full_scan_id, new_full_scan.id).data + try: + diff_report = self.sdk.fullscans.stream_diff(self.config.org_slug, head_full_scan_id, new_full_scan.id, use_types=True).data + except APIFailure as e: + log.error(f"API Error: {e}") + sys.exit(1) + except Exception as e: + import traceback + log.error(f"Error getting diff report: {str(e)}") + log.error(f"Stack trace:\n{traceback.format_exc()}") + raise + diff_end = time.time() log.info(f"Diff Report Gathered in {diff_end - diff_start:.2f} seconds") - log.info(f"Diff report artifact counts:") + log.info("Diff report artifact counts:") log.info(f"Added: {len(diff_report.artifacts.added)}") log.info(f"Removed: {len(diff_report.artifacts.removed)}") log.info(f"Unchanged: {len(diff_report.artifacts.unchanged)}") @@ -442,7 +448,6 @@ def create_new_diff( Args: path: Path to look for manifest files params: Query params for the Full Scan endpoint - no_change: If True, return empty diff """ log.debug(f"starting create_new_diff with no_change: {no_change}") @@ -464,7 +469,7 @@ def create_new_diff( head_full_scan_id = None has_head_scan = False - # Create new scan + # Create new scan try: new_scan_start = time.time() new_full_scan = self.create_full_scan(files_for_sending, params, has_head_scan) @@ -474,17 +479,12 @@ def create_new_diff( log.error(f"API Error: {e}") sys.exit(1) except Exception as e: - log.error(f"Unexpected error while creating new scan: {e}") - sys.exit(1) + import traceback + log.error(f"Error creating new full scan: {str(e)}") + log.error(f"Stack trace:\n{traceback.format_exc()}") + raise - try: - added_packages, removed_packages = self.get_added_and_removed_packages(head_full_scan_id, new_full_scan) - except APIFailure as e: - log.error(f"API Error: {e}") - sys.exit(1) - except Exception as e: - log.error(f"Unexpected error while comparing packages: {e}") - sys.exit(1) + added_packages, removed_packages = self.get_added_and_removed_packages(head_full_scan_id, new_full_scan) diff = self.create_diff_report(added_packages, removed_packages) @@ -495,7 +495,7 @@ def create_new_diff( if not params.include_license_details: report_url += "?include_license_details=false" diff.report_url = report_url - + if head_full_scan_id is not None: diff.diff_url = f"{base_socket}/{self.config.org_slug}/diff/{diff.id}/{head_full_scan_id}" else: @@ -504,25 +504,25 @@ def create_new_diff( return diff def create_diff_report( - self, - added_packages: Dict[str, Package], + self, + added_packages: Dict[str, Package], removed_packages: Dict[str, Package], direct_only: bool = True ) -> Diff: """ Creates a diff report comparing two sets of packages. - + Takes packages that were added and removed between two scans and: 1. Records new/removed packages (direct only by default) 2. Collects alerts from both sets of packages 3. Determines new capabilities introduced - + Args: added_packages: Dict of packages added in new scan removed_packages: Dict of packages removed in new scan direct_only: If True, only direct dependencies are included in new/removed lists (but alerts are still processed for all packages) - + Returns: Diff object containing the comparison results """ @@ -577,11 +577,11 @@ def create_diff_report( def create_purl(package_id: str, packages: dict[str, Package]) -> Purl: """ Creates the extended PURL data for package identification and tracking. - + Args: package_id: Package ID to create PURL data for packages: Dictionary of all packages for transitive dependency lookup - + Returns: Purl object containing package metadata and dependency information """ @@ -606,14 +606,14 @@ def create_purl(package_id: str, packages: dict[str, Package]) -> Purl: def get_source_data(package: Package, packages: dict) -> list: """ Determines how a package was introduced into the dependency tree. - + For direct dependencies, records the manifest file. For transitive dependencies, records the top-level package that introduced it. - + Args: package: Package to analyze packages: Dictionary of all packages for ancestor lookup - + Returns: List of tuples containing (source, manifest_file) information """ @@ -648,7 +648,7 @@ def get_source_data(package: Package, packages: dict) -> list: def add_purl_capabilities(diff: Diff) -> None: """ Adds capability information to each package in the diff's new_packages list. - + Args: diff: Diff object to update with capability information """ @@ -662,18 +662,18 @@ def add_purl_capabilities(diff: Diff) -> None: new_packages.append(new_purl) else: new_packages.append(purl) - + diff.new_packages = new_packages def add_package_alerts_to_collection(self, package: Package, alerts_collection: dict, packages: dict) -> dict: """ Processes alerts from a package and adds them to a shared alerts collection. - + Args: package: Package to process alerts from alerts_collection: Dictionary to store processed alerts packages: Dictionary of all packages for dependency lookup - + Returns: Updated alerts collection dictionary """ @@ -723,11 +723,11 @@ def add_package_alerts_to_collection(self, package: Package, alerts_collection: def save_file(file_name: str, content: str) -> None: """ Saves content to a file, raising an error if the save fails. - + Args: file_name: Path to save the file content: Content to write to the file - + Raises: IOError: If file cannot be written """ @@ -742,10 +742,10 @@ def save_file(file_name: str, content: str) -> None: def has_manifest_files(files: list) -> bool: """ Checks if any files in the list are supported manifest files. - + Args: files: List of file paths to check - + Returns: True if any files match manifest patterns, False otherwise """ @@ -764,41 +764,41 @@ def has_manifest_files(files: list) -> bool: def get_capabilities_for_added_packages(added_packages: Dict[str, Package]) -> Dict[str, List[str]]: """ Maps added packages to their capabilities based on their alerts. - + Args: added_packages: Dictionary of packages added in new scan - + Returns: Dictionary mapping package IDs to their capability lists """ capabilities: Dict[str, List[str]] = {} - + for package_id, package in added_packages.items(): for alert in package.alerts: if alert["type"] in Core.ALERT_TYPE_TO_CAPABILITY: value = Core.ALERT_TYPE_TO_CAPABILITY[alert["type"]] - + if package_id not in capabilities: capabilities[package_id] = [value] elif value not in capabilities[package_id]: capabilities[package_id].append(value) - + return capabilities @staticmethod def get_new_alerts( - added_package_alerts: Dict[str, List[Issue]], + added_package_alerts: Dict[str, List[Issue]], removed_package_alerts: Dict[str, List[Issue]], ignore_readded: bool = True ) -> List[Issue]: """ Find alerts that are new or changed between added and removed packages. - + Args: added_package_alerts: Dictionary of alerts from packages that were added removed_package_alerts: Dictionary of alerts from packages that were removed ignore_readded: If True, don't report alerts that were both removed and added - + Returns: List of newly found alerts """ @@ -810,7 +810,7 @@ def get_new_alerts( new_alerts = added_package_alerts[alert_key] for alert in new_alerts: alert_str = f"{alert.purl},{alert.manifests},{alert.type}" - + if alert.error or alert.warn: if alert_str not in consolidated_alerts: alerts.append(alert) @@ -818,10 +818,10 @@ def get_new_alerts( else: new_alerts = added_package_alerts[alert_key] removed_alerts = removed_package_alerts[alert_key] - + for alert in new_alerts: alert_str = f"{alert.purl},{alert.manifests},{alert.type}" - + # Only add if: # 1. Alert isn't in removed packages (or we're not ignoring readded alerts) # 2. We haven't already recorded this alert @@ -832,5 +832,3 @@ def get_new_alerts( consolidated_alerts.add(alert_str) return alerts - -