diff --git a/dimos/robot/unitree/go2/cli/go2tool.py b/dimos/robot/unitree/go2/cli/go2tool.py index 55d24e737c..21e730eb8f 100644 --- a/dimos/robot/unitree/go2/cli/go2tool.py +++ b/dimos/robot/unitree/go2/cli/go2tool.py @@ -17,9 +17,17 @@ from __future__ import annotations import asyncio +from collections.abc import AsyncIterator, Awaitable, Callable, Sequence +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +import platform +from typing import Literal, NoReturn, Protocol, cast import typer +from dimos.robot.unitree.go2.cli.ble import Go2Device, retry + app = typer.Typer( help="Go2 setup utilities (BLE wifi provisioning, network discovery)", no_args_is_help=True, @@ -27,12 +35,187 @@ _HEADER = f"{'SOURCE':<6} {'NAME':<14} {'IP':<15} {'MAC':<19} SERIAL" +_HELPER_TIMEOUT_ERROR = ( + "BLE helper backend cannot run with --timeout 0. Use --timeout > 0 or --ble-backend direct." +) +_DEFAULT_BLE_CONNECT_RETRIES = 3 + + +class BleBackend(str, Enum): + auto = "auto" + helper = "helper" + direct = "direct" + + +class _BleFinder(Protocol): + def __call__( + self, + *, + timeout: float, + on_device: Callable[[Go2Device], None] | None = None, + ) -> Awaitable[list[Go2Device]]: ... + + +class _BleProvisioner(Protocol): + def __call__( + self, + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> Awaitable[str | None]: ... + + +@dataclass(frozen=True) +class _BleBackendRuntime: + kind: Literal["helper", "direct"] + find_robots: _BleFinder + provision_wifi: _BleProvisioner + discover_ble: Callable[[], AsyncIterator[Go2Device]] | None = None + + +class _Go2ToolError(RuntimeError): + """User-facing CLI configuration error.""" def _format_row(source: str, name: str, ip: str, mac: str, serial: str) -> str: return f"{source:<6} {name:<14} {ip:<15} {mac:<19} {serial}" +def _selected_backend_kind(backend: BleBackend) -> Literal["helper", "direct"]: + if backend == BleBackend.auto: + return "helper" if platform.system() == "Darwin" else "direct" + if backend == BleBackend.helper: + if platform.system() != "Darwin": + raise _Go2ToolError( + "BLE helper backend is only supported on macOS. " + "Use --ble-backend direct on this platform." + ) + return "helper" + return "direct" + + +def _select_ble_backend( + backend: BleBackend, + helper_app_path: Path | None, +) -> _BleBackendRuntime: + kind = _selected_backend_kind(backend) + + if kind == "helper": + from dimos.robot.unitree.go2.cli import macos_ble_helper + + async def helper_find_robots( + *, + timeout: float, + on_device: Callable[[Go2Device], None] | None = None, + ) -> list[Go2Device]: + return await macos_ble_helper.find_robots( + timeout=timeout, + helper_app_path=helper_app_path, + on_device=on_device, + ) + + async def helper_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + return await macos_ble_helper.provision_wifi( + address, + ssid, + password, + country_code, + timeout=timeout, + connect_retries=connect_retries, + helper_app_path=helper_app_path, + on_progress=on_progress, + ) + + return _BleBackendRuntime( + kind="helper", + find_robots=helper_find_robots, + provision_wifi=helper_provision_wifi, + ) + + from dimos.robot.unitree.go2.cli import ble as direct_ble + + async def direct_find_robots( + *, + timeout: float, + on_device: Callable[[Go2Device], None] | None = None, + ) -> list[Go2Device]: + return await direct_ble.find_robots(timeout=timeout, on_device=on_device) + + async def direct_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + return await direct_ble.provision_wifi( + address, + ssid, + password, + country_code, + timeout=timeout, + connect_retries=connect_retries, + on_progress=on_progress, + ) + + async def discover_ble() -> AsyncIterator[Go2Device]: + async for device in direct_ble.discover_ble(): + yield device + + return _BleBackendRuntime( + kind="direct", + find_robots=direct_find_robots, + provision_wifi=direct_provision_wifi, + discover_ble=discover_ble, + ) + + +def _exit_error(message: str) -> NoReturn: + typer.echo(message, err=True) + raise typer.Exit(1) + + +def _redact(text: str, secrets: Sequence[str]) -> str: + safe = text + for secret in secrets: + if secret: + safe = safe.replace(secret, "[REDACTED]") + return safe + + +def _echo_safe(message: str, secrets: Sequence[str], *, err: bool = False) -> None: + typer.echo(_redact(message, secrets), err=err) + + +def _require_helper_timeout(backend: _BleBackendRuntime, timeout: float) -> None: + if backend.kind == "helper" and timeout <= 0: + _exit_error(_HELPER_TIMEOUT_ERROR) + + +def _discover_backend_for_timeout(backend: BleBackend, timeout: float) -> BleBackend: + # The LaunchServices helper is one-shot; unbounded discovery needs direct BLE streaming. + if backend == BleBackend.auto and timeout <= 0 and platform.system() == "Darwin": + return BleBackend.direct + return backend + + @app.command("discover") def discover( ble: bool = typer.Option(False, "--ble", help="BLE only (default: BLE + LAN)"), @@ -41,29 +224,59 @@ def discover( timeout: float = typer.Option( 7.0, "--timeout", "-t", help="Stop after this many seconds (0 = run forever)" ), + ble_backend: BleBackend = typer.Option( + BleBackend.auto, + "--ble-backend", + help="BLE backend to use. On macOS, auto uses direct BLE when --timeout 0.", + ), + ble_helper: Path | None = typer.Option( + None, + "--ble-helper", + envvar="DIMOS_GO2_BLE_HELPER", + help="macOS BLE helper .app path.", + ), ) -> None: """Stream Go2 robot discoveries from BLE and/or LAN.""" do_ble = ble or not lan do_lan = lan or not ble + backend_runtime: _BleBackendRuntime | None = None + if do_ble: + try: + backend_runtime = _select_ble_backend( + _discover_backend_for_timeout(ble_backend, timeout), ble_helper + ) + except _Go2ToolError as e: + _exit_error(str(e)) + _require_helper_timeout(backend_runtime, timeout) typer.echo(_HEADER) import signal async def run() -> None: - from dimos.robot.unitree.go2.cli.ble import discover_ble from dimos.robot.unitree.go2.cli.landiscovery import discover_lan seen_ble: set[tuple[str, str | None]] = set() seen_lan: set[str] = set() + def _emit_ble_device(d: Go2Device) -> None: + key = (d.address, d.serial) + if key in seen_ble: + return + seen_ble.add(key) + typer.echo(_format_row("BLE", d.name, "-", d.address, d.serial or "?")) + async def _consume_ble() -> None: - async for d in discover_ble(): - key = (d.address, d.serial) - if key in seen_ble: - continue - seen_ble.add(key) - typer.echo(_format_row("BLE", d.name, "-", d.address, d.serial or "?")) + assert backend_runtime is not None + if backend_runtime.kind == "helper": + devices = await backend_runtime.find_robots(timeout=timeout) + for d in devices: + _emit_ble_device(d) + return + + assert backend_runtime.discover_ble is not None + async for d in backend_runtime.discover_ble(): + _emit_ble_device(d) async def _consume_lan() -> None: async for d in discover_lan(tick=lan_tick): @@ -90,18 +303,27 @@ def _stop() -> None: loop.add_signal_handler(sig, _stop) if timeout > 0: - loop.call_later(timeout, _stop) - - await asyncio.gather(*tasks, return_exceptions=True) - - asyncio.run(run()) + grace = 0.25 if backend_runtime is not None and backend_runtime.kind == "helper" else 0 + loop.call_later(timeout + grace, _stop) + + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, asyncio.CancelledError): + continue + if isinstance(result, BaseException): + raise result + + try: + asyncio.run(run()) + except Exception as e: + _exit_error(str(e)) typer.echo("\nStopped.") @app.command("connect-wifi") def connect_wifi( - ssid: str | None = typer.Option(None, "--ssid", help="Wi-Fi SSID"), - password: str | None = typer.Option(None, "--password", help="Wi-Fi password"), + ssid: str | None = typer.Option(None, "--ssid", help="wifi SSID"), + password: str | None = typer.Option(None, "--password", help="wifi password"), country: str = typer.Option("US", "--country", help="Two-letter country code"), mac: str | None = typer.Option(None, "--mac", help="BLE MAC (skip scan)"), serial: str | None = typer.Option( @@ -111,21 +333,36 @@ def connect_wifi( None, "--name", help="Robot BLE name (e.g. Go2_49060) — scan and auto-select match" ), timeout: float = typer.Option(5.0, "--timeout", help="Scan / connect timeout in seconds"), - retries: int = typer.Option(3, "--retries", help="Number of provisioning attempts"), + retries: int = typer.Option( + 3, + "--retries", + help="Number of provisioning attempts; BLE connection retries stay at 3", + ), + ble_backend: BleBackend = typer.Option( + BleBackend.auto, + "--ble-backend", + help="BLE backend to use.", + ), + ble_helper: Path | None = typer.Option( + None, + "--ble-helper", + envvar="DIMOS_GO2_BLE_HELPER", + help="macOS BLE helper .app path.", + ), ) -> None: - """Provision a Go2 with Wi-Fi credentials over Bluetooth. + """Provision a Go2 with wifi credentials over Bluetooth. Fully non-interactive when (--mac | --serial | --name) and --ssid/--password are all provided. """ - from dimos.robot.unitree.go2.cli.ble import ( - Go2Device, - find_robots, - provision_wifi, - retry, - ) + try: + backend_runtime = _select_ble_backend(ble_backend, ble_helper) + except _Go2ToolError as e: + _exit_error(str(e)) + _require_helper_timeout(backend_runtime, timeout) async def run() -> None: + target: str if mac is not None: target = mac else: @@ -134,7 +371,7 @@ async def run() -> None: def _on_device(d: Go2Device) -> None: typer.echo(_format_row("BLE", d.name, "-", d.address, d.serial or "?")) - devices = await find_robots(timeout=timeout, on_device=_on_device) + devices = await backend_runtime.find_robots(timeout=timeout, on_device=_on_device) if not devices: typer.echo("No Unitree robots detected.", err=True) raise typer.Exit(1) @@ -169,23 +406,25 @@ def _on_device(d: Go2Device) -> None: raise typer.Exit(1) target = devices[idx - 1].address - wifi_ssid = ssid if ssid is not None else typer.prompt("Wi-Fi SSID") + wifi_ssid = ssid if ssid is not None else typer.prompt("wifi SSID") wifi_password = ( password if password is not None - else typer.prompt("Wi-Fi password", hide_input=True, default="", show_default=False) + else typer.prompt("wifi password", hide_input=True, default="", show_default=False) ) def _on_error(attempt: int, exc: BaseException) -> None: - typer.echo(f" attempt {attempt} failed: {exc}", err=True) + _echo_safe(f" attempt {attempt} failed: {exc}", (wifi_password,), err=True) device_serial = await retry( - lambda: provision_wifi( - target, # type: ignore[arg-type] + lambda: backend_runtime.provision_wifi( + target, wifi_ssid, wifi_password, country, - on_progress=lambda m: typer.echo(f" {m}"), + timeout=timeout, + connect_retries=_DEFAULT_BLE_CONNECT_RETRIES, + on_progress=lambda m: _echo_safe(f" {m}", (wifi_password,)), ), attempts=retries, on_error=_on_error, @@ -199,6 +438,162 @@ def _on_error(attempt: int, exc: BaseException) -> None: asyncio.run(run()) +@app.command("verify") +def verify( + robot_ip: str = typer.Option(..., "--robot-ip", help="Robot IP address to verify"), + timeout: float = typer.Option(1.0, "--timeout", help="HTTP probe timeout in seconds"), +) -> None: + """Verify a Go2 LAN IP without sending movement commands.""" + from dimos.robot.unitree.go2.cli.verify import verify_robot_ip + + status = verify_robot_ip(robot_ip, timeout_s=timeout) + for line in status.summary_lines(): + typer.echo(line) + raise typer.Exit(0 if status.ok else 1) + + +@app.command("setup") +def setup( + ssid: str = typer.Option(..., "--ssid", help="wifi SSID"), + password: str | None = typer.Option(None, "--password", help="wifi password"), + country: str = typer.Option("US", "--country", help="Two-letter country code"), + serial: str | None = typer.Option(None, "--serial", help="Robot serial to select"), + name: str | None = typer.Option(None, "--name", help="Robot BLE name to select"), + mac: str | None = typer.Option(None, "--mac", help="BLE MAC/address to provision"), + timeout: float = typer.Option(5.0, "--timeout", help="BLE scan / connect timeout in seconds"), + retries: int = typer.Option( + 3, + "--retries", + help="Number of provisioning attempts; BLE connection retries stay at 3", + ), + lan_timeout: float = typer.Option(2.0, "--lan-timeout", help="LAN discovery timeout"), + rediscovery_attempts: int = typer.Option( + 5, + "--rediscovery-attempts", + help="LAN rediscovery attempts after provisioning", + ), + rediscovery_delay: float = typer.Option( + 2.0, + "--rediscovery-delay", + help="Seconds between LAN rediscovery attempts", + ), + verify_timeout: float = typer.Option( + 1.0, + "--verify-timeout", + help="HTTP verification timeout in seconds", + ), + ble_backend: BleBackend = typer.Option( + BleBackend.auto, + "--ble-backend", + help="BLE backend to use.", + ), + ble_helper: Path | None = typer.Option( + None, + "--ble-helper", + envvar="DIMOS_GO2_BLE_HELPER", + help="macOS BLE helper .app path.", + ), +) -> None: + """Provision wifi, rediscover the robot on LAN, and verify its IP.""" + from dimos.robot.unitree.go2.cli.landiscovery import discover as discover_lan_once + from dimos.robot.unitree.go2.cli.setup import LanRobot, setup_go2_wifi + from dimos.robot.unitree.go2.cli.verify import verify_robot_ip + + try: + backend_runtime = _select_ble_backend(ble_backend, ble_helper) + except _Go2ToolError as e: + _exit_error(str(e)) + _require_helper_timeout(backend_runtime, timeout) + + wifi_password = ( + password + if password is not None + else typer.prompt("wifi password", hide_input=True, default="", show_default=False) + ) + secrets = (wifi_password,) + + async def run() -> None: + if mac is not None: + + async def discover_ble_for_setup() -> list[Go2Device]: + return [Go2Device(name=name or mac, address=mac, serial=serial)] + + else: + + async def discover_ble_for_setup() -> list[Go2Device]: + typer.echo(f"Scanning BLE for {timeout:.0f}s ...") + return await backend_runtime.find_robots( + timeout=timeout, + on_device=lambda d: typer.echo( + _format_row("BLE", d.name, "-", d.address, d.serial or "?") + ), + ) + + async def provision_wifi_for_setup( + address: str, + provision_ssid: str, + provision_password: str, + country_code: str, + ) -> str | None: + return cast( + "str | None", + await retry( + lambda: backend_runtime.provision_wifi( + address, + provision_ssid, + provision_password, + country_code, + timeout=timeout, + connect_retries=_DEFAULT_BLE_CONNECT_RETRIES, + on_progress=lambda m: _echo_safe(f" {m}", secrets), + ), + attempts=retries, + on_error=lambda attempt, exc: _echo_safe( + f" attempt {attempt} failed: {exc}", secrets, err=True + ), + ), + ) + + async def discover_lan_for_setup() -> Sequence[LanRobot]: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + lambda: discover_lan_once(timeout=lan_timeout), + ) + + result = await setup_go2_wifi( + ssid=ssid, + password=wifi_password, + discover_ble=discover_ble_for_setup, + provision_wifi=provision_wifi_for_setup, + discover_lan=discover_lan_for_setup, + serial=serial, + name=name, + address=mac, + country_code=country, + verify_robot=lambda ip: verify_robot_ip(ip, timeout_s=verify_timeout), + rediscovery_attempts=rediscovery_attempts, + rediscovery_delay_s=rediscovery_delay, + ) + + robot_ip = result.robot_ip or "-" + selected_serial = "-" + if result.lan_robot is not None: + selected_serial = result.lan_robot.serial + elif result.selected_ble is not None: + selected_serial = result.selected_ble.serial or "-" + typer.echo( + f"result={'ok' if result.ok else 'fail'} robot_ip={robot_ip} serial={selected_serial}" + ) + for line in result.summary_lines(): + _echo_safe(line, secrets) + + if not result.ok: + raise typer.Exit(1) + + asyncio.run(run()) + + def main() -> None: app() diff --git a/dimos/robot/unitree/go2/cli/macos_ble_helper.py b/dimos/robot/unitree/go2/cli/macos_ble_helper.py new file mode 100644 index 0000000000..5d733ae92e --- /dev/null +++ b/dimos/robot/unitree/go2/cli/macos_ble_helper.py @@ -0,0 +1,552 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""macOS LaunchServices helper for Unitree BLE provisioning. + +The BLE protocol lives in :mod:`dimos.robot.unitree.go2.cli.ble`. This module +only gives that protocol a macOS app-bundle wrapper so Bluetooth/TCC attributes +are associated with a LaunchServices-launched app rather than the user's shell. +""" + +from __future__ import annotations + +import argparse +import asyncio +from collections.abc import Callable, Sequence +from concurrent.futures import ThreadPoolExecutor +from dataclasses import asdict, dataclass +import json +import os +from pathlib import Path +import platform +import plistlib +import shlex +import stat +import subprocess +import sys +import tempfile +from typing import Any + +from dimos.constants import DIMOS_PROJECT_ROOT, STATE_DIR +from dimos.robot.unitree.go2.cli import ble +from dimos.robot.unitree.go2.cli.ble import Go2Device + +REQUIRED_BLUETOOTH_USAGE_KEYS = ( + "NSBluetoothAlwaysUsageDescription", + "NSBluetoothPeripheralUsageDescription", + "NSBluetoothUsageDescription", +) + +HELPER_APP_ENV = "DIMOS_GO2_BLE_HELPER" +LEGACY_HELPER_APP_ENV = "DIMOS_GO2_BLE_HELPER_APP" +HELPER_APP_NAME = "DimOS BLE Helper.app" +HELPER_BUNDLE_IDENTIFIER = "com.dimensional.dimos.blehelper" +HELPER_EXECUTABLE_NAME = "dimos-ble-helper" +OPEN_COMMAND = "/usr/bin/open" + +JsonObject = dict[str, Any] + + +class MacOSBLEHelperError(RuntimeError): + """Base error for the macOS BLE helper wrapper.""" + + +class MacOSBLEUnsupportedError(MacOSBLEHelperError): + """Raised when the LaunchServices BLE helper is used off macOS.""" + + +class HelperAppValidationError(MacOSBLEHelperError): + """Raised when a helper app bundle is missing required metadata.""" + + +class HelperLaunchError(MacOSBLEHelperError): + """Raised when LaunchServices fails to run the helper app.""" + + +@dataclass(frozen=True) +class HelperAppInfo: + app_path: Path + info_plist_path: Path + executable_path: Path + bundle_identifier: str | None + + +def ensure_macos() -> None: + """Raise a clear error when the LaunchServices backend is unavailable.""" + if platform.system() != "Darwin": + raise MacOSBLEUnsupportedError( + "The macOS LaunchServices BLE helper is only supported on macOS. " + "Use dimos.robot.unitree.go2.cli.ble directly on Linux." + ) + + +def helper_cache_dir() -> Path: + """Return the DimOS state directory used for the generated helper app.""" + return STATE_DIR / "helpers" / "macos-ble" + + +def validate_helper_app(app_path: str | Path) -> HelperAppInfo: + """Validate a BLE helper ``.app`` bundle and its Bluetooth usage metadata.""" + app = Path(app_path).expanduser() + if not app.exists(): + raise HelperAppValidationError(f"helper app does not exist: {app}") + if not app.is_dir() or app.suffix != ".app": + raise HelperAppValidationError(f"helper path is not a .app bundle: {app}") + + info_plist = app / "Contents" / "Info.plist" + if not info_plist.exists(): + raise HelperAppValidationError(f"helper app is missing Info.plist: {info_plist}") + + try: + with info_plist.open("rb") as f: + plist = plistlib.load(f) + except Exception as exc: + raise HelperAppValidationError(f"helper app Info.plist is invalid: {exc}") from exc + + missing = [ + key + for key in REQUIRED_BLUETOOTH_USAGE_KEYS + if not isinstance(plist.get(key), str) or not plist[key].strip() + ] + if missing: + raise HelperAppValidationError( + "helper app Info.plist is missing Bluetooth usage keys: " + ", ".join(missing) + ) + + executable_name = plist.get("CFBundleExecutable") + if not isinstance(executable_name, str) or not executable_name: + raise HelperAppValidationError("helper app Info.plist is missing CFBundleExecutable") + + executable = app / "Contents" / "MacOS" / executable_name + if not executable.exists(): + raise HelperAppValidationError(f"helper executable does not exist: {executable}") + if not os.access(executable, os.X_OK): + raise HelperAppValidationError(f"helper executable is not executable: {executable}") + + bundle_identifier = plist.get("CFBundleIdentifier") + return HelperAppInfo( + app_path=app, + info_plist_path=info_plist, + executable_path=executable, + bundle_identifier=bundle_identifier if isinstance(bundle_identifier, str) else None, + ) + + +def build_cached_helper_app( + *, + cache_dir: str | Path | None = None, + python_executable: str | Path | None = None, +) -> Path: + """Create or refresh the cached LaunchServices BLE helper app.""" + ensure_macos() + + root = Path(cache_dir) if cache_dir is not None else helper_cache_dir() + app_path = root / HELPER_APP_NAME + contents_dir = app_path / "Contents" + macos_dir = contents_dir / "MacOS" + macos_dir.mkdir(parents=True, exist_ok=True) + + python = Path(python_executable) if python_executable is not None else Path(sys.executable) + module = "dimos.robot.unitree.go2.cli.macos_ble_helper" + script = "\n".join( + [ + "#!/bin/sh", + 'if [ -n "${PYTHONPATH:-}" ]; then', + f' export PYTHONPATH={shlex.quote(str(DIMOS_PROJECT_ROOT))}:"$PYTHONPATH"', + "else", + f" export PYTHONPATH={shlex.quote(str(DIMOS_PROJECT_ROOT))}", + "fi", + f'exec {shlex.quote(str(python))} -m {module} --helper-run "$@"', + "", + ] + ) + + executable = macos_dir / HELPER_EXECUTABLE_NAME + executable.write_text(script, encoding="utf-8") + executable.chmod(0o755) + + bluetooth_description = "DimOS uses Bluetooth to discover and provision Unitree robots." + plist: JsonObject = { + "CFBundleDevelopmentRegion": "en", + "CFBundleExecutable": HELPER_EXECUTABLE_NAME, + "CFBundleIdentifier": HELPER_BUNDLE_IDENTIFIER, + "CFBundleInfoDictionaryVersion": "6.0", + "CFBundleName": "DimOS BLE Helper", + "CFBundlePackageType": "APPL", + "CFBundleShortVersionString": "1.0", + "CFBundleVersion": "1", + "DimOSPythonExecutable": str(python), + "LSUIElement": True, + "NSBluetoothAlwaysUsageDescription": bluetooth_description, + "NSBluetoothPeripheralUsageDescription": bluetooth_description, + "NSBluetoothUsageDescription": bluetooth_description, + } + with (contents_dir / "Info.plist").open("wb") as f: + plistlib.dump(plist, f) + + validate_helper_app(app_path) + return app_path + + +def locate_helper_app( + helper_app_path: str | Path | None = None, + *, + create: bool = True, +) -> Path: + """Return a provided, env-configured, or cached BLE helper app path.""" + ensure_macos() + + configured = helper_app_path + if configured is None: + env_value = os.environ.get(HELPER_APP_ENV) or os.environ.get(LEGACY_HELPER_APP_ENV) + configured = env_value if env_value else None + + if configured is not None: + return validate_helper_app(configured).app_path + + cached = helper_cache_dir() / HELPER_APP_NAME + if create: + return build_cached_helper_app() + + return validate_helper_app(cached).app_path + + +async def _invoke_helper_async( + action: str, + params: JsonObject, + *, + password: str | None = None, + helper_app_path: str | Path | None = None, + on_progress: Callable[[str], None] | None = None, + secrets: Sequence[str] = (), +) -> JsonObject: + loop = asyncio.get_running_loop() + with ThreadPoolExecutor(max_workers=1, thread_name_prefix="dimos-go2-ble-helper") as executor: + return await loop.run_in_executor( + executor, + lambda: _invoke_helper( + action, + params, + password=password, + helper_app_path=helper_app_path, + on_progress=on_progress, + secrets=secrets, + ), + ) + + +async def find_robots( + timeout: float = 15.0, + prefixes: tuple[str, ...] = ble.UNITREE_NAME_PREFIXES, + *, + helper_app_path: str | Path | None = None, + on_device: Callable[[Go2Device], None] | None = None, + on_progress: Callable[[str], None] | None = None, +) -> list[Go2Device]: + """Find Unitree robots through the macOS LaunchServices BLE helper.""" + response = await _invoke_helper_async( + "find_robots", + { + "timeout": timeout, + "prefixes": list(prefixes), + }, + helper_app_path=helper_app_path, + on_progress=on_progress, + ) + devices = [ + Go2Device(name=str(d["name"]), address=str(d["address"]), serial=d.get("serial")) + for d in response.get("devices", []) + ] + if on_device is not None: + for device in devices: + on_device(device) + return devices + + +async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str = "US", + *, + timeout: float = 30.0, + connect_retries: int = 3, + helper_app_path: str | Path | None = None, + on_progress: Callable[[str], None] | None = None, +) -> str | None: + """Provision wifi through the macOS LaunchServices BLE helper.""" + response = await _invoke_helper_async( + "provision_wifi", + { + "address": address, + "ssid": ssid, + "country_code": country_code, + "timeout": timeout, + "connect_retries": connect_retries, + }, + password=password, + helper_app_path=helper_app_path, + on_progress=on_progress, + secrets=(password,), + ) + serial = response.get("serial") + return str(serial) if serial is not None else None + + +def _invoke_helper( + action: str, + params: JsonObject, + *, + password: str | None = None, + helper_app_path: str | Path | None = None, + on_progress: Callable[[str], None] | None = None, + secrets: Sequence[str] = (), +) -> JsonObject: + helper_app = locate_helper_app(helper_app_path) + safe_secrets = tuple(secret for secret in secrets if secret) + + with tempfile.TemporaryDirectory(prefix="dimos-go2-ble-") as tmp: + tmp_path = Path(tmp) + tmp_path.chmod(0o700) + request_path = tmp_path / "request.json" + response_path = tmp_path / "response.json" + progress_path = tmp_path / "progress.jsonl" + password_path = tmp_path / "password.txt" + + request: JsonObject = { + "schema_version": 1, + "action": action, + "params": params, + } + if password is not None: + _write_text_0600(password_path, password) + request["password_file"] = str(password_path) + + _write_json_0600(request_path, request) + _touch_0600(response_path) + _touch_0600(progress_path) + + cmd = _open_command(helper_app, request_path, response_path, progress_path) + completed = subprocess.run(cmd, check=False, capture_output=True, text=True) + + _emit_progress(progress_path, on_progress, secrets=safe_secrets) + + if completed.returncode != 0: + stderr = _sanitize_text(completed.stderr or completed.stdout or "", safe_secrets) + raise HelperLaunchError( + f"BLE helper exited with status {completed.returncode}: {stderr.strip()}" + ) + + response = _read_json_file(response_path) + if not response: + raise HelperLaunchError("BLE helper did not write a response") + if not response.get("ok"): + error = _sanitize_text(str(response.get("error", "BLE helper failed")), safe_secrets) + raise MacOSBLEHelperError(error) + return response + + +def _open_command( + helper_app_path: Path, + request_path: Path, + response_path: Path, + progress_path: Path, +) -> list[str]: + return [ + OPEN_COMMAND, + "-W", + "-n", + str(helper_app_path), + "--args", + "--helper-request", + str(request_path), + "--helper-response", + str(response_path), + "--helper-progress", + str(progress_path), + ] + + +def _write_text_0600(path: Path, text: str) -> None: + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(text) + + +def _write_json_0600(path: Path, payload: JsonObject) -> None: + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(payload, f, separators=(",", ":")) + f.write("\n") + + +def _touch_0600(path: Path) -> None: + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + os.close(fd) + + +def _read_json_file(path: Path) -> JsonObject: + text = path.read_text(encoding="utf-8").strip() + if not text: + return {} + data = json.loads(text) + if not isinstance(data, dict): + raise HelperLaunchError(f"expected JSON object in {path}") + return data + + +def _append_progress(path: Path, message: str, *, secrets: Sequence[str] = ()) -> None: + safe_message = _sanitize_text(message, secrets) + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, "a", encoding="utf-8") as f: + json.dump({"message": safe_message}, f, separators=(",", ":")) + f.write("\n") + + +def _emit_progress( + progress_path: Path, + on_progress: Callable[[str], None] | None, + *, + secrets: Sequence[str] = (), +) -> None: + if on_progress is None or not progress_path.exists(): + return + for line in progress_path.read_text(encoding="utf-8").splitlines(): + if not line: + continue + try: + payload = json.loads(line) + message = str(payload.get("message", "")) + except json.JSONDecodeError: + message = line + if message: + on_progress(_sanitize_text(message, secrets)) + + +def _sanitize_text(text: str, secrets: Sequence[str]) -> str: + safe = text + for secret in secrets: + if secret: + safe = safe.replace(secret, "[REDACTED]") + return safe + + +def _assert_private_file(path: Path) -> None: + mode = stat.S_IMODE(path.stat().st_mode) + if mode & 0o077: + raise HelperLaunchError(f"helper file is not private: {path} mode={oct(mode)}") + + +async def _execute_helper_request(request: JsonObject, progress_path: Path) -> JsonObject: + action = request.get("action") + params = request.get("params", {}) + if not isinstance(params, dict): + raise HelperLaunchError("helper request params must be a JSON object") + + if action == "find_robots": + devices: list[Go2Device] = [] + + def _on_device(device: Go2Device) -> None: + devices.append(device) + _append_progress( + progress_path, + f"found {device.name} {device.address} serial={device.serial or '?'}", + ) + + result = await ble.find_robots( + timeout=float(params.get("timeout", 15.0)), + prefixes=tuple(params.get("prefixes", ble.UNITREE_NAME_PREFIXES)), + on_device=_on_device, + ) + by_address = {device.address: device for device in devices} + for device in result: + by_address[device.address] = device + return {"ok": True, "devices": [asdict(device) for device in by_address.values()]} + + if action == "provision_wifi": + password_file = request.get("password_file") + if not isinstance(password_file, str) or not password_file: + raise HelperLaunchError("provision_wifi request is missing password_file") + password_path = Path(password_file) + _assert_private_file(password_path) + password = password_path.read_text(encoding="utf-8") + + def _on_progress(message: str) -> None: + _append_progress(progress_path, message, secrets=(password,)) + + serial = await ble.provision_wifi( + address=str(params["address"]), + ssid=str(params["ssid"]), + password=password, + country_code=str(params.get("country_code", "US")), + timeout=float(params.get("timeout", 30.0)), + connect_retries=int(params.get("connect_retries", 3)), + on_progress=_on_progress, + ) + return {"ok": True, "serial": serial} + + raise HelperLaunchError(f"unknown helper action: {action}") + + +def _run_helper_request(request_path: Path, response_path: Path, progress_path: Path) -> int: + secrets: tuple[str, ...] = () + try: + _assert_private_file(request_path) + _assert_private_file(response_path) + _assert_private_file(progress_path) + request = _read_json_file(request_path) + + password_file = request.get("password_file") + if isinstance(password_file, str) and password_file: + password_path = Path(password_file) + _assert_private_file(password_path) + secrets = (password_path.read_text(encoding="utf-8"),) + + response = asyncio.run(_execute_helper_request(request, progress_path)) + except Exception as exc: + response = { + "ok": False, + "error_type": type(exc).__name__, + "error": _sanitize_text(f"{type(exc).__name__}: {exc}", secrets), + } + _write_json_0600(response_path, response) + return 0 + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="DimOS macOS BLE helper") + parser.add_argument("--helper-run", action="store_true") + parser.add_argument("--helper-request") + parser.add_argument("--helper-response") + parser.add_argument("--helper-progress") + args = parser.parse_args(argv) + + if not args.helper_run: + parser.error("this module is intended to be launched by the DimOS BLE helper app") + if not args.helper_request or not args.helper_response or not args.helper_progress: + parser.error("helper request, response, and progress paths are required") + + return _run_helper_request( + Path(args.helper_request), + Path(args.helper_response), + Path(args.helper_progress), + ) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/dimos/robot/unitree/go2/cli/setup.py b/dimos/robot/unitree/go2/cli/setup.py new file mode 100644 index 0000000000..3163f9ec8b --- /dev/null +++ b/dimos/robot/unitree/go2/cli/setup.py @@ -0,0 +1,405 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Guided Go2 wifi setup orchestration without Typer wiring.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable, Sequence +from dataclasses import dataclass, replace +from typing import Any, Protocol + +from dimos.robot.unitree.go2.cli.verify import Go2VerifyStatus, verify_robot_ip + + +class BluetoothRobot(Protocol): + @property + def name(self) -> str: ... + + @property + def address(self) -> str: ... + + @property + def serial(self) -> str | None: ... + + +class LanRobot(Protocol): + @property + def serial(self) -> str: ... + + @property + def ip(self) -> str: ... + + +BleDiscovery = Callable[[], Awaitable[Sequence[BluetoothRobot]]] +LanDiscovery = Callable[[], Awaitable[Sequence[LanRobot]]] +WifiProvisioner = Callable[[str, str, str, str], Awaitable[str | None]] +RobotVerifier = Callable[[str], Go2VerifyStatus] + + +@dataclass(frozen=True) +class Go2BleSelection: + name: str | None + address: str + serial: str | None + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "address": self.address, + "serial": self.serial, + } + + +@dataclass(frozen=True) +class Go2LanSelection: + serial: str + ip: str + mac: str | None = None + iface: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "serial": self.serial, + "ip": self.ip, + "mac": self.mac, + "iface": self.iface, + } + + +@dataclass(frozen=True) +class SetupStep: + name: str + ok: bool + message: str + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "ok": self.ok, + "message": self.message, + } + + +@dataclass(frozen=True) +class Go2WifiSetupResult: + ok: bool + selected_ble: Go2BleSelection | None + lan_robot: Go2LanSelection | None + verification: Go2VerifyStatus | None + next_steps: tuple[str, ...] + steps: tuple[SetupStep, ...] + error: str | None = None + + @property + def robot_ip(self) -> str | None: + return self.lan_robot.ip if self.lan_robot else None + + def to_dict(self) -> dict[str, Any]: + return { + "ok": self.ok, + "selected_ble": self.selected_ble.to_dict() if self.selected_ble else None, + "lan_robot": self.lan_robot.to_dict() if self.lan_robot else None, + "robot_ip": self.robot_ip, + "verification": self.verification.to_dict() if self.verification else None, + "next_steps": list(self.next_steps), + "steps": [step.to_dict() for step in self.steps], + "error": self.error, + } + + def summary_lines(self) -> tuple[str, ...]: + lines = [ + f"{'OK' if step.ok else 'FAIL'} {step.name}: {step.message}" for step in self.steps + ] + if self.error: + lines.append(f"Error: {self.error}") + lines.extend(self.next_steps) + return tuple(lines) + + +def _redact(text: str | None, secret: str) -> str | None: + if text is None or not secret: + return text + return text.replace(secret, "[REDACTED]") + + +def _redact_status(status: Go2VerifyStatus, secret: str) -> Go2VerifyStatus: + return replace( + status, + reason=_redact(status.reason, secret), + error=_redact(status.error, secret), + ) + + +def _same_address(left: str, right: str) -> bool: + return left.casefold() == right.casefold() + + +def _matches_ble( + robot: BluetoothRobot, + *, + serial: str | None, + name: str | None, + address: str | None, +) -> bool: + if serial is not None and robot.serial != serial: + return False + if name is not None and robot.name != name: + return False + if address is not None and not _same_address(robot.address, address): + return False + return True + + +def _criteria_text(serial: str | None, name: str | None, address: str | None) -> str: + pairs = [ + ("serial", serial), + ("name", name), + ("address", address), + ] + parts = [f"{key}={value}" for key, value in pairs if value is not None] + return ", ".join(parts) if parts else "available robot" + + +def _select_ble_robot( + robots: Sequence[BluetoothRobot], + *, + serial: str | None, + name: str | None, + address: str | None, +) -> tuple[Go2BleSelection | None, str | None]: + if address is not None: + address_matches = [robot for robot in robots if _same_address(robot.address, address)] + matches = [ + robot + for robot in address_matches + if _matches_ble(robot, serial=serial, name=name, address=address) + ] + if len(matches) > 1: + return None, f"Ambiguous BLE robot selection: {len(matches)} robots matched address." + if matches: + robot = matches[0] + return Go2BleSelection( + name=robot.name, address=robot.address, serial=robot.serial + ), None + if address_matches: + return None, f"No BLE robot matched {_criteria_text(serial, name, address)}." + return Go2BleSelection(name=name, address=address, serial=serial), None + + matches = [ + robot for robot in robots if _matches_ble(robot, serial=serial, name=name, address=address) + ] + if not matches: + return None, f"No BLE robot matched {_criteria_text(serial, name, address)}." + if len(matches) > 1: + return ( + None, + f"Ambiguous BLE robot selection: {len(matches)} robots matched " + f"{_criteria_text(serial, name, address)}.", + ) + robot = matches[0] + return Go2BleSelection(name=robot.name, address=robot.address, serial=robot.serial), None + + +def _optional_str_attr(value: Any) -> str | None: + return value if isinstance(value, str) else None + + +def _lan_selection(robot: LanRobot) -> Go2LanSelection: + return Go2LanSelection( + serial=robot.serial, + ip=robot.ip, + mac=_optional_str_attr(getattr(robot, "mac", None)), + iface=_optional_str_attr(getattr(robot, "iface", None)), + ) + + +def _select_lan_robot( + robots: Sequence[LanRobot], + *, + serial: str | None, +) -> tuple[Go2LanSelection | None, str | None]: + if serial is not None: + matches = [robot for robot in robots if robot.serial == serial] + if len(matches) == 1: + return _lan_selection(matches[0]), None + if len(matches) > 1: + return None, f"Ambiguous LAN discovery: {len(matches)} robots matched serial={serial}." + return None, f"No LAN robot matched serial={serial}." + + if len(robots) == 1: + return _lan_selection(robots[0]), None + if robots: + return None, f"Ambiguous LAN discovery: {len(robots)} robots found and no serial is known." + return None, "No LAN robots found." + + +def _next_steps(ip: str) -> tuple[str, ...]: + return ( + f"export ROBOT_IP={ip}", + f"dimos --robot-ip {ip} run unitree-go2", + ) + + +def _result( + *, + ok: bool, + selected_ble: Go2BleSelection | None, + lan_robot: Go2LanSelection | None, + verification: Go2VerifyStatus | None, + next_steps: tuple[str, ...], + steps: list[SetupStep], + error: str | None = None, +) -> Go2WifiSetupResult: + return Go2WifiSetupResult( + ok=ok, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + next_steps=next_steps, + steps=tuple(steps), + error=error, + ) + + +async def setup_go2_wifi( + *, + ssid: str, + password: str, + discover_ble: BleDiscovery, + provision_wifi: WifiProvisioner, + discover_lan: LanDiscovery, + serial: str | None = None, + name: str | None = None, + address: str | None = None, + country_code: str = "US", + verify_robot: RobotVerifier | None = None, + rediscovery_attempts: int = 5, + rediscovery_delay_s: float = 2.0, +) -> Go2WifiSetupResult: + """Provision Go2 wifi over BLE, rediscover it on LAN, then verify WebRTC. + + The caller injects BLE discovery, wifi provisioning, and LAN discovery + functions. This keeps Typer prompting and concrete backends outside the + orchestration so it can be tested without hardware. + """ + steps: list[SetupStep] = [] + + def add_step(name: str, ok: bool, message: str) -> None: + steps.append(SetupStep(name=name, ok=ok, message=_redact(message, password) or "")) + + def fail( + name: str, + message: str, + *, + selected_ble: Go2BleSelection | None = None, + lan_robot: Go2LanSelection | None = None, + verification: Go2VerifyStatus | None = None, + ) -> Go2WifiSetupResult: + redacted = _redact(message, password) or "" + add_step(name, False, redacted) + return _result( + ok=False, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + next_steps=(), + steps=steps, + error=redacted, + ) + + try: + ble_robots = list(await discover_ble()) + except Exception as e: + return fail("ble_discovery", f"BLE discovery failed: {e}") + add_step("ble_discovery", True, f"Found {len(ble_robots)} BLE robot(s).") + + selected_ble, selection_error = _select_ble_robot( + ble_robots, serial=serial, name=name, address=address + ) + if selected_ble is None: + return fail("select_robot", selection_error or "No BLE robot selected.") + add_step( + "select_robot", + True, + f"Selected BLE robot address={selected_ble.address} serial={selected_ble.serial or '?'}", + ) + + try: + provisioned_serial = await provision_wifi( + selected_ble.address, + ssid, + password, + country_code, + ) + except Exception as e: + return fail("provision_wifi", f"wifi provisioning failed: {e}", selected_ble=selected_ble) + add_step("provision_wifi", True, "Provisioned wifi credentials over BLE.") + + target_serial = provisioned_serial or selected_ble.serial + lan_robot: Go2LanSelection | None = None + lan_error: str | None = None + attempts = max(1, rediscovery_attempts) + for attempt in range(attempts): + try: + lan_robots = list(await discover_lan()) + except Exception as e: + return fail("lan_discovery", f"LAN discovery failed: {e}", selected_ble=selected_ble) + + lan_robot, lan_error = _select_lan_robot(lan_robots, serial=target_serial) + if lan_robot is not None: + break + if lan_error and lan_error.startswith("Ambiguous"): + return fail("lan_discovery", lan_error, selected_ble=selected_ble) + if attempt + 1 < attempts: + await asyncio.sleep(rediscovery_delay_s) + + if lan_robot is None: + return fail( + "lan_discovery", + lan_error or "No LAN robot found after wifi provisioning.", + selected_ble=selected_ble, + ) + add_step("lan_discovery", True, f"Found LAN robot {lan_robot.serial} at {lan_robot.ip}.") + + verifier = verify_robot or verify_robot_ip + try: + verification = _redact_status(verifier(lan_robot.ip), password) + except Exception as e: + return fail( + "verify", + f"Verification probe failed: {e}", + selected_ble=selected_ble, + lan_robot=lan_robot, + ) + if not verification.ok: + message = " ".join(verification.summary_lines()) + return fail( + "verify", + message, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + ) + add_step("verify", True, f"Verified Go2 WebRTC endpoint at {verification.url}.") + + return _result( + ok=True, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + next_steps=_next_steps(lan_robot.ip), + steps=steps, + ) diff --git a/dimos/robot/unitree/go2/cli/test_go2tool.py b/dimos/robot/unitree/go2/cli/test_go2tool.py new file mode 100644 index 0000000000..721117402b --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_go2tool.py @@ -0,0 +1,530 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator, Callable +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from dimos.robot.unitree.go2.cli import ( + ble, + go2tool, + landiscovery, + macos_ble_helper, + setup, + verify as verify_module, +) +from dimos.robot.unitree.go2.cli.verify import Go2VerifyStatus + +_runner = CliRunner() + + +def _verify_status(ip: str, *, ok: bool) -> Go2VerifyStatus: + return Go2VerifyStatus( + robot_ip=ip, + url=f"http://{ip}:9991/con_notify", + ok=ok, + status_code=200 if ok else 503, + reason="OK" if ok else "Unavailable", + error=None, + elapsed_s=0.01, + timeout_s=1.0, + ) + + +def test_auto_backend_uses_helper_on_darwin( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + helper_path = tmp_path / "DimOS BLE Helper.app" + calls: list[tuple[str, Path | None]] = [] + + async def fake_helper_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + helper_app_path: Path | None = None, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 3.0 + assert connect_retries == go2tool._DEFAULT_BLE_CONNECT_RETRIES + assert on_progress is not None + calls.append(("helper", helper_app_path)) + return "SER123" + + async def fake_direct_provision_wifi(*args: object, **kwargs: object) -> str | None: + pytest.fail("direct BLE backend should not be used on Darwin auto") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "provision_wifi", fake_helper_provision_wifi) + monkeypatch.setattr(ble, "provision_wifi", fake_direct_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "3", + "--retries", + "1", + "--ble-backend", + "auto", + "--ble-helper", + str(helper_path), + ], + ) + + assert result.exit_code == 0, result.output + assert calls == [("helper", helper_path)] + + +def test_auto_backend_uses_direct_on_linux(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[str] = [] + + async def fake_direct_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 3.0 + assert connect_retries == go2tool._DEFAULT_BLE_CONNECT_RETRIES + assert on_progress is not None + calls.append("direct") + return "SER123" + + async def fake_helper_provision_wifi(*args: object, **kwargs: object) -> str | None: + pytest.fail("helper BLE backend should not be used on Linux auto") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "provision_wifi", fake_direct_provision_wifi) + monkeypatch.setattr(macos_ble_helper, "provision_wifi", fake_helper_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "3", + "--retries", + "1", + "--ble-backend", + "auto", + ], + ) + + assert result.exit_code == 0, result.output + assert calls == ["direct"] + + +def test_discover_lan_bypasses_ble_backend(monkeypatch: pytest.MonkeyPatch) -> None: + async def fake_discover_lan( + tick: float = 2.0, + timeout: float = 1.5, + iface_ip: str | None = None, + ) -> AsyncIterator[landiscovery.Go2Device]: + assert tick == 0.0 + assert timeout == 1.5 + assert iface_ip is None + yield landiscovery.Go2Device( + serial="SN123", + ip="192.168.1.70", + iface="en0", + mac="00:11:22:33:44:55", + ) + while True: + await asyncio.sleep(1) + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(landiscovery, "discover_lan", fake_discover_lan) + + result = _runner.invoke( + go2tool.app, + [ + "discover", + "--lan", + "--ble-backend", + "helper", + "--timeout", + "0.01", + "--lan-tick", + "0", + ], + ) + + assert result.exit_code == 0, result.output + assert "LAN" in result.output + assert "192.168.1.70" in result.output + + +def test_helper_backend_rejects_timeout_zero(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + + result = _runner.invoke( + go2tool.app, + ["discover", "--ble", "--ble-backend", "helper", "--timeout", "0"], + ) + + assert result.exit_code == 1 + assert "BLE helper backend cannot run with --timeout 0" in result.output + + +def test_discover_auto_timeout_zero_uses_direct_ble_on_darwin( + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def fake_discover_ble() -> AsyncIterator[ble.Go2Device]: + yield ble.Go2Device(name="Go2_49060", address="AA:BB:CC:DD:EE:FF", serial="SER123") + + async def fake_helper_find_robots(*args: object, **kwargs: object) -> list[ble.Go2Device]: + pytest.fail("macOS auto discovery with --timeout 0 should use direct BLE") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + monkeypatch.setattr(ble, "discover_ble", fake_discover_ble) + monkeypatch.setattr(macos_ble_helper, "find_robots", fake_helper_find_robots) + + result = _runner.invoke( + go2tool.app, + ["discover", "--ble", "--ble-backend", "auto", "--timeout", "0"], + ) + + assert result.exit_code == 0, result.output + assert "AA:BB:CC:DD:EE:FF" in result.output + assert "SER123" in result.output + + +def test_discover_surfaces_ble_backend_errors(monkeypatch: pytest.MonkeyPatch) -> None: + async def fake_find_robots(*args: object, **kwargs: object) -> list[ble.Go2Device]: + raise RuntimeError("invalid helper app") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "find_robots", fake_find_robots) + + result = _runner.invoke( + go2tool.app, + ["discover", "--ble", "--ble-backend", "helper", "--timeout", "0.1"], + ) + + assert result.exit_code == 1 + assert "invalid helper app" in result.output + + +def test_connect_wifi_mac_skips_scan_and_redacts_prompted_password( + monkeypatch: pytest.MonkeyPatch, +) -> None: + secret = "super-secret-password" + scan_called = False + + async def fake_find_robots(*args: object, **kwargs: object) -> list[ble.Go2Device]: + nonlocal scan_called + scan_called = True + return [] + + async def fake_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == secret + assert country_code == "US" + assert timeout == 2.0 + assert connect_retries == go2tool._DEFAULT_BLE_CONNECT_RETRIES + assert on_progress is not None + on_progress(f"unsafe progress {secret}") + return "SER123" + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "find_robots", fake_find_robots) + monkeypatch.setattr(ble, "provision_wifi", fake_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "2", + "--retries", + "1", + "--ble-backend", + "direct", + ], + input=f"{secret}\n", + ) + + assert result.exit_code == 0, result.output + assert not scan_called + assert secret not in result.output + assert "[REDACTED]" in result.output + assert "Provisioned. Serial: SER123" in result.output + + +def test_connect_wifi_retries_outer_provision_attempts( + monkeypatch: pytest.MonkeyPatch, +) -> None: + connect_retry_counts: list[int] = [] + + async def fake_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 2.0 + connect_retry_counts.append(connect_retries) + if len(connect_retry_counts) == 1: + raise RuntimeError("temporary provisioning failure") + return "SER123" + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "provision_wifi", fake_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "2", + "--retries", + "2", + "--ble-backend", + "direct", + ], + ) + + assert result.exit_code == 0, result.output + assert connect_retry_counts == [ + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + ] + assert "attempt 1 failed: temporary provisioning failure" in result.output + assert "Provisioned. Serial: SER123" in result.output + + +@pytest.mark.parametrize(("ok", "expected_exit_code"), [(True, 0), (False, 1)]) +def test_verify_exit_codes( + monkeypatch: pytest.MonkeyPatch, + ok: bool, + expected_exit_code: int, +) -> None: + def fake_verify_robot_ip(robot_ip: str, *, timeout_s: float) -> Go2VerifyStatus: + assert robot_ip == "192.168.1.70" + assert timeout_s == 0.25 + return _verify_status(robot_ip, ok=ok) + + monkeypatch.setattr(verify_module, "verify_robot_ip", fake_verify_robot_ip) + + result = _runner.invoke( + go2tool.app, + ["verify", "--robot-ip", "192.168.1.70", "--timeout", "0.25"], + ) + + assert result.exit_code == expected_exit_code + assert "192.168.1.70" in result.output + + +def test_setup_success_prints_summary_and_redacts_secret(monkeypatch: pytest.MonkeyPatch) -> None: + secret = "super-secret-password" + + async def fake_setup_go2_wifi(**kwargs: object) -> setup.Go2WifiSetupResult: + assert kwargs["ssid"] == "lab-wifi" + assert kwargs["password"] == secret + assert kwargs["address"] == "AA:BB:CC:DD:EE:FF" + return setup.Go2WifiSetupResult( + ok=True, + selected_ble=setup.Go2BleSelection( + name="Go2_123", + address="AA:BB:CC:DD:EE:FF", + serial="SN123", + ), + lan_robot=setup.Go2LanSelection( + serial="SN123", + ip="192.168.1.70", + mac="00:11:22:33:44:55", + iface="en0", + ), + verification=_verify_status("192.168.1.70", ok=True), + next_steps=( + "export ROBOT_IP=192.168.1.70", + f"redacted next step {secret}", + ), + steps=( + setup.SetupStep( + name="provision_wifi", + ok=True, + message=f"unsafe step {secret}", + ), + ), + ) + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(setup, "setup_go2_wifi", fake_setup_go2_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "setup", + "--ssid", + "lab-wifi", + "--password", + secret, + "--mac", + "AA:BB:CC:DD:EE:FF", + "--ble-backend", + "direct", + ], + ) + + assert result.exit_code == 0, result.output + assert "result=ok robot_ip=192.168.1.70 serial=SN123" in result.output + assert "export ROBOT_IP=192.168.1.70" in result.output + assert secret not in result.output + assert "[REDACTED]" in result.output + + +def test_setup_retries_outer_provision_attempts(monkeypatch: pytest.MonkeyPatch) -> None: + connect_retry_counts: list[int] = [] + + async def fake_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 2.0 + connect_retry_counts.append(connect_retries) + if len(connect_retry_counts) == 1: + raise RuntimeError("temporary provisioning failure") + return "SER123" + + def fake_discover_lan(timeout: float = 2.0) -> list[landiscovery.Go2Device]: + assert timeout == 0.1 + return [ + landiscovery.Go2Device( + serial="SER123", + ip="192.168.1.70", + iface="en0", + mac="00:11:22:33:44:55", + ) + ] + + def fake_verify_robot_ip(robot_ip: str, *, timeout_s: float) -> Go2VerifyStatus: + assert robot_ip == "192.168.1.70" + assert timeout_s == 0.25 + return _verify_status(robot_ip, ok=True) + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "provision_wifi", fake_provision_wifi) + monkeypatch.setattr(landiscovery, "discover", fake_discover_lan) + monkeypatch.setattr(verify_module, "verify_robot_ip", fake_verify_robot_ip) + + result = _runner.invoke( + go2tool.app, + [ + "setup", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--serial", + "SER123", + "--timeout", + "2", + "--retries", + "2", + "--lan-timeout", + "0.1", + "--rediscovery-attempts", + "1", + "--rediscovery-delay", + "0", + "--verify-timeout", + "0.25", + "--ble-backend", + "direct", + ], + ) + + assert result.exit_code == 0, result.output + assert connect_retry_counts == [ + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + ] + assert "attempt 1 failed: temporary provisioning failure" in result.output + assert "result=ok robot_ip=192.168.1.70 serial=SER123" in result.output diff --git a/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py b/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py new file mode 100644 index 0000000000..7b84cd2a67 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py @@ -0,0 +1,300 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import json +from pathlib import Path +import plistlib +import stat +import subprocess +import threading + +import pytest + +from dimos.robot.unitree.go2.cli import macos_ble_helper + + +def _make_helper_app(tmp_path: Path, *, missing_keys: set[str] | None = None) -> Path: + app_path = tmp_path / "Test BLE Helper.app" + macos_dir = app_path / "Contents" / "MacOS" + macos_dir.mkdir(parents=True) + + executable = macos_dir / "helper" + executable.write_text("#!/bin/sh\nexit 0\n", encoding="utf-8") + executable.chmod(0o755) + + plist: dict[str, object] = { + "CFBundleExecutable": "helper", + "CFBundleIdentifier": "com.example.dimos.testblehelper", + "CFBundleName": "Test BLE Helper", + } + for key in macos_ble_helper.REQUIRED_BLUETOOTH_USAGE_KEYS: + plist[key] = "Bluetooth is used to provision Unitree robots." + for key in missing_keys or set(): + plist.pop(key, None) + + with (app_path / "Contents" / "Info.plist").open("wb") as f: + plistlib.dump(plist, f) + + return app_path + + +def _mode(path: Path) -> int: + return stat.S_IMODE(path.stat().st_mode) + + +def test_validate_helper_app_accepts_required_bluetooth_metadata(tmp_path: Path) -> None: + app_path = _make_helper_app(tmp_path) + + info = macos_ble_helper.validate_helper_app(app_path) + + assert info.app_path == app_path + assert info.bundle_identifier == "com.example.dimos.testblehelper" + assert info.executable_path.name == "helper" + + +@pytest.mark.parametrize("missing_key", macos_ble_helper.REQUIRED_BLUETOOTH_USAGE_KEYS) +def test_validate_helper_app_rejects_missing_bluetooth_usage_key( + tmp_path: Path, + missing_key: str, +) -> None: + app_path = _make_helper_app(tmp_path, missing_keys={missing_key}) + + with pytest.raises(macos_ble_helper.HelperAppValidationError, match=missing_key): + macos_ble_helper.validate_helper_app(app_path) + + +def test_locate_helper_app_builds_cached_app_with_bluetooth_keys( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "STATE_DIR", tmp_path) + + app_path = macos_ble_helper.locate_helper_app() + + assert app_path == tmp_path / "helpers" / "macos-ble" / macos_ble_helper.HELPER_APP_NAME + info = macos_ble_helper.validate_helper_app(app_path) + plist = plistlib.loads(info.info_plist_path.read_bytes()) + for key in macos_ble_helper.REQUIRED_BLUETOOTH_USAGE_KEYS: + assert plist[key] + + +@pytest.mark.asyncio +async def test_find_robots_rejects_non_macos(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Linux") + + with pytest.raises(macos_ble_helper.MacOSBLEUnsupportedError, match="only supported on macOS"): + await macos_ble_helper.find_robots(timeout=0.1) + + +@pytest.mark.asyncio +async def test_helper_async_wrappers_run_off_event_loop(monkeypatch: pytest.MonkeyPatch) -> None: + loop_thread = threading.current_thread() + helper_threads: list[threading.Thread] = [] + calls: list[str] = [] + + def fake_invoke_helper( + action: str, + params: macos_ble_helper.JsonObject, + **kwargs: object, + ) -> macos_ble_helper.JsonObject: + helper_threads.append(threading.current_thread()) + calls.append(action) + assert threading.current_thread() is not loop_thread + if action == "find_robots": + assert params["timeout"] == 2.0 + return { + "ok": True, + "devices": [ + {"name": "Go2_49060", "address": "AA:BB:CC:DD:EE:FF", "serial": "SER123"} + ], + } + if action == "provision_wifi": + assert params["connect_retries"] == 3 + assert kwargs["password"] == "secret" + return {"ok": True, "serial": "SER123"} + raise AssertionError(f"unexpected action {action}") + + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "_invoke_helper", fake_invoke_helper) + + devices = await macos_ble_helper.find_robots(timeout=2.0) + serial = await macos_ble_helper.provision_wifi( + "AA:BB:CC:DD:EE:FF", + "lab-wifi", + "secret", + "US", + connect_retries=3, + ) + + assert calls == ["find_robots", "provision_wifi"] + assert all(thread is not loop_thread for thread in helper_threads) + assert devices[0].serial == "SER123" + assert serial == "SER123" + + +@pytest.mark.asyncio +async def test_find_robots_uses_launchservices_helper( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + app_path = _make_helper_app(tmp_path) + calls: list[list[str]] = [] + seen_devices: list[macos_ble_helper.Go2Device] = [] + + def fake_run( + cmd: list[str], + *, + check: bool, + capture_output: bool, + text: bool, + ) -> subprocess.CompletedProcess[str]: + assert check is False + assert capture_output is True + assert text is True + calls.append(cmd) + + request_path = Path(cmd[cmd.index("--helper-request") + 1]) + response_path = Path(cmd[cmd.index("--helper-response") + 1]) + progress_path = Path(cmd[cmd.index("--helper-progress") + 1]) + request = json.loads(request_path.read_text(encoding="utf-8")) + + assert request == { + "schema_version": 1, + "action": "find_robots", + "params": {"timeout": 2.5, "prefixes": ["Go2_"]}, + } + assert _mode(request_path) == 0o600 + assert _mode(response_path) == 0o600 + assert _mode(progress_path) == 0o600 + + response_path.write_text( + json.dumps( + { + "ok": True, + "devices": [ + {"name": "Go2_49060", "address": "AA:BB:CC:DD:EE:FF", "serial": "SER123"} + ], + } + ), + encoding="utf-8", + ) + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper.subprocess, "run", fake_run) + + devices = await macos_ble_helper.find_robots( + timeout=2.5, + prefixes=("Go2_",), + helper_app_path=app_path, + on_device=seen_devices.append, + ) + + assert calls[0][:3] == [macos_ble_helper.OPEN_COMMAND, "-W", "-n"] + assert calls[0][3] == str(app_path) + assert "--args" in calls[0] + assert devices == seen_devices + assert devices[0].serial == "SER123" + + +@pytest.mark.asyncio +async def test_provision_wifi_keeps_password_out_of_argv_request_and_progress( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + app_path = _make_helper_app(tmp_path) + password = "secret-wifi-password" + progress: list[str] = [] + captured_cmds: list[list[str]] = [] + + def fake_run( + cmd: list[str], + *, + check: bool, + capture_output: bool, + text: bool, + ) -> subprocess.CompletedProcess[str]: + assert check is False + assert capture_output is True + assert text is True + captured_cmds.append(cmd) + assert password not in " ".join(cmd) + + request_path = Path(cmd[cmd.index("--helper-request") + 1]) + response_path = Path(cmd[cmd.index("--helper-response") + 1]) + progress_path = Path(cmd[cmd.index("--helper-progress") + 1]) + request_text = request_path.read_text(encoding="utf-8") + request = json.loads(request_text) + + assert password not in request_text + assert request["action"] == "provision_wifi" + assert request["params"] == { + "address": "AA:BB:CC:DD:EE:FF", + "ssid": "lab-wifi", + "country_code": "US", + "timeout": 7.0, + "connect_retries": 2, + } + + password_path = Path(request["password_file"]) + assert _mode(request_path) == 0o600 + assert _mode(response_path) == 0o600 + assert _mode(progress_path) == 0o600 + assert _mode(password_path) == 0o600 + assert password_path.read_text(encoding="utf-8") == password + + progress_path.write_text( + json.dumps({"message": f"unsafe progress contains {password}"}) + "\n", + encoding="utf-8", + ) + response_path.write_text(json.dumps({"ok": True, "serial": "SER123"}), encoding="utf-8") + return subprocess.CompletedProcess(cmd, 0, stdout=f"stdout {password}", stderr="") + + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper.subprocess, "run", fake_run) + + serial = await macos_ble_helper.provision_wifi( + "AA:BB:CC:DD:EE:FF", + "lab-wifi", + password, + "US", + timeout=7.0, + connect_retries=2, + helper_app_path=app_path, + on_progress=progress.append, + ) + + assert serial == "SER123" + assert captured_cmds + assert all(password not in item for cmd in captured_cmds for item in cmd) + assert progress == ["unsafe progress contains [REDACTED]"] + + +def test_helper_progress_writer_redacts_password(tmp_path: Path) -> None: + progress_path = tmp_path / "progress.jsonl" + password = "secret-wifi-password" + + macos_ble_helper._append_progress( + progress_path, + f"set password {password}", + secrets=(password,), + ) + + text = progress_path.read_text(encoding="utf-8") + assert password not in text + assert "[REDACTED]" in text + assert _mode(progress_path) == 0o600 diff --git a/dimos/robot/unitree/go2/cli/test_setup.py b/dimos/robot/unitree/go2/cli/test_setup.py new file mode 100644 index 0000000000..09fe37d3b8 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_setup.py @@ -0,0 +1,290 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass + +from dimos.robot.unitree.go2.cli.setup import setup_go2_wifi +from dimos.robot.unitree.go2.cli.verify import Go2VerifyStatus + + +@dataclass(frozen=True) +class _BleRobot: + name: str + address: str + serial: str | None = None + + +@dataclass(frozen=True) +class _LanRobot: + serial: str + ip: str + mac: str | None = None + iface: str | None = None + + +def _ok_verify(ip: str) -> Go2VerifyStatus: + return Go2VerifyStatus( + robot_ip=ip, + url=f"http://{ip}:9991/con_notify", + ok=True, + status_code=200, + reason="OK", + error=None, + elapsed_s=0.01, + timeout_s=1.0, + ) + + +def _failed_verify(ip: str) -> Go2VerifyStatus: + return Go2VerifyStatus( + robot_ip=ip, + url=f"http://{ip}:9991/con_notify", + ok=False, + status_code=503, + reason="Unavailable", + error=None, + elapsed_s=0.01, + timeout_s=1.0, + ) + + +async def test_setup_success_orchestrates_provision_discovery_verify() -> None: + calls: list[str] = [] + + async def discover_ble() -> list[_BleRobot]: + calls.append("discover_ble") + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + calls.append(f"provision:{address}:{ssid}:{country_code}") + assert password == "secret-password" + return "SN123" + + async def discover_lan() -> list[_LanRobot]: + calls.append("discover_lan") + return [_LanRobot(serial="SN123", ip="192.168.1.70", mac="00:11", iface="en0")] + + def verify(ip: str) -> Go2VerifyStatus: + calls.append(f"verify:{ip}") + return _ok_verify(ip) + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + serial="SN123", + verify_robot=verify, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert result.ok + assert result.robot_ip == "192.168.1.70" + assert result.next_steps == ( + "export ROBOT_IP=192.168.1.70", + "dimos --robot-ip 192.168.1.70 run unitree-go2", + ) + assert calls == [ + "discover_ble", + "provision:AA:BB:CC:LabWiFi:US", + "discover_lan", + "verify:192.168.1.70", + ] + + +async def test_setup_returns_verification_failure() -> None: + async def discover_ble() -> list[_BleRobot]: + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + return "SN123" + + async def discover_lan() -> list[_LanRobot]: + return [_LanRobot(serial="SN123", ip="192.168.1.70")] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + verify_robot=_failed_verify, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert result.robot_ip == "192.168.1.70" + assert result.verification is not None + assert not result.verification.ok + assert result.next_steps == () + assert result.error is not None + assert "HTTP 503" in result.error + + +async def test_setup_no_robot_found_does_not_provision() -> None: + provision_called = False + + async def discover_ble() -> list[_BleRobot]: + return [] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + nonlocal provision_called + provision_called = True + return None + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert not provision_called + assert result.error == "No BLE robot matched available robot." + + +async def test_setup_ambiguous_robot_selection_does_not_provision() -> None: + provision_called = False + + async def discover_ble() -> list[_BleRobot]: + return [ + _BleRobot(name="Go2_123", address="AA:BB:01", serial="SN1"), + _BleRobot(name="Go2_456", address="AA:BB:02", serial="SN2"), + ] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + nonlocal provision_called + provision_called = True + return None + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert not provision_called + assert result.error is not None + assert "Ambiguous BLE robot selection" in result.error + + +async def test_setup_rejects_conflicting_address_and_serial() -> None: + provision_called = False + + async def discover_ble() -> list[_BleRobot]: + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + nonlocal provision_called + provision_called = True + return None + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + address="AA:BB:CC", + serial="OTHER-SERIAL", + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert not provision_called + assert result.error is not None + assert "No BLE robot matched" in result.error + + +async def test_setup_redacts_password_from_failure_result_and_summary() -> None: + secret = "super-secret-password" + + async def discover_ble() -> list[_BleRobot]: + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + raise RuntimeError(f"provision failed with password {password}") + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password=secret, + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert secret not in repr(result) + assert secret not in "\n".join(result.summary_lines()) + assert result.error is not None + assert "[REDACTED]" in result.error diff --git a/dimos/robot/unitree/go2/cli/test_verify.py b/dimos/robot/unitree/go2/cli/test_verify.py new file mode 100644 index 0000000000..fe9f254e45 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_verify.py @@ -0,0 +1,111 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import base64 +from dataclasses import dataclass +import json + +import requests + +from dimos.robot.unitree.go2.cli.verify import con_notify_url, verify_robot_ip + + +@dataclass +class _Response: + status_code: int + reason: str = "OK" + text: str = "" + + +def _con_notify_text() -> str: + payload = json.dumps({"data1": "0123456789public-key-payload0123456789", "data2": 1}) + return base64.b64encode(payload.encode("utf-8")).decode("ascii") + + +def test_verify_robot_ip_probes_con_notify_without_movement() -> None: + calls: list[tuple[str, float]] = [] + + def fake_post(url: str, *, timeout: float, allow_redirects: bool) -> _Response: + calls.append((url, timeout)) + assert allow_redirects is False + return _Response(status_code=200, text=_con_notify_text()) + + result = verify_robot_ip("192.168.1.50", timeout_s=0.25, http_post=fake_post) + + assert result.ok + assert result.url == "http://192.168.1.50:9991/con_notify" + assert calls == [("http://192.168.1.50:9991/con_notify", 0.25)] + + +def test_verify_robot_ip_returns_structured_failure_on_timeout() -> None: + def fake_post(url: str, *, timeout: float, allow_redirects: bool) -> _Response: + assert allow_redirects is False + raise requests.Timeout(f"timeout for {url} after {timeout}") + + result = verify_robot_ip("192.168.1.51", timeout_s=0.1, http_post=fake_post) + + assert not result.ok + assert result.status_code is None + assert result.error is not None + assert "timeout" in result.error + assert result.to_dict()["url"] == "http://192.168.1.51:9991/con_notify" + + +def test_verify_robot_ip_returns_http_status_failure() -> None: + result = verify_robot_ip( + "192.168.1.52", + http_post=lambda _url, *, timeout, allow_redirects: _Response( + status_code=500, + reason="Server Error", + ), + ) + + assert not result.ok + assert result.status_code == 500 + assert "HTTP 500 Server Error" in result.summary_lines()[0] + + +def test_verify_robot_ip_rejects_redirects() -> None: + result = verify_robot_ip( + "192.168.1.54", + http_post=lambda _url, *, timeout, allow_redirects: _Response( + status_code=302, + reason="Found", + ), + ) + + assert not result.ok + assert result.status_code == 302 + assert "HTTP 302 Found" in result.summary_lines()[0] + + +def test_verify_robot_ip_rejects_unexpected_success_payload() -> None: + result = verify_robot_ip( + "192.168.1.55", + http_post=lambda _url, *, timeout, allow_redirects: _Response( + status_code=200, + reason="OK", + text="not the con_notify payload", + ), + ) + + assert not result.ok + assert result.status_code == 200 + assert result.error == "unexpected con_notify response payload" + + +def test_con_notify_url_uses_go2_webrtc_port_and_path() -> None: + assert con_notify_url(" 192.168.1.53 ") == "http://192.168.1.53:9991/con_notify" diff --git a/dimos/robot/unitree/go2/cli/verify.py b/dimos/robot/unitree/go2/cli/verify.py new file mode 100644 index 0000000000..413d31ec9c --- /dev/null +++ b/dimos/robot/unitree/go2/cli/verify.py @@ -0,0 +1,153 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""No-movement Go2 LAN verification helpers.""" + +from __future__ import annotations + +import base64 +import binascii +from dataclasses import dataclass +import json +import time +from typing import Any, Protocol + +import requests + +CON_NOTIFY_PORT = 9991 +CON_NOTIFY_PATH = "/con_notify" +DEFAULT_VERIFY_TIMEOUT_S = 1.0 + + +class HttpResponse(Protocol): + status_code: int + reason: str | None + text: str + + +class HttpPost(Protocol): + def __call__( + self, + url: str, + *, + timeout: float, + allow_redirects: bool, + ) -> HttpResponse: ... + + +@dataclass(frozen=True) +class Go2VerifyStatus: + """Structured result for probing a Go2 WebRTC signaling endpoint.""" + + robot_ip: str + url: str + ok: bool + status_code: int | None + reason: str | None + error: str | None + elapsed_s: float + timeout_s: float + + def to_dict(self) -> dict[str, Any]: + return { + "robot_ip": self.robot_ip, + "url": self.url, + "ok": self.ok, + "status_code": self.status_code, + "reason": self.reason, + "error": self.error, + "elapsed_s": self.elapsed_s, + "timeout_s": self.timeout_s, + } + + def summary_lines(self) -> tuple[str, ...]: + if self.ok: + return (f"Go2 WebRTC endpoint reachable at {self.url} (HTTP {self.status_code}).",) + if self.status_code is not None: + if self.error: + return ( + f"Go2 WebRTC endpoint responded at {self.url} " + f"with HTTP {self.status_code} {self.reason or ''}: {self.error}".rstrip() + + ".", + ) + return ( + f"Go2 WebRTC endpoint responded at {self.url} " + f"with HTTP {self.status_code} {self.reason or ''}".rstrip() + + ".", + ) + return (f"Go2 WebRTC endpoint unreachable at {self.url}: {self.error}",) + + +def con_notify_url(robot_ip: str) -> str: + """Build the Go2 WebRTC signaling health probe URL.""" + return f"http://{robot_ip.strip()}:{CON_NOTIFY_PORT}{CON_NOTIFY_PATH}" + + +def _con_notify_payload_error(text: str) -> str | None: + try: + decoded = base64.b64decode(text, validate=True).decode("utf-8") + payload = json.loads(decoded) + except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError): + return "unexpected con_notify response payload" + + if not isinstance(payload, dict): + return "unexpected con_notify response payload" + data1 = payload.get("data1") + if not isinstance(data1, str) or not data1: + return "unexpected con_notify response payload" + return None + + +def verify_robot_ip( + robot_ip: str, + *, + timeout_s: float = DEFAULT_VERIFY_TIMEOUT_S, + http_post: HttpPost | None = None, +) -> Go2VerifyStatus: + """Verify that a Go2 is reachable on LAN without sending movement commands. + + This only performs an HTTP POST probe against the robot's WebRTC signaling + endpoint. It does not open a robot control session or publish commands. + """ + url = con_notify_url(robot_ip) + post = http_post or requests.post + started = time.monotonic() + try: + response = post(url, timeout=timeout_s, allow_redirects=False) + except requests.RequestException as e: + return Go2VerifyStatus( + robot_ip=robot_ip, + url=url, + ok=False, + status_code=None, + reason=None, + error=str(e), + elapsed_s=time.monotonic() - started, + timeout_s=timeout_s, + ) + + status_code = response.status_code + error = None + if 200 <= status_code < 300: + error = _con_notify_payload_error(response.text) + return Go2VerifyStatus( + robot_ip=robot_ip, + url=url, + ok=200 <= status_code < 300 and error is None, + status_code=status_code, + reason=response.reason or None, + error=error, + elapsed_s=time.monotonic() - started, + timeout_s=timeout_s, + ) diff --git a/docs/coding-agents/go2-wifi-provisioning.md b/docs/coding-agents/go2-wifi-provisioning.md new file mode 100644 index 0000000000..61a93750ae --- /dev/null +++ b/docs/coding-agents/go2-wifi-provisioning.md @@ -0,0 +1,90 @@ +# Go2 wifi provisioning + +Use this runbook when a Go2 needs to move from its AP/BLE setup state onto a +site wifi network, or when `dimos go2tool connect-wifi` behaves differently on +macOS than it does on Linux. + +## macOS Bluetooth Authorization + +`go2tool` uses Bleak over Bluetooth. On macOS, direct Python execution from some +terminal environments can fail before Bluetooth authorization is usable. The +default `auto` BLE backend uses a LaunchServices-opened helper `.app` for finite +scans and wifi provisioning so macOS TCC can authorize Bluetooth access against +the app bundle and its `Info.plist` usage keys. + +If direct BLE fails only on macOS, treat Bluetooth/TCC authorization as a likely +cause before assuming a robot firmware issue. + +## Normal Flow + +Start with discovery. This shows robots seen over BLE and LAN: + +```sh skip +dimos go2tool discover +``` + +If the robot is still in AP/BLE setup mode, run setup to provision wifi, +rediscover the robot on LAN, and print the IP to use with DimOS. For +interactive use, omit `--password`; the CLI will prompt with hidden input. +For automation, `--password ` is supported. + +Example: + +```sh skip +dimos go2tool setup --ssid +``` + +If more than one robot may be visible, use one selector from the discovery +output so setup does not choose the wrong robot: + +```sh skip +dimos go2tool setup --ssid --serial +dimos go2tool setup --ssid --name +dimos go2tool setup --ssid --mac +``` + +Use `connect-wifi` instead of `setup` only when you want the lower-level BLE +provisioning step without LAN rediscovery and verification. + +## macOS Helper Requirements + +On macOS, `--ble-backend auto` builds and validates a cached helper `.app` when +needed. To use a custom helper bundle, pass `--ble-helper ` or set +`DIMOS_GO2_BLE_HELPER`. The helper bundle must include these Bluetooth usage +keys in `Info.plist`: + +- `NSBluetoothAlwaysUsageDescription` +- `NSBluetoothPeripheralUsageDescription` +- `NSBluetoothUsageDescription` + +The helper passes the wifi password through a mode-`0600` temporary file, not +argv. Prefer the CLI's hidden password prompt for interactive use; use +`--password` when a script or test needs a fully non-interactive command. + +## Post-Provision Checks + +Give the robot time to join the network, then rediscover: + +```sh skip +dimos go2tool discover +``` + +Look for a LAN row with the robot IP. Probe the Go2 WebRTC notification endpoint: + +```sh skip +curl -fsS -X POST http://:9991/con_notify +``` + +Then run DimOS with the discovered IP: + +```sh skip +dimos --robot-ip run unitree-go2 +``` + +For agentic stacks, use the same `--robot-ip` flag with the target blueprint. + +## Coding-Agent Safety + +Verification is discovery, endpoint probing, and process startup only. Do not +send movement commands, teleop input, MCP skill calls, or `agent-send` messages +during wifi provisioning verification. diff --git a/docs/coding-agents/index.md b/docs/coding-agents/index.md index ff778ac5cf..2ddb6ba453 100644 --- a/docs/coding-agents/index.md +++ b/docs/coding-agents/index.md @@ -3,6 +3,7 @@ ├── worktrees.md (creating provisioned worktrees with `bin/worktree`) ├── style.md (code style guidelines for dimos) ├── testing.md (docs about writing tests) +├── go2-wifi-provisioning.md (Go2 AP-to-wifi setup notes for coding agents) ├── docs (these are docs about writing docs) │   ├── codeblocks.md │   ├── doclinks.md diff --git a/docs/platforms/quadruped/go2/index.md b/docs/platforms/quadruped/go2/index.md index 4e392f06ca..e4a00975ab 100644 --- a/docs/platforms/quadruped/go2/index.md +++ b/docs/platforms/quadruped/go2/index.md @@ -39,6 +39,11 @@ Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3 Use `dimos go2tool` to provision wifi and find the robot's IP. Skip if the robot is already on your network and you know its IP. +macOS caveat: BLE provisioning can fail when direct Python execution cannot get +usable Bluetooth authorization from the terminal. The default `auto` BLE backend +uses a LaunchServices-opened helper for finite scans and provisioning on macOS. +See [Go2 wifi provisioning](/docs/coding-agents/go2-wifi-provisioning.md). + 1. Power on the Go2 — it advertises over BLE immediately. 2. Provision wifi (one-time per network): @@ -52,10 +57,20 @@ dimos go2tool discover configure wifi ```bash -dimos go2tool connect-wifi --ssid --password +dimos go2tool setup --ssid ``` -Scans BLE and connects to the only robot it finds, or prompts you to pick if there are several. +For interactive use, omit `--password`; the password prompt uses hidden input. +For automation, `--password ` is supported. `setup` configures wifi, +finds the robot on LAN, and prints the IP to use with DimOS. + +If several Go2s may be nearby, pass one selector from `discover` output: + +```bash +dimos go2tool setup --ssid --serial +dimos go2tool setup --ssid --name +dimos go2tool setup --ssid --mac +``` 3. Find the robot's IP: @@ -63,7 +78,9 @@ Scans BLE and connects to the only robot it finds, or prompts you to pick if the dimos go2tool discover ``` -Prints `SOURCE NAME IP MAC SERIAL` for every robot it sees over BLE and LAN. Export the IP: +Prints `SOURCE NAME IP MAC SERIAL` for every robot it sees over BLE and LAN. +If you used `setup`, this IP is also printed as `export ROBOT_IP=`. Export +the IP: ```bash export ROBOT_IP= diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 7abd1fa7bb..aee9fdd38a 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -193,6 +193,52 @@ Print resolved GlobalConfig values and their sources. dimos show-config ``` +### `dimos go2tool` + +Provision and verify Unitree Go2 network setup. + +```bash +dimos go2tool discover +dimos go2tool connect-wifi --ssid +dimos go2tool setup --ssid +dimos go2tool setup --ssid --serial +dimos go2tool verify --robot-ip +``` + +| Command | Description | +|---------|-------------| +| `discover` | Show Go2 robots discovered over BLE and LAN. | +| `connect-wifi` | Send wifi credentials to a Go2 over BLE. Omit `--password` to use the hidden prompt. | +| `setup` | Provision wifi, rediscover the robot on LAN, and verify `:9991/con_notify`. | +| `verify` | POST to `http://:9991/con_notify` without sending movement commands. | + +`setup` options: + +| Option | Description | +|--------|-------------| +| `--ssid ` | Target wifi SSID. Required. | +| `--password ` | Target wifi password. If omitted, prompts with hidden input. | +| `--country ` | Two-letter country code. Default: `US`. | +| `--serial ` | Select a specific robot by serial. | +| `--name ` | Select a specific robot by BLE name, for example `Go2_49073`. | +| `--mac ` | Select/provision a specific BLE address and skip BLE scan. | +| `--timeout ` | BLE scan/connect timeout. Default: `5.0`. | +| `--retries ` | Provisioning attempts. Default: `3`. | +| `--lan-timeout ` | Timeout for each LAN discovery attempt. Default: `2.0`. | +| `--rediscovery-attempts ` | LAN rediscovery attempts after provisioning. Default: `5`. | +| `--rediscovery-delay ` | Delay between LAN rediscovery attempts. Default: `2.0`. | +| `--verify-timeout ` | HTTP verification timeout. Default: `1.0`. | +| `--ble-backend {auto,helper,direct}` | BLE backend. Default: `auto`. | +| `--ble-helper ` | macOS BLE helper `.app` path. Can also use `DIMOS_GO2_BLE_HELPER`. | + +When several Go2s may be visible, pass exactly one selector to `connect-wifi` +or `setup`: `--serial `, `--name `, or `--mac `. +Use values from `dimos go2tool discover`. + +BLE commands support `--ble-backend {auto,helper,direct}` and `--ble-helper`. +On macOS, `auto` uses the LaunchServices helper for finite BLE scans and +provisioning; set `DIMOS_GO2_BLE_HELPER` to point at a custom helper `.app`. + --- ## Agent & MCP Commands