Skip to content

Commit

Permalink
Merge pull request #334 from iriusrisk/feature/OPT-1022
Browse files Browse the repository at this point in the history
[feature/OPT-1022] to dev
  • Loading branch information
dantolin-iriusrisk committed Oct 24, 2023
2 parents edc1d63 + d5645ce commit df48991
Show file tree
Hide file tree
Showing 13 changed files with 865 additions and 20 deletions.
5 changes: 4 additions & 1 deletion otm/otm/entity/component.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import List

from otm.otm.entity.parent_type import ParentType
from otm.otm.entity.representation import RepresentationElement
from otm.otm.entity.threat import ThreatInstance


Expand All @@ -14,7 +17,7 @@ def __init__(self, component_id, name, component_type=None, parent=None, parent_
self.attributes = attributes
self.tags = tags
self.threats: [ThreatInstance] = threats or []
self.representations = representations
self.representations: List[RepresentationElement] = representations

def add_threat(self, threat: ThreatInstance):
self.threats.append(threat)
Expand Down
17 changes: 10 additions & 7 deletions sl_util/sl_util/iterations_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ def remove_duplicates(duplicated_list: List) -> List:


def compare_unordered_list_or_string(a: Union[str, List], b: Union[str, List]) -> bool:
if isinstance(a, str) and isinstance(b, str):
return a == b
elif isinstance(a, str) and isinstance(b, list):
return sorted([a]) == sorted(b)
elif isinstance(a, list) and isinstance(b, list):
return sorted(a) == sorted(b)
else:
try:
if isinstance(a, str) and isinstance(b, str):
return a == b
elif isinstance(a, str) and isinstance(b, list):
return sorted([a]) == sorted(b)
elif isinstance(a, list) and isinstance(b, str):
return sorted(a) == sorted([b])
elif isinstance(a, list) and isinstance(b, list):
return sorted(a) == sorted(b)
except TypeError:
return False


Expand Down
19 changes: 18 additions & 1 deletion sl_util/tests/unit/test_iterations_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from sl_util.sl_util.iterations_utils import remove_from_list, remove_keys
import pytest

from sl_util.sl_util.iterations_utils import remove_from_list, remove_keys, compare_unordered_list_or_string


def remove_function(element, original_array, removed_array):
Expand Down Expand Up @@ -76,3 +78,18 @@ def test_remove_keys(self):

# Then the result is as expected
assert result == {'1': 'One', '3': 'Three'}

@pytest.mark.parametrize('a, b, result', [
pytest.param('a', 'a', True, id="eq string"),
pytest.param('a', 'b', False, id="not eq string"),
pytest.param(['a'], 'a', True, id="eq list and string"),
pytest.param(['a'], 'b', False, id="not eq list and string"),
pytest.param('a', ['a'], True, id="eq string and list"),
pytest.param('a', ['b'], False, id="not eq string and list"),
pytest.param(['a'], ['a'], True, id="eq list"),
pytest.param(['a'], ['b'], False, id="not eq list"),
pytest.param(['b', 'a'], ['a', 'b'], True, id="eq unordered list"),
pytest.param([{'a': 'a'}, {'a2': 'a2'}], [{'b': 'b'}], False, id="edge case List of Dictionaries")
])
def test_compare_unordered_list_or_string(self, a, b, result):
assert compare_unordered_list_or_string(a, b) == result
93 changes: 90 additions & 3 deletions slp_drawio/slp_drawio/load/diagram_component_loader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,97 @@
import re
from typing import List, Dict

from otm.otm.entity.representation import RepresentationElement
from slp_base import LoadingDiagramFileError
from slp_drawio.slp_drawio.load.drawio_dict_utils import is_multiple_pages, \
get_attributes, get_position, get_size, get_mx_cell_components
from slp_drawio.slp_drawio.objects.diagram_objects import DiagramComponent

__CALCULATE_SHAPE_TYPE_EQUIVALENCES = resource_types_equivalences = {
'aws.group': 'grIcon',
'aws.groupCenter': 'grIcon',
'aws.resourceIcon': 'resIcon',
'aws.productIcon': 'prIcon'
}


def __remove_mxgraph_aws(text):
return re.sub(r"aws\d.", "aws.", text)


def __remove_mxgraph(text):
return re.sub(r"mxgraph.", "", text)


def __normalize_shape_type(text):
for normalize_function in [__remove_mxgraph, __remove_mxgraph_aws]:
text = normalize_function(text)

return text


def _calculate_shape_type(mx_cell: Dict, attr: str = 'shape'):
shape = get_attributes(mx_cell).get(attr)
if not shape:
return
shape_type = __normalize_shape_type(shape)
if shape_type in __CALCULATE_SHAPE_TYPE_EQUIVALENCES:
shape_type = _calculate_shape_type(mx_cell, __CALCULATE_SHAPE_TYPE_EQUIVALENCES.get(shape_type))

return shape_type


def _get_shape_parent_id(mx_cell: Dict, mx_cell_components: List[Dict]):
return mx_cell.get('parent') \
if any(item.get('id') == mx_cell.get('parent') for item in mx_cell_components) else None


def _get_shape_name(mx_cell: Dict):
name = mx_cell.get('value')
if not name:
name = _calculate_shape_type(mx_cell) or ''
if '.' in name:
name = name.split('.')[-1]
name = name.replace('_', ' ')
if not name:
name = 'N/A'
if len(name) == 1:
name = f'_{name}'
return name


class DiagramComponentLoader:

def __init__(self, source: dict):
self.source: dict = source
def __init__(self, project_id: str, source: dict):
self._project_id = project_id
self._source: dict = source

def load(self) -> [DiagramComponent]:
return []
if is_multiple_pages(self._source):
raise LoadingDiagramFileError(
'Diagram file is not valid', 'Diagram File is not compatible',
'DrawIO processor does not accept diagrams with multiple pages')

result: List[DiagramComponent] = []

mx_cell_components = get_mx_cell_components(self._source)
for mx_cell in mx_cell_components:
result.append(DiagramComponent(
id=mx_cell.get('id'),
name=_get_shape_name(mx_cell),
shape_type=_calculate_shape_type(mx_cell),
shape_parent_id=_get_shape_parent_id(mx_cell, mx_cell_components),
representations=[self._get_representation_element(mx_cell)]
))

return result

def _get_representation_element(self, mx_cell: Dict) -> RepresentationElement:
return RepresentationElement(
id_=f"{mx_cell.get('id')}-diagram",
name=f"{mx_cell.get('id')} Representation",
representation=f"{self._project_id}-diagram",
position=get_position(mx_cell),
size=get_size(mx_cell),
attributes={'style': mx_cell.get('style')}
)
78 changes: 78 additions & 0 deletions slp_drawio/slp_drawio/load/drawio_dict_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Dict, List, Any


def __get_as_mx_point_in_mx_geometry(attr: str, mx_geometry: Dict):
if 'mxPoint' not in mx_geometry:
return
mx_points = [mx_geometry['mxPoint']] if isinstance(mx_geometry['mxPoint'], dict) else mx_geometry['mxPoint']

for mx_point in mx_points:
if mx_point.get('as') == attr:
return mx_point


def __is_mx_cell_dataflow(mx_cell):
if 'mxGeometry' not in mx_cell:
return False

source = mx_cell.get('source') or __get_as_mx_point_in_mx_geometry('sourcePoint', mx_cell['mxGeometry'])
target = mx_cell.get('target') or __get_as_mx_point_in_mx_geometry('targetPoint', mx_cell['mxGeometry'])

return source and target


def __is_mx_cell_component(mx_cell: Dict):
if 'mxGeometry' not in mx_cell:
return False
return not __is_mx_cell_dataflow(mx_cell)


def __get_mx_cell_from_source(source):
return source.get("mxfile", {}).get("diagram", {}).get("mxGraphModel", {}).get("root", {}).get("mxCell", [])


def get_mx_cell_components(source) -> List[Dict]:
return list(filter(lambda c: __is_mx_cell_component(c), __get_mx_cell_from_source(source)))


def get_mxcell_dataflows(source) -> List[Dict]:
return list(filter(lambda c: __is_mx_cell_dataflow(c), __get_mx_cell_from_source(source)))


def is_multiple_pages(source):
diagram_pages = source.get("mxfile", {}).get("diagram", None)
return len(diagram_pages) > 1 if isinstance(diagram_pages, list) else False


def get_attributes(mx_cell: Dict) -> Dict[str, Any]:
attributes: Dict[str, Any] = {}
styles = mx_cell.get('style', '').split(';')
for style in styles:
if not style:
continue
key, value = style, None
if '=' in style:
key, value = style.split('=', 1)
attributes[key] = value

return attributes


def __str_to_int(value: str):
return int(round(float(value), 0))


def get_position(mx_cell: Dict) -> Dict[str, float]:
mx_geometry = mx_cell.get('mxGeometry', {})
return {
'x': __str_to_int(mx_geometry.get('x', 0)),
'y': __str_to_int(mx_geometry.get('y', 0)),
}


def get_size(mx_cell: Dict) -> Dict[str, float]:
mx_geometry = mx_cell.get('mxGeometry', {})
return {
'height': __str_to_int(mx_geometry.get('height')),
'width': __str_to_int(mx_geometry.get('width')),
}
8 changes: 5 additions & 3 deletions slp_drawio/slp_drawio/load/drawio_loader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from slp_base import LoadingSourceFileError
from slp_base import LoadingDiagramFileError
from slp_base.slp_base.provider_loader import ProviderLoader
from slp_drawio.slp_drawio.load.diagram_component_loader import DiagramComponentLoader
from slp_drawio.slp_drawio.load.diagram_dataflow_loader import DiagramDataflowLoader
Expand All @@ -27,15 +27,17 @@ def load(self):
source_dict = DrawIOToDict(self.source).to_dict()

representation: DiagramRepresentation = DiagramRepresentationLoader(self.project_id, source_dict).load()
components: [DiagramComponent] = DiagramComponentLoader(source_dict).load()
components: [DiagramComponent] = DiagramComponentLoader(self.project_id, source_dict).load()
dataflows: [DiagramDataflow] = DiagramDataflowLoader(source_dict).load()

self.diagram: Diagram = Diagram(representation, components, dataflows)
except LoadingDiagramFileError as e:
raise e
except Exception as e:
logger.error(f'{e}')
detail = e.__class__.__name__
message = e.__str__()
raise LoadingSourceFileError('Source file cannot be loaded', detail, message)
raise LoadingDiagramFileError('Source file cannot be loaded', detail, message)

def get_diagram(self) -> Diagram:
return self.diagram
13 changes: 8 additions & 5 deletions slp_drawio/slp_drawio/objects/diagram_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
from otm.otm.entity.dataflow import Dataflow
from otm.otm.entity.representation import RepresentationType, RepresentationElement
from otm.otm.entity.trustzone import Trustzone
from sl_util.sl_util.lang_utils import auto_repr


@auto_repr
class DiagramTrustZone:
def __init__(self, type_: str, id_: str = None, name: str = None, default: bool = False, shape_parent_id=None):
self.otm: Trustzone = Trustzone(trustzone_id=id_, name=name or '', type=type_)
self.default = default
self.shape_parent_id = shape_parent_id


@auto_repr
class DiagramComponent:

def __init__(self,
Expand All @@ -25,23 +28,22 @@ def __init__(self,
):
self.otm: Component = Component(component_id=id,
name=name or '',
representations=representations,
representations=representations
)
self.shape_type = shape_type
self.shape_parent_id = shape_parent_id

def __str__(self) -> str:
return '{otm: ' + str(self.otm) + '}'

def __repr__(self) -> str:
return '{otm: ' + str(self.otm) + '}'
return f'{{otm: {str(self.otm)}, shape_type: {self.shape_type}, shape_parent_id: {self.shape_parent_id}}}'


@auto_repr
class DiagramDataflow:
def __init__(self, id: str):
self.otm = Dataflow(dataflow_id=id, name='', source_node=None, destination_node=None)


@auto_repr
class DiagramRepresentation:
def __init__(self, project_id: str, size: dict):
self.otm = representation.DiagramRepresentation(
Expand All @@ -52,6 +54,7 @@ def __init__(self, project_id: str, size: dict):
)


@auto_repr
class Diagram:
def __init__(self,
representation: [DiagramRepresentation] = None,
Expand Down
Loading

0 comments on commit df48991

Please sign in to comment.