From df0ff9cb94777ea84290f8dd4326bc4eec4e6e09 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 8 Jun 2026 17:32:41 +0800 Subject: [PATCH 1/4] add go2 wifi setup workflow --- dimos/robot/unitree/go2/cli/go2tool.py | 427 +++++++++++++- .../robot/unitree/go2/cli/macos_ble_helper.py | 552 ++++++++++++++++++ dimos/robot/unitree/go2/cli/setup.py | 405 +++++++++++++ dimos/robot/unitree/go2/cli/test_go2tool.py | 514 ++++++++++++++++ .../unitree/go2/cli/test_macos_ble_helper.py | 300 ++++++++++ dimos/robot/unitree/go2/cli/test_setup.py | 290 +++++++++ dimos/robot/unitree/go2/cli/test_verify.py | 69 +++ dimos/robot/unitree/go2/cli/verify.py | 114 ++++ docs/coding-agents/go2-wifi-provisioning.md | 84 +++ docs/coding-agents/index.md | 13 +- docs/platforms/quadruped/go2/index.md | 37 +- docs/usage/cli.md | 22 + 12 files changed, 2783 insertions(+), 44 deletions(-) create mode 100644 dimos/robot/unitree/go2/cli/macos_ble_helper.py create mode 100644 dimos/robot/unitree/go2/cli/setup.py create mode 100644 dimos/robot/unitree/go2/cli/test_go2tool.py create mode 100644 dimos/robot/unitree/go2/cli/test_macos_ble_helper.py create mode 100644 dimos/robot/unitree/go2/cli/test_setup.py create mode 100644 dimos/robot/unitree/go2/cli/test_verify.py create mode 100644 dimos/robot/unitree/go2/cli/verify.py create mode 100644 docs/coding-agents/go2-wifi-provisioning.md diff --git a/dimos/robot/unitree/go2/cli/go2tool.py b/dimos/robot/unitree/go2/cli/go2tool.py index 55d24e737c..85aa331102 100644 --- a/dimos/robot/unitree/go2/cli/go2tool.py +++ b/dimos/robot/unitree/go2/cli/go2tool.py @@ -17,9 +17,17 @@ from __future__ import annotations import asyncio +from collections.abc import AsyncIterator, Awaitable, Callable, Sequence +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +import platform +from typing import Literal, NoReturn, Protocol, cast import typer +from dimos.robot.unitree.go2.cli.ble import Go2Device, retry + app = typer.Typer( help="Go2 setup utilities (BLE wifi provisioning, network discovery)", no_args_is_help=True, @@ -27,12 +35,187 @@ _HEADER = f"{'SOURCE':<6} {'NAME':<14} {'IP':<15} {'MAC':<19} SERIAL" +_HELPER_TIMEOUT_ERROR = ( + "BLE helper backend cannot run with --timeout 0. Use --timeout > 0 or --ble-backend direct." +) +_DEFAULT_BLE_CONNECT_RETRIES = 3 + + +class BleBackend(str, Enum): + auto = "auto" + helper = "helper" + direct = "direct" + + +class _BleFinder(Protocol): + def __call__( + self, + *, + timeout: float, + on_device: Callable[[Go2Device], None] | None = None, + ) -> Awaitable[list[Go2Device]]: ... + + +class _BleProvisioner(Protocol): + def __call__( + self, + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> Awaitable[str | None]: ... + + +@dataclass(frozen=True) +class _BleBackendRuntime: + kind: Literal["helper", "direct"] + find_robots: _BleFinder + provision_wifi: _BleProvisioner + discover_ble: Callable[[], AsyncIterator[Go2Device]] | None = None + + +class _Go2ToolError(RuntimeError): + """User-facing CLI configuration error.""" def _format_row(source: str, name: str, ip: str, mac: str, serial: str) -> str: return f"{source:<6} {name:<14} {ip:<15} {mac:<19} {serial}" +def _selected_backend_kind(backend: BleBackend) -> Literal["helper", "direct"]: + if backend == BleBackend.auto: + return "helper" if platform.system() == "Darwin" else "direct" + if backend == BleBackend.helper: + if platform.system() != "Darwin": + raise _Go2ToolError( + "BLE helper backend is only supported on macOS. " + "Use --ble-backend direct on this platform." + ) + return "helper" + return "direct" + + +def _select_ble_backend( + backend: BleBackend, + helper_app_path: Path | None, +) -> _BleBackendRuntime: + kind = _selected_backend_kind(backend) + + if kind == "helper": + from dimos.robot.unitree.go2.cli import macos_ble_helper + + async def helper_find_robots( + *, + timeout: float, + on_device: Callable[[Go2Device], None] | None = None, + ) -> list[Go2Device]: + return await macos_ble_helper.find_robots( + timeout=timeout, + helper_app_path=helper_app_path, + on_device=on_device, + ) + + async def helper_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + return await macos_ble_helper.provision_wifi( + address, + ssid, + password, + country_code, + timeout=timeout, + connect_retries=connect_retries, + helper_app_path=helper_app_path, + on_progress=on_progress, + ) + + return _BleBackendRuntime( + kind="helper", + find_robots=helper_find_robots, + provision_wifi=helper_provision_wifi, + ) + + from dimos.robot.unitree.go2.cli import ble as direct_ble + + async def direct_find_robots( + *, + timeout: float, + on_device: Callable[[Go2Device], None] | None = None, + ) -> list[Go2Device]: + return await direct_ble.find_robots(timeout=timeout, on_device=on_device) + + async def direct_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + return await direct_ble.provision_wifi( + address, + ssid, + password, + country_code, + timeout=timeout, + connect_retries=connect_retries, + on_progress=on_progress, + ) + + async def discover_ble() -> AsyncIterator[Go2Device]: + async for device in direct_ble.discover_ble(): + yield device + + return _BleBackendRuntime( + kind="direct", + find_robots=direct_find_robots, + provision_wifi=direct_provision_wifi, + discover_ble=discover_ble, + ) + + +def _exit_error(message: str) -> NoReturn: + typer.echo(message, err=True) + raise typer.Exit(1) + + +def _redact(text: str, secrets: Sequence[str]) -> str: + safe = text + for secret in secrets: + if secret: + safe = safe.replace(secret, "[REDACTED]") + return safe + + +def _echo_safe(message: str, secrets: Sequence[str], *, err: bool = False) -> None: + typer.echo(_redact(message, secrets), err=err) + + +def _require_helper_timeout(backend: _BleBackendRuntime, timeout: float) -> None: + if backend.kind == "helper" and timeout <= 0: + _exit_error(_HELPER_TIMEOUT_ERROR) + + +def _discover_backend_for_timeout(backend: BleBackend, timeout: float) -> BleBackend: + # The LaunchServices helper is one-shot; unbounded discovery needs direct BLE streaming. + if backend == BleBackend.auto and timeout <= 0 and platform.system() == "Darwin": + return BleBackend.direct + return backend + + @app.command("discover") def discover( ble: bool = typer.Option(False, "--ble", help="BLE only (default: BLE + LAN)"), @@ -41,29 +224,59 @@ def discover( timeout: float = typer.Option( 7.0, "--timeout", "-t", help="Stop after this many seconds (0 = run forever)" ), + ble_backend: BleBackend = typer.Option( + BleBackend.auto, + "--ble-backend", + help="BLE backend to use. On macOS, auto uses direct BLE when --timeout 0.", + ), + ble_helper: Path | None = typer.Option( + None, + "--ble-helper", + envvar="DIMOS_GO2_BLE_HELPER", + help="macOS BLE helper .app path.", + ), ) -> None: """Stream Go2 robot discoveries from BLE and/or LAN.""" do_ble = ble or not lan do_lan = lan or not ble + backend_runtime: _BleBackendRuntime | None = None + if do_ble: + try: + backend_runtime = _select_ble_backend( + _discover_backend_for_timeout(ble_backend, timeout), ble_helper + ) + except _Go2ToolError as e: + _exit_error(str(e)) + _require_helper_timeout(backend_runtime, timeout) typer.echo(_HEADER) import signal async def run() -> None: - from dimos.robot.unitree.go2.cli.ble import discover_ble from dimos.robot.unitree.go2.cli.landiscovery import discover_lan seen_ble: set[tuple[str, str | None]] = set() seen_lan: set[str] = set() + def _emit_ble_device(d: Go2Device) -> None: + key = (d.address, d.serial) + if key in seen_ble: + return + seen_ble.add(key) + typer.echo(_format_row("BLE", d.name, "-", d.address, d.serial or "?")) + async def _consume_ble() -> None: - async for d in discover_ble(): - key = (d.address, d.serial) - if key in seen_ble: - continue - seen_ble.add(key) - typer.echo(_format_row("BLE", d.name, "-", d.address, d.serial or "?")) + assert backend_runtime is not None + if backend_runtime.kind == "helper": + devices = await backend_runtime.find_robots(timeout=timeout) + for d in devices: + _emit_ble_device(d) + return + + assert backend_runtime.discover_ble is not None + async for d in backend_runtime.discover_ble(): + _emit_ble_device(d) async def _consume_lan() -> None: async for d in discover_lan(tick=lan_tick): @@ -90,7 +303,8 @@ def _stop() -> None: loop.add_signal_handler(sig, _stop) if timeout > 0: - loop.call_later(timeout, _stop) + grace = 0.25 if backend_runtime is not None and backend_runtime.kind == "helper" else 0 + loop.call_later(timeout + grace, _stop) await asyncio.gather(*tasks, return_exceptions=True) @@ -111,21 +325,36 @@ def connect_wifi( None, "--name", help="Robot BLE name (e.g. Go2_49060) — scan and auto-select match" ), timeout: float = typer.Option(5.0, "--timeout", help="Scan / connect timeout in seconds"), - retries: int = typer.Option(3, "--retries", help="Number of provisioning attempts"), + retries: int = typer.Option( + 3, + "--retries", + help="Number of provisioning attempts; BLE connection retries stay at 3", + ), + ble_backend: BleBackend = typer.Option( + BleBackend.auto, + "--ble-backend", + help="BLE backend to use.", + ), + ble_helper: Path | None = typer.Option( + None, + "--ble-helper", + envvar="DIMOS_GO2_BLE_HELPER", + help="macOS BLE helper .app path.", + ), ) -> None: """Provision a Go2 with Wi-Fi credentials over Bluetooth. Fully non-interactive when (--mac | --serial | --name) and --ssid/--password are all provided. """ - from dimos.robot.unitree.go2.cli.ble import ( - Go2Device, - find_robots, - provision_wifi, - retry, - ) + try: + backend_runtime = _select_ble_backend(ble_backend, ble_helper) + except _Go2ToolError as e: + _exit_error(str(e)) + _require_helper_timeout(backend_runtime, timeout) async def run() -> None: + target: str if mac is not None: target = mac else: @@ -134,7 +363,7 @@ async def run() -> None: def _on_device(d: Go2Device) -> None: typer.echo(_format_row("BLE", d.name, "-", d.address, d.serial or "?")) - devices = await find_robots(timeout=timeout, on_device=_on_device) + devices = await backend_runtime.find_robots(timeout=timeout, on_device=_on_device) if not devices: typer.echo("No Unitree robots detected.", err=True) raise typer.Exit(1) @@ -177,15 +406,17 @@ def _on_device(d: Go2Device) -> None: ) def _on_error(attempt: int, exc: BaseException) -> None: - typer.echo(f" attempt {attempt} failed: {exc}", err=True) + _echo_safe(f" attempt {attempt} failed: {exc}", (wifi_password,), err=True) device_serial = await retry( - lambda: provision_wifi( - target, # type: ignore[arg-type] + lambda: backend_runtime.provision_wifi( + target, wifi_ssid, wifi_password, country, - on_progress=lambda m: typer.echo(f" {m}"), + timeout=timeout, + connect_retries=_DEFAULT_BLE_CONNECT_RETRIES, + on_progress=lambda m: _echo_safe(f" {m}", (wifi_password,)), ), attempts=retries, on_error=_on_error, @@ -199,6 +430,162 @@ def _on_error(attempt: int, exc: BaseException) -> None: asyncio.run(run()) +@app.command("verify") +def verify( + robot_ip: str = typer.Option(..., "--robot-ip", help="Robot IP address to verify"), + timeout: float = typer.Option(1.0, "--timeout", help="HTTP probe timeout in seconds"), +) -> None: + """Verify a Go2 LAN IP without sending movement commands.""" + from dimos.robot.unitree.go2.cli.verify import verify_robot_ip + + status = verify_robot_ip(robot_ip, timeout_s=timeout) + for line in status.summary_lines(): + typer.echo(line) + raise typer.Exit(0 if status.ok else 1) + + +@app.command("setup") +def setup( + ssid: str = typer.Option(..., "--ssid", help="Wi-Fi SSID"), + password: str | None = typer.Option(None, "--password", help="Wi-Fi password"), + country: str = typer.Option("US", "--country", help="Two-letter country code"), + serial: str | None = typer.Option(None, "--serial", help="Robot serial to select"), + name: str | None = typer.Option(None, "--name", help="Robot BLE name to select"), + mac: str | None = typer.Option(None, "--mac", help="BLE MAC/address to provision"), + timeout: float = typer.Option(5.0, "--timeout", help="BLE scan / connect timeout in seconds"), + retries: int = typer.Option( + 3, + "--retries", + help="Number of provisioning attempts; BLE connection retries stay at 3", + ), + lan_timeout: float = typer.Option(2.0, "--lan-timeout", help="LAN discovery timeout"), + rediscovery_attempts: int = typer.Option( + 5, + "--rediscovery-attempts", + help="LAN rediscovery attempts after provisioning", + ), + rediscovery_delay: float = typer.Option( + 2.0, + "--rediscovery-delay", + help="Seconds between LAN rediscovery attempts", + ), + verify_timeout: float = typer.Option( + 1.0, + "--verify-timeout", + help="HTTP verification timeout in seconds", + ), + ble_backend: BleBackend = typer.Option( + BleBackend.auto, + "--ble-backend", + help="BLE backend to use.", + ), + ble_helper: Path | None = typer.Option( + None, + "--ble-helper", + envvar="DIMOS_GO2_BLE_HELPER", + help="macOS BLE helper .app path.", + ), +) -> None: + """Provision Wi-Fi, rediscover the robot on LAN, and verify its IP.""" + from dimos.robot.unitree.go2.cli.landiscovery import discover as discover_lan_once + from dimos.robot.unitree.go2.cli.setup import LanRobot, setup_go2_wifi + from dimos.robot.unitree.go2.cli.verify import verify_robot_ip + + try: + backend_runtime = _select_ble_backend(ble_backend, ble_helper) + except _Go2ToolError as e: + _exit_error(str(e)) + _require_helper_timeout(backend_runtime, timeout) + + wifi_password = ( + password + if password is not None + else typer.prompt("Wi-Fi password", hide_input=True, default="", show_default=False) + ) + secrets = (wifi_password,) + + async def run() -> None: + if mac is not None: + + async def discover_ble_for_setup() -> list[Go2Device]: + return [Go2Device(name=name or mac, address=mac, serial=serial)] + + else: + + async def discover_ble_for_setup() -> list[Go2Device]: + typer.echo(f"Scanning BLE for {timeout:.0f}s ...") + return await backend_runtime.find_robots( + timeout=timeout, + on_device=lambda d: typer.echo( + _format_row("BLE", d.name, "-", d.address, d.serial or "?") + ), + ) + + async def provision_wifi_for_setup( + address: str, + provision_ssid: str, + provision_password: str, + country_code: str, + ) -> str | None: + return cast( + "str | None", + await retry( + lambda: backend_runtime.provision_wifi( + address, + provision_ssid, + provision_password, + country_code, + timeout=timeout, + connect_retries=_DEFAULT_BLE_CONNECT_RETRIES, + on_progress=lambda m: _echo_safe(f" {m}", secrets), + ), + attempts=retries, + on_error=lambda attempt, exc: _echo_safe( + f" attempt {attempt} failed: {exc}", secrets, err=True + ), + ), + ) + + async def discover_lan_for_setup() -> Sequence[LanRobot]: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + lambda: discover_lan_once(timeout=lan_timeout), + ) + + result = await setup_go2_wifi( + ssid=ssid, + password=wifi_password, + discover_ble=discover_ble_for_setup, + provision_wifi=provision_wifi_for_setup, + discover_lan=discover_lan_for_setup, + serial=serial, + name=name, + address=mac, + country_code=country, + verify_robot=lambda ip: verify_robot_ip(ip, timeout_s=verify_timeout), + rediscovery_attempts=rediscovery_attempts, + rediscovery_delay_s=rediscovery_delay, + ) + + robot_ip = result.robot_ip or "-" + selected_serial = "-" + if result.lan_robot is not None: + selected_serial = result.lan_robot.serial + elif result.selected_ble is not None: + selected_serial = result.selected_ble.serial or "-" + typer.echo( + f"result={'ok' if result.ok else 'fail'} robot_ip={robot_ip} serial={selected_serial}" + ) + for line in result.summary_lines(): + _echo_safe(line, secrets) + + if not result.ok: + raise typer.Exit(1) + + asyncio.run(run()) + + def main() -> None: app() diff --git a/dimos/robot/unitree/go2/cli/macos_ble_helper.py b/dimos/robot/unitree/go2/cli/macos_ble_helper.py new file mode 100644 index 0000000000..39b09bf7b4 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/macos_ble_helper.py @@ -0,0 +1,552 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""macOS LaunchServices helper for Unitree BLE provisioning. + +The BLE protocol lives in :mod:`dimos.robot.unitree.go2.cli.ble`. This module +only gives that protocol a macOS app-bundle wrapper so Bluetooth/TCC attributes +are associated with a LaunchServices-launched app rather than the user's shell. +""" + +from __future__ import annotations + +import argparse +import asyncio +from collections.abc import Callable, Sequence +from concurrent.futures import ThreadPoolExecutor +from dataclasses import asdict, dataclass +import json +import os +from pathlib import Path +import platform +import plistlib +import shlex +import stat +import subprocess +import sys +import tempfile +from typing import Any + +from dimos.constants import DIMOS_PROJECT_ROOT, STATE_DIR +from dimos.robot.unitree.go2.cli import ble +from dimos.robot.unitree.go2.cli.ble import Go2Device + +REQUIRED_BLUETOOTH_USAGE_KEYS = ( + "NSBluetoothAlwaysUsageDescription", + "NSBluetoothPeripheralUsageDescription", + "NSBluetoothUsageDescription", +) + +HELPER_APP_ENV = "DIMOS_GO2_BLE_HELPER" +LEGACY_HELPER_APP_ENV = "DIMOS_GO2_BLE_HELPER_APP" +HELPER_APP_NAME = "DimOS BLE Helper.app" +HELPER_BUNDLE_IDENTIFIER = "com.dimensional.dimos.blehelper" +HELPER_EXECUTABLE_NAME = "dimos-ble-helper" +OPEN_COMMAND = "/usr/bin/open" + +JsonObject = dict[str, Any] + + +class MacOSBLEHelperError(RuntimeError): + """Base error for the macOS BLE helper wrapper.""" + + +class MacOSBLEUnsupportedError(MacOSBLEHelperError): + """Raised when the LaunchServices BLE helper is used off macOS.""" + + +class HelperAppValidationError(MacOSBLEHelperError): + """Raised when a helper app bundle is missing required metadata.""" + + +class HelperLaunchError(MacOSBLEHelperError): + """Raised when LaunchServices fails to run the helper app.""" + + +@dataclass(frozen=True) +class HelperAppInfo: + app_path: Path + info_plist_path: Path + executable_path: Path + bundle_identifier: str | None + + +def ensure_macos() -> None: + """Raise a clear error when the LaunchServices backend is unavailable.""" + if platform.system() != "Darwin": + raise MacOSBLEUnsupportedError( + "The macOS LaunchServices BLE helper is only supported on macOS. " + "Use dimos.robot.unitree.go2.cli.ble directly on Linux." + ) + + +def helper_cache_dir() -> Path: + """Return the DimOS state directory used for the generated helper app.""" + return STATE_DIR / "helpers" / "macos-ble" + + +def validate_helper_app(app_path: str | Path) -> HelperAppInfo: + """Validate a BLE helper ``.app`` bundle and its Bluetooth usage metadata.""" + app = Path(app_path).expanduser() + if not app.exists(): + raise HelperAppValidationError(f"helper app does not exist: {app}") + if not app.is_dir() or app.suffix != ".app": + raise HelperAppValidationError(f"helper path is not a .app bundle: {app}") + + info_plist = app / "Contents" / "Info.plist" + if not info_plist.exists(): + raise HelperAppValidationError(f"helper app is missing Info.plist: {info_plist}") + + try: + with info_plist.open("rb") as f: + plist = plistlib.load(f) + except Exception as exc: + raise HelperAppValidationError(f"helper app Info.plist is invalid: {exc}") from exc + + missing = [ + key + for key in REQUIRED_BLUETOOTH_USAGE_KEYS + if not isinstance(plist.get(key), str) or not plist[key].strip() + ] + if missing: + raise HelperAppValidationError( + "helper app Info.plist is missing Bluetooth usage keys: " + ", ".join(missing) + ) + + executable_name = plist.get("CFBundleExecutable") + if not isinstance(executable_name, str) or not executable_name: + raise HelperAppValidationError("helper app Info.plist is missing CFBundleExecutable") + + executable = app / "Contents" / "MacOS" / executable_name + if not executable.exists(): + raise HelperAppValidationError(f"helper executable does not exist: {executable}") + if not os.access(executable, os.X_OK): + raise HelperAppValidationError(f"helper executable is not executable: {executable}") + + bundle_identifier = plist.get("CFBundleIdentifier") + return HelperAppInfo( + app_path=app, + info_plist_path=info_plist, + executable_path=executable, + bundle_identifier=bundle_identifier if isinstance(bundle_identifier, str) else None, + ) + + +def build_cached_helper_app( + *, + cache_dir: str | Path | None = None, + python_executable: str | Path | None = None, +) -> Path: + """Create or refresh the cached LaunchServices BLE helper app.""" + ensure_macos() + + root = Path(cache_dir) if cache_dir is not None else helper_cache_dir() + app_path = root / HELPER_APP_NAME + contents_dir = app_path / "Contents" + macos_dir = contents_dir / "MacOS" + macos_dir.mkdir(parents=True, exist_ok=True) + + python = Path(python_executable) if python_executable is not None else Path(sys.executable) + module = "dimos.robot.unitree.go2.cli.macos_ble_helper" + script = "\n".join( + [ + "#!/bin/sh", + 'if [ -n "${PYTHONPATH:-}" ]; then', + f' export PYTHONPATH={shlex.quote(str(DIMOS_PROJECT_ROOT))}:"$PYTHONPATH"', + "else", + f" export PYTHONPATH={shlex.quote(str(DIMOS_PROJECT_ROOT))}", + "fi", + f'exec {shlex.quote(str(python))} -m {module} --helper-run "$@"', + "", + ] + ) + + executable = macos_dir / HELPER_EXECUTABLE_NAME + executable.write_text(script, encoding="utf-8") + executable.chmod(0o755) + + bluetooth_description = "DimOS uses Bluetooth to discover and provision Unitree robots." + plist: JsonObject = { + "CFBundleDevelopmentRegion": "en", + "CFBundleExecutable": HELPER_EXECUTABLE_NAME, + "CFBundleIdentifier": HELPER_BUNDLE_IDENTIFIER, + "CFBundleInfoDictionaryVersion": "6.0", + "CFBundleName": "DimOS BLE Helper", + "CFBundlePackageType": "APPL", + "CFBundleShortVersionString": "1.0", + "CFBundleVersion": "1", + "DimOSPythonExecutable": str(python), + "LSUIElement": True, + "NSBluetoothAlwaysUsageDescription": bluetooth_description, + "NSBluetoothPeripheralUsageDescription": bluetooth_description, + "NSBluetoothUsageDescription": bluetooth_description, + } + with (contents_dir / "Info.plist").open("wb") as f: + plistlib.dump(plist, f) + + validate_helper_app(app_path) + return app_path + + +def locate_helper_app( + helper_app_path: str | Path | None = None, + *, + create: bool = True, +) -> Path: + """Return a provided, env-configured, or cached BLE helper app path.""" + ensure_macos() + + configured = helper_app_path + if configured is None: + env_value = os.environ.get(HELPER_APP_ENV) or os.environ.get(LEGACY_HELPER_APP_ENV) + configured = env_value if env_value else None + + if configured is not None: + return validate_helper_app(configured).app_path + + cached = helper_cache_dir() / HELPER_APP_NAME + if create: + return build_cached_helper_app() + + return validate_helper_app(cached).app_path + + +async def _invoke_helper_async( + action: str, + params: JsonObject, + *, + password: str | None = None, + helper_app_path: str | Path | None = None, + on_progress: Callable[[str], None] | None = None, + secrets: Sequence[str] = (), +) -> JsonObject: + loop = asyncio.get_running_loop() + with ThreadPoolExecutor(max_workers=1, thread_name_prefix="dimos-go2-ble-helper") as executor: + return await loop.run_in_executor( + executor, + lambda: _invoke_helper( + action, + params, + password=password, + helper_app_path=helper_app_path, + on_progress=on_progress, + secrets=secrets, + ), + ) + + +async def find_robots( + timeout: float = 15.0, + prefixes: tuple[str, ...] = ble.UNITREE_NAME_PREFIXES, + *, + helper_app_path: str | Path | None = None, + on_device: Callable[[Go2Device], None] | None = None, + on_progress: Callable[[str], None] | None = None, +) -> list[Go2Device]: + """Find Unitree robots through the macOS LaunchServices BLE helper.""" + response = await _invoke_helper_async( + "find_robots", + { + "timeout": timeout, + "prefixes": list(prefixes), + }, + helper_app_path=helper_app_path, + on_progress=on_progress, + ) + devices = [ + Go2Device(name=str(d["name"]), address=str(d["address"]), serial=d.get("serial")) + for d in response.get("devices", []) + ] + if on_device is not None: + for device in devices: + on_device(device) + return devices + + +async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str = "US", + *, + timeout: float = 30.0, + connect_retries: int = 3, + helper_app_path: str | Path | None = None, + on_progress: Callable[[str], None] | None = None, +) -> str | None: + """Provision Wi-Fi through the macOS LaunchServices BLE helper.""" + response = await _invoke_helper_async( + "provision_wifi", + { + "address": address, + "ssid": ssid, + "country_code": country_code, + "timeout": timeout, + "connect_retries": connect_retries, + }, + password=password, + helper_app_path=helper_app_path, + on_progress=on_progress, + secrets=(password,), + ) + serial = response.get("serial") + return str(serial) if serial is not None else None + + +def _invoke_helper( + action: str, + params: JsonObject, + *, + password: str | None = None, + helper_app_path: str | Path | None = None, + on_progress: Callable[[str], None] | None = None, + secrets: Sequence[str] = (), +) -> JsonObject: + helper_app = locate_helper_app(helper_app_path) + safe_secrets = tuple(secret for secret in secrets if secret) + + with tempfile.TemporaryDirectory(prefix="dimos-go2-ble-") as tmp: + tmp_path = Path(tmp) + tmp_path.chmod(0o700) + request_path = tmp_path / "request.json" + response_path = tmp_path / "response.json" + progress_path = tmp_path / "progress.jsonl" + password_path = tmp_path / "password.txt" + + request: JsonObject = { + "schema_version": 1, + "action": action, + "params": params, + } + if password is not None: + _write_text_0600(password_path, password) + request["password_file"] = str(password_path) + + _write_json_0600(request_path, request) + _touch_0600(response_path) + _touch_0600(progress_path) + + cmd = _open_command(helper_app, request_path, response_path, progress_path) + completed = subprocess.run(cmd, check=False, capture_output=True, text=True) + + _emit_progress(progress_path, on_progress, secrets=safe_secrets) + + if completed.returncode != 0: + stderr = _sanitize_text(completed.stderr or completed.stdout or "", safe_secrets) + raise HelperLaunchError( + f"BLE helper exited with status {completed.returncode}: {stderr.strip()}" + ) + + response = _read_json_file(response_path) + if not response: + raise HelperLaunchError("BLE helper did not write a response") + if not response.get("ok"): + error = _sanitize_text(str(response.get("error", "BLE helper failed")), safe_secrets) + raise MacOSBLEHelperError(error) + return response + + +def _open_command( + helper_app_path: Path, + request_path: Path, + response_path: Path, + progress_path: Path, +) -> list[str]: + return [ + OPEN_COMMAND, + "-W", + "-n", + str(helper_app_path), + "--args", + "--helper-request", + str(request_path), + "--helper-response", + str(response_path), + "--helper-progress", + str(progress_path), + ] + + +def _write_text_0600(path: Path, text: str) -> None: + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(text) + + +def _write_json_0600(path: Path, payload: JsonObject) -> None: + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(payload, f, separators=(",", ":")) + f.write("\n") + + +def _touch_0600(path: Path) -> None: + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.fchmod(fd, 0o600) + os.close(fd) + + +def _read_json_file(path: Path) -> JsonObject: + text = path.read_text(encoding="utf-8").strip() + if not text: + return {} + data = json.loads(text) + if not isinstance(data, dict): + raise HelperLaunchError(f"expected JSON object in {path}") + return data + + +def _append_progress(path: Path, message: str, *, secrets: Sequence[str] = ()) -> None: + safe_message = _sanitize_text(message, secrets) + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600) + os.fchmod(fd, 0o600) + with os.fdopen(fd, "a", encoding="utf-8") as f: + json.dump({"message": safe_message}, f, separators=(",", ":")) + f.write("\n") + + +def _emit_progress( + progress_path: Path, + on_progress: Callable[[str], None] | None, + *, + secrets: Sequence[str] = (), +) -> None: + if on_progress is None or not progress_path.exists(): + return + for line in progress_path.read_text(encoding="utf-8").splitlines(): + if not line: + continue + try: + payload = json.loads(line) + message = str(payload.get("message", "")) + except json.JSONDecodeError: + message = line + if message: + on_progress(_sanitize_text(message, secrets)) + + +def _sanitize_text(text: str, secrets: Sequence[str]) -> str: + safe = text + for secret in secrets: + if secret: + safe = safe.replace(secret, "") + return safe + + +def _assert_private_file(path: Path) -> None: + mode = stat.S_IMODE(path.stat().st_mode) + if mode & 0o077: + raise HelperLaunchError(f"helper file is not private: {path} mode={oct(mode)}") + + +async def _execute_helper_request(request: JsonObject, progress_path: Path) -> JsonObject: + action = request.get("action") + params = request.get("params", {}) + if not isinstance(params, dict): + raise HelperLaunchError("helper request params must be a JSON object") + + if action == "find_robots": + devices: list[Go2Device] = [] + + def _on_device(device: Go2Device) -> None: + devices.append(device) + _append_progress( + progress_path, + f"found {device.name} {device.address} serial={device.serial or '?'}", + ) + + result = await ble.find_robots( + timeout=float(params.get("timeout", 15.0)), + prefixes=tuple(params.get("prefixes", ble.UNITREE_NAME_PREFIXES)), + on_device=_on_device, + ) + by_address = {device.address: device for device in devices} + for device in result: + by_address[device.address] = device + return {"ok": True, "devices": [asdict(device) for device in by_address.values()]} + + if action == "provision_wifi": + password_file = request.get("password_file") + if not isinstance(password_file, str) or not password_file: + raise HelperLaunchError("provision_wifi request is missing password_file") + password_path = Path(password_file) + _assert_private_file(password_path) + password = password_path.read_text(encoding="utf-8") + + def _on_progress(message: str) -> None: + _append_progress(progress_path, message, secrets=(password,)) + + serial = await ble.provision_wifi( + address=str(params["address"]), + ssid=str(params["ssid"]), + password=password, + country_code=str(params.get("country_code", "US")), + timeout=float(params.get("timeout", 30.0)), + connect_retries=int(params.get("connect_retries", 3)), + on_progress=_on_progress, + ) + return {"ok": True, "serial": serial} + + raise HelperLaunchError(f"unknown helper action: {action}") + + +def _run_helper_request(request_path: Path, response_path: Path, progress_path: Path) -> int: + secrets: tuple[str, ...] = () + try: + _assert_private_file(request_path) + _assert_private_file(response_path) + _assert_private_file(progress_path) + request = _read_json_file(request_path) + + password_file = request.get("password_file") + if isinstance(password_file, str) and password_file: + password_path = Path(password_file) + _assert_private_file(password_path) + secrets = (password_path.read_text(encoding="utf-8"),) + + response = asyncio.run(_execute_helper_request(request, progress_path)) + except Exception as exc: + response = { + "ok": False, + "error_type": type(exc).__name__, + "error": _sanitize_text(f"{type(exc).__name__}: {exc}", secrets), + } + _write_json_0600(response_path, response) + return 0 + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="DimOS macOS BLE helper") + parser.add_argument("--helper-run", action="store_true") + parser.add_argument("--helper-request") + parser.add_argument("--helper-response") + parser.add_argument("--helper-progress") + args = parser.parse_args(argv) + + if not args.helper_run: + parser.error("this module is intended to be launched by the DimOS BLE helper app") + if not args.helper_request or not args.helper_response or not args.helper_progress: + parser.error("helper request, response, and progress paths are required") + + return _run_helper_request( + Path(args.helper_request), + Path(args.helper_response), + Path(args.helper_progress), + ) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/dimos/robot/unitree/go2/cli/setup.py b/dimos/robot/unitree/go2/cli/setup.py new file mode 100644 index 0000000000..1229654363 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/setup.py @@ -0,0 +1,405 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Guided Go2 Wi-Fi setup orchestration without Typer wiring.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable, Sequence +from dataclasses import dataclass, replace +from typing import Any, Protocol + +from dimos.robot.unitree.go2.cli.verify import Go2VerifyStatus, verify_robot_ip + + +class BluetoothRobot(Protocol): + @property + def name(self) -> str: ... + + @property + def address(self) -> str: ... + + @property + def serial(self) -> str | None: ... + + +class LanRobot(Protocol): + @property + def serial(self) -> str: ... + + @property + def ip(self) -> str: ... + + +BleDiscovery = Callable[[], Awaitable[Sequence[BluetoothRobot]]] +LanDiscovery = Callable[[], Awaitable[Sequence[LanRobot]]] +WifiProvisioner = Callable[[str, str, str, str], Awaitable[str | None]] +RobotVerifier = Callable[[str], Go2VerifyStatus] + + +@dataclass(frozen=True) +class Go2BleSelection: + name: str | None + address: str + serial: str | None + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "address": self.address, + "serial": self.serial, + } + + +@dataclass(frozen=True) +class Go2LanSelection: + serial: str + ip: str + mac: str | None = None + iface: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "serial": self.serial, + "ip": self.ip, + "mac": self.mac, + "iface": self.iface, + } + + +@dataclass(frozen=True) +class SetupStep: + name: str + ok: bool + message: str + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "ok": self.ok, + "message": self.message, + } + + +@dataclass(frozen=True) +class Go2WifiSetupResult: + ok: bool + selected_ble: Go2BleSelection | None + lan_robot: Go2LanSelection | None + verification: Go2VerifyStatus | None + next_steps: tuple[str, ...] + steps: tuple[SetupStep, ...] + error: str | None = None + + @property + def robot_ip(self) -> str | None: + return self.lan_robot.ip if self.lan_robot else None + + def to_dict(self) -> dict[str, Any]: + return { + "ok": self.ok, + "selected_ble": self.selected_ble.to_dict() if self.selected_ble else None, + "lan_robot": self.lan_robot.to_dict() if self.lan_robot else None, + "robot_ip": self.robot_ip, + "verification": self.verification.to_dict() if self.verification else None, + "next_steps": list(self.next_steps), + "steps": [step.to_dict() for step in self.steps], + "error": self.error, + } + + def summary_lines(self) -> tuple[str, ...]: + lines = [ + f"{'OK' if step.ok else 'FAIL'} {step.name}: {step.message}" for step in self.steps + ] + if self.error: + lines.append(f"Error: {self.error}") + lines.extend(self.next_steps) + return tuple(lines) + + +def _redact(text: str | None, secret: str) -> str | None: + if text is None or not secret: + return text + return text.replace(secret, "[REDACTED]") + + +def _redact_status(status: Go2VerifyStatus, secret: str) -> Go2VerifyStatus: + return replace( + status, + reason=_redact(status.reason, secret), + error=_redact(status.error, secret), + ) + + +def _same_address(left: str, right: str) -> bool: + return left.casefold() == right.casefold() + + +def _matches_ble( + robot: BluetoothRobot, + *, + serial: str | None, + name: str | None, + address: str | None, +) -> bool: + if serial is not None and robot.serial != serial: + return False + if name is not None and robot.name != name: + return False + if address is not None and not _same_address(robot.address, address): + return False + return True + + +def _criteria_text(serial: str | None, name: str | None, address: str | None) -> str: + pairs = [ + ("serial", serial), + ("name", name), + ("address", address), + ] + parts = [f"{key}={value}" for key, value in pairs if value is not None] + return ", ".join(parts) if parts else "available robot" + + +def _select_ble_robot( + robots: Sequence[BluetoothRobot], + *, + serial: str | None, + name: str | None, + address: str | None, +) -> tuple[Go2BleSelection | None, str | None]: + if address is not None: + address_matches = [robot for robot in robots if _same_address(robot.address, address)] + matches = [ + robot + for robot in address_matches + if _matches_ble(robot, serial=serial, name=name, address=address) + ] + if len(matches) > 1: + return None, f"Ambiguous BLE robot selection: {len(matches)} robots matched address." + if matches: + robot = matches[0] + return Go2BleSelection( + name=robot.name, address=robot.address, serial=robot.serial + ), None + if address_matches: + return None, f"No BLE robot matched {_criteria_text(serial, name, address)}." + return Go2BleSelection(name=name, address=address, serial=serial), None + + matches = [ + robot for robot in robots if _matches_ble(robot, serial=serial, name=name, address=address) + ] + if not matches: + return None, f"No BLE robot matched {_criteria_text(serial, name, address)}." + if len(matches) > 1: + return ( + None, + f"Ambiguous BLE robot selection: {len(matches)} robots matched " + f"{_criteria_text(serial, name, address)}.", + ) + robot = matches[0] + return Go2BleSelection(name=robot.name, address=robot.address, serial=robot.serial), None + + +def _optional_str_attr(value: Any) -> str | None: + return value if isinstance(value, str) else None + + +def _lan_selection(robot: LanRobot) -> Go2LanSelection: + return Go2LanSelection( + serial=robot.serial, + ip=robot.ip, + mac=_optional_str_attr(getattr(robot, "mac", None)), + iface=_optional_str_attr(getattr(robot, "iface", None)), + ) + + +def _select_lan_robot( + robots: Sequence[LanRobot], + *, + serial: str | None, +) -> tuple[Go2LanSelection | None, str | None]: + if serial is not None: + matches = [robot for robot in robots if robot.serial == serial] + if len(matches) == 1: + return _lan_selection(matches[0]), None + if len(matches) > 1: + return None, f"Ambiguous LAN discovery: {len(matches)} robots matched serial={serial}." + return None, f"No LAN robot matched serial={serial}." + + if len(robots) == 1: + return _lan_selection(robots[0]), None + if robots: + return None, f"Ambiguous LAN discovery: {len(robots)} robots found and no serial is known." + return None, "No LAN robots found." + + +def _next_steps(ip: str) -> tuple[str, ...]: + return ( + f"export ROBOT_IP={ip}", + f"dimos --robot-ip {ip} run unitree-go2", + ) + + +def _result( + *, + ok: bool, + selected_ble: Go2BleSelection | None, + lan_robot: Go2LanSelection | None, + verification: Go2VerifyStatus | None, + next_steps: tuple[str, ...], + steps: list[SetupStep], + error: str | None = None, +) -> Go2WifiSetupResult: + return Go2WifiSetupResult( + ok=ok, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + next_steps=next_steps, + steps=tuple(steps), + error=error, + ) + + +async def setup_go2_wifi( + *, + ssid: str, + password: str, + discover_ble: BleDiscovery, + provision_wifi: WifiProvisioner, + discover_lan: LanDiscovery, + serial: str | None = None, + name: str | None = None, + address: str | None = None, + country_code: str = "US", + verify_robot: RobotVerifier | None = None, + rediscovery_attempts: int = 5, + rediscovery_delay_s: float = 2.0, +) -> Go2WifiSetupResult: + """Provision Go2 Wi-Fi over BLE, rediscover it on LAN, then verify WebRTC. + + The caller injects BLE discovery, Wi-Fi provisioning, and LAN discovery + functions. This keeps Typer prompting and concrete backends outside the + orchestration so it can be tested without hardware. + """ + steps: list[SetupStep] = [] + + def add_step(name: str, ok: bool, message: str) -> None: + steps.append(SetupStep(name=name, ok=ok, message=_redact(message, password) or "")) + + def fail( + name: str, + message: str, + *, + selected_ble: Go2BleSelection | None = None, + lan_robot: Go2LanSelection | None = None, + verification: Go2VerifyStatus | None = None, + ) -> Go2WifiSetupResult: + redacted = _redact(message, password) or "" + add_step(name, False, redacted) + return _result( + ok=False, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + next_steps=(), + steps=steps, + error=redacted, + ) + + try: + ble_robots = list(await discover_ble()) + except Exception as e: + return fail("ble_discovery", f"BLE discovery failed: {e}") + add_step("ble_discovery", True, f"Found {len(ble_robots)} BLE robot(s).") + + selected_ble, selection_error = _select_ble_robot( + ble_robots, serial=serial, name=name, address=address + ) + if selected_ble is None: + return fail("select_robot", selection_error or "No BLE robot selected.") + add_step( + "select_robot", + True, + f"Selected BLE robot address={selected_ble.address} serial={selected_ble.serial or '?'}", + ) + + try: + provisioned_serial = await provision_wifi( + selected_ble.address, + ssid, + password, + country_code, + ) + except Exception as e: + return fail("provision_wifi", f"Wi-Fi provisioning failed: {e}", selected_ble=selected_ble) + add_step("provision_wifi", True, "Provisioned Wi-Fi credentials over BLE.") + + target_serial = provisioned_serial or selected_ble.serial + lan_robot: Go2LanSelection | None = None + lan_error: str | None = None + attempts = max(1, rediscovery_attempts) + for attempt in range(attempts): + try: + lan_robots = list(await discover_lan()) + except Exception as e: + return fail("lan_discovery", f"LAN discovery failed: {e}", selected_ble=selected_ble) + + lan_robot, lan_error = _select_lan_robot(lan_robots, serial=target_serial) + if lan_robot is not None: + break + if lan_error and lan_error.startswith("Ambiguous"): + return fail("lan_discovery", lan_error, selected_ble=selected_ble) + if attempt + 1 < attempts: + await asyncio.sleep(rediscovery_delay_s) + + if lan_robot is None: + return fail( + "lan_discovery", + lan_error or "No LAN robot found after Wi-Fi provisioning.", + selected_ble=selected_ble, + ) + add_step("lan_discovery", True, f"Found LAN robot {lan_robot.serial} at {lan_robot.ip}.") + + verifier = verify_robot or verify_robot_ip + try: + verification = _redact_status(verifier(lan_robot.ip), password) + except Exception as e: + return fail( + "verify", + f"Verification probe failed: {e}", + selected_ble=selected_ble, + lan_robot=lan_robot, + ) + if not verification.ok: + message = " ".join(verification.summary_lines()) + return fail( + "verify", + message, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + ) + add_step("verify", True, f"Verified Go2 WebRTC endpoint at {verification.url}.") + + return _result( + ok=True, + selected_ble=selected_ble, + lan_robot=lan_robot, + verification=verification, + next_steps=_next_steps(lan_robot.ip), + steps=steps, + ) diff --git a/dimos/robot/unitree/go2/cli/test_go2tool.py b/dimos/robot/unitree/go2/cli/test_go2tool.py new file mode 100644 index 0000000000..0503c9721b --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_go2tool.py @@ -0,0 +1,514 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator, Callable +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from dimos.robot.unitree.go2.cli import ( + ble, + go2tool, + landiscovery, + macos_ble_helper, + setup, + verify as verify_module, +) +from dimos.robot.unitree.go2.cli.verify import Go2VerifyStatus + +_runner = CliRunner() + + +def _verify_status(ip: str, *, ok: bool) -> Go2VerifyStatus: + return Go2VerifyStatus( + robot_ip=ip, + url=f"http://{ip}:9991/con_notify", + ok=ok, + status_code=200 if ok else 503, + reason="OK" if ok else "Unavailable", + error=None, + elapsed_s=0.01, + timeout_s=1.0, + ) + + +def test_auto_backend_uses_helper_on_darwin( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + helper_path = tmp_path / "DimOS BLE Helper.app" + calls: list[tuple[str, Path | None]] = [] + + async def fake_helper_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + helper_app_path: Path | None = None, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 3.0 + assert connect_retries == go2tool._DEFAULT_BLE_CONNECT_RETRIES + assert on_progress is not None + calls.append(("helper", helper_app_path)) + return "SER123" + + async def fake_direct_provision_wifi(*args: object, **kwargs: object) -> str | None: + pytest.fail("direct BLE backend should not be used on Darwin auto") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "provision_wifi", fake_helper_provision_wifi) + monkeypatch.setattr(ble, "provision_wifi", fake_direct_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "3", + "--retries", + "1", + "--ble-backend", + "auto", + "--ble-helper", + str(helper_path), + ], + ) + + assert result.exit_code == 0, result.output + assert calls == [("helper", helper_path)] + + +def test_auto_backend_uses_direct_on_linux(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[str] = [] + + async def fake_direct_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 3.0 + assert connect_retries == go2tool._DEFAULT_BLE_CONNECT_RETRIES + assert on_progress is not None + calls.append("direct") + return "SER123" + + async def fake_helper_provision_wifi(*args: object, **kwargs: object) -> str | None: + pytest.fail("helper BLE backend should not be used on Linux auto") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "provision_wifi", fake_direct_provision_wifi) + monkeypatch.setattr(macos_ble_helper, "provision_wifi", fake_helper_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "3", + "--retries", + "1", + "--ble-backend", + "auto", + ], + ) + + assert result.exit_code == 0, result.output + assert calls == ["direct"] + + +def test_discover_lan_bypasses_ble_backend(monkeypatch: pytest.MonkeyPatch) -> None: + async def fake_discover_lan( + tick: float = 2.0, + timeout: float = 1.5, + iface_ip: str | None = None, + ) -> AsyncIterator[landiscovery.Go2Device]: + assert tick == 0.0 + assert timeout == 1.5 + assert iface_ip is None + yield landiscovery.Go2Device( + serial="SN123", + ip="192.168.1.70", + iface="en0", + mac="00:11:22:33:44:55", + ) + while True: + await asyncio.sleep(1) + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(landiscovery, "discover_lan", fake_discover_lan) + + result = _runner.invoke( + go2tool.app, + [ + "discover", + "--lan", + "--ble-backend", + "helper", + "--timeout", + "0.01", + "--lan-tick", + "0", + ], + ) + + assert result.exit_code == 0, result.output + assert "LAN" in result.output + assert "192.168.1.70" in result.output + + +def test_helper_backend_rejects_timeout_zero(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + + result = _runner.invoke( + go2tool.app, + ["discover", "--ble", "--ble-backend", "helper", "--timeout", "0"], + ) + + assert result.exit_code == 1 + assert "BLE helper backend cannot run with --timeout 0" in result.output + + +def test_discover_auto_timeout_zero_uses_direct_ble_on_darwin( + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def fake_discover_ble() -> AsyncIterator[ble.Go2Device]: + yield ble.Go2Device(name="Go2_49060", address="AA:BB:CC:DD:EE:FF", serial="SER123") + + async def fake_helper_find_robots(*args: object, **kwargs: object) -> list[ble.Go2Device]: + pytest.fail("macOS auto discovery with --timeout 0 should use direct BLE") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + monkeypatch.setattr(ble, "discover_ble", fake_discover_ble) + monkeypatch.setattr(macos_ble_helper, "find_robots", fake_helper_find_robots) + + result = _runner.invoke( + go2tool.app, + ["discover", "--ble", "--ble-backend", "auto", "--timeout", "0"], + ) + + assert result.exit_code == 0, result.output + assert "AA:BB:CC:DD:EE:FF" in result.output + assert "SER123" in result.output + + +def test_connect_wifi_mac_skips_scan_and_redacts_prompted_password( + monkeypatch: pytest.MonkeyPatch, +) -> None: + secret = "super-secret-password" + scan_called = False + + async def fake_find_robots(*args: object, **kwargs: object) -> list[ble.Go2Device]: + nonlocal scan_called + scan_called = True + return [] + + async def fake_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == secret + assert country_code == "US" + assert timeout == 2.0 + assert connect_retries == go2tool._DEFAULT_BLE_CONNECT_RETRIES + assert on_progress is not None + on_progress(f"unsafe progress {secret}") + return "SER123" + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "find_robots", fake_find_robots) + monkeypatch.setattr(ble, "provision_wifi", fake_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "2", + "--retries", + "1", + "--ble-backend", + "direct", + ], + input=f"{secret}\n", + ) + + assert result.exit_code == 0, result.output + assert not scan_called + assert secret not in result.output + assert "[REDACTED]" in result.output + assert "Provisioned. Serial: SER123" in result.output + + +def test_connect_wifi_retries_outer_provision_attempts( + monkeypatch: pytest.MonkeyPatch, +) -> None: + connect_retry_counts: list[int] = [] + + async def fake_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 2.0 + connect_retry_counts.append(connect_retries) + if len(connect_retry_counts) == 1: + raise RuntimeError("temporary provisioning failure") + return "SER123" + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "provision_wifi", fake_provision_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "connect-wifi", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--timeout", + "2", + "--retries", + "2", + "--ble-backend", + "direct", + ], + ) + + assert result.exit_code == 0, result.output + assert connect_retry_counts == [ + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + ] + assert "attempt 1 failed: temporary provisioning failure" in result.output + assert "Provisioned. Serial: SER123" in result.output + + +@pytest.mark.parametrize(("ok", "expected_exit_code"), [(True, 0), (False, 1)]) +def test_verify_exit_codes( + monkeypatch: pytest.MonkeyPatch, + ok: bool, + expected_exit_code: int, +) -> None: + def fake_verify_robot_ip(robot_ip: str, *, timeout_s: float) -> Go2VerifyStatus: + assert robot_ip == "192.168.1.70" + assert timeout_s == 0.25 + return _verify_status(robot_ip, ok=ok) + + monkeypatch.setattr(verify_module, "verify_robot_ip", fake_verify_robot_ip) + + result = _runner.invoke( + go2tool.app, + ["verify", "--robot-ip", "192.168.1.70", "--timeout", "0.25"], + ) + + assert result.exit_code == expected_exit_code + assert "192.168.1.70" in result.output + + +def test_setup_success_prints_summary_and_redacts_secret(monkeypatch: pytest.MonkeyPatch) -> None: + secret = "super-secret-password" + + async def fake_setup_go2_wifi(**kwargs: object) -> setup.Go2WifiSetupResult: + assert kwargs["ssid"] == "lab-wifi" + assert kwargs["password"] == secret + assert kwargs["address"] == "AA:BB:CC:DD:EE:FF" + return setup.Go2WifiSetupResult( + ok=True, + selected_ble=setup.Go2BleSelection( + name="Go2_123", + address="AA:BB:CC:DD:EE:FF", + serial="SN123", + ), + lan_robot=setup.Go2LanSelection( + serial="SN123", + ip="192.168.1.70", + mac="00:11:22:33:44:55", + iface="en0", + ), + verification=_verify_status("192.168.1.70", ok=True), + next_steps=( + "export ROBOT_IP=192.168.1.70", + f"redacted next step {secret}", + ), + steps=( + setup.SetupStep( + name="provision_wifi", + ok=True, + message=f"unsafe step {secret}", + ), + ), + ) + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(setup, "setup_go2_wifi", fake_setup_go2_wifi) + + result = _runner.invoke( + go2tool.app, + [ + "setup", + "--ssid", + "lab-wifi", + "--password", + secret, + "--mac", + "AA:BB:CC:DD:EE:FF", + "--ble-backend", + "direct", + ], + ) + + assert result.exit_code == 0, result.output + assert "result=ok robot_ip=192.168.1.70 serial=SN123" in result.output + assert "export ROBOT_IP=192.168.1.70" in result.output + assert secret not in result.output + assert "[REDACTED]" in result.output + + +def test_setup_retries_outer_provision_attempts(monkeypatch: pytest.MonkeyPatch) -> None: + connect_retry_counts: list[int] = [] + + async def fake_provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + *, + timeout: float, + connect_retries: int, + on_progress: Callable[[str], None] | None = None, + ) -> str | None: + assert address == "AA:BB:CC:DD:EE:FF" + assert ssid == "lab-wifi" + assert password == "secret" + assert country_code == "US" + assert timeout == 2.0 + connect_retry_counts.append(connect_retries) + if len(connect_retry_counts) == 1: + raise RuntimeError("temporary provisioning failure") + return "SER123" + + def fake_discover_lan(timeout: float = 2.0) -> list[landiscovery.Go2Device]: + assert timeout == 0.1 + return [ + landiscovery.Go2Device( + serial="SER123", + ip="192.168.1.70", + iface="en0", + mac="00:11:22:33:44:55", + ) + ] + + def fake_verify_robot_ip(robot_ip: str, *, timeout_s: float) -> Go2VerifyStatus: + assert robot_ip == "192.168.1.70" + assert timeout_s == 0.25 + return _verify_status(robot_ip, ok=True) + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Linux") + monkeypatch.setattr(ble, "provision_wifi", fake_provision_wifi) + monkeypatch.setattr(landiscovery, "discover", fake_discover_lan) + monkeypatch.setattr(verify_module, "verify_robot_ip", fake_verify_robot_ip) + + result = _runner.invoke( + go2tool.app, + [ + "setup", + "--ssid", + "lab-wifi", + "--password", + "secret", + "--mac", + "AA:BB:CC:DD:EE:FF", + "--serial", + "SER123", + "--timeout", + "2", + "--retries", + "2", + "--lan-timeout", + "0.1", + "--rediscovery-attempts", + "1", + "--rediscovery-delay", + "0", + "--verify-timeout", + "0.25", + "--ble-backend", + "direct", + ], + ) + + assert result.exit_code == 0, result.output + assert connect_retry_counts == [ + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + go2tool._DEFAULT_BLE_CONNECT_RETRIES, + ] + assert "attempt 1 failed: temporary provisioning failure" in result.output + assert "result=ok robot_ip=192.168.1.70 serial=SER123" in result.output diff --git a/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py b/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py new file mode 100644 index 0000000000..e8877788c4 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py @@ -0,0 +1,300 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import json +from pathlib import Path +import plistlib +import stat +import subprocess +import threading + +import pytest + +from dimos.robot.unitree.go2.cli import macos_ble_helper + + +def _make_helper_app(tmp_path: Path, *, missing_keys: set[str] | None = None) -> Path: + app_path = tmp_path / "Test BLE Helper.app" + macos_dir = app_path / "Contents" / "MacOS" + macos_dir.mkdir(parents=True) + + executable = macos_dir / "helper" + executable.write_text("#!/bin/sh\nexit 0\n", encoding="utf-8") + executable.chmod(0o755) + + plist: dict[str, object] = { + "CFBundleExecutable": "helper", + "CFBundleIdentifier": "com.example.dimos.testblehelper", + "CFBundleName": "Test BLE Helper", + } + for key in macos_ble_helper.REQUIRED_BLUETOOTH_USAGE_KEYS: + plist[key] = "Bluetooth is used to provision Unitree robots." + for key in missing_keys or set(): + plist.pop(key, None) + + with (app_path / "Contents" / "Info.plist").open("wb") as f: + plistlib.dump(plist, f) + + return app_path + + +def _mode(path: Path) -> int: + return stat.S_IMODE(path.stat().st_mode) + + +def test_validate_helper_app_accepts_required_bluetooth_metadata(tmp_path: Path) -> None: + app_path = _make_helper_app(tmp_path) + + info = macos_ble_helper.validate_helper_app(app_path) + + assert info.app_path == app_path + assert info.bundle_identifier == "com.example.dimos.testblehelper" + assert info.executable_path.name == "helper" + + +@pytest.mark.parametrize("missing_key", macos_ble_helper.REQUIRED_BLUETOOTH_USAGE_KEYS) +def test_validate_helper_app_rejects_missing_bluetooth_usage_key( + tmp_path: Path, + missing_key: str, +) -> None: + app_path = _make_helper_app(tmp_path, missing_keys={missing_key}) + + with pytest.raises(macos_ble_helper.HelperAppValidationError, match=missing_key): + macos_ble_helper.validate_helper_app(app_path) + + +def test_locate_helper_app_builds_cached_app_with_bluetooth_keys( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "STATE_DIR", tmp_path) + + app_path = macos_ble_helper.locate_helper_app() + + assert app_path == tmp_path / "helpers" / "macos-ble" / macos_ble_helper.HELPER_APP_NAME + info = macos_ble_helper.validate_helper_app(app_path) + plist = plistlib.loads(info.info_plist_path.read_bytes()) + for key in macos_ble_helper.REQUIRED_BLUETOOTH_USAGE_KEYS: + assert plist[key] + + +@pytest.mark.asyncio +async def test_find_robots_rejects_non_macos(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Linux") + + with pytest.raises(macos_ble_helper.MacOSBLEUnsupportedError, match="only supported on macOS"): + await macos_ble_helper.find_robots(timeout=0.1) + + +@pytest.mark.asyncio +async def test_helper_async_wrappers_run_off_event_loop(monkeypatch: pytest.MonkeyPatch) -> None: + loop_thread = threading.current_thread() + helper_threads: list[threading.Thread] = [] + calls: list[str] = [] + + def fake_invoke_helper( + action: str, + params: macos_ble_helper.JsonObject, + **kwargs: object, + ) -> macos_ble_helper.JsonObject: + helper_threads.append(threading.current_thread()) + calls.append(action) + assert threading.current_thread() is not loop_thread + if action == "find_robots": + assert params["timeout"] == 2.0 + return { + "ok": True, + "devices": [ + {"name": "Go2_49060", "address": "AA:BB:CC:DD:EE:FF", "serial": "SER123"} + ], + } + if action == "provision_wifi": + assert params["connect_retries"] == 3 + assert kwargs["password"] == "secret" + return {"ok": True, "serial": "SER123"} + raise AssertionError(f"unexpected action {action}") + + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "_invoke_helper", fake_invoke_helper) + + devices = await macos_ble_helper.find_robots(timeout=2.0) + serial = await macos_ble_helper.provision_wifi( + "AA:BB:CC:DD:EE:FF", + "lab-wifi", + "secret", + "US", + connect_retries=3, + ) + + assert calls == ["find_robots", "provision_wifi"] + assert all(thread is not loop_thread for thread in helper_threads) + assert devices[0].serial == "SER123" + assert serial == "SER123" + + +@pytest.mark.asyncio +async def test_find_robots_uses_launchservices_helper( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + app_path = _make_helper_app(tmp_path) + calls: list[list[str]] = [] + seen_devices: list[macos_ble_helper.Go2Device] = [] + + def fake_run( + cmd: list[str], + *, + check: bool, + capture_output: bool, + text: bool, + ) -> subprocess.CompletedProcess[str]: + assert check is False + assert capture_output is True + assert text is True + calls.append(cmd) + + request_path = Path(cmd[cmd.index("--helper-request") + 1]) + response_path = Path(cmd[cmd.index("--helper-response") + 1]) + progress_path = Path(cmd[cmd.index("--helper-progress") + 1]) + request = json.loads(request_path.read_text(encoding="utf-8")) + + assert request == { + "schema_version": 1, + "action": "find_robots", + "params": {"timeout": 2.5, "prefixes": ["Go2_"]}, + } + assert _mode(request_path) == 0o600 + assert _mode(response_path) == 0o600 + assert _mode(progress_path) == 0o600 + + response_path.write_text( + json.dumps( + { + "ok": True, + "devices": [ + {"name": "Go2_49060", "address": "AA:BB:CC:DD:EE:FF", "serial": "SER123"} + ], + } + ), + encoding="utf-8", + ) + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper.subprocess, "run", fake_run) + + devices = await macos_ble_helper.find_robots( + timeout=2.5, + prefixes=("Go2_",), + helper_app_path=app_path, + on_device=seen_devices.append, + ) + + assert calls[0][:3] == [macos_ble_helper.OPEN_COMMAND, "-W", "-n"] + assert calls[0][3] == str(app_path) + assert "--args" in calls[0] + assert devices == seen_devices + assert devices[0].serial == "SER123" + + +@pytest.mark.asyncio +async def test_provision_wifi_keeps_password_out_of_argv_request_and_progress( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + app_path = _make_helper_app(tmp_path) + password = "secret-wifi-password" + progress: list[str] = [] + captured_cmds: list[list[str]] = [] + + def fake_run( + cmd: list[str], + *, + check: bool, + capture_output: bool, + text: bool, + ) -> subprocess.CompletedProcess[str]: + assert check is False + assert capture_output is True + assert text is True + captured_cmds.append(cmd) + assert password not in " ".join(cmd) + + request_path = Path(cmd[cmd.index("--helper-request") + 1]) + response_path = Path(cmd[cmd.index("--helper-response") + 1]) + progress_path = Path(cmd[cmd.index("--helper-progress") + 1]) + request_text = request_path.read_text(encoding="utf-8") + request = json.loads(request_text) + + assert password not in request_text + assert request["action"] == "provision_wifi" + assert request["params"] == { + "address": "AA:BB:CC:DD:EE:FF", + "ssid": "lab-wifi", + "country_code": "US", + "timeout": 7.0, + "connect_retries": 2, + } + + password_path = Path(request["password_file"]) + assert _mode(request_path) == 0o600 + assert _mode(response_path) == 0o600 + assert _mode(progress_path) == 0o600 + assert _mode(password_path) == 0o600 + assert password_path.read_text(encoding="utf-8") == password + + progress_path.write_text( + json.dumps({"message": f"unsafe progress contains {password}"}) + "\n", + encoding="utf-8", + ) + response_path.write_text(json.dumps({"ok": True, "serial": "SER123"}), encoding="utf-8") + return subprocess.CompletedProcess(cmd, 0, stdout=f"stdout {password}", stderr="") + + monkeypatch.setattr(macos_ble_helper.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper.subprocess, "run", fake_run) + + serial = await macos_ble_helper.provision_wifi( + "AA:BB:CC:DD:EE:FF", + "lab-wifi", + password, + "US", + timeout=7.0, + connect_retries=2, + helper_app_path=app_path, + on_progress=progress.append, + ) + + assert serial == "SER123" + assert captured_cmds + assert all(password not in item for cmd in captured_cmds for item in cmd) + assert progress == ["unsafe progress contains "] + + +def test_helper_progress_writer_redacts_password(tmp_path: Path) -> None: + progress_path = tmp_path / "progress.jsonl" + password = "secret-wifi-password" + + macos_ble_helper._append_progress( + progress_path, + f"set password {password}", + secrets=(password,), + ) + + text = progress_path.read_text(encoding="utf-8") + assert password not in text + assert "" in text + assert _mode(progress_path) == 0o600 diff --git a/dimos/robot/unitree/go2/cli/test_setup.py b/dimos/robot/unitree/go2/cli/test_setup.py new file mode 100644 index 0000000000..09fe37d3b8 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_setup.py @@ -0,0 +1,290 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass + +from dimos.robot.unitree.go2.cli.setup import setup_go2_wifi +from dimos.robot.unitree.go2.cli.verify import Go2VerifyStatus + + +@dataclass(frozen=True) +class _BleRobot: + name: str + address: str + serial: str | None = None + + +@dataclass(frozen=True) +class _LanRobot: + serial: str + ip: str + mac: str | None = None + iface: str | None = None + + +def _ok_verify(ip: str) -> Go2VerifyStatus: + return Go2VerifyStatus( + robot_ip=ip, + url=f"http://{ip}:9991/con_notify", + ok=True, + status_code=200, + reason="OK", + error=None, + elapsed_s=0.01, + timeout_s=1.0, + ) + + +def _failed_verify(ip: str) -> Go2VerifyStatus: + return Go2VerifyStatus( + robot_ip=ip, + url=f"http://{ip}:9991/con_notify", + ok=False, + status_code=503, + reason="Unavailable", + error=None, + elapsed_s=0.01, + timeout_s=1.0, + ) + + +async def test_setup_success_orchestrates_provision_discovery_verify() -> None: + calls: list[str] = [] + + async def discover_ble() -> list[_BleRobot]: + calls.append("discover_ble") + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + calls.append(f"provision:{address}:{ssid}:{country_code}") + assert password == "secret-password" + return "SN123" + + async def discover_lan() -> list[_LanRobot]: + calls.append("discover_lan") + return [_LanRobot(serial="SN123", ip="192.168.1.70", mac="00:11", iface="en0")] + + def verify(ip: str) -> Go2VerifyStatus: + calls.append(f"verify:{ip}") + return _ok_verify(ip) + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + serial="SN123", + verify_robot=verify, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert result.ok + assert result.robot_ip == "192.168.1.70" + assert result.next_steps == ( + "export ROBOT_IP=192.168.1.70", + "dimos --robot-ip 192.168.1.70 run unitree-go2", + ) + assert calls == [ + "discover_ble", + "provision:AA:BB:CC:LabWiFi:US", + "discover_lan", + "verify:192.168.1.70", + ] + + +async def test_setup_returns_verification_failure() -> None: + async def discover_ble() -> list[_BleRobot]: + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + return "SN123" + + async def discover_lan() -> list[_LanRobot]: + return [_LanRobot(serial="SN123", ip="192.168.1.70")] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + verify_robot=_failed_verify, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert result.robot_ip == "192.168.1.70" + assert result.verification is not None + assert not result.verification.ok + assert result.next_steps == () + assert result.error is not None + assert "HTTP 503" in result.error + + +async def test_setup_no_robot_found_does_not_provision() -> None: + provision_called = False + + async def discover_ble() -> list[_BleRobot]: + return [] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + nonlocal provision_called + provision_called = True + return None + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert not provision_called + assert result.error == "No BLE robot matched available robot." + + +async def test_setup_ambiguous_robot_selection_does_not_provision() -> None: + provision_called = False + + async def discover_ble() -> list[_BleRobot]: + return [ + _BleRobot(name="Go2_123", address="AA:BB:01", serial="SN1"), + _BleRobot(name="Go2_456", address="AA:BB:02", serial="SN2"), + ] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + nonlocal provision_called + provision_called = True + return None + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert not provision_called + assert result.error is not None + assert "Ambiguous BLE robot selection" in result.error + + +async def test_setup_rejects_conflicting_address_and_serial() -> None: + provision_called = False + + async def discover_ble() -> list[_BleRobot]: + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + nonlocal provision_called + provision_called = True + return None + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password="secret-password", + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + address="AA:BB:CC", + serial="OTHER-SERIAL", + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert not provision_called + assert result.error is not None + assert "No BLE robot matched" in result.error + + +async def test_setup_redacts_password_from_failure_result_and_summary() -> None: + secret = "super-secret-password" + + async def discover_ble() -> list[_BleRobot]: + return [_BleRobot(name="Go2_123", address="AA:BB:CC", serial="SN123")] + + async def provision_wifi( + address: str, + ssid: str, + password: str, + country_code: str, + ) -> str | None: + raise RuntimeError(f"provision failed with password {password}") + + async def discover_lan() -> list[_LanRobot]: + return [] + + result = await setup_go2_wifi( + ssid="LabWiFi", + password=secret, + discover_ble=discover_ble, + provision_wifi=provision_wifi, + discover_lan=discover_lan, + rediscovery_attempts=1, + rediscovery_delay_s=0, + ) + + assert not result.ok + assert secret not in repr(result) + assert secret not in "\n".join(result.summary_lines()) + assert result.error is not None + assert "[REDACTED]" in result.error diff --git a/dimos/robot/unitree/go2/cli/test_verify.py b/dimos/robot/unitree/go2/cli/test_verify.py new file mode 100644 index 0000000000..c6a97f63be --- /dev/null +++ b/dimos/robot/unitree/go2/cli/test_verify.py @@ -0,0 +1,69 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass + +import requests + +from dimos.robot.unitree.go2.cli.verify import con_notify_url, verify_robot_ip + + +@dataclass +class _Response: + status_code: int + reason: str = "OK" + + +def test_verify_robot_ip_probes_con_notify_without_movement() -> None: + calls: list[tuple[str, float]] = [] + + def fake_get(url: str, *, timeout: float) -> _Response: + calls.append((url, timeout)) + return _Response(status_code=200) + + result = verify_robot_ip("192.168.1.50", timeout_s=0.25, http_get=fake_get) + + assert result.ok + assert result.url == "http://192.168.1.50:9991/con_notify" + assert calls == [("http://192.168.1.50:9991/con_notify", 0.25)] + + +def test_verify_robot_ip_returns_structured_failure_on_timeout() -> None: + def fake_get(url: str, *, timeout: float) -> _Response: + raise requests.Timeout(f"timeout for {url} after {timeout}") + + result = verify_robot_ip("192.168.1.51", timeout_s=0.1, http_get=fake_get) + + assert not result.ok + assert result.status_code is None + assert result.error is not None + assert "timeout" in result.error + assert result.to_dict()["url"] == "http://192.168.1.51:9991/con_notify" + + +def test_verify_robot_ip_returns_http_status_failure() -> None: + result = verify_robot_ip( + "192.168.1.52", + http_get=lambda _url, *, timeout: _Response(status_code=500, reason="Server Error"), + ) + + assert not result.ok + assert result.status_code == 500 + assert "HTTP 500 Server Error" in result.summary_lines()[0] + + +def test_con_notify_url_uses_go2_webrtc_port_and_path() -> None: + assert con_notify_url(" 192.168.1.53 ") == "http://192.168.1.53:9991/con_notify" diff --git a/dimos/robot/unitree/go2/cli/verify.py b/dimos/robot/unitree/go2/cli/verify.py new file mode 100644 index 0000000000..befc5514e1 --- /dev/null +++ b/dimos/robot/unitree/go2/cli/verify.py @@ -0,0 +1,114 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""No-movement Go2 LAN verification helpers.""" + +from __future__ import annotations + +from dataclasses import dataclass +import time +from typing import Any, Protocol + +import requests + +CON_NOTIFY_PORT = 9991 +CON_NOTIFY_PATH = "/con_notify" +DEFAULT_VERIFY_TIMEOUT_S = 1.0 + + +class HttpGet(Protocol): + def __call__(self, url: str, *, timeout: float) -> requests.Response: ... + + +@dataclass(frozen=True) +class Go2VerifyStatus: + """Structured result for probing a Go2 WebRTC signaling endpoint.""" + + robot_ip: str + url: str + ok: bool + status_code: int | None + reason: str | None + error: str | None + elapsed_s: float + timeout_s: float + + def to_dict(self) -> dict[str, Any]: + return { + "robot_ip": self.robot_ip, + "url": self.url, + "ok": self.ok, + "status_code": self.status_code, + "reason": self.reason, + "error": self.error, + "elapsed_s": self.elapsed_s, + "timeout_s": self.timeout_s, + } + + def summary_lines(self) -> tuple[str, ...]: + if self.ok: + return (f"Go2 WebRTC endpoint reachable at {self.url} (HTTP {self.status_code}).",) + if self.status_code is not None: + return ( + f"Go2 WebRTC endpoint responded at {self.url} " + f"with HTTP {self.status_code} {self.reason or ''}".rstrip() + + ".", + ) + return (f"Go2 WebRTC endpoint unreachable at {self.url}: {self.error}",) + + +def con_notify_url(robot_ip: str) -> str: + """Build the Go2 WebRTC signaling health probe URL.""" + return f"http://{robot_ip.strip()}:{CON_NOTIFY_PORT}{CON_NOTIFY_PATH}" + + +def verify_robot_ip( + robot_ip: str, + *, + timeout_s: float = DEFAULT_VERIFY_TIMEOUT_S, + http_get: HttpGet | None = None, +) -> Go2VerifyStatus: + """Verify that a Go2 is reachable on LAN without sending movement commands. + + This only performs an HTTP GET probe against the robot's WebRTC signaling + endpoint. It does not open a robot control session or publish commands. + """ + url = con_notify_url(robot_ip) + get = http_get or requests.get + started = time.monotonic() + try: + response = get(url, timeout=timeout_s) + except requests.RequestException as e: + return Go2VerifyStatus( + robot_ip=robot_ip, + url=url, + ok=False, + status_code=None, + reason=None, + error=str(e), + elapsed_s=time.monotonic() - started, + timeout_s=timeout_s, + ) + + status_code = response.status_code + return Go2VerifyStatus( + robot_ip=robot_ip, + url=url, + ok=200 <= status_code < 400, + status_code=status_code, + reason=response.reason or None, + error=None, + elapsed_s=time.monotonic() - started, + timeout_s=timeout_s, + ) diff --git a/docs/coding-agents/go2-wifi-provisioning.md b/docs/coding-agents/go2-wifi-provisioning.md new file mode 100644 index 0000000000..fe50c65518 --- /dev/null +++ b/docs/coding-agents/go2-wifi-provisioning.md @@ -0,0 +1,84 @@ +# Go2 Wi-Fi Provisioning + +Use this runbook when a Go2 needs to move from its AP/BLE setup state onto a +site Wi-Fi network, or when `dimos go2tool connect-wifi` behaves differently on +macOS than it does on Linux. + +## Why The Last Run Took Time + +The slow part was not the Go2 provisioning protocol. It was macOS Bluetooth +permission handling. + +`go2tool` uses Bleak over Bluetooth. On macOS, direct Python execution from a +terminal app such as Terminal or Warp can crash or fail before the Bluetooth +permission flow is usable. A LaunchServices-opened `.app` bundle worked because +macOS TCC could authorize Bluetooth access against the app bundle and its +`Info.plist` usage keys. + +If this happens again, treat it as a macOS Bluetooth/TCC issue first, not as a +robot firmware issue. + +## Normal Flow + +Start with discovery. This shows robots seen over BLE and LAN: + +```sh skip +dimos go2tool discover +``` + +If the robot is still in AP/BLE setup mode, provision it onto the target Wi-Fi. +Do not pass the Wi-Fi password in process args. Omit `--password`; the CLI will +prompt with hidden input. + +Public-safe example: move the robot/AP label `dimair09` onto SSID +`dimensional_5G`. The AP label is not necessarily the BLE name, so use +discovery to select the robot interactively: + +```sh skip +dimos go2tool connect-wifi --ssid dimensional_5G +``` + +If more than one robot appears, select the intended robot at the prompt or use +`--name`, `--serial`, or `--mac` from the discovery output. + +## macOS Helper Requirements + +When direct terminal execution crashes on macOS, run the provisioning helper as +a LaunchServices-opened `.app`, for example with `open`, and include these +Bluetooth usage keys in `Info.plist`: + +- `NSBluetoothAlwaysUsageDescription` +- `NSBluetoothPeripheralUsageDescription` +- `NSBluetoothUsageDescription` + +Keep secrets out of process args. Pass the SSID and robot selector through +non-secret configuration, then prompt for the Wi-Fi password inside the helper +or read it from a protected secret source. + +## Post-Provision Checks + +Give the robot time to join the network, then rediscover: + +```sh skip +dimos go2tool discover +``` + +Look for a LAN row with the robot IP. Probe the Go2 WebRTC notification endpoint: + +```sh skip +curl -fsS http://:9991/con_notify +``` + +Then run DimOS with the discovered IP: + +```sh skip +dimos --robot-ip run unitree-go2 +``` + +For agentic stacks, use the same `--robot-ip` flag with the target blueprint. + +## Coding-Agent Safety + +Verification is discovery, endpoint probing, and process startup only. Do not +send movement commands, teleop input, MCP skill calls, or `agent-send` messages +during Wi-Fi provisioning verification. diff --git a/docs/coding-agents/index.md b/docs/coding-agents/index.md index ff778ac5cf..4960d6d520 100644 --- a/docs/coding-agents/index.md +++ b/docs/coding-agents/index.md @@ -1,10 +1,7 @@ # For Agents -├── worktrees.md (creating provisioned worktrees with `bin/worktree`) -├── style.md (code style guidelines for dimos) -├── testing.md (docs about writing tests) -├── docs (these are docs about writing docs) -│   ├── codeblocks.md -│   ├── doclinks.md -│   └── index.md -└── index.md +- [Worktrees](/docs/coding-agents/worktrees.md): creating provisioned worktrees with `bin/worktree` +- [Style](/docs/coding-agents/style.md): code style guidelines for DimOS +- [Testing](/docs/coding-agents/testing.md): docs about writing tests +- [Go2 Wi-Fi Provisioning](/docs/coding-agents/go2-wifi-provisioning.md): AP-to-Wi-Fi setup notes for coding agents +- [Docs](/docs/coding-agents/docs/index.md): docs about writing docs diff --git a/docs/platforms/quadruped/go2/index.md b/docs/platforms/quadruped/go2/index.md index 4e392f06ca..144f1f4cfc 100644 --- a/docs/platforms/quadruped/go2/index.md +++ b/docs/platforms/quadruped/go2/index.md @@ -1,6 +1,6 @@ -# Unitree Go2 — Getting Started +# Unitree Go2 - Getting Started -The Unitree Go2 is DimOS's primary reference platform. Full autonomous navigation, mapping, and agentic control — no ROS required. +The Unitree Go2 is DimOS's primary reference platform. Full autonomous navigation, mapping, and agentic control - no ROS required. ## Requirements @@ -23,7 +23,7 @@ source .venv/bin/activate uv pip install 'dimos[base,unitree]' ``` -## Try It — No Hardware Needed +## Try It - No Hardware Needed ```bash # Replay a recorded Go2 navigation session @@ -31,7 +31,7 @@ uv pip install 'dimos[base,unitree]' dimos --replay run unitree-go2 ``` -Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3D visualization — watch the Go2 map and navigate an office in real time. +Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3D visualization - watch the Go2 map and navigate an office in real time. ## Run on Your Go2 @@ -39,7 +39,12 @@ Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3 Use `dimos go2tool` to provision wifi and find the robot's IP. Skip if the robot is already on your network and you know its IP. -1. Power on the Go2 — it advertises over BLE immediately. +macOS caveat: BLE provisioning can crash when Bleak/Python is run directly from +Terminal or Warp because of Bluetooth permission handling. If that happens, use +a LaunchServices-opened `.app` helper with Bluetooth usage keys. See +[Go2 Wi-Fi Provisioning](/docs/coding-agents/go2-wifi-provisioning.md). + +1. Power on the Go2 - it advertises over BLE immediately. 2. Provision wifi (one-time per network): @@ -52,10 +57,10 @@ dimos go2tool discover configure wifi ```bash -dimos go2tool connect-wifi --ssid --password +dimos go2tool connect-wifi --ssid ``` -Scans BLE and connects to the only robot it finds, or prompts you to pick if there are several. +Scans BLE and connects to the only robot it finds, or prompts you to pick if there are several. The password prompt uses hidden input; avoid passing Wi-Fi passwords in process args. 3. Find the robot's IP: @@ -91,9 +96,9 @@ That's it. DimOS connects via WebRTC (no jailbreak required), starts the full na | Module | What It Does | |--------|-------------| -| **GO2Connection** | WebRTC connection to the robot — streams LiDAR, video, odometry | +| **GO2Connection** | WebRTC connection to the robot - streams LiDAR, video, odometry | | **VoxelGridMapper** | Builds a 3D voxel map using column-carving (CUDA accelerated) | -| **CostMapper** | Converts 3D map → 2D costmap via terrain slope analysis | +| **CostMapper** | Converts 3D map to 2D costmap via terrain slope analysis | | **ReplanningAStarPlanner** | Continuous A* path planning with dynamic replanning | | **WavefrontFrontierExplorer** | Autonomous exploration of unmapped areas | | **RerunBridge** | 3D visualization in browser | @@ -113,7 +118,7 @@ uv pip install 'dimos[base,unitree,sim]' dimos --simulation run unitree-go2 ``` -Full navigation stack in MuJoCo — same code, simulated robot. +Full navigation stack in MuJoCo - same code, simulated robot. ## Agentic Control @@ -132,7 +137,7 @@ humancli > explore the space ``` -The agent subscribes to camera, LiDAR, and spatial memory streams — it sees what the robot sees. +The agent subscribes to camera, LiDAR, and spatial memory streams - it sees what the robot sees. ## Available Blueprints @@ -148,8 +153,8 @@ The agent subscribes to camera, LiDAR, and spatial memory streams — it sees wh ## Deep Dive -- [Navigation Stack](/docs/capabilities/navigation/native/index.md) — column-carving voxel mapping, costmap generation, A* planning -- [Visualization](/docs/usage/visualization.md) — Rerun, performance tuning -- [Data Streams](/docs/usage/data_streams) — RxPY streams, backpressure, quality filtering -- [Transports](/docs/usage/transports/index.md) — LCM, SHM, DDS -- [Blueprints](/docs/usage/blueprints.md) — composing modules +- [Navigation Stack](/docs/capabilities/navigation/native/index.md) - column-carving voxel mapping, costmap generation, A* planning +- [Visualization](/docs/usage/visualization.md) - Rerun, performance tuning +- [Data Streams](/docs/usage/data_streams) - RxPY streams, backpressure, quality filtering +- [Transports](/docs/usage/transports/index.md) - LCM, SHM, DDS +- [Blueprints](/docs/usage/blueprints.md) - composing modules diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 7abd1fa7bb..73c7b2603e 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -193,6 +193,28 @@ Print resolved GlobalConfig values and their sources. dimos show-config ``` +### `dimos go2tool` + +Provision and verify Unitree Go2 network setup. + +```bash +dimos go2tool discover +dimos go2tool connect-wifi --ssid +dimos go2tool setup --ssid +dimos go2tool verify --robot-ip +``` + +| Command | Description | +|---------|-------------| +| `discover` | Show Go2 robots discovered over BLE and LAN. | +| `connect-wifi` | Send Wi-Fi credentials to a Go2 over BLE. Omit `--password` to use the hidden prompt. | +| `setup` | Provision Wi-Fi, rediscover the robot on LAN, and verify `:9991/con_notify`. | +| `verify` | Probe `http://:9991/con_notify` without sending movement commands. | + +BLE commands support `--ble-backend {auto,helper,direct}` and `--ble-helper`. +On macOS, `auto` uses the LaunchServices helper for finite BLE scans and +provisioning; set `DIMOS_GO2_BLE_HELPER` to point at a custom helper `.app`. + --- ## Agent & MCP Commands From a77f8dcc9330f3a4e25f44946bd969c9caf95fc3 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 8 Jun 2026 18:34:20 +0800 Subject: [PATCH 2/4] fix: polish Go2 Wi-Fi setup PR --- dimos/robot/unitree/go2/cli/go2tool.py | 12 ++++- dimos/robot/unitree/go2/cli/test_go2tool.py | 16 ++++++ dimos/robot/unitree/go2/cli/test_verify.py | 54 +++++++++++++++++--- dimos/robot/unitree/go2/cli/verify.py | 55 ++++++++++++++++++--- docs/coding-agents/go2-wifi-provisioning.md | 40 +++++++-------- docs/platforms/quadruped/go2/index.md | 44 ++++++++--------- docs/usage/cli.md | 2 +- 7 files changed, 162 insertions(+), 61 deletions(-) diff --git a/dimos/robot/unitree/go2/cli/go2tool.py b/dimos/robot/unitree/go2/cli/go2tool.py index 85aa331102..349d4de14d 100644 --- a/dimos/robot/unitree/go2/cli/go2tool.py +++ b/dimos/robot/unitree/go2/cli/go2tool.py @@ -306,9 +306,17 @@ def _stop() -> None: grace = 0.25 if backend_runtime is not None and backend_runtime.kind == "helper" else 0 loop.call_later(timeout + grace, _stop) - await asyncio.gather(*tasks, return_exceptions=True) + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, asyncio.CancelledError): + continue + if isinstance(result, BaseException): + raise result - asyncio.run(run()) + try: + asyncio.run(run()) + except Exception as e: + _exit_error(str(e)) typer.echo("\nStopped.") diff --git a/dimos/robot/unitree/go2/cli/test_go2tool.py b/dimos/robot/unitree/go2/cli/test_go2tool.py index 0503c9721b..721117402b 100644 --- a/dimos/robot/unitree/go2/cli/test_go2tool.py +++ b/dimos/robot/unitree/go2/cli/test_go2tool.py @@ -235,6 +235,22 @@ async def fake_helper_find_robots(*args: object, **kwargs: object) -> list[ble.G assert "SER123" in result.output +def test_discover_surfaces_ble_backend_errors(monkeypatch: pytest.MonkeyPatch) -> None: + async def fake_find_robots(*args: object, **kwargs: object) -> list[ble.Go2Device]: + raise RuntimeError("invalid helper app") + + monkeypatch.setattr(go2tool.platform, "system", lambda: "Darwin") + monkeypatch.setattr(macos_ble_helper, "find_robots", fake_find_robots) + + result = _runner.invoke( + go2tool.app, + ["discover", "--ble", "--ble-backend", "helper", "--timeout", "0.1"], + ) + + assert result.exit_code == 1 + assert "invalid helper app" in result.output + + def test_connect_wifi_mac_skips_scan_and_redacts_prompted_password( monkeypatch: pytest.MonkeyPatch, ) -> None: diff --git a/dimos/robot/unitree/go2/cli/test_verify.py b/dimos/robot/unitree/go2/cli/test_verify.py index c6a97f63be..fe9f254e45 100644 --- a/dimos/robot/unitree/go2/cli/test_verify.py +++ b/dimos/robot/unitree/go2/cli/test_verify.py @@ -14,7 +14,9 @@ from __future__ import annotations +import base64 from dataclasses import dataclass +import json import requests @@ -25,16 +27,23 @@ class _Response: status_code: int reason: str = "OK" + text: str = "" + + +def _con_notify_text() -> str: + payload = json.dumps({"data1": "0123456789public-key-payload0123456789", "data2": 1}) + return base64.b64encode(payload.encode("utf-8")).decode("ascii") def test_verify_robot_ip_probes_con_notify_without_movement() -> None: calls: list[tuple[str, float]] = [] - def fake_get(url: str, *, timeout: float) -> _Response: + def fake_post(url: str, *, timeout: float, allow_redirects: bool) -> _Response: calls.append((url, timeout)) - return _Response(status_code=200) + assert allow_redirects is False + return _Response(status_code=200, text=_con_notify_text()) - result = verify_robot_ip("192.168.1.50", timeout_s=0.25, http_get=fake_get) + result = verify_robot_ip("192.168.1.50", timeout_s=0.25, http_post=fake_post) assert result.ok assert result.url == "http://192.168.1.50:9991/con_notify" @@ -42,10 +51,11 @@ def fake_get(url: str, *, timeout: float) -> _Response: def test_verify_robot_ip_returns_structured_failure_on_timeout() -> None: - def fake_get(url: str, *, timeout: float) -> _Response: + def fake_post(url: str, *, timeout: float, allow_redirects: bool) -> _Response: + assert allow_redirects is False raise requests.Timeout(f"timeout for {url} after {timeout}") - result = verify_robot_ip("192.168.1.51", timeout_s=0.1, http_get=fake_get) + result = verify_robot_ip("192.168.1.51", timeout_s=0.1, http_post=fake_post) assert not result.ok assert result.status_code is None @@ -57,7 +67,10 @@ def fake_get(url: str, *, timeout: float) -> _Response: def test_verify_robot_ip_returns_http_status_failure() -> None: result = verify_robot_ip( "192.168.1.52", - http_get=lambda _url, *, timeout: _Response(status_code=500, reason="Server Error"), + http_post=lambda _url, *, timeout, allow_redirects: _Response( + status_code=500, + reason="Server Error", + ), ) assert not result.ok @@ -65,5 +78,34 @@ def test_verify_robot_ip_returns_http_status_failure() -> None: assert "HTTP 500 Server Error" in result.summary_lines()[0] +def test_verify_robot_ip_rejects_redirects() -> None: + result = verify_robot_ip( + "192.168.1.54", + http_post=lambda _url, *, timeout, allow_redirects: _Response( + status_code=302, + reason="Found", + ), + ) + + assert not result.ok + assert result.status_code == 302 + assert "HTTP 302 Found" in result.summary_lines()[0] + + +def test_verify_robot_ip_rejects_unexpected_success_payload() -> None: + result = verify_robot_ip( + "192.168.1.55", + http_post=lambda _url, *, timeout, allow_redirects: _Response( + status_code=200, + reason="OK", + text="not the con_notify payload", + ), + ) + + assert not result.ok + assert result.status_code == 200 + assert result.error == "unexpected con_notify response payload" + + def test_con_notify_url_uses_go2_webrtc_port_and_path() -> None: assert con_notify_url(" 192.168.1.53 ") == "http://192.168.1.53:9991/con_notify" diff --git a/dimos/robot/unitree/go2/cli/verify.py b/dimos/robot/unitree/go2/cli/verify.py index befc5514e1..413d31ec9c 100644 --- a/dimos/robot/unitree/go2/cli/verify.py +++ b/dimos/robot/unitree/go2/cli/verify.py @@ -16,7 +16,10 @@ from __future__ import annotations +import base64 +import binascii from dataclasses import dataclass +import json import time from typing import Any, Protocol @@ -27,8 +30,20 @@ DEFAULT_VERIFY_TIMEOUT_S = 1.0 -class HttpGet(Protocol): - def __call__(self, url: str, *, timeout: float) -> requests.Response: ... +class HttpResponse(Protocol): + status_code: int + reason: str | None + text: str + + +class HttpPost(Protocol): + def __call__( + self, + url: str, + *, + timeout: float, + allow_redirects: bool, + ) -> HttpResponse: ... @dataclass(frozen=True) @@ -60,6 +75,12 @@ def summary_lines(self) -> tuple[str, ...]: if self.ok: return (f"Go2 WebRTC endpoint reachable at {self.url} (HTTP {self.status_code}).",) if self.status_code is not None: + if self.error: + return ( + f"Go2 WebRTC endpoint responded at {self.url} " + f"with HTTP {self.status_code} {self.reason or ''}: {self.error}".rstrip() + + ".", + ) return ( f"Go2 WebRTC endpoint responded at {self.url} " f"with HTTP {self.status_code} {self.reason or ''}".rstrip() @@ -73,22 +94,37 @@ def con_notify_url(robot_ip: str) -> str: return f"http://{robot_ip.strip()}:{CON_NOTIFY_PORT}{CON_NOTIFY_PATH}" +def _con_notify_payload_error(text: str) -> str | None: + try: + decoded = base64.b64decode(text, validate=True).decode("utf-8") + payload = json.loads(decoded) + except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError): + return "unexpected con_notify response payload" + + if not isinstance(payload, dict): + return "unexpected con_notify response payload" + data1 = payload.get("data1") + if not isinstance(data1, str) or not data1: + return "unexpected con_notify response payload" + return None + + def verify_robot_ip( robot_ip: str, *, timeout_s: float = DEFAULT_VERIFY_TIMEOUT_S, - http_get: HttpGet | None = None, + http_post: HttpPost | None = None, ) -> Go2VerifyStatus: """Verify that a Go2 is reachable on LAN without sending movement commands. - This only performs an HTTP GET probe against the robot's WebRTC signaling + This only performs an HTTP POST probe against the robot's WebRTC signaling endpoint. It does not open a robot control session or publish commands. """ url = con_notify_url(robot_ip) - get = http_get or requests.get + post = http_post or requests.post started = time.monotonic() try: - response = get(url, timeout=timeout_s) + response = post(url, timeout=timeout_s, allow_redirects=False) except requests.RequestException as e: return Go2VerifyStatus( robot_ip=robot_ip, @@ -102,13 +138,16 @@ def verify_robot_ip( ) status_code = response.status_code + error = None + if 200 <= status_code < 300: + error = _con_notify_payload_error(response.text) return Go2VerifyStatus( robot_ip=robot_ip, url=url, - ok=200 <= status_code < 400, + ok=200 <= status_code < 300 and error is None, status_code=status_code, reason=response.reason or None, - error=None, + error=error, elapsed_s=time.monotonic() - started, timeout_s=timeout_s, ) diff --git a/docs/coding-agents/go2-wifi-provisioning.md b/docs/coding-agents/go2-wifi-provisioning.md index fe50c65518..68b7d6e130 100644 --- a/docs/coding-agents/go2-wifi-provisioning.md +++ b/docs/coding-agents/go2-wifi-provisioning.md @@ -4,19 +4,16 @@ Use this runbook when a Go2 needs to move from its AP/BLE setup state onto a site Wi-Fi network, or when `dimos go2tool connect-wifi` behaves differently on macOS than it does on Linux. -## Why The Last Run Took Time +## macOS Bluetooth Authorization -The slow part was not the Go2 provisioning protocol. It was macOS Bluetooth -permission handling. +`go2tool` uses Bleak over Bluetooth. On macOS, direct Python execution from some +terminal environments can fail before Bluetooth authorization is usable. The +default `auto` BLE backend uses a LaunchServices-opened helper `.app` for finite +scans and Wi-Fi provisioning so macOS TCC can authorize Bluetooth access against +the app bundle and its `Info.plist` usage keys. -`go2tool` uses Bleak over Bluetooth. On macOS, direct Python execution from a -terminal app such as Terminal or Warp can crash or fail before the Bluetooth -permission flow is usable. A LaunchServices-opened `.app` bundle worked because -macOS TCC could authorize Bluetooth access against the app bundle and its -`Info.plist` usage keys. - -If this happens again, treat it as a macOS Bluetooth/TCC issue first, not as a -robot firmware issue. +If direct BLE fails only on macOS, treat Bluetooth/TCC authorization as a likely +cause before assuming a robot firmware issue. ## Normal Flow @@ -30,12 +27,10 @@ If the robot is still in AP/BLE setup mode, provision it onto the target Wi-Fi. Do not pass the Wi-Fi password in process args. Omit `--password`; the CLI will prompt with hidden input. -Public-safe example: move the robot/AP label `dimair09` onto SSID -`dimensional_5G`. The AP label is not necessarily the BLE name, so use -discovery to select the robot interactively: +Example: ```sh skip -dimos go2tool connect-wifi --ssid dimensional_5G +dimos go2tool connect-wifi --ssid ``` If more than one robot appears, select the intended robot at the prompt or use @@ -43,17 +38,18 @@ If more than one robot appears, select the intended robot at the prompt or use ## macOS Helper Requirements -When direct terminal execution crashes on macOS, run the provisioning helper as -a LaunchServices-opened `.app`, for example with `open`, and include these -Bluetooth usage keys in `Info.plist`: +On macOS, `--ble-backend auto` builds and validates a cached helper `.app` when +needed. To use a custom helper bundle, pass `--ble-helper ` or set +`DIMOS_GO2_BLE_HELPER`. The helper bundle must include these Bluetooth usage +keys in `Info.plist`: - `NSBluetoothAlwaysUsageDescription` - `NSBluetoothPeripheralUsageDescription` - `NSBluetoothUsageDescription` -Keep secrets out of process args. Pass the SSID and robot selector through -non-secret configuration, then prompt for the Wi-Fi password inside the helper -or read it from a protected secret source. +The helper passes the Wi-Fi password through a mode-`0600` temporary file, not +argv. Keep using the CLI's hidden password prompt unless automation requires a +different secret source. ## Post-Provision Checks @@ -66,7 +62,7 @@ dimos go2tool discover Look for a LAN row with the robot IP. Probe the Go2 WebRTC notification endpoint: ```sh skip -curl -fsS http://:9991/con_notify +curl -fsS -X POST http://:9991/con_notify ``` Then run DimOS with the discovered IP: diff --git a/docs/platforms/quadruped/go2/index.md b/docs/platforms/quadruped/go2/index.md index 144f1f4cfc..a7a8e2333b 100644 --- a/docs/platforms/quadruped/go2/index.md +++ b/docs/platforms/quadruped/go2/index.md @@ -1,6 +1,6 @@ -# Unitree Go2 - Getting Started +# Unitree Go2 — Getting Started -The Unitree Go2 is DimOS's primary reference platform. Full autonomous navigation, mapping, and agentic control - no ROS required. +The Unitree Go2 is DimOS's primary reference platform. Full autonomous navigation, mapping, and agentic control — no ROS required. ## Requirements @@ -23,7 +23,7 @@ source .venv/bin/activate uv pip install 'dimos[base,unitree]' ``` -## Try It - No Hardware Needed +## Try It — No Hardware Needed ```bash # Replay a recorded Go2 navigation session @@ -31,22 +31,22 @@ uv pip install 'dimos[base,unitree]' dimos --replay run unitree-go2 ``` -Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3D visualization - watch the Go2 map and navigate an office in real time. +Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3D visualization — watch the Go2 map and navigate an office in real time. ## Run on Your Go2 -### First-time setup, connecting to wifi, finding robot IP +### First-time setup, connecting to Wi-Fi, finding robot IP -Use `dimos go2tool` to provision wifi and find the robot's IP. Skip if the robot is already on your network and you know its IP. +Use `dimos go2tool` to provision Wi-Fi and find the robot's IP. Skip if the robot is already on your network and you know its IP. -macOS caveat: BLE provisioning can crash when Bleak/Python is run directly from -Terminal or Warp because of Bluetooth permission handling. If that happens, use -a LaunchServices-opened `.app` helper with Bluetooth usage keys. See -[Go2 Wi-Fi Provisioning](/docs/coding-agents/go2-wifi-provisioning.md). +macOS caveat: BLE provisioning can fail when direct Python execution cannot get +usable Bluetooth authorization from the terminal. The default `auto` BLE backend +uses a LaunchServices-opened helper for finite scans and provisioning on macOS. +See [Go2 Wi-Fi Provisioning](/docs/coding-agents/go2-wifi-provisioning.md). -1. Power on the Go2 - it advertises over BLE immediately. +1. Power on the Go2 — it advertises over BLE immediately. -2. Provision wifi (one-time per network): +2. Provision Wi-Fi (one-time per network): optionally use discover to make sure robot is detected @@ -54,7 +54,7 @@ optionally use discover to make sure robot is detected dimos go2tool discover ``` -configure wifi +configure Wi-Fi ```bash dimos go2tool connect-wifi --ssid @@ -96,9 +96,9 @@ That's it. DimOS connects via WebRTC (no jailbreak required), starts the full na | Module | What It Does | |--------|-------------| -| **GO2Connection** | WebRTC connection to the robot - streams LiDAR, video, odometry | +| **GO2Connection** | WebRTC connection to the robot — streams LiDAR, video, odometry | | **VoxelGridMapper** | Builds a 3D voxel map using column-carving (CUDA accelerated) | -| **CostMapper** | Converts 3D map to 2D costmap via terrain slope analysis | +| **CostMapper** | Converts 3D map → 2D costmap via terrain slope analysis | | **ReplanningAStarPlanner** | Continuous A* path planning with dynamic replanning | | **WavefrontFrontierExplorer** | Autonomous exploration of unmapped areas | | **RerunBridge** | 3D visualization in browser | @@ -118,7 +118,7 @@ uv pip install 'dimos[base,unitree,sim]' dimos --simulation run unitree-go2 ``` -Full navigation stack in MuJoCo - same code, simulated robot. +Full navigation stack in MuJoCo — same code, simulated robot. ## Agentic Control @@ -137,7 +137,7 @@ humancli > explore the space ``` -The agent subscribes to camera, LiDAR, and spatial memory streams - it sees what the robot sees. +The agent subscribes to camera, LiDAR, and spatial memory streams — it sees what the robot sees. ## Available Blueprints @@ -153,8 +153,8 @@ The agent subscribes to camera, LiDAR, and spatial memory streams - it sees what ## Deep Dive -- [Navigation Stack](/docs/capabilities/navigation/native/index.md) - column-carving voxel mapping, costmap generation, A* planning -- [Visualization](/docs/usage/visualization.md) - Rerun, performance tuning -- [Data Streams](/docs/usage/data_streams) - RxPY streams, backpressure, quality filtering -- [Transports](/docs/usage/transports/index.md) - LCM, SHM, DDS -- [Blueprints](/docs/usage/blueprints.md) - composing modules +- [Navigation Stack](/docs/capabilities/navigation/native/index.md) — column-carving voxel mapping, costmap generation, A* planning +- [Visualization](/docs/usage/visualization.md) — Rerun, performance tuning +- [Data Streams](/docs/usage/data_streams) — RxPY streams, backpressure, quality filtering +- [Transports](/docs/usage/transports/index.md) — LCM, SHM, DDS +- [Blueprints](/docs/usage/blueprints.md) — composing modules diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 73c7b2603e..5d1b4a6e2d 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -209,7 +209,7 @@ dimos go2tool verify --robot-ip | `discover` | Show Go2 robots discovered over BLE and LAN. | | `connect-wifi` | Send Wi-Fi credentials to a Go2 over BLE. Omit `--password` to use the hidden prompt. | | `setup` | Provision Wi-Fi, rediscover the robot on LAN, and verify `:9991/con_notify`. | -| `verify` | Probe `http://:9991/con_notify` without sending movement commands. | +| `verify` | POST to `http://:9991/con_notify` without sending movement commands. | BLE commands support `--ble-backend {auto,helper,direct}` and `--ble-helper`. On macOS, `auto` uses the LaunchServices helper for finite BLE scans and From af77df583390103488f46453b8ac69d4c40325ad Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 8 Jun 2026 21:14:02 +0800 Subject: [PATCH 3/4] fix: address Go2 wifi PR review polish --- dimos/robot/unitree/go2/cli/go2tool.py | 18 +++++++++--------- .../robot/unitree/go2/cli/macos_ble_helper.py | 4 ++-- dimos/robot/unitree/go2/cli/setup.py | 12 ++++++------ .../unitree/go2/cli/test_macos_ble_helper.py | 4 ++-- docs/coding-agents/go2-wifi-provisioning.md | 14 +++++++------- docs/coding-agents/index.md | 14 +++++++++----- docs/platforms/quadruped/go2/index.md | 12 ++++++------ docs/usage/cli.md | 4 ++-- 8 files changed, 43 insertions(+), 39 deletions(-) diff --git a/dimos/robot/unitree/go2/cli/go2tool.py b/dimos/robot/unitree/go2/cli/go2tool.py index 349d4de14d..21e730eb8f 100644 --- a/dimos/robot/unitree/go2/cli/go2tool.py +++ b/dimos/robot/unitree/go2/cli/go2tool.py @@ -322,8 +322,8 @@ def _stop() -> None: @app.command("connect-wifi") def connect_wifi( - ssid: str | None = typer.Option(None, "--ssid", help="Wi-Fi SSID"), - password: str | None = typer.Option(None, "--password", help="Wi-Fi password"), + ssid: str | None = typer.Option(None, "--ssid", help="wifi SSID"), + password: str | None = typer.Option(None, "--password", help="wifi password"), country: str = typer.Option("US", "--country", help="Two-letter country code"), mac: str | None = typer.Option(None, "--mac", help="BLE MAC (skip scan)"), serial: str | None = typer.Option( @@ -350,7 +350,7 @@ def connect_wifi( help="macOS BLE helper .app path.", ), ) -> None: - """Provision a Go2 with Wi-Fi credentials over Bluetooth. + """Provision a Go2 with wifi credentials over Bluetooth. Fully non-interactive when (--mac | --serial | --name) and --ssid/--password are all provided. @@ -406,11 +406,11 @@ def _on_device(d: Go2Device) -> None: raise typer.Exit(1) target = devices[idx - 1].address - wifi_ssid = ssid if ssid is not None else typer.prompt("Wi-Fi SSID") + wifi_ssid = ssid if ssid is not None else typer.prompt("wifi SSID") wifi_password = ( password if password is not None - else typer.prompt("Wi-Fi password", hide_input=True, default="", show_default=False) + else typer.prompt("wifi password", hide_input=True, default="", show_default=False) ) def _on_error(attempt: int, exc: BaseException) -> None: @@ -454,8 +454,8 @@ def verify( @app.command("setup") def setup( - ssid: str = typer.Option(..., "--ssid", help="Wi-Fi SSID"), - password: str | None = typer.Option(None, "--password", help="Wi-Fi password"), + ssid: str = typer.Option(..., "--ssid", help="wifi SSID"), + password: str | None = typer.Option(None, "--password", help="wifi password"), country: str = typer.Option("US", "--country", help="Two-letter country code"), serial: str | None = typer.Option(None, "--serial", help="Robot serial to select"), name: str | None = typer.Option(None, "--name", help="Robot BLE name to select"), @@ -494,7 +494,7 @@ def setup( help="macOS BLE helper .app path.", ), ) -> None: - """Provision Wi-Fi, rediscover the robot on LAN, and verify its IP.""" + """Provision wifi, rediscover the robot on LAN, and verify its IP.""" from dimos.robot.unitree.go2.cli.landiscovery import discover as discover_lan_once from dimos.robot.unitree.go2.cli.setup import LanRobot, setup_go2_wifi from dimos.robot.unitree.go2.cli.verify import verify_robot_ip @@ -508,7 +508,7 @@ def setup( wifi_password = ( password if password is not None - else typer.prompt("Wi-Fi password", hide_input=True, default="", show_default=False) + else typer.prompt("wifi password", hide_input=True, default="", show_default=False) ) secrets = (wifi_password,) diff --git a/dimos/robot/unitree/go2/cli/macos_ble_helper.py b/dimos/robot/unitree/go2/cli/macos_ble_helper.py index 39b09bf7b4..5d733ae92e 100644 --- a/dimos/robot/unitree/go2/cli/macos_ble_helper.py +++ b/dimos/robot/unitree/go2/cli/macos_ble_helper.py @@ -285,7 +285,7 @@ async def provision_wifi( helper_app_path: str | Path | None = None, on_progress: Callable[[str], None] | None = None, ) -> str | None: - """Provision Wi-Fi through the macOS LaunchServices BLE helper.""" + """Provision wifi through the macOS LaunchServices BLE helper.""" response = await _invoke_helper_async( "provision_wifi", { @@ -442,7 +442,7 @@ def _sanitize_text(text: str, secrets: Sequence[str]) -> str: safe = text for secret in secrets: if secret: - safe = safe.replace(secret, "") + safe = safe.replace(secret, "[REDACTED]") return safe diff --git a/dimos/robot/unitree/go2/cli/setup.py b/dimos/robot/unitree/go2/cli/setup.py index 1229654363..3163f9ec8b 100644 --- a/dimos/robot/unitree/go2/cli/setup.py +++ b/dimos/robot/unitree/go2/cli/setup.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Guided Go2 Wi-Fi setup orchestration without Typer wiring.""" +"""Guided Go2 wifi setup orchestration without Typer wiring.""" from __future__ import annotations @@ -289,9 +289,9 @@ async def setup_go2_wifi( rediscovery_attempts: int = 5, rediscovery_delay_s: float = 2.0, ) -> Go2WifiSetupResult: - """Provision Go2 Wi-Fi over BLE, rediscover it on LAN, then verify WebRTC. + """Provision Go2 wifi over BLE, rediscover it on LAN, then verify WebRTC. - The caller injects BLE discovery, Wi-Fi provisioning, and LAN discovery + The caller injects BLE discovery, wifi provisioning, and LAN discovery functions. This keeps Typer prompting and concrete backends outside the orchestration so it can be tested without hardware. """ @@ -345,8 +345,8 @@ def fail( country_code, ) except Exception as e: - return fail("provision_wifi", f"Wi-Fi provisioning failed: {e}", selected_ble=selected_ble) - add_step("provision_wifi", True, "Provisioned Wi-Fi credentials over BLE.") + return fail("provision_wifi", f"wifi provisioning failed: {e}", selected_ble=selected_ble) + add_step("provision_wifi", True, "Provisioned wifi credentials over BLE.") target_serial = provisioned_serial or selected_ble.serial lan_robot: Go2LanSelection | None = None @@ -369,7 +369,7 @@ def fail( if lan_robot is None: return fail( "lan_discovery", - lan_error or "No LAN robot found after Wi-Fi provisioning.", + lan_error or "No LAN robot found after wifi provisioning.", selected_ble=selected_ble, ) add_step("lan_discovery", True, f"Found LAN robot {lan_robot.serial} at {lan_robot.ip}.") diff --git a/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py b/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py index e8877788c4..7b84cd2a67 100644 --- a/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py +++ b/dimos/robot/unitree/go2/cli/test_macos_ble_helper.py @@ -281,7 +281,7 @@ def fake_run( assert serial == "SER123" assert captured_cmds assert all(password not in item for cmd in captured_cmds for item in cmd) - assert progress == ["unsafe progress contains "] + assert progress == ["unsafe progress contains [REDACTED]"] def test_helper_progress_writer_redacts_password(tmp_path: Path) -> None: @@ -296,5 +296,5 @@ def test_helper_progress_writer_redacts_password(tmp_path: Path) -> None: text = progress_path.read_text(encoding="utf-8") assert password not in text - assert "" in text + assert "[REDACTED]" in text assert _mode(progress_path) == 0o600 diff --git a/docs/coding-agents/go2-wifi-provisioning.md b/docs/coding-agents/go2-wifi-provisioning.md index 68b7d6e130..f2af3da133 100644 --- a/docs/coding-agents/go2-wifi-provisioning.md +++ b/docs/coding-agents/go2-wifi-provisioning.md @@ -1,7 +1,7 @@ -# Go2 Wi-Fi Provisioning +# Go2 wifi provisioning Use this runbook when a Go2 needs to move from its AP/BLE setup state onto a -site Wi-Fi network, or when `dimos go2tool connect-wifi` behaves differently on +site wifi network, or when `dimos go2tool connect-wifi` behaves differently on macOS than it does on Linux. ## macOS Bluetooth Authorization @@ -9,7 +9,7 @@ macOS than it does on Linux. `go2tool` uses Bleak over Bluetooth. On macOS, direct Python execution from some terminal environments can fail before Bluetooth authorization is usable. The default `auto` BLE backend uses a LaunchServices-opened helper `.app` for finite -scans and Wi-Fi provisioning so macOS TCC can authorize Bluetooth access against +scans and wifi provisioning so macOS TCC can authorize Bluetooth access against the app bundle and its `Info.plist` usage keys. If direct BLE fails only on macOS, treat Bluetooth/TCC authorization as a likely @@ -23,8 +23,8 @@ Start with discovery. This shows robots seen over BLE and LAN: dimos go2tool discover ``` -If the robot is still in AP/BLE setup mode, provision it onto the target Wi-Fi. -Do not pass the Wi-Fi password in process args. Omit `--password`; the CLI will +If the robot is still in AP/BLE setup mode, provision it onto the target wifi. +Do not pass the wifi password in process args. Omit `--password`; the CLI will prompt with hidden input. Example: @@ -47,7 +47,7 @@ keys in `Info.plist`: - `NSBluetoothPeripheralUsageDescription` - `NSBluetoothUsageDescription` -The helper passes the Wi-Fi password through a mode-`0600` temporary file, not +The helper passes the wifi password through a mode-`0600` temporary file, not argv. Keep using the CLI's hidden password prompt unless automation requires a different secret source. @@ -77,4 +77,4 @@ For agentic stacks, use the same `--robot-ip` flag with the target blueprint. Verification is discovery, endpoint probing, and process startup only. Do not send movement commands, teleop input, MCP skill calls, or `agent-send` messages -during Wi-Fi provisioning verification. +during wifi provisioning verification. diff --git a/docs/coding-agents/index.md b/docs/coding-agents/index.md index 4960d6d520..2ddb6ba453 100644 --- a/docs/coding-agents/index.md +++ b/docs/coding-agents/index.md @@ -1,7 +1,11 @@ # For Agents -- [Worktrees](/docs/coding-agents/worktrees.md): creating provisioned worktrees with `bin/worktree` -- [Style](/docs/coding-agents/style.md): code style guidelines for DimOS -- [Testing](/docs/coding-agents/testing.md): docs about writing tests -- [Go2 Wi-Fi Provisioning](/docs/coding-agents/go2-wifi-provisioning.md): AP-to-Wi-Fi setup notes for coding agents -- [Docs](/docs/coding-agents/docs/index.md): docs about writing docs +├── worktrees.md (creating provisioned worktrees with `bin/worktree`) +├── style.md (code style guidelines for dimos) +├── testing.md (docs about writing tests) +├── go2-wifi-provisioning.md (Go2 AP-to-wifi setup notes for coding agents) +├── docs (these are docs about writing docs) +│   ├── codeblocks.md +│   ├── doclinks.md +│   └── index.md +└── index.md diff --git a/docs/platforms/quadruped/go2/index.md b/docs/platforms/quadruped/go2/index.md index a7a8e2333b..71134fa354 100644 --- a/docs/platforms/quadruped/go2/index.md +++ b/docs/platforms/quadruped/go2/index.md @@ -35,18 +35,18 @@ Opens the command center at [localhost:7779](http://localhost:7779) with Rerun 3 ## Run on Your Go2 -### First-time setup, connecting to Wi-Fi, finding robot IP +### First-time setup, connecting to wifi, finding robot IP -Use `dimos go2tool` to provision Wi-Fi and find the robot's IP. Skip if the robot is already on your network and you know its IP. +Use `dimos go2tool` to provision wifi and find the robot's IP. Skip if the robot is already on your network and you know its IP. macOS caveat: BLE provisioning can fail when direct Python execution cannot get usable Bluetooth authorization from the terminal. The default `auto` BLE backend uses a LaunchServices-opened helper for finite scans and provisioning on macOS. -See [Go2 Wi-Fi Provisioning](/docs/coding-agents/go2-wifi-provisioning.md). +See [Go2 wifi provisioning](/docs/coding-agents/go2-wifi-provisioning.md). 1. Power on the Go2 — it advertises over BLE immediately. -2. Provision Wi-Fi (one-time per network): +2. Provision wifi (one-time per network): optionally use discover to make sure robot is detected @@ -54,13 +54,13 @@ optionally use discover to make sure robot is detected dimos go2tool discover ``` -configure Wi-Fi +configure wifi ```bash dimos go2tool connect-wifi --ssid ``` -Scans BLE and connects to the only robot it finds, or prompts you to pick if there are several. The password prompt uses hidden input; avoid passing Wi-Fi passwords in process args. +Scans BLE and connects to the only robot it finds, or prompts you to pick if there are several. The password prompt uses hidden input; avoid passing wifi passwords in process args. 3. Find the robot's IP: diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 5d1b4a6e2d..e296ecbc94 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -207,8 +207,8 @@ dimos go2tool verify --robot-ip | Command | Description | |---------|-------------| | `discover` | Show Go2 robots discovered over BLE and LAN. | -| `connect-wifi` | Send Wi-Fi credentials to a Go2 over BLE. Omit `--password` to use the hidden prompt. | -| `setup` | Provision Wi-Fi, rediscover the robot on LAN, and verify `:9991/con_notify`. | +| `connect-wifi` | Send wifi credentials to a Go2 over BLE. Omit `--password` to use the hidden prompt. | +| `setup` | Provision wifi, rediscover the robot on LAN, and verify `:9991/con_notify`. | | `verify` | POST to `http://:9991/con_notify` without sending movement commands. | BLE commands support `--ble-backend {auto,helper,direct}` and `--ble-helper`. From d5bfdf7d6e9ed4d3b678857f5e88107fc45f13c4 Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 9 Jun 2026 13:22:49 +0800 Subject: [PATCH 4/4] docs: clarify Go2 wifi setup options --- docs/coding-agents/go2-wifi-provisioning.md | 26 ++++++++++++++------- docs/platforms/quadruped/go2/index.md | 18 +++++++++++--- docs/usage/cli.md | 24 +++++++++++++++++++ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/docs/coding-agents/go2-wifi-provisioning.md b/docs/coding-agents/go2-wifi-provisioning.md index f2af3da133..61a93750ae 100644 --- a/docs/coding-agents/go2-wifi-provisioning.md +++ b/docs/coding-agents/go2-wifi-provisioning.md @@ -23,18 +23,28 @@ Start with discovery. This shows robots seen over BLE and LAN: dimos go2tool discover ``` -If the robot is still in AP/BLE setup mode, provision it onto the target wifi. -Do not pass the wifi password in process args. Omit `--password`; the CLI will -prompt with hidden input. +If the robot is still in AP/BLE setup mode, run setup to provision wifi, +rediscover the robot on LAN, and print the IP to use with DimOS. For +interactive use, omit `--password`; the CLI will prompt with hidden input. +For automation, `--password ` is supported. Example: ```sh skip -dimos go2tool connect-wifi --ssid +dimos go2tool setup --ssid ``` -If more than one robot appears, select the intended robot at the prompt or use -`--name`, `--serial`, or `--mac` from the discovery output. +If more than one robot may be visible, use one selector from the discovery +output so setup does not choose the wrong robot: + +```sh skip +dimos go2tool setup --ssid --serial +dimos go2tool setup --ssid --name +dimos go2tool setup --ssid --mac +``` + +Use `connect-wifi` instead of `setup` only when you want the lower-level BLE +provisioning step without LAN rediscovery and verification. ## macOS Helper Requirements @@ -48,8 +58,8 @@ keys in `Info.plist`: - `NSBluetoothUsageDescription` The helper passes the wifi password through a mode-`0600` temporary file, not -argv. Keep using the CLI's hidden password prompt unless automation requires a -different secret source. +argv. Prefer the CLI's hidden password prompt for interactive use; use +`--password` when a script or test needs a fully non-interactive command. ## Post-Provision Checks diff --git a/docs/platforms/quadruped/go2/index.md b/docs/platforms/quadruped/go2/index.md index 71134fa354..e4a00975ab 100644 --- a/docs/platforms/quadruped/go2/index.md +++ b/docs/platforms/quadruped/go2/index.md @@ -57,10 +57,20 @@ dimos go2tool discover configure wifi ```bash -dimos go2tool connect-wifi --ssid +dimos go2tool setup --ssid ``` -Scans BLE and connects to the only robot it finds, or prompts you to pick if there are several. The password prompt uses hidden input; avoid passing wifi passwords in process args. +For interactive use, omit `--password`; the password prompt uses hidden input. +For automation, `--password ` is supported. `setup` configures wifi, +finds the robot on LAN, and prints the IP to use with DimOS. + +If several Go2s may be nearby, pass one selector from `discover` output: + +```bash +dimos go2tool setup --ssid --serial +dimos go2tool setup --ssid --name +dimos go2tool setup --ssid --mac +``` 3. Find the robot's IP: @@ -68,7 +78,9 @@ Scans BLE and connects to the only robot it finds, or prompts you to pick if the dimos go2tool discover ``` -Prints `SOURCE NAME IP MAC SERIAL` for every robot it sees over BLE and LAN. Export the IP: +Prints `SOURCE NAME IP MAC SERIAL` for every robot it sees over BLE and LAN. +If you used `setup`, this IP is also printed as `export ROBOT_IP=`. Export +the IP: ```bash export ROBOT_IP= diff --git a/docs/usage/cli.md b/docs/usage/cli.md index e296ecbc94..aee9fdd38a 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -201,6 +201,7 @@ Provision and verify Unitree Go2 network setup. dimos go2tool discover dimos go2tool connect-wifi --ssid dimos go2tool setup --ssid +dimos go2tool setup --ssid --serial dimos go2tool verify --robot-ip ``` @@ -211,6 +212,29 @@ dimos go2tool verify --robot-ip | `setup` | Provision wifi, rediscover the robot on LAN, and verify `:9991/con_notify`. | | `verify` | POST to `http://:9991/con_notify` without sending movement commands. | +`setup` options: + +| Option | Description | +|--------|-------------| +| `--ssid ` | Target wifi SSID. Required. | +| `--password ` | Target wifi password. If omitted, prompts with hidden input. | +| `--country ` | Two-letter country code. Default: `US`. | +| `--serial ` | Select a specific robot by serial. | +| `--name ` | Select a specific robot by BLE name, for example `Go2_49073`. | +| `--mac ` | Select/provision a specific BLE address and skip BLE scan. | +| `--timeout ` | BLE scan/connect timeout. Default: `5.0`. | +| `--retries ` | Provisioning attempts. Default: `3`. | +| `--lan-timeout ` | Timeout for each LAN discovery attempt. Default: `2.0`. | +| `--rediscovery-attempts ` | LAN rediscovery attempts after provisioning. Default: `5`. | +| `--rediscovery-delay ` | Delay between LAN rediscovery attempts. Default: `2.0`. | +| `--verify-timeout ` | HTTP verification timeout. Default: `1.0`. | +| `--ble-backend {auto,helper,direct}` | BLE backend. Default: `auto`. | +| `--ble-helper ` | macOS BLE helper `.app` path. Can also use `DIMOS_GO2_BLE_HELPER`. | + +When several Go2s may be visible, pass exactly one selector to `connect-wifi` +or `setup`: `--serial `, `--name `, or `--mac `. +Use values from `dimos go2tool discover`. + BLE commands support `--ble-backend {auto,helper,direct}` and `--ble-helper`. On macOS, `auto` uses the LaunchServices helper for finite BLE scans and provisioning; set `DIMOS_GO2_BLE_HELPER` to point at a custom helper `.app`.