From f977e1ea74da2781fa99462fa78385eb51b22bef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9F=83=E5=8D=9A=E6=8B=89=E9=85=B1?= Date: Sat, 11 Apr 2026 21:32:01 +0800 Subject: [PATCH 1/2] Rewrite terminal session lifecycle, fix PTY fd handling, add startup config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handlers.rs: - Rewrite terminal session model: persistent TerminalSession struct with background PTY reader thread (always running for session lifetime) and background child waiter thread with exit_notify. - WebSocket output coalescing: batch PTY output at 8ms intervals to reduce WS frame overhead. - Process exit detection: send JSON exit message to connected WebSocket clients when child process terminates. - Separate openpty failure (fallback to TIOCGPTPEER) from spawn failure (report immediately, no fallback). scrollback.rs: - Replace read_tail() with read_tail_and_then(): execute a callback under the scrollback file lock after reading, so WebSocket replay can atomically activate live forwarding before the PTY reader appends more bytes — prevents duplicate output during WS handshake. pty_fallback.rs: - Replace close_fds_above_stderr() with cloexec_fds_above_stderr(): use close_range(3, UINT_MAX, CLOSE_RANGE_CLOEXEC) instead of force-closing fds. This preserves Rust's internal exec-error reporting pipe until execve, avoiding a stdlib abort when the pipe is closed too early. - Remove fallback close loop (3..RLIMIT_NOFILE) since CLOEXEC makes it unnecessary. mod.rs: - Server readiness signaling: write "READY\n" to AXS_READY_PIPE FIFO when the server is bound and listening, so the parent shell can synchronize without HTTP polling. --- src/terminal/handlers.rs | 1253 +++++++++++++++++----------------- src/terminal/mod.rs | 23 + src/terminal/pty_fallback.rs | 54 +- src/terminal/scrollback.rs | 30 +- 4 files changed, 696 insertions(+), 664 deletions(-) diff --git a/src/terminal/handlers.rs b/src/terminal/handlers.rs index 67d79c6..2641cfb 100644 --- a/src/terminal/handlers.rs +++ b/src/terminal/handlers.rs @@ -1,619 +1,634 @@ -use super::get_default_command; -use super::pty_fallback::fallback_open_and_spawn; -use super::scrollback::Scrollback; -use super::types::*; -use crate::utils::parse_u16; -use axum::{ - body::Bytes, - extract::{ - ws::{Message, WebSocket, WebSocketUpgrade}, - Path, State, - }, - response::IntoResponse, - Json, -}; -use futures::{SinkExt, StreamExt}; -use portable_pty::{native_pty_system, ChildKiller, CommandBuilder, MasterPty, PtySize}; -use regex::Regex; -use std::io::Write; -use std::time::SystemTime; -use std::{ - io::Read, - path::PathBuf, - sync::{mpsc, Arc}, - time::Duration, -}; -use tokio::sync::Mutex; -use tokio::task::spawn_blocking; - -pub struct TerminalSession { - pub master: Arc>>, - pub child_killer: Arc>>, - pub writer: Arc>>, - pub scrollback: Arc, - pub output_tx: Arc>>>>, - pub exit_status: Arc>>, - pub exit_notify: Arc, - pub last_accessed: Arc>, -} - -pub async fn create_terminal( - State(sessions): State, - Json(options): Json, -) -> impl IntoResponse { - let rows = parse_u16(&options.rows, "rows").expect("failed"); - let cols = parse_u16(&options.cols, "cols").expect("failed"); - tracing::info!("Creating new terminal with cols={}, rows={}", cols, rows); - - let mut program = String::from("login"); - let mut args: Vec = Vec::new(); - if let Some(cmd) = get_default_command() { - let parts: Vec = cmd.split_whitespace().map(|s| s.to_string()).collect(); - if !parts.is_empty() { - program = parts[0].clone(); - if parts.len() > 1 { - args = parts[1..].to_vec(); - } - } - } - - let size = PtySize { - rows, - cols, - pixel_width: 0, - pixel_height: 0, - }; - - // --- Try the standard portable-pty path first --- - let pty_system = native_pty_system(); - let openpty_result = pty_system.openpty(size); - - let std_result = match openpty_result { - Ok(pair) => { - let mut cmd = CommandBuilder::new(&program); - if !args.is_empty() { - let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - cmd.args(arg_refs); - } - match pair.slave.spawn_command(cmd) { - Ok(child) => Ok((pair.master, child)), - Err(e) => { - // openpty succeeded but spawn failed — this is a command - // error (e.g. missing program), not a PTY capability issue. - // Do NOT fall back; report immediately. - tracing::error!("spawn_command failed: {}", e); - return Json(ErrorResponse { - error: format!("Failed to spawn command: {e}"), - }) - .into_response(); - } - } - } - Err(e) => Err(e), - }; - - // --- If openpty itself failed, fall back to TIOCGPTPEER --- - let (master, mut child) = match std_result { - Ok(pair) => pair, - Err(e) => { - tracing::warn!( - "Standard openpty failed ({}), trying TIOCGPTPEER fallback", - e - ); - match fallback_open_and_spawn(size, &program, &args) { - Ok(pair) => pair, - Err(fb_err) => { - tracing::error!("TIOCGPTPEER fallback also failed: {}", fb_err); - return Json(ErrorResponse { - error: format!("Failed to open PTY: {e}; TIOCGPTPEER fallback: {fb_err}"), - }) - .into_response(); - } - } - } - }; - - // --- Common session setup --- - let pid = child.process_id().unwrap_or(0); - tracing::info!("Terminal created successfully with PID: {}", pid); - - let reader = match master.try_clone_reader() { - Ok(r) => r, - Err(e) => { - tracing::error!("Failed to clone PTY reader: {}", e); - let _ = child.kill(); - let _ = child.wait(); - return Json(ErrorResponse { - error: format!("Failed to clone PTY reader: {e}"), - }) - .into_response(); - } - }; - let writer = match master.take_writer() { - Ok(w) => Arc::new(Mutex::new(w)), - Err(e) => { - tracing::error!("Failed to take PTY writer: {}", e); - let _ = child.kill(); - let _ = child.wait(); - return Json(ErrorResponse { - error: format!("Failed to take PTY writer: {e}"), - }) - .into_response(); - } - }; - let master: Arc>> = Arc::new(Mutex::new(master)); - let child_killer = Arc::new(Mutex::new(child.clone_killer())); - - let scrollback = Arc::new(Scrollback::new(pid)); - let output_tx: Arc>>>> = - Arc::new(std::sync::Mutex::new(None)); - let exit_status: Arc>> = Arc::new(std::sync::Mutex::new(None)); - let exit_notify = Arc::new(tokio::sync::Notify::new()); - - // Background PTY reader — runs for the session lifetime - { - let scrollback = scrollback.clone(); - let output_tx = output_tx.clone(); - spawn_blocking(move || { - let mut reader = reader; - let mut read_buffer = [0u8; 8192]; - loop { - let n = match reader.read(&mut read_buffer) { - Ok(n) if n > 0 => n, - _ => break, - }; - - let data = &read_buffer[..n]; - let _ = scrollback.append(data); - - if let Ok(guard) = output_tx.try_lock() { - if let Some(ref tx) = *guard { - let _ = tx.try_send(data.to_vec()); - } - } - } - tracing::info!("Background PTY reader exited for PID {}", pid); - }); - } - - // Background child waiter — signals when process exits - { - let exit_status = exit_status.clone(); - let exit_notify = exit_notify.clone(); - let child = Arc::new(std::sync::Mutex::new(child)); - spawn_blocking(move || { - let mut child_guard = child.lock().unwrap(); - let success = match child_guard.wait() { - Ok(status) => status.success(), - Err(_) => false, - }; - *exit_status.lock().unwrap() = Some(success); - exit_notify.notify_waiters(); - tracing::info!( - "Background child waiter exited for PID {} (success={})", - pid, - success - ); - }); - } - - let session = TerminalSession { - master, - child_killer, - writer, - scrollback, - output_tx, - exit_status, - exit_notify, - last_accessed: Arc::new(Mutex::new(SystemTime::now())), - }; - - sessions.insert(pid, session); - (axum::http::StatusCode::OK, pid.to_string()).into_response() -} - -pub async fn resize_terminal( - State(sessions): State, - Path(pid): Path, - Json(options): Json, -) -> impl IntoResponse { - let rows = parse_u16(&options.rows, "rows").expect("Failed"); - let cols = parse_u16(&options.cols, "cols").expect("Failed"); - tracing::info!("Resizing terminal {} to cols={}, rows={}", pid, cols, rows); - - if let Some(session) = sessions.get(&pid) { - let size = PtySize { - rows, - cols, - pixel_width: 0, - pixel_height: 0, - }; - - match session.master.lock().await.resize(size) { - Ok(_) => Json(serde_json::json!({"success": true})).into_response(), - Err(e) => Json(ErrorResponse { - error: format!("Failed to resize: {e}"), - }) - .into_response(), - } - } else { - Json(ErrorResponse { - error: "Session not found".to_string(), - }) - .into_response() - } -} - -pub async fn terminal_websocket( - ws: WebSocketUpgrade, - Path(pid): Path, - State(sessions): State, -) -> impl IntoResponse { - tracing::info!("WebSocket connection request for terminal {}", pid); - ws.on_upgrade(move |socket| handle_socket(socket, pid, sessions)) -} - -async fn handle_socket(socket: WebSocket, pid: u32, sessions: Sessions) { - let (mut sender, mut receiver) = socket.split(); - - let (writer, scrollback, output_tx_arc, exit_status_arc, exit_notify) = { - let Some(session) = sessions.get(&pid) else { - tracing::error!("Session {} not found", pid); - return; - }; - - *session.last_accessed.lock().await = SystemTime::now(); - tracing::info!("WebSocket connection established for terminal {}", pid); - - ( - session.writer.clone(), - session.scrollback.clone(), - session.output_tx.clone(), - session.exit_status.clone(), - session.exit_notify.clone(), - ) - }; - - // Check if process already exited - let already_exited = { - let guard = exit_status_arc.lock().unwrap(); - *guard - }; - if let Some(success) = already_exited { - let exit_message = ProcessExitMessage { - exit_code: Some(if success { 0 } else { 1 }), - signal: None, - message: if success { - "Process exited successfully" - } else { - "Process exited with non-zero status" - } - .to_string(), - }; - let exit_json = serde_json::to_string(&exit_message).unwrap_or_default(); - let _ = sender - .send(Message::Text( - format!("{{\"type\":\"exit\",\"data\":{exit_json}}}").into(), - )) - .await; - sessions.remove(&pid); - return; - } - - // Create output channel for this WS connection - let (ws_output_tx, mut ws_output_rx) = tokio::sync::mpsc::channel::>(256); - - // Set the sender so background reader starts forwarding to us - { - let mut guard = output_tx_arc.lock().unwrap(); - *guard = Some(ws_output_tx); - } - - // Send full scrollback history (client should clear terminal before connecting) - let scrollback_for_replay = scrollback.clone(); - match spawn_blocking(move || scrollback_for_replay.read_tail(MAX_SCROLLBACK_BYTES)).await { - Ok(Ok(contents)) if !contents.is_empty() => { - let _ = sender.send(Message::Binary(Bytes::from(contents))).await; - } - Ok(Err(e)) => { - tracing::warn!("Failed to read scrollback for terminal {}: {}", pid, e); - } - _ => {} - } - - // WS input → PTY writer channel - let (ws_input_tx, ws_input_rx) = std::sync::mpsc::channel::>(); - let write_handle = { - let writer = writer.clone(); - spawn_blocking(move || { - while let Ok(data) = ws_input_rx.recv() { - let mut guard = writer.blocking_lock(); - if guard.write_all(&data).is_err() || guard.flush().is_err() { - break; - } - } - }) - }; - - // Main loop with output coalescing - let mut coalesce_buf: Vec = Vec::with_capacity(16384); - let mut interval = tokio::time::interval(Duration::from_millis(8)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - tokio::select! { - _ = interval.tick() => { - if !coalesce_buf.is_empty() { - let frame = std::mem::replace(&mut coalesce_buf, Vec::with_capacity(16384)); - if sender.send(Message::Binary(Bytes::from(frame))).await.is_err() { - break; - } - } - } - data = ws_output_rx.recv() => { - match data { - Some(data) => { - coalesce_buf.extend_from_slice(&data); - if coalesce_buf.len() >= 8192 { - let frame = std::mem::replace(&mut coalesce_buf, Vec::with_capacity(16384)); - if sender.send(Message::Binary(Bytes::from(frame))).await.is_err() { - break; - } - } - } - None => { - if !coalesce_buf.is_empty() { - let _ = sender.send(Message::Binary(Bytes::from(std::mem::take(&mut coalesce_buf)))).await; - } - break; - } - } - } - _ = exit_notify.notified() => { - // Give the reader a moment to flush remaining output - tokio::time::sleep(Duration::from_millis(50)).await; - - while let Ok(data) = ws_output_rx.try_recv() { - coalesce_buf.extend_from_slice(&data); - } - if !coalesce_buf.is_empty() { - let _ = sender.send(Message::Binary(Bytes::from(std::mem::take(&mut coalesce_buf)))).await; - } - - let success = exit_status_arc.lock().unwrap().unwrap_or(false); - let exit_message = ProcessExitMessage { - exit_code: Some(if success { 0 } else { 1 }), - signal: None, - message: if success { - "Process exited successfully" - } else { - "Process exited with non-zero status" - } - .to_string(), - }; - let exit_json = serde_json::to_string(&exit_message).unwrap_or_default(); - let _ = sender - .send(Message::Text( - format!("{{\"type\":\"exit\",\"data\":{exit_json}}}").into(), - )) - .await; - - sessions.remove(&pid); - break; - } - msg = receiver.next() => { - match msg { - Some(Ok(message)) => { - let data = match message { - Message::Text(text) => text.as_bytes().to_vec(), - Message::Binary(data) => data.to_vec(), - Message::Close(_) => break, - _ => continue, - }; - if ws_input_tx.send(data).is_err() { - break; - } - } - None | Some(Err(_)) => break, - } - } - } - } - - // Disconnect: clear the output sender so background reader stops forwarding - { - let mut guard = output_tx_arc.lock().unwrap(); - *guard = None; - } - - drop(ws_input_tx); - let _ = write_handle.await; - - tracing::info!("WebSocket disconnected for terminal {}", pid); -} - -pub async fn terminate_terminal( - State(sessions): State, - Path(pid): Path, -) -> impl IntoResponse { - tracing::info!("Terminating terminal {}", pid); - - if let Some((_, session)) = sessions.remove(&pid) { - let result = session - .child_killer - .lock() - .await - .kill() - .map_err(|e| e.to_string()); - - drop(session.writer.lock().await); - session.scrollback.cleanup(); - - match result { - Ok(_) => { - tracing::info!("Terminal {} terminated successfully", pid); - Json(serde_json::json!({"success": true})).into_response() - } - Err(e) => { - tracing::error!("Failed to terminate terminal {}: {}", pid, e); - Json(ErrorResponse { - error: format!("Failed to terminate terminal {pid}: {e}"), - }) - .into_response() - } - } - } else { - tracing::error!("Failed to terminate terminal {}: session not found", pid); - Json(ErrorResponse { - error: "Session not found".to_string(), - }) - .into_response() - } -} - -pub async fn execute_command(Json(options): Json) -> impl IntoResponse { - let cwd = options.cwd.or(options.u_cwd).unwrap_or("".to_string()); - - tracing::info!( - command = %options.command, - cwd = %cwd, - "Executing command" - ); - - let shell = String::from("sh"); - let cwd = if cwd.is_empty() { - std::env::var("HOME") - .map(PathBuf::from) - .unwrap_or_else(|_| std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))) - } else { - PathBuf::from(cwd) - }; - - if !cwd.exists() { - return ( - axum::http::StatusCode::BAD_REQUEST, - Json(CommandResponse { - output: String::new(), - error: Some("Working directory does not exist".to_string()), - }), - ) - .into_response(); - } - - let command = options.command.clone(); - - let result = spawn_blocking(move || { - let pty_system = native_pty_system(); - let size = PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - }; - - let pair = pty_system.openpty(size)?; - - let mut cmd = CommandBuilder::new(shell); - cmd.args(["-c", &command]); - cmd.cwd(cwd); - - let mut child = pair.slave.spawn_command(cmd)?; - drop(pair.slave); - - let mut reader = pair.master.try_clone_reader()?; - let writer = pair.master.take_writer()?; - - let (tx, rx) = std::sync::mpsc::channel::>(); - - let read_thread = std::thread::spawn(move || { - let mut buffer = [0u8; 8192]; - loop { - match reader.read(&mut buffer) { - Ok(0) => break, - Ok(n) => { - if tx.send(buffer[..n].to_vec()).is_err() { - break; - } - } - Err(_) => break, - } - } - }); - - let timeout_duration = Duration::from_secs(30); - let start_time = SystemTime::now(); - let mut output = Vec::new(); - - loop { - match rx.recv_timeout(Duration::from_millis(100)) { - Ok(data) => { - output.extend(data); - } - Err(mpsc::RecvTimeoutError::Timeout) => { - if start_time.elapsed().unwrap_or_default() > timeout_duration { - child.kill()?; - return Err("Command execution timed out".into()); - } - } - Err(mpsc::RecvTimeoutError::Disconnected) => break, - } - - if let Ok(Some(_)) = child.try_wait() { - break; - } - } - - drop(writer); - let _ = read_thread.join(); - child.wait()?; - - Ok::, Box>(output) - }) - .await; - - match result { - Ok(Ok(output)) => { - let output_str = String::from_utf8_lossy(&output).into_owned(); - - let ansi_regex = - Regex::new(r"\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]|\x1B\[[0-9]+[A-Za-z]").unwrap(); - let cleaned_output = ansi_regex.replace_all(&output_str, "").to_string(); - - tracing::info!( - output_length = cleaned_output.len(), - "Command completed successfully" - ); - - ( - axum::http::StatusCode::OK, - Json(CommandResponse { - output: cleaned_output, - error: None, - }), - ) - .into_response() - } - Ok(Err(e)) => { - tracing::error!("Command execution failed: {}", e); - ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(CommandResponse { - output: String::new(), - error: Some(e.to_string()), - }), - ) - .into_response() - } - Err(e) => { - tracing::error!("Blocking task failed: {}", e); - ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(CommandResponse { - output: String::new(), - error: Some("Internal server error".to_string()), - }), - ) - .into_response() - } - } -} +use super::get_default_command; +use super::pty_fallback::fallback_open_and_spawn; +use super::scrollback::Scrollback; +use super::types::*; +use crate::utils::parse_u16; +use axum::{ + body::Bytes, + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, State, + }, + response::IntoResponse, + Json, +}; +use futures::{SinkExt, StreamExt}; +use portable_pty::{native_pty_system, ChildKiller, CommandBuilder, MasterPty, PtySize}; +use regex::Regex; +use std::io::Write; +use std::time::SystemTime; +use std::{ + io::Read, + path::PathBuf, + sync::{mpsc, Arc}, + time::Duration, +}; +use tokio::sync::Mutex; +use tokio::task::spawn_blocking; + +pub struct TerminalSession { + pub master: Arc>>, + pub child_killer: Arc>>, + pub writer: Arc>>, + pub scrollback: Arc, + pub output_tx: Arc>>>>, + pub exit_status: Arc>>, + pub exit_notify: Arc, + pub last_accessed: Arc>, +} + +pub async fn create_terminal( + State(sessions): State, + Json(options): Json, +) -> impl IntoResponse { + let rows = parse_u16(&options.rows, "rows").expect("failed"); + let cols = parse_u16(&options.cols, "cols").expect("failed"); + tracing::info!("Creating new terminal with cols={}, rows={}", cols, rows); + + let mut program = String::from("login"); + let mut args: Vec = Vec::new(); + if let Some(cmd) = get_default_command() { + let parts: Vec = cmd.split_whitespace().map(|s| s.to_string()).collect(); + if !parts.is_empty() { + program = parts[0].clone(); + if parts.len() > 1 { + args = parts[1..].to_vec(); + } + } + } + + let size = PtySize { + rows, + cols, + pixel_width: 0, + pixel_height: 0, + }; + + // --- Try the standard portable-pty path first --- + let pty_system = native_pty_system(); + let openpty_result = pty_system.openpty(size); + + let std_result = match openpty_result { + Ok(pair) => { + let mut cmd = CommandBuilder::new(&program); + if !args.is_empty() { + let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + cmd.args(arg_refs); + } + match pair.slave.spawn_command(cmd) { + Ok(child) => Ok((pair.master, child)), + Err(e) => { + // openpty succeeded but spawn failed — this is a command + // error (e.g. missing program), not a PTY capability issue. + // Do NOT fall back; report immediately. + tracing::error!("spawn_command failed: {}", e); + return Json(ErrorResponse { + error: format!("Failed to spawn command: {e}"), + }) + .into_response(); + } + } + } + Err(e) => Err(e), + }; + + // --- If openpty itself failed, fall back to TIOCGPTPEER --- + let (master, mut child) = match std_result { + Ok(pair) => pair, + Err(e) => { + tracing::warn!( + "Standard openpty failed ({}), trying TIOCGPTPEER fallback", + e + ); + match fallback_open_and_spawn(size, &program, &args) { + Ok((master, child)) => (master, child), + Err(fb_err) => { + tracing::error!("TIOCGPTPEER fallback also failed: {}", fb_err); + return Json(ErrorResponse { + error: format!("Failed to open PTY: {e}; TIOCGPTPEER fallback: {fb_err}"), + }) + .into_response(); + } + } + } + }; + + // --- Common session setup --- + let pid = child.process_id().unwrap_or(0); + + let reader = match master.try_clone_reader() { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to clone PTY reader: {}", e); + let _ = child.kill(); + let _ = child.wait(); + return Json(ErrorResponse { + error: format!("Failed to clone PTY reader: {e}"), + }) + .into_response(); + } + }; + let writer = match master.take_writer() { + Ok(w) => Arc::new(Mutex::new(w)), + Err(e) => { + tracing::error!("Failed to take PTY writer: {}", e); + let _ = child.kill(); + let _ = child.wait(); + return Json(ErrorResponse { + error: format!("Failed to take PTY writer: {e}"), + }) + .into_response(); + } + }; + let master: Arc>> = Arc::new(Mutex::new(master)); + let child_killer = Arc::new(Mutex::new(child.clone_killer())); + + let scrollback = Arc::new(Scrollback::new(pid)); + let output_tx: Arc>>>> = + Arc::new(std::sync::Mutex::new(None)); + let exit_status: Arc>> = Arc::new(std::sync::Mutex::new(None)); + let exit_notify = Arc::new(tokio::sync::Notify::new()); + + // Background PTY reader — runs for the session lifetime + { + let scrollback = scrollback.clone(); + let output_tx = output_tx.clone(); + spawn_blocking(move || { + let mut reader = reader; + let mut read_buffer = [0u8; 8192]; + loop { + let n = match reader.read(&mut read_buffer) { + Ok(0) | Err(_) => break, + Ok(n) => n, + }; + + let data = &read_buffer[..n]; + let _ = scrollback.append(data); + + if let Ok(guard) = output_tx.try_lock() { + if let Some(ref tx) = *guard { + let _ = tx.try_send(data.to_vec()); + } + } + } + tracing::info!("Background PTY reader exited for PID {}", pid); + }); + } + + // Background child waiter — signals when process exits + { + let exit_status = exit_status.clone(); + let exit_notify = exit_notify.clone(); + let child = Arc::new(std::sync::Mutex::new(child)); + spawn_blocking(move || { + let mut child_guard = child.lock().unwrap(); + let success = match child_guard.wait() { + Ok(status) => status.success(), + Err(_) => false, + }; + *exit_status.lock().unwrap() = Some(success); + exit_notify.notify_waiters(); + tracing::info!( + "Background child waiter exited for PID {} (success={})", + pid, + success + ); + }); + } + + let session = TerminalSession { + master, + child_killer, + writer, + scrollback, + output_tx, + exit_status, + exit_notify, + last_accessed: Arc::new(Mutex::new(SystemTime::now())), + }; + + sessions.insert(pid, session); + tracing::info!("Terminal created successfully with PID: {}", pid); + (axum::http::StatusCode::OK, pid.to_string()).into_response() +} + +pub async fn resize_terminal( + State(sessions): State, + Path(pid): Path, + Json(options): Json, +) -> impl IntoResponse { + let rows = parse_u16(&options.rows, "rows").expect("Failed"); + let cols = parse_u16(&options.cols, "cols").expect("Failed"); + tracing::info!("Resizing terminal {} to cols={}, rows={}", pid, cols, rows); + + if let Some(session) = sessions.get(&pid) { + let size = PtySize { + rows, + cols, + pixel_width: 0, + pixel_height: 0, + }; + + match session.master.lock().await.resize(size) { + Ok(_) => Json(serde_json::json!({"success": true})).into_response(), + Err(e) => Json(ErrorResponse { + error: format!("Failed to resize: {e}"), + }) + .into_response(), + } + } else { + Json(ErrorResponse { + error: "Session not found".to_string(), + }) + .into_response() + } +} + +pub async fn terminal_websocket( + ws: WebSocketUpgrade, + Path(pid): Path, + State(sessions): State, +) -> impl IntoResponse { + tracing::info!("WebSocket connection request for terminal {}", pid); + ws.on_upgrade(move |socket| handle_socket(socket, pid, sessions)) +} + +async fn handle_socket(socket: WebSocket, pid: u32, sessions: Sessions) { + let (mut sender, mut receiver) = socket.split(); + + let (writer, scrollback, output_tx_arc, exit_status_arc, exit_notify) = { + let Some(session) = sessions.get(&pid) else { + tracing::error!("Session {} not found", pid); + return; + }; + + *session.last_accessed.lock().await = SystemTime::now(); + tracing::info!("WebSocket connection established for terminal {}", pid); + + ( + session.writer.clone(), + session.scrollback.clone(), + session.output_tx.clone(), + session.exit_status.clone(), + session.exit_notify.clone(), + ) + }; + + // Check if process already exited + let already_exited = { + let guard = exit_status_arc.lock().unwrap(); + *guard + }; + if let Some(success) = already_exited { + let exit_message = ProcessExitMessage { + exit_code: Some(if success { 0 } else { 1 }), + signal: None, + message: if success { + "Process exited successfully" + } else { + "Process exited with non-zero status" + } + .to_string(), + }; + let exit_json = serde_json::to_string(&exit_message).unwrap_or_default(); + let _ = sender + .send(Message::Text( + format!("{{\"type\":\"exit\",\"data\":{exit_json}}}").into(), + )) + .await; + sessions.remove(&pid); + return; + } + + // Create output channel for this WS connection + let (ws_output_tx, mut ws_output_rx) = tokio::sync::mpsc::channel::>(256); + + // Send full scrollback history, then atomically enable live forwarding before the + // PTY reader can append more bytes to the scrollback file. Without that ordering, + // the first session can replay the initial MOTD from scrollback and then receive the + // same bytes again from the live channel during the same handshake window. + let scrollback_for_replay = scrollback.clone(); + let output_tx_for_replay = output_tx_arc.clone(); + let ws_output_tx_for_replay = ws_output_tx.clone(); + match spawn_blocking(move || { + scrollback_for_replay.read_tail_and_then(MAX_SCROLLBACK_BYTES, || { + let mut guard = output_tx_for_replay.lock().unwrap(); + *guard = Some(ws_output_tx_for_replay); + }) + }) + .await + { + Ok(Ok((contents, _))) if !contents.is_empty() => { + let _ = sender.send(Message::Binary(Bytes::from(contents))).await; + } + Ok(Ok((_contents, _))) => {} + Ok(Err(e)) => { + tracing::warn!("Failed to read scrollback for terminal {}: {}", pid, e); + // Scrollback read failed, but we still need to enable live forwarding + // so the WebSocket receives PTY output going forward. + let mut guard = output_tx_arc.lock().unwrap(); + *guard = Some(ws_output_tx.clone()); + } + _ => { + // spawn_blocking itself failed; still enable forwarding. + let mut guard = output_tx_arc.lock().unwrap(); + *guard = Some(ws_output_tx.clone()); + } + } + + // WS input → PTY writer channel + let (ws_input_tx, ws_input_rx) = std::sync::mpsc::channel::>(); + let write_handle = { + let writer = writer.clone(); + spawn_blocking(move || { + while let Ok(data) = ws_input_rx.recv() { + let mut guard = writer.blocking_lock(); + if guard.write_all(&data).is_err() || guard.flush().is_err() { + break; + } + } + }) + }; + + // Main loop with output coalescing + let mut coalesce_buf: Vec = Vec::with_capacity(16384); + let mut interval = tokio::time::interval(Duration::from_millis(8)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = interval.tick() => { + if !coalesce_buf.is_empty() { + let frame = std::mem::replace(&mut coalesce_buf, Vec::with_capacity(16384)); + if sender.send(Message::Binary(Bytes::from(frame))).await.is_err() { + break; + } + } + } + data = ws_output_rx.recv() => { + match data { + Some(data) => { + coalesce_buf.extend_from_slice(&data); + if coalesce_buf.len() >= 8192 { + let frame = std::mem::replace(&mut coalesce_buf, Vec::with_capacity(16384)); + if sender.send(Message::Binary(Bytes::from(frame))).await.is_err() { + break; + } + } + } + None => { + if !coalesce_buf.is_empty() { + let _ = sender.send(Message::Binary(Bytes::from(std::mem::take(&mut coalesce_buf)))).await; + } + break; + } + } + } + _ = exit_notify.notified() => { + // Give the reader a moment to flush remaining output + tokio::time::sleep(Duration::from_millis(50)).await; + + while let Ok(data) = ws_output_rx.try_recv() { + coalesce_buf.extend_from_slice(&data); + } + if !coalesce_buf.is_empty() { + let _ = sender.send(Message::Binary(Bytes::from(std::mem::take(&mut coalesce_buf)))).await; + } + + let success = exit_status_arc.lock().unwrap().unwrap_or(false); + let exit_message = ProcessExitMessage { + exit_code: Some(if success { 0 } else { 1 }), + signal: None, + message: if success { + "Process exited successfully" + } else { + "Process exited with non-zero status" + } + .to_string(), + }; + let exit_json = serde_json::to_string(&exit_message).unwrap_or_default(); + let _ = sender + .send(Message::Text( + format!("{{\"type\":\"exit\",\"data\":{exit_json}}}").into(), + )) + .await; + + sessions.remove(&pid); + break; + } + msg = receiver.next() => { + match msg { + Some(Ok(message)) => { + let data = match message { + Message::Text(text) => text.as_bytes().to_vec(), + Message::Binary(data) => data.to_vec(), + Message::Close(_) => break, + _ => continue, + }; + if ws_input_tx.send(data).is_err() { + break; + } + } + None | Some(Err(_)) => break, + } + } + } + } + + // Disconnect: clear the output sender so background reader stops forwarding + { + let mut guard = output_tx_arc.lock().unwrap(); + *guard = None; + } + + drop(ws_input_tx); + let _ = write_handle.await; + + tracing::info!("WebSocket disconnected for terminal {}", pid); +} + +pub async fn terminate_terminal( + State(sessions): State, + Path(pid): Path, +) -> impl IntoResponse { + tracing::info!("Terminating terminal {}", pid); + + if let Some((_, session)) = sessions.remove(&pid) { + let result = session + .child_killer + .lock() + .await + .kill() + .map_err(|e| e.to_string()); + + drop(session.writer.lock().await); + session.scrollback.cleanup(); + + match result { + Ok(_) => { + tracing::info!("Terminal {} terminated successfully", pid); + Json(serde_json::json!({"success": true})).into_response() + } + Err(e) => { + tracing::error!("Failed to terminate terminal {}: {}", pid, e); + Json(ErrorResponse { + error: format!("Failed to terminate terminal {pid}: {e}"), + }) + .into_response() + } + } + } else { + tracing::error!("Failed to terminate terminal {}: session not found", pid); + Json(ErrorResponse { + error: "Session not found".to_string(), + }) + .into_response() + } +} + +pub async fn execute_command(Json(options): Json) -> impl IntoResponse { + let cwd = options.cwd.or(options.u_cwd).unwrap_or("".to_string()); + + tracing::info!( + command = %options.command, + cwd = %cwd, + "Executing command" + ); + + let shell = String::from("sh"); + let cwd = if cwd.is_empty() { + std::env::var("HOME") + .map(PathBuf::from) + .unwrap_or_else(|_| std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))) + } else { + PathBuf::from(cwd) + }; + + if !cwd.exists() { + return ( + axum::http::StatusCode::BAD_REQUEST, + Json(CommandResponse { + output: String::new(), + error: Some("Working directory does not exist".to_string()), + }), + ) + .into_response(); + } + + let command = options.command.clone(); + + let result = spawn_blocking(move || { + let pty_system = native_pty_system(); + let size = PtySize { + rows: 24, + cols: 80, + pixel_width: 0, + pixel_height: 0, + }; + + let pair = pty_system.openpty(size)?; + + let mut cmd = CommandBuilder::new(shell); + cmd.args(["-c", &command]); + cmd.cwd(cwd); + + let mut child = pair.slave.spawn_command(cmd)?; + drop(pair.slave); + + let mut reader = pair.master.try_clone_reader()?; + let writer = pair.master.take_writer()?; + + let (tx, rx) = std::sync::mpsc::channel::>(); + + let read_thread = std::thread::spawn(move || { + let mut buffer = [0u8; 8192]; + loop { + match reader.read(&mut buffer) { + Ok(0) => break, + Ok(n) => { + if tx.send(buffer[..n].to_vec()).is_err() { + break; + } + } + Err(_) => break, + } + } + }); + + let timeout_duration = Duration::from_secs(30); + let start_time = SystemTime::now(); + let mut output = Vec::new(); + + loop { + match rx.recv_timeout(Duration::from_millis(100)) { + Ok(data) => { + output.extend(data); + } + Err(mpsc::RecvTimeoutError::Timeout) => { + if start_time.elapsed().unwrap_or_default() > timeout_duration { + child.kill()?; + return Err("Command execution timed out".into()); + } + } + Err(mpsc::RecvTimeoutError::Disconnected) => break, + } + + if let Ok(Some(_)) = child.try_wait() { + break; + } + } + + drop(writer); + let _ = read_thread.join(); + child.wait()?; + + Ok::, Box>(output) + }) + .await; + + match result { + Ok(Ok(output)) => { + let output_str = String::from_utf8_lossy(&output).into_owned(); + + let ansi_regex = + Regex::new(r"\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]|\x1B\[[0-9]+[A-Za-z]").unwrap(); + let cleaned_output = ansi_regex.replace_all(&output_str, "").to_string(); + + tracing::info!( + output_length = cleaned_output.len(), + "Command completed successfully" + ); + + ( + axum::http::StatusCode::OK, + Json(CommandResponse { + output: cleaned_output, + error: None, + }), + ) + .into_response() + } + Ok(Err(e)) => { + tracing::error!("Command execution failed: {}", e); + ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + Json(CommandResponse { + output: String::new(), + error: Some(e.to_string()), + }), + ) + .into_response() + } + Err(e) => { + tracing::error!("Blocking task failed: {}", e); + ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + Json(CommandResponse { + output: String::new(), + error: Some("Internal server error".to_string()), + }), + ) + .into_response() + } + } +} diff --git a/src/terminal/mod.rs b/src/terminal/mod.rs index c60d4e7..5cf13ea 100644 --- a/src/terminal/mod.rs +++ b/src/terminal/mod.rs @@ -12,6 +12,8 @@ use axum::{ use axum::http::HeaderValue; use dashmap::DashMap; +use std::env; +use std::io::Write; use std::sync::OnceLock; use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc}; use tower_http::cors::{Any, CorsLayer}; @@ -79,6 +81,27 @@ pub async fn start_server(host: Ipv4Addr, port: u16, allow_any_origin: bool) { Ok(listener) => { tracing::info!("listening on {}", listener.local_addr().unwrap()); + // Notify parent process via FIFO that the server is ready to accept + // connections. The parent shell creates a named pipe and sets + // AXS_READY_PIPE; we write "READY\n" and close. The parent's blocking + // `read` returns immediately — no HTTP polling needed. + // + // open() on a FIFO write-end blocks until a reader opens it. This is + // acceptable and should never happen in practice: + // - Only callers that explicitly set AXS_READY_PIPE opt into this + // path; legacy callers are unaffected. + // - A caller that sets the variable is expected to follow the pipe + // protocol (`mkfifo; axs &; read < pipe`), so a reader is always + // present before axs reaches this point. + // - A caller that sets the variable yet deliberately violates the + // protocol deserves no fallback — blocking is a visible symptom + // that exposes the misconfiguration rather than hiding it. + if let Ok(pipe_path) = env::var("AXS_READY_PIPE") { + if let Ok(mut f) = std::fs::OpenOptions::new().write(true).open(&pipe_path) { + let _ = f.write_all(b"READY\n"); + } + } + if let Err(e) = axum::serve(listener, app).await { tracing::error!("Server error: {}", e); } diff --git a/src/terminal/pty_fallback.rs b/src/terminal/pty_fallback.rs index 8be4fb0..4aa4486 100644 --- a/src/terminal/pty_fallback.rs +++ b/src/terminal/pty_fallback.rs @@ -15,6 +15,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Defined in `` as `_IO('T', 0x41)` = `0x5441`. /// Architecture-independent on Linux. const TIOCGPTPEER: libc::c_ulong = 0x5441; +const CLOSE_RANGE_CLOEXEC: libc::c_uint = 0x4; // --------------------------------------------------------------------------- // OwnedFd — thin RAII wrapper around a raw file descriptor @@ -205,43 +206,29 @@ impl Drop for FallbackMasterWriter { } // --------------------------------------------------------------------------- -// Helper: close leaked fds in the child process +// Helper: prevent fd leaks without breaking Rust's exec-error pipe // --------------------------------------------------------------------------- -/// Close all fds above stderr in the child process. +/// Mark all fds above stderr as close-on-exec in the child process. /// -/// This runs inside `pre_exec` (between `fork()` and `exec()`), so it MUST -/// only use async-signal-safe operations — no heap allocation, no Rust -/// stdlib I/O, no iterators that allocate. -unsafe fn close_fds_above_stderr() { - const FALLBACK_MAX_FD: libc::c_int = 1_048_576; - - // Try close_range(3, UINT_MAX, 0) first (Linux 5.9+). +/// This preserves Rust's internal exec-error reporting pipe until `execve`, +/// avoiding the stdlib abort seen when the pipe is closed too early, while +/// still preventing descriptor leaks into the spawned program. +/// +/// No fallback when `close_range` fails: `CLOSE_RANGE_CLOEXEC` requires +/// Linux 5.11+, which is satisfied by all Android kernels AXS targets and +/// by Alpine under proot. Adding a `/proc/self/fd` enumeration + per-fd +/// `fcntl(FD_CLOEXEC)` loop would be ~80 lines of async-signal-unsafe code +/// for an environment that will never exercise it. +unsafe fn cloexec_fds_above_stderr() { #[cfg(any(target_os = "linux", target_os = "android"))] { - let res = libc::syscall(libc::SYS_close_range, 3u64, u32::MAX as u64, 0u64); - if res == 0 { - return; - } - } - - // Fallback: close(3..RLIMIT_NOFILE) loop — no allocation needed. - let mut rl: libc::rlimit = std::mem::zeroed(); - let max_fd = if libc::getrlimit(libc::RLIMIT_NOFILE, &mut rl) == 0 { - if rl.rlim_cur != libc::RLIM_INFINITY { - rl.rlim_cur.min(libc::c_int::MAX as libc::rlim_t) as libc::c_int - } else if rl.rlim_max != libc::RLIM_INFINITY { - rl.rlim_max.min(libc::c_int::MAX as libc::rlim_t) as libc::c_int - } else { - FALLBACK_MAX_FD - } - } else { - FALLBACK_MAX_FD - }; - let mut fd: libc::c_int = 3; - while fd < max_fd { - libc::close(fd); - fd += 1; + libc::syscall( + libc::SYS_close_range, + 3u64, + u32::MAX as u64, + CLOSE_RANGE_CLOEXEC as u64, + ); } } @@ -357,8 +344,7 @@ pub fn fallback_open_and_spawn( return Err(io::Error::last_os_error()); } - // Close leaked fds - close_fds_above_stderr(); + cloexec_fds_above_stderr(); Ok(()) }); diff --git a/src/terminal/scrollback.rs b/src/terminal/scrollback.rs index 1c4914c..76cafe3 100644 --- a/src/terminal/scrollback.rs +++ b/src/terminal/scrollback.rs @@ -40,23 +40,31 @@ impl Scrollback { Ok(()) } - pub fn read_tail(&self, max_bytes: usize) -> io::Result> { + pub fn read_tail_and_then(&self, max_bytes: usize, then: F) -> io::Result<(Vec, T)> + where + F: FnOnce() -> T, + { self.ensure_file()?; let mut guard = self.file.lock().unwrap(); if let Some(ref mut f) = *guard { let file_size = f.seek(SeekFrom::End(0))?; - if file_size == 0 { - return Ok(Vec::new()); - } + let buf = if file_size == 0 { + Vec::new() + } else { + let read_from = file_size.saturating_sub(max_bytes as u64); + f.seek(SeekFrom::Start(read_from))?; + let mut buf = Vec::with_capacity((file_size - read_from) as usize); + f.read_to_end(&mut buf)?; + buf + }; - let read_from = file_size.saturating_sub(max_bytes as u64); - - f.seek(SeekFrom::Start(read_from))?; - let mut buf = Vec::with_capacity((file_size - read_from) as usize); - f.read_to_end(&mut buf)?; - Ok(buf) + // WebSocket replay must activate live forwarding before releasing the scrollback + // lock. Otherwise the PTY reader can append the same early output to scrollback, + // replay it, and then forward it live again — duplicating early output. + let result = then(); + Ok((buf, result)) } else { - Ok(Vec::new()) + Ok((Vec::new(), then())) } } From 27f7fb63611b1ccbe17a7d451a0b457094e4c867 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9F=83=E5=8D=9A=E6=8B=89=E9=85=B1?= Date: Sun, 12 Apr 2026 22:10:26 +0800 Subject: [PATCH 2/2] chore: update Cargo.lock version bump axs 0.2.13 -> 0.2.14 Cargo.lock records the version bump of the root crate axs from 0.2.13 to 0.2.14, auto-updated when running cargo build after pulling the latest changes on the session-lifecycle-pty-cloexec-startup-config branch. --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3d0b67a..9fea22c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,7 +75,7 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "axs" -version = "0.2.13" +version = "0.2.14" dependencies = [ "anyhow", "axum",