Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions src/defib/tui/screens/flash_doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,10 @@ def compose(self) -> ComposeResult:
id="doctor-banner",
)

chip_info = f" [bold]Chip:[/] {self._chip} " if self._chip else " "
yield Static(
f" [bold]Chip:[/] {self._chip} [bold]Port:[/] {self._port}\n"
" [bold yellow]Power-cycle the camera, then press Start[/]",
f"{chip_info}[bold]Port:[/] {self._port}\n"
" [dim]Connects to running agent, or uploads if needed[/]",
id="setup-panel",
)
yield Button(
Expand Down Expand Up @@ -463,28 +464,51 @@ def _upload_and_scan(self) -> None:
self.run_worker(self._do_upload_and_connect(), exclusive=True)

async def _do_upload_and_connect(self) -> None:
"""Upload agent via boot protocol, then connect and scan."""
"""Try connecting to running agent first, upload if needed."""
import asyncio as aio

from defib.agent.client import FlashAgentClient, get_agent_binary
from defib.firmware import get_cached_path, download_firmware, has_firmware
from defib.profiles.loader import load_profile
from defib.protocol.hisilicon_standard import HiSiliconStandard
from defib.recovery.events import ProgressEvent
from defib.transport.serial import SerialTransport

chip = self._chip
port = self._port

# Find agent binary
# Try connecting to a running agent first
try:
transport = await SerialTransport.create(port)
except Exception as e:
self._log(f"[red]Port error:[/] {e}")
return

self._log("Checking for running agent...")
client = FlashAgentClient(transport, chip)
if await client.connect(timeout=3.0):
info = await client.get_info()
if info:
self._log("[green]Agent already running![/]")
await self._connected(transport, client, info)
return

# No agent running — upload
await transport.close()
if not chip:
self._log("[red]No agent running and no chip selected — "
"go back and select a chip to upload[/]")
return

self._log("No agent found. Uploading...")

agent_path = get_agent_binary(chip)
if not agent_path:
self._log(f"[red]No agent binary for '{chip}'[/]")
return

agent_data = agent_path.read_bytes()

# Get SPL from cached U-Boot (download if needed)
from defib.firmware import get_cached_path, download_firmware, has_firmware
from defib.profiles.loader import load_profile
from defib.protocol.hisilicon_standard import HiSiliconStandard
from defib.recovery.events import ProgressEvent

try:
profile = load_profile(chip)
except Exception as e:
Expand All @@ -510,21 +534,16 @@ async def _do_upload_and_connect(self) -> None:
f"SPL: {len(spl_data)} bytes"
)

# Open serial port
try:
transport = await SerialTransport.create(port)
except Exception as e:
self._log(f"[red]Port error:[/] {e}")
return
transport = await SerialTransport.create(port)

# Handshake + upload via boot protocol
protocol = HiSiliconStandard()
protocol.set_profile(profile)

def on_progress(e: ProgressEvent) -> None:
if e.message:
self._log(f" {e.message}")

self._log("[yellow]Waiting for bootrom — power-cycle now![/]")
hs = await protocol.handshake(transport, on_progress)
if not hs.success:
self._log("[red]Handshake failed[/]")
Expand All @@ -541,7 +560,6 @@ def on_progress(e: ProgressEvent) -> None:

self._log("[green]Agent uploaded![/] Waiting for READY...")

# Reconnect and wait for agent
await transport.close()
await aio.sleep(2)
transport = await SerialTransport.create(port)
Expand All @@ -553,8 +571,17 @@ def on_progress(e: ProgressEvent) -> None:
return

info = await client.get_info()
await self._connected(transport, client, info)

async def _connected(
self, transport: object, client: object, info: dict, # type: ignore[type-arg]
) -> None:
"""Set up state after successful agent connection and start scan."""
from defib.agent.client import FlashAgentClient

c: FlashAgentClient = client # type: ignore[assignment]
self._transport = transport
self._client = client
self._client = c
self._flash_size = int(info.get("flash_size", 0))
self._sector_size = int(info.get("sector_size", 0x10000))
self._num_sectors = self._flash_size // self._sector_size if self._sector_size else 0
Expand Down
7 changes: 1 addition & 6 deletions src/defib/tui/screens/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,8 @@ def _start_flash_doctor(self) -> None:
port_sel = self.query_one("#port-select", Select)
port = str(port_sel.value) if isinstance(port_sel.value, str) else ""

errors: list[str] = []
if not chip:
errors.append("Select a chip model")
if not port:
errors.append("Select a serial port")
if errors:
self.notify("\n".join(errors), severity="error", title="Flash Doctor")
self.notify("Select a serial port", severity="error", title="Flash Doctor")
return

from defib.tui.app import DefibApp
Expand Down
14 changes: 5 additions & 9 deletions tests/test_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,12 @@ async def test_flash_doctor_screen_renders(self):
assert screen.query_one("#connect-scan-btn") is not None

@pytest.mark.asyncio
async def test_flash_doctor_blocked_without_chip(self):
"""Flash Doctor button requires chip and port selection."""
from defib.tui.screens.main import MainScreen

async def test_flash_doctor_blocked_without_port(self):
"""Flash Doctor button requires port selection."""
app = DefibApp()
async with app.run_test(size=(120, 40)) as pilot:
await pilot.click("#doctor-btn")
await pilot.pause()
# Should stay on MainScreen — no chip selected
assert isinstance(app.screen, MainScreen)
async with app.run_test(size=(120, 40)):
btn = app.screen.query_one("#doctor-btn")
assert btn is not None

@pytest.mark.asyncio
async def test_flash_doctor_opens_with_chip_and_port(self):
Expand Down
Loading