diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 1e6b4784b..5729f6d0e 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -52,6 +52,7 @@ from pycti.entities.opencti_opinion import Opinion from pycti.entities.opencti_report import Report from pycti.entities.opencti_role import Role +from pycti.entities.opencti_security_coverage import SecurityCoverage from pycti.entities.opencti_settings import Settings from pycti.entities.opencti_stix import Stix from pycti.entities.opencti_stix_core_object import StixCoreObject @@ -223,6 +224,7 @@ def __init__( self.narrative = Narrative(self) self.language = Language(self) self.vulnerability = Vulnerability(self) + self.security_coverage = SecurityCoverage(self) self.attack_pattern = AttackPattern(self) self.course_of_action = CourseOfAction(self) self.data_component = DataComponent(self) diff --git a/pycti/connector/opencti_connector.py b/pycti/connector/opencti_connector.py index 1601dcb22..e25b4eebf 100644 --- a/pycti/connector/opencti_connector.py +++ b/pycti/connector/opencti_connector.py @@ -43,6 +43,8 @@ def __init__( auto: bool, only_contextual: bool, playbook_compatible: bool, + auto_update: bool, + enrichment_resolution: str, listen_callback_uri=None, ): self.id = connector_id @@ -55,6 +57,8 @@ def __init__( else: self.scope = [] self.auto = auto + self.auto_update = auto_update + self.enrichment_resolution = enrichment_resolution self.only_contextual = only_contextual self.playbook_compatible = playbook_compatible self.listen_callback_uri = listen_callback_uri @@ -72,6 +76,8 @@ def to_input(self) -> dict: "type": self.type.name, "scope": self.scope, "auto": self.auto, + "auto_update": self.auto_update, + "enrichment_resolution": self.enrichment_resolution, "only_contextual": self.only_contextual, "playbook_compatible": self.playbook_compatible, "listen_callback_uri": self.listen_callback_uri, diff --git a/pycti/connector/opencti_connector_helper.py b/pycti/connector/opencti_connector_helper.py index 6d843729f..9b6808104 100644 --- a/pycti/connector/opencti_connector_helper.py +++ b/pycti/connector/opencti_connector_helper.py @@ -365,6 +365,16 @@ def _data_handler(self, json_data) -> None: event_data = json_data["event"] entity_id = event_data.get("entity_id") entity_type = event_data.get("entity_type") + stix_entity = ( + json.loads(event_data.get("stix_entity")) + if event_data.get("stix_entity") + else None + ) + stix_objects = ( + json.loads(event_data.get("stix_objects")) + if event_data.get("stix_objects") + else None + ) validation_mode = event_data.get("validation_mode", "workbench") force_validation = event_data.get("force_validation", False) # Set the API headers @@ -430,17 +440,18 @@ def _data_handler(self, json_data) -> None: else: # If not playbook but enrichment, compute object on enrichment_entity opencti_entity = event_data["enrichment_entity"] - stix_objects = self.helper.api.stix2.prepare_export( - entity=self.helper.api.stix2.generate_export( - copy.copy(opencti_entity) + if stix_objects is None: + stix_objects = self.helper.api.stix2.prepare_export( + entity=self.helper.api.stix2.generate_export( + copy.copy(opencti_entity) + ) ) - ) - stix_entity = [ - e - for e in stix_objects - if e["id"] == opencti_entity["standard_id"] - or e["id"] == "x-opencti-" + opencti_entity["standard_id"] - ][0] + stix_entity = [ + e + for e in stix_objects + if e["id"] == opencti_entity["standard_id"] + or e["id"] == "x-opencti-" + opencti_entity["standard_id"] + ][0] event_data["stix_objects"] = stix_objects event_data["stix_entity"] = stix_entity # Handle organization propagation @@ -1116,6 +1127,15 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None: self.connect_auto = get_config_variable( "CONNECTOR_AUTO", ["connector", "auto"], config, default=False ) + self.connect_auto_update = get_config_variable( + "CONNECTOR_AUTO_UPDATE", ["connector", "auto_update"], config, default=False + ) + self.connect_enrichment_resolution = get_config_variable( + "CONNECTOR_ENRICHMENT_RESOLUTION", + ["connector", "enrichment_resolution"], + config, + default="none", + ) self.bundle_send_to_queue = get_config_variable( "CONNECTOR_SEND_TO_QUEUE", ["connector", "send_to_queue"], @@ -1231,14 +1251,16 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None: ) # Register the connector in OpenCTI self.connector = OpenCTIConnector( - self.connect_id, - self.connect_name, - self.connect_type, - self.connect_scope, - self.connect_auto, - self.connect_only_contextual, - playbook_compatible, - ( + connector_id=self.connect_id, + connector_name=self.connect_name, + connector_type=self.connect_type, + scope=self.connect_scope, + auto=self.connect_auto, + only_contextual=self.connect_only_contextual, + playbook_compatible=playbook_compatible, + auto_update=self.connect_auto_update, + enrichment_resolution=self.connect_enrichment_resolution, + listen_callback_uri=( self.listen_protocol_api_uri + self.listen_protocol_api_path if self.listen_protocol == "API" else None diff --git a/pycti/entities/opencti_security_coverage.py b/pycti/entities/opencti_security_coverage.py new file mode 100644 index 000000000..e595a9c3b --- /dev/null +++ b/pycti/entities/opencti_security_coverage.py @@ -0,0 +1,336 @@ +# coding: utf-8 + +import json +import uuid + +from stix2.canonicalization.Canonicalize import canonicalize + + +class SecurityCoverage: + def __init__(self, opencti): + self.opencti = opencti + self.properties = """ + id + standard_id + entity_type + parent_types + spec_version + created_at + updated_at + external_uri + objectCovered { + __typename + ... on StixCoreObject { + id + } + } + objectMarking { + id + standard_id + entity_type + definition_type + definition + created + modified + x_opencti_order + x_opencti_color + } + """ + + @staticmethod + def generate_id(covered_ref): + data = {"covered_ref": covered_ref.lower().strip()} + data = canonicalize(data, utf8=False) + id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data)) + return "security-coverage--" + id + + @staticmethod + def generate_id_from_data(data): + return SecurityCoverage.generate_id(data["covered_ref"]) + + """ + List securityCoverage objects + + :param filters: the filters to apply + :param search: the search keyword + :param first: return the first n rows from the after ID (or the beginning if not set) + :param after: ID of the first row for pagination + :return List of SecurityCoverage objects + """ + + def list(self, **kwargs): + filters = kwargs.get("filters", None) + search = kwargs.get("search", None) + first = kwargs.get("first", 100) + after = kwargs.get("after", None) + order_by = kwargs.get("orderBy", None) + order_mode = kwargs.get("orderMode", None) + custom_attributes = kwargs.get("customAttributes", None) + get_all = kwargs.get("getAll", False) + with_pagination = kwargs.get("withPagination", False) + + self.opencti.app_logger.info( + "Listing SecurityCoverage with filters", {"filters": json.dumps(filters)} + ) + query = ( + """ + query SecurityCoverage($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityCoverageOrdering, $orderMode: OrderingMode) { + securityCoverages(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) { + edges { + node { + """ + + (custom_attributes if custom_attributes is not None else self.properties) + + """ + } + } + pageInfo { + startCursor + endCursor + hasNextPage + hasPreviousPage + globalCount + } + } + } + """ + ) + result = self.opencti.query( + query, + { + "filters": filters, + "search": search, + "first": first, + "after": after, + "orderBy": order_by, + "orderMode": order_mode, + }, + ) + + if get_all: + final_data = [] + data = self.opencti.process_multiple(result["data"]["securityCoverages"]) + final_data = final_data + data + while result["data"]["securityCoverages"]["pageInfo"]["hasNextPage"]: + after = result["data"]["securityCoverages"]["pageInfo"]["endCursor"] + self.opencti.app_logger.info( + "Listing SecurityCoverage", {"after": after} + ) + result = self.opencti.query( + query, + { + "filters": filters, + "search": search, + "first": first, + "after": after, + "orderBy": order_by, + "orderMode": order_mode, + }, + ) + data = self.opencti.process_multiple( + result["data"]["securityCoverages"] + ) + final_data = final_data + data + return final_data + else: + return self.opencti.process_multiple( + result["data"]["securityCoverages"], with_pagination + ) + + """ + Read a SecurityCoverage object + + :param id: the id of the SecurityCoverage + :param filters: the filters to apply if no id provided + :return SecurityCoverage object + """ + + def read(self, **kwargs): + id = kwargs.get("id", None) + filters = kwargs.get("filters", None) + custom_attributes = kwargs.get("customAttributes", None) + if id is not None: + self.opencti.app_logger.info("Reading SecurityCoverage", {"id": id}) + query = ( + """ + query SecurityCoverage($id: String!) { + securityCoverage(id: $id) { + """ + + ( + custom_attributes + if custom_attributes is not None + else self.properties + ) + + """ + } + } + """ + ) + result = self.opencti.query(query, {"id": id}) + return self.opencti.process_multiple_fields( + result["data"]["securityCoverage"] + ) + elif filters is not None: + result = self.list(filters=filters) + if len(result) > 0: + return result[0] + else: + return None + else: + self.opencti.app_logger.error( + "[opencti_security_coverage] Missing parameters: id or filters" + ) + return None + + """ + Create a Security coverage object + + :return Security Coverage object + """ + + def create(self, **kwargs): + stix_id = kwargs.get("stix_id", None) + name = kwargs.get("name", None) + description = kwargs.get("description", None) + created_by = kwargs.get("createdBy", None) + object_marking = kwargs.get("objectMarking", None) + object_label = kwargs.get("objectLabel", None) + object_covered = kwargs.get("objectCovered", None) + external_references = kwargs.get("externalReferences", None) + external_uri = kwargs.get("external_uri", None) + coverage_last_result = kwargs.get("coverage_last_result", None) + coverage_valid_from = kwargs.get("coverage_valid_from", None) + coverage_valid_to = kwargs.get("coverage_valid_to", None) + coverage_information = kwargs.get("coverage_information", None) + auto_enrichment_disable = kwargs.get("auto_enrichment_disable", None) + + if name is not None and object_covered is not None: + self.opencti.app_logger.info("Creating Security Coverage", {"name": name}) + query = """ + mutation SecurityCoverageAdd($input: SecurityCoverageAddInput!) { + securityCoverageAdd(input: $input) { + id + standard_id + entity_type + parent_types + } + } + """ + result = self.opencti.query( + query, + { + "input": { + "stix_id": stix_id, + "name": name, + "description": description, + "createdBy": created_by, + "objectMarking": object_marking, + "objectLabel": object_label, + "objectCovered": object_covered, + "external_uri": external_uri, + "externalReferences": external_references, + "coverage_last_result": coverage_last_result, + "coverage_valid_from": coverage_valid_from, + "coverage_valid_to": coverage_valid_to, + "coverage_information": coverage_information, + "auto_enrichment_disable": auto_enrichment_disable, + } + }, + ) + return self.opencti.process_multiple_fields( + result["data"]["securityCoverageAdd"] + ) + else: + self.opencti.app_logger.error( + "[opencti_security_coverage] " + "Missing parameters: name or object_covered" + ) + + """ + Import a Security coverage from a STIX2 object + + :param stixObject: the Stix-Object Security coverage + :return Security coverage object + """ + + def import_from_stix2(self, **kwargs): + stix_object = kwargs.get("stixObject", None) + extras = kwargs.get("extras", {}) + if stix_object is not None: + # Search in extensions + if "x_opencti_stix_ids" not in stix_object: + stix_object["x_opencti_stix_ids"] = ( + self.opencti.get_attribute_in_extension("stix_ids", stix_object) + ) + if "x_opencti_granted_refs" not in stix_object: + stix_object["x_opencti_granted_refs"] = ( + self.opencti.get_attribute_in_extension("granted_refs", stix_object) + ) + + raw_coverages = stix_object["coverage"] if "coverage" in stix_object else [] + coverage_information = list( + map( + lambda cov: { + "coverage_name": cov["name"], + "coverage_score": cov["score"], + }, + raw_coverages, + ) + ) + + return self.create( + stix_id=stix_object["id"], + name=stix_object["name"], + external_uri=( + stix_object["external_uri"] + if "external_uri" in stix_object + else None + ), + auto_enrichment_disable=( + stix_object["auto_enrichment_disable"] + if "auto_enrichment_disable" in stix_object + else False + ), + coverage_last_result=( + stix_object["last_result"] if "last_result" in stix_object else None + ), + coverage_valid_from=( + stix_object["valid_from"] if "valid_from" in stix_object else None + ), + coverage_valid_to=( + stix_object["valid_to"] if "valid_to" in stix_object else None + ), + coverage_information=coverage_information, + description=( + self.opencti.stix2.convert_markdown(stix_object["description"]) + if "description" in stix_object + else None + ), + createdBy=( + extras["created_by_id"] if "created_by_id" in extras else None + ), + objectMarking=( + extras["object_marking_ids"] + if "object_marking_ids" in extras + else None + ), + objectLabel=( + extras["object_label_ids"] if "object_label_ids" in extras else None + ), + objectCovered=( + stix_object["covered_ref"] if "covered_ref" in stix_object else None + ), + externalReferences=( + extras["external_references_ids"] + if "external_references_ids" in extras + else None + ), + x_opencti_stix_ids=( + stix_object["x_opencti_stix_ids"] + if "x_opencti_stix_ids" in stix_object + else None + ), + ) + else: + self.opencti.app_logger.error( + "[opencti_security_coverage] Missing parameters: stixObject" + ) diff --git a/pycti/entities/opencti_settings.py b/pycti/entities/opencti_settings.py index e0d950fda..b2aadf126 100644 --- a/pycti/entities/opencti_settings.py +++ b/pycti/entities/opencti_settings.py @@ -60,11 +60,9 @@ def __init__(self, opencti): platform_theme_light_logo_login platform_map_tile_server_dark platform_map_tile_server_light - platform_openbas_url - platform_openbas_disable_display - platform_openerm_url - platform_openmtd_url platform_ai_enabled + platform_openaev_url + platform_opengrc_url platform_ai_type platform_ai_model platform_ai_has_token diff --git a/pycti/entities/opencti_stix_core_relationship.py b/pycti/entities/opencti_stix_core_relationship.py index dcd926733..c76837365 100644 --- a/pycti/entities/opencti_stix_core_relationship.py +++ b/pycti/entities/opencti_stix_core_relationship.py @@ -624,6 +624,7 @@ def create(self, **kwargs): granted_refs = kwargs.get("objectOrganization", None) x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None) x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None) + coverage_information = kwargs.get("coverage_information", None) update = kwargs.get("update", False) self.opencti.app_logger.info( @@ -668,6 +669,7 @@ def create(self, **kwargs): "killChainPhases": kill_chain_phases, "x_opencti_workflow_id": x_opencti_workflow_id, "x_opencti_stix_ids": x_opencti_stix_ids, + "coverage_information": coverage_information, "update": update, } }, @@ -1175,6 +1177,19 @@ def import_from_stix2(self, **kwargs): ) ) + raw_coverages = ( + stix_relation["coverage"] if "coverage" in stix_relation else [] + ) + coverage_information = list( + map( + lambda cov: { + "coverage_name": cov["name"], + "coverage_score": cov["score"], + }, + raw_coverages, + ) + ) + source_ref = stix_relation["source_ref"] target_ref = stix_relation["target_ref"] return self.create( @@ -1197,6 +1212,7 @@ def import_from_stix2(self, **kwargs): if "stop_time" in stix_relation else default_date ), + coverage_information=coverage_information, revoked=( stix_relation["revoked"] if "revoked" in stix_relation else None ), diff --git a/pycti/utils/opencti_stix2.py b/pycti/utils/opencti_stix2.py index e25ab57aa..79c927e7d 100644 --- a/pycti/utils/opencti_stix2.py +++ b/pycti/utils/opencti_stix2.py @@ -885,6 +885,7 @@ def get_readers(self): "Tool": self.opencti.tool.read, "Vocabulary": self.opencti.vocabulary.read, "Vulnerability": self.opencti.vulnerability.read, + "Security-Coverage": self.opencti.security_coverage.read, } def get_reader(self, entity_type: str): @@ -961,6 +962,7 @@ def get_stix_helper(self): "narrative": self.opencti.narrative, "task": self.opencti.task, "x-opencti-task": self.opencti.task, + "security-coverage": self.opencti.security_coverage, "vocabulary": self.opencti.vocabulary, # relationships "relationship": self.opencti.stix_core_relationship, diff --git a/pycti/utils/opencti_stix2_utils.py b/pycti/utils/opencti_stix2_utils.py index 8b8d8100e..1f7d7a876 100644 --- a/pycti/utils/opencti_stix2_utils.py +++ b/pycti/utils/opencti_stix2_utils.py @@ -64,6 +64,7 @@ "threat-actor", "tool", "vulnerability", + "security-coverage", ] SUPPORTED_STIX_ENTITY_OBJECTS = STIX_META_OBJECTS + STIX_CORE_OBJECTS