Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ venv/
*.dcap
.env
.hypothesis/
*.log
47 changes: 43 additions & 4 deletions src/defib/recovery/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,56 @@ async def run(
result = await protocol.send_firmware(transport, firmware, on_progress)
result.elapsed_ms = (time.monotonic() - start_time) * 1000

# Send break (Ctrl-C) if requested
# Send break (Ctrl-C) to interrupt U-Boot autoboot
if send_break and result.success:
if on_log:
on_log(LogEvent(level="info", message="Sending Ctrl-C to enter U-Boot console"))
for _ in range(49):
on_log(LogEvent(
level="info",
message="Waiting for U-Boot to start (up to 15s)...",
))
# U-Boot needs time to decompress, relocate, and initialize
# hardware (SPI, NAND, MMC, network) before showing the
# autoboot countdown. This can take 5-10 seconds.
# Strategy: send Ctrl-C every 200ms while reading output,
# looking for the autoboot prompt or U-Boot console prompt.
import asyncio
start_break = time.monotonic()
prompt_found = False
buf = bytearray()
while time.monotonic() - start_break < 15.0:
await transport.write(b"\x03")
try:
await transport.read(1, timeout=0.05)
data = await transport.read(256, timeout=0.2)
buf.extend(data)
text = buf.decode("ascii", errors="replace")
# Check for autoboot in full accumulated text
if "autoboot" in text.lower():
if on_log:
on_log(LogEvent(level="info", message="Autoboot detected, sending Ctrl-C..."))
for _ in range(20):
await transport.write(b"\x03")
await asyncio.sleep(0.1)
prompt_found = True
break
# Check for U-Boot prompt only in the LAST chunk
# (avoid false matches on boot log substrings)
tail = text[-256:] if len(text) > 256 else text
if "OpenIPC #" in tail or "hisilicon #" in tail or "\n=> " in tail:
prompt_found = True
break
except Exception:
pass

if prompt_found:
if on_log:
on_log(LogEvent(level="info", message="U-Boot console ready"))
else:
if on_log:
on_log(LogEvent(
level="warn",
message="U-Boot prompt not detected within 15s",
))

# Note: completion/failure is already reported via on_progress
# (Stage.COMPLETE event). We only log errors here that weren't
# already surfaced by the protocol.
Expand Down
1 change: 0 additions & 1 deletion src/defib/tui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class DefibApp(App[None]):

BINDINGS = [
Binding("q", "quit", "Quit", show=True),
Binding("ctrl+c", "quit", "Quit", show=False),
]

SCREENS = {"main": MainScreen}
Expand Down
164 changes: 154 additions & 10 deletions src/defib/tui/screens/progress.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Progress screen: multi-stage progress bars and serial log panel."""
"""Progress screen: recovery progress + post-recovery serial console."""

from __future__ import annotations

import asyncio
from datetime import datetime

from textual.app import ComposeResult
Expand All @@ -14,6 +15,7 @@
ProgressBar,
RichLog,
Button,
Input,
)

from defib.recovery.events import LogEvent, ProgressEvent, Stage
Expand Down Expand Up @@ -46,7 +48,11 @@ def set_failed(self) -> None:


class ProgressScreen(Screen[None]):
"""Recovery progress screen with stage indicators, progress bar, and log."""
"""Recovery progress screen, then serial console after completion."""

BINDINGS = [
("ctrl+c", "send_break", "Send Ctrl-C to device"),
]

CSS = """
ProgressScreen {
Expand Down Expand Up @@ -105,6 +111,11 @@ class ProgressScreen(Screen[None]):
height: 1fr;
}

#console-input {
dock: bottom;
margin: 0 2;
}

#bottom-bar {
height: 3;
padding: 0 2;
Expand All @@ -127,6 +138,10 @@ def __init__(
self._send_break = send_break
self._stage_widgets: dict[str, StageIndicator] = {}
self._current_stage: str | None = None
self._transport: object | None = None # Kept open for console
self._console_mode = False
self._console_reader_task: asyncio.Task[None] | None = None
self._log_buffer: list[str] = [] # Plain-text log for export

def compose(self) -> ComposeResult:
yield Header()
Expand All @@ -150,18 +165,67 @@ def compose(self) -> ComposeResult:
with Vertical(id="log-container"):
yield RichLog(highlight=True, markup=True, id="log-panel", wrap=True)

yield Input(
placeholder="Type command and press Enter (serial console)",
id="console-input",
)

with Horizontal(id="bottom-bar"):
yield Button("Save Log", variant="success", id="save-log-btn")
yield Button("Back", variant="default", id="back-btn")

yield Footer()

def on_mount(self) -> None:
# Hide console input until recovery completes
self.query_one("#console-input").display = False
self._log("Starting recovery session...")
self.run_worker(self._run_recovery(), exclusive=True)

async def action_send_break(self) -> None:
"""Send Ctrl-C (0x03) to the serial device."""
if self._console_mode and self._transport is not None:
from defib.transport.base import Transport
transport: Transport = self._transport # type: ignore[assignment]
try:
await transport.write(b"\x03")
self._console_write("^C")
except Exception:
pass

def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "back-btn":
self._stop_console()
self.app.pop_screen()
elif event.button.id == "save-log-btn":
self._save_log()

def _save_log(self) -> None:
"""Save log buffer to a timestamped file."""
from pathlib import Path

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"defib_{self._chip}_{timestamp}.log"
path = Path.cwd() / filename
try:
path.write_text("\n".join(self._log_buffer) + "\n")
self.notify(f"Log saved: {filename}", severity="information", title="Log Saved")
except Exception as e:
self.notify(f"Failed to save: {e}", severity="error", title="Save Error")

async def on_input_submitted(self, event: Input.Submitted) -> None:
"""Send typed command to serial port."""
if not self._console_mode or self._transport is None:
return
cmd = event.value
event.input.value = ""
if cmd:
from defib.transport.base import Transport
transport: Transport = self._transport # type: ignore[assignment]
try:
await transport.write((cmd + "\n").encode())
except Exception as e:
self._console_write(f"[Send error: {e}]\n")

def _log(self, message: str, style: str = "") -> None:
log_panel = self.query_one("#log-panel", RichLog)
Expand All @@ -170,9 +234,15 @@ def _log(self, message: str, style: str = "") -> None:
log_panel.write(f"[dim]{timestamp}[/dim] [{style}]{message}[/{style}]")
else:
log_panel.write(f"[dim]{timestamp}[/dim] {message}")
self._log_buffer.append(f"{timestamp} {message}")

def _console_write(self, text: str) -> None:
"""Write raw text to the log panel (no timestamp, for serial output)."""
log_panel = self.query_one("#log-panel", RichLog)
log_panel.write(text)
self._log_buffer.append(text)

def _map_stage_to_indicator(self, stage: Stage) -> str | None:
"""Map protocol stage to indicator widget name."""
mapping: dict[Stage, str] = {
Stage.HANDSHAKE: "handshake",
Stage.DDR_INIT: "ddr init",
Expand All @@ -189,18 +259,14 @@ def _map_stage_to_indicator(self, stage: Stage) -> str | None:
return mapping.get(stage)

def _on_progress(self, event: ProgressEvent) -> None:
# Update progress bar
if event.bytes_total > 1:
progress = self.query_one("#main-progress", ProgressBar)
progress.update(total=event.bytes_total, progress=event.bytes_sent)

# Update stage indicator
indicator_name = self._map_stage_to_indicator(event.stage)
if indicator_name and indicator_name != self._current_stage:
# Mark previous stage complete
if self._current_stage and self._current_stage in self._stage_widgets:
self._stage_widgets[self._current_stage].set_complete()
# Mark new stage active
if indicator_name in self._stage_widgets:
self._stage_widgets[indicator_name].set_active()
self._current_stage = indicator_name
Expand All @@ -213,8 +279,83 @@ def _on_log(self, event: LogEvent) -> None:
style = style_map.get(event.level, "")
self._log(event.message, style=style)

def _enter_console_mode(self) -> None:
"""Switch UI to serial console mode."""
self._console_mode = True
# Hide progress bar, show console input
self.query_one("#progress-container").display = False
self.query_one("#status-container").display = False
console_input = self.query_one("#console-input", Input)
console_input.display = True
console_input.focus()

info = self.query_one("#info-row", Static)
info.update(f"[bold]Serial Console:[/bold] {self._port}")

self._log("--- Serial Console (type commands below, press Enter to send) ---",
style="cyan bold")

# Start background reader
self._console_reader_task = asyncio.ensure_future(self._console_read_loop())

async def _console_read_loop(self) -> None:
"""Background task: read serial data, display it, auto-interrupt autoboot."""
from defib.transport.base import Transport, TransportTimeout
transport: Transport = self._transport # type: ignore[assignment]
buf = bytearray()
autoboot_handled = False
# Rolling window of recent output for autoboot detection
recent = ""

while self._console_mode and transport is not None:
try:
waiting = await transport.bytes_waiting()
if waiting > 0:
data = await transport.read(min(waiting, 1024), timeout=0.1)
buf.extend(data)
text = buf.decode("ascii", errors="replace")
if "\n" in text or len(buf) > 256:
self._console_write(text)
# Track recent output for autoboot detection
recent += text
if len(recent) > 2048:
recent = recent[-1024:]
buf.clear()

# Auto-detect autoboot and send Ctrl-C
if not autoboot_handled and "autoboot" in recent.lower():
autoboot_handled = True
self._log("Autoboot detected! Sending Ctrl-C...", style="yellow bold")
for _ in range(20):
await transport.write(b"\x03")
await asyncio.sleep(0.1)
else:
if buf:
text = buf.decode("ascii", errors="replace")
self._console_write(text)
recent += text
if len(recent) > 2048:
recent = recent[-1024:]
buf.clear()
await asyncio.sleep(0.05)
except TransportTimeout:
await asyncio.sleep(0.05)
except Exception:
break

def _stop_console(self) -> None:
"""Stop console mode and close transport."""
self._console_mode = False
if self._console_reader_task and not self._console_reader_task.done():
self._console_reader_task.cancel()
if self._transport is not None:
from defib.transport.base import Transport
transport: Transport = self._transport # type: ignore[assignment]
asyncio.ensure_future(transport.close())
self._transport = None

async def _run_recovery(self) -> None:
"""Execute recovery in a worker thread."""
"""Execute recovery, then enter console mode on success."""
from defib.recovery.session import RecoverySession
from defib.transport.serial_platform import create_transport, normalize_port_name

Expand All @@ -241,9 +382,8 @@ async def _run_recovery(self) -> None:
)
except Exception as e:
self._log(f"Recovery error: {e}", style="red bold")
return
finally:
await transport.close()
return

# Mark final stage
if self._current_stage and self._current_stage in self._stage_widgets:
Expand All @@ -256,5 +396,9 @@ async def _run_recovery(self) -> None:
self._log(f"Recovery complete! ({result.elapsed_ms:.0f}ms)", style="green bold")
progress = self.query_one("#main-progress", ProgressBar)
progress.update(total=100, progress=100)
# Keep transport open and enter console mode
self._transport = transport
self._enter_console_mode()
else:
self._log(f"Recovery failed: {result.error}", style="red bold")
await transport.close()
Loading
Loading