diff --git a/docs/source/pages/developers_guide/style_guide.rst b/docs/source/pages/developers_guide/style_guide.rst index 1381f6ddb..d726bcbba 100644 --- a/docs/source/pages/developers_guide/style_guide.rst +++ b/docs/source/pages/developers_guide/style_guide.rst @@ -76,3 +76,66 @@ For variables of a class: we do not use the ``Attribute`` section as per the `nu x: float #: The y coordinate of the point. y: float + + +-------------------------- +Logging and Console Output +-------------------------- + +Macaron uses the ``rich`` Python library to provide enhanced and well-formatted logging output in the terminal. All logging should be performed using the ``macaron.console`` module. + +''''''''''''''''''''''''' +Why use rich for logging? +''''''''''''''''''''''''' + +- Provides visually appealing, color-coded, and structured terminal UI. +- Supports custom components like table, progress bar, and panel for complex output. +- Makes it easier to spot errors, progress, failed checks, etc. + +'''''''''''''''''''''''''''''' +How to use the logging handler +'''''''''''''''''''''''''''''' + +Import the ``access_handler`` from ``macaron.console`` and use it to set/get the custom rich handler. For simple logging, you can use the handler's methods for info, debug, and error messages. + + .. code-block:: python + + from macaron.console import access_handler + + # Get the RichConsoleHandler + rich_handler = access_handler.get_handler() + + # Log an info message + console.info("") + + # Log an debug message + console.debug("") + + # Log an error message + console.error("") + +To modify the console UI, create a function call which takes necessary information and converts them into Rich RenderableType. Also modify the ``make_layout()`` function to use the newly created rich component. + + .. code-block:: python + + def update_find_source_table(self, key: str, value: str | Status) -> None: + self.find_source_content[key] = value + find_source_table = Table(show_header=False, box=None) + find_source_table.add_column("Details", justify="left") + find_source_table.add_column("Value", justify="left") + for field, content in self.find_source_content.items(): + find_source_table.add_row(field, content) + self.find_source_table = find_source_table + + + def make_layout(self) -> Group: + layout: list[RenderableType] = [] + if self.command == "find-source": + if self.find_source_table.row_count > 0: + layout = layout + [self.find_source_table] + return Group(*layout) + + + rich_handler.update_find_source_table("", "") + +For more advance formatting, refer to the ``rich`` documentation: https://rich.readthedocs.io/en/stable/ diff --git a/src/macaron/console.py b/src/macaron/console.py index 725a8af0b..f71400f17 100644 --- a/src/macaron/console.py +++ b/src/macaron/console.py @@ -17,7 +17,180 @@ from rich.table import Table -class RichConsoleHandler(RichHandler): +class TableBuilder: + """Builder to provide common table-building utilities for console classes.""" + + @staticmethod + def _make_table(content: dict, columns: list[str]) -> Table: + table = Table(show_header=False, box=None) + for col in columns: + table.add_column(col, justify="left") + for field, value in content.items(): + table.add_row(field, value) + return table + + @staticmethod + def _make_checks_table(checks: dict[str, str]) -> Table: + table = Table(show_header=False, box=None) + table.add_column("Status", justify="left") + table.add_column("Check", justify="left") + for check_name, check_status in checks.items(): + if check_status == "RUNNING": + table.add_row(Status("[bold green]RUNNING[/]"), check_name) + return table + + @staticmethod + def _make_failed_checks_table(failed_checks: list) -> Table: + table = Table(show_header=False, box=None) + table.add_column("Status", justify="left") + table.add_column("Check ID", justify="left") + table.add_column("Description", justify="left") + for check in failed_checks: + table.add_row("[bold red]FAILED[/]", check.check.check_id, check.check.check_description) + return table + + @staticmethod + def _make_summary_table(checks_summary: dict, total_checks: int) -> Table: + table = Table(show_header=False, box=None) + table.add_column("Check Result Type", justify="left") + table.add_column("Count", justify="left") + table.add_row("Total Checks", str(total_checks), style="white") + color_map = { + "PASSED": "green", + "FAILED": "red", + "SKIPPED": "yellow", + "DISABLED": "bright_blue", + "UNKNOWN": "white", + } + for check_result_type, checks in checks_summary.items(): + if check_result_type in color_map: + table.add_row(check_result_type, str(len(checks)), style=color_map[check_result_type]) + return table + + +class Dependency(TableBuilder): + """A class to manage the display of dependency analysis in the console.""" + + def __init__(self) -> None: + """Initialize the Dependency instance with default values and tables.""" + self.description_table = Table(show_header=False, box=None) + self.description_table_content: dict[str, str | Status] = { + "Package URL:": Status("[green]Processing[/]"), + "Local Cloned Path:": Status("[green]Processing[/]"), + "Remote Path:": Status("[green]Processing[/]"), + "Branch:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "Commit Date:": Status("[green]Processing[/]"), + "CI Services:": Status("[green]Processing[/]"), + "Build Tools:": Status("[green]Processing[/]"), + } + self.progress = Progress( + TextColumn(" RUNNING ANALYSIS"), + BarColumn(bar_width=None, complete_style="green"), + MofNCompleteColumn(), + ) + self.task_id: TaskID + self.progress_table = Table(show_header=False, box=None) + self.checks: dict[str, str] = {} + self.failed_checks_table = Table(show_header=False, box=None) + self.summary_table = Table(show_header=False, box=None) + + def add_description_table_content(self, key: str, value: str | Status) -> None: + """ + Add or update a key-value pair in the description table. + + Parameters + ---------- + key : str + The key to be added or updated. + value : str or Status + The value associated with the key. + """ + self.description_table_content[key] = value + self.description_table = self._make_table(self.description_table_content, ["Details", "Value"]) + + def no_of_checks(self, value: int) -> None: + """ + Initialize the progress bar with the total number of checks. + + Parameters + ---------- + value : int + The total number of checks to be performed. + """ + self.task_id = self.progress.add_task("analyzing", total=value) + + def remove_progress_bar(self) -> None: + """Remove the progress bar from the display.""" + self.progress.remove_task(self.task_id) + + def update_checks(self, check_id: str, status: str = "RUNNING") -> None: + """ + Update the status of a specific check and refresh the progress table. + + Parameters + ---------- + check_id : str + The identifier of the check to be updated. + status : str, optional + The new status of the check, by default "RUNNING" + """ + self.checks[check_id] = status + self.progress_table = self._make_checks_table(self.checks) + if self.task_id is not None and status != "RUNNING": + self.progress.update(self.task_id, advance=1) + + def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None: + """ + Update the summary tables with the results of the checks. + + Parameters + ---------- + checks_summary : dict + Dictionary containing lists of checks categorized by their results. + total_checks : int + The total number of checks. + """ + self.failed_checks_table = self._make_failed_checks_table(checks_summary.get("FAILED", [])) + self.summary_table = self._make_summary_table(checks_summary, total_checks) + + def make_layout(self) -> list[RenderableType]: + """ + Create the layout for the live console display. + + Returns + ------- + list[RenderableType] + A list of rich RenderableType objects containing the layout for the live console display. + """ + layout: list[RenderableType] = [] + if self.description_table.row_count > 0: + layout = layout + [ + "", + self.description_table, + ] + if self.progress_table.row_count > 0: + layout = layout + ["", self.progress, "", self.progress_table] + if self.failed_checks_table.row_count > 0: + layout = layout + [ + "", + Rule(" SUMMARY", align="left"), + "", + self.failed_checks_table, + ] + if self.summary_table.row_count > 0: + layout = layout + ["", self.summary_table] + elif self.summary_table.row_count > 0: + layout = layout + [ + "", + Rule(" SUMMARY", align="left"), + "", + self.summary_table, + ] + return layout + + +class RichConsoleHandler(RichHandler, TableBuilder): """A rich console handler for logging with rich formatting and live updates.""" def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: @@ -67,8 +240,10 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: "Dependencies Report": "Not Generated", "JSON Report": "Not Generated", } - self.components_violates_table = Table(show_header=False, box=None) - self.components_satisfy_table = Table(show_header=False, box=None) + self.if_dependency: bool = False + self.dependency_analysis_list: list[Dependency] = [] + self.components_violates_table = Table(box=None) + self.components_satisfy_table = Table(box=None) self.policy_summary_table = Table(show_header=False, box=None) self.policy_summary: dict[str, str | Status] = { "Passed Policies": "None", @@ -138,14 +313,11 @@ def add_description_table_content(self, key: str, value: str | Status) -> None: value : str or Status The value associated with the key. """ + if self.if_dependency and self.dependency_analysis_list: + self.dependency_analysis_list[-1].add_description_table_content(key, value) + return self.description_table_content[key] = value - description_table = Table(show_header=False, box=None) - description_table.add_column("Details", justify="left") - description_table.add_column("Value", justify="left") - for field, content in self.description_table_content.items(): - description_table.add_row(field, content) - - self.description_table = description_table + self.description_table = self._make_table(self.description_table_content, ["Details", "Value"]) def no_of_checks(self, value: int) -> None: """ @@ -156,8 +328,20 @@ def no_of_checks(self, value: int) -> None: value : int The total number of checks to be performed. """ + if self.if_dependency and self.dependency_analysis_list: + dependency = self.dependency_analysis_list[-1] + dependency.no_of_checks(value) + return self.task_id = self.progress.add_task("analyzing", total=value) + def remove_progress_bar(self) -> None: + """Remove the progress bar from the display.""" + if self.if_dependency and self.dependency_analysis_list: + dependency = self.dependency_analysis_list[-1] + dependency.remove_progress_bar() + return + self.progress.remove_task(self.task_id) + def update_checks(self, check_id: str, status: str = "RUNNING") -> None: """ Update the status of a specific check and refresh the progress table. @@ -169,17 +353,11 @@ def update_checks(self, check_id: str, status: str = "RUNNING") -> None: status : str, optional The new status of the check, by default "RUNNING" """ + if self.if_dependency and self.dependency_analysis_list: + self.dependency_analysis_list[-1].update_checks(check_id, status) + return self.checks[check_id] = status - - progress_table = Table(show_header=False, box=None) - progress_table.add_column("Status", justify="left") - progress_table.add_column("Check", justify="left") - - for check_name, check_status in self.checks.items(): - if check_status == "RUNNING": - progress_table.add_row(Status("[bold green]RUNNING[/]"), check_name) - self.progress_table = progress_table - + self.progress_table = self._make_checks_table(self.checks) if self.task_id is not None and status != "RUNNING": self.progress.update(self.task_id, advance=1) @@ -194,39 +372,11 @@ def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None total_checks : int The total number of checks. """ - failed_checks_table = Table(show_header=False, box=None) - failed_checks_table.add_column("Status", justify="left") - failed_checks_table.add_column("Check ID", justify="left") - failed_checks_table.add_column("Description", justify="left") - - failed_checks = checks_summary["FAILED"] - for check in failed_checks: - failed_checks_table.add_row( - "[bold red]FAILED[/]", - check.check.check_id, - check.check.check_description, - ) - - self.failed_checks_table = failed_checks_table - - summary_table = Table(show_header=False, box=None) - summary_table.add_column("Check Result Type", justify="left") - summary_table.add_column("Count", justify="left") - summary_table.add_row("Total Checks", str(total_checks), style="white") - - for check_result_type, checks in checks_summary.items(): - if check_result_type == "PASSED": - summary_table.add_row("PASSED", str(len(checks)), style="green") - if check_result_type == "FAILED": - summary_table.add_row("FAILED", str(len(checks)), style="red") - if check_result_type == "SKIPPED": - summary_table.add_row("SKIPPED", str(len(checks)), style="yellow") - if check_result_type == "DISABLED": - summary_table.add_row("DISABLED", str(len(checks)), style="bright_blue") - if check_result_type == "UNKNOWN": - summary_table.add_row("UNKNOWN", str(len(checks)), style="white") - - self.summary_table = summary_table + if self.if_dependency and self.dependency_analysis_list: + self.dependency_analysis_list[-1].update_checks_summary(checks_summary, total_checks) + return + self.failed_checks_table = self._make_failed_checks_table(checks_summary.get("FAILED", [])) + self.summary_table = self._make_summary_table(checks_summary, total_checks) def update_report_table(self, report_type: str, report_path: str) -> None: """ @@ -249,6 +399,20 @@ def update_report_table(self, report_type: str, report_path: str) -> None: self.report_table = report_table + def is_dependency(self, value: bool) -> None: + """ + Update the flag indicating whether the analyzed package is a dependency. + + Parameters + ---------- + value : bool + True if the package is a dependency, False otherwise. + """ + self.if_dependency = value + if self.if_dependency: + dependency = Dependency() + self.dependency_analysis_list.append(dependency) + def generate_policy_summary_table(self) -> None: """Generate the policy summary table based on the current policy summary data.""" policy_summary_table = Table(show_header=False, box=None) @@ -300,20 +464,20 @@ def update_policy_engine(self, results: dict) -> None: Dictionary containing policy engine results including components that violate or satisfy policies, and lists of passed and failed policies. """ - components_violates_table = Table(show_header=False, box=None) - components_violates_table.add_column("Assign No.", justify="left") - components_violates_table.add_column("Component", justify="left") - components_violates_table.add_column("Policy", justify="left") + components_violates_table = Table(box=None) + components_violates_table.add_column("Component ID", justify="left") + components_violates_table.add_column("PURL", justify="left") + components_violates_table.add_column("Policy Name", justify="left") for values in results["component_violates_policy"]: components_violates_table.add_row(values[0], values[1], values[2]) self.components_violates_table = components_violates_table - components_satisfy_table = Table(show_header=False, box=None) - components_satisfy_table.add_column("Assign No.", justify="left") - components_satisfy_table.add_column("Component", justify="left") - components_satisfy_table.add_column("Policy", justify="left") + components_satisfy_table = Table(box=None) + components_satisfy_table.add_column("Component ID", justify="left") + components_satisfy_table.add_column("PURL", justify="left") + components_satisfy_table.add_column("Policy Name", justify="left") for values in results["component_satisfies_policy"]: components_satisfy_table.add_row(values[0], values[1], values[2]) @@ -398,7 +562,11 @@ def make_layout(self) -> Group: layout = layout + [error_log_panel] if self.command == "analyze": if self.description_table.row_count > 0: - layout = layout + [Rule(" DESCRIPTION", align="left"), "", self.description_table] + layout = layout + [ + Rule(" DESCRIPTION", align="left"), + "", + self.description_table, + ] if self.progress_table.row_count > 0: layout = layout + ["", self.progress, "", self.progress_table] if self.failed_checks_table.row_count > 0: @@ -425,6 +593,17 @@ def make_layout(self) -> Group: layout = layout + [ self.report_table, ] + if self.if_dependency and self.dependency_analysis_list: + for idx, dependency in enumerate(self.dependency_analysis_list, start=1): + dependency_layout = dependency.make_layout() + layout = ( + layout + + [ + "", + Rule(f" DEPENDENCY {idx}", align="left"), + ] + + dependency_layout + ) elif self.command == "verify-policy": if self.policy_summary_table.row_count > 0: if self.components_satisfy_table.row_count > 0: diff --git a/src/macaron/output_reporter/results.py b/src/macaron/output_reporter/results.py index dddd636a3..2af4fc269 100644 --- a/src/macaron/output_reporter/results.py +++ b/src/macaron/output_reporter/results.py @@ -9,7 +9,6 @@ from typing import Generic, TypedDict, TypeVar from macaron.config.target_config import Configuration -from macaron.console import access_handler from macaron.output_reporter.scm import SCMStatus from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.check_result import CheckResultType @@ -200,7 +199,6 @@ def __init__(self, root_record: Record) -> None: self.record_mapping: dict[str, Record] = {} if root_record.context: self.record_mapping[root_record.record_id] = root_record - self.rich_handler = access_handler.get_handler() def get_records(self) -> Iterable[Record]: """Get the generator for all records in the report. @@ -299,7 +297,6 @@ def __str__(self) -> str: """Return the string representation of the Report instance.""" ctx_list = list(self.get_ctxs()) main_ctx: AnalyzeContext = ctx_list.pop(0) - self.rich_handler = access_handler.get_handler() output = "".join( [ @@ -309,7 +306,6 @@ def __str__(self) -> str: "\nSLSA REQUIREMENT RESULTS:\n", ] ) - self.rich_handler.update_checks_summary(main_ctx.get_check_summary(), len(main_ctx.check_results)) slsa_req_mesg: dict[SLSALevels, list[str]] = {level: [] for level in SLSALevels if level != SLSALevels.LEVEL0} for req_name, req_status in main_ctx.ctx_data.items(): diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index c2a0fbc5a..017860df8 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -240,6 +240,10 @@ def run( # Create a report instance with the record of the main repo. report = Report(main_record) + self.rich_handler.update_checks_summary( + main_record.context.get_check_summary(), + len(main_record.context.check_results), + ) duplicated_scm_records: list[Record] = [] @@ -257,8 +261,14 @@ def run( report.add_dep_record(dep_record) duplicated_scm_records.append(dep_record) continue + self.rich_handler.is_dependency(True) dep_record = self.run_single(config, analysis, report.record_mapping) report.add_dep_record(dep_record) + if dep_record.context: + self.rich_handler.update_checks_summary( + dep_record.context.get_check_summary(), + len(dep_record.context.check_results), + ) else: logger.info("Found no dependencies to analyze.") diff --git a/src/macaron/slsa_analyzer/registry.py b/src/macaron/slsa_analyzer/registry.py index 564b761e7..55dd7f7a3 100644 --- a/src/macaron/slsa_analyzer/registry.py +++ b/src/macaron/slsa_analyzer/registry.py @@ -519,6 +519,8 @@ def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]: logger.info("Check %s has failed.", check_id) return results + self.rich_handler.remove_progress_bar() + return results def prepare(self) -> bool: