diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py
index 121b7fc..c554d9c 100644
--- a/socketsecurity/__init__.py
+++ b/socketsecurity/__init__.py
@@ -1,2 +1,2 @@
 __author__ = 'socket.dev'
-__version__ = '2.0.9'
+__version__ = '2.0.10'
diff --git a/socketsecurity/core/messages.py b/socketsecurity/core/messages.py
index ca90c27..2940e3d 100644
--- a/socketsecurity/core/messages.py
+++ b/socketsecurity/core/messages.py
@@ -2,6 +2,8 @@
 import os
 import re
 import json
+import logging
+logging.basicConfig(level=logging.DEBUG)
 from pathlib import Path
 
 from mdutils import MdUtils
@@ -16,7 +18,7 @@ class Messages:
     def map_severity_to_sarif(severity: str) -> str:
         """
         Map Socket severity levels to SARIF levels (GitHub code scanning).
-        
+
         'low' -> 'note'
         'medium' or 'middle' -> 'warning'
         'high' or 'critical' -> 'error'
@@ -39,115 +41,89 @@ def find_line_in_file(packagename: str, packageversion: str, manifest_file: str)
         Supports:
           1) JSON-based manifest files (package-lock.json, Pipfile.lock, composer.lock)
              - Locates a dictionary entry with the matching package & version
-             - Does a rough line-based search to find the actual line in the raw text
-          2) Text-based (requirements.txt, package.json, yarn.lock, etc.)
-             - Uses compiled regex patterns to detect a match line by line
+             - Searches the raw text for the key
+          2) Text-based (requirements.txt, package.json, yarn.lock, pnpm-lock.yaml, etc.)
+             - Uses regex patterns to detect a match line by line
         """
-        # Extract just the file name to detect manifest type
         file_type = Path(manifest_file).name
+        logging.debug("Processing file for line lookup: %s", manifest_file)
 
-        # ----------------------------------------------------
-        # 1) JSON-based manifest files
-        # ----------------------------------------------------
         if file_type in ["package-lock.json", "Pipfile.lock", "composer.lock"]:
             try:
-                # Read entire file so we can parse JSON and also do raw line checks
                 with open(manifest_file, "r", encoding="utf-8") as f:
                     raw_text = f.read()
-
-                # Attempt JSON parse
+                logging.debug("Read %d characters from %s", len(raw_text), manifest_file)
                 data = json.loads(raw_text)
-
-                # In practice, you may need to check data["dependencies"], data["default"], etc.
-                # This is an example approach.
                 packages_dict = (
                     data.get("packages")
                     or data.get("default")
                     or data.get("dependencies")
                     or {}
                 )
-
+                logging.debug("Found package keys in %s: %s", manifest_file, list(packages_dict.keys()))
                 found_key = None
                 found_info = None
-                # Locate a dictionary entry whose 'version' matches
                 for key, value in packages_dict.items():
-                    # For NPM package-lock, keys might look like "node_modules/axios"
                    if key.endswith(packagename) and "version" in value:
                         if value["version"] == packageversion:
                             found_key = key
                             found_info = value
                             break
-
                 if found_key and found_info:
-                    # Search lines to approximate the correct line number
-                    needle_key = f'"{found_key}":'  # e.g. "node_modules/axios":
-                    needle_version = f'"version": "{packageversion}"'
+                    needle_key = f'"{found_key}":'
                     lines = raw_text.splitlines()
-                    best_line = 1
-                    snippet = None
-
+                    logging.debug("Total lines in %s: %d", manifest_file, len(lines))
                     for i, line in enumerate(lines, start=1):
-                        if (needle_key in line) or (needle_version in line):
-                            best_line = i
-                            snippet = line.strip()
-                            break  # On first match, stop
-
-                    # If we found an approximate line, return it; else fallback to line 1
-                    if best_line > 0 and snippet:
-                        return best_line, snippet
-                    else:
-                        return 1, f'"{found_key}": {found_info}'
+                        if needle_key in line:
+                            logging.debug("Found match at line %d in %s: %s", i, manifest_file, line.strip())
+                            return i, line.strip()
+                    return 1, f'"{found_key}": {found_info}'
                 else:
                     return 1, f"{packagename} {packageversion} (not found in {manifest_file})"
-
-            except (FileNotFoundError, json.JSONDecodeError):
+            except (FileNotFoundError, json.JSONDecodeError) as e:
+                logging.error("Error reading %s: %s", manifest_file, e)
                 return 1, f"Error reading {manifest_file}"
 
-        # ----------------------------------------------------
-        # 2) Text-based / line-based manifests
-        # ----------------------------------------------------
-        # Define a dictionary of patterns for common manifest types
-        search_patterns = {
-            "package.json": rf'"{packagename}":\s*"{packageversion}"',
-            "yarn.lock": rf'{packagename}@{packageversion}',
-            "pnpm-lock.yaml": rf'"{re.escape(packagename)}"\s*:\s*\{{[^}}]*"version":\s*"{re.escape(packageversion)}"',
-            "requirements.txt": rf'^{re.escape(packagename)}\s*(?:==|===|!=|>=|<=|~=|\s+)?\s*{re.escape(packageversion)}(?:\s*;.*)?$',
-            "pyproject.toml": rf'{packagename}\s*=\s*"{packageversion}"',
-            "Pipfile": rf'"{packagename}"\s*=\s*"{packageversion}"',
-            "go.mod": rf'require\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
-            "go.sum": rf'{re.escape(packagename)}\s+{re.escape(packageversion)}',
-            "pom.xml": rf'<artifactId>{re.escape(packagename)}</artifactId>\s*<version>{re.escape(packageversion)}</version>',
-            "build.gradle": rf'implementation\s+"{re.escape(packagename)}:{re.escape(packageversion)}"',
-            "Gemfile": rf'gem\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
-            "Gemfile.lock": rf'\s+{re.escape(packagename)}\s+\({re.escape(packageversion)}\)',
-            ".csproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
-            ".fsproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
-            "paket.dependencies": rf'nuget\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
-            "Cargo.toml": rf'{re.escape(packagename)}\s*=\s*"{re.escape(packageversion)}"',
-            "build.sbt": rf'"{re.escape(packagename)}"\s*%\s*"{re.escape(packageversion)}"',
-            "Podfile": rf'pod\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
-            "Package.swift": rf'\.package\(name:\s*"{re.escape(packagename)}",\s*url:\s*".*?",\s*version:\s*"{re.escape(packageversion)}"\)',
-            "mix.exs": rf'\{{:{re.escape(packagename)},\s*"{re.escape(packageversion)}"\}}',
-            "composer.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
-            "conanfile.txt": rf'{re.escape(packagename)}/{re.escape(packageversion)}',
-            "vcpkg.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
-        }
-
-        # If no specific pattern is found for this file name, fallback to a naive approach
-        searchstring = search_patterns.get(file_type, rf'{re.escape(packagename)}.*{re.escape(packageversion)}')
+        # For pnpm-lock.yaml, use a special regex pattern.
+        if file_type.lower() == "pnpm-lock.yaml":
+            searchstring = rf'^\s*/{re.escape(packagename)}/{re.escape(packageversion)}:'
+        else:
+            search_patterns = {
+                "package.json": rf'"{packagename}":\s*"[\^~]?{re.escape(packageversion)}"',
+                "yarn.lock": rf'{packagename}@{packageversion}',
+                "requirements.txt": rf'^{re.escape(packagename)}\s*(?:==|===|!=|>=|<=|~=|\s+)?\s*{re.escape(packageversion)}(?:\s*;.*)?$',
+                "pyproject.toml": rf'{packagename}\s*=\s*"{re.escape(packageversion)}"',
+                "Pipfile": rf'"{packagename}"\s*=\s*"{re.escape(packageversion)}"',
+                "go.mod": rf'require\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
+                "go.sum": rf'{re.escape(packagename)}\s+{re.escape(packageversion)}',
+                "pom.xml": rf'<artifactId>{re.escape(packagename)}</artifactId>\s*<version>{re.escape(packageversion)}</version>',
+                "build.gradle": rf'implementation\s+"{re.escape(packagename)}:{re.escape(packageversion)}"',
+                "Gemfile": rf'gem\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
+                "Gemfile.lock": rf'\s+{re.escape(packagename)}\s+\({re.escape(packageversion)}\)',
+                ".csproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
+                ".fsproj": rf'<PackageReference\s+Include="{re.escape(packagename)}"\s+Version="{re.escape(packageversion)}"\s*/>',
+                "paket.dependencies": rf'nuget\s+{re.escape(packagename)}\s+{re.escape(packageversion)}',
+                "Cargo.toml": rf'{re.escape(packagename)}\s*=\s*"{re.escape(packageversion)}"',
+                "build.sbt": rf'"{re.escape(packagename)}"\s*%\s*"{re.escape(packageversion)}"',
+                "Podfile": rf'pod\s+"{re.escape(packagename)}",\s*"{re.escape(packageversion)}"',
+                "Package.swift": rf'\.package\(name:\s*"{re.escape(packagename)}",\s*url:\s*".*?",\s*version:\s*"{re.escape(packageversion)}"\)',
+                "mix.exs": rf'\{{:{re.escape(packagename)},\s*"{re.escape(packageversion)}"\}}',
+                "composer.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
+                "conanfile.txt": rf'{re.escape(packagename)}/{re.escape(packageversion)}',
+                "vcpkg.json": rf'"{re.escape(packagename)}":\s*"{re.escape(packageversion)}"',
+            }
+            searchstring = search_patterns.get(file_type, rf'{re.escape(packagename)}.*{re.escape(packageversion)}')
+        logging.debug("Using search pattern for %s: %s", file_type, searchstring)
 
         try:
-            # Read file lines and search for a match
             with open(manifest_file, 'r', encoding="utf-8") as file:
                 lines = [line.rstrip("\n") for line in file]
+            logging.debug("Total lines in %s: %d", manifest_file, len(lines))
             for line_number, line_content in enumerate(lines, start=1):
-                # For Python conditional dependencies, ignore everything after first ';'
                 line_main = line_content.split(";", 1)[0].strip()
-
-                # Use a case-insensitive regex search
                 if re.search(searchstring, line_main, re.IGNORECASE):
+                    logging.debug("Match found at line %d in %s: %s", line_number, manifest_file, line_content.strip())
                     return line_number, line_content.strip()
-
         except FileNotFoundError:
             return 1, f"{manifest_file} not found"
         except Exception as e:
@@ -181,7 +157,6 @@ def get_manifest_type_url(manifest_file: str, pkg_name: str, pkg_version: str) -
             "composer.json": "composer",
             "vcpkg.json": "vcpkg",
         }
-
         file_type = Path(manifest_file).name
         url_prefix = manifest_to_url_prefix.get(file_type, "unknown")
         return f"https://socket.dev/{url_prefix}/package/{pkg_name}/alerts/{pkg_version}"
@@ -191,29 +166,33 @@ def create_security_comment_sarif(diff) -> dict:
         """
         Create SARIF-compliant output from the diff report, including dynamic URL generation based on manifest type and improved
         formatting for GitHub SARIF display.
+
+        This function now:
+          - Processes every alert in diff.new_alerts.
+          - For alerts with multiple manifest files, generates an individual SARIF result for each file.
+          - Appends the manifest file name to the rule ID and name to make each result unique.
+          - Does NOT fall back to 'requirements.txt' if no manifest file is provided.
+          - Adds detailed logging to validate our assumptions.
+
         """
-        scan_failed = False
         if len(diff.new_alerts) == 0:
             for alert in diff.new_alerts:
-                alert: Issue
                 if alert.error:
-                    scan_failed = True
                     break
+
         sarif_data = {
             "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
             "version": "2.1.0",
-            "runs": [
-                {
-                    "tool": {
-                        "driver": {
-                            "name": "Socket Security",
-                            "informationUri": "https://socket.dev",
-                            "rules": []
-                        }
-                    },
-                    "results": []
-                }
-            ]
+            "runs": [{
+                "tool": {
+                    "driver": {
+                        "name": "Socket Security",
+                        "informationUri": "https://socket.dev",
+                        "rules": []
+                    }
+                },
+                "results": []
+            }]
         }
 
         rules_map = {}
@@ -222,60 +201,77 @@ def create_security_comment_sarif(diff) -> dict:
         for alert in diff.new_alerts:
             pkg_name = alert.pkg_name
             pkg_version = alert.pkg_version
-            rule_id = f"{pkg_name}=={pkg_version}"
+            base_rule_id = f"{pkg_name}=={pkg_version}"
             severity = alert.severity
 
-            # Generate the correct URL for the alert based on manifest type
-            introduced_list = alert.introduced_by
-            manifest_file = introduced_list[0][1] if introduced_list and isinstance(introduced_list[0], list) else alert.manifests or "requirements.txt"
-            socket_url = Messages.get_manifest_type_url(manifest_file, pkg_name, pkg_version)
-
-            # Prepare descriptions with <br/> replacements
-            short_desc = f"{alert.props.get('note', '')}<br/><br/>Suggested Action:<br/>{alert.suggestion}<br/>{socket_url}"
-            full_desc = "{} - {}".format(alert.title, alert.description.replace('\r\n', '<br/>'))
-            # Identify the line and snippet in the manifest file
-            line_number, line_content = Messages.find_line_in_file(pkg_name, pkg_version, manifest_file)
-            if line_number < 1:
-                line_number = 1  # Ensure SARIF compliance
-
-            # Create the rule if not already defined
-            if rule_id not in rules_map:
-                rules_map[rule_id] = {
-                    "id": rule_id,
-                    "name": f"{pkg_name}=={pkg_version}",
-                    "shortDescription": {"text": f"Alert generated for {rule_id} by Socket Security"},
-                    "fullDescription": {"text": full_desc},
-                    "helpUri": socket_url,
-                    "defaultConfiguration": {
-                        "level": Messages.map_severity_to_sarif(severity)
-                    },
-                }
+            logging.debug("Alert %s - introduced_by: %s, manifests: %s", base_rule_id, alert.introduced_by, getattr(alert, 'manifests', None))
+            manifest_files = []
+            if alert.introduced_by and isinstance(alert.introduced_by, list):
+                for entry in alert.introduced_by:
+                    if isinstance(entry, (list, tuple)) and len(entry) >= 2:
+                        files = [f.strip() for f in entry[1].split(";") if f.strip()]
+                        manifest_files.extend(files)
+                    elif isinstance(entry, str):
+                        manifest_files.extend([m.strip() for m in entry.split(";") if m.strip()])
+            elif hasattr(alert, 'manifests') and alert.manifests:
+                manifest_files = [mf.strip() for mf in alert.manifests.split(";") if mf.strip()]
+
+            logging.debug("Alert %s - extracted manifest_files: %s", base_rule_id, manifest_files)
+            if not manifest_files:
+                logging.error("Alert %s: No manifest file found; cannot determine file location.", base_rule_id)
+                continue
+
+            logging.debug("Alert %s - using manifest_files for processing: %s", base_rule_id, manifest_files)
+
+            # Create an individual SARIF result for each manifest file.
+            for mf in manifest_files:
+                logging.debug("Alert %s - Processing manifest file: %s", base_rule_id, mf)
+                socket_url = Messages.get_manifest_type_url(mf, pkg_name, pkg_version)
+                line_number, line_content = Messages.find_line_in_file(pkg_name, pkg_version, mf)
+                if line_number < 1:
+                    line_number = 1
+                logging.debug("Alert %s: Manifest %s, line %d: %s", base_rule_id, mf, line_number, line_content)
+
+                # Create a unique rule id and name by appending the manifest file.
+                unique_rule_id = f"{base_rule_id} ({mf})"
+                rule_name = f"Alert {base_rule_id} ({mf})"
+
+                short_desc = (f"{alert.props.get('note', '')}<br/><br/>Suggested Action:<br/>{alert.suggestion}"
+                              f"<br/>{socket_url}")
+                full_desc = "{} - {}".format(alert.title, alert.description.replace('\r\n', '<br/>'))
+
+                if unique_rule_id not in rules_map:
+                    rules_map[unique_rule_id] = {
+                        "id": unique_rule_id,
+                        "name": rule_name,
+                        "shortDescription": {"text": rule_name},
+                        "fullDescription": {"text": full_desc},
+                        "helpUri": socket_url,
+                        "defaultConfiguration": {
+                            "level": Messages.map_severity_to_sarif(severity)
+                        },
+                    }
 
-            # Add the SARIF result
-            result_obj = {
-                "ruleId": rule_id,
-                "message": {"text": short_desc},
-                "locations": [
-                    {
+                result_obj = {
+                    "ruleId": unique_rule_id,
+                    "message": {"text": short_desc},
+                    "locations": [{
                         "physicalLocation": {
-                            "artifactLocation": {"uri": manifest_file},
+                            "artifactLocation": {"uri": mf},
                             "region": {
                                 "startLine": line_number,
                                 "snippet": {"text": line_content},
                             },
                         }
-                    }
-                ],
-            }
-            results_list.append(result_obj)
+                    }]
+                }
+                results_list.append(result_obj)
 
-        # Attach rules and results
         sarif_data["runs"][0]["tool"]["driver"]["rules"] = list(rules_map.values())
         sarif_data["runs"][0]["results"] = results_list
         return sarif_data
-    
+
     @staticmethod
     def create_security_comment_json(diff: Diff) -> dict:
         scan_failed = False
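
A quick way to sanity-check the new per-manifest behaviour (illustrative only, not part of the patch): the SimpleNamespace objects below are hypothetical stand-ins for the real Diff/Issue instances and only carry the attributes the patched function reads.

# Illustrative sketch -- assumes the stand-ins mirror the attributes of the real Diff/Issue objects.
from types import SimpleNamespace

from socketsecurity.core.messages import Messages

alert = SimpleNamespace(
    pkg_name="axios",
    pkg_version="1.7.7",
    severity="high",
    title="Example alert",
    description="Example description",
    suggestion="Upgrade the package",
    props={"note": "Example note"},
    error=False,
    manifests=None,
    # One introduced_by entry whose second element lists two manifest files separated by ";"
    introduced_by=[["npm", "package.json;package-lock.json"]],
)
diff = SimpleNamespace(new_alerts=[alert])

sarif = Messages.create_security_comment_sarif(diff)
# Expect one SARIF result per manifest file, each with a unique rule id:
print([result["ruleId"] for result in sarif["runs"][0]["results"]])
# ['axios==1.7.7 (package.json)', 'axios==1.7.7 (package-lock.json)']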