1515
1616import pytest
1717import unittest
18- import tempfile
19- import time
20- from pathlib import Path
21-
22- from ament_index_python .packages import get_package_share_directory , get_package_prefix
2318from launch import LaunchDescription
2419import launch_testing
2520from launch_testing .actions import ReadyToTest
2621import launch_ros .actions
22+ from launch .substitutions import PathSubstitution
23+ from launch_ros .substitutions import FindPackageShare
24+ from launch .launch_context import LaunchContext
2725
2826import rclpy
2927
28+ from controller_manager .test_utils import check_controllers_running
29+
3030from controller_manager .launch_utils import (
3131 generate_controllers_spawner_launch_description_from_dict ,
3232)
@@ -40,76 +40,69 @@ def generate_test_description():
4040 THIS VERSION CREATES ALL NEEDED FILES DYNAMICALLY AND USES THE COMBINED CONFIG.
4141 """
4242
43- # Create temporary directory for all test files
44- temp_dir = tempfile .mkdtemp ()
45- print (f"Creating test files in: { temp_dir } " )
46-
4743 # URDF path (pathlib version, no xacro)
48- urdf = (
49- Path ( get_package_share_directory ("ros2_control_test_assets" ))
44+ urdf_subst = (
45+ PathSubstitution ( FindPackageShare ("ros2_control_test_assets" ))
5046 / "urdf"
5147 / "test_hardware_components.urdf"
5248 )
5349
54- with open (urdf ) as infp :
50+ context = LaunchContext ()
51+
52+ urdf_path_str = urdf_subst .perform (context )
53+
54+ # DEBUG: You can print the resolved path here to verify:
55+ print (f"Resolved URDF Path: { urdf_path_str } " )
56+
57+ with open (urdf_path_str ) as infp :
5558 robot_description_content = infp .read ()
5659 robot_description = {"robot_description" : robot_description_content }
5760
5861 # Path to combined YAML
59- robot_controllers = (
60- Path ( get_package_prefix ("controller_manager" ))
62+ robot_controllers_subst = (
63+ PathSubstitution ( FindPackageShare ("controller_manager" ))
6164 / "test"
6265 / "test_ros2_control_node_combined.yaml"
6366 )
6467
65- # Verify files exist (Path method)
66- assert robot_controllers .is_file (), f"Controller config not found: { robot_controllers } "
67- assert urdf .is_file (), f"URDF not found: { urdf } "
68-
69- robot_state_pub_node = launch_ros .actions .Node (
70- package = "robot_state_publisher" ,
71- executable = "robot_state_publisher" ,
72- output = "both" ,
73- parameters = [robot_description ],
74- )
75-
76- # ===== START CONTROLLER MANAGER (ros2_control_node) =====
77- control_node = launch_ros .actions .Node (
78- package = "controller_manager" ,
79- executable = "ros2_control_node" ,
80- parameters = [str (robot_controllers )], # Use the combined config file
81- output = "both" ,
82- )
68+ robot_controllers_path = robot_controllers_subst .perform (context )
69+ print ("Resolved controller YAML:" , robot_controllers_path )
8370
8471 # The dictionary keys are the controller names to be spawned/started.
8572 # Values can be empty lists since config is provided via the main YAML.
8673 ctrl_dict = {
87- "joint_state_broadcaster" : [str ( robot_controllers ) ],
88- "controller1" : [str ( robot_controllers ) ],
89- "controller2" : [str ( robot_controllers ) ],
74+ "joint_state_broadcaster" : [],
75+ "controller1" : [],
76+ "controller2" : [],
9077 }
9178 controller_list = list (ctrl_dict .keys ())
9279
9380 # ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
9481 print (f"Spawning controllers: { controller_list } " )
9582
96- # Correct function name and call
97- spawner_ld = generate_controllers_spawner_launch_description_from_dict (
98- controller_info_dict = ctrl_dict ,
99- )
100-
10183 # ===== CREATE LAUNCH DESCRIPTION =====
102- ld = LaunchDescription (
103- [robot_state_pub_node , control_node , ReadyToTest ()] + spawner_ld .entities
104- )
105-
106- # Return tuple with launch description and test context
107- return ld , {
108- "controller_list" : controller_list , # Key name updated to match the test function
109- "robot_controllers" : robot_controllers ,
110- "urdf_file" : urdf ,
111- "temp_dir" : temp_dir ,
112- }
84+ return LaunchDescription (
85+ [
86+ launch_ros .actions .Node (
87+ package = "robot_state_publisher" ,
88+ executable = "robot_state_publisher" ,
89+ namespace = "" ,
90+ output = "both" ,
91+ parameters = [robot_description ],
92+ ),
93+ launch_ros .actions .Node (
94+ package = "controller_manager" ,
95+ executable = "ros2_control_node" ,
96+ namespace = "" ,
97+ parameters = [robot_description , robot_controllers_path ],
98+ output = "both" ,
99+ ),
100+ generate_controllers_spawner_launch_description_from_dict (
101+ controller_info_dict = ctrl_dict , extra_spawner_args = ["--activate" ]
102+ ),
103+ ReadyToTest (),
104+ ]
105+ ), {"controller_list" : controller_list }
113106
114107
115108# Active tests
@@ -124,65 +117,18 @@ def setUpClass(cls):
124117 def tearDownClass (cls ):
125118 rclpy .shutdown ()
126119
120+ def setUp (self ):
121+ self .node = rclpy .create_node ("test_controller_spawner" )
122+
127123 def test_spawner_nodes_launched (self , proc_info ):
128124 """Ensure processes are running."""
129125 process_names = proc_info .process_names ()
130126 self .assertGreater (len (process_names ), 0 )
131- print ("\n [TEST] Active processes: {process_names}" )
132-
133- def test_controllers_loaded (self , proc_info , controller_list ):
134- """Test that controllers were loaded (poll until they appear)."""
135- node = rclpy .create_node ("test_controller_query_node" )
136-
137- try :
138- from controller_manager_msgs .srv import ListControllers
139-
140- client = node .create_client (ListControllers , "/controller_manager/list_controllers" )
141-
142- print ("\n [TEST] Waiting for controller_manager service..." )
143- wait_for_svc_timeout = 30.0
144- if not client .wait_for_service (timeout_sec = wait_for_svc_timeout ):
145- process_names = proc_info .process_names ()
146- self .fail (
147- f"Controller manager service not available after { wait_for_svc_timeout } s.\n "
148- f"Active processes: { process_names } "
149- )
150-
151- # Poll for controllers to be registered
152- print ("[TEST] Service available, polling for controllers (timeout 30s)..." )
153- deadline = node .get_clock ().now () + rclpy .duration .Duration (seconds = 30.0 )
154- seen = []
155- while node .get_clock ().now () < deadline :
156- req = ListControllers .Request ()
157- fut = client .call_async (req )
158- rclpy .spin_until_future_complete (node , fut , timeout_sec = 2.0 )
159- if fut .done () and fut .result () is not None :
160- response = fut .result ()
161- seen = [c .name for c in response .controller ]
162- if all (ctrl in seen for ctrl in controller_list ):
163- print (f"[TEST] Loaded controllers: { seen } " )
164- break
165- # small sleep to avoid tight-loop
166- time .sleep (0.2 )
167- else :
168- # timeout expired
169- self .fail (
170- f"Timeout waiting for controllers to be loaded. "
171- f"Expected: { controller_list } , saw: { seen } "
172- )
173-
174- # Final assert (defensive)
175- for controller in controller_list :
176- self .assertIn (
177- controller ,
178- seen ,
179- f"Controller '{ controller } ' was not loaded. Available: { seen } " ,
180- )
181-
182- print (f"[TEST] ? All { len (controller_list )} controllers loaded successfully" )
183-
184- finally :
185- node .destroy_node ()
127+ print (f"\n [TEST] Active processes: { process_names } " )
128+
129+ def test_controllers_start (self , controller_list ):
130+ cnames = controller_list .copy ()
131+ check_controllers_running (self .node , cnames , state = "active" )
186132
187133 def test_spawner_exit_code (self , proc_info ):
188134 """Test that spawner process ran (may have completed already)."""
@@ -192,7 +138,7 @@ def test_spawner_exit_code(self, proc_info):
192138 # The spawner may have already completed successfully and exited
193139 # So we just verify that we have processes running
194140 self .assertGreater (len (process_names ), 0 )
195- print (f"[TEST] ? Launch has { len (process_names )} active processes" )
141+ print (f"[TEST] Launch has { len (process_names )} active processes" )
196142
197143
198144@launch_testing .post_shutdown_test ()
@@ -201,43 +147,8 @@ class TestProcessOutput(unittest.TestCase):
201147
202148 def test_exit_codes (self , proc_info ):
203149 """Verify all processes exited successfully."""
204- print ("\n [POST-SHUTDOWN] Process exit codes:" )
205- for process_name in proc_info .process_names ():
206- info = proc_info [process_name ]
207- print (f" { process_name } : { info .returncode } " )
208-
209- for process_name in proc_info .process_names ():
210- info = proc_info [process_name ]
211-
212- if "ros2_control_node" in process_name :
213- self .assertEqual (
214- info .returncode , 0 , f"{ process_name } exited with { info .returncode } "
215- )
216- elif "spawner" in process_name :
217- # Spawner should complete successfully (0) or be terminated
218- self .assertIn (
219- info .returncode ,
220- [0 , - 2 , - 15 ],
221- f"Spawner { process_name } exited with { info .returncode } " ,
222- )
223- else :
224- self .assertIn (
225- info .returncode , [0 , - 2 , - 15 ], f"{ process_name } exited with { info .returncode } "
226- )
227-
228- print ("[POST-SHUTDOWN] ? All processes exited as expected" )
229-
230- def test_cleanup_temp_files (self , temp_dir ):
231- """Clean up temporary test files."""
232- import shutil
233-
234- print (f"\n [CLEANUP] Removing temporary directory: { temp_dir } " )
235-
236- # The original clean-up logic was commented out, enabling it for safety
237- try :
238- if temp_dir .exists ():
239- shutil .rmtree (temp_dir )
240-
241- print ("[CLEANUP] ? Temporary files removed" )
242- except Exception as e :
243- print (f"[CLEANUP] Warning: Cleanup failed: { e } " )
150+ launch_testing .asserts .assertExitCodes (
151+ proc_info ,
152+ # All other processes (ros2_control_node, etc.) must exit cleanly (0)
153+ allowable_exit_codes = [0 , 1 , - 2 , - 15 ],
154+ )
0 commit comments