Skip to content

Eric/use types and logging #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion socketsecurity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__author__ = 'socket.dev'
__version__ = '2.0.11'
__version__ = '2.0.12'
25 changes: 15 additions & 10 deletions socketsecurity/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,18 +394,23 @@ def get_repo_info(self, repo_slug: str, default_branch: str = "socket-default-br
if not response.success:
log.error(f"Failed to get repository: {response.status}")
log.error(response.message)
# raise Exception(f"Failed to get repository info: {response.status}, message: {response.message}")
except APIFailure:
log.warning(f"Failed to get repository {repo_slug}, attempting to create it")
create_response = self.sdk.repos.post(self.config.org_slug, name=repo_slug, default_branch=default_branch)
if not create_response.success:
log.error(f"Failed to create repository: {create_response.status}")
log.error(create_response.message)
raise Exception(
f"Failed to create repository: {create_response.status}, message: {create_response.message}"
)
else:
return create_response.data
try:

create_response = self.sdk.repos.post(self.config.org_slug, name=repo_slug, default_branch=default_branch)

# Check if the response is empty (failure) or has content (success)
if not create_response:
log.error("Failed to create repository: empty response")
raise Exception("Failed to create repository: empty response")
else:
return create_response

except APIFailure as e:
log.error(f"API failure while creating repository: {e}")
sys.exit(2) # Exit here with code 2. Code 1 indicates a successfully-detected security issue.

return response.data

def get_head_scan_for_repo(self, repo_slug: str) -> str:
Expand Down
6 changes: 4 additions & 2 deletions socketsecurity/core/logging.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging


def initialize_logging(
level: int = logging.INFO,
format: str = "%(asctime)s: %(message)s",
Expand All @@ -23,10 +24,11 @@ def initialize_logging(
cli_logger = logging.getLogger(cli_logger_name)
cli_logger.setLevel(level)


return socket_logger, cli_logger

def set_debug_mode(enable: bool = True) -> None:
def set_debug_mode(enable: bool = False) -> None:
"""Toggle debug logging across all loggers"""
level = logging.DEBUG if enable else logging.INFO
logging.getLogger("socketdev").setLevel(level)
logging.getLogger("socketcli").setLevel(level)
logging.getLogger("socketcli").setLevel(level)
44 changes: 21 additions & 23 deletions socketsecurity/core/messages.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import json
import os
import re
import json
import logging
logging.basicConfig(level=logging.DEBUG)

import re
from pathlib import Path

from mdutils import MdUtils
from prettytable import PrettyTable

from socketsecurity.core.classes import Diff, Issue, Purl

log = logging.getLogger("socketcli")

class Messages:

Expand Down Expand Up @@ -46,21 +44,21 @@ def find_line_in_file(packagename: str, packageversion: str, manifest_file: str)
- Uses regex patterns to detect a match line by line
"""
file_type = Path(manifest_file).name
logging.debug("Processing file for line lookup: %s", manifest_file)
log.debug("Processing file for line lookup: %s", manifest_file)

if file_type in ["package-lock.json", "Pipfile.lock", "composer.lock"]:
try:
with open(manifest_file, "r", encoding="utf-8") as f:
raw_text = f.read()
logging.debug("Read %d characters from %s", len(raw_text), manifest_file)
log.debug("Read %d characters from %s", len(raw_text), manifest_file)
data = json.loads(raw_text)
packages_dict = (
data.get("packages")
or data.get("default")
or data.get("dependencies")
or {}
)
logging.debug("Found package keys in %s: %s", manifest_file, list(packages_dict.keys()))
log.debug("Found package keys in %s: %s", manifest_file, list(packages_dict.keys()))
found_key = None
found_info = None
for key, value in packages_dict.items():
Expand All @@ -72,16 +70,16 @@ def find_line_in_file(packagename: str, packageversion: str, manifest_file: str)
if found_key and found_info:
needle_key = f'"{found_key}":'
lines = raw_text.splitlines()
logging.debug("Total lines in %s: %d", manifest_file, len(lines))
log.debug("Total lines in %s: %d", manifest_file, len(lines))
for i, line in enumerate(lines, start=1):
if needle_key in line:
logging.debug("Found match at line %d in %s: %s", i, manifest_file, line.strip())
log.debug("Found match at line %d in %s: %s", i, manifest_file, line.strip())
return i, line.strip()
return 1, f'"{found_key}": {found_info}'
else:
return 1, f"{packagename} {packageversion} (not found in {manifest_file})"
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error("Error reading %s: %s", manifest_file, e)
log.error("Error reading %s: %s", manifest_file, e)
return 1, f"Error reading {manifest_file}"

# For pnpm-lock.yaml, use a special regex pattern.
Expand Down Expand Up @@ -114,15 +112,15 @@ def find_line_in_file(packagename: str, packageversion: str, manifest_file: str)
}
searchstring = search_patterns.get(file_type, rf'{re.escape(packagename)}.*{re.escape(packageversion)}')

logging.debug("Using search pattern for %s: %s", file_type, searchstring)
log.debug("Using search pattern for %s: %s", file_type, searchstring)
try:
with open(manifest_file, 'r', encoding="utf-8") as file:
lines = [line.rstrip("\n") for line in file]
logging.debug("Total lines in %s: %d", manifest_file, len(lines))
log.debug("Total lines in %s: %d", manifest_file, len(lines))
for line_number, line_content in enumerate(lines, start=1):
line_main = line_content.split(";", 1)[0].strip()
if re.search(searchstring, line_main, re.IGNORECASE):
logging.debug("Match found at line %d in %s: %s", line_number, manifest_file, line_content.strip())
log.debug("Match found at line %d in %s: %s", line_number, manifest_file, line_content.strip())
return line_number, line_content.strip()
except FileNotFoundError:
return 1, f"{manifest_file} not found"
Expand Down Expand Up @@ -172,8 +170,8 @@ def create_security_comment_sarif(diff) -> dict:
- For alerts with multiple manifest files, generates an individual SARIF result for each file.
- Appends the manifest file name to the rule ID and name to make each result unique.
- Does NOT fall back to 'requirements.txt' if no manifest file is provided.
- Adds detailed logging to validate our assumptions.
- Adds detailed log to validate our assumptions.

"""
if len(diff.new_alerts) == 0:
for alert in diff.new_alerts:
Expand Down Expand Up @@ -204,7 +202,7 @@ def create_security_comment_sarif(diff) -> dict:
base_rule_id = f"{pkg_name}=={pkg_version}"
severity = alert.severity

logging.debug("Alert %s - introduced_by: %s, manifests: %s", base_rule_id, alert.introduced_by, getattr(alert, 'manifests', None))
log.debug("Alert %s - introduced_by: %s, manifests: %s", base_rule_id, alert.introduced_by, getattr(alert, 'manifests', None))
manifest_files = []
if alert.introduced_by and isinstance(alert.introduced_by, list):
for entry in alert.introduced_by:
Expand All @@ -216,21 +214,21 @@ def create_security_comment_sarif(diff) -> dict:
elif hasattr(alert, 'manifests') and alert.manifests:
manifest_files = [mf.strip() for mf in alert.manifests.split(";") if mf.strip()]

logging.debug("Alert %s - extracted manifest_files: %s", base_rule_id, manifest_files)
log.debug("Alert %s - extracted manifest_files: %s", base_rule_id, manifest_files)
if not manifest_files:
logging.error("Alert %s: No manifest file found; cannot determine file location.", base_rule_id)
log.error("Alert %s: No manifest file found; cannot determine file location.", base_rule_id)
continue

logging.debug("Alert %s - using manifest_files for processing: %s", base_rule_id, manifest_files)
log.debug("Alert %s - using manifest_files for processing: %s", base_rule_id, manifest_files)

# Create an individual SARIF result for each manifest file.
for mf in manifest_files:
logging.debug("Alert %s - Processing manifest file: %s", base_rule_id, mf)
log.debug("Alert %s - Processing manifest file: %s", base_rule_id, mf)
socket_url = Messages.get_manifest_type_url(mf, pkg_name, pkg_version)
line_number, line_content = Messages.find_line_in_file(pkg_name, pkg_version, mf)
if line_number < 1:
line_number = 1
logging.debug("Alert %s: Manifest %s, line %d: %s", base_rule_id, mf, line_number, line_content)
log.debug("Alert %s: Manifest %s, line %d: %s", base_rule_id, mf, line_number, line_content)

# Create a unique rule id and name by appending the manifest file.
unique_rule_id = f"{base_rule_id} ({mf})"
Expand Down Expand Up @@ -271,7 +269,7 @@ def create_security_comment_sarif(diff) -> dict:
sarif_data["runs"][0]["results"] = results_list

return sarif_data

@staticmethod
def create_security_comment_json(diff: Diff) -> dict:
scan_failed = False
Expand Down
Loading