diff --git a/example_11/CMakeLists.txt b/example_11/CMakeLists.txt index e6abfba4..9a66bcd3 100644 --- a/example_11/CMakeLists.txt +++ b/example_11/CMakeLists.txt @@ -68,6 +68,7 @@ if(BUILD_TESTING) ament_add_pytest_test(example_11_urdf_xacro test/test_urdf_xacro.py) ament_add_pytest_test(view_example_11_launch test/test_view_robot_launch.py) ament_add_pytest_test(run_example_11_launch test/test_carlikebot_launch.py) + ament_add_pytest_test(run_example_11_launch_remapped test/test_carlikebot_launch_remapped.py) endif() ## EXPORTS diff --git a/example_11/test/test_carlikebot_launch.py b/example_11/test/test_carlikebot_launch.py index 1afb4ff3..f5627698 100644 --- a/example_11/test/test_carlikebot_launch.py +++ b/example_11/test/test_carlikebot_launch.py @@ -37,6 +37,7 @@ from launch.actions import IncludeLaunchDescription from launch.launch_description_sources import PythonLaunchDescriptionSource from launch_testing.actions import ReadyToTest +from launch_testing_ros import WaitForTopics # import launch_testing.markers import rclpy @@ -46,6 +47,8 @@ check_node_running, ) +from tf2_msgs.msg import TFMessage + # Executes the given launch file and checks if all nodes can be started @pytest.mark.rostest @@ -98,6 +101,13 @@ def test_check_if_msgs_published(self): ], ) + def test_remapped_topic(self): + # test if the remapping of the odometry topic is disabled + old_topic = "/bicycle_steering_controller/tf_odometry" + wait_for_topics = WaitForTopics([(old_topic, TFMessage)]) + assert wait_for_topics.wait(), f"Topic '{old_topic}' not found!" + wait_for_topics.shutdown() + # TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore # @launch_testing.post_shutdown_test() diff --git a/example_11/test/test_carlikebot_launch_remapped.py b/example_11/test/test_carlikebot_launch_remapped.py new file mode 100644 index 00000000..da7aebfc --- /dev/null +++ b/example_11/test/test_carlikebot_launch_remapped.py @@ -0,0 +1,120 @@ +# Copyright (c) 2024 AIT - Austrian Institute of Technology GmbH +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# Author: Christoph Froehlich + +import os +import pytest +import unittest + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import IncludeLaunchDescription +from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch_testing.actions import ReadyToTest +from launch_testing_ros import WaitForTopics + +# import launch_testing.markers +import rclpy +from ros2_control_demo_testing.test_utils import ( + check_controllers_running, + check_if_js_published, + check_node_running, +) + +from tf2_msgs.msg import TFMessage + + +# Executes the given launch file and checks if all nodes can be started +@pytest.mark.rostest +def generate_test_description(): + launch_include = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join( + get_package_share_directory("ros2_control_demo_example_11"), + "launch/carlikebot.launch.py", + ) + ), + launch_arguments={"gui": "false", "remap_odometry_tf": "true"}.items(), + ) + + return LaunchDescription([launch_include, ReadyToTest()]) + + +# This is our test fixture. Each method is a test case. +# These run alongside the processes specified in generate_test_description() +class TestFixture(unittest.TestCase): + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node("test_node") + + def tearDown(self): + self.node.destroy_node() + + def test_node_start(self, proc_output): + check_node_running(self.node, "robot_state_publisher") + + def test_controller_running(self, proc_output): + + cnames = ["bicycle_steering_controller", "joint_state_broadcaster"] + + check_controllers_running(self.node, cnames) + + def test_check_if_msgs_published(self): + check_if_js_published( + "/joint_states", + [ + "virtual_front_wheel_joint", + "virtual_rear_wheel_joint", + ], + ) + + def test_remapped_topic(self): + # we don't want to implement a tf lookup here + # so just check if the unmapped topic is not published + old_topic = "/bicycle_steering_controller/tf_odometry" + wait_for_topics = WaitForTopics([(old_topic, TFMessage)]) + assert not wait_for_topics.wait(), f"Topic '{old_topic}' found, but should be remapped!" + wait_for_topics.shutdown() + + +# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore +# @launch_testing.post_shutdown_test() +# # These tests are run after the processes in generate_test_description() have shutdown. +# class TestDescriptionCraneShutdown(unittest.TestCase): + +# def test_exit_codes(self, proc_info): +# """Check if the processes exited normally.""" +# launch_testing.asserts.assertExitCodes(proc_info)