Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,40 @@ Tested on real hardware with CRS112-8P-4S:
| IVGHP203Y-AF | hi3516cv300 | `/dev/uart-IVGHP203Y-AF` |
| IVG85HG50PYA-S | hi3516ev300 | `/dev/uart-IVG85HG50PYA-S` |

## Flash Agent (High-Speed Flash Dump)
## Flash Agent (High-Speed Recovery)

Defib includes a bare-metal flash agent that runs directly on the SoC,
replacing U-Boot in the boot chain. It communicates over a COBS binary
protocol at 921600 baud — ~5x faster than U-Boot's `md.b` hex dump.
protocol at 921600 baud for high-speed flash operations.

### One-Command Firmware Install

Flash a complete firmware image via UART in a single command:

```bash
defib agent flash -c hi3516ev300 -i firmware.bin -p /dev/ttyUSB0
```

Power-cycle the camera when prompted. The command handles everything:
1. Uploads the bare-metal agent via boot protocol
2. Switches to 921600 baud for high-speed transfer
3. Streams firmware directly to flash (skips 0xFF sectors)
4. Verifies CRC32 of the written data
5. Reboots the device

Typical 8MB OpenIPC firmware on 16MB flash: **~2 minutes** total (upload +
flash + verify + boot). No network required — just a USB-serial adapter.

### Other Agent Commands

```bash
# 1. Upload the agent (power-cycle the camera when prompted)
# Upload agent only (for manual operations)
defib agent upload -c hi3516ev300 -p /dev/ttyUSB0

# 2. Dump the entire flash (address and size auto-detected)
# Dump the entire flash (address and size auto-detected)
defib agent read -p /dev/ttyUSB0 -o flash_dump.bin

# Query device info (flash size, RAM base, JEDEC ID)
# Query device info (flash size, RAM base, JEDEC ID, agent version)
defib agent info -p /dev/ttyUSB0

# Write data back to flash
Expand All @@ -130,7 +150,7 @@ defib agent scan -p /dev/ttyUSB0

Address defaults to flash base (`0x14000000`) and size is auto-detected
from the device. Override with `-a` and `-s` if needed. Use `--no-verify`
to skip the CRC32 check, or `--output-mode json` for automation. See
to skip the CRC32 check, or `--output json` for automation. See
[agent/README.md](agent/README.md) for protocol details and supported chips.

## Testing with QEMU
Expand Down
220 changes: 220 additions & 0 deletions src/defib/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,226 @@ def on_progress(e: ProgressEvent) -> None:
await transport.close()


@agent_app.command("flash")
def agent_flash(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
input_file: str = typer.Option(..., "-i", "--input", help="Firmware binary file"),
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
verify: bool = typer.Option(True, "--verify/--no-verify", help="CRC32 verify after write"),
reboot: bool = typer.Option(True, "--reboot/--no-reboot", help="Reboot after flash"),
output: str = typer.Option("human", "--output", help="Output mode: human, json"),
) -> None:
"""Flash firmware in one step: upload agent, write flash, verify, reboot.

Power-cycle the camera when prompted. The command handles everything:
boot protocol upload, high-speed UART, streaming flash write with 0xFF
sector skip, CRC32 verification, and device reboot.
"""
import asyncio
asyncio.run(_agent_flash_async(chip, input_file, port, verify, reboot, output))


async def _agent_flash_async(
chip: str, input_file: str, port: str,
verify: bool, reboot_device: bool, output: str,
) -> None:
import json as json_mod
import time
import zlib
from pathlib import Path

from rich.console import Console

from defib.agent.client import FlashAgentClient, get_agent_binary
from defib.firmware import get_cached_path
from defib.profiles.loader import load_profile
from defib.protocol.hisilicon_standard import HiSiliconStandard
from defib.recovery.events import ProgressEvent
from defib.transport.serial import SerialTransport

console = Console()
FLASH_MEM = 0x14000000

# --- Load firmware file ---
fw_path = Path(input_file)
if not fw_path.exists():
msg = f"Firmware file not found: {input_file}"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]{msg}[/red]")
raise typer.Exit(1)

firmware = fw_path.read_bytes()
fw_crc = zlib.crc32(firmware) & 0xFFFFFFFF

# --- Find agent binary ---
agent_path = get_agent_binary(chip)
if not agent_path:
msg = f"No agent binary for '{chip}'"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]{msg}[/red]")
raise typer.Exit(1)

agent_data = agent_path.read_bytes()

# --- Get SPL from cached U-Boot ---
profile = load_profile(chip)
cached_fw = get_cached_path(chip)
if not cached_fw:
from defib.firmware import download_firmware
if output == "human":
console.print("Downloading U-Boot for SPL...")
cached_fw = download_firmware(chip)
spl_data = cached_fw.read_bytes()[:profile.spl_max_size]

if output == "human":
console.print(f"Firmware: [cyan]{fw_path.name}[/cyan] ({len(firmware)} bytes, CRC {fw_crc:#010x})")
console.print(f"Agent: [cyan]{agent_path.name}[/cyan] ({len(agent_data)} bytes)")
console.print("\n[yellow]Power-cycle the camera now![/yellow]\n")

# --- Phase 1: Upload agent via boot protocol ---
transport = await SerialTransport.create(port)
protocol = HiSiliconStandard()
protocol.set_profile(profile)

def on_boot_progress(e: ProgressEvent) -> None:
if e.message:
if output == "human":
console.print(f" {e.message}")
elif output == "json":
print(json_mod.dumps({"event": "boot", "message": e.message}), flush=True)

hs = await protocol.handshake(transport, on_boot_progress)
if not hs.success:
msg = "Handshake failed"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]{msg}[/red]")
await transport.close()
raise typer.Exit(1)

result = await protocol.send_firmware(
transport, agent_data, on_boot_progress, spl_override=spl_data,
payload_label="Agent",
)
if not result.success:
msg = result.error or "Upload failed"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]Upload failed:[/red] {msg}")
await transport.close()
raise typer.Exit(1)

# --- Phase 2: Connect to agent ---
import asyncio as aio
await transport.close()
await aio.sleep(2)
transport = await SerialTransport.create(port)

client = FlashAgentClient(transport, chip)
if not await client.connect(timeout=10.0):
msg = "Agent not responding"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]{msg}[/red]")
await transport.close()
raise typer.Exit(1)

info = await client.get_info()
flash_size = int(info.get("flash_size", 0))

if output == "human":
console.print(f"[green]Agent ready![/green] Flash: {flash_size // 1024}KB")

if flash_size > 0 and len(firmware) > flash_size:
msg = f"Firmware ({len(firmware)} bytes) exceeds flash size ({flash_size} bytes)"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]{msg}[/red]")
await transport.close()
raise typer.Exit(1)

# --- Phase 3: Flash firmware ---
if output == "human":
console.print(f"Flashing {len(firmware)} bytes...")

t0 = time.monotonic()
last_pct = [0]

def on_flash_progress(done: int, total: int) -> None:
pct = done * 100 // total if total > 0 else 0
if pct >= last_pct[0] + 5:
elapsed = time.monotonic() - t0
speed = done / elapsed if elapsed > 0 else 0
if output == "human":
print(f"\r {pct}% ({done // 1024}KB / {total // 1024}KB) "
f"{speed:.0f} B/s", end="", flush=True)
elif output == "json":
print(json_mod.dumps({"event": "flash", "pct": pct, "speed": int(speed)}),
flush=True)
last_pct[0] = pct

ok = await client.write_flash(0, firmware, on_progress=on_flash_progress)
elapsed = time.monotonic() - t0

if output == "human":
print() # newline after progress
if not ok:
msg = f"Flash write failed after {elapsed:.1f}s"
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
else:
console.print(f"[red]{msg}[/red]")
await transport.close()
raise typer.Exit(1)

speed = len(firmware) / elapsed if elapsed > 0 else 0
if output == "human":
console.print(f" Written in {elapsed:.1f}s ({speed:.0f} B/s)")

# --- Phase 4: Verify ---
if verify:
if output == "human":
console.print(" Verifying CRC32...")
dev_crc = await client.crc32(FLASH_MEM, len(firmware))
match = dev_crc == fw_crc
if output == "human":
if match:
console.print(f" CRC32: [green]OK[/green] ({fw_crc:#010x})")
else:
console.print(f" CRC32: [red]MISMATCH[/red] (device {dev_crc:#010x} != {fw_crc:#010x})")
if not match:
await transport.close()
raise typer.Exit(1)

# --- Phase 5: Reboot ---
if reboot_device:
if output == "human":
console.print(" Rebooting...")
await client.reboot()

if output == "human":
console.print(f"\n[green bold]Done![/green bold] Firmware flashed in {elapsed:.0f}s")
elif output == "json":
print(json_mod.dumps({
"event": "done",
"bytes": len(firmware),
"elapsed": round(elapsed, 1),
"speed": int(speed),
"verified": verify,
"rebooted": reboot_device,
}))

await transport.close()


@agent_app.command("info")
def agent_info(
port: str = typer.Option("/dev/ttyUSB0", "-p", "--port", help="Serial port"),
Expand Down
Loading