diff --git a/src/pty.rs b/src/pty.rs index c22a2628f..444f33e7e 100644 --- a/src/pty.rs +++ b/src/pty.rs @@ -1,7 +1,7 @@ use std::{ env, ffi::OsString, - io::{Read, Write}, + io::{self, Read, Write}, path::Path, sync::{ atomic::{AtomicBool, Ordering}, @@ -20,13 +20,36 @@ use parking_lot::Mutex; use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use tokio::sync::mpsc; -/// Upper bound on queued writeback responses (DSR/DA1/DA2/CPR replies). +/// Upper bound on queued PTY writes (both terminal-query replies from +/// alacritty AND user/injection writes from `PtySession::write_all`). /// Bounded so a misbehaving child that floods query sequences while the /// drainer is stuck on `write_all` cannot grow the queue without limit /// and OOM the broker. 128 entries is well past any real burst — a -/// healthy terminal interaction sees fewer than a handful of pending -/// replies at a time. -const WRITEBACK_QUEUE_DEPTH: usize = 128; +/// healthy interaction sees fewer than a handful of pending writes at +/// a time. +const WRITE_QUEUE_DEPTH: usize = 128; + +/// Single FIFO command for the PTY write drainer. Both query-reply +/// writebacks (from `RelayEventListener`) and user-input/injection +/// writes (from `PtySession::write_all`) go through the same queue so +/// the drainer thread is the **only** thing that touches the writer. +/// That guarantees a global FIFO ordering across both producers — a +/// terminal-query reply can no longer splice between two consecutive +/// user writes (e.g. between an injection body and its trailing `\r`). +enum WriteMsg { + /// Reply produced by alacritty's `EventListener` (DSR / DA1 / DA2 / + /// CPR). Best-effort: dropped if the queue is full because the + /// listener is invoked from the parser hot path and must not block. + Reply(Vec), + /// User/injection write from a `PtySession::write_all` caller. The + /// caller's send is **blocking** so backpressure flows back through + /// the worker; ordering is preserved relative to other UserInputs + /// and to any Replies already queued. + UserInput { + bytes: Vec, + ack: std_mpsc::Sender>, + }, +} /// Forwards alacritty's terminal events back to the PTY's stdin so the /// child process sees real responses to its query sequences (DSR, DA1, @@ -36,17 +59,15 @@ const WRITEBACK_QUEUE_DEPTH: usize = 128; /// /// `send_event` is invoked from inside `Processor::advance` while the /// processor and term locks are held, so it must be non-blocking. We -/// hand the bytes off to a bounded `std::sync::mpsc::sync_channel` via -/// `try_send`; a dedicated drainer thread takes the writer lock and -/// pushes them down the PTY, decoupling potential `write_all` blocking -/// from the parser hot path. +/// hand the bytes off to the shared write queue via `try_send`; the +/// drainer thread is the single owner of the writer lock. #[derive(Clone)] pub struct RelayEventListener { - tx: std_mpsc::SyncSender>, + tx: std_mpsc::SyncSender, } impl RelayEventListener { - fn new(tx: std_mpsc::SyncSender>) -> Self { + fn new(tx: std_mpsc::SyncSender) -> Self { Self { tx } } } @@ -62,12 +83,12 @@ impl EventListener for RelayEventListener { // backed up behind a blocked write_all) or the drainer has // exited (PTY teardown), drop the reply. Telemetry must be // infallible, but a queue overflow is worth flagging. - match self.tx.try_send(text.into_bytes()) { + match self.tx.try_send(WriteMsg::Reply(text.into_bytes())) { Ok(()) | Err(std_mpsc::TrySendError::Disconnected(_)) => {} Err(std_mpsc::TrySendError::Full(_)) => { tracing::warn!( - depth = WRITEBACK_QUEUE_DEPTH, - "pty writeback queue full; dropping terminal query response" + depth = WRITE_QUEUE_DEPTH, + "pty write queue full; dropping terminal query response" ); } } @@ -112,7 +133,11 @@ impl GridSize { pub struct PtySession { master: Box, - writer: Arc>>, + /// Single producer side of the FIFO write queue. All PTY writes — + /// both query-reply writebacks from `RelayEventListener` and user + /// input from `write_all` — funnel through here. The drainer thread + /// is the only thing that ever locks the real writer. + write_tx: std_mpsc::SyncSender, child: Arc>>, child_pid: Option, reaped: Arc, @@ -185,6 +210,39 @@ fn resolve_command_path(command: &str) -> String { command.to_string() } +fn drain_write_queue(mut writer: W, write_rx: std_mpsc::Receiver) { + while let Ok(msg) = write_rx.recv() { + match msg { + WriteMsg::Reply(bytes) => { + if bytes.is_empty() { + continue; + } + if writer.write_all(&bytes).is_err() { + break; + } + if writer.flush().is_err() { + break; + } + } + WriteMsg::UserInput { bytes, ack } => { + if bytes.is_empty() { + let _ = ack.send(Ok(())); + continue; + } + match writer.write_all(&bytes).and_then(|_| writer.flush()) { + Ok(()) => { + let _ = ack.send(Ok(())); + } + Err(err) => { + let _ = ack.send(Err(err)); + break; + } + } + } + } + } +} + impl PtySession { pub fn spawn( command: &str, @@ -228,36 +286,24 @@ impl PtySession { .context("failed to take pty writer")?; let size = GridSize::from_pty(rows, cols); - // Writeback channel for query responses generated by alacritty - // (DSR/DA1/DA2/CPR). The listener is sync and non-blocking; the - // drainer thread below takes the writer lock and pushes the - // bytes down the PTY. Bounded to prevent unbounded growth if - // the drainer is stuck behind a blocked `write_all`. - let (writeback_tx, writeback_rx) = std_mpsc::sync_channel::>(WRITEBACK_QUEUE_DEPTH); - let listener = RelayEventListener::new(writeback_tx); + // Shared write queue used by both producers: query-reply + // writebacks from alacritty's listener and user/injection + // writes from `PtySession::write_all`. Routing everything + // through one FIFO is what gives us global ordering — without + // it, the writer mutex provides per-call mutual exclusion but + // a reply can splice between two consecutive user writes. + let (write_tx, write_rx) = std_mpsc::sync_channel::(WRITE_QUEUE_DEPTH); + let listener = RelayEventListener::new(write_tx.clone()); let term = Arc::new(Mutex::new(Term::new(Config::default(), &size, listener))); let processor = Arc::new(Mutex::new(Processor::new())); - // Drainer: receives writeback bytes from the listener and pushes - // them to the PTY writer. Lives on a std::thread so it doesn't - // need a tokio runtime. Exits when the listener is dropped (i.e. - // when `term` is dropped, which happens at `PtySession` drop). - let writer_arc: Arc>> = Arc::new(Mutex::new(writer)); - let writer_for_drainer = writer_arc.clone(); - thread::spawn(move || { - while let Ok(bytes) = writeback_rx.recv() { - if bytes.is_empty() { - continue; - } - let mut guard = writer_for_drainer.lock(); - if guard.write_all(&bytes).is_err() { - break; - } - if guard.flush().is_err() { - break; - } - } - }); + // Drainer: single owner of the writer. Receives `WriteMsg`s + // from the queue and pushes them to the PTY. Lives on a + // std::thread so it doesn't need a tokio runtime. Exits when + // every sender is dropped — i.e. when the listener inside + // `term` is dropped (term goes away at PtySession drop) AND + // the `write_tx` clone on the struct is dropped (same time). + thread::spawn(move || drain_write_queue(writer, write_rx)); let (tx, rx) = mpsc::channel(256); let term_clone = term.clone(); @@ -295,7 +341,7 @@ impl PtySession { Ok(( Self { master: pair.master, - writer: writer_arc, + write_tx, child: Arc::new(Mutex::new(child)), child_pid, reaped: Arc::new(AtomicBool::new(false)), @@ -307,11 +353,32 @@ impl PtySession { )) } + /// Queue bytes for the PTY drainer to write. Ordering is preserved + /// relative to other `write_all` calls and to any pending + /// terminal-query replies — both go through the same FIFO. + /// + /// Blocks if the queue is full (drainer wedged behind a slow PTY + /// write) and waits for the drainer to ack the write result. Returns + /// `Err` if the drainer has exited (PTY teardown) or if the underlying + /// PTY write/flush fails. pub fn write_all(&self, bytes: &[u8]) -> Result<()> { - let mut guard = self.writer.lock(); - guard.write_all(bytes)?; - guard.flush()?; - Ok(()) + let (ack_tx, ack_rx) = std_mpsc::channel::>(); + self.write_tx + .send(WriteMsg::UserInput { + bytes: bytes.to_vec(), + ack: ack_tx, + }) + .map_err(|_| { + anyhow::anyhow!("pty write queue is closed (drainer exited before enqueue)") + })?; + + match ack_rx.recv() { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err).context("failed to write queued input to pty"), + Err(_) => Err(anyhow::anyhow!( + "pty write drainer exited before acknowledging queued write" + )), + } } pub fn resize(&self, rows: u16, cols: u16) -> Result<()> { @@ -840,7 +907,7 @@ mod tests { // bytes that match real grid state — replacing the old hand-rolled // `TerminalQueryParser` that hardcoded `1;1` for CPR. - use super::{RelayEventListener, WRITEBACK_QUEUE_DEPTH}; + use super::{RelayEventListener, WriteMsg, WRITE_QUEUE_DEPTH}; use std::sync::mpsc as std_mpsc; use std::time::Duration as StdDuration; @@ -848,12 +915,12 @@ mod tests { rows: u16, cols: u16, chunks: &[&[u8]], - ) -> (std_mpsc::Receiver>, Term) { + ) -> (std_mpsc::Receiver, Term) { let size = GridSize { columns: cols as usize, screen_lines: rows as usize, }; - let (tx, rx) = std_mpsc::sync_channel::>(WRITEBACK_QUEUE_DEPTH); + let (tx, rx) = std_mpsc::sync_channel::(WRITE_QUEUE_DEPTH); let listener = RelayEventListener::new(tx); let mut term = Term::new(Config::default(), &size, listener); let mut processor: Processor = Processor::new(); @@ -863,12 +930,18 @@ mod tests { (rx, term) } - fn drain_writeback(rx: &std_mpsc::Receiver>) -> Vec { + fn drain_writeback(rx: &std_mpsc::Receiver) -> Vec { let mut out = Vec::new(); // First reply is queued synchronously inside processor.advance, - // but try_recv is enough since send is non-blocking. - while let Ok(bytes) = rx.recv_timeout(StdDuration::from_millis(50)) { - out.extend_from_slice(&bytes); + // but recv_timeout is enough since send is non-blocking. + while let Ok(msg) = rx.recv_timeout(StdDuration::from_millis(50)) { + // Listener can only produce Reply variants; UserInput would + // come from a real PtySession::write_all caller, which these + // tests don't construct. + match msg { + WriteMsg::Reply(bytes) => out.extend_from_slice(&bytes), + WriteMsg::UserInput { .. } => unreachable!("listener never emits UserInput"), + } } out } @@ -900,6 +973,54 @@ mod tests { ); } + #[test] + fn write_queue_preserves_fifo_across_user_and_reply() { + // Both producers (`PtySession::write_all` → UserInput and the + // alacritty listener → Reply) push onto the same channel. The + // drainer's `recv` order is the order the bytes hit the PTY. + // Interleave two of each kind in a known order and assert the + // receiver pulls them back in that same order — no reordering, + // no Reply splicing between two UserInputs. + let (tx, rx) = std_mpsc::sync_channel::(WRITE_QUEUE_DEPTH); + + let (ack_tx_1, _ack_rx_1) = std_mpsc::channel::>(); + tx.send(WriteMsg::UserInput { + bytes: b"injection-body".to_vec(), + ack: ack_tx_1, + }) + .unwrap(); + tx.send(WriteMsg::Reply(b"\x1b[0n".to_vec())).unwrap(); + let (ack_tx_2, _ack_rx_2) = std_mpsc::channel::>(); + tx.send(WriteMsg::UserInput { + bytes: b"\r".to_vec(), + ack: ack_tx_2, + }) + .unwrap(); + tx.send(WriteMsg::Reply(b"\x1b[3;5R".to_vec())).unwrap(); + + let observed: Vec> = (0..4) + .map(|_| { + let msg = rx + .recv_timeout(StdDuration::from_millis(50)) + .expect("drainer receives in order"); + match msg { + WriteMsg::Reply(bytes) | WriteMsg::UserInput { bytes, .. } => bytes, + } + }) + .collect(); + + assert_eq!( + observed, + vec![ + b"injection-body".to_vec(), + b"\x1b[0n".to_vec(), + b"\r".to_vec(), + b"\x1b[3;5R".to_vec(), + ], + "FIFO order must be preserved across both message kinds", + ); + } + #[test] fn listener_answers_cpr_with_real_cursor_position() { // Move the cursor to row 3, col 5 (1-indexed CUP), then issue @@ -921,4 +1042,72 @@ mod tests { "CPR ESC[6n must reflect real cursor position; got {writeback:?}" ); } + + #[test] + fn user_input_ack_reports_write_failure() { + struct AlwaysFailWriter; + impl std::io::Write for AlwaysFailWriter { + fn write(&mut self, _buf: &[u8]) -> std::io::Result { + Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "forced failure", + )) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let (tx, rx) = std_mpsc::sync_channel::(WRITE_QUEUE_DEPTH); + let drainer = std::thread::spawn(move || super::drain_write_queue(AlwaysFailWriter, rx)); + + let (ack_tx, ack_rx) = std_mpsc::channel::>(); + tx.send(WriteMsg::UserInput { + bytes: b"should-fail\n".to_vec(), + ack: ack_tx, + }) + .expect("queue accepts user input"); + + let ack = ack_rx + .recv_timeout(StdDuration::from_millis(200)) + .expect("drainer must ack user input writes"); + assert!(ack.is_err(), "drainer write failure must be surfaced"); + + drop(tx); + drainer.join().expect("drainer thread joins cleanly"); + } + + #[test] + fn user_input_ack_reports_flush_failure() { + struct FlushFailWriter; + impl std::io::Write for FlushFailWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "forced flush failure", + )) + } + } + + let (tx, rx) = std_mpsc::sync_channel::(WRITE_QUEUE_DEPTH); + let drainer = std::thread::spawn(move || super::drain_write_queue(FlushFailWriter, rx)); + + let (ack_tx, ack_rx) = std_mpsc::channel::>(); + tx.send(WriteMsg::UserInput { + bytes: b"flush-should-fail\n".to_vec(), + ack: ack_tx, + }) + .expect("queue accepts user input"); + + let ack = ack_rx + .recv_timeout(StdDuration::from_millis(200)) + .expect("drainer must ack user input writes"); + assert!(ack.is_err(), "drainer flush failure must be surfaced"); + + drop(tx); + drainer.join().expect("drainer thread joins cleanly"); + } }