diff --git a/examples/xml/component/application_component.py b/examples/xml/component/application_component.py index b296cb0..bb498d3 100644 --- a/examples/xml/component/application_component.py +++ b/examples/xml/component/application_component.py @@ -1,96 +1,109 @@ """ -Sender-receiver port interface examples +Application component example """ import os import autosar import autosar.xml.element as ar_element -def create_platform_types(packages: dict[str, ar_element.Package]): +def create_platform_types(workspace: autosar.xml.Workspace): """ Creates necessary platform types """ boolean_base_type = ar_element.SwBaseType('boolean', size=8, encoding="BOOLEAN") - packages["PlatformBaseTypes"].append(boolean_base_type) + workspace.add_element("PlatformBaseTypes", boolean_base_type) uint8_base_type = ar_element.SwBaseType('uint8', size=8) - packages["PlatformBaseTypes"].append(uint8_base_type) + workspace.add_element("PlatformBaseTypes", uint8_base_type) uint16_base_type = ar_element.SwBaseType('uint16', size=16) - packages["PlatformBaseTypes"].append(uint16_base_type) + workspace.add_element("PlatformBaseTypes", uint16_base_type) uint32_base_type = ar_element.SwBaseType('uint32', size=32) - packages["PlatformBaseTypes"].append(uint32_base_type) + workspace.add_element("PlatformBaseTypes", uint32_base_type) boolean_data_constr = ar_element.DataConstraint.make_internal("boolean_DataConstr", 0, 1) - packages["PlatformDataConstraints"].append(boolean_data_constr) + workspace.add_element("PlatformDataConstraints", boolean_data_constr) computation = ar_element.Computation.make_value_table(["FALSE", "TRUE"]) boolean_compu_method = ar_element.CompuMethod(name="boolean_CompuMethod", category="TEXTTABLE", int_to_phys=computation) - packages["PlatformCompuMethods"].append(boolean_compu_method) + workspace.add_element("PlatformCompuMethods", boolean_compu_method) sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint8_base_type.ref(), data_constraint_ref=boolean_data_constr.ref(), compu_method_ref=boolean_compu_method.ref()) boolean_impl_type = ar_element.ImplementationDataType("boolean", category="VALUE", sw_data_def_props=sw_data_def_props) - packages["PlatformImplementationDataTypes"].append(boolean_impl_type) + workspace.add_element("PlatformImplementationDataTypes", boolean_impl_type) sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint8_base_type.ref()) uint8_impl_type = ar_element.ImplementationDataType("uint8", category="VALUE", sw_data_def_props=sw_data_def_props) - packages["PlatformImplementationDataTypes"].append(uint8_impl_type) + workspace.add_element("PlatformImplementationDataTypes", uint8_impl_type) sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint16_base_type.ref()) uint16_impl_type = ar_element.ImplementationDataType("uint16", category="VALUE", sw_data_def_props=sw_data_def_props) - packages["PlatformImplementationDataTypes"].append(uint16_impl_type) + workspace.add_element("PlatformImplementationDataTypes", uint16_impl_type) sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint32_base_type.ref()) uint32_impl_type = ar_element.ImplementationDataType("uint32", category="VALUE", sw_data_def_props=sw_data_def_props) - packages["PlatformImplementationDataTypes"].append(uint32_impl_type) + workspace.add_element("PlatformImplementationDataTypes", uint32_impl_type) -def create_sender_receiver_port_interfaces(packages: dict[str, ar_element.Package]): +def create_sender_receiver_port_interfaces(workspace: autosar.xml.Workspace): """ - Create sender-receiver port interfaces used in example + Creates necessary sender-receiver port interfaces """ - uint16_impl_t = packages["PlatformImplementationDataTypes"].find("uint16") + uint16_impl_t = workspace.find_element("PlatformImplementationDataTypes", "uint16") port_interface = ar_element.SenderReceiverInterface("VehicleSpeed_I") port_interface.create_data_element("VehicleSpeed", type_ref=uint16_impl_t.ref()) - packages["PortInterfaces"].append(port_interface) + workspace.add_element("PortInterfaces", port_interface) + port_interface = ar_element.SenderReceiverInterface("EngineSpeed_I") + port_interface.create_data_element("EngineSpeed", type_ref=uint16_impl_t.ref()) + workspace.add_element("PortInterfaces", port_interface) -def create_client_server_interfaces(packages: dict[str, ar_element.Package]): +def create_constants(workspace: autosar.xml.Workspace): + """ + Creates init values in Constants package + """ + constant = ar_element.ConstantSpecification.make_constant("VehicleSpeed_IV", 65535) + workspace.add_element("Constants", constant) + constant = ar_element.ConstantSpecification.make_constant("EngineSpeed_IV", 65535) + workspace.add_element("Constants", constant) + + +def create_client_server_interfaces(workspace: autosar.xml.Workspace): """ Creates client-server interface with one operation """ - uint32_impl_type = packages["PlatformImplementationDataTypes"].find("uint32") - boolean_impl_type = packages["PlatformImplementationDataTypes"].find("boolean") - interface = ar_element.ClientServerInterface("FreeRunningTimer_I") - operation = interface.create_operation("GetTime") + uint32_impl_type = workspace.find_element("PlatformImplementationDataTypes", "uint32") + boolean_impl_type = workspace.find_element("PlatformImplementationDataTypes", "boolean") + port_interface = ar_element.ClientServerInterface("FreeRunningTimer_I") + operation = port_interface.create_operation("GetTime") operation.create_out_argument("value", type_ref=uint32_impl_type.ref()) - operation = interface.create_operation("IsTimerElapsed") + operation = port_interface.create_operation("IsTimerElapsed") operation.create_in_argument("startTime", type_ref=uint32_impl_type.ref()) operation.create_in_argument("duration", type_ref=uint32_impl_type.ref()) operation.create_out_argument("result", type_ref=boolean_impl_type.ref()) - packages["PortInterfaces"].append(interface) + workspace.add_element("PortInterfaces", port_interface) -def create_application_component(packages: dict[str, ar_element.Package]): +def create_application_component(workspace: autosar.xml.Workspace): """ - Creates application component with one sender-receiver port and one - client-server port + Creates application SWC with two sender ports """ - timer_interface = packages["PortInterfaces"].find("FreeRunningTimer_I") - vehicle_speed_interface = packages["PortInterfaces"].find("VehicleSpeed_I") - swc = ar_element.ApplicationSoftwareComponentType("MyApplication") - swc.create_require_port("VehicleSpeed", vehicle_speed_interface, com_spec={"init_value": 65535, - 'alive_timeout': 0, - 'enable_update': False, - 'uses_end_to_end_protection': False, - 'handle_never_received': False + vehicle_speed_interface = workspace.find_element("PortInterfaces", "VehicleSpeed_I") + vehicle_speed_init = workspace.find_element("Constants", "VehicleSpeed_IV") + engine_speed_interface = workspace.find_element("PortInterfaces", "EngineSpeed_I") + engine_speed_init = workspace.find_element("Constants", "EngineSpeed_IV") + swc = ar_element.ApplicationSoftwareComponentType("SenderComponent") + swc.create_provide_port("VehicleSpeed", vehicle_speed_interface, com_spec={"init_value": vehicle_speed_init.ref(), + "uses_end_to_end_protection": False, }) - swc.create_require_port("FreeRunningTimer", timer_interface) - packages["ComponentTypes"].append(swc) + swc.create_provide_port("EngineSpeed", engine_speed_interface, com_spec={"init_value": engine_speed_init.ref(), + "uses_end_to_end_protection": False, + }) + workspace.add_element("ComponentTypes", swc) def save_xml_files(workspace: autosar.xml.Workspace): @@ -102,8 +115,11 @@ def save_xml_files(workspace: autosar.xml.Workspace): platform_document_path = os.path.abspath(os.path.join(os.path.dirname( __file__), 'data', 'platform.arxml')) component_document_path = os.path.abspath(os.path.join(os.path.dirname( - __file__), 'data', 'MyApplication.arxml')) + __file__), 'data', 'sender_component.arxml')) + constant_document_path = os.path.abspath(os.path.join(os.path.dirname( + __file__), 'data', 'constants.arxml')) workspace.create_document(interface_document_path, packages="/PortInterfaces") + workspace.create_document(constant_document_path, packages="/Constants") workspace.create_document(platform_document_path, packages="/AUTOSAR_Platform") workspace.create_document(component_document_path, packages="/ComponentTypes") workspace.write_documents() @@ -114,24 +130,18 @@ def main(): Main """ workspace = autosar.xml.Workspace() - packages = dict(zip(["PlatformBaseTypes", - "PlatformImplementationDataTypes", - "PlatformDataConstraints", - "PlatformCompuMethods", - "ImplementationDataTypes", - "PortInterfaces", - "ComponentTypes"], - workspace.make_packages("AUTOSAR_Platform/BaseTypes", - "AUTOSAR_Platform/ImplementationDataTypes", - "AUTOSAR_Platform/DataConstraints", - "AUTOSAR_Platform/CompuMethods", - "DataTypes/ImplementationDataTypes", - "PortInterfaces", - "ComponentTypes"))) - create_platform_types(packages) - create_sender_receiver_port_interfaces(packages) - create_client_server_interfaces(packages) - create_application_component(packages) + workspace.init_package_map({"PlatformBaseTypes": "AUTOSAR_Platform/BaseTypes", + "PlatformImplementationDataTypes": "AUTOSAR_Platform/ImplementationDataTypes", + "PlatformDataConstraints": "AUTOSAR_Platform/DataConstraints", + "PlatformCompuMethods": "AUTOSAR_Platform/CompuMethods", + "Constants": "Constants", + "PortInterfaces": "PortInterfaces", + "ComponentTypes": "ComponentTypes"}) + create_platform_types(workspace) + create_sender_receiver_port_interfaces(workspace) + create_client_server_interfaces(workspace) + create_constants(workspace) + create_application_component(workspace) save_xml_files(workspace) diff --git a/examples/xml/component/composition_component.py b/examples/xml/component/composition_component.py new file mode 100644 index 0000000..16e0218 --- /dev/null +++ b/examples/xml/component/composition_component.py @@ -0,0 +1,188 @@ +""" +Composition component example +""" +import os +import autosar +import autosar.xml.element as ar_element + + +def create_platform_types(workspace: autosar.xml.Workspace): + """ + Creates necessary platform types + """ + boolean_base_type = ar_element.SwBaseType('boolean', size=8, encoding="BOOLEAN") + workspace.add_element("PlatformBaseTypes", boolean_base_type) + uint8_base_type = ar_element.SwBaseType('uint8', size=8) + workspace.add_element("PlatformBaseTypes", uint8_base_type) + uint16_base_type = ar_element.SwBaseType('uint16', size=16) + workspace.add_element("PlatformBaseTypes", uint16_base_type) + uint32_base_type = ar_element.SwBaseType('uint32', size=32) + workspace.add_element("PlatformBaseTypes", uint32_base_type) + boolean_data_constr = ar_element.DataConstraint.make_internal("boolean_DataConstr", 0, 1) + workspace.add_element("PlatformDataConstraints", boolean_data_constr) + computation = ar_element.Computation.make_value_table(["FALSE", "TRUE"]) + boolean_compu_method = ar_element.CompuMethod(name="boolean_CompuMethod", + category="TEXTTABLE", + int_to_phys=computation) + workspace.add_element("PlatformCompuMethods", boolean_compu_method) + sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint8_base_type.ref(), + data_constraint_ref=boolean_data_constr.ref(), + compu_method_ref=boolean_compu_method.ref()) + boolean_impl_type = ar_element.ImplementationDataType("boolean", + category="VALUE", + sw_data_def_props=sw_data_def_props) + workspace.add_element("PlatformImplementationDataTypes", boolean_impl_type) + sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint8_base_type.ref()) + uint8_impl_type = ar_element.ImplementationDataType("uint8", + category="VALUE", + sw_data_def_props=sw_data_def_props) + workspace.add_element("PlatformImplementationDataTypes", uint8_impl_type) + sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint16_base_type.ref()) + uint16_impl_type = ar_element.ImplementationDataType("uint16", + category="VALUE", + sw_data_def_props=sw_data_def_props) + workspace.add_element("PlatformImplementationDataTypes", uint16_impl_type) + sw_data_def_props = ar_element.SwDataDefPropsConditional(base_type_ref=uint32_base_type.ref()) + uint32_impl_type = ar_element.ImplementationDataType("uint32", + category="VALUE", + sw_data_def_props=sw_data_def_props) + workspace.add_element("PlatformImplementationDataTypes", uint32_impl_type) + + +def create_sender_receiver_port_interfaces(workspace: autosar.xml.Workspace): + """ + Creates necessary sender-receiver port interfaces + """ + uint16_impl_t = workspace.find_element("PlatformImplementationDataTypes", "uint16") + port_interface = ar_element.SenderReceiverInterface("VehicleSpeed_I") + port_interface.create_data_element("VehicleSpeed", type_ref=uint16_impl_t.ref()) + workspace.add_element("PortInterfaces", port_interface) + port_interface = ar_element.SenderReceiverInterface("EngineSpeed_I") + port_interface.create_data_element("EngineSpeed", type_ref=uint16_impl_t.ref()) + workspace.add_element("PortInterfaces", port_interface) + + +def create_constants(workspace: autosar.xml.Workspace): + """ + Creates init values in Constants package + """ + constant = ar_element.ConstantSpecification.make_constant("VehicleSpeed_IV", 65535) + workspace.add_element("Constants", constant) + constant = ar_element.ConstantSpecification.make_constant("EngineSpeed_IV", 65535) + workspace.add_element("Constants", constant) + + +def create_client_server_interfaces(workspace: autosar.xml.Workspace): + """ + Creates client-server interface with one operation + """ + uint32_impl_type = workspace.find_element("PlatformImplementationDataTypes", "uint32") + boolean_impl_type = workspace.find_element("PlatformImplementationDataTypes", "boolean") + port_interface = ar_element.ClientServerInterface("FreeRunningTimer_I") + operation = port_interface.create_operation("GetTime") + operation.create_out_argument("value", type_ref=uint32_impl_type.ref()) + operation = port_interface.create_operation("IsTimerElapsed") + operation.create_in_argument("startTime", type_ref=uint32_impl_type.ref()) + operation.create_in_argument("duration", type_ref=uint32_impl_type.ref()) + operation.create_out_argument("result", type_ref=boolean_impl_type.ref()) + workspace.add_element("PortInterfaces", port_interface) + + +def create_receiver_component(workspace: autosar.xml.Workspace): + """ + Creates application SWC with two receiver ports and one client port + """ + timer_interface = workspace.find_element("PortInterfaces", "FreeRunningTimer_I") + vehicle_speed_interface = workspace.find_element("PortInterfaces", "VehicleSpeed_I") + vehicle_speed_init = workspace.find_element("Constants", "VehicleSpeed_IV") + engine_speed_interface = workspace.find_element("PortInterfaces", "EngineSpeed_I") + engine_speed_init = workspace.find_element("Constants", "EngineSpeed_IV") + swc = ar_element.ApplicationSoftwareComponentType("ReceiverComponent") + swc.create_require_port("VehicleSpeed", vehicle_speed_interface, com_spec={"init_value": vehicle_speed_init.ref(), + "alive_timeout": 0, + "enable_update": False, + "uses_end_to_end_protection": False, + "handle_never_received": False + }) + swc.create_require_port("EngineSpeed", engine_speed_interface, com_spec={"init_value": engine_speed_init.ref(), + "alive_timeout": 0, + "enable_update": False, + "uses_end_to_end_protection": False, + "handle_never_received": False + }) + swc.create_require_port("FreeRunningTimer", timer_interface, com_spec={"GetTime": {}, "IsTimerElapsed": {}}) + workspace.add_element("ComponentTypes", swc) + + +def create_server_component(workspace: autosar.xml.Workspace): + """ + Creates application SWC with one server port + """ + timer_interface = workspace.find_element("PortInterfaces", "FreeRunningTimer_I") + swc = ar_element.ApplicationSoftwareComponentType("TimerComponent") + swc.create_provide_port("FreeRunningTimer", timer_interface, com_spec={"GetTime": {"queue_length": 1}, + "IsTimerElapsed": {"queue_length": 1} + }) + workspace.add_element("ComponentTypes", swc) + + +def create_composition_component(workspace: autosar.xml.Workspace): + """ + Creates composition component + """ + vehicle_speed_interface = workspace.find_element("PortInterfaces", "VehicleSpeed_I") + vehicle_speed_init = workspace.find_element("Constants", "VehicleSpeed_IV") + engine_speed_interface = workspace.find_element("PortInterfaces", "EngineSpeed_I") + engine_speed_init = workspace.find_element("Constants", "EngineSpeed_IV") + swc = ar_element.ApplicationSoftwareComponentType("CompositionComponent") + swc.create_require_port("VehicleSpeed", vehicle_speed_interface, com_spec={"init_value": vehicle_speed_init.ref(), + "uses_end_to_end_protection": False}) + swc.create_require_port("EngineSpeed", engine_speed_interface, com_spec={"init_value": engine_speed_init.ref(), + "uses_end_to_end_protection": False}) + + workspace.add_element("ComponentTypes", swc) + + +def save_xml_files(workspace: autosar.xml.Workspace): + """ + Saves workspace as XML documents + """ + interface_document_path = os.path.abspath(os.path.join(os.path.dirname( + __file__), 'data', 'portinterfaces.arxml')) + platform_document_path = os.path.abspath(os.path.join(os.path.dirname( + __file__), 'data', 'platform.arxml')) + component_document_path = os.path.abspath(os.path.join(os.path.dirname( + __file__), 'data', 'composition.arxml')) + constant_document_path = os.path.abspath(os.path.join(os.path.dirname( + __file__), 'data', 'constants.arxml')) + workspace.create_document(interface_document_path, packages="/PortInterfaces") + workspace.create_document(constant_document_path, packages="/Constants") + workspace.create_document(platform_document_path, packages="/AUTOSAR_Platform") + workspace.create_document(component_document_path, packages="/ComponentTypes") + workspace.write_documents() + + +def main(): + """ + Main + """ + workspace = autosar.xml.Workspace() + workspace.init_package_map({"PlatformBaseTypes": "AUTOSAR_Platform/BaseTypes", + "PlatformImplementationDataTypes": "AUTOSAR_Platform/ImplementationDataTypes", + "PlatformDataConstraints": "AUTOSAR_Platform/DataConstraints", + "PlatformCompuMethods": "AUTOSAR_Platform/CompuMethods", + "Constants": "Constants", + "PortInterfaces": "PortInterfaces", + "ComponentTypes": "ComponentTypes"}) + create_platform_types(workspace) + create_sender_receiver_port_interfaces(workspace) + create_client_server_interfaces(workspace) + create_constants(workspace) + create_receiver_component(workspace) + create_server_component(workspace) + create_composition_component(workspace) + save_xml_files(workspace) + + +if __name__ == "__main__": + main() diff --git a/examples/xml/component/data/MyApplication.arxml b/examples/xml/component/data/MyApplication.arxml deleted file mode 100644 index 3855af3..0000000 --- a/examples/xml/component/data/MyApplication.arxml +++ /dev/null @@ -1,37 +0,0 @@ - - - - - ComponentTypes - - - MyApplication - - - VehicleSpeed - - - /PortInterfaces/VehicleSpeed_I/VehicleSpeed - false - 0 - false - false - - - 65535 - - - - - /PortInterfaces/VehicleSpeed_I - - - FreeRunningTimer - /PortInterfaces/FreeRunningTimer_I - - - - - - - \ No newline at end of file diff --git a/examples/xml/component/data/composition.arxml b/examples/xml/component/data/composition.arxml new file mode 100644 index 0000000..4ec87c0 --- /dev/null +++ b/examples/xml/component/data/composition.arxml @@ -0,0 +1,117 @@ + + + + + ComponentTypes + + + ReceiverComponent + + + VehicleSpeed + + + /PortInterfaces/VehicleSpeed_I/VehicleSpeed + false + 0 + false + false + + + /Constants/VehicleSpeed_IV + + + + + /PortInterfaces/VehicleSpeed_I + + + EngineSpeed + + + /PortInterfaces/EngineSpeed_I/EngineSpeed + false + 0 + false + false + + + /Constants/EngineSpeed_IV + + + + + /PortInterfaces/EngineSpeed_I + + + FreeRunningTimer + + + /PortInterfaces/FreeRunningTimer_I/GetTime + + + /PortInterfaces/FreeRunningTimer_I/IsTimerElapsed + + + /PortInterfaces/FreeRunningTimer_I + + + + + TimerComponent + + + FreeRunningTimer + + + /PortInterfaces/FreeRunningTimer_I/GetTime + 1 + + + /PortInterfaces/FreeRunningTimer_I/IsTimerElapsed + 1 + + + /PortInterfaces/FreeRunningTimer_I + + + + + CompositionComponent + + + VehicleSpeed + + + /PortInterfaces/VehicleSpeed_I/VehicleSpeed + false + + + /Constants/VehicleSpeed_IV + + + + + /PortInterfaces/VehicleSpeed_I + + + EngineSpeed + + + /PortInterfaces/EngineSpeed_I/EngineSpeed + false + + + /Constants/EngineSpeed_IV + + + + + /PortInterfaces/EngineSpeed_I + + + + + + + \ No newline at end of file diff --git a/examples/xml/component/data/constants.arxml b/examples/xml/component/data/constants.arxml new file mode 100644 index 0000000..0b597a5 --- /dev/null +++ b/examples/xml/component/data/constants.arxml @@ -0,0 +1,26 @@ + + + + + Constants + + + VehicleSpeed_IV + + + 65535 + + + + + EngineSpeed_IV + + + 65535 + + + + + + + \ No newline at end of file diff --git a/examples/xml/component/data/portinterfaces.arxml b/examples/xml/component/data/portinterfaces.arxml index 366b0e5..63bc5cb 100644 --- a/examples/xml/component/data/portinterfaces.arxml +++ b/examples/xml/component/data/portinterfaces.arxml @@ -13,6 +13,15 @@ + + EngineSpeed_I + + + EngineSpeed + /AUTOSAR_Platform/ImplementationDataTypes/uint16 + + + FreeRunningTimer_I diff --git a/examples/xml/component/data/sender_component.arxml b/examples/xml/component/data/sender_component.arxml new file mode 100644 index 0000000..32f5ee2 --- /dev/null +++ b/examples/xml/component/data/sender_component.arxml @@ -0,0 +1,45 @@ + + + + + ComponentTypes + + + SenderComponent + + + VehicleSpeed + + + /PortInterfaces/VehicleSpeed_I/VehicleSpeed + false + + + /Constants/VehicleSpeed_IV + + + + + /PortInterfaces/VehicleSpeed_I + + + EngineSpeed + + + /PortInterfaces/EngineSpeed_I/EngineSpeed + false + + + /Constants/EngineSpeed_IV + + + + + /PortInterfaces/EngineSpeed_I + + + + + + + \ No newline at end of file diff --git a/src/autosar/xml/element.py b/src/autosar/xml/element.py index 84e12e6..7669872 100644 --- a/src/autosar/xml/element.py +++ b/src/autosar/xml/element.py @@ -36,6 +36,14 @@ "DelegationSwConnector", "PassThroughSwConnector"] +InitValueArgType = Union["int", + "float", + "str", + "list", + "tuple", + "ValueSpecificationElement", + "ConstantRef"] + # Helper classes @@ -2938,27 +2946,21 @@ def __init__(self, label: str | None = None) -> None: # .VARIATION-POINT not supported @classmethod - def make_init_value(cls, - init_value: int | float | str | tuple | ValueSpecificationElement | None = None, # noqa E501 pylint: disable=C0301 - init_value_ref: Union[str, ConstantRef, "ConstantReference", None] = None - ) -> ValueSpecificationElement: - """ - Attempts to create an init value from either the init_value argument or init_value_ref. - Both cannot be set simultaneously - """ - if init_value is not None: - if init_value_ref is not None: - msg = "init_value and init_value_ref cannot be set at the same time. One of them must be None" - raise ValueError(msg) - if isinstance(init_value, ValueSpecification): - return init_value + def make_value_with_check(cls, + value: InitValueArgType | None = None, + ) -> ValueSpecificationElement: + """ + Wrapper for checking and creating init values based on different value types + """ + if value is not None: + if isinstance(value, ValueSpecification): + return value # Already a proper init-value + elif isinstance(value, ConstantRef): + return ConstantReference(value) # Wrap inside constant reference + elif isinstance(value, (int, float, str, list, tuple)): + return cls.make_value(value) # Attempt to create a new value based on raw python data else: - return cls.make_value(init_value) - elif init_value_ref is not None: - if isinstance(init_value_ref, ConstantReference): - return init_value_ref - elif isinstance(init_value_ref, (str, ConstantRef)): - return ConstantReference(init_value_ref) + raise TypeError(f"Unsupported type: {str(type(value))}") return None @classmethod @@ -3848,7 +3850,7 @@ def ref(self) -> ClientServerOperationRef | None: ref_str = self._calc_ref_string() if ref_str is None: return None - return PortInterfaceRef(ref_str, ar_enum.IdentifiableSubTypes.CLIENT_SERVER_OPERATION) + return ClientServerOperationRef(ref_str) def append_argument(self, argument: ArgumentDataPrototype) -> None: """ @@ -4203,13 +4205,19 @@ class ProvidePortComSpec(ARObject): @classmethod def make_from_port_interface(cls, port_interface: PortInterface, **kwargs) -> "ProvidePortComSpec": """ - Convenience method for creating require-port com-specs for port-interfaces - with one data element - Special keys for non-queued SenderReceiverInterface: - * init_value_ref: str | ConstantRef | ConstantReference - Remaining keys are the same as the constructor of NonqueuedSenderComSpec + Convenience method for creating P-PORT com-specs - Currently only supports SenderReceiverInterface + For SenderReceiverInterface: + If interface has a single element: kwargs is a dict with key-value pairs for one com-spec + If interface has multiple elements: kwargs is a dict of dict where + outer dict keys are element names and each value + is another dict containing key-value pairs for one com-spec + + For ClientServerInterface: + If interface has a single operation: kwargs is a dict with key-value pairs for one com-spec + If interface has multiple operations: kwargs is a dict of dict where + outer dict keys are operation names and each value + is another dict containing key-value pairs for one com-spec """ if isinstance(port_interface, SenderReceiverInterface): if len(port_interface.data_elements) == 0: @@ -4221,20 +4229,62 @@ def make_from_port_interface(cls, port_interface: PortInterface, **kwargs) -> "P else: return cls.make_non_queued_sender_com_spec(data_element_ref=data_element.ref(), **kwargs) else: - raise NotImplementedError("Multiple data elements not yet supported") + com_spec_list = [] + unprocessed = set() + for element_name, value in kwargs.items(): + unprocessed.add(element_name) + if not isinstance(value, dict): + msg = f"{port_interface.name}.{element_name}: Expected dict type, got {str(type(value))}" + raise TypeError(msg) + for data_element in port_interface.data_elements: + if data_element.name in unprocessed: + unprocessed.remove(data_element.name) + com_spec_args = kwargs[data_element.name] + if data_element.is_queued: + com_spec = QueuedSenderComSpec(data_element_ref=data_element.ref(), + **com_spec_args) + else: + com_spec = cls.make_non_queued_sender_com_spec(data_element_ref=data_element.ref(), + **com_spec_args) + com_spec_list.append(com_spec) + if len(unprocessed) > 0: + operations = ', '.join(list[unprocessed]) + raise ValueError(f"{port_interface.name}: Operation(s) not found in port interface: '{operations}'") + return com_spec_list + if isinstance(port_interface, ClientServerInterface): + if len(port_interface.operations) == 0: + raise ValueError(f"{port_interface.name}: Port interface must have at least one operation") + if len(port_interface.operations) == 1: + operation = port_interface.operations[0] + return ServerComSpec(operation_ref=operation.ref(), **kwargs) + else: + com_spec_list = [] + unprocessed = set() + for operation_name in kwargs: + unprocessed.add(operation_name) + for operation in port_interface.operations: + if operation.name in unprocessed: + unprocessed.remove(operation.name) + com_spec_args = kwargs[operation.name] + com_spec = ServerComSpec(operation_ref=operation.ref(), **com_spec_args) + com_spec_list.append(com_spec) + if len(unprocessed) > 0: + element_names = ', '.join(list[unprocessed]) + msg = f"{port_interface.name}: Data element(s) not found in port interface: '{element_names}'" + raise ValueError(msg) + return com_spec_list else: raise NotImplementedError(str(type(port_interface))) @classmethod def make_non_queued_sender_com_spec(cls, - init_value: int | float | str | tuple | ValueSpecificationElement | None = None, # noqa E501 pylint: disable=C0301 - init_value_ref: str | ConstantRef | ConstantReference | None = None, + init_value: InitValueArgType | None = None, **kwargs ) -> "NonqueuedSenderComSpec": """ Convenience method for creating NonqueuedSenderComSpec """ - init_value = ValueSpecification.make_init_value(init_value, init_value_ref) + init_value = ValueSpecification.make_value_with_check(init_value) return NonqueuedSenderComSpec(init_value=init_value, **kwargs) @@ -4453,13 +4503,19 @@ class RequirePortComSpec(ARObject): @classmethod def make_from_port_interface(cls, port_interface: PortInterface, **kwargs) -> "RequirePortComSpec": """ - Convenience method for creating require-port com-specs for port-interfaces - with one data element - Special keys for non-queued SenderReceiverInterface: - * init_value_ref: str | ConstantRef | ConstantReference - Remaining keys are the same as the constructor of NonqueuedReceiverComSpec + Convenience method for creating R-PORT com-specs - Currently only supports SenderReceiverInterface + For SenderReceiverInterface: + If interface has a single element: kwargs is a dict with key-value pairs for one com-spec + If interface has multiple elements: kwargs is a dict of dict where + outer dict keys are element names and each value + is another dict containing key-value pairs for one com-spec + + For ClientServerInterface: + If interface has a single operation: kwargs is a dict with key-value pairs for one com-spec + If interface has multiple operations: kwargs is a dict of dict where + outer dict keys are operation names and each value + is another dict containing key-value pairs for one com-spec """ if isinstance(port_interface, SenderReceiverInterface): if len(port_interface.data_elements) == 0: @@ -4471,20 +4527,62 @@ def make_from_port_interface(cls, port_interface: PortInterface, **kwargs) -> "R else: return cls.make_non_queued_receiver_com_spec(data_element_ref=data_element.ref(), **kwargs) else: - raise NotImplementedError("Multiple data elements not yet supported") + com_spec_list = [] + unprocessed = set() + for element_name, value in kwargs.items(): + unprocessed.add(element_name) + if not isinstance(value, dict): + msg = f"{port_interface.name}.{element_name}: Expected dict type, got {str(type(value))}" + raise TypeError(msg) + for data_element in port_interface.data_elements: + if data_element.name in unprocessed: + unprocessed.remove(data_element.name) + com_spec_args = kwargs[data_element.name] + if data_element.is_queued: + com_spec = QueuedSenderComSpec(data_element_ref=data_element.ref(), + **com_spec_args) + else: + com_spec = cls.make_non_queued_receiver_com_spec(data_element_ref=data_element.ref(), + **com_spec_args) + com_spec_list.append(com_spec) + if len(unprocessed) > 0: + element_names = ', '.join(list[unprocessed]) + msg = f"{port_interface.name}: Data element(s) not found in port interface: '{element_names}'" + raise ValueError(msg) + return com_spec_list + if isinstance(port_interface, ClientServerInterface): + if len(port_interface.operations) == 0: + raise ValueError(f"{port_interface.name}: Port interface must have at least one operation") + if len(port_interface.operations) == 1: + operation = port_interface.operations[0] + return ClientComSpec(operation_ref=operation.ref(), **kwargs) + else: + com_spec_list = [] + unprocessed = set() + for operation_name in kwargs: + unprocessed.add(operation_name) + for operation in port_interface.operations: + if operation.name in unprocessed: + unprocessed.remove(operation.name) + com_spec_args = kwargs[operation.name] + com_spec = ClientComSpec(operation_ref=operation.ref(), **com_spec_args) + com_spec_list.append(com_spec) + if len(unprocessed) > 0: + operations = ', '.join(list[unprocessed]) + raise ValueError(f"{port_interface.name}: Operation(s) not found in port interface: '{operations}'") + return com_spec_list else: raise NotImplementedError(str(type(port_interface))) @classmethod def make_non_queued_receiver_com_spec(cls, - init_value: int | float | str | tuple | ValueSpecificationElement | None = None, # noqa E501 pylint: disable=C0301 - init_value_ref: str | ConstantRef | ConstantReference | None = None, + init_value: InitValueArgType | None = None, **kwargs ) -> "NonqueuedReceiverComSpec": """ Convenience method for creating NonqueuedReceiverComSpec """ - init_value = ValueSpecification.make_init_value(init_value, init_value_ref) + init_value = ValueSpecification.make_value_with_check(init_value) return NonqueuedReceiverComSpec(init_value=init_value, **kwargs) diff --git a/src/autosar/xml/workspace.py b/src/autosar/xml/workspace.py index 263d217..e8df95a 100644 --- a/src/autosar/xml/workspace.py +++ b/src/autosar/xml/workspace.py @@ -60,9 +60,10 @@ class Workspace: def __init__(self, config_file_path: str | None = None, document_root: str | None = None) -> None: self.namespaces: dict[str, Namespace] = {} self.packages: list[ar_element.Package] = [] - self._package_map: dict[str, ar_element.Package] = {} + self._package_dict: dict[str, ar_element.Package] = {} # Each key is the name of an actual package self.documents: list[DocumentConfig] = [] self.document_root = document_root + self.package_map: dict[str, ar_element.Package] = {} # Each key is user-defined if config_file_path is not None: self.load_config(config_file_path) @@ -96,6 +97,16 @@ def get_package_ref_by_role(self, namespace_name: str, role: ar_enum.PackageRole raise ValueError(f"Role '{str(role)}'not in namespace map") from ex return posixpath.normpath(posixpath.join(base_ref, rel_path)) + def create_package(self, name: str, **kwargs) -> ar_element.Package: + """ + Creates new package in workspace + """ + if name in self._package_dict: + return ValueError(f"Package with name '{name}' already exists") + package = ar_element.Package(name, **kwargs) + self.append(package) + return package + def make_packages(self, *refs: list[str]) -> ar_element.Package | list[ar_element.Package]: """ Recursively creates packages from reference(s) @@ -107,7 +118,7 @@ def make_packages(self, *refs: list[str]) -> ar_element.Package | list[ar_elemen if ref.startswith('/'): ref = ref[1:] parts = ref.partition('/') - package = self._package_map.get(parts[0], None) + package = self._package_dict.get(parts[0], None) if package is None: package = self.create_package(parts[0]) if len(parts[2]) > 0: @@ -115,33 +126,58 @@ def make_packages(self, *refs: list[str]) -> ar_element.Package | list[ar_elemen result.append(package) return result[0] if len(result) == 1 else result - def create_package(self, name: str, **kwargs) -> ar_element.Package: + def init_package_map(self, mapping: dict[str, str]) -> None: """ - Creates new package in workspace + Initializes an internally stored package map using key-value pairs. + The key can be any (unique) name while each value must be a package reference. + This function internally creates packages from dict-values using the methd + make_packages. + + Use in conjunction with the add_element function. + + Avoid manually calling make_packages if using this method. """ - if name in self._package_map: - return ValueError(f"Package with name '{name}' already exists") - package = ar_element.Package(name, **kwargs) - self.append(package) - return package + self.package_map.clear() + for package_key, package_ref in mapping.items(): + self.package_map[package_key] = self.make_packages(package_ref) + + def add_element(self, package_key: str, element: ar_element.ARObject): + """ + Adds element to package specified by package_key + + The method init_package_map must be called before using this method. + """ + if len(self.package_map) == 0: + raise RuntimeError("Internal package map not initialized") + self.package_map[package_key].append(element) + + def find_element(self, package_key: str, element_name: str) -> ar_element.Identifiable | None: + """ + Finds element in package referenced by package_key. + + Ony use together with init_package_map and add_element. + """ + if len(self.package_map) == 0: + raise RuntimeError("Internal package map not initialized") + return self.package_map[package_key].find(element_name) def append(self, package: ar_element.Package): """ Appends package to this worksapace """ assert isinstance(package, ar_element.Package) - self._package_map[package.name] = package + self._package_dict[package.name] = package self.packages.append(package) package.parent = self - def find(self, ref: str) -> Any: + def find(self, ref: str) -> ar_element.Identifiable | None: """ Finds item by reference """ if ref.startswith('/'): ref = ref[1:] parts = ref.partition('/') - package = self._package_map.get(parts[0], None) + package = self._package_dict.get(parts[0], None) if (package is not None) and (len(parts[2]) > 0): return package.find(parts[2]) return package diff --git a/src/autosar/xml/writer.py b/src/autosar/xml/writer.py index 28d9e5a..e1c2670 100644 --- a/src/autosar/xml/writer.py +++ b/src/autosar/xml/writer.py @@ -244,6 +244,7 @@ def __init__(self) -> None: 'NvRequireComSpec': self._write_nv_require_com_spec, 'ParameterRequireComSpec': self._write_parameter_require_com_spec, 'ModeSwitchReceiverComSpec': self._write_mode_switch_receiver_com_spec, + 'ClientComSpec': self._write_client_com_spec, } # Elements used only for unit test purposes self.switcher_non_collectable = { diff --git a/tests/xml/test_software_component_builder.py b/tests/xml/test_software_component_builder.py index 39df6e9..b9a7326 100644 --- a/tests/xml/test_software_component_builder.py +++ b/tests/xml/test_software_component_builder.py @@ -112,7 +112,7 @@ def test_init_value_ref(self): port_interface = create_vehicle_speed_interface(packages) init_value = create_init_value(packages, "VehicleSpeed_IV", 65535) swc = create_application_swc(packages) - port = swc.create_provide_port("VehicleSpeed", port_interface, com_spec={'init_value_ref': init_value.ref()}) + port = swc.create_provide_port("VehicleSpeed", port_interface, com_spec={'init_value': init_value.ref()}) self.assertIsInstance(port, ar_element.ProvidePortPrototype) com_spec: ar_element.NonqueuedSenderComSpec = port.com_spec[0] self.assertIsInstance(com_spec, ar_element.NonqueuedSenderComSpec) @@ -142,7 +142,7 @@ def test_init_value_ref(self): port_interface = create_vehicle_speed_interface(packages) init_value = create_init_value(packages, "VehicleSpeed_IV", 65535) swc = create_application_swc(packages) - port = swc.create_require_port("VehicleSpeed", port_interface, com_spec={'init_value_ref': init_value.ref()}) + port = swc.create_require_port("VehicleSpeed", port_interface, com_spec={'init_value': init_value.ref()}) self.assertIsInstance(port, ar_element.RequirePortPrototype) com_spec: ar_element.NonqueuedReceiverComSpec = port.com_spec[0] self.assertIsInstance(com_spec, ar_element.NonqueuedReceiverComSpec)