Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 64 additions & 32 deletions openless-all/app/src-tauri/src/coordinator/dictation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use super::*;
/// 同一个 hotkey 边沿之间的最小间隔。低于此阈值的连按整体作为误触丢弃 ——
/// 避免微动开关回弹 / 用户手抖双击造成的空转写报错和 ASR session 抢资源。
const HOTKEY_DEBOUNCE: std::time::Duration = std::time::Duration::from_millis(250);
const STREAMING_INSERT_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(12);

/// 跑流式润色路径(opt-in,跨平台)。
///
Expand All @@ -25,8 +26,8 @@ const HOTKEY_DEBOUNCE: std::time::Duration = std::time::Duration::from_millis(25
///
/// 通用流程:
/// 1. `switch_to_ascii`(macOS)/ no-op(其他);失败则降级回一次性 `polish_or_passthrough`。
/// 2. 起一个 `spawn_blocking` 后台任务,从 mpsc 收 SSE delta,逐 delta 调
/// `type_unicode_chunk` 模拟键盘事件落到光标处。串行有序,无竞态。
/// 2. 起一个 `spawn_blocking` 后台任务,从 mpsc 收 SSE delta,按 12ms flush window
/// 合并后调 `type_unicode_chunk` 模拟键盘事件落到光标处。串行有序,无竞态。
/// 3. 调 `polish_or_passthrough_streaming`,`on_delta` 把 chunk 塞进 mpsc。
/// 4. 流结束 / 失败 / 取消 → drop mpsc 发送端 → typer 任务 drain 完剩余 delta 退出 →
/// `restore_input_source` 恢复用户原输入源(macOS 才有意义,其他平台 no-op)。
Expand Down Expand Up @@ -111,44 +112,40 @@ async fn run_streaming_polish(
// 同时累积 typed_text:屏幕上真正落字的内容,用于(a)SSE 中途失败时让 history
// 与用户实际看到的内容一致;(b)pr-agent #412 反馈 \"saved output diverges
// from what the user actually sees\"。
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (tx, rx) = std::sync::mpsc::channel::<String>();
let typer_handle = tokio::task::spawn_blocking(move || {
let mut rx = rx;
let mut typed_text = String::new();
let mut first_failure: Option<String> = None;
while let Some(delta) = rx.blocking_recv() {
let mut pending = String::new();
while let Ok(delta) = rx.recv() {
pending.push_str(&delta);
let flush_at = std::time::Instant::now() + STREAMING_INSERT_FLUSH_INTERVAL;
loop {
let now = std::time::Instant::now();
if now >= flush_at {
break;
}
match rx.recv_timeout(flush_at.duration_since(now)) {
Ok(delta) => pending.push_str(&delta),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => break,
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
first_failure =
flush_streaming_insert_buffer(&mut pending, &mut typed_text);
return (typed_text, first_failure);
}
}
}
first_failure = flush_streaming_insert_buffer(&mut pending, &mut typed_text);
if first_failure.is_some() {
// 一旦类型链路出错(如 Secure Input 启用),后续 delta 全部丢弃,但仍
// 把 mpsc drain 完,避免发送端阻塞。
continue;
}
let delta_chars = delta.chars().count();
match crate::unicode_keystroke::type_unicode_chunk(&delta) {
Ok(typed_chars) => {
let appended = append_typed_prefix(&mut typed_text, &delta, typed_chars);
if appended < delta_chars {
let reason = format!(
"type_unicode_chunk typed only {appended}/{delta_chars} chars without error"
);
log::error!(
"[coord] streaming_insert: {reason} at typed={} chars; \
dropping remaining deltas",
typed_text.chars().count()
);
first_failure = Some(reason);
}
}
Err(e) => {
append_typed_prefix(&mut typed_text, &delta, e.typed_chars());
log::error!(
"[coord] streaming_insert: type_unicode_chunk failed at typed={} chars: {e}; \
dropping remaining deltas",
typed_text.chars().count()
);
first_failure = Some(e.to_string());
}
while rx.recv().is_ok() {}
break;
}
}
if first_failure.is_none() {
first_failure = flush_streaming_insert_buffer(&mut pending, &mut typed_text);
}
(typed_text, first_failure)
});

Expand Down Expand Up @@ -286,6 +283,41 @@ async fn run_streaming_polish(
}
}

fn flush_streaming_insert_buffer(pending: &mut String, typed_text: &mut String) -> Option<String> {
if pending.is_empty() {
return None;
}
let delta = std::mem::take(pending);
let delta_chars = delta.chars().count();
match crate::unicode_keystroke::type_unicode_chunk(&delta) {
Ok(typed_chars) => {
let appended = append_typed_prefix(typed_text, &delta, typed_chars);
if appended < delta_chars {
let reason = format!(
"type_unicode_chunk typed only {appended}/{delta_chars} chars without error"
);
log::error!(
"[coord] streaming_insert: {reason} at typed={} chars; \
dropping remaining deltas",
typed_text.chars().count()
);
Some(reason)
} else {
None
}
}
Err(e) => {
append_typed_prefix(typed_text, &delta, e.typed_chars());
log::error!(
"[coord] streaming_insert: type_unicode_chunk failed at typed={} chars: {e}; \
dropping remaining deltas",
typed_text.chars().count()
);
Some(e.to_string())
}
}
}

fn finalize_polished_text(
polished: String,
translation_active: bool,
Expand Down
7 changes: 0 additions & 7 deletions openless-all/app/src-tauri/src/unicode_keystroke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ mod macos_impl {
#[cfg(target_os = "windows")]
mod windows_impl {
use super::{TisError, TypeError};
use std::time::Duration;
use tauri::{AppHandle, Runtime};
use windows::Win32::UI::Input::KeyboardAndMouse::{
SendInput, INPUT, INPUT_0, INPUT_KEYBOARD, KEYBDINPUT, KEYBD_EVENT_FLAGS, KEYEVENTF_KEYUP,
Expand All @@ -323,11 +322,6 @@ mod windows_impl {
/// Windows / Linux 上没有 input source 概念,token 留空。Send/Sync 自动派生。
pub struct PreviousInputSource;

/// 同一个会话内 keyDown/keyUp 之间的微延迟。Windows SendInput Unicode 在大多数
/// 应用上不需要延迟,但 Chromium 系(Edge / VSCode)观察到偶尔丢字,保留 1ms
/// 兜底跟 macOS 对齐。
const INTER_KEYSTROKE_DELAY: Duration = Duration::from_millis(1);

pub fn type_unicode_chunk(text: &str) -> Result<usize, TypeError> {
if text.is_empty() {
return Ok(0);
Expand All @@ -344,7 +338,6 @@ mod windows_impl {
}
}
typed_chars += 1;
std::thread::sleep(INTER_KEYSTROKE_DELAY);
}
Ok(typed_chars)
}
Expand Down
Loading