diff --git a/example_7/reference_generator/send_trajectory.cpp b/example_7/reference_generator/send_trajectory.cpp index 36e2bc2f0..0574231a9 100644 --- a/example_7/reference_generator/send_trajectory.cpp +++ b/example_7/reference_generator/send_trajectory.cpp @@ -100,6 +100,7 @@ int main(int argc, char ** argv) std::fill(last_point_msg.velocities.begin(), last_point_msg.velocities.end(), 0.0); pub->publish(trajectory_msg); + RCLCPP_INFO(node->get_logger(), "Trajectory sent with %lu points", trajectory_msg.points.size()); while (rclcpp::ok()) { } diff --git a/example_7/test/test_r6bot_controller_launch.py b/example_7/test/test_r6bot_controller_launch.py index 7dbd199d5..390ff93e7 100644 --- a/example_7/test/test_r6bot_controller_launch.py +++ b/example_7/test/test_r6bot_controller_launch.py @@ -30,6 +30,9 @@ import os import pytest +import psutil +import subprocess +import time import unittest from ament_index_python.packages import get_package_share_directory @@ -39,12 +42,12 @@ from launch_testing.actions import ReadyToTest import launch_testing.markers -import rclpy from controller_manager.test_utils import ( check_controllers_running, check_if_js_published, check_node_running, ) +import rclpy # Executes the given launch file and checks if all nodes can be started @@ -94,6 +97,30 @@ def test_check_if_msgs_published(self): "/joint_states", ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"] ) + def test_check_if_trajectory_published(self, proc_output): + + topic = "/r6bot_controller/joint_trajectory" + + command = [ + "ros2", + "launch", + "ros2_control_demo_example_7", + "send_trajectory.launch.py", + ] + subprocess.Popen(command) # the process won't finish + time.sleep(1) + pubs = self.node.get_publishers_info_by_topic(topic) + assert len(pubs) > 0, f"No publisher for topic '{topic}' not found!" + + # message will be published only once, so WaitForTopics is not suitable here. + # let's just check if the controller received and processed the message + proc_output.assertWaitFor("Trajectory execution complete", timeout=10, stream="stderr") + + for proc in psutil.process_iter(): + if "send_trajectory" in proc.name(): + proc.kill() + print(f"Process {proc.name()} killed") + @launch_testing.post_shutdown_test() # These tests are run after the processes in generate_test_description() have shutdown.