diff --git a/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json new file mode 100644 index 000000000..3a858b050 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json @@ -0,0 +1,89 @@ +{ + "id": "traj_piik8r6zu3i7", + "version": 1, + "task": { + "title": "Issue 867: RelayEventListener" + }, + "status": "completed", + "startedAt": "2026-05-18T01:56:18.236Z", + "completedAt": "2026-05-18T02:01:49.991Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-18T01:58:24.265Z" + } + ], + "chapters": [ + { + "id": "chap_xnbleth5fk5g", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-18T01:58:24.265Z", + "endedAt": "2026-05-18T02:01:49.991Z", + "events": [ + { + "ts": 1779069504266, + "type": "decision", + "content": "Drain writeback channel from a dedicated std::thread, not tokio task: Drain writeback channel from a dedicated std::thread, not tokio task", + "raw": { + "question": "Drain writeback channel from a dedicated std::thread, not tokio task", + "chosen": "Drain writeback channel from a dedicated std::thread, not tokio task", + "alternatives": [], + "reasoning": "The reader thread is a std::thread (not tokio) and it's the only place that calls processor.advance which fires send_event. Keeping the drainer as a std::thread that takes self.writer.lock() (parking_lot Mutex) avoids importing the tokio runtime into PtySession::spawn and matches the existing reader thread shape. Also avoids holding any async tasks at spawn time when no runtime is guaranteed." + }, + "significance": "high" + }, + { + "ts": 1779069510157, + "type": "decision", + "content": "Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel: Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel", + "raw": { + "question": "Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel", + "chosen": "Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel", + "alternatives": [], + "reasoning": "send_event must be non-blocking (called while holding processor+term locks). std mpsc Sender::send never blocks. A dedicated std::thread drains the receiver and writes to the PTY writer, decoupling potential write_all blocking from the parser's hot path. Avoids tokio runtime requirement in PtySession::spawn." + }, + "significance": "high" + }, + { + "ts": 1779069704857, + "type": "decision", + "content": "Made Snapshot::from_term generic over EventListener: Made Snapshot::from_term generic over EventListener", + "raw": { + "question": "Made Snapshot::from_term generic over EventListener", + "chosen": "Made Snapshot::from_term generic over EventListener", + "alternatives": [], + "reasoning": "Changing PtySession's term field from Term to Term broke Snapshot::capture which calls pty.with_term(Self::from_term). Generic over L: EventListener keeps offline tests on VoidListener and live capture on RelayEventListener without an extra wrapper." + }, + "significance": "high" + }, + { + "ts": 1779069705573, + "type": "decision", + "content": "Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs): Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs)", + "raw": { + "question": "Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs)", + "chosen": "Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs)", + "alternatives": [], + "reasoning": "Issue scope said pty_worker.rs, but TerminalQueryParser was also used by run_wrap (wrap.rs) and exported via helpers + asserted in main.rs tests. Both used the same PtySession, so the listener now handles them. Leaving them would have broken the import-removal in main.rs." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Replaced hand-rolled TerminalQueryParser with alacritty RelayEventListener. Listener owns a std::sync::mpsc::Sender; alacritty's send_event(Event::PtyWrite) pushes query responses (DSR/DA1/DA2/CPR) onto the channel, drained by a dedicated thread that writes to the PTY. CPR responses now reflect real cursor position instead of hardcoded 1;1. Deleted the parser and call sites in pty_worker.rs, wrap.rs, and main.rs; made Snapshot::from_term generic over EventListener. All 615 tests pass; clippy clean.", + "approach": "Standard approach", + "confidence": 0.85 + }, + "commits": [], + "filesChanged": [], + "projectId": "relay", + "tags": [], + "_trace": { + "startRef": "5fc8a131561feedfe990fc265f224101f6f267c4", + "endRef": "5fc8a131561feedfe990fc265f224101f6f267c4" + } +} diff --git a/.trajectories/completed/2026-05/traj_piik8r6zu3i7.md b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.md new file mode 100644 index 000000000..b15a7d2b7 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.md @@ -0,0 +1,51 @@ +# Trajectory: Issue 867: RelayEventListener + +> **Status:** ✅ Completed +> **Confidence:** 85% +> **Started:** May 17, 2026 at 09:56 PM +> **Completed:** May 17, 2026 at 10:01 PM + +--- + +## Summary + +Replaced hand-rolled TerminalQueryParser with alacritty RelayEventListener. Listener owns a std::sync::mpsc::Sender; alacritty's send_event(Event::PtyWrite) pushes query responses (DSR/DA1/DA2/CPR) onto the channel, drained by a dedicated thread that writes to the PTY. CPR responses now reflect real cursor position instead of hardcoded 1;1. Deleted the parser and call sites in pty_worker.rs, wrap.rs, and main.rs; made Snapshot::from_term generic over EventListener. All 615 tests pass; clippy clean. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Drain writeback channel from a dedicated std::thread, not tokio task + +- **Chose:** Drain writeback channel from a dedicated std::thread, not tokio task +- **Reasoning:** The reader thread is a std::thread (not tokio) and it's the only place that calls processor.advance which fires send_event. Keeping the drainer as a std::thread that takes self.writer.lock() (parking_lot Mutex) avoids importing the tokio runtime into PtySession::spawn and matches the existing reader thread shape. Also avoids holding any async tasks at spawn time when no runtime is guaranteed. + +### Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel + +- **Chose:** Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel +- **Reasoning:** send_event must be non-blocking (called while holding processor+term locks). std mpsc Sender::send never blocks. A dedicated std::thread drains the receiver and writes to the PTY writer, decoupling potential write_all blocking from the parser's hot path. Avoids tokio runtime requirement in PtySession::spawn. + +### Made Snapshot::from_term generic over EventListener + +- **Chose:** Made Snapshot::from_term generic over EventListener +- **Reasoning:** Changing PtySession's term field from Term to Term broke Snapshot::capture which calls pty.with_term(Self::from_term). Generic over L: EventListener keeps offline tests on VoidListener and live capture on RelayEventListener without an extra wrapper. + +### Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs) + +- **Chose:** Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs) +- **Reasoning:** Issue scope said pty_worker.rs, but TerminalQueryParser was also used by run_wrap (wrap.rs) and exported via helpers + asserted in main.rs tests. Both used the same PtySession, so the listener now handles them. Leaving them would have broken the import-removal in main.rs. + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Drain writeback channel from a dedicated std::thread, not tokio task: Drain writeback channel from a dedicated std::thread, not tokio task +- Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel: Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel +- Made Snapshot::from_term generic over EventListener: Made Snapshot::from_term generic over EventListener +- Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs): Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs) diff --git a/.trajectories/index.json b/.trajectories/index.json index e791eec49..5c4ceb6d4 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-17T14:33:32.468Z", + "lastUpdated": "2026-05-18T02:01:50.133Z", "trajectories": { "traj_9gq96irkj00s": { "title": "Update relay to use published relaycast Rust reclaim fix", @@ -591,6 +591,13 @@ "startedAt": "2026-05-17T14:19:10.603Z", "completedAt": "2026-05-17T14:33:32.293Z", "path": ".trajectories/completed/2026-05/traj_cbmwd07phhm2.json" + }, + "traj_piik8r6zu3i7": { + "title": "Issue 867: RelayEventListener", + "status": "completed", + "startedAt": "2026-05-18T01:56:18.236Z", + "completedAt": "2026-05-18T02:01:49.991Z", + "path": ".trajectories/completed/2026-05/traj_piik8r6zu3i7.json" } } } diff --git a/src/helpers.rs b/src/helpers.rs index 2251bce48..555acc231 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -216,89 +216,6 @@ pub(crate) fn check_echo_in_output(output: &str, expected: &str) -> bool { clean.contains(expected) } -#[derive(Debug, Clone, Copy, Default)] -pub(crate) enum TerminalQueryState { - #[default] - Idle, - Esc, - Csi, - CsiQmark, - Csi6, - CsiQmark6, - /// CSI 0 — could be DA1 param (ESC [ 0 c) - Csi0, - /// CSI 5 — could be DSR (ESC [ 5 n) - Csi5, - /// CSI > — DA2 prefix (ESC [ > c) - CsiGt, -} - -#[derive(Debug, Default)] -pub(crate) struct TerminalQueryParser { - pub(crate) state: TerminalQueryState, -} - -impl TerminalQueryParser { - pub(crate) fn feed(&mut self, chunk: &[u8]) -> Vec<&'static [u8]> { - const ESC: u8 = 0x1b; - const CSI: u8 = b'['; - const QMARK: u8 = b'?'; - const SIX: u8 = b'6'; - const N: u8 = b'n'; - - let mut out = Vec::new(); - for byte in chunk { - self.state = match (self.state, *byte) { - (_, ESC) => TerminalQueryState::Esc, - (TerminalQueryState::Esc, CSI) => TerminalQueryState::Csi, - (TerminalQueryState::Csi, QMARK) => TerminalQueryState::CsiQmark, - (TerminalQueryState::Csi, b'>') => TerminalQueryState::CsiGt, - (TerminalQueryState::Csi, SIX) => TerminalQueryState::Csi6, - (TerminalQueryState::Csi, b'0') => TerminalQueryState::Csi0, - (TerminalQueryState::Csi, b'5') => TerminalQueryState::Csi5, - (TerminalQueryState::CsiQmark, SIX) => TerminalQueryState::CsiQmark6, - // CSI 6 n → Cursor Position Report (CPR) - (TerminalQueryState::Csi6, N) => { - out.push(b"\x1b[1;1R".as_slice()); - TerminalQueryState::Idle - } - (TerminalQueryState::CsiQmark6, N) => { - out.push(b"\x1b[?1;1R".as_slice()); - TerminalQueryState::Idle - } - // CSI c → DA1 (Device Attributes primary, no params) - (TerminalQueryState::Csi, b'c') => { - out.push(b"\x1b[?1;2c".as_slice()); // VT100 with AVO - TerminalQueryState::Idle - } - // CSI 0 c → DA1 with explicit 0 param - (TerminalQueryState::Csi0, b'c') => { - out.push(b"\x1b[?1;2c".as_slice()); - TerminalQueryState::Idle - } - // CSI > c → DA2 (Device Attributes secondary) - (TerminalQueryState::CsiGt, b'c') => { - out.push(b"\x1b[>1;10;0c".as_slice()); // VT100, version 10 - TerminalQueryState::Idle - } - // CSI 5 n → DSR (Device Status Report) - (TerminalQueryState::Csi5, N) => { - out.push(b"\x1b[0n".as_slice()); // terminal OK - TerminalQueryState::Idle - } - _ => TerminalQueryState::Idle, - }; - } - out - } -} - -#[cfg(test)] -pub(crate) fn terminal_query_responses(chunk: &[u8]) -> Vec<&'static [u8]> { - let mut parser = TerminalQueryParser::default(); - parser.feed(chunk) -} - fn workspace_context_label( workspace_id: Option<&str>, workspace_alias: Option<&str>, @@ -1196,46 +1113,6 @@ mod tests { assert_eq!(detector.detect_activity(&output, expected_echo), None); } - #[test] - fn terminal_query_da1_no_param() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[c"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;2c"); - } - - #[test] - fn terminal_query_da1_with_zero() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[0c"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;2c"); - } - - #[test] - fn terminal_query_da2() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[>c"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[>1;10;0c"); - } - - #[test] - fn terminal_query_dsr() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[5n"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[0n"); - } - - #[test] - fn terminal_query_cpr_still_works() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[6n"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[1;1R"); - } - // ==================== detect_codex_model_prompt tests ==================== #[test] @@ -1469,55 +1346,6 @@ mod tests { assert!(!has_allow); } - // ==================== terminal query parser edge cases ==================== - - #[test] - fn terminal_query_multiple_queries_in_one_chunk() { - let mut parser = TerminalQueryParser::default(); - // DA1 + CPR + DSR all in one chunk - let responses = parser.feed(b"\x1b[c\x1b[6n\x1b[5n"); - assert_eq!(responses.len(), 3); - assert_eq!(responses[0], b"\x1b[?1;2c"); // DA1 - assert_eq!(responses[1], b"\x1b[1;1R"); // CPR - assert_eq!(responses[2], b"\x1b[0n"); // DSR - } - - #[test] - fn terminal_query_interleaved_with_text() { - let mut parser = TerminalQueryParser::default(); - // Normal text + DA1 query + more text - let responses = parser.feed(b"Hello\x1b[cWorld"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;2c"); - } - - #[test] - fn terminal_query_split_across_chunks() { - let mut parser = TerminalQueryParser::default(); - // ESC in first chunk, [6n in second - let r1 = parser.feed(b"\x1b"); - assert_eq!(r1.len(), 0); - let r2 = parser.feed(b"[6n"); - assert_eq!(r2.len(), 1); - assert_eq!(r2[0], b"\x1b[1;1R"); - } - - #[test] - fn terminal_query_incomplete_sequence_reset() { - let mut parser = TerminalQueryParser::default(); - // ESC [ but then a regular char (not a query) — should reset - let responses = parser.feed(b"\x1b[A"); - assert_eq!(responses.len(), 0, "cursor up should not generate response"); - } - - #[test] - fn terminal_query_qmark_cpr() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[?6n"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;1R"); - } - // ==================== throttle edge cases ==================== #[test] diff --git a/src/main.rs b/src/main.rs index 54face2ce..6e8596b84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use helpers::{ detect_codex_model_prompt, detect_gemini_action_required, detect_gemini_trust_prompt, detect_gemini_untrusted_banner, detect_opencode_permission_prompt, floor_char_boundary, is_auto_suggestion, is_bypass_selection_menu, is_in_editor_mode, is_self_name, - normalize_cli_name, parse_cli_command, strip_ansi, TerminalQueryParser, + normalize_cli_name, parse_cli_command, strip_ansi, }; use listen_api::{broadcast_if_relevant, listen_api_router, ListenApiConfig, ListenApiRequest}; use routing::display_target_for_dashboard; @@ -5500,7 +5500,7 @@ mod tests { time::{Duration, Instant}, }; - use crate::helpers::{format_injection, terminal_query_responses}; + use crate::helpers::format_injection; use relay_broker::protocol::{MessageInjectionMode, RelayDelivery}; use serde_json::{json, Value}; @@ -5515,7 +5515,7 @@ mod tests { normalize_sender, relaycast_spawn_control_dedup_key, relaycast_ws_control_dedup_key, relaycast_ws_should_apply_local_spawn_echo_dedup, relaycast_ws_spawn_token, sender_is_dashboard_label, should_clear_pending_delivery_for_event, strip_ansi, - AgentRuntime, PendingDelivery, ProtocolHeadlessProvider, TerminalQueryParser, + AgentRuntime, PendingDelivery, ProtocolHeadlessProvider, }; use crate::helpers::floor_char_boundary; use relay_broker::dedup::DedupCache; @@ -6349,29 +6349,6 @@ mod tests { assert!(should_clear_pending_delivery_for_event(None, Some("evt_1"))); } - #[test] - fn terminal_query_responses_standard_cpr() { - let responses = terminal_query_responses(b"\x1b[6n"); - assert_eq!(responses, vec![b"\x1b[1;1R".as_slice()]); - } - - #[test] - fn terminal_query_parser_handles_split_sequences() { - let mut parser = TerminalQueryParser::default(); - assert!(parser.feed(b"\x1b[").is_empty()); - assert!(parser.feed(b"6").is_empty()); - let responses = parser.feed(b"n"); - assert_eq!(responses, vec![b"\x1b[1;1R".as_slice()]); - } - - #[test] - fn terminal_query_parser_handles_private_cpr() { - let mut parser = TerminalQueryParser::default(); - assert!(parser.feed(b"\x1b[?6").is_empty()); - let responses = parser.feed(b"n"); - assert_eq!(responses, vec![b"\x1b[?1;1R".as_slice()]); - } - // ==================== strip_ansi tests ==================== #[test] diff --git a/src/pty.rs b/src/pty.rs index b4dbccf5a..c22a2628f 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -5,12 +5,12 @@ use std::{ path::Path, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + mpsc as std_mpsc, Arc, }, thread, }; -use alacritty_terminal::event::VoidListener; +use alacritty_terminal::event::{Event, EventListener}; use alacritty_terminal::grid::Dimensions; use alacritty_terminal::index::{Column, Line, Point}; use alacritty_terminal::term::{Config, Term}; @@ -20,6 +20,61 @@ use parking_lot::Mutex; use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use tokio::sync::mpsc; +/// Upper bound on queued writeback responses (DSR/DA1/DA2/CPR replies). +/// Bounded so a misbehaving child that floods query sequences while the +/// drainer is stuck on `write_all` cannot grow the queue without limit +/// and OOM the broker. 128 entries is well past any real burst — a +/// healthy terminal interaction sees fewer than a handful of pending +/// replies at a time. +const WRITEBACK_QUEUE_DEPTH: usize = 128; + +/// Forwards alacritty's terminal events back to the PTY's stdin so the +/// child process sees real responses to its query sequences (DSR, DA1, +/// DA2, CPR, …). alacritty fills in the response payloads using the +/// real grid state — so CPR replies carry the actual cursor position +/// rather than the old hand-rolled `1;1` placeholder. +/// +/// `send_event` is invoked from inside `Processor::advance` while the +/// processor and term locks are held, so it must be non-blocking. We +/// hand the bytes off to a bounded `std::sync::mpsc::sync_channel` via +/// `try_send`; a dedicated drainer thread takes the writer lock and +/// pushes them down the PTY, decoupling potential `write_all` blocking +/// from the parser hot path. +#[derive(Clone)] +pub struct RelayEventListener { + tx: std_mpsc::SyncSender>, +} + +impl RelayEventListener { + fn new(tx: std_mpsc::SyncSender>) -> Self { + Self { tx } + } +} + +impl EventListener for RelayEventListener { + fn send_event(&self, event: Event) { + // Only `PtyWrite` actually needs to round-trip to the child. + // Title/colour/clipboard/etc. events are intentionally dropped — + // the broker isn't a real UI, so there is nothing meaningful to + // do with them. + if let Event::PtyWrite(text) = event { + // Non-blocking try_send: if the queue is full (drainer + // backed up behind a blocked write_all) or the drainer has + // exited (PTY teardown), drop the reply. Telemetry must be + // infallible, but a queue overflow is worth flagging. + match self.tx.try_send(text.into_bytes()) { + Ok(()) | Err(std_mpsc::TrySendError::Disconnected(_)) => {} + Err(std_mpsc::TrySendError::Full(_)) => { + tracing::warn!( + depth = WRITEBACK_QUEUE_DEPTH, + "pty writeback queue full; dropping terminal query response" + ); + } + } + } + } +} + /// Cell dimensions for a parsed VT grid. Implements /// `alacritty_terminal::grid::Dimensions` so `Term::new` / `Term::resize` /// accept it directly without pulling in the test-helper `TermSize`. @@ -68,7 +123,12 @@ pub struct PtySession { /// VT100 grid kept in sync with PTY output. The reader thread /// advances `processor` on every chunk; queries (`screen_text`, /// `cursor_position`, `cell_at`) read `term` under the same lock. - term: Arc>>, + /// + /// The `RelayEventListener` is plumbed into `Term::new` so that + /// query responses (DSR/DA1/DA2/CPR) generated by alacritty are + /// written back to the PTY's stdin — giving the child accurate + /// responses based on the real grid state. + term: Arc>>, /// Shared with the reader thread, which holds the actual lock most /// of the time. Kept on the struct so `resize()` can take the same /// lock the reader uses — preventing the child's post-resize @@ -168,13 +228,37 @@ impl PtySession { .context("failed to take pty writer")?; let size = GridSize::from_pty(rows, cols); - let term = Arc::new(Mutex::new(Term::new( - Config::default(), - &size, - VoidListener, - ))); + // Writeback channel for query responses generated by alacritty + // (DSR/DA1/DA2/CPR). The listener is sync and non-blocking; the + // drainer thread below takes the writer lock and pushes the + // bytes down the PTY. Bounded to prevent unbounded growth if + // the drainer is stuck behind a blocked `write_all`. + let (writeback_tx, writeback_rx) = std_mpsc::sync_channel::>(WRITEBACK_QUEUE_DEPTH); + let listener = RelayEventListener::new(writeback_tx); + let term = Arc::new(Mutex::new(Term::new(Config::default(), &size, listener))); let processor = Arc::new(Mutex::new(Processor::new())); + // Drainer: receives writeback bytes from the listener and pushes + // them to the PTY writer. Lives on a std::thread so it doesn't + // need a tokio runtime. Exits when the listener is dropped (i.e. + // when `term` is dropped, which happens at `PtySession` drop). + let writer_arc: Arc>> = Arc::new(Mutex::new(writer)); + let writer_for_drainer = writer_arc.clone(); + thread::spawn(move || { + while let Ok(bytes) = writeback_rx.recv() { + if bytes.is_empty() { + continue; + } + let mut guard = writer_for_drainer.lock(); + if guard.write_all(&bytes).is_err() { + break; + } + if guard.flush().is_err() { + break; + } + } + }); + let (tx, rx) = mpsc::channel(256); let term_clone = term.clone(); let processor_clone = processor.clone(); @@ -189,6 +273,12 @@ impl PtySession { // takes the same order; matching prevents // deadlock and ensures bytes are never // parsed against stale dimensions. + // + // The listener inside `term` may send bytes + // onto the writeback channel while we hold + // these locks — that send is non-blocking + // (std::mpsc), so no deadlock with the + // writer lock taken by the drainer thread. let mut processor_guard = processor_clone.lock(); let mut term_guard = term_clone.lock(); processor_guard.advance(&mut *term_guard, &buf[..n]); @@ -205,7 +295,7 @@ impl PtySession { Ok(( Self { master: pair.master, - writer: Arc::new(Mutex::new(writer)), + writer: writer_arc, child: Arc::new(Mutex::new(child)), child_pid, reaped: Arc::new(AtomicBool::new(false)), @@ -311,7 +401,7 @@ impl PtySession { /// /// Keep the closure short — it blocks the reader thread from advancing /// the VT parser while it runs. - pub fn with_term(&self, f: impl FnOnce(&Term) -> R) -> R { + pub fn with_term(&self, f: impl FnOnce(&Term) -> R) -> R { let term = self.term.lock(); f(&term) } @@ -742,4 +832,93 @@ mod tests { assert_eq!(String::from_utf8_lossy(&collected), "xterm-256color"); } + + // ---- RelayEventListener wiring tests ---- + // + // These drive a free-standing `Term` so we can + // assert that alacritty itself answers DSR/CPR query sequences with + // bytes that match real grid state — replacing the old hand-rolled + // `TerminalQueryParser` that hardcoded `1;1` for CPR. + + use super::{RelayEventListener, WRITEBACK_QUEUE_DEPTH}; + use std::sync::mpsc as std_mpsc; + use std::time::Duration as StdDuration; + + fn drive_listener( + rows: u16, + cols: u16, + chunks: &[&[u8]], + ) -> (std_mpsc::Receiver>, Term) { + let size = GridSize { + columns: cols as usize, + screen_lines: rows as usize, + }; + let (tx, rx) = std_mpsc::sync_channel::>(WRITEBACK_QUEUE_DEPTH); + let listener = RelayEventListener::new(tx); + let mut term = Term::new(Config::default(), &size, listener); + let mut processor: Processor = Processor::new(); + for chunk in chunks { + processor.advance(&mut term, chunk); + } + (rx, term) + } + + fn drain_writeback(rx: &std_mpsc::Receiver>) -> Vec { + let mut out = Vec::new(); + // First reply is queued synchronously inside processor.advance, + // but try_recv is enough since send is non-blocking. + while let Ok(bytes) = rx.recv_timeout(StdDuration::from_millis(50)) { + out.extend_from_slice(&bytes); + } + out + } + + #[test] + fn listener_answers_dsr_with_terminal_ok() { + // ESC[5n is the Device Status Report query — alacritty must + // reply with ESC[0n ("terminal OK"). + let (rx, _term) = drive_listener(24, 80, &[b"\x1b[5n"]); + let writeback = drain_writeback(&rx); + assert_eq!( + writeback, b"\x1b[0n", + "DSR ESC[5n must produce ESC[0n; got {writeback:?}" + ); + } + + #[test] + fn listener_answers_da1_with_vt102_ident() { + // ESC[c is the Primary Device Attributes query — alacritty + // identifies as a VT102 (ESC[?6c). This is startup-critical: + // many CLIs hang at boot if DA1 goes unanswered. The exact + // ident byte differs between terminals (xterm uses ?1;2c) — + // we just need *some* well-formed DA1 reply to come back. + let (rx, _term) = drive_listener(24, 80, &[b"\x1b[c"]); + let writeback = drain_writeback(&rx); + assert_eq!( + writeback, b"\x1b[?6c", + "DA1 ESC[c must produce a VT102 ident (ESC[?6c); got {writeback:?}" + ); + } + + #[test] + fn listener_answers_cpr_with_real_cursor_position() { + // Move the cursor to row 3, col 5 (1-indexed CUP), then issue + // a CPR query. alacritty should reply with the **real** cursor + // position — `ESC[3;5R` — not the old hardcoded `1;1`. + let (rx, term) = drive_listener(24, 80, &[b"\x1b[3;5H", b"\x1b[6n"]); + + // Sanity-check the grid actually moved the cursor. + let point = term.grid().cursor.point; + assert_eq!(point.line.0, 2, "row 3 (1-indexed) = line 2 (0-indexed)"); + assert_eq!( + point.column.0, 4, + "col 5 (1-indexed) = column 4 (0-indexed)" + ); + + let writeback = drain_writeback(&rx); + assert_eq!( + writeback, b"\x1b[3;5R", + "CPR ESC[6n must reflect real cursor position; got {writeback:?}" + ); + } } diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 3362cbc78..aed8cd378 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -207,7 +207,8 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { let (init_rows, init_cols) = get_terminal_size().unwrap_or((24, 80)); let (pty, mut pty_rx) = PtySession::spawn(&resolved_cli, &effective_args, init_rows, init_cols)?; - let mut terminal_query_parser = TerminalQueryParser::default(); + // Query responses (DSR/DA1/DA2/CPR) are answered by alacritty's + // `RelayEventListener` inside `PtySession` — no parser needed here. let (out_tx, mut out_rx) = mpsc::channel::>(1024); let writer_task = tokio::spawn(async move { @@ -521,9 +522,6 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { reported_idle = false; // Child is provably alive — reset the no-PID exit counter. pty.reset_no_pid_checks(); - for response in terminal_query_parser.feed(&chunk) { - let _ = pty.write_all(response); - } let text = String::from_utf8_lossy(&chunk).to_string(); let clean_text = strip_ansi(&text); startup_total_bytes = startup_total_bytes.saturating_add(chunk.len()); diff --git a/src/snapshot.rs b/src/snapshot.rs index d9b73e638..f8ef5fef5 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -20,7 +20,7 @@ //! the captured data, so they neither block the PTY reader thread nor race //! with subsequent grid mutations. -use alacritty_terminal::event::VoidListener; +use alacritty_terminal::event::EventListener; use alacritty_terminal::grid::Dimensions; use alacritty_terminal::index::{Column, Line, Point}; use alacritty_terminal::term::cell::{Cell, Flags}; @@ -84,7 +84,11 @@ impl Snapshot { /// Capture from a free-standing `Term` (used by tests and by the future /// `view`/`drive` clients that drive their own VT instances). - pub fn from_term(term: &Term) -> Self { + /// + /// Generic over `EventListener` so this accepts both the live + /// `Term` from `PtySession` and the + /// `Term` used in offline tests. + pub fn from_term(term: &Term) -> Self { let grid = term.grid(); let rows = grid.screen_lines() as u16; let cols = grid.columns() as u16; diff --git a/src/wrap.rs b/src/wrap.rs index b49d5a82f..2bd401405 100644 --- a/src/wrap.rs +++ b/src/wrap.rs @@ -629,7 +629,8 @@ pub(crate) async fn run_wrap( terminal_rows().unwrap_or(24), terminal_cols().unwrap_or(80), )?; - let mut terminal_query_parser = TerminalQueryParser::default(); + // Query responses (DSR/DA1/DA2/CPR) are answered by alacritty's + // `RelayEventListener` inside `PtySession`. eprintln!("[agent-relay] ready"); @@ -753,11 +754,6 @@ pub(crate) async fn run_wrap( chunk = pty_rx.recv() => { match chunk { Some(chunk) => { - // Terminal query responses (CSI 6n) - for response in terminal_query_parser.feed(&chunk) { - let _ = pty.write_all(response); - } - // Passthrough to user's terminal use tokio::io::AsyncWriteExt; let _ = stdout.write_all(&chunk).await;