Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions dimos/control/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@

from __future__ import annotations

from dimos.control.components import HardwareComponent, HardwareType, make_joints
from dimos.control.components import (
HardwareComponent,
HardwareType,
make_joints,
make_twist_base_joints,
)
from dimos.control.coordinator import TaskConfig, control_coordinator
from dimos.core.transport import LCMTransport
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.msgs.geometry_msgs import PoseStamped, Twist
from dimos.msgs.sensor_msgs import JointState
from dimos.teleop.quest.quest_types import Buttons
from dimos.utils.data import LfsPath
Expand Down Expand Up @@ -594,6 +599,80 @@
)


# =============================================================================
# Twist Base Blueprints (velocity-commanded platforms)
# =============================================================================

# Mock holonomic twist base (3-DOF: vx, vy, wz)
_base_joints = make_twist_base_joints("base")
coordinator_mock_twist_base = control_coordinator(
hardware=[
HardwareComponent(
hardware_id="base",
hardware_type=HardwareType.BASE,
joints=_base_joints,
adapter_type="mock_twist_base",
),
],
tasks=[
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
).transports(
{
("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState),
("twist_command", Twist): LCMTransport("/cmd_vel", Twist),
}
)


# =============================================================================
# Mobile Manipulation Blueprints (arm + twist base)
# =============================================================================

# Mock arm (7-DOF) + mock holonomic base (3-DOF)
_mm_base_joints = make_twist_base_joints("base")
coordinator_mobile_manip_mock = control_coordinator(
hardware=[
HardwareComponent(
hardware_id="arm",
hardware_type=HardwareType.MANIPULATOR,
joints=make_joints("arm", 7),
adapter_type="mock",
),
HardwareComponent(
hardware_id="base",
hardware_type=HardwareType.BASE,
joints=_mm_base_joints,
adapter_type="mock_twist_base",
),
],
tasks=[
TaskConfig(
name="traj_arm",
type="trajectory",
joint_names=[f"arm_joint{i + 1}" for i in range(7)],
priority=10,
),
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_mm_base_joints,
priority=10,
),
],
).transports(
{
("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState),
("twist_command", Twist): LCMTransport("/cmd_vel", Twist),
}
)


# =============================================================================
# Raw Blueprints (for programmatic setup)
# =============================================================================
Expand Down Expand Up @@ -624,8 +703,12 @@
# Dual arm
"coordinator_dual_mock",
"coordinator_dual_xarm",
# Mobile manipulation
"coordinator_mobile_manip_mock",
# Single arm
"coordinator_mock",
# Twist base
"coordinator_mock_twist_base",
"coordinator_piper",
"coordinator_piper_xarm",
# Teleop IK
Expand Down
35 changes: 35 additions & 0 deletions dimos/control/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,47 @@ def make_joints(hardware_id: HardwareId, dof: int) -> list[JointName]:
return [f"{hardware_id}_joint{i + 1}" for i in range(dof)]


# Maps virtual joint suffix → (Twist group, Twist field)
TWIST_SUFFIX_MAP: dict[str, tuple[str, str]] = {
"vx": ("linear", "x"),
"vy": ("linear", "y"),
"vz": ("linear", "z"),
"wx": ("angular", "x"),
"wy": ("angular", "y"),
"wz": ("angular", "z"),
}

_DEFAULT_TWIST_SUFFIXES = ["vx", "vy", "wz"]


def make_twist_base_joints(
hardware_id: HardwareId,
suffixes: list[str] | None = None,
) -> list[JointName]:
"""Create virtual joint names for a twist base.

Args:
hardware_id: The hardware identifier (e.g., "base")
suffixes: Velocity DOF suffixes. Defaults to ["vx", "vy", "wz"] (holonomic).

Returns:
List of joint names like ["base_vx", "base_vy", "base_wz"]
"""
suffixes = suffixes or _DEFAULT_TWIST_SUFFIXES
for s in suffixes:
if s not in TWIST_SUFFIX_MAP:
raise ValueError(f"Unknown twist suffix '{s}'. Valid: {list(TWIST_SUFFIX_MAP)}")
return [f"{hardware_id}_{s}" for s in suffixes]


__all__ = [
"TWIST_SUFFIX_MAP",
"HardwareComponent",
"HardwareId",
"HardwareType",
"JointName",
"JointState",
"TaskName",
"make_joints",
"make_twist_base_joints",
]
114 changes: 104 additions & 10 deletions dimos/control/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,32 @@
import time
from typing import TYPE_CHECKING, Any

from dimos.control.components import HardwareComponent, HardwareId, JointName, TaskName
from dimos.control.hardware_interface import ConnectedHardware
from dimos.control.components import (
TWIST_SUFFIX_MAP,
HardwareComponent,
HardwareId,
HardwareType,
JointName,
TaskName,
)
from dimos.control.hardware_interface import ConnectedHardware, ConnectedTwistBase
from dimos.control.task import ControlTask
from dimos.control.tick_loop import TickLoop
from dimos.core import In, Module, Out, rpc
from dimos.core.module import ModuleConfig
from dimos.hardware.drive_trains.spec import (
TwistBaseAdapter,
)
from dimos.msgs.geometry_msgs import (
PoseStamped, # noqa: TC001 - needed at runtime for In[PoseStamped]
Twist, # noqa: TC001 - needed at runtime for In[Twist]
)
from dimos.msgs.sensor_msgs import (
JointState, # noqa: TC001 - needed at runtime for Out[JointState]
JointState,
)
from dimos.teleop.quest.quest_types import (
Buttons, # noqa: TC001 - needed at runtime for In[Buttons]
)
from dimos.teleop.quest.quest_types import Buttons # noqa: TC001 - needed for teleop buttons
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
Expand Down Expand Up @@ -148,6 +161,9 @@ class ControlCoordinator(Module[ControlCoordinatorConfig]):
# Uses frame_id as task name for routing
cartesian_command: In[PoseStamped]

# Input: Streaming twist commands for velocity-commanded platforms
twist_command: In[Twist]

# Input: Teleop buttons for engage/disengage signaling
buttons: In[Buttons]

Expand All @@ -174,6 +190,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
# Subscription handles for streaming commands
self._joint_command_unsub: Callable[[], None] | None = None
self._cartesian_command_unsub: Callable[[], None] | None = None
self._twist_command_unsub: Callable[[], None] | None = None
self._buttons_unsub: Callable[[], None] | None = None

logger.info(f"ControlCoordinator initialized at {self.config.tick_rate}Hz")
Expand Down Expand Up @@ -206,7 +223,11 @@ def _setup_from_config(self) -> None:

def _setup_hardware(self, component: HardwareComponent) -> None:
"""Connect and add a single hardware adapter."""
adapter = self._create_adapter(component)
adapter: ManipulatorAdapter | TwistBaseAdapter
if component.hardware_type == HardwareType.BASE:
adapter = self._create_twist_base_adapter(component)
else:
adapter = self._create_adapter(component)

if not adapter.connect():
raise RuntimeError(f"Failed to connect to {component.adapter_type} adapter")
Expand All @@ -230,6 +251,16 @@ def _create_adapter(self, component: HardwareComponent) -> ManipulatorAdapter:
address=component.address,
)

def _create_twist_base_adapter(self, component: HardwareComponent) -> TwistBaseAdapter:
"""Create a twist base adapter from component config."""
from dimos.hardware.drive_trains.registry import twist_base_adapter_registry

return twist_base_adapter_registry.create(
component.adapter_type,
dof=len(component.joints),
address=component.address,
)

def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
"""Create a control task from config."""
task_type = cfg.type.lower()
Expand Down Expand Up @@ -310,19 +341,34 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
@rpc
def add_hardware(
self,
adapter: ManipulatorAdapter,
adapter: ManipulatorAdapter | TwistBaseAdapter,
component: HardwareComponent,
) -> bool:
"""Register a hardware adapter with the coordinator."""
is_base = component.hardware_type == HardwareType.BASE
if is_base != isinstance(adapter, TwistBaseAdapter):
raise TypeError(
f"Hardware type / adapter mismatch for '{component.hardware_id}': "
f"hardware_type={component.hardware_type.value} but got "
f"{type(adapter).__name__}"
)

with self._hardware_lock:
if component.hardware_id in self._hardware:
logger.warning(f"Hardware {component.hardware_id} already registered")
return False

connected = ConnectedHardware(
adapter=adapter,
component=component,
)
if isinstance(adapter, TwistBaseAdapter):
connected: ConnectedHardware = ConnectedTwistBase(
adapter=adapter,
component=component,
)
else:
connected = ConnectedHardware(
adapter=adapter,
component=component,
)

self._hardware[component.hardware_id] = connected

for joint_name in connected.joint_names:
Expand Down Expand Up @@ -490,6 +536,34 @@ def _on_cartesian_command(self, msg: PoseStamped) -> None:

task.on_cartesian_command(msg, t_now)

def _on_twist_command(self, msg: Twist) -> None:
"""Convert Twist → virtual joint velocities and route via _on_joint_command.

Maps Twist fields to virtual joints using suffix convention:
base_vx ← linear.x, base_vy ← linear.y, base_wz ← angular.z, etc.
"""
names: list[str] = []
velocities: list[float] = []

with self._hardware_lock:
for hw in self._hardware.values():
if hw.component.hardware_type != HardwareType.BASE:
continue
for joint_name in hw.joint_names:
# Extract suffix (e.g., "base_vx" → "vx")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Youre assuming here that joint names MUST have vy and vy and vz etc. Do you know this for sure?

suffix = joint_name.rsplit("_", 1)[-1]
mapping = TWIST_SUFFIX_MAP.get(suffix)
if mapping is None:
continue
group, axis = mapping
value = getattr(getattr(msg, group), axis)
names.append(joint_name)
velocities.append(value)

if names:
joint_state = JointState(name=names, velocity=velocities)
self._on_joint_command(joint_state)

def _on_buttons(self, msg: Buttons) -> None:
"""Forward button state to all tasks."""
with self._task_lock:
Expand Down Expand Up @@ -536,6 +610,9 @@ def set_gripper_position(self, hardware_id: str, position: float) -> bool:
if hw is None:
logger.warning(f"Hardware '{hardware_id}' not found for gripper command")
return False
if isinstance(hw, ConnectedTwistBase):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally want to automatically check things like this with protocols. So as we scale up to integrate with many robots that don't have an end effector we can do checks like this more systematically

logger.warning(f"Hardware '{hardware_id}' is a twist base, no gripper support")
return False
return hw.adapter.write_gripper_position(position)

@rpc
Expand All @@ -549,6 +626,8 @@ def get_gripper_position(self, hardware_id: str) -> float | None:
hw = self._hardware.get(hardware_id)
if hw is None:
return None
if isinstance(hw, ConnectedTwistBase):
return None
return hw.adapter.read_gripper_position()

# =========================================================================
Expand Down Expand Up @@ -610,6 +689,18 @@ def start(self) -> None:
"Use task_invoke RPC or set transport via blueprint."
)

# Subscribe to twist commands if any twist base hardware configured
has_twist_base = any(c.hardware_type == HardwareType.BASE for c in self.config.hardware)
if has_twist_base:
try:
self._twist_command_unsub = self.twist_command.subscribe(self._on_twist_command)
logger.info("Subscribed to twist_command for twist base control")
except Exception:
logger.warning(
"Twist base configured but could not subscribe to twist_command. "
"Use task_invoke RPC or set transport via blueprint."
)

# Subscribe to buttons if any teleop_ik tasks configured (engage/disengage)
has_teleop_ik = any(t.type == "teleop_ik" for t in self.config.tasks)
if has_teleop_ik:
Expand All @@ -630,6 +721,9 @@ def stop(self) -> None:
if self._cartesian_command_unsub:
self._cartesian_command_unsub()
self._cartesian_command_unsub = None
if self._twist_command_unsub:
self._twist_command_unsub()
self._twist_command_unsub = None
if self._buttons_unsub:
self._buttons_unsub()
self._buttons_unsub = None
Expand Down
Loading
Loading