diff --git a/tests/testutils.py b/tests/testutils.py index 65c9887d..9788ca5c 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -30,6 +30,8 @@ from trestle.oscal import component as comp from trestle.oscal import profile as prof +from trestlebot.const import COMPDEF_KEY_NAME, LEVERAGED_SSP_KEY_NAME, PROFILE_KEY_NAME + JSON_TEST_DATA_PATH = pathlib.Path("tests/data/json/").resolve() @@ -161,12 +163,19 @@ def setup_for_compdef( def write_index_json( - file_path: str, ssp_name: str, profile: str, component_definitions: List[str] + file_path: str, + ssp_name: str, + profile: str, + component_definitions: List[str], + leveraged_ssp: str = "", ) -> None: """Write out ssp index JSON for tests""" data = { - ssp_name: {"profile": profile, "component_definitions": component_definitions} + ssp_name: {PROFILE_KEY_NAME: profile, COMPDEF_KEY_NAME: component_definitions} } + if leveraged_ssp: + data[ssp_name][LEVERAGED_SSP_KEY_NAME] = leveraged_ssp + with open(file_path, "w") as file: json.dump(data, file, indent=4) diff --git a/tests/trestlebot/tasks/authored/test_ssp.py b/tests/trestlebot/tasks/authored/test_ssp.py index 6c543210..7697e714 100644 --- a/tests/trestlebot/tasks/authored/test_ssp.py +++ b/tests/trestlebot/tasks/authored/test_ssp.py @@ -35,6 +35,7 @@ test_comp = "test_comp" test_ssp_output = "test-ssp" markdown_dir = "md_ssp" +leveraged_ssp = "leveraged_ssp" def test_assemble(tmp_trestle_dir: str) -> None: @@ -145,3 +146,71 @@ def test_get_profile_by_ssp(tmp_trestle_dir: str) -> None: ssp_index: SSPIndex = SSPIndex(ssp_index_path) assert ssp_index.get_profile_by_ssp(test_ssp_output) == test_prof + + +def test_get_leveraged_ssp(tmp_trestle_dir: str) -> None: + """Test to get leveraged ssp from index""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json( + ssp_index_path, test_ssp_output, test_prof, [test_comp], leveraged_ssp + ) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + assert ssp_index.get_leveraged_by_ssp(test_ssp_output) == leveraged_ssp + + +def test_add_ssp_to_index(tmp_trestle_dir: str) -> None: + """Test adding an ssp to an index""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + ssp_index.add_new_ssp("new_ssp", "test_prof", ["my_comp"]) + + assert ssp_index.get_profile_by_ssp("new_ssp") == "test_prof" + assert "my_comp" in ssp_index.get_comps_by_ssp("new_ssp") + assert ssp_index.get_leveraged_by_ssp("new_ssp") is None + + ssp_index.add_new_ssp("another_new_ssp", "test_prof", ["my_comp"], "test_leveraged") + + assert ssp_index.get_profile_by_ssp("another_new_ssp") == "test_prof" + assert "my_comp" in ssp_index.get_comps_by_ssp("another_new_ssp") + assert ssp_index.get_leveraged_by_ssp("another_new_ssp") == "test_leveraged" + + # Test adding to an empty ssp index + + ssp_index_path = os.path.join(tmp_trestle_dir, "another-ssp-index.json") + ssp_index = SSPIndex(ssp_index_path) + + ssp_index.add_new_ssp("another_new_ssp", "test_prof", ["my_comp"], "test_leveraged") + + assert ssp_index.get_profile_by_ssp("another_new_ssp") == "test_prof" + assert "my_comp" in ssp_index.get_comps_by_ssp("another_new_ssp") + assert ssp_index.get_leveraged_by_ssp("another_new_ssp") == "test_leveraged" + + +def test_write_new_ssp_index(tmp_trestle_dir: str) -> None: + """Test writing out a new ssp index""" + ssp_index_path = os.path.join(tmp_trestle_dir, "ssp-index.json") + testutils.write_index_json(ssp_index_path, test_ssp_output, test_prof, [test_comp]) + ssp_index: SSPIndex = SSPIndex(ssp_index_path) + + ssp_index.add_new_ssp("new_ssp", "test_prof", ["my_comp"]) + ssp_index.add_new_ssp("another_new_ssp", "test_prof", ["my_comp"], "test_leveraged") + + ssp_index.write_out() + + # Reread the ssp index from JSON + ssp_index = SSPIndex(ssp_index_path) + + assert ssp_index.get_profile_by_ssp(test_ssp_output) == test_prof + assert test_comp in ssp_index.get_comps_by_ssp(test_ssp_output) + assert ssp_index.get_leveraged_by_ssp(test_ssp_output) is None + + assert ssp_index.get_profile_by_ssp("new_ssp") == "test_prof" + assert "my_comp" in ssp_index.get_comps_by_ssp("new_ssp") + assert ssp_index.get_leveraged_by_ssp("new_ssp") is None + + assert ssp_index.get_profile_by_ssp("another_new_ssp") == "test_prof" + assert "my_comp" in ssp_index.get_comps_by_ssp("another_new_ssp") + assert ssp_index.get_leveraged_by_ssp("another_new_ssp") == "test_leveraged" diff --git a/trestlebot/const.py b/trestlebot/const.py index b471cdd7..cf64c407 100644 --- a/trestlebot/const.py +++ b/trestlebot/const.py @@ -19,3 +19,10 @@ # Common exit codes SUCCESS_EXIT_CODE = 0 ERROR_EXIT_CODE = 1 + + +# SSP Index Fields + +PROFILE_KEY_NAME = "profile" +COMPDEF_KEY_NAME = "component_definitions" +LEVERAGED_SSP_KEY_NAME = "leveraged_ssp" diff --git a/trestlebot/tasks/authored/ssp.py b/trestlebot/tasks/authored/ssp.py index 461e8fe6..fec1a6cc 100644 --- a/trestlebot/tasks/authored/ssp.py +++ b/trestlebot/tasks/authored/ssp.py @@ -18,20 +18,25 @@ import argparse import json +import logging import os import pathlib -from typing import Dict, List +from typing import Any, Dict, List, Optional from trestle.common.err import TrestleError from trestle.core.commands.author.ssp import SSPAssemble, SSPGenerate from trestle.core.commands.common.return_codes import CmdReturnCodes +from trestlebot.const import COMPDEF_KEY_NAME, LEVERAGED_SSP_KEY_NAME, PROFILE_KEY_NAME from trestlebot.tasks.authored.base_authored import ( AuthoredObjectException, AuthorObjectBase, ) +logger = logging.getLogger("trestle") + + class SSPIndex: """ Class for managing the SSP index that stores relationship data by Trestle name @@ -42,27 +47,41 @@ def __init__(self, index_path: str) -> None: """ Initialize ssp index. """ + self._index_path = index_path self.profile_by_ssp: Dict[str, str] = {} self.comps_by_ssp: Dict[str, List[str]] = {} + self.leveraged_ssp_by_ssp: Dict[str, str] = {} - with open(index_path, "r") as file: - json_data = json.load(file) - - for ssp_name, ssp_info in json_data.items(): - try: - profile = ssp_info["profile"] - component_definitions = ssp_info["component_definitions"] - except KeyError: - raise AuthoredObjectException( - f"SSP {ssp_name} entry is missing profile or component data" - ) - - if profile is not None and component_definitions is not None: - self.profile_by_ssp[ssp_name] = profile - self.comps_by_ssp[ssp_name] = component_definitions + # Try to load the current file. If it does not exist, + # create an empty JSON file. + try: + with open(index_path, "r") as file: + json_data = json.load(file) + + for ssp_name, ssp_info in json_data.items(): + try: + profile = ssp_info[PROFILE_KEY_NAME] + component_definitions = ssp_info[COMPDEF_KEY_NAME] + except KeyError: + raise AuthoredObjectException( + f"SSP {ssp_name} entry is missing profile or component data" + ) + + if profile is not None and component_definitions is not None: + self.profile_by_ssp[ssp_name] = profile + self.comps_by_ssp[ssp_name] = component_definitions + + if LEVERAGED_SSP_KEY_NAME in ssp_info: + self.leveraged_ssp_by_ssp[ssp_name] = ssp_info[ + LEVERAGED_SSP_KEY_NAME + ] + + except FileNotFoundError: + with open(index_path, "w") as file: + json.dump({}, file) def get_comps_by_ssp(self, ssp_name: str) -> List[str]: - """Returns list of compdefs associated with the SSP""" + """Return list of compdefs associated with the SSP""" try: return self.comps_by_ssp[ssp_name] except KeyError: @@ -71,7 +90,7 @@ def get_comps_by_ssp(self, ssp_name: str) -> List[str]: ) def get_profile_by_ssp(self, ssp_name: str) -> str: - """Returns the profile associated with the SSP""" + """Return the profile associated with the SSP""" try: return self.profile_by_ssp[ssp_name] except KeyError: @@ -79,9 +98,48 @@ def get_profile_by_ssp(self, ssp_name: str) -> str: f"SSP {ssp_name} does not exists in the index" ) + def get_leveraged_by_ssp(self, ssp_name: str) -> Optional[str]: + """Return the optional leveraged SSP used with the SSP""" + try: + return self.leveraged_ssp_by_ssp[ssp_name] + except KeyError: + logging.debug(f"key {ssp_name} does not exist") + return None + + def add_new_ssp( + self, + ssp_name: str, + profile_name: str, + compdefs: List[str], + leveraged_ssp: Optional[str] = None, + ) -> None: + """Add a new ssp to the index""" + self.profile_by_ssp[ssp_name] = profile_name + self.comps_by_ssp[ssp_name] = compdefs + if leveraged_ssp: + self.leveraged_ssp_by_ssp[ssp_name] = leveraged_ssp + + def write_out(self) -> None: + """Write SSP index back to the index file""" + data: Dict[str, Any] = {} + + for ssp_name, profile_name in self.profile_by_ssp.items(): + ssp_info: Dict[str, Any] = { + PROFILE_KEY_NAME: profile_name, + COMPDEF_KEY_NAME: self.comps_by_ssp[ssp_name], + } + if ssp_name in self.leveraged_ssp_by_ssp: + ssp_info[LEVERAGED_SSP_KEY_NAME] = self.leveraged_ssp_by_ssp[ssp_name] + + data[ssp_name] = ssp_info + + with open(self._index_path, "w") as file: + json.dump(data, file, indent=4) + # TODO: Move away from using private run to a public function. # Done initially because a lot of required high level logic for SSP is private. +# See - https://github.com/IBM/compliance-trestle/pull/1432 class AuthoredSSP(AuthorObjectBase): @@ -134,6 +192,9 @@ def regenerate(self, model_path: str, markdown_path: str) -> None: comps = self.ssp_index.get_comps_by_ssp(ssp) profile = self.ssp_index.get_profile_by_ssp(ssp) + # TODO: Add this to the trestle command once available + _ = self.ssp_index.get_leveraged_by_ssp(ssp) + try: exit_code = ssp_generate._generate_ssp_markdown( trestle_root=trestle_path, @@ -150,3 +211,32 @@ def regenerate(self, model_path: str, markdown_path: str) -> None: ) except TrestleError as e: raise AuthoredObjectException(f"Trestle generate failed for {ssp}: {e}") + + def create_new_default( + self, + ssp_name: str, + profile_name: str, + compdefs: List[str], + markdown_path: str, + leveraged_ssp: Optional[str] = None, + ) -> None: + """ + Create new ssp with index + + Args: + ssp_name: Output name for ssp + profile_name: Profile to import controls from + compdefs: List of component definitions to import + markdown_path: Top-level markdown path to write to + leveraged_ssp: Optional leveraged ssp name for inheritance view editing + + Notes: + This will generate SSP markdown and an index entry for a new managed SSP. + """ + + self.ssp_index.add_new_ssp(ssp_name, profile_name, compdefs, leveraged_ssp) + self.ssp_index.write_out() + + # Pass the ssp_name as the model base path. + # We don't need the model dir for SSP generation. + return self.regenerate(ssp_name, markdown_path)