Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ htmlcov/
docs/build/
.hypothesis/

# customized config files
# Customized config files
sdk/test/test_config.ini
# Schema files needed for testing
sdk/test/adapter/schemas
Expand All @@ -31,5 +31,6 @@ sdk/basyx/version.py
compliance_tool/aas_compliance_tool/version.py
server/app/version.py

# ignore the content of the server storage
# Ignore the content of the server storage
server/input/
server/storage/
42 changes: 42 additions & 0 deletions sdk/basyx/aas/adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,45 @@
Python SDK objects to/from XML.
* :ref:`aasx <adapter.aasx>`: This package offers functions for reading and writing AASX-files.
"""

from basyx.aas.adapter.aasx import AASXReader, DictSupplementaryFileContainer
from basyx.aas.adapter.json import read_aas_json_file_into
from basyx.aas.adapter.xml import read_aas_xml_file_into
from basyx.aas.model.provider import DictObjectStore
from pathlib import Path
from typing import Union


def load_directory(directory: Union[Path, str]) -> tuple[DictObjectStore, DictSupplementaryFileContainer]:
"""
Create a new :class:`~basyx.aas.model.provider.DictObjectStore` and use it to load Asset Administration Shell and
Submodel files in ``AASX``, ``JSON`` and ``XML`` format from a given directory into memory. Additionally, load all
embedded supplementary files into a new :class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer`.

:param directory: :class:`~pathlib.Path` or ``str`` pointing to the directory containing all Asset Administration
Shell and Submodel files to load
:return: Tuple consisting of a :class:`~basyx.aas.model.provider.DictObjectStore` and a
:class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer` containing all loaded data
"""

dict_object_store: DictObjectStore = DictObjectStore()
file_container: DictSupplementaryFileContainer = DictSupplementaryFileContainer()

directory = Path(directory)

for file in directory.iterdir():
if not file.is_file():
continue

suffix = file.suffix.lower()
if suffix == ".json":
with open(file) as f:
read_aas_json_file_into(dict_object_store, f)
elif suffix == ".xml":
with open(file) as f:
read_aas_xml_file_into(dict_object_store, f)
elif suffix == ".aasx":
with AASXReader(file) as reader:
reader.read_into(object_store=dict_object_store, file_store=file_container)

return dict_object_store, file_container
3 changes: 3 additions & 0 deletions sdk/basyx/aas/adapter/aasx.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,3 +871,6 @@ def __contains__(self, item: object) -> bool:

def __iter__(self) -> Iterator[str]:
return iter(self._name_map)

def __len__(self) -> int:
return len(self._name_map)
2 changes: 1 addition & 1 deletion sdk/basyx/aas/adapter/json/json_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _abstract_classes_to_json(cls, obj: object) -> Dict[str, object]:
if obj.description:
data['description'] = obj.description
try:
ref_type = next(iter(t for t in inspect.getmro(type(obj)) if t in model.KEY_TYPES_CLASSES))
ref_type = model.resolve_referable_class_in_key_types(obj)
except StopIteration as e:
raise TypeError("Object of type {} is Referable but does not inherit from a known AAS type"
.format(obj.__class__.__name__)) from e
Expand Down
14 changes: 14 additions & 0 deletions sdk/basyx/aas/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,17 @@
RelationshipElement: KeyTypes.RELATIONSHIP_ELEMENT,
SubmodelElement: KeyTypes.SUBMODEL_ELEMENT, # type: ignore
}


def resolve_referable_class_in_key_types(referable: Referable) -> type:
"""
Returns the type of referable if the type is given in `KEY_TYPES_CLASSES`, otherwise return the first parent class
in inheritance chain of the referable which is given in `KEY_TYPES_CLASSES`.

:raises TypeError: If the type of the referable or any of its parent classes is not given in `KEY_TYPES_CLASSES`.
"""
try:
ref_type = next(iter(t for t in inspect.getmro(type(referable)) if t in KEY_TYPES_CLASSES))
except StopIteration:
raise TypeError(f"Could not find a matching class in KEY_TYPES_CLASSES for {type(referable)}")
return ref_type
180 changes: 141 additions & 39 deletions sdk/basyx/aas/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
VersionType = str
ValueTypeIEC61360 = str

MAX_RECURSION_DEPTH = 32*2 # see https://github.com/admin-shell-io/aas-specs-metamodel/issues/333


@unique
class KeyTypes(Enum):
Expand Down Expand Up @@ -453,25 +455,31 @@ def from_referable(referable: "Referable") -> "Key":
"""
# Get the `type` by finding the first class from the base classes list (via inspect.getmro), that is contained
# in KEY_ELEMENTS_CLASSES
from . import KEY_TYPES_CLASSES, SubmodelElementList
try:
key_type = next(iter(KEY_TYPES_CLASSES[t]
for t in inspect.getmro(type(referable))
if t in KEY_TYPES_CLASSES))
except StopIteration:
key_type = KeyTypes.PROPERTY
key_type = Key._get_key_type_for_referable(referable)
key_value = Key._get_key_value_for_referable(referable)
return Key(key_type, key_value)

@staticmethod
def _get_key_type_for_referable(referable: "Referable") -> KeyTypes:
from . import KEY_TYPES_CLASSES, resolve_referable_class_in_key_types
ref_type = resolve_referable_class_in_key_types(referable)
key_type = KEY_TYPES_CLASSES[ref_type]
return key_type

@staticmethod
def _get_key_value_for_referable(referable: "Referable") -> str:
from . import SubmodelElementList
if isinstance(referable, Identifiable):
return Key(key_type, referable.id)
return referable.id
elif isinstance(referable.parent, SubmodelElementList):
try:
return Key(key_type, str(referable.parent.value.index(referable))) # type: ignore
return str(referable.parent.value.index(referable)) # type: ignore
except ValueError as e:
raise ValueError(f"Object {referable!r} is not contained within its parent {referable.parent!r}") from e
else:
if referable.id_short is None:
raise ValueError(f"Can't create Key for {referable!r} without an id_short!")
return Key(key_type, referable.id_short)
raise ValueError(f"Can't create Key value for {referable!r} without an id_short!")
return referable.id_short


_NSO = TypeVar('_NSO', bound=Union["Referable", "Qualifier", "HasSemantics", "Extension"])
Expand Down Expand Up @@ -614,26 +622,75 @@ def __init__(self):
self.parent: Optional[UniqueIdShortNamespace] = None

def __repr__(self) -> str:
reversed_path = []
root = self.get_identifiable_root()
try:
id_short_path = self.get_id_short_path()
except (ValueError, AttributeError):
id_short_path = self.id_short if self.id_short is not None else ""
item_cls_name = self.__class__.__name__

if root is None:
item_path = f"[{id_short_path}]" if id_short_path else ""
else:
item_path = f"[{root.id} / {id_short_path}]" if id_short_path else f"[{root.id}]"

return f"{item_cls_name}{item_path}"

def get_identifiable_root(self) -> Optional["Identifiable"]:
"""
Get the root :class:`~.Identifiable` of this referable, if it exists.

:return: The root :class:`~.Identifiable` or None if no such root exists
"""
item = self # type: Any
if item.id_short is not None:
from .submodel import SubmodelElementList
while item is not None:
if isinstance(item, Identifiable):
reversed_path.append(item.id)
break
elif isinstance(item, Referable):
if isinstance(item.parent, SubmodelElementList):
reversed_path.append(f"{item.parent.id_short}[{item.parent.value.index(item)}]")
item = item.parent
else:
reversed_path.append(item.id_short)
item = item.parent
else:
raise AttributeError('Referable must have an identifiable as root object and only parents that are '
'referable')
while item is not None:
if isinstance(item, Identifiable):
return item
elif isinstance(item, Referable):
item = item.parent
else:
raise AttributeError('Referable must have an identifiable as root object and only parents that are '
'referable')
return None

return self.__class__.__name__ + ("[{}]".format(" / ".join(reversed(reversed_path))) if reversed_path else "")
def get_id_short_path(self) -> str:
"""
Get the id_short path of this referable, i.e. the id_short of this referable and all its parents.

:return: The id_short path as a string, e.g. "MySECollection.MySEList[2]MySubProperty1"
"""
path_list = self.get_id_short_path_as_a_list()
return self.build_id_short_path(path_list)

def get_id_short_path_as_a_list(self) -> List[str]:
"""
Get the id_short path of this referable as a list of id_shorts and indexes.

:return: The id_short path as a list, e.g. '["MySECollection", "MySEList", "2", "MySubProperty1"]'
:raises ValueError: If this referable has no id_short or
if its parent is not a :class:`~basyx.aas.model.submodel.SubmodelElementList`
:raises AttributeError: If the parent chain is broken, i.e. if a parent is neither a :class:`~.Referable` nor an
:class:`~.Identifiable`
"""
from .submodel import SubmodelElementList
if self.id_short is None and not isinstance(self.parent, SubmodelElementList):
raise ValueError(f"Can't create id_short_path for {self.__class__.__name__} without an id_short or "
f"if its parent is a SubmodelElementList!")

item = self # type: Any
path: List[str] = []
while item is not None:
if not isinstance(item, Referable):
raise AttributeError('Referable must have an identifiable as root object and only parents that are '
'referable')
if isinstance(item, Identifiable):
break
elif isinstance(item.parent, SubmodelElementList):
path.insert(0, str(item.parent.value.index(item)))
else:
path.insert(0, item.id_short)
item = item.parent
return path

def _get_id_short(self) -> Optional[NameType]:
return self._id_short
Expand All @@ -653,6 +710,49 @@ def _set_category(self, category: Optional[NameType]):
def _get_category(self) -> Optional[NameType]:
return self._category

@classmethod
def parse_id_short_path(cls, id_short_path: str) -> List[str]:
"""
Parse an id_short_path string into a list of id_shorts and indexes.

:param id_short_path: The id_short_path string, e.g. "MySECollection.MySEList[2]MySubProperty1"
:return: The id_short path as a list, e.g. '["MySECollection", "MySEList", "2", "MySubProperty1"]'
"""
id_shorts_and_indexes = []
for part in id_short_path.split("."):
id_short = part[0:part.find('[')] if '[' in part else part
id_shorts_and_indexes.append(id_short)

indexes_part = part.removeprefix(id_short)
if indexes_part:
if not re.fullmatch(r'(?:\[\d+\])+', indexes_part):
raise ValueError(f"Invalid index format in id_short_path: '{id_short_path}', part: '{part}'")
indexes = indexes_part.strip("[]").split("][")
id_shorts_and_indexes.extend(indexes)
cls.validate_id_short_path(id_shorts_and_indexes)
return id_shorts_and_indexes

@classmethod
def build_id_short_path(cls, id_short_path: Iterable[str]) -> str:
"""
Build an id_short_path string from a list of id_shorts and indexes.
"""
if isinstance(id_short_path, str):
raise ValueError("id_short_path must be an Iterable of strings, not a single string")
path_list_with_dots_and_brackets = [f"[{part}]" if part.isdigit() else f".{part}" for part in id_short_path]
id_short_path = "".join(path_list_with_dots_and_brackets).removeprefix(".")
return id_short_path

@classmethod
def validate_id_short_path(cls, id_short_path: Union[str, NameType, Iterable[NameType]]):
if isinstance(id_short_path, str):
id_short_path = cls.parse_id_short_path(id_short_path)
for id_short in id_short_path:
if id_short.isdigit():
# This is an index, skip validation
continue
cls.validate_id_short(id_short)

@classmethod
def validate_id_short(cls, id_short: NameType) -> None:
"""
Expand Down Expand Up @@ -1001,22 +1101,24 @@ def from_referable(referable: Referable) -> "ModelReference":
object's ancestors
"""
# Get the first class from the base classes list (via inspect.getmro), that is contained in KEY_ELEMENTS_CLASSES
from . import KEY_TYPES_CLASSES
from . import resolve_referable_class_in_key_types
try:
ref_type = next(iter(t for t in inspect.getmro(type(referable)) if t in KEY_TYPES_CLASSES))
ref_type = resolve_referable_class_in_key_types(referable)
except StopIteration:
ref_type = Referable

ref: Referable = referable
keys: List[Key] = []
while True:
keys.append(Key.from_referable(ref))
keys.insert(0, Key.from_referable(ref))
if isinstance(ref, Identifiable):
keys.reverse()
return ModelReference(tuple(keys), ref_type)
if ref.parent is None or not isinstance(ref.parent, Referable):
raise ValueError("The given Referable object is not embedded within an Identifiable object")
raise ValueError(f"The given Referable object is not embedded within an Identifiable object: {ref}")
ref = ref.parent
if len(keys) > MAX_RECURSION_DEPTH:
raise ValueError(f"The given Referable object is embedded in >64 layers of Referables "
f"or there is a loop in the parent chain {ref}")


@_string_constraints.constrain_content_type("content_type")
Expand Down Expand Up @@ -1624,12 +1726,12 @@ def __init__(self) -> None:
super().__init__()
self.namespace_element_sets: List[NamespaceSet] = []

def get_referable(self, id_short: Union[NameType, Iterable[NameType]]) -> Referable:
def get_referable(self, id_short_path: Union[str, NameType, Iterable[NameType]]) -> Referable:
"""
Find a :class:`~.Referable` in this Namespace by its id_short or by its id_short path.
The id_short path may contain :class:`~basyx.aas.model.submodel.SubmodelElementList` indices.

:param id_short: id_short or id_short path as any :class:`Iterable`
:param id_short_path: id_short or id_short path as a str or any :class:`Iterable`
:returns: :class:`~.Referable`
:raises TypeError: If one of the intermediate objects on the path is not a
:class:`~.UniqueIdShortNamespace`
Expand All @@ -1638,10 +1740,10 @@ def get_referable(self, id_short: Union[NameType, Iterable[NameType]]) -> Refera
:raises KeyError: If no such :class:`~.Referable` can be found
"""
from .submodel import SubmodelElementList
if isinstance(id_short, NameType):
id_short = [id_short]
if isinstance(id_short_path, (str, NameType)):
id_short_path = Referable.parse_id_short_path(id_short_path)
item: Union[UniqueIdShortNamespace, Referable] = self
for id_ in id_short:
for id_ in id_short_path:
# This is redundant on first iteration, but it's a negligible overhead.
# Also, ModelReference.resolve() relies on this check.
if not isinstance(item, UniqueIdShortNamespace):
Expand Down
Loading