Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ yolo11n.pt
.envrc
.claude
**/CLAUDE.md
.omo/
.direnv/

/logs
Expand Down
20 changes: 18 additions & 2 deletions dimos/control/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,13 @@ def _setup_hardware(self, component: HardwareComponent) -> None:
raise RuntimeError(f"Failed to connect to {component.adapter_type} adapter")

try:
if component.auto_enable and hasattr(adapter, "write_enable"):
adapter.write_enable(True)
if component.auto_enable:
activate = getattr(adapter, "activate", None)
if callable(activate):
if activate() is False:
raise RuntimeError(f"Failed to activate hardware {component.hardware_id}")
elif hasattr(adapter, "write_enable"):
adapter.write_enable(True)

self.add_hardware(adapter, component)
except Exception:
Expand Down Expand Up @@ -706,6 +711,17 @@ def stop(self) -> None:
if self._tick_loop:
self._tick_loop.stop()

with self._hardware_lock:
for hw_id, interface in self._hardware.items():
deactivate = getattr(interface.adapter, "deactivate", None)
if not callable(deactivate):
continue
try:
if deactivate() is False:
logger.error(f"Hardware {hw_id} deactivate returned False")
except Exception as e:
logger.error(f"Error deactivating hardware {hw_id}: {e}")

# Disconnect all hardware adapters
with self._hardware_lock:
for hw_id, interface in self._hardware.items():
Expand Down
66 changes: 66 additions & 0 deletions dimos/control/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import pytest

from dimos.control.components import HardwareComponent, HardwareType, make_joints
from dimos.control.coordinator import ControlCoordinator
from dimos.control.hardware_interface import ConnectedHardware
from dimos.control.task import (
ControlMode,
Expand Down Expand Up @@ -188,6 +189,71 @@ def test_write_command(self, connected_hardware, mock_adapter):
mock_adapter.write_joint_positions.assert_called()


class TestControlCoordinatorLifecycle:
def test_start_stop_calls_adapter_activate_and_deactivate(self):
from dimos.hardware.manipulators.mock.adapter import MockAdapter
from dimos.hardware.manipulators.registry import adapter_registry

class LifecycleAdapter(MockAdapter):
events: list[str] = []
Comment on lines +197 to +198

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 events class variable is shared across all instances

events: list[str] = [] is a class-level mutable attribute. Every instance of LifecycleAdapter appends to the same list. In this particular test only one instance is created, so it passes today, but if a second adapter instance were ever created (e.g., in a retry path inside the coordinator), the assertion on line 230 would see duplicate entries and fail unexpectedly. Prefer initialising it in __init__ to make the list per-instance.


def connect(self) -> bool:
self.events.append("connect")
return super().connect()

def activate(self) -> bool:
self.events.append("activate")
return self.write_enable(True)

def deactivate(self) -> bool:
self.events.append("deactivate")
return self.write_stop()

def disconnect(self) -> None:
self.events.append("disconnect")
super().disconnect()

adapter_registry.register("lifecycle_test", LifecycleAdapter)
component = HardwareComponent(
hardware_id="arm",
hardware_type=HardwareType.MANIPULATOR,
joints=make_joints("arm", 6),
adapter_type="lifecycle_test",
)
coordinator = ControlCoordinator(publish_joint_state=False, hardware=[component])

try:
coordinator.start()
finally:
coordinator.stop()

assert LifecycleAdapter.events == ["connect", "activate", "deactivate", "disconnect"]

def test_start_stop_with_adapter_without_lifecycle_methods(self):
"""Adapters without activate/deactivate (e.g. twist bases) start and stop cleanly."""
from dimos.control.components import make_twist_base_joints

component = HardwareComponent(
hardware_id="base",
hardware_type=HardwareType.BASE,
joints=make_twist_base_joints("base"),
adapter_type="mock_twist_base",
)
coordinator = ControlCoordinator(publish_joint_state=False, hardware=[component])

try:
coordinator.start()
adapter = coordinator._hardware["base"].adapter
assert not hasattr(adapter, "activate")
assert not hasattr(adapter, "deactivate")
# auto_enable falls back to write_enable(True) for adapters without activate()
assert adapter.read_enabled()
finally:
coordinator.stop()

assert not adapter.is_connected()


class TestJointTrajectoryTask:
def test_initial_state(self, trajectory_task):
assert trajectory_task.name == "test_traj"
Expand Down
1 change: 1 addition & 0 deletions dimos/hardware/manipulators/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ All adapters must implement these core methods:
| Category | Methods |
|----------|---------|
| Connection | `connect()`, `disconnect()`, `is_connected()` |
| Lifecycle | `activate()`, `deactivate()` |
| Info | `get_info()`, `get_dof()`, `get_limits()` |
| State | `read_joint_positions()`, `read_joint_velocities()`, `read_joint_efforts()` |
| Motion | `write_joint_positions()`, `write_joint_velocities()`, `write_stop()` |
Expand Down
8 changes: 8 additions & 0 deletions dimos/hardware/manipulators/a750/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ def is_connected(self) -> bool:
self._connected = bool(self._robot.is_connected())
return self._connected

def activate(self) -> bool:
return self.write_enable(True)

def deactivate(self) -> bool:
stopped = self.write_stop()
disabled = self.write_enable(False)
return stopped and disabled

def get_info(self) -> ManipulatorInfo:
"""Get A-750 information."""
self._trace("get_info")
Expand Down
9 changes: 9 additions & 0 deletions dimos/hardware/manipulators/mock/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ def is_connected(self) -> bool:
"""Check mock connection status."""
return self._connected

def activate(self) -> bool:
"""Simulate activation (enable servos)."""
return self.write_enable(True)

def deactivate(self) -> bool:
"""Simulate deactivation (stop motion, disable servos)."""
self.write_stop()
return self.write_enable(False)

def get_info(self) -> ManipulatorInfo:
"""Return mock info."""
return ManipulatorInfo(
Expand Down
8 changes: 8 additions & 0 deletions dimos/hardware/manipulators/openarm/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ def disconnect(self) -> None:
def is_connected(self) -> bool:
return self._bus is not None

def activate(self) -> bool:
return self.write_enable(True)

def deactivate(self) -> bool:
stopped = self.write_stop()
disabled = self.write_enable(False)
return stopped and disabled

def get_info(self) -> ManipulatorInfo:
return ManipulatorInfo(
vendor="Enactic",
Expand Down
8 changes: 8 additions & 0 deletions dimos/hardware/manipulators/piper/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def is_connected(self) -> bool:
except Exception:
return False

def activate(self) -> bool:
return self.write_enable(True)

def deactivate(self) -> bool:
stopped = self.write_stop()
disabled = self.write_enable(False)
return stopped and disabled

def get_info(self) -> ManipulatorInfo:
"""Get Piper information."""
firmware_version = None
Expand Down
7 changes: 7 additions & 0 deletions dimos/hardware/manipulators/sim/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ def disconnect(self) -> None:
def is_connected(self) -> bool:
return self._connected and self._shm is not None

def activate(self) -> bool:
return self.write_enable(True)

def deactivate(self) -> bool:
self.write_stop()
return self.write_enable(False)

def get_info(self) -> ManipulatorInfo:
return ManipulatorInfo(
vendor="Simulation",
Expand Down
8 changes: 8 additions & 0 deletions dimos/hardware/manipulators/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ def is_connected(self) -> bool:
"""Check if connected."""
...

def activate(self) -> bool:
"""Prepare hardware for commanded motion after connect()."""
...

def deactivate(self) -> bool:
"""Gracefully stop commanded motion before disconnect()."""
...

def get_info(self) -> ManipulatorInfo:
"""Get manipulator info (vendor, model, DOF)."""
...
Expand Down
132 changes: 132 additions & 0 deletions dimos/hardware/manipulators/test_adapter_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing_extensions import override

from dimos.hardware.manipulators.a750.adapter import A750Adapter
from dimos.hardware.manipulators.openarm.adapter import OpenArmAdapter
from dimos.hardware.manipulators.piper.adapter import PiperAdapter


class _PiperSdk:
def __init__(self) -> None:
self.actions: list[str] = []

def EnablePiper(self) -> bool:
self.actions.append("enable")
return True

def MotionCtrl_2(self, **_: object) -> None:
self.actions.append("position_mode")

def EmergencyStop(self) -> None:
self.actions.append("stop")

def DisablePiper(self) -> None:
self.actions.append("disable")


class _LifecyclePiperAdapter(PiperAdapter):
def use_sdk(self, sdk: _PiperSdk) -> None:
self._sdk: _PiperSdk | None
self._sdk = sdk


def test_piper_lifecycle_enables_then_stops_and_disables() -> None:
sdk = _PiperSdk()
adapter = _LifecyclePiperAdapter()
adapter.use_sdk(sdk)

assert adapter.activate()
assert adapter.deactivate()
assert sdk.actions == ["enable", "position_mode", "stop", "disable"]


class _OpenArmLifecycle:
def __init__(self) -> None:
self.actions: list[str] = []

def enable_all(self) -> None:
self.actions.append("enable")

def disable_all(self) -> None:
self.actions.append("disable")


class _LifecycleOpenArmAdapter(OpenArmAdapter):
def __init__(self, lifecycle: _OpenArmLifecycle) -> None:
super().__init__()
self._lifecycle: _OpenArmLifecycle
self._lifecycle = lifecycle

@override
def read_joint_positions(self) -> list[float]:
return [0.0] * 7

@override
def _compute_gravity_torques(self, q: list[float]) -> list[float]:
return [0.0] * len(q)

@override
def write_enable(self, enable: bool) -> bool:
if enable:
self._lifecycle.enable_all()
else:
self._lifecycle.disable_all()
return True

@override
def write_stop(self) -> bool:
self._lifecycle.actions.append("hold")
return True


def test_openarm_lifecycle_enables_then_holds_and_disables() -> None:
lifecycle = _OpenArmLifecycle()
adapter = _LifecycleOpenArmAdapter(lifecycle)

assert adapter.activate()
assert adapter.deactivate()
assert lifecycle.actions == ["enable", "hold", "disable"]


class _A750Robot:
def __init__(self) -> None:
self.actions: list[str] = []

def start_control_loop(self) -> None:
self.actions.append("start")

def stop_control_loop(self) -> None:
self.actions.append("stop")


class _LifecycleA750Adapter(A750Adapter):
def use_robot(self, robot: _A750Robot) -> None:
self._robot: _A750Robot | None
self._connected: bool
self._robot = robot
self._connected = True


def test_a750_lifecycle_starts_then_stops_control_loop() -> None:
robot = _A750Robot()
adapter = _LifecycleA750Adapter()
adapter.use_robot(robot)

assert adapter.activate()
assert adapter.deactivate()
assert robot.actions == ["start", "stop"]
Loading
Loading