Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/defib/protocol/hisilicon_cv6xx.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async def handshake(
"""Send DEADBEEF handshake until 'uart ddr' or 'uart flash' response."""
_emit(on_progress, ProgressEvent(
stage=Stage.HANDSHAKE, bytes_sent=0, bytes_total=1,
message="Sending CV6xx handshake...",
message="Waiting for bootrom... power-cycle the device now!",
))

# Build handshake frame: magic + baudrate(LE) + serial params + CRC(LE)
Expand Down
8 changes: 3 additions & 5 deletions src/defib/protocol/hisilicon_standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
BOOTMODE_MARKER = b"\x20"
BOOTMODE_COUNT = 5
BOOTMODE_ACK = b"\xaa"
MAX_INIT_READS = 30
MAX_INIT_READS = 30 # Only used by tests; interactive mode loops forever

FRAME_SEND_RETRIES_SHORT = 16
FRAME_SEND_RETRIES_LONG = 32
Expand Down Expand Up @@ -77,11 +77,11 @@ async def handshake(
"""Wait for bootrom 0x20 pattern and send 0xAA acknowledgment."""
_emit(on_progress, ProgressEvent(
stage=Stage.HANDSHAKE, bytes_sent=0, bytes_total=1,
message="Waiting for bootrom...",
message="Waiting for bootrom... power-cycle the device now!",
))

counter = 0
for i in range(MAX_INIT_READS):
while True:
try:
byte = await transport.read(1, timeout=1.0)
except TransportTimeout:
Expand All @@ -105,8 +105,6 @@ async def handshake(
))
return HandshakeResult(success=True, message="Boot mode entered")

return HandshakeResult(success=False, message="Bootrom handshake timeout")

async def _send_frame_with_retry(
self,
transport: Transport,
Expand Down
7 changes: 2 additions & 5 deletions src/defib/protocol/hisilicon_v500.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ async def handshake(
"""Send V500 handshake frame until device responds with chip ID."""
_emit(on_progress, ProgressEvent(
stage=Stage.HANDSHAKE, bytes_sent=0, bytes_total=1,
message="Sending V500 handshake...",
message="Waiting for bootrom... power-cycle the device now!",
))

handshake_frame = append_crc(
V500_HANDSHAKE_MAGIC + b"\x00\x00\x00\x00\x00\x00\x00\x00"
)

start_time = time.monotonic()
while time.monotonic() - start_time < HANDSHAKE_TIMEOUT:
while True:
await transport.write(handshake_frame)
try:
response = await transport.read(14, timeout=0.1)
Expand All @@ -95,8 +94,6 @@ async def handshake(
except TransportTimeout:
continue

return HandshakeResult(success=False, message="V500 handshake timeout")

async def _send_frame_wait_ack(
self,
transport: Transport,
Expand Down
17 changes: 7 additions & 10 deletions src/defib/recovery/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async def run(
elapsed_ms=elapsed,
)

if on_log:
on_log(LogEvent(level="info", message=handshake.message))
# Note: handshake success is already reported via on_progress,
# so we don't duplicate it here via on_log.

# Firmware transfer
firmware = self._load_firmware()
Expand All @@ -126,13 +126,10 @@ async def run(
except Exception:
pass

if on_log:
if result.success:
on_log(LogEvent(
level="info",
message=f"Recovery complete in {result.elapsed_ms:.0f}ms",
))
else:
on_log(LogEvent(level="error", message=f"Recovery failed: {result.error}"))
# Note: completion/failure is already reported via on_progress
# (Stage.COMPLETE event). We only log errors here that weren't
# already surfaced by the protocol.
if on_log and not result.success:
on_log(LogEvent(level="error", message=f"Recovery failed: {result.error}"))

return result
157 changes: 139 additions & 18 deletions src/defib/tui/screens/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Main setup screen: chip selector, file picker, port selector, start button."""
"""Main setup screen: chip selector, firmware (auto-download or local), port selector."""

from __future__ import annotations

Expand All @@ -18,14 +18,14 @@
Checkbox,
)

from defib.firmware import has_firmware, download_firmware, get_cached_path
from defib.profiles.loader import list_all_chips


def _get_serial_ports() -> list[tuple[str, str]]:
"""Get available serial ports as (label, value) tuples."""
try:
from serial.tools.list_ports import comports
# Filter out ghost/placeholder ports (no USB vendor ID = not real)
ports = sorted(
[p for p in comports() if p.vid is not None],
key=lambda p: p.device,
Expand All @@ -47,9 +47,9 @@ class MainScreen(Screen[None]):
}

#form-container {
width: 70;
width: 74;
height: auto;
max-height: 30;
max-height: 34;
border: thick $accent;
padding: 1 2;
background: $panel;
Expand All @@ -72,6 +72,18 @@ class MainScreen(Screen[None]):
width: 100%;
}

#fw-status {
height: 1;
color: $success;
margin-top: 0;
}

#fw-hint {
height: 1;
color: $text-muted;
text-style: italic;
}

#button-row {
margin-top: 1;
align: center middle;
Expand All @@ -82,8 +94,8 @@ class MainScreen(Screen[None]):
min-width: 20;
}

#chip-input {
width: 100%;
#download-btn {
min-width: 30;
}

#file-input {
Expand All @@ -110,9 +122,17 @@ def compose(self) -> ComposeResult:
allow_blank=True,
)

yield Label("Firmware File:")
yield Label("Firmware:")
yield Button(
"Select a chip first",
variant="default",
id="download-btn",
disabled=True,
)
yield Static("", id="fw-status")
yield Static("", id="fw-hint")
yield Input(
placeholder="/path/to/u-boot.bin",
placeholder="Or enter path to local firmware file",
id="file-input",
)

Expand All @@ -125,34 +145,136 @@ def compose(self) -> ComposeResult:
value=port_options[0][1] if port_options else "",
)

yield Checkbox("Send Ctrl-C after upload (enter U-Boot console)", id="break-check")
yield Checkbox(
"Send Ctrl-C after upload (enter U-Boot console)",
id="break-check",
)

with Horizontal(id="button-row"):
yield Button("Start Recovery", variant="primary", id="start-btn")

yield Footer()

def _get_chip(self) -> str:
sel = self.query_one("#chip-select", Select)
return str(sel.value) if sel.value != Select.BLANK else ""

def on_select_changed(self, event: Select.Changed) -> None:
if event.select.id == "chip-select":
self._on_chip_changed()

def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "file-input":
self._update_start_button()

def _on_chip_changed(self) -> None:
chip = self._get_chip()
dl_btn = self.query_one("#download-btn", Button)
hint = self.query_one("#fw-hint", Static)
status = self.query_one("#fw-status", Static)

if not chip:
dl_btn.label = "Select a chip first"
dl_btn.disabled = True
dl_btn.variant = "default"
hint.update("")
status.update("")
elif has_firmware(chip):
cached = get_cached_path(chip)
if cached:
dl_btn.label = f"Re-download U-Boot for {chip}"
dl_btn.disabled = False
dl_btn.variant = "default"
status.update(f"✓ Cached: {cached.name} ({cached.stat().st_size // 1024} KB)")
else:
dl_btn.label = f"Download U-Boot for {chip}"
dl_btn.disabled = False
dl_btn.variant = "success"
status.update("")
hint.update("Or enter a local file path below for custom builds.")
else:
dl_btn.label = "No OpenIPC build available"
dl_btn.disabled = True
dl_btn.variant = "default"
hint.update("Enter a local firmware file path below.")
status.update("")

self._update_start_button()

def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start-btn":
self._start_recovery()
elif event.button.id == "download-btn":
self._download_firmware()

def _download_firmware(self) -> None:
chip = self._get_chip()
if not chip or not has_firmware(chip):
return

dl_btn = self.query_one("#download-btn", Button)
status = self.query_one("#fw-status", Static)

dl_btn.label = "Downloading..."
dl_btn.disabled = True
status.update("")

try:
path = download_firmware(chip)
status.update(f"✓ {path.name} ({path.stat().st_size // 1024} KB)")
dl_btn.label = f"Re-download U-Boot for {chip}"
dl_btn.disabled = False
dl_btn.variant = "default"
self.notify(
f"Downloaded {path.name}",
severity="information",
title="Firmware Ready",
)
except (ValueError, ConnectionError) as e:
status.update("")
dl_btn.label = "Download failed — retry?"
dl_btn.disabled = False
dl_btn.variant = "error"
self.notify(str(e), severity="error", title="Download Failed")

self._update_start_button()

def _get_firmware_path(self) -> str:
"""Get firmware path: local file input takes priority, then cached download."""
local = str(self.query_one("#file-input", Input).value).strip()
if local:
return local

chip = self._get_chip()
if chip:
cached = get_cached_path(chip)
if cached:
return str(cached)

return ""

def _update_start_button(self) -> None:
chip = self._get_chip()
firmware = self._get_firmware_path()
port_sel = self.query_one("#port-select", Select)
port = str(port_sel.value) if port_sel.value != Select.BLANK else ""

self.query_one("#start-btn", Button).disabled = not (chip and firmware and port)

def _start_recovery(self) -> None:
chip_select = self.query_one("#chip-select", Select)
file_input = self.query_one("#file-input", Input)
port_select = self.query_one("#port-select", Select)
chip = self._get_chip()
firmware_path = self._get_firmware_path()
port_sel = self.query_one("#port-select", Select)
port = str(port_sel.value) if port_sel.value != Select.BLANK else ""
break_check = self.query_one("#break-check", Checkbox)

chip = str(chip_select.value) if chip_select.value != Select.BLANK else ""
firmware_path = file_input.value.strip()
port = str(port_select.value) if port_select.value != Select.BLANK else ""
send_break = break_check.value

# Validation
errors: list[str] = []
if not chip:
errors.append("Select a chip model")
if not firmware_path:
errors.append("Enter a firmware file path")
errors.append("Download firmware or enter a file path")
elif not Path(firmware_path).is_file():
errors.append(f"File not found: {firmware_path}")
if not port:
Expand All @@ -162,7 +284,6 @@ def _start_recovery(self) -> None:
self.notify("\n".join(errors), severity="error", title="Validation Error")
return

# Start recovery via the app
from defib.tui.app import DefibApp
app = self.app
if isinstance(app, DefibApp):
Expand Down
5 changes: 3 additions & 2 deletions src/defib/tui/screens/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def compose(self) -> ComposeResult:
with Horizontal(id="stages-row"):
stages = ["Handshake", "DDR Init", "SPL/GSL", "U-Boot"]
for name in stages:
indicator = StageIndicator(name, id=f"stage-{name.lower().replace('/', '-')}")
safe_id = name.lower().replace("/", "-").replace(" ", "-")
indicator = StageIndicator(name, id=f"stage-{safe_id}")
self._stage_widgets[name.lower()] = indicator
yield indicator

Expand Down Expand Up @@ -204,7 +205,7 @@ def _on_progress(self, event: ProgressEvent) -> None:
self._stage_widgets[indicator_name].set_active()
self._current_stage = indicator_name

if event.message:
if event.message and event.stage != Stage.COMPLETE:
self._log(event.message)

def _on_log(self, event: LogEvent) -> None:
Expand Down
11 changes: 2 additions & 9 deletions tests/test_protocol_standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,8 @@ async def test_handshake_with_noise(self):
result = await protocol.handshake(transport)
assert result.success

@pytest.mark.asyncio
async def test_handshake_timeout(self):
transport = MockTransport()
# Only 3x 0x20, not enough
transport.enqueue_rx(b"\x20\x20\x20")

protocol = HiSiliconStandard()
result = await protocol.handshake(transport)
assert not result.success
# No timeout test — handshake waits forever for user to power-cycle.
# User cancels via Ctrl-C, not a timeout.


class TestStandardFirmwareTransfer:
Expand Down
Loading
Loading