From 82a38832b61c2b265cb56a33fa3a18491e064c11 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 17 May 2026 22:01:59 -0400 Subject: [PATCH 1/4] broker(pty): replace TerminalQueryParser with alacritty EventListener (#867) PR #836 wired the alacritty VT grid into every PtySession but passed VoidListener to Term::new, so alacritty parsed query sequences (DSR/DA1/DA2/CPR) and discarded the responses. A parallel hand-rolled TerminalQueryParser in helpers.rs answered those queries with a hardcoded 1;1 CPR reply that ignored the real cursor position. Replace VoidListener with RelayEventListener, which owns a std::sync::mpsc::Sender<Vec<u8>>. alacritty's send_event(PtyWrite) hands query response bytes to the channel; a dedicated drainer thread takes the writer lock and pushes them down the PTY. The listener is non-blocking so it is safe to call while processor+term locks are held, and the drainer exits when the listener is dropped at PtySession teardown. CPR responses now reflect the live cursor position (verified by the new unit test: ESC[3;5H ESC[6n yields ESC[3;5R, not ESC[1;1R). Delete the hand-rolled parser and its tests in helpers.rs, plus its three call sites in pty_worker.rs, wrap.rs, and the test module in main.rs. Make Snapshot::from_term generic over EventListener so it accepts both Term<RelayEventListener> from a live PtySession and the Term<VoidListener> used in offline tests. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../completed/2026-05/traj_piik8r6zu3i7.json | 89 +++++++++ .../completed/2026-05/traj_piik8r6zu3i7.md | 51 ++++++ .trajectories/index.json | 9 +- src/helpers.rs | 172 ------------------ src/main.rs | 29 +-- src/pty.rs | 164 ++++++++++++++++- src/pty_worker.rs | 6 +- src/snapshot.rs | 8 +- src/wrap.rs | 8 +- 9 files changed, 315 insertions(+), 221 deletions(-) create mode 100644 .trajectories/completed/2026-05/traj_piik8r6zu3i7.json create mode 100644 .trajectories/completed/2026-05/traj_piik8r6zu3i7.md diff --git a/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json new file mode 100644 index 000000000..9fe7acd47 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json @@ -0,0 +1,89 @@ +{ + "id": "traj_piik8r6zu3i7", + "version": 1, + "task": { + "title": "Issue 867: RelayEventListener" + }, + "status": "completed", + "startedAt": "2026-05-18T01:56:18.236Z", + "completedAt": "2026-05-18T02:01:49.991Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-18T01:58:24.265Z" + } + ], + "chapters": [ + { + "id": "chap_xnbleth5fk5g", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-18T01:58:24.265Z", + "endedAt": "2026-05-18T02:01:49.991Z", + "events": [ + { + "ts": 1779069504266, + "type": "decision", + "content": "Drain writeback channel from a dedicated std::thread, not tokio task: Drain writeback channel from a dedicated std::thread, not tokio task", + "raw": { + "question": "Drain writeback channel from a dedicated std::thread, not tokio task", + "chosen": "Drain writeback channel from a dedicated std::thread, not tokio task", + "alternatives": [], + "reasoning": "The reader thread is a std::thread (not tokio) and it's the only place that calls processor.advance which fires send_event. 
Keeping the drainer as a std::thread that takes self.writer.lock() (parking_lot Mutex) avoids importing the tokio runtime into PtySession::spawn and matches the existing reader thread shape. Also avoids holding any async tasks at spawn time when no runtime is guaranteed." + }, + "significance": "high" + }, + { + "ts": 1779069510157, + "type": "decision", + "content": "Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel: Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel", + "raw": { + "question": "Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel", + "chosen": "Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel", + "alternatives": [], + "reasoning": "send_event must be non-blocking (called while holding processor+term locks). std mpsc Sender::send never blocks. A dedicated std::thread drains the receiver and writes to the PTY writer, decoupling potential write_all blocking from the parser's hot path. Avoids tokio runtime requirement in PtySession::spawn." + }, + "significance": "high" + }, + { + "ts": 1779069704857, + "type": "decision", + "content": "Made Snapshot::from_term generic over EventListener: Made Snapshot::from_term generic over EventListener", + "raw": { + "question": "Made Snapshot::from_term generic over EventListener", + "chosen": "Made Snapshot::from_term generic over EventListener", + "alternatives": [], + "reasoning": "Changing PtySession's term field from Term<VoidListener> to Term<RelayEventListener> broke Snapshot::capture which calls pty.with_term(Self::from_term). Generic over L: EventListener keeps offline tests on VoidListener and live capture on RelayEventListener without an extra wrapper." 
+ }, + "significance": "high" + }, + { + "ts": 1779069705573, + "type": "decision", + "content": "Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs): Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs)", + "raw": { + "question": "Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs)", + "chosen": "Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs)", + "alternatives": [], + "reasoning": "Issue scope said pty_worker.rs, but TerminalQueryParser was also used by run_wrap (wrap.rs) and exported via helpers + asserted in main.rs tests. Both used the same PtySession, so the listener now handles them. Leaving them would have broken the import-removal in main.rs." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Replaced hand-rolled TerminalQueryParser with alacritty RelayEventListener. Listener owns a std::sync::mpsc::Sender; alacritty's send_event(Event::PtyWrite) pushes query responses (DSR/DA1/DA2/CPR) onto the channel, drained by a dedicated thread that writes to the PTY. CPR responses now reflect real cursor position instead of hardcoded 1;1. Deleted the parser and call sites in pty_worker.rs, wrap.rs, and main.rs; made Snapshot::from_term generic over EventListener. 
All 615 tests pass; clippy clean.", + "approach": "Standard approach", + "confidence": 0.85 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-a496ab143c411ff7c", + "tags": [], + "_trace": { + "startRef": "5fc8a131561feedfe990fc265f224101f6f267c4", + "endRef": "5fc8a131561feedfe990fc265f224101f6f267c4" + } +} diff --git a/.trajectories/completed/2026-05/traj_piik8r6zu3i7.md b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.md new file mode 100644 index 000000000..b15a7d2b7 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.md @@ -0,0 +1,51 @@ +# Trajectory: Issue 867: RelayEventListener + +> **Status:** ✅ Completed +> **Confidence:** 85% +> **Started:** May 17, 2026 at 09:56 PM +> **Completed:** May 17, 2026 at 10:01 PM + +--- + +## Summary + +Replaced hand-rolled TerminalQueryParser with alacritty RelayEventListener. Listener owns a std::sync::mpsc::Sender; alacritty's send_event(Event::PtyWrite) pushes query responses (DSR/DA1/DA2/CPR) onto the channel, drained by a dedicated thread that writes to the PTY. CPR responses now reflect real cursor position instead of hardcoded 1;1. Deleted the parser and call sites in pty_worker.rs, wrap.rs, and main.rs; made Snapshot::from_term generic over EventListener. All 615 tests pass; clippy clean. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Drain writeback channel from a dedicated std::thread, not tokio task + +- **Chose:** Drain writeback channel from a dedicated std::thread, not tokio task +- **Reasoning:** The reader thread is a std::thread (not tokio) and it's the only place that calls processor.advance which fires send_event. Keeping the drainer as a std::thread that takes self.writer.lock() (parking_lot Mutex) avoids importing the tokio runtime into PtySession::spawn and matches the existing reader thread shape. Also avoids holding any async tasks at spawn time when no runtime is guaranteed. 
+ +### Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel + +- **Chose:** Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel +- **Reasoning:** send_event must be non-blocking (called while holding processor+term locks). std mpsc Sender::send never blocks. A dedicated std::thread drains the receiver and writes to the PTY writer, decoupling potential write_all blocking from the parser's hot path. Avoids tokio runtime requirement in PtySession::spawn. + +### Made Snapshot::from_term generic over EventListener + +- **Chose:** Made Snapshot::from_term generic over EventListener +- **Reasoning:** Changing PtySession's term field from `Term<VoidListener>` to `Term<RelayEventListener>` broke Snapshot::capture which calls pty.with_term(Self::from_term). Generic over L: EventListener keeps offline tests on VoidListener and live capture on RelayEventListener without an extra wrapper. + +### Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs) + +- **Chose:** Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs) +- **Reasoning:** Issue scope said pty_worker.rs, but TerminalQueryParser was also used by run_wrap (wrap.rs) and exported via helpers + asserted in main.rs tests. Both used the same PtySession, so the listener now handles them. Leaving them would have broken the import-removal in main.rs. + +--- + +## Chapters + +### 1. 
Work + +_Agent: default_ + +- Drain writeback channel from a dedicated std::thread, not tokio task: Drain writeback channel from a dedicated std::thread, not tokio task +- Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel: Use std::sync::mpsc::Sender (sync, unbounded-ish) for writeback channel +- Made Snapshot::from_term generic over EventListener: Made Snapshot::from_term generic over EventListener +- Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs): Also removed parser call sites in wrap.rs and main.rs (not just pty_worker.rs) diff --git a/.trajectories/index.json b/.trajectories/index.json index e791eec49..235f46094 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-17T14:33:32.468Z", + "lastUpdated": "2026-05-18T02:01:50.133Z", "trajectories": { "traj_9gq96irkj00s": { "title": "Update relay to use published relaycast Rust reclaim fix", @@ -591,6 +591,13 @@ "startedAt": "2026-05-17T14:19:10.603Z", "completedAt": "2026-05-17T14:33:32.293Z", "path": ".trajectories/completed/2026-05/traj_cbmwd07phhm2.json" + }, + "traj_piik8r6zu3i7": { + "title": "Issue 867: RelayEventListener", + "status": "completed", + "startedAt": "2026-05-18T01:56:18.236Z", + "completedAt": "2026-05-18T02:01:49.991Z", + "path": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-a496ab143c411ff7c/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json" } } } diff --git a/src/helpers.rs b/src/helpers.rs index 2251bce48..555acc231 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -216,89 +216,6 @@ pub(crate) fn check_echo_in_output(output: &str, expected: &str) -> bool { clean.contains(expected) } -#[derive(Debug, Clone, Copy, Default)] -pub(crate) enum TerminalQueryState { - #[default] - Idle, - Esc, - Csi, - CsiQmark, - Csi6, - CsiQmark6, - /// CSI 0 — could be DA1 param (ESC [ 0 c) - Csi0, - /// CSI 5 — could be DSR (ESC [ 5 n) - Csi5, - /// CSI > — 
DA2 prefix (ESC [ > c) - CsiGt, -} - -#[derive(Debug, Default)] -pub(crate) struct TerminalQueryParser { - pub(crate) state: TerminalQueryState, -} - -impl TerminalQueryParser { - pub(crate) fn feed(&mut self, chunk: &[u8]) -> Vec<&'static [u8]> { - const ESC: u8 = 0x1b; - const CSI: u8 = b'['; - const QMARK: u8 = b'?'; - const SIX: u8 = b'6'; - const N: u8 = b'n'; - - let mut out = Vec::new(); - for byte in chunk { - self.state = match (self.state, *byte) { - (_, ESC) => TerminalQueryState::Esc, - (TerminalQueryState::Esc, CSI) => TerminalQueryState::Csi, - (TerminalQueryState::Csi, QMARK) => TerminalQueryState::CsiQmark, - (TerminalQueryState::Csi, b'>') => TerminalQueryState::CsiGt, - (TerminalQueryState::Csi, SIX) => TerminalQueryState::Csi6, - (TerminalQueryState::Csi, b'0') => TerminalQueryState::Csi0, - (TerminalQueryState::Csi, b'5') => TerminalQueryState::Csi5, - (TerminalQueryState::CsiQmark, SIX) => TerminalQueryState::CsiQmark6, - // CSI 6 n → Cursor Position Report (CPR) - (TerminalQueryState::Csi6, N) => { - out.push(b"\x1b[1;1R".as_slice()); - TerminalQueryState::Idle - } - (TerminalQueryState::CsiQmark6, N) => { - out.push(b"\x1b[?1;1R".as_slice()); - TerminalQueryState::Idle - } - // CSI c → DA1 (Device Attributes primary, no params) - (TerminalQueryState::Csi, b'c') => { - out.push(b"\x1b[?1;2c".as_slice()); // VT100 with AVO - TerminalQueryState::Idle - } - // CSI 0 c → DA1 with explicit 0 param - (TerminalQueryState::Csi0, b'c') => { - out.push(b"\x1b[?1;2c".as_slice()); - TerminalQueryState::Idle - } - // CSI > c → DA2 (Device Attributes secondary) - (TerminalQueryState::CsiGt, b'c') => { - out.push(b"\x1b[>1;10;0c".as_slice()); // VT100, version 10 - TerminalQueryState::Idle - } - // CSI 5 n → DSR (Device Status Report) - (TerminalQueryState::Csi5, N) => { - out.push(b"\x1b[0n".as_slice()); // terminal OK - TerminalQueryState::Idle - } - _ => TerminalQueryState::Idle, - }; - } - out - } -} - -#[cfg(test)] -pub(crate) fn 
terminal_query_responses(chunk: &[u8]) -> Vec<&'static [u8]> { - let mut parser = TerminalQueryParser::default(); - parser.feed(chunk) -} - fn workspace_context_label( workspace_id: Option<&str>, workspace_alias: Option<&str>, @@ -1196,46 +1113,6 @@ mod tests { assert_eq!(detector.detect_activity(&output, expected_echo), None); } - #[test] - fn terminal_query_da1_no_param() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[c"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;2c"); - } - - #[test] - fn terminal_query_da1_with_zero() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[0c"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;2c"); - } - - #[test] - fn terminal_query_da2() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[>c"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[>1;10;0c"); - } - - #[test] - fn terminal_query_dsr() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[5n"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[0n"); - } - - #[test] - fn terminal_query_cpr_still_works() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[6n"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[1;1R"); - } - // ==================== detect_codex_model_prompt tests ==================== #[test] @@ -1469,55 +1346,6 @@ mod tests { assert!(!has_allow); } - // ==================== terminal query parser edge cases ==================== - - #[test] - fn terminal_query_multiple_queries_in_one_chunk() { - let mut parser = TerminalQueryParser::default(); - // DA1 + CPR + DSR all in one chunk - let responses = parser.feed(b"\x1b[c\x1b[6n\x1b[5n"); - assert_eq!(responses.len(), 3); - assert_eq!(responses[0], b"\x1b[?1;2c"); // DA1 - assert_eq!(responses[1], b"\x1b[1;1R"); // 
CPR - assert_eq!(responses[2], b"\x1b[0n"); // DSR - } - - #[test] - fn terminal_query_interleaved_with_text() { - let mut parser = TerminalQueryParser::default(); - // Normal text + DA1 query + more text - let responses = parser.feed(b"Hello\x1b[cWorld"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;2c"); - } - - #[test] - fn terminal_query_split_across_chunks() { - let mut parser = TerminalQueryParser::default(); - // ESC in first chunk, [6n in second - let r1 = parser.feed(b"\x1b"); - assert_eq!(r1.len(), 0); - let r2 = parser.feed(b"[6n"); - assert_eq!(r2.len(), 1); - assert_eq!(r2[0], b"\x1b[1;1R"); - } - - #[test] - fn terminal_query_incomplete_sequence_reset() { - let mut parser = TerminalQueryParser::default(); - // ESC [ but then a regular char (not a query) — should reset - let responses = parser.feed(b"\x1b[A"); - assert_eq!(responses.len(), 0, "cursor up should not generate response"); - } - - #[test] - fn terminal_query_qmark_cpr() { - let mut parser = TerminalQueryParser::default(); - let responses = parser.feed(b"\x1b[?6n"); - assert_eq!(responses.len(), 1); - assert_eq!(responses[0], b"\x1b[?1;1R"); - } - // ==================== throttle edge cases ==================== #[test] diff --git a/src/main.rs b/src/main.rs index 54face2ce..6e8596b84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use helpers::{ detect_codex_model_prompt, detect_gemini_action_required, detect_gemini_trust_prompt, detect_gemini_untrusted_banner, detect_opencode_permission_prompt, floor_char_boundary, is_auto_suggestion, is_bypass_selection_menu, is_in_editor_mode, is_self_name, - normalize_cli_name, parse_cli_command, strip_ansi, TerminalQueryParser, + normalize_cli_name, parse_cli_command, strip_ansi, }; use listen_api::{broadcast_if_relevant, listen_api_router, ListenApiConfig, ListenApiRequest}; use routing::display_target_for_dashboard; @@ -5500,7 +5500,7 @@ mod tests { time::{Duration, Instant}, }; - use 
crate::helpers::{format_injection, terminal_query_responses}; + use crate::helpers::format_injection; use relay_broker::protocol::{MessageInjectionMode, RelayDelivery}; use serde_json::{json, Value}; @@ -5515,7 +5515,7 @@ mod tests { normalize_sender, relaycast_spawn_control_dedup_key, relaycast_ws_control_dedup_key, relaycast_ws_should_apply_local_spawn_echo_dedup, relaycast_ws_spawn_token, sender_is_dashboard_label, should_clear_pending_delivery_for_event, strip_ansi, - AgentRuntime, PendingDelivery, ProtocolHeadlessProvider, TerminalQueryParser, + AgentRuntime, PendingDelivery, ProtocolHeadlessProvider, }; use crate::helpers::floor_char_boundary; use relay_broker::dedup::DedupCache; @@ -6349,29 +6349,6 @@ mod tests { assert!(should_clear_pending_delivery_for_event(None, Some("evt_1"))); } - #[test] - fn terminal_query_responses_standard_cpr() { - let responses = terminal_query_responses(b"\x1b[6n"); - assert_eq!(responses, vec![b"\x1b[1;1R".as_slice()]); - } - - #[test] - fn terminal_query_parser_handles_split_sequences() { - let mut parser = TerminalQueryParser::default(); - assert!(parser.feed(b"\x1b[").is_empty()); - assert!(parser.feed(b"6").is_empty()); - let responses = parser.feed(b"n"); - assert_eq!(responses, vec![b"\x1b[1;1R".as_slice()]); - } - - #[test] - fn terminal_query_parser_handles_private_cpr() { - let mut parser = TerminalQueryParser::default(); - assert!(parser.feed(b"\x1b[?6").is_empty()); - let responses = parser.feed(b"n"); - assert_eq!(responses, vec![b"\x1b[?1;1R".as_slice()]); - } - // ==================== strip_ansi tests ==================== #[test] diff --git a/src/pty.rs b/src/pty.rs index b4dbccf5a..18e775e97 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -5,12 +5,12 @@ use std::{ path::Path, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + mpsc as std_mpsc, Arc, }, thread, }; -use alacritty_terminal::event::VoidListener; +use alacritty_terminal::event::{Event, EventListener}; use alacritty_terminal::grid::Dimensions; use 
alacritty_terminal::index::{Column, Line, Point}; use alacritty_terminal::term::{Config, Term}; @@ -20,6 +20,43 @@ use parking_lot::Mutex; use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use tokio::sync::mpsc; +/// Forwards alacritty's terminal events back to the PTY's stdin so the +/// child process sees real responses to its query sequences (DSR, DA1, +/// DA2, CPR, …). alacritty fills in the response payloads using the +/// real grid state — so CPR replies carry the actual cursor position +/// rather than the old hand-rolled `1;1` placeholder. +/// +/// `send_event` is invoked from inside `Processor::advance` while the +/// processor and term locks are held, so it must be non-blocking. We +/// hand the bytes off to a `std::sync::mpsc` channel; a dedicated +/// drainer thread takes the writer lock and pushes them down the PTY, +/// decoupling potential `write_all` blocking from the parser hot path. +#[derive(Clone)] +pub struct RelayEventListener { + tx: std_mpsc::Sender>, +} + +impl RelayEventListener { + fn new(tx: std_mpsc::Sender>) -> Self { + Self { tx } + } +} + +impl EventListener for RelayEventListener { + fn send_event(&self, event: Event) { + // Only `PtyWrite` actually needs to round-trip to the child. + // Title/colour/clipboard/etc. events are intentionally dropped — + // the broker isn't a real UI, so there is nothing meaningful to + // do with them. + if let Event::PtyWrite(text) = event { + // Best-effort send. If the drainer thread has exited (PTY + // teardown), silently drop — telemetry/logging must never + // break terminal emulation. + let _ = self.tx.send(text.into_bytes()); + } + } +} + /// Cell dimensions for a parsed VT grid. Implements /// `alacritty_terminal::grid::Dimensions` so `Term::new` / `Term::resize` /// accept it directly without pulling in the test-helper `TermSize`. @@ -68,7 +105,12 @@ pub struct PtySession { /// VT100 grid kept in sync with PTY output. 
The reader thread /// advances `processor` on every chunk; queries (`screen_text`, /// `cursor_position`, `cell_at`) read `term` under the same lock. - term: Arc>>, + /// + /// The `RelayEventListener` is plumbed into `Term::new` so that + /// query responses (DSR/DA1/DA2/CPR) generated by alacritty are + /// written back to the PTY's stdin — giving the child accurate + /// responses based on the real grid state. + term: Arc>>, /// Shared with the reader thread, which holds the actual lock most /// of the time. Kept on the struct so `resize()` can take the same /// lock the reader uses — preventing the child's post-resize @@ -168,13 +210,36 @@ impl PtySession { .context("failed to take pty writer")?; let size = GridSize::from_pty(rows, cols); - let term = Arc::new(Mutex::new(Term::new( - Config::default(), - &size, - VoidListener, - ))); + // Writeback channel for query responses generated by alacritty + // (DSR/DA1/DA2/CPR). The listener is sync and non-blocking; the + // drainer thread below takes the writer lock and pushes the + // bytes down the PTY. + let (writeback_tx, writeback_rx) = std_mpsc::channel::>(); + let listener = RelayEventListener::new(writeback_tx); + let term = Arc::new(Mutex::new(Term::new(Config::default(), &size, listener))); let processor = Arc::new(Mutex::new(Processor::new())); + // Drainer: receives writeback bytes from the listener and pushes + // them to the PTY writer. Lives on a std::thread so it doesn't + // need a tokio runtime. Exits when the listener is dropped (i.e. + // when `term` is dropped, which happens at `PtySession` drop). 
+ let writer_arc: Arc>> = Arc::new(Mutex::new(writer)); + let writer_for_drainer = writer_arc.clone(); + thread::spawn(move || { + while let Ok(bytes) = writeback_rx.recv() { + if bytes.is_empty() { + continue; + } + let mut guard = writer_for_drainer.lock(); + if guard.write_all(&bytes).is_err() { + break; + } + if guard.flush().is_err() { + break; + } + } + }); + let (tx, rx) = mpsc::channel(256); let term_clone = term.clone(); let processor_clone = processor.clone(); @@ -189,6 +254,12 @@ impl PtySession { // takes the same order; matching prevents // deadlock and ensures bytes are never // parsed against stale dimensions. + // + // The listener inside `term` may send bytes + // onto the writeback channel while we hold + // these locks — that send is non-blocking + // (std::mpsc), so no deadlock with the + // writer lock taken by the drainer thread. let mut processor_guard = processor_clone.lock(); let mut term_guard = term_clone.lock(); processor_guard.advance(&mut *term_guard, &buf[..n]); @@ -205,7 +276,7 @@ impl PtySession { Ok(( Self { master: pair.master, - writer: Arc::new(Mutex::new(writer)), + writer: writer_arc, child: Arc::new(Mutex::new(child)), child_pid, reaped: Arc::new(AtomicBool::new(false)), @@ -311,7 +382,7 @@ impl PtySession { /// /// Keep the closure short — it blocks the reader thread from advancing /// the VT parser while it runs. - pub fn with_term(&self, f: impl FnOnce(&Term) -> R) -> R { + pub fn with_term(&self, f: impl FnOnce(&Term) -> R) -> R { let term = self.term.lock(); f(&term) } @@ -742,4 +813,77 @@ mod tests { assert_eq!(String::from_utf8_lossy(&collected), "xterm-256color"); } + + // ---- RelayEventListener wiring tests ---- + // + // These drive a free-standing `Term` so we can + // assert that alacritty itself answers DSR/CPR query sequences with + // bytes that match real grid state — replacing the old hand-rolled + // `TerminalQueryParser` that hardcoded `1;1` for CPR. 
+ + use super::RelayEventListener; + use std::sync::mpsc as std_mpsc; + use std::time::Duration as StdDuration; + + fn drive_listener( + rows: u16, + cols: u16, + chunks: &[&[u8]], + ) -> (std_mpsc::Receiver>, Term) { + let size = GridSize { + columns: cols as usize, + screen_lines: rows as usize, + }; + let (tx, rx) = std_mpsc::channel::>(); + let listener = RelayEventListener::new(tx); + let mut term = Term::new(Config::default(), &size, listener); + let mut processor: Processor = Processor::new(); + for chunk in chunks { + processor.advance(&mut term, chunk); + } + (rx, term) + } + + fn drain_writeback(rx: &std_mpsc::Receiver>) -> Vec { + let mut out = Vec::new(); + // First reply is queued synchronously inside processor.advance, + // but try_recv is enough since send is non-blocking. + while let Ok(bytes) = rx.recv_timeout(StdDuration::from_millis(50)) { + out.extend_from_slice(&bytes); + } + out + } + + #[test] + fn listener_answers_dsr_with_terminal_ok() { + // ESC[5n is the Device Status Report query — alacritty must + // reply with ESC[0n ("terminal OK"). + let (rx, _term) = drive_listener(24, 80, &[b"\x1b[5n"]); + let writeback = drain_writeback(&rx); + assert_eq!( + writeback, + b"\x1b[0n", + "DSR ESC[5n must produce ESC[0n; got {writeback:?}" + ); + } + + #[test] + fn listener_answers_cpr_with_real_cursor_position() { + // Move the cursor to row 3, col 5 (1-indexed CUP), then issue + // a CPR query. alacritty should reply with the **real** cursor + // position — `ESC[3;5R` — not the old hardcoded `1;1`. + let (rx, term) = drive_listener(24, 80, &[b"\x1b[3;5H", b"\x1b[6n"]); + + // Sanity-check the grid actually moved the cursor. 
+ let point = term.grid().cursor.point; + assert_eq!(point.line.0, 2, "row 3 (1-indexed) = line 2 (0-indexed)"); + assert_eq!(point.column.0, 4, "col 5 (1-indexed) = column 4 (0-indexed)"); + + let writeback = drain_writeback(&rx); + assert_eq!( + writeback, + b"\x1b[3;5R", + "CPR ESC[6n must reflect real cursor position; got {writeback:?}" + ); + } } diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 3362cbc78..aed8cd378 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -207,7 +207,8 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { let (init_rows, init_cols) = get_terminal_size().unwrap_or((24, 80)); let (pty, mut pty_rx) = PtySession::spawn(&resolved_cli, &effective_args, init_rows, init_cols)?; - let mut terminal_query_parser = TerminalQueryParser::default(); + // Query responses (DSR/DA1/DA2/CPR) are answered by alacritty's + // `RelayEventListener` inside `PtySession` — no parser needed here. let (out_tx, mut out_rx) = mpsc::channel::>(1024); let writer_task = tokio::spawn(async move { @@ -521,9 +522,6 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { reported_idle = false; // Child is provably alive — reset the no-PID exit counter. pty.reset_no_pid_checks(); - for response in terminal_query_parser.feed(&chunk) { - let _ = pty.write_all(response); - } let text = String::from_utf8_lossy(&chunk).to_string(); let clean_text = strip_ansi(&text); startup_total_bytes = startup_total_bytes.saturating_add(chunk.len()); diff --git a/src/snapshot.rs b/src/snapshot.rs index d9b73e638..f8ef5fef5 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -20,7 +20,7 @@ //! the captured data, so they neither block the PTY reader thread nor race //! with subsequent grid mutations. 
-use alacritty_terminal::event::VoidListener; +use alacritty_terminal::event::EventListener; use alacritty_terminal::grid::Dimensions; use alacritty_terminal::index::{Column, Line, Point}; use alacritty_terminal::term::cell::{Cell, Flags}; @@ -84,7 +84,11 @@ impl Snapshot { /// Capture from a free-standing `Term` (used by tests and by the future /// `view`/`drive` clients that drive their own VT instances). - pub fn from_term(term: &Term) -> Self { + /// + /// Generic over `EventListener` so this accepts both the live + /// `Term` from `PtySession` and the + /// `Term` used in offline tests. + pub fn from_term(term: &Term) -> Self { let grid = term.grid(); let rows = grid.screen_lines() as u16; let cols = grid.columns() as u16; diff --git a/src/wrap.rs b/src/wrap.rs index b49d5a82f..2bd401405 100644 --- a/src/wrap.rs +++ b/src/wrap.rs @@ -629,7 +629,8 @@ pub(crate) async fn run_wrap( terminal_rows().unwrap_or(24), terminal_cols().unwrap_or(80), )?; - let mut terminal_query_parser = TerminalQueryParser::default(); + // Query responses (DSR/DA1/DA2/CPR) are answered by alacritty's + // `RelayEventListener` inside `PtySession`. 
eprintln!("[agent-relay] ready"); @@ -753,11 +754,6 @@ pub(crate) async fn run_wrap( chunk = pty_rx.recv() => { match chunk { Some(chunk) => { - // Terminal query responses (CSI 6n) - for response in terminal_query_parser.feed(&chunk) { - let _ = pty.write_all(response); - } - // Passthrough to user's terminal use tokio::io::AsyncWriteExt; let _ = stdout.write_all(&chunk).await; From c95a6095d0133056c41fbbefeba293b2fe62ddbd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 18 May 2026 02:02:54 +0000 Subject: [PATCH 2/4] style: auto-format Rust code with cargo fmt --- src/pty.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/pty.rs b/src/pty.rs index 18e775e97..e38ac20bc 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -861,8 +861,7 @@ mod tests { let (rx, _term) = drive_listener(24, 80, &[b"\x1b[5n"]); let writeback = drain_writeback(&rx); assert_eq!( - writeback, - b"\x1b[0n", + writeback, b"\x1b[0n", "DSR ESC[5n must produce ESC[0n; got {writeback:?}" ); } @@ -877,12 +876,14 @@ mod tests { // Sanity-check the grid actually moved the cursor. let point = term.grid().cursor.point; assert_eq!(point.line.0, 2, "row 3 (1-indexed) = line 2 (0-indexed)"); - assert_eq!(point.column.0, 4, "col 5 (1-indexed) = column 4 (0-indexed)"); + assert_eq!( + point.column.0, 4, + "col 5 (1-indexed) = column 4 (0-indexed)" + ); let writeback = drain_writeback(&rx); assert_eq!( - writeback, - b"\x1b[3;5R", + writeback, b"\x1b[3;5R", "CPR ESC[6n must reflect real cursor position; got {writeback:?}" ); } From 71358b1d0fa2ddb8228cd85858b469a5f83a7028 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 17 May 2026 22:11:18 -0400 Subject: [PATCH 3/4] broker(pty): bound writeback queue, add DA1 test, clean trajectory paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR #879 review: - Bound writeback channel via sync_channel(128) + try_send. 
Prevents unbounded memory growth if the child floods query sequences while the drainer is stuck on write_all. Overflow logs a warn and drops the response. - Add listener_answers_da1_with_vt102_ident regression test so DA1 (startup-critical for many CLIs) is exercised before the old TerminalQueryParser path is removed. - Strip absolute /Users/... paths from .trajectories metadata (index.json, traj_piik8r6zu3i7.json projectId) — repo-relative now. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../completed/2026-05/traj_piik8r6zu3i7.json | 2 +- .trajectories/index.json | 2 +- src/pty.rs | 62 +++++++++++++++---- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json index 9fe7acd47..3a858b050 100644 --- a/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json +++ b/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json @@ -80,7 +80,7 @@ }, "commits": [], "filesChanged": [], - "projectId": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-a496ab143c411ff7c", + "projectId": "relay", "tags": [], "_trace": { "startRef": "5fc8a131561feedfe990fc265f224101f6f267c4", diff --git a/.trajectories/index.json b/.trajectories/index.json index 235f46094..5c4ceb6d4 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -597,7 +597,7 @@ "status": "completed", "startedAt": "2026-05-18T01:56:18.236Z", "completedAt": "2026-05-18T02:01:49.991Z", - "path": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-a496ab143c411ff7c/.trajectories/completed/2026-05/traj_piik8r6zu3i7.json" + "path": ".trajectories/completed/2026-05/traj_piik8r6zu3i7.json" } } } diff --git a/src/pty.rs b/src/pty.rs index e38ac20bc..48d7950d0 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -20,6 +20,14 @@ use parking_lot::Mutex; use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use tokio::sync::mpsc; +/// Upper bound on queued writeback 
responses (DSR/DA1/DA2/CPR replies). +/// Bounded so a misbehaving child that floods query sequences while the +/// drainer is stuck on `write_all` cannot grow the queue without limit +/// and OOM the broker. 128 entries is well past any real burst — a +/// healthy terminal interaction sees fewer than a handful of pending +/// replies at a time. +const WRITEBACK_QUEUE_DEPTH: usize = 128; + /// Forwards alacritty's terminal events back to the PTY's stdin so the /// child process sees real responses to its query sequences (DSR, DA1, /// DA2, CPR, …). alacritty fills in the response payloads using the @@ -28,16 +36,17 @@ use tokio::sync::mpsc; /// /// `send_event` is invoked from inside `Processor::advance` while the /// processor and term locks are held, so it must be non-blocking. We -/// hand the bytes off to a `std::sync::mpsc` channel; a dedicated -/// drainer thread takes the writer lock and pushes them down the PTY, -/// decoupling potential `write_all` blocking from the parser hot path. +/// hand the bytes off to a bounded `std::sync::mpsc::sync_channel` via +/// `try_send`; a dedicated drainer thread takes the writer lock and +/// pushes them down the PTY, decoupling potential `write_all` blocking +/// from the parser hot path. #[derive(Clone)] pub struct RelayEventListener { - tx: std_mpsc::Sender>, + tx: std_mpsc::SyncSender>, } impl RelayEventListener { - fn new(tx: std_mpsc::Sender>) -> Self { + fn new(tx: std_mpsc::SyncSender>) -> Self { Self { tx } } } @@ -49,10 +58,19 @@ impl EventListener for RelayEventListener { // the broker isn't a real UI, so there is nothing meaningful to // do with them. if let Event::PtyWrite(text) = event { - // Best-effort send. If the drainer thread has exited (PTY - // teardown), silently drop — telemetry/logging must never - // break terminal emulation. 
- let _ = self.tx.send(text.into_bytes()); + // Non-blocking try_send: if the queue is full (drainer + // backed up behind a blocked write_all) or the drainer has + // exited (PTY teardown), drop the reply. Telemetry must be + // infallible, but a queue overflow is worth flagging. + match self.tx.try_send(text.into_bytes()) { + Ok(()) | Err(std_mpsc::TrySendError::Disconnected(_)) => {} + Err(std_mpsc::TrySendError::Full(_)) => { + tracing::warn!( + depth = WRITEBACK_QUEUE_DEPTH, + "pty writeback queue full; dropping terminal query response" + ); + } + } } } } @@ -213,8 +231,10 @@ impl PtySession { // Writeback channel for query responses generated by alacritty // (DSR/DA1/DA2/CPR). The listener is sync and non-blocking; the // drainer thread below takes the writer lock and pushes the - // bytes down the PTY. - let (writeback_tx, writeback_rx) = std_mpsc::channel::<Vec<u8>>(); + // bytes down the PTY. Bounded to prevent unbounded growth if + // the drainer is stuck behind a blocked `write_all`. + let (writeback_tx, writeback_rx) = + std_mpsc::sync_channel::<Vec<u8>>(WRITEBACK_QUEUE_DEPTH); let listener = RelayEventListener::new(writeback_tx); let term = Arc::new(Mutex::new(Term::new(Config::default(), &size, listener))); let processor = Arc::new(Mutex::new(Processor::new())); @@ -821,7 +841,7 @@ mod tests { // bytes that match real grid state — replacing the old hand-rolled // `TerminalQueryParser` that hardcoded `1;1` for CPR. 
- use super::RelayEventListener; + use super::{RelayEventListener, WRITEBACK_QUEUE_DEPTH}; use std::sync::mpsc as std_mpsc; use std::time::Duration as StdDuration; @@ -834,7 +854,7 @@ mod tests { columns: cols as usize, screen_lines: rows as usize, }; - let (tx, rx) = std_mpsc::channel::>(); + let (tx, rx) = std_mpsc::sync_channel::>(WRITEBACK_QUEUE_DEPTH); let listener = RelayEventListener::new(tx); let mut term = Term::new(Config::default(), &size, listener); let mut processor: Processor = Processor::new(); @@ -866,6 +886,22 @@ mod tests { ); } + #[test] + fn listener_answers_da1_with_vt102_ident() { + // ESC[c is the Primary Device Attributes query — alacritty + // identifies as a VT102 (ESC[?6c). This is startup-critical: + // many CLIs hang at boot if DA1 goes unanswered. The exact + // ident byte differs between terminals (xterm uses ?1;2c) — + // we just need *some* well-formed DA1 reply to come back. + let (rx, _term) = drive_listener(24, 80, &[b"\x1b[c"]); + let writeback = drain_writeback(&rx); + assert_eq!( + writeback, + b"\x1b[?6c", + "DA1 ESC[c must produce a VT102 ident (ESC[?6c); got {writeback:?}" + ); + } + #[test] fn listener_answers_cpr_with_real_cursor_position() { // Move the cursor to row 3, col 5 (1-indexed CUP), then issue From 51c9372b8565295d3682374b98b8f420dc685e7b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 18 May 2026 02:11:44 +0000 Subject: [PATCH 4/4] style: auto-format Rust code with cargo fmt --- src/pty.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/pty.rs b/src/pty.rs index 48d7950d0..c22a2628f 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -233,8 +233,7 @@ impl PtySession { // drainer thread below takes the writer lock and pushes the // bytes down the PTY. Bounded to prevent unbounded growth if // the drainer is stuck behind a blocked `write_all`. 
- let (writeback_tx, writeback_rx) = - std_mpsc::sync_channel::<Vec<u8>>(WRITEBACK_QUEUE_DEPTH); + let (writeback_tx, writeback_rx) = std_mpsc::sync_channel::<Vec<u8>>(WRITEBACK_QUEUE_DEPTH); let listener = RelayEventListener::new(writeback_tx); let term = Arc::new(Mutex::new(Term::new(Config::default(), &size, listener))); let processor = Arc::new(Mutex::new(Processor::new())); @@ -896,8 +895,7 @@ mod tests { let (rx, _term) = drive_listener(24, 80, &[b"\x1b[c"]); let writeback = drain_writeback(&rx); assert_eq!( - writeback, - b"\x1b[?6c", + writeback, b"\x1b[?6c", "DA1 ESC[c must produce a VT102 ident (ESC[?6c); got {writeback:?}" ); }