From 14b3a423a3ea81dc3e46656733703f9e070e3feb Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Sun, 18 May 2025 10:34:53 +0000 Subject: [PATCH 1/2] Add a test to check if trajectory message is sent --- example_7/CMakeLists.txt | 1 + example_7/test/test_send_trajectory_launch.py | 95 +++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 example_7/test/test_send_trajectory_launch.py diff --git a/example_7/CMakeLists.txt b/example_7/CMakeLists.txt index 3836c0b1a..279e8102c 100644 --- a/example_7/CMakeLists.txt +++ b/example_7/CMakeLists.txt @@ -108,6 +108,7 @@ if(BUILD_TESTING) ament_add_pytest_test(example_7_urdf_xacro test/test_urdf_xacro.py) ament_add_pytest_test(view_example_7_launch test/test_view_robot_launch.py) ament_add_pytest_test(run_example_7_launch test/test_r6bot_controller_launch.py) + ament_add_pytest_test(test_send_trajectory_launch test/test_send_trajectory_launch.py) endif() ## EXPORTS diff --git a/example_7/test/test_send_trajectory_launch.py b/example_7/test/test_send_trajectory_launch.py new file mode 100644 index 000000000..12de44623 --- /dev/null +++ b/example_7/test/test_send_trajectory_launch.py @@ -0,0 +1,95 @@ +# Copyright (c) 2024 AIT - Austrian Institute of Technology GmbH +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# Author: Christoph Froehlich + +import os +import pytest +import unittest + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import IncludeLaunchDescription +from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch_testing.actions import ReadyToTest +from launch_testing_ros import WaitForTopics +from trajectory_msgs.msg import JointTrajectory + +import launch_testing.markers +import rclpy + + +# Executes the given launch file and checks if all nodes can be started +@pytest.mark.rostest +def generate_test_description(): + launch_include = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join( + get_package_share_directory("ros2_control_demo_example_7"), + "launch/send_trajectory.launch.py", + ) + ) + ) + + return LaunchDescription([launch_include, ReadyToTest()]) + + +# This is our test fixture. Each method is a test case. +# These run alongside the processes specified in generate_test_description() +class TestFixture(unittest.TestCase): + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node("test_node") + + def tearDown(self): + self.node.destroy_node() + + def test_check_if_msgs_published(self): + topic = "/r6bot_controller/joint_trajectory" + wait_for_topics = WaitForTopics([(topic, JointTrajectory)], timeout=20.0) + assert wait_for_topics.wait(), f"Topic '{topic}' not found!" + msgs = wait_for_topics.received_messages(topic) + msg = msgs[0] + assert len(msg.points) == 200, "Wrong number of joints in message" + wait_for_topics.shutdown() + + +@launch_testing.post_shutdown_test() +# These tests are run after the processes in generate_test_description() have shutdown. +class TestDescriptionCraneShutdown(unittest.TestCase): + + def test_exit_codes(self, proc_info): + """Check if the processes exited normally.""" + launch_testing.asserts.assertExitCodes(proc_info) From fc384b807d3c1483d1a7278ffd5b8490c525b2e3 Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Sun, 18 May 2025 10:34:53 +0000 Subject: [PATCH 2/2] Change test to check for trajectory execution --- example_7/CMakeLists.txt | 1 - .../reference_generator/send_trajectory.cpp | 1 + .../test/test_r6bot_controller_launch.py | 29 +++++- example_7/test/test_send_trajectory_launch.py | 95 ------------------- 4 files changed, 29 insertions(+), 97 deletions(-) delete mode 100644 example_7/test/test_send_trajectory_launch.py diff --git a/example_7/CMakeLists.txt b/example_7/CMakeLists.txt index 279e8102c..3836c0b1a 100644 --- a/example_7/CMakeLists.txt +++ b/example_7/CMakeLists.txt @@ -108,7 +108,6 @@ if(BUILD_TESTING) ament_add_pytest_test(example_7_urdf_xacro test/test_urdf_xacro.py) ament_add_pytest_test(view_example_7_launch test/test_view_robot_launch.py) ament_add_pytest_test(run_example_7_launch test/test_r6bot_controller_launch.py) - ament_add_pytest_test(test_send_trajectory_launch test/test_send_trajectory_launch.py) endif() ## EXPORTS diff --git a/example_7/reference_generator/send_trajectory.cpp b/example_7/reference_generator/send_trajectory.cpp index 36e2bc2f0..0574231a9 100644 --- a/example_7/reference_generator/send_trajectory.cpp +++ b/example_7/reference_generator/send_trajectory.cpp @@ -100,6 +100,7 @@ int main(int argc, char ** argv) std::fill(last_point_msg.velocities.begin(), last_point_msg.velocities.end(), 0.0); pub->publish(trajectory_msg); + RCLCPP_INFO(node->get_logger(), "Trajectory sent with %lu points", trajectory_msg.points.size()); while (rclcpp::ok()) { } diff --git a/example_7/test/test_r6bot_controller_launch.py b/example_7/test/test_r6bot_controller_launch.py index 9dcf5d189..8ea90b4e2 100644 --- a/example_7/test/test_r6bot_controller_launch.py +++ b/example_7/test/test_r6bot_controller_launch.py @@ -30,6 +30,9 @@ import os import pytest +import psutil +import subprocess +import time import unittest from ament_index_python.packages import get_package_share_directory @@ -39,12 +42,12 @@ from launch_testing.actions import ReadyToTest import launch_testing.markers -import rclpy from controller_manager.test_utils import ( check_controllers_running, check_if_js_published, check_node_running, ) +import rclpy # Executes the given launch file and checks if all nodes can be started @@ -94,6 +97,30 @@ def test_check_if_msgs_published(self): "/joint_states", ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"] ) + def test_check_if_trajectory_published(self, proc_output): + + topic = "/r6bot_controller/joint_trajectory" + + command = [ + "ros2", + "launch", + "ros2_control_demo_example_7", + "send_trajectory.launch.py", + ] + subprocess.Popen(command) # the process won't finish + time.sleep(1) + pubs = self.node.get_publishers_info_by_topic(topic) + assert len(pubs) > 0, f"No publisher for topic '{topic}' not found!" + + # message will be published only once, so WaitForTopics is not suitable here. + # let's just check if the controller received and processed the message + proc_output.assertWaitFor("Trajectory execution complete", timeout=10, stream="stderr") + + for proc in psutil.process_iter(): + if "send_trajectory" in proc.name(): + proc.kill() + print(f"Process {proc.name()} killed") + @launch_testing.post_shutdown_test() # These tests are run after the processes in generate_test_description() have shutdown. diff --git a/example_7/test/test_send_trajectory_launch.py b/example_7/test/test_send_trajectory_launch.py deleted file mode 100644 index 12de44623..000000000 --- a/example_7/test/test_send_trajectory_launch.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright (c) 2024 AIT - Austrian Institute of Technology GmbH -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# -# * Neither the name of the {copyright_holder} nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# Author: Christoph Froehlich - -import os -import pytest -import unittest - -from ament_index_python.packages import get_package_share_directory -from launch import LaunchDescription -from launch.actions import IncludeLaunchDescription -from launch.launch_description_sources import PythonLaunchDescriptionSource -from launch_testing.actions import ReadyToTest -from launch_testing_ros import WaitForTopics -from trajectory_msgs.msg import JointTrajectory - -import launch_testing.markers -import rclpy - - -# Executes the given launch file and checks if all nodes can be started -@pytest.mark.rostest -def generate_test_description(): - launch_include = IncludeLaunchDescription( - PythonLaunchDescriptionSource( - os.path.join( - get_package_share_directory("ros2_control_demo_example_7"), - "launch/send_trajectory.launch.py", - ) - ) - ) - - return LaunchDescription([launch_include, ReadyToTest()]) - - -# This is our test fixture. Each method is a test case. -# These run alongside the processes specified in generate_test_description() -class TestFixture(unittest.TestCase): - @classmethod - def setUpClass(cls): - rclpy.init() - - @classmethod - def tearDownClass(cls): - rclpy.shutdown() - - def setUp(self): - self.node = rclpy.create_node("test_node") - - def tearDown(self): - self.node.destroy_node() - - def test_check_if_msgs_published(self): - topic = "/r6bot_controller/joint_trajectory" - wait_for_topics = WaitForTopics([(topic, JointTrajectory)], timeout=20.0) - assert wait_for_topics.wait(), f"Topic '{topic}' not found!" - msgs = wait_for_topics.received_messages(topic) - msg = msgs[0] - assert len(msg.points) == 200, "Wrong number of joints in message" - wait_for_topics.shutdown() - - -@launch_testing.post_shutdown_test() -# These tests are run after the processes in generate_test_description() have shutdown. -class TestDescriptionCraneShutdown(unittest.TestCase): - - def test_exit_codes(self, proc_info): - """Check if the processes exited normally.""" - launch_testing.asserts.assertExitCodes(proc_info)