Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 12 additions & 219 deletions controls/sae_2025_ws/src/uav/test/test_auto_launch.py
Original file line number Diff line number Diff line change
@@ -1,233 +1,23 @@
from __future__ import annotations

import importlib
import importlib.util
from pathlib import Path
import sys
from types import SimpleNamespace
import types
from typing import Any, cast

import pytest

from uav.test_support.ros_stubs import install_auto_launch_import_stubs

def _import_module_if_available(name: str):
try:
return importlib.import_module(name)
except ModuleNotFoundError:
return None


if "std_msgs" not in sys.modules:
std_msgs = _import_module_if_available("std_msgs")
else:
std_msgs = sys.modules["std_msgs"]
if std_msgs is None:
std_msgs = types.ModuleType("std_msgs")
std_msgs_msg = types.ModuleType("std_msgs.msg")
std_msgs_msg.Empty = type("Empty", (), {})
std_msgs.msg = std_msgs_msg
sys.modules.update({"std_msgs": std_msgs, "std_msgs.msg": std_msgs_msg})

ament_index_python = sys.modules.get("ament_index_python")
if ament_index_python is None:
ament_index_python = _import_module_if_available("ament_index_python")
if ament_index_python is None:
ament_index_python = types.ModuleType("ament_index_python")
sys.modules["ament_index_python"] = ament_index_python

ament_index_packages = sys.modules.get("ament_index_python.packages")
if ament_index_packages is None:
ament_index_packages = _import_module_if_available("ament_index_python.packages")
if ament_index_packages is None:
ament_index_packages = types.ModuleType("ament_index_python.packages")
sys.modules["ament_index_python.packages"] = ament_index_packages
if not hasattr(ament_index_packages, "PackageNotFoundError"):

class PackageNotFoundError(Exception):
pass

ament_index_packages.PackageNotFoundError = PackageNotFoundError
if not hasattr(ament_index_packages, "get_package_share_directory"):
ament_index_packages.get_package_share_directory = lambda _name: str(
Path(__file__).resolve().parents[1]
)
sys.modules["ament_index_python"].packages = ament_index_packages

if "std_srvs" not in sys.modules:
std_srvs = _import_module_if_available("std_srvs")
else:
std_srvs = sys.modules["std_srvs"]
if std_srvs is None:
std_srvs = types.ModuleType("std_srvs")
std_srvs_srv = types.ModuleType("std_srvs.srv")

class Trigger:
Request = type("Request", (), {})
Response = type("Response", (), {})

std_srvs_srv.Trigger = Trigger
std_srvs.srv = std_srvs_srv
sys.modules.update({"std_srvs": std_srvs, "std_srvs.srv": std_srvs_srv})

launch_module = sys.modules.get("launch")
if launch_module is None:
launch_module = _import_module_if_available("launch")
if launch_module is None:
launch_module = types.ModuleType("launch")
sys.modules["launch"] = launch_module
if not hasattr(launch_module, "LaunchDescription"):
launch_module.LaunchDescription = type("LaunchDescription", (), {})

launch_actions = sys.modules.get("launch.actions")
if launch_actions is None:
launch_actions = _import_module_if_available("launch.actions")
if launch_actions is None:
launch_actions = types.ModuleType("launch.actions")
sys.modules["launch.actions"] = launch_actions
for name in (
"DeclareLaunchArgument",
"ExecuteProcess",
"IncludeLaunchDescription",
"OpaqueFunction",
):
if not hasattr(launch_actions, name):
setattr(launch_actions, name, type(name, (), {}))

launch_sources = sys.modules.get("launch.launch_description_sources")
if launch_sources is None:
launch_sources = _import_module_if_available("launch.launch_description_sources")
if launch_sources is None:
launch_sources = types.ModuleType("launch.launch_description_sources")
sys.modules["launch.launch_description_sources"] = launch_sources
if not hasattr(launch_sources, "PythonLaunchDescriptionSource"):
launch_sources.PythonLaunchDescriptionSource = type(
"PythonLaunchDescriptionSource", (), {}
)

launch_logging = sys.modules.get("launch.logging")
if launch_logging is None:
launch_logging = _import_module_if_available("launch.logging")
if launch_logging is None:
launch_logging = types.ModuleType("launch.logging")
sys.modules["launch.logging"] = launch_logging
if not hasattr(launch_logging, "get_logger"):
launch_logging.get_logger = lambda *_args, **_kwargs: SimpleNamespace(
warning=lambda *_a, **_k: None,
warn=lambda *_a, **_k: None,
info=lambda *_a, **_k: None,
)

launch_substitutions = sys.modules.get("launch.substitutions")
if launch_substitutions is None:
launch_substitutions = _import_module_if_available("launch.substitutions")
if launch_substitutions is None:
launch_substitutions = types.ModuleType("launch.substitutions")
sys.modules["launch.substitutions"] = launch_substitutions
if not hasattr(launch_substitutions, "LaunchConfiguration"):
launch_substitutions.LaunchConfiguration = type("LaunchConfiguration", (), {})

rclpy = sys.modules.get("rclpy")
if rclpy is None:
rclpy = _import_module_if_available("rclpy")
if rclpy is None:
rclpy = types.ModuleType("rclpy")
sys.modules["rclpy"] = rclpy
if not hasattr(rclpy, "init"):
rclpy.init = lambda *args, **kwargs: None
if not hasattr(rclpy, "shutdown"):
rclpy.shutdown = lambda: None
if not hasattr(rclpy, "ok"):
rclpy.ok = lambda: True

node_mod = sys.modules.get("rclpy.node")
if node_mod is None:
node_mod = _import_module_if_available("rclpy.node")
if node_mod is None:
node_mod = types.ModuleType("rclpy.node")
sys.modules["rclpy.node"] = node_mod
if not hasattr(node_mod, "Node"):

class Node:
def __init__(self, *_args, **_kwargs) -> None:
pass

node_mod.Node = Node

executors_mod = sys.modules.get("rclpy.executors")
if executors_mod is None:
executors_mod = _import_module_if_available("rclpy.executors")
if executors_mod is None:
executors_mod = types.ModuleType("rclpy.executors")
sys.modules["rclpy.executors"] = executors_mod
if not hasattr(executors_mod, "ExternalShutdownException"):

class ExternalShutdownException(Exception):
pass

executors_mod.ExternalShutdownException = ExternalShutdownException

clock_mod = sys.modules.get("rclpy.clock")
if clock_mod is None:
clock_mod = _import_module_if_available("rclpy.clock")
if clock_mod is None:
clock_mod = types.ModuleType("rclpy.clock")
sys.modules["rclpy.clock"] = clock_mod
if not hasattr(clock_mod, "Clock"):
clock_mod.Clock = type("Clock", (), {})

parameter_mod = sys.modules.get("rclpy.parameter")
if parameter_mod is None:
parameter_mod = _import_module_if_available("rclpy.parameter")
if parameter_mod is None:
parameter_mod = types.ModuleType("rclpy.parameter")
sys.modules["rclpy.parameter"] = parameter_mod
if not hasattr(parameter_mod, "Parameter"):
parameter_mod.Parameter = type("Parameter", (), {})

validate_namespace_mod = sys.modules.get("rclpy.validate_namespace")
if validate_namespace_mod is None:
validate_namespace_mod = _import_module_if_available("rclpy.validate_namespace")
if validate_namespace_mod is None:
validate_namespace_mod = types.ModuleType("rclpy.validate_namespace")
sys.modules["rclpy.validate_namespace"] = validate_namespace_mod
if not hasattr(validate_namespace_mod, "validate_namespace"):
validate_namespace_mod.validate_namespace = lambda namespace: None

validate_node_name_mod = sys.modules.get("rclpy.validate_node_name")
if validate_node_name_mod is None:
validate_node_name_mod = _import_module_if_available("rclpy.validate_node_name")
if validate_node_name_mod is None:
validate_node_name_mod = types.ModuleType("rclpy.validate_node_name")
sys.modules["rclpy.validate_node_name"] = validate_node_name_mod
if not hasattr(validate_node_name_mod, "validate_node_name"):
validate_node_name_mod.validate_node_name = lambda node_name: None

qos_mod = sys.modules.get("rclpy.qos")
if qos_mod is None:
qos_mod = _import_module_if_available("rclpy.qos")
if qos_mod is None:
qos_mod = types.ModuleType("rclpy.qos")
sys.modules["rclpy.qos"] = qos_mod
if not hasattr(qos_mod, "QoSProfile"):
qos_mod.QoSProfile = type("QoSProfile", (), {})
if not hasattr(qos_mod, "QoSReliabilityPolicy"):
qos_mod.QoSReliabilityPolicy = type("QoSReliabilityPolicy", (), {})
if not hasattr(qos_mod, "QoSHistoryPolicy"):
qos_mod.QoSHistoryPolicy = type("QoSHistoryPolicy", (), {})
if not hasattr(qos_mod, "QoSDurabilityPolicy"):
qos_mod.QoSDurabilityPolicy = type("QoSDurabilityPolicy", (), {})

rclpy.node = node_mod
rclpy.executors = executors_mod
rclpy.clock = clock_mod
rclpy.parameter = parameter_mod
rclpy.validate_namespace = validate_namespace_mod
rclpy.validate_node_name = validate_node_name_mod
rclpy.qos = qos_mod
install_auto_launch_import_stubs(Path(__file__).resolve().parents[1])

from uav.runtime.ModeManager import ModeManager # noqa: E402

try:
uav_mission_module: Any
uav_manager_module: Any
UAVModeManager: Any
import uav.runtime.uav_mission as uav_mission_module
import uav.runtime.UAVModeManager as uav_manager_module
from uav.runtime.UAVModeManager import UAVModeManager
Expand All @@ -239,6 +29,8 @@ class ExternalShutdownException(Exception):
UAVModeManager = None

try:
payload_mission_module: Any
payload_manager_module: Any
import uav.runtime.payload_mission as payload_mission_module
import uav.runtime.PayloadModeManager as payload_manager_module
except ModuleNotFoundError as exc:
Expand Down Expand Up @@ -271,8 +63,8 @@ def cancel(self) -> None:
self.cancelled = True


def _make_mode_manager(*, ready: bool) -> ModeManager:
manager = object.__new__(ModeManager)
def _make_mode_manager(*, ready: bool) -> Any:
manager = cast(Any, object.__new__(ModeManager))
manager.vehicle = None
manager.modes = {}
manager.transitions = {}
Expand Down Expand Up @@ -396,8 +188,9 @@ def _stub_mode_manager_init(
def _load_main_launch_module():
launch_path = Path(__file__).resolve().parents[1] / "launch" / "main.launch.py"
spec = importlib.util.spec_from_file_location("uav_main_launch", launch_path)
if spec is None or spec.loader is None:
raise ImportError(f"Unable to load launch module from {launch_path}")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module

Expand Down
3 changes: 2 additions & 1 deletion controls/sae_2025_ws/src/uav/test/test_flake8.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

from ament_flake8.main import main_with_errors
import pytest
from typing import cast


@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
rc, errors = cast(tuple[int, list[str]], main_with_errors(argv=[]))
assert rc == 0, "Found %d code style errors / warnings:\n" % len(
errors
) + "\n".join(errors)
40 changes: 27 additions & 13 deletions controls/sae_2025_ws/src/uav/test/test_fleet_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ def _ensure_launch_import_stubs() -> None:
ament_index_packages = types.ModuleType("ament_index_python.packages")
sys.modules["ament_index_python.packages"] = ament_index_packages
if not hasattr(ament_index_packages, "get_package_share_directory"):
ament_index_packages.get_package_share_directory = lambda _name: str(
Path(__file__).resolve().parents[1]
setattr(
ament_index_packages,
"get_package_share_directory",
lambda _name: str(Path(__file__).resolve().parents[1]),
)
ament_index_python.packages = ament_index_packages
setattr(ament_index_python, "packages", ament_index_packages)

launch_module = sys.modules.get("launch")
if launch_module is None:
Expand All @@ -58,7 +60,7 @@ def _ensure_launch_import_stubs() -> None:
launch_module = types.ModuleType("launch")
sys.modules["launch"] = launch_module
if not hasattr(launch_module, "LaunchDescription"):
launch_module.LaunchDescription = type("LaunchDescription", (), {})
setattr(launch_module, "LaunchDescription", type("LaunchDescription", (), {}))

launch_actions = sys.modules.get("launch.actions")
if launch_actions is None:
Expand All @@ -84,8 +86,10 @@ def _ensure_launch_import_stubs() -> None:
launch_sources = types.ModuleType("launch.launch_description_sources")
sys.modules["launch.launch_description_sources"] = launch_sources
if not hasattr(launch_sources, "PythonLaunchDescriptionSource"):
launch_sources.PythonLaunchDescriptionSource = type(
"PythonLaunchDescriptionSource", (), {}
setattr(
launch_sources,
"PythonLaunchDescriptionSource",
type("PythonLaunchDescriptionSource", (), {}),
)

launch_logging = sys.modules.get("launch.logging")
Expand All @@ -95,10 +99,14 @@ def _ensure_launch_import_stubs() -> None:
launch_logging = types.ModuleType("launch.logging")
sys.modules["launch.logging"] = launch_logging
if not hasattr(launch_logging, "get_logger"):
launch_logging.get_logger = lambda *_args, **_kwargs: SimpleNamespace(
warning=lambda *_a, **_k: None,
warn=lambda *_a, **_k: None,
info=lambda *_a, **_k: None,
setattr(
launch_logging,
"get_logger",
lambda *_args, **_kwargs: SimpleNamespace(
warning=lambda *_a, **_k: None,
warn=lambda *_a, **_k: None,
info=lambda *_a, **_k: None,
),
)

launch_substitutions = sys.modules.get("launch.substitutions")
Expand All @@ -108,7 +116,11 @@ def _ensure_launch_import_stubs() -> None:
launch_substitutions = types.ModuleType("launch.substitutions")
sys.modules["launch.substitutions"] = launch_substitutions
if not hasattr(launch_substitutions, "LaunchConfiguration"):
launch_substitutions.LaunchConfiguration = type("LaunchConfiguration", (), {})
setattr(
launch_substitutions,
"LaunchConfiguration",
type("LaunchConfiguration", (), {}),
)


def _load_fleet_module():
Expand All @@ -123,8 +135,9 @@ def _load_fleet_module():
spec = importlib.util.spec_from_file_location(
"uav_fleet_launch_helpers", launch_path
)
if spec is None or spec.loader is None:
raise ImportError(f"Unable to load launch module from {launch_path}")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module

Expand Down Expand Up @@ -255,8 +268,9 @@ def test_real_backend_does_not_import_sim(monkeypatch):
spec = importlib.util.spec_from_file_location(
"uav_fleet_launch_hardware_only", launch_path
)
if spec is None or spec.loader is None:
raise ImportError(f"Unable to load launch module from {launch_path}")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None

real_import = builtins.__import__

Expand Down
Loading
Loading