diff --git a/.github/workflows/debian_packaging.yml b/.github/workflows/debian_packaging.yml index 08bf95a..8ad75f6 100644 --- a/.github/workflows/debian_packaging.yml +++ b/.github/workflows/debian_packaging.yml @@ -9,9 +9,11 @@ on: jobs: build_debs: - name: Build debs on linux x86_64 - runs-on: ubuntu-latest - + strategy: + matrix: + os: [ubuntu-latest, ubuntu-24.04-arm] + name: Build debs + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 827ad5a..f20d7d3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,21 @@ Changelog for package automatika_ros_sugar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.2.6 (2025-01-17) +------------------ +* (fix) Fixes type hint +* (fix) Fixes getting available events +* (feature) checks for components and events duplicate names +* (fix) Changes type of monitor components to activate +* (chore) Fixes OS versions in CI +* (chore) Adds arms builds to debian packaging +* (refactor) Changes the fuction to create events from jsons +* (fix) Fixes events parsing using serialized events as dictionary keys +* (docs) Adds verification tag +* (docs) Adds external links to docs +* (docs) Adds source link to docs +* Contributors: ahr, mkabtoul + 0.2.5 (2025-01-07) ------------------ * (fix) Gets imports and default values based on installed distro diff --git a/docs/_static/automatika-logo.png b/docs/_static/automatika-logo.png new file mode 100644 index 0000000..d18952b Binary files /dev/null and b/docs/_static/automatika-logo.png differ diff --git a/docs/conf.py b/docs/conf.py index fc24264..a8033c5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -60,7 +60,10 @@ "substitution", "tasklist", ] - +language = "en" +myst_html_meta = { + "google-site-verification": "cQVj-BaADcGVOGB7GOvfbkgJjxni10C2fYWCZ03jOeo" +} html_theme = "sphinx_book_theme" # install with `pip install sphinx-book-theme` html_static_path = ["_static"] @@ -69,5 +72,29 @@ "logo": { "image_light": "_static/ROS_SUGAR_DARK.png", "image_dark": "_static/ROS_SUGAR.png", - } + }, + "icon_links": [ + { + "name": "Automatika", + "url": "https://automatikarobotics.com/", + "icon": "_static/automatika-logo.png", + "type": "local", + }, + { + "name": "GitHub", + "url": "https://github.com/automatika-robotics/ros-sugar", + "icon": "fa-brands fa-github", + }, + { + "name": "Discord", + "url": "https://discord.gg/cAW3BWwt", + "icon": "fa-brands fa-discord", + }, + ], + "path_to_docs": "docs", + "repository_url": "https://github.com/automatika-robotics/ros-sugar", + "repository_branch": "main", + "use_source_button": True, + "use_issues_button": True, + "use_edit_page_button": True, } diff --git a/package.xml b/package.xml index 9a80492..61ecdaa 100644 --- a/package.xml +++ b/package.xml @@ -2,7 +2,7 @@ automatika_ros_sugar - 0.2.5 + 0.2.6 Syntactic sugar for ROS2 nodes creation and management Automatika Robotics https://github.com/automatika/ros-sugar diff --git a/ros_sugar/core/component.py b/ros_sugar/core/component.py index 62fbe25..7bfc86f 100644 --- a/ros_sugar/core/component.py +++ b/ros_sugar/core/component.py @@ -31,7 +31,7 @@ from .action import Action from .event import Event -from ..events import json_to_events_list +from ..events import json_to_events_list, event_from_json from ..io.callbacks import GenericCallback from ..config.base_config import BaseComponentConfig, ComponentRunType, BaseAttrs from ..io.topic import Topic @@ -178,6 +178,16 @@ def rclpy_init_node(self, *args, **kwargs): ) self._create_default_services() + def 
is_node_initialized(self) -> bool: + """Checks if the rclpy Node is initialized + + :return: Is node initialized + :rtype: bool + """ + from rclpy.utilities import ok + + return ok() + def _reparse_inputs_callbacks(self, inputs: Sequence[Topic]) -> Sequence[Topic]: """Select inputs callbacks. Selects a callback for each input from the same component package if it exists. Otherwise, the first available callback will be assigned. Note: This method is added to enable using components from multiple packages in the same script, where each component prioritizes using callbacks from its own package. @@ -388,6 +398,8 @@ def activate(self): """ Create required subscriptions, publications, timers, ... etc. to activate the node """ + self.create_all_subscribers() + self.create_all_publishers() # Setup node services: servers and clients @@ -407,20 +419,20 @@ def deactivate(self): """ Destroy all declared subscriptions, publications, timers, ... etc. to deactivate the node """ + self.destroy_all_timers() + self.destroy_all_action_servers() self.destroy_all_services() - self.destroy_all_subscribers() - - self.destroy_all_publishers() - - self.destroy_all_timers() - self.destroy_all_action_clients() self.destroy_all_service_clients() + self.destroy_all_subscribers() + + self.destroy_all_publishers() + def configure(self, config_file: Optional[str] = None): """ Configure component from yaml file @@ -435,9 +447,6 @@ def configure(self, config_file: Optional[str] = None): # Init any global node variables self.init_variables() - # Setup node subscribers - self.create_all_subscribers() - # CREATION AND DESTRUCTION METHODS def init_variables(self): """ @@ -560,7 +569,7 @@ def destroy_all_subscribers(self): for listener in self.__event_listeners: self.destroy_subscription(listener) # Destroy all input subscribers - for callback in self.callbacks: + for callback in self.callbacks.values(): if callback._subscriber: self.destroy_subscription(callback._subscriber) callback._subscriber = None @@ -573,6 +582,7 @@ def destroy_all_publishers(self): if self.__enable_health_publishing: # Destroy health status publisher self.destroy_publisher(self.health_status_publisher) + self.health_status_publisher = None for publisher in self.publishers_dict.values(): if publisher._publisher: @@ -813,14 +823,6 @@ def loop_rate(self) -> float: def loop_rate(self, value: float): self.config.loop_rate = value - @property - def events(self) -> Optional[List[Event]]: - return self.__events - - @events.setter - def events(self, event_list: List[Event]) -> None: - self.__events = event_list - @property def events_actions(self) -> Dict[str, List[Action]]: """Getter of component Events/Actions @@ -837,7 +839,7 @@ def events_actions(self) -> Dict[str, List[Action]]: @events_actions.setter def events_actions( - self, events_actions_dict: Dict[Event, Union[Action, List[Action]]] + self, events_actions_dict: Dict[str, Union[Action, List[Action]]] ): """Setter of component Events/Actions @@ -847,14 +849,14 @@ def events_actions( """ self.__events = [] self.__actions = [] - for event, actions in events_actions_dict.items(): + for event_serialized, actions in events_actions_dict.items(): action_set = actions if isinstance(actions, list) else [actions] for action in action_set: if not hasattr(self, action.action_name): raise ValueError( f"Component '{self.node_name}' does not support action '{action.action_name}'" ) - self.__events.append(event) + self.__events.append(event_from_json(event_serialized)) self.__actions.append(action_set) # 
SERIALIZATION AND DESERIALIZATION @@ -1691,7 +1693,7 @@ def _main(self): # Execute main loop self._execution_step() - if self.__enable_health_publishing: + if self.__enable_health_publishing and self.health_status_publisher: self.health_status_publisher.publish(self.health_status()) # Execute once @@ -2250,9 +2252,6 @@ def on_deactivate( :rtype: lifecycle.TransitionCallbackReturn """ try: - # Call custom method - self.custom_on_deactivate() - self.deactivate() # Declare transition self.get_logger().info( @@ -2262,6 +2261,9 @@ def on_deactivate( self.destroy_timer(self.__fallbacks_check_timer) self.health_status.set_healthy() + # Call custom method + self.custom_on_deactivate() + except Exception as e: self.get_logger().error( f"Transition error for node {self.get_name()} to transition to state 'inactive': {e}" diff --git a/ros_sugar/core/event.py b/ros_sugar/core/event.py index eea5cf3..3ba4ef4 100644 --- a/ros_sugar/core/event.py +++ b/ros_sugar/core/event.py @@ -1,7 +1,6 @@ """Event""" import json -import os import threading import time import logging @@ -361,7 +360,6 @@ def __init__( if isinstance(nested_attributes, List) else [nested_attributes] ) - self.trigger_ref_value = trigger_value else: @@ -451,18 +449,21 @@ def dictionary(self) -> Dict: :return: Event description dictionary :rtype: Dict """ - return { + event_dict = { "event_name": self.name, "event_class": self.__class__.__name__, "topic": self.event_topic.to_json(), - "trigger_ref_value": self.trigger_ref_value, - "_attrs": self._attrs, "handle_once": self._handle_once, "event_delay": self._keep_event_delay, } + if hasattr(self, "trigger_ref_value"): + event_dict["trigger_ref_value"] = self.trigger_ref_value + if hasattr(self, "_attrs"): + event_dict["_attrs"] = self._attrs + return event_dict @dictionary.setter - def dictionary(self, dict_obj) -> None: + def dictionary(self, dict_obj: Dict) -> None: """ Setter of the event using a dictionary @@ -476,10 +477,12 @@ def dictionary(self, dict_obj) -> None: name="dummy_init", msg_type="String" ) # Dummy init to set from json self.event_topic.from_json(dict_obj["topic"]) - self.trigger_ref_value = dict_obj["trigger_ref_value"] - self._attrs = dict_obj["_attrs"] self._handle_once = dict_obj["handle_once"] self._keep_event_delay = dict_obj["event_delay"] + if dict_obj.get("trigger_ref_value"): + self.trigger_ref_value = dict_obj["trigger_ref_value"] + if dict_obj.get("_attrs"): + self._attrs = dict_obj["_attrs"] except Exception as e: logging.error(f"Cannot set Event from incompatible dictionary. 
{e}") raise diff --git a/ros_sugar/core/monitor.py b/ros_sugar/core/monitor.py index fe6af06..db3cb96 100644 --- a/ros_sugar/core/monitor.py +++ b/ros_sugar/core/monitor.py @@ -19,6 +19,7 @@ from ..config import BaseConfig from ..io.topic import Topic from .event import Event +from ..events import event_from_json from .action import Action from ..launch import logger @@ -41,12 +42,12 @@ def __init__( self, components_names: List[str], enable_health_status_monitoring: bool = True, - events_actions: Optional[Dict[Event, List[Action]]] = None, + events_actions: Optional[Dict[str, List[Action]]] = None, events_to_emit: Optional[List[Event]] = None, config: Optional[BaseConfig] = None, services_components: Optional[List[BaseComponent]] = None, action_servers_components: Optional[List[BaseComponent]] = None, - activate_on_start: Optional[List[BaseComponent]] = None, + activate_on_start: Optional[List[str]] = None, activation_timeout: Optional[float] = None, activation_attempt_time: float = 1.0, component_name: str = "monitor", @@ -67,8 +68,8 @@ def __init__( :type services_components: Optional[List[Component]], optional :param action_servers_components: List of components running as Action Servers, defaults to None :type action_servers_components: Optional[List[Component]], optional - :param activate_on_start: List of Lifecycle components to activate on start, defaults to None - :type activate_on_start: Optional[List[Component]], optional + :param activate_on_start: List of Lifecycle components names to activate on start, defaults to None + :type activate_on_start: Optional[List[str]], optional :param start_on_init: To activate provided components on start, defaults to False :type start_on_init: bool, optional :param component_name: Name of the ROS2 node, defaults to "monitor" @@ -98,7 +99,7 @@ def __init__( self._main_srv_clients: Dict[str, base_clients.ServiceClientHandler] = {} self._main_action_clients: Dict[str, base_clients.ActionClientHandler] = {} - self._components_to_activate_on_start = activate_on_start + self._components_to_activate_on_start: List[str] = activate_on_start self._enable_health_monitoring: bool = enable_health_status_monitoring @@ -182,19 +183,18 @@ def _check_and_activate_components(self) -> None: """ self.__activation_wait_time += self.__activation_attempt_time node_names = self.get_node_names() - components_to_activate_names = ( - [comp.node_name for comp in self._components_to_activate_on_start] - if self._components_to_activate_on_start - else [] - ) __notfound: Optional[set[str]] = None - if set(components_to_activate_names).issubset(set(node_names)): - logger.info(f"NODES '{components_to_activate_names}' ARE UP ... ACTIVATING") + if set(self._components_to_activate_on_start).issubset(set(node_names)): + logger.info( + f"NODES '{self._components_to_activate_on_start}' ARE UP ... ACTIVATING" + ) if self.__components_activation_event: self.__components_activation_event() self.destroy_timer(self.__components_monitor_timer) else: - __notfound = set(components_to_activate_names).difference(set(node_names)) + __notfound = set(self._components_to_activate_on_start).difference( + set(node_names) + ) logger.info(f"Waiting for Nodes '{__notfound}' to come up to activate ...") if ( self.__activation_timeout @@ -207,16 +207,6 @@ def _check_and_activate_components(self) -> None: f"Timeout while Waiting for nodes '{__notfound}' to come up to activate. A process might have died. If all processes are starting without errors, then this might be a ROS2 discovery problem. 
Run 'ros2 node list' to see if nodes with the same name already exist or old nodes are not killed properly. Alternatively, try to restart ROS2 daemon." ) - @property - def events(self): - """ - Monitored events getter - - :return: Events list - :rtype: List[Event] - """ - return self._events_actions.keys() - def _turn_on_component_management(self, component_name: str) -> None: """ Created clients for all main services in a given component @@ -513,7 +503,8 @@ def _activate_event_monitoring(self) -> None: Turn on all events """ if self._events_actions: - for event, actions in self._events_actions.items(): + for serialized_event, actions in self._events_actions.items(): + event = event_from_json(serialized_event) for action in actions: method = getattr(self, action.action_name) # register action to the event diff --git a/ros_sugar/events.py b/ros_sugar/events.py index 24d62f4..e7f9893 100644 --- a/ros_sugar/events.py +++ b/ros_sugar/events.py @@ -1,12 +1,47 @@ """Available Events""" -from typing import Union, Dict, Optional, List, Any +from typing import Union, Dict, List, Any import json from copy import deepcopy from .io.topic import Topic from .core.event import Event +def event_from_json( + json_obj: Union[str, bytes, bytearray], +) -> Event: + # Check if the serialized event contains a class name + event_as_dict = json.loads(json_obj) + if "event_class" not in event_as_dict.keys(): + raise ValueError( + "Cannot convert json object to Events Dictionary. Json item is not a valid serialized Event" + ) + + # Get and check event class + event_class_name: str = event_as_dict["event_class"] + events_classes = [ + event.__name__ + for event in globals().values() + if type(event) is type and issubclass(event, Event) + ] + if event_class_name not in events_classes: + raise ValueError( + f"Cannot convert json object to Event. Unknown event class '{event_class_name}'" + ) + + # Construct new event + event_type = globals()[event_class_name] + event: Event = event_type( + event_as_dict["event_name"], + event_as_dict, + trigger_value=event_as_dict["trigger_ref_value"] + if event_as_dict.get("trigger_ref_value") + else None, + nested_attributes=[], + ) + return event + + def json_to_events_list( json_obj: Union[str, bytes, bytearray], ) -> List: @@ -24,38 +59,17 @@ def json_to_events_list( list_obj = json.loads(json_obj) events_list = [] for event_serialized in list_obj: - # Check if the serialized event contains a class name - event_as_dict = json.loads(event_serialized) - if "event_class" not in event_as_dict.keys(): - raise ValueError( - "Cannot convert json object to Events Dictionary. Json item is not a valid serialized Event" - ) - - # Get and check event class - event_class_name: str = event_as_dict["event_class"] - events_classes = [event.__name__ for event in available_events] - if event_class_name not in events_classes: - raise ValueError( - f"Cannot convert json object to Events Dictionary. 
Unknown event class '{event_class_name}'" - ) - - for event in available_events: - if event.__name__ == event_class_name: - # Construct new event - new_event = event( - event_as_dict["event_name"], - event_as_dict, - event_as_dict["trigger_ref_value"], - nested_attributes=[], - ) - # Add to events dictionary - events_list.append(deepcopy(new_event)) - + new_event = event_from_json(event_serialized) + events_list.append( + deepcopy(new_event) + ) # deepcopy is needed to avoid copying the previous event return events_list class OnAny(Event): - def __init__(self, event_name: str, event_source: Union[Topic, str, Dict]) -> None: + def __init__( + self, event_name: str, event_source: Union[Topic, str, Dict], **kwargs + ) -> None: """__init__. :param event_name: @@ -435,16 +449,3 @@ def _update_trigger(self) -> None: self.trigger = self._event_value <= self.trigger_ref_value else: self.trigger = self._event_value < self.trigger_ref_value - - -available_events: List[type] = [ - OnAny, - OnChange, - OnLess, - OnGreater, - OnChangeEqual, - OnDifferent, - OnEqual, - OnContainsAll, - OnContainsAny, -] diff --git a/ros_sugar/io/supported_types.py b/ros_sugar/io/supported_types.py index 801f2bc..dab29d3 100644 --- a/ros_sugar/io/supported_types.py +++ b/ros_sugar/io/supported_types.py @@ -37,48 +37,52 @@ _additional_types = [] -def _update_supportedtype_callback(existing_class: type, new_type: type) -> None: - if not new_type.callback or new_type.callback == existing_class.callback: +def _update_supportedtype_callback(existing_class: type, new_class: type) -> None: + if not new_class.callback or new_class.callback == existing_class.callback: # If new type has no callback or it is the same as the current callback -> exit return if not existing_class.callback: # No callback is found for the existing class -> get callback from new type - existing_class.callback = new_type.callback + existing_class.callback = new_class.callback else: # If a callback already exists -> augment the list with a new callback if isinstance(existing_class.callback, List) and isinstance( - new_type.callback, List + new_class.callback, List ): - existing_class.callback.extend(new_type.callback) - elif isinstance(existing_class.callback, List) and not isinstance( - new_type.callback, List + existing_class.callback.extend(new_class.callback) + elif ( + isinstance(existing_class.callback, List) + and not isinstance(new_class.callback, List) + and new_class.callback not in existing_class.callback ): - existing_class.callback.append(new_type.callback) - else: + existing_class.callback.append(new_class.callback) + elif not isinstance(existing_class.callback, List): existing_class.callback = [ existing_class.callback, - new_type.callback, + new_class.callback, ] -def _update_supportedtype_conversion(existing_class: type, new_type: type) -> None: - if not new_type.convert or new_type.convert == existing_class.convert: +def _update_supportedtype_conversion(existing_class: type, new_class: type) -> None: + if not new_class.convert or new_class.convert == existing_class.convert: return if not existing_class.convert: - existing_class.convert = new_type.convert + existing_class.convert = new_class.convert else: if isinstance(existing_class.convert, List) and isinstance( - new_type.convert, List + new_class.convert, List ): - existing_class.convert.extend(new_type.convert) - elif isinstance(existing_class.convert, List) and not isinstance( - new_type.convert, List + existing_class.convert.extend(new_class.convert) + elif ( + 
isinstance(existing_class.convert, List) + and not isinstance(new_class.convert, List) + and new_class.convert not in existing_class.convert ): - existing_class.convert.append(new_type.convert) - else: + existing_class.convert.append(new_class.convert) + elif not isinstance(existing_class.convert, List): existing_class.convert = [ existing_class.convert, - new_type.convert, + new_class.convert, ] @@ -91,28 +95,29 @@ def add_additional_datatypes(types: List[type]) -> None: global _additional_types # Create a dictionary for quick lookup of existing classes by name type_dict = {t.__name__: t for t in _additional_types} + import logging - for new_type in types: - if new_type.__name__ in type_dict: + for new_class in types: + if new_class.__name__ in type_dict: # Update the existing class with non-None attributes from the new class - existing_class = type_dict[new_type.__name__] + existing_class = type_dict[new_class.__name__] - if existing_class == SupportedType: + if existing_class == SupportedType or existing_class == new_class: # Skip parent continue - _update_supportedtype_callback(existing_class, new_type) + _update_supportedtype_callback(existing_class, new_class) - if hasattr(new_type, "_ros_type") and ( + if hasattr(new_class, "_ros_type") and ( not hasattr(existing_class, "_ros_type") or not existing_class._ros_type ): - existing_class._ros_type = new_type._ros_type + existing_class._ros_type = new_class._ros_type - _update_supportedtype_conversion(existing_class, new_type) + _update_supportedtype_conversion(existing_class, new_class) else: # Add the new class to the list - _additional_types.append(new_type) + _additional_types.append(new_class) class Meta(type): diff --git a/ros_sugar/io/topic.py b/ros_sugar/io/topic.py index 9b69f98..4217c94 100644 --- a/ros_sugar/io/topic.py +++ b/ros_sugar/io/topic.py @@ -154,7 +154,7 @@ def _msg_type_validator(self, _, val): f"Got value of 'msg_type': {val}, which is not in available datatypes. 
Topics can only be created with one of the following types: { {msg_t.__name__: msg_t for msg_t in msg_types} }" ) # Set ros type - self.ros_msg_type = self.msg_type.get_ros_type() + self.ros_msg_type = val.get_ros_type() @define(kw_only=True) diff --git a/ros_sugar/launch/launcher.py b/ros_sugar/launch/launcher.py index 40b5cb7..0f19719 100644 --- a/ros_sugar/launch/launcher.py +++ b/ros_sugar/launch/launcher.py @@ -112,14 +112,19 @@ def __init__( self._launch_group = [] # Components list and package/executable - self.components: List[BaseComponent] = [] + self._components: List[BaseComponent] = [] self._pkg_executable: List[Tuple[Optional[str], Optional[str]]] = [] # To track each package log level when the pkg is added self._pkg_log_level: Dict[str, str] = {} # Component: run_in_process (true/false) - self.__components_to_activate_on_start: Dict[BaseComponent, bool] = {} + self.__component_names_to_activate_on_start_mp: List[ + str + ] = [] # List of multiprocessing component names to activate on start by the monitor + self.__components_to_activate_on_start_threaded: List[ + BaseComponent + ] = [] # List of threaded component names to activate on start # Timeout for activating components on start self.__components_activation_timeout = activation_timeout @@ -127,12 +132,14 @@ def __init__( # Events/Actions dictionaries self._internal_events: Optional[List[Event]] = None self._internal_event_names: Optional[List[str]] = None - self._monitor_actions: Dict[Event, List[Action]] = {} - self._ros_actions: Dict[Event, List[ROSLaunchAction]] = {} - self._components_actions: Dict[Event, List[Action]] = {} + self._ros_actions: Dict[str, List[ROSLaunchAction]] = {} + # Dictionaries {serialized_event: actions} + self._monitor_actions: Dict[str, List[Action]] = {} + self._components_actions: Dict[str, List[Action]] = {} + self.__events_names: List[str] = [] # Thread pool for external processors - self.thread_pool: Union[ThreadPoolExecutor, None] = None + self._thread_pool: Union[ThreadPoolExecutor, None] = None def add_pkg( self, @@ -173,34 +180,33 @@ def add_pkg( "Cannot run in multi-processes without specifying ROS2 'package_name' and 'executable_entry_point'" ) - if not multiprocessing: - package_name = None - executable_entry_point = None + package_name = package_name if multiprocessing else None + executable_entry_point = executable_entry_point if multiprocessing else None # Extend existing components - if not self.components: - self.components = components - self._pkg_executable = [(package_name, executable_entry_point)] * len( - components - ) - - else: - # Extend the current list of components - self.components.extend(components) - self._pkg_executable.extend( - [(package_name, executable_entry_point)] * len(components) - ) + self._components.extend(components) + self._pkg_executable.extend( + [(package_name, executable_entry_point)] * len(components) + ) # Register which components to activate on start if components_to_activate_on_start: - self.__components_to_activate_on_start.update( - (component, multiprocessing) - for component in components_to_activate_on_start - ) + if multiprocessing: + self.__component_names_to_activate_on_start_mp.extend([ + component.node_name for component in components_to_activate_on_start + ]) + else: + self.__components_to_activate_on_start_threaded.extend( + components_to_activate_on_start + ) + elif activate_all_components_on_start: - self.__components_to_activate_on_start.update( - (component, multiprocessing) for component in components - ) + if 
multiprocessing: + self.__component_names_to_activate_on_start_mp.extend([ + component.node_name for component in components + ]) + else: + self.__components_to_activate_on_start_threaded.extend(components) # Parse provided Events/Actions if events_actions and self.__enable_monitoring: @@ -224,14 +230,14 @@ def _setup_component_events_handlers(self, comp: BaseComponent): if not self._components_actions: return comp_dict = {} - for event, actions in self._components_actions.items(): + for event_serialized, actions in self._components_actions.items(): for action in actions: if comp.node_name == action.parent_component: - self.__update_dict_list(comp_dict, event, action) + self.__update_dict_list(comp_dict, event_serialized, action) if comp_dict: comp.events_actions = comp_dict - def __update_dict_list(self, dictionary: Dict[Any, List], name: Any, value: Any): + def __update_dict_list(self, dictionary: Dict[str, List], name: str, value: Any): """Helper method to add or update an item in a dictionary :param dictionary: Dictionary to be updated @@ -263,14 +269,20 @@ def __rewrite_actions_for_components( :raises ValueError: If given component action corresponds to unknown component """ + self.__events_names.extend(event.name for event in actions_dict) for condition, raw_action in actions_dict.items(): + serialized_condition: str = condition.json action_set: List[Union[Action, ROSLaunchAction]] = ( raw_action if isinstance(raw_action, list) else [raw_action] ) for action in action_set: # If it is a valid ROS launch action -> nothing is required if isinstance(action, ROSLaunchAction): - self.__update_dict_list(self._ros_actions, condition, action) + self.__update_dict_list(self._ros_actions, condition.name, action) + if not self._internal_events: + self._internal_events = [condition] + else: + self._internal_events.append(condition) # Check if it is a component action: elif action.component_action: action_object = action.executable.__self__ @@ -278,10 +290,14 @@ def __rewrite_actions_for_components( raise InvalidAction( f"Invalid action for condition '{condition.name}'. 
Action component '{action_object}' is unknown or not added to Launcher" ) - self.__update_dict_list(self._components_actions, condition, action) + self.__update_dict_list( + self._components_actions, serialized_condition, action + ) elif action.monitor_action: # Action to execute through the monitor - self.__update_dict_list(self._monitor_actions, condition, action) + self.__update_dict_list( + self._monitor_actions, serialized_condition, action + ) def _activate_components_action(self) -> SomeEntitiesType: """ @@ -291,12 +307,12 @@ def _activate_components_action(self) -> SomeEntitiesType: :type in_processes: bool """ activation_actions = [] - for component, run_in_process in self.__components_to_activate_on_start.items(): - if run_in_process: - activation_actions.extend(self.start(component.node_name)) - else: - start_action = Action(component.start) - activation_actions.append(start_action.launch_action()) + for component_name in self.__component_names_to_activate_on_start_mp: + activation_actions.extend(self.start(component_name)) + + for component in self.__components_to_activate_on_start_threaded: + start_action = Action(component.start) + activation_actions.append(start_action.launch_action()) return activation_actions # LAUNCH ACTION HANDLERS @@ -369,7 +385,7 @@ def fallback_rate(self) -> Dict: """ return { component.node_name: component.fallback_rate - for component in self.components + for component in self._components } @fallback_rate.setter @@ -380,7 +396,7 @@ def fallback_rate(self, value: float) -> None: :param value: Fallback check rate (Hz) :type value: float """ - for component in self.components: + for component in self._components: component.fallback_rate = value def on_fail(self, action_name: str, max_retries: Optional[int] = None) -> None: @@ -392,7 +408,7 @@ def on_fail(self, action_name: str, max_retries: Optional[int] = None) -> None: :param max_retries: Maximum number of action execution retries. 
None is equivalent to unlimited retries, defaults to None :type max_retries: Optional[int], optional """ - for component in self.components: + for component in self._components: if action_name in component.fallbacks: method = getattr(component, action_name) method_params = inspect.signature(method).parameters @@ -428,7 +444,7 @@ def _get_action_launch_entity(self, action: Action) -> SomeEntitiesType: f"Requested unavailable component action: {action.parent_component}.{action.action_name}" ) from e comp = None - for comp in self.components: + for comp in self._components: if comp.node_name == action.parent_component: break if not comp: @@ -454,35 +470,33 @@ def _setup_internal_events_handlers(self, nodes_in_processes: bool = True) -> No if not self._ros_actions: return - - for event, action_set in self._ros_actions.items(): - log_action = LogInfo(msg=f"GOT TRIGGER FOR EVENT {event.name}") - entities_dict[event.name] = [log_action] - + for event_name, action_set in self._ros_actions.items(): + log_action = LogInfo(msg=f"GOT TRIGGER FOR EVENT {event_name}") + entities_dict[event_name] = [log_action] for action in action_set: if isinstance(action, ROSLaunchAction): - entities_dict[event.name].append(action) + entities_dict[event_name].append(action) # Check action type elif action.component_action and nodes_in_processes: # Re-parse action for component related actions entities = self._get_action_launch_entity(action) if isinstance(entities, list): - entities_dict[event.name].extend(entities) + entities_dict[event_name].extend(entities) else: - entities_dict[event.name].append(entities) + entities_dict[event_name].append(entities) # If the action is not related to a component -> add opaque executable to launch else: - entities_dict[event.name].append( + entities_dict[event_name].append( action.launch_action(monitor_node=self.monitor_node) ) # Register a new internal event handler internal_events_handler = launch.actions.RegisterEventHandler( OnInternalEvent( - internal_event_name=event.name, - entities=entities_dict[event.name], + internal_event_name=event_name, + entities=entities_dict[event_name], ) ) self._description.add_action(internal_events_handler) @@ -494,8 +508,7 @@ def _setup_monitor_node(self, nodes_in_processes: bool = True) -> None: :type nodes_in_processes: bool, optional """ # Update internal events - if self._ros_actions: - self._internal_events = list(self._ros_actions.keys()) + if self._internal_events: self._internal_event_names = [ev.name for ev in self._internal_events] # Check that all internal events have unique names if len(set(self._internal_event_names)) != len(self._internal_event_names): @@ -505,16 +518,18 @@ def _setup_monitor_node(self, nodes_in_processes: bool = True) -> None: # Get components running as servers to create clients in Monitor services_components = [ - comp for comp in self.components if comp.run_type == ComponentRunType.SERVER + comp + for comp in self._components + if comp.run_type == ComponentRunType.SERVER ] action_components = [ comp - for comp in self.components + for comp in self._components if comp.run_type == ComponentRunType.ACTION_SERVER ] # Setup the monitor node - components_names = [comp.node_name for comp in self.components] + components_names = [comp.node_name for comp in self._components] # Check that all components have unique names if len(set(components_names)) != len(components_names): @@ -522,6 +537,14 @@ def _setup_monitor_node(self, nodes_in_processes: bool = True) -> None: f"Got duplicate component names in: 
{components_names}. Cannot launch components with duplicate names. Provide unique names for all your components" ) + all_components_to_activate_on_start = ( + self.__component_names_to_activate_on_start_mp + + [ + comp.node_name + for comp in self.__components_to_activate_on_start_threaded + ] + ) + self.monitor_node = Monitor( components_names=components_names, enable_health_status_monitoring=self.__enable_monitoring, @@ -529,7 +552,7 @@ def _setup_monitor_node(self, nodes_in_processes: bool = True) -> None: events_to_emit=self._internal_events, services_components=services_components, action_servers_components=action_components, - activate_on_start=list(self.__components_to_activate_on_start.keys()), + activate_on_start=all_components_to_activate_on_start, activation_timeout=self.__components_activation_timeout, ) @@ -585,8 +608,8 @@ def _setup_external_processors(self, component: BaseComponent) -> None: if not component._external_processors: return - if not self.thread_pool: - self.thread_pool = ThreadPoolExecutor() + if not self._thread_pool: + self._thread_pool = ThreadPoolExecutor() for key, processor_data in component._external_processors.items(): for processor in processor_data[0]: @@ -599,7 +622,7 @@ def _setup_external_processors(self, component: BaseComponent) -> None: s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.bind(sock_file) s.listen(0) - self.thread_pool.submit( + self._thread_pool.submit( self.__listen_for_external_processing, s, processor ) # type: ignore @@ -710,13 +733,13 @@ def configure( # Configure one component with given name if component_name: - for component in self.components: + for component in self._components: if component.node_name == component_name: component.config_from_yaml(config_file) return # If no component is specified -> configure all components - for component in self.components: + for component in self._components: component.config_from_yaml(config_file) def add_py_executable(self, path_to_executable: str, name: str = "python3"): @@ -756,6 +779,23 @@ def add_method( method_action = OpaqueFunction(function=method, args=args, kwargs=kwargs) self._description.add_action(method_action) + def _check_duplicate_names(self) -> None: + """Checks for components/events with duplicate names in the launcher + + :raises ValueError: If two components or events are found with the same name + """ + for i in range(len(self._components) - 1): + if self._components[i].node_name == self._components[i + 1].node_name: + error_msg = f"Found duplicate component name: '{self._components[i].node_name}'. Please use unique names for all your components to avoid duplicate ROS2 node names" + logger.exception(error_msg) + raise ValueError(error_msg) + + for i in range(len(self.__events_names) - 1): + if self.__events_names[i] == self.__events_names[i + 1]: + error_msg = f"Found duplicate event name: '{self.__events_names[i]}'. Please use unique names for all your events" + logger.exception(error_msg) + raise ValueError(error_msg) + def bringup( self, config_file: str | None = None, @@ -766,11 +806,13 @@ def bringup( """ Bring up the Launcher """ - if not self.components: + if not self._components: raise ValueError( "Cannot bringup without adding any components. 
Use 'add_pkg' method to add a set of components from one ROS2 package then use 'bringup' to start and run your system" ) + self._check_duplicate_names() + # SET PROCESS NAME setproctitle.setproctitle(logger.name) @@ -779,11 +821,11 @@ def bringup( self._setup_monitor_node() - for component in self.components: + for component in self._components: self._setup_component_events_handlers(component) # Add configured components to launcher - for idx, component in enumerate(self.components): + for idx, component in enumerate(self._components): pkg_name, executable_name = self._pkg_executable[idx] if pkg_name and executable_name: self._setup_component_in_process( @@ -798,8 +840,8 @@ def bringup( self._start_ros_launch(introspect, launch_debug) - if self.thread_pool: - self.thread_pool.shutdown() + if self._thread_pool: + self._thread_pool.shutdown() logger.info("------------------------------------") logger.info("ALL COMPONENTS ENDED")
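
With this change, events travel in serialized form: the Launcher keys its internal `events_actions` dictionaries by `event.json`, and `BaseComponent` / `Monitor` rebuild the `Event` objects with the new `event_from_json` helper. A minimal round-trip sketch, assuming `"String"` remains a valid `msg_type` (as in the dummy-init call above) and that `Event.json` produces the same dictionary `event_from_json` consumes:

```python
from ros_sugar.io.topic import Topic
from ros_sugar.events import OnAny, event_from_json

# OnAny carries no trigger_ref_value, which is why Event.dictionary now
# guards that key with hasattr()/dict.get() instead of accessing it directly.
battery_topic = Topic(name="battery_state", msg_type="String")
battery_event = OnAny("on_any_battery_msg", battery_topic)

serialized: str = battery_event.json    # used as a dictionary key by the Launcher
restored = event_from_json(serialized)  # reconstructed inside BaseComponent / Monitor
assert restored.name == battery_event.name
```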
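
The user-facing Launcher API is unchanged by the serialization rework: `add_pkg` still accepts a mapping from `Event` objects to `Action`s and serializes it internally, and the new `_check_duplicate_names` makes `bringup` fail early on duplicate component or event names. A usage sketch under stated assumptions — `my_pkg`, `MyComponent`, the entry point name, and the `component_name` keyword are illustrative, and the `Launcher` constructor is assumed to keep its previous defaults:

```python
from ros_sugar.io.topic import Topic
from ros_sugar.events import OnAny
from ros_sugar.core.action import Action
from ros_sugar.launch.launcher import Launcher
from my_pkg.components import MyComponent  # hypothetical user-defined component

mapper = MyComponent(component_name="mapper")
planner = MyComponent(component_name="planner")

# Start the planner whenever anything is published on a (hypothetical) mapper status topic
event = OnAny("on_mapper_status", Topic(name="mapper_status", msg_type="String"))

launcher = Launcher()
launcher.add_pkg(
    components=[mapper, planner],
    package_name="my_pkg",                # hypothetical
    executable_entry_point="executable",  # hypothetical
    multiprocessing=True,
    activate_all_components_on_start=True,
    events_actions={event: Action(planner.start)},
)
launcher.bringup()  # now raises ValueError on duplicate component or event names
```

Internally, the Launcher now hands the Monitor plain node names (`activate_on_start: Optional[List[str]]`) instead of component objects, keeping multiprocessing and threaded components in separate activation lists.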