Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/macaron/output_reporter/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from macaron.slsa_analyzer.checks.check_result import CheckResultType
from macaron.slsa_analyzer.levels import SLSALevels
from macaron.slsa_analyzer.registry import registry
from macaron.slsa_analyzer.slsa_req import ReqName
from macaron.slsa_analyzer.slsa_req import BUILD_REQ_DESC, ReqName


class DepSummary(TypedDict):
Expand Down Expand Up @@ -119,7 +119,7 @@ def get_dict(self) -> dict:
has_passing_check = False
if self.context:
for res in self.context.check_results.values():
if res["result_type"] == CheckResultType.PASSED:
if res.result.result_type == CheckResultType.PASSED:
has_passing_check = True
break

Expand Down Expand Up @@ -156,9 +156,9 @@ def get_dep_summary(self) -> DepSummary:
result["unique_dep_repos"] += 1
if dep_record.context:
for check_result in dep_record.context.check_results.values():
if check_result["result_type"] == CheckResultType.PASSED:
if check_result.result.result_type == CheckResultType.PASSED:
for entry in result["checks_summary"]:
if entry["check_id"] == check_result["check_id"]:
if entry["check_id"] == check_result.check.check_id:
entry["num_deps_pass"] += 1
case SCMStatus.DUPLICATED_SCM:
result["analyzed_deps"] += 1
Expand Down Expand Up @@ -291,9 +291,10 @@ def __str__(self) -> str:
)

slsa_req_mesg: dict[SLSALevels, list[str]] = {level: [] for level in SLSALevels if level != SLSALevels.LEVEL0}
for req in main_ctx.ctx_data.values():
if req.min_level_required != SLSALevels.LEVEL0 and req.is_addressed:
message = f"{req.name.capitalize()}: " + ("PASSED" if req.is_pass else "FAILED")
for req_name, req_status in main_ctx.ctx_data.items():
req = BUILD_REQ_DESC[req_name]
if req.min_level_required != SLSALevels.LEVEL0 and req_status.is_addressed:
message = f"{req.name.capitalize()}: " + ("PASSED" if req_status.is_pass else "FAILED")

if ctx_list:
# Get the fail count for dependencies.
Expand Down
20 changes: 10 additions & 10 deletions src/macaron/slsa_analyzer/analyze_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService
from macaron.slsa_analyzer.levels import SLSALevels
from macaron.slsa_analyzer.provenance.expectations.expectation import Expectation
from macaron.slsa_analyzer.slsa_req import ReqName, SLSAReq, get_requirements_dict
from macaron.slsa_analyzer.slsa_req import ReqName, SLSAReqStatus, create_requirement_status_dict
from macaron.slsa_analyzer.specs.build_spec import BuildSpec
from macaron.slsa_analyzer.specs.ci_spec import CIInfo
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(
The output dir.
"""
self.component = component
self.ctx_data: dict[ReqName, SLSAReq] = get_requirements_dict()
self.ctx_data: dict[ReqName, SLSAReqStatus] = create_requirement_status_dict()

self.slsa_level = SLSALevels.LEVEL0
# Indicate whether this repo fully reach a level or
Expand Down Expand Up @@ -176,14 +176,14 @@ def get_slsa_level_table(self) -> SLSALevel:

def get_dict(self) -> dict:
"""Return the dictionary representation of the AnalyzeContext instance."""
_sorted_on_id = sorted(self.check_results.values(), key=lambda item: item["check_id"])
_sorted_on_id = sorted(self.check_results.values(), key=lambda item: item.check.check_id)
# Remove result_tables since we don't have a good json representation for them.
sorted_on_id = []
for res in _sorted_on_id:
# res is CheckResult(TypedDict)
res_copy: dict = dict(res.copy())
res_copy.pop("result_tables")
sorted_on_id.append(res_copy)
# res is CheckResult
res_dict: dict = dict(res.get_summary())
res_dict.pop("result_tables")
sorted_on_id.append(res_dict)
sorted_results = sorted(sorted_on_id, key=lambda item: item["result_type"], reverse=True)
check_summary = {
result_type.value: len(result_list) for result_type, result_list in self.get_check_summary().items()
Expand Down Expand Up @@ -219,7 +219,7 @@ def get_check_summary(self) -> dict[CheckResultType, list[CheckResult]]:
result: dict[CheckResultType, list[CheckResult]] = {result_type: [] for result_type in CheckResultType}

for check_result in self.check_results.values():
match check_result["result_type"]:
match check_result.result.result_type:
case CheckResultType.PASSED:
result[CheckResultType.PASSED].append(check_result)
case CheckResultType.SKIPPED:
Expand All @@ -243,8 +243,8 @@ def __str__(self) -> str:
output = "".join(
[
output,
f"Check {check_result['check_id']}: {check_result['check_description']}\n",
f"\t{check_result['result_type'].value}\n",
f"Check {check_result.check.check_id}: {check_result.check.check_description}\n",
f"\t{check_result.result.result_type.value}\n",
]
)

Expand Down
88 changes: 49 additions & 39 deletions src/macaron/slsa_analyzer/checks/base_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
from abc import abstractmethod

from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.checks.check_result import CheckResult, CheckResultType, SkippedInfo, get_result_as_bool
from macaron.slsa_analyzer.slsa_req import ReqName, get_requirements_dict
from macaron.slsa_analyzer.checks.check_result import (
CheckInfo,
CheckResult,
CheckResultData,
CheckResultType,
SkippedInfo,
get_result_as_bool,
)
from macaron.slsa_analyzer.slsa_req import ReqName

logger: logging.Logger = logging.getLogger(__name__)


class BaseCheck:
"""This abstract class is used to implement Checks in Macaron."""

# The dictionary that contains the data of all SLSA requirements.
SLSA_REQ_DATA = get_requirements_dict()

def __init__(
self,
check_id: str = "",
Expand All @@ -44,20 +48,34 @@ def __init__(
result_on_skip : CheckResultType
The status for this check when it's skipped based on another check's result.
"""
self.check_id = check_id
self.description = description
self._check_info = CheckInfo(
check_id=check_id, check_description=description, eval_reqs=eval_reqs if eval_reqs else []
)

if not depends_on:
self.depends_on = []
self._depends_on = []
else:
self.depends_on = depends_on
self._depends_on = depends_on

if not eval_reqs:
self.eval_reqs = []
else:
self.eval_reqs = eval_reqs
self._result_on_skip = result_on_skip

@property
def check_info(self) -> CheckInfo:
"""Get the information identifying/describing this check."""
return self._check_info

@property
def depends_on(self) -> list[tuple[str, CheckResultType]]:
"""Get the list of parent checks that this check depends on.

self.result_on_skip = result_on_skip
Each member of the list is a tuple of the parent's id and the status of that parent check.
"""
return self._depends_on

@property
def result_on_skip(self) -> CheckResultType:
"""Get the status for this check when it's skipped based on another check's result."""
return self._result_on_skip

def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) -> CheckResult:
"""Run the check and return the results.
Expand All @@ -75,66 +93,58 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) -
The result of the check.
"""
logger.info("----------------------------------")
logger.info("BEGIN CHECK: %s", self.check_id)
logger.info("BEGIN CHECK: %s", self.check_info.check_id)
logger.info("----------------------------------")

check_result = CheckResult(
check_id=self.check_id,
check_description=self.description,
slsa_requirements=[str(self.SLSA_REQ_DATA.get(req)) for req in self.eval_reqs],
justification=[],
result_type=CheckResultType.SKIPPED,
result_tables=[],
)
check_result_data: CheckResultData

if skipped_info:
check_result["result_type"] = self.result_on_skip
check_result["justification"].append(skipped_info["suppress_comment"])
check_result_data = CheckResultData(
justification=[skipped_info["suppress_comment"]], result_tables=[], result_type=self.result_on_skip
)
logger.info(
"Check %s is skipped on target %s, comment: %s",
self.check_id,
self.check_info.check_id,
target.component.purl,
skipped_info["suppress_comment"],
)
else:
check_result["result_type"] = self.run_check(target, check_result)
check_result_data = self.run_check(target)
logger.info(
"Check %s run %s on target %s, result: %s",
self.check_id,
check_result["result_type"].value,
self.check_info.check_id,
check_result_data.result_type.value,
target.component.purl,
check_result["justification"],
check_result_data.justification,
)

justification_str = ""
for ele in check_result["justification"]:
for ele in check_result_data.justification:
if isinstance(ele, dict):
for key, val in ele.items():
justification_str += f"{key}: {val}. "
justification_str += f"{str(ele)}. "

target.bulk_update_req_status(
self.eval_reqs,
get_result_as_bool(check_result["result_type"]),
self.check_info.eval_reqs,
get_result_as_bool(check_result_data.result_type),
justification_str,
)

return check_result
return CheckResult(check=self.check_info, result=check_result_data)

@abstractmethod
def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResultType:
def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
"""Implement the check in this method.

Parameters
----------
ctx : AnalyzeContext
The object containing processed data for the target repo.
check_result : CheckResult
The object containing result data of a check.

Returns
-------
CheckResultType
The result type of the check (e.g. PASSED).
CheckResultData
The result of the check.
"""
raise NotImplementedError
Loading