Skip to content

fix(broker): paced, escape-aware PTY injection#981

Closed
willwashburn wants to merge 1 commit into
mainfrom
claude/review-issue-801-2Fug0
Closed

fix(broker): paced, escape-aware PTY injection#981
willwashburn wants to merge 1 commit into
mainfrom
claude/review-issue-801-2Fug0

Conversation

@willwashburn

Copy link
Copy Markdown
Member

Summary

Closes #801. Replaces the broker's bulk-write injection (write_all(payload)sleep(50ms)write_all("\r")) with a paced, escape-aware writer that emits one atom at a time — fixing the "Claude occasionally batches/eats characters" class of bug we see when injecting into a busy CLI.

  • New next_atom_end in crates/broker/src/pty.rs — port of escapeSeqEnd from montanaflynn/headless-terminal. Returns the length of the next atom: a single ASCII byte, a full UTF-8 codepoint (1–4 bytes), or a complete CSI / SS3 / OSC sequence. Truncated sequences are returned as one chunk so partial writes still make forward progress.
  • New PtySession::write_paced(bytes, rate) — walks next_atom_end and enqueues each atom through the existing FIFO write queue with rate between atoms. Honours Duration::ZERO (no sleep, still atom-by-atom). Interleaves correctly with terminal-query replies because it uses the same queue.
  • pty_worker.rs injection site now calls write_paced(body + b"\r", 20ms) instead of the bulk write + sleep + \r pair. Constant INJECT_PACE_MS = 20 lives next to the other module constants.

The 20 ms cadence matches the headless-terminal default — fast enough to feel instant, slow enough that the child reliably reads each chunk between writes.

Test plan

  • cargo build -p agent-relay-broker — clean
  • cargo clippy -p agent-relay-broker --lib --tests -- -D warnings — clean
  • cargo test -p agent-relay-broker --lib pty:: — 33 pass (11 new, covering plain ASCII, CSI, SS3, OSC (BEL + ST), multi-byte UTF-8, mixed payloads, truncated CSI, lone ESC, and an end-to-end live-PTY round-trip through cat with a 4-byte codepoint)
  • cargo test -p agent-relay-broker --lib pty_worker:: — 6 pass (no regressions)
  • Smoke-test against a real Claude/Codex worker under load (recommend before merge)

https://claude.ai/code/session_01EWZheXxPE8ykajhPzqMn1H


Generated by Claude Code

The worker previously bulk-wrote an entire injection payload, slept
50ms, then wrote `\r`. Under load Claude (and other CLIs) would
occasionally batch or drop characters at the start of an injection,
or render the body as a single garbled blob.

Port `escapeSeqEnd` from montanaflynn/headless-terminal as
`next_atom_end` and add `PtySession::write_paced`, which enqueues one
atom at a time (single ASCII byte, full UTF-8 codepoint, or full
CSI/SS3/OSC sequence) with a configurable gap. The worker now writes
the injection body + trailing `\r` through `write_paced` at 20ms per
atom — fast enough to feel instant, slow enough that the child
reliably consumes each chunk between reads, and escape sequences are
never split mid-stream.

Closes #801.
@willwashburn willwashburn requested a review from khaliqgant as a code owner May 25, 2026 02:22
@coderabbitai

coderabbitai Bot commented May 25, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

This PR introduces escape-aware paced PTY injection to prevent character loss during CLI message delivery. A new write_paced method atomizes input into terminal-safe chunks (UTF-8 codepoints and escape sequences remain intact), writes each via the existing write_all path, and applies optional inter-atom delays. The delivery routine in pty_worker now uses this method instead of bulk writes followed by fixed sleeps.

Changes

Paced PTY Injection Support

Layer / File(s) Summary
Atomic boundary detection and paced write method
crates/broker/src/pty.rs
Imports Duration, adds next_atom_end and utf8_codepoint_len helpers to identify atomic write boundaries for UTF-8 codepoints and escape sequences (CSI, SS3, OSC), and implements PtySession::write_paced to enqueue atoms via write_all with optional inter-atom delays.
Atomization unit tests
crates/broker/src/pty.rs
Adds atomize test helper and comprehensive unit tests covering ASCII, CSI/SS3 sequences, OSC termination by BEL and ST, multi-byte UTF-8, mixed payloads, and truncated escape sequences.
Integration test for paced write
crates/broker/src/pty.rs
Tokio integration test verifying that write_paced with zero rate round-trips a payload including multi-byte UTF-8 and carriage return through a live PTY without splitting sequences.
Paced injection in message delivery
crates/broker/src/pty_worker.rs
Adds INJECT_PACE_MS constant and replaces the prior bulk write plus fixed sleep with write_paced, building the injection payload with trailing carriage return and passing INJECT_PACE_MS as the rate.

Sequence Diagram

sequenceDiagram
  participant Caller
  participant write_paced
  participant next_atom_end
  participant write_all
  participant FIFO
  
  Caller->>write_paced: bytes, rate
  loop for each atom
    write_paced->>next_atom_end: remaining bytes
    next_atom_end-->>write_paced: atom length
    write_paced->>write_all: atom chunk
    write_all->>FIFO: enqueue atom
    write_paced->>write_paced: sleep(rate)
  end
  write_paced-->>Caller: Result
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • AgentWorkforce/relay#882: The new write_paced method atomizes payloads to write each chunk via the existing write_all path, which was refactored in that PR to enqueue writes into a single FIFO drainer.
  • AgentWorkforce/relay#921: Both PRs modify PTY message delivery logic in pty_worker.rs, with this PR routing delivery through write_paced while the other adds a separate write_pty frame handler.

Suggested reviewers

  • khaliqgant

Poem

🐰 A rabbit's ode to steady keystrokes:

Bytes no longer rush and tumble,
Escape codes stay whole, never crumble.
UTF-8 codepoints keep their form,
As paced injection becomes the norm.
Twenty milliseconds between each hop,
Claude's confusion has to stop. 🎉

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: implementing paced, escape-aware PTY injection to fix character batching/dropping issues.
Description check ✅ Passed The description is comprehensive, includes a clear summary, test plan with checkbox verification, and links to the parent issue and Claude Code session.
Linked Issues check ✅ Passed The PR fully implements all objectives from issue #801: escape-aware atomizer (next_atom_end), paced writer (write_paced), integration into pty_worker, and comprehensive test coverage.
Out of Scope Changes check ✅ Passed All changes are directly scoped to issue #801: new PTY methods, injection site updates, and related tests. No unrelated modifications detected.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/review-issue-801-2Fug0

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 59938f0bbe

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +909 to +910
.write_paced(&paced, Duration::from_millis(INJECT_PACE_MS))
.await

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid sleeping 20ms for every ASCII byte injected

write_paced sleeps between every atom, and next_atom_end defines plain ASCII as 1-byte atoms, so this call effectively adds a 20ms delay per character for normal message text. That makes large injections take seconds (e.g., ~10s for 500 bytes) before delivery_injected is emitted, while pending deliveries are retried on a 1s interval in runtime/maintenance.rs (retry_pending_delivery), which can re-send the same delivery multiple times before the first injection finishes. This is a functional regression (duplicate or delayed deliveries) introduced by switching from one bulk write to per-byte pacing at this call site.

Useful? React with 👍 / 👎.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/broker/src/pty_worker.rs`:
- Around line 912-918: The current unconditional re-queue on paced write failure
can replay already-sent atoms and corrupt the delivery; modify the failure
handling in the paced write path (where pending, pending.delivery.delivery_id
and pending_worker_injections are used) to be progress-aware: add or use a
progress field on the pending delivery (e.g., pending.offset or
pending.sent_bytes) that is advanced on each successful atom write, and on error
either (A) update pending to point only to the unsent suffix and push_front that
adjusted pending, or (B) if any bytes/atoms were already written, mark the
delivery non-retryable/failed instead of re-queuing. Ensure the code that
performs writes updates pending.offset/sent_bytes so the retry has correct
suffix semantics.
- Around line 909-911: The paced PTY injection (pty.write_paced(...).await) is
blocking the main tokio::select! loop and can stall pty_rx handling and control
pings; refactor so paced writes run outside the select loop — either spawn a
dedicated injection task (which writes atoms at INJECT_PACE_MS intervals and
reports completion/error back via a oneshot/mpsc) or implement a small
state-machine in the worker that writes one atom per tick so the select keeps
draining pty_rx and control channels. Also change the error/requeue behavior:
have write_paced return progress (bytes/atoms written) or track how many atoms
were flushed and on error requeue only the remaining suffix of pending rather
than the entire pending delivery to avoid duplicating already-sent data; update
the pending handling logic to consume the flushed prefix before re-enqueueing
the remainder.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 430d5f8a-61ee-457c-998a-c152074733fb

📥 Commits

Reviewing files that changed from the base of the PR and between ec38236 and 59938f0.

📒 Files selected for processing (2)
  • crates/broker/src/pty.rs
  • crates/broker/src/pty_worker.rs

Comment on lines +909 to +911
.write_paced(&paced, Duration::from_millis(INJECT_PACE_MS))
.await
{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify call site is inside the main select branch
rg -n -C6 'pending_injection_interval\.tick\(\)|write_paced\(' crates/broker/src/pty_worker.rs

# Verify paced writer behavior (atom loop + per-atom delay)
rg -n -C12 'async fn write_paced|next_atom_end|sleep\(' crates/broker/src/pty.rs

Repository: AgentWorkforce/relay

Length of output: 10053


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Context around the select branch and the write_paced call
sed -n '780,940p' crates/broker/src/pty_worker.rs

# Find and show INJECT_PACE_MS definition
rg -n "INJECT_PACE_MS" crates/broker/src/pty_worker.rs crates/broker/src/pty.rs

# Inspect the error handling / requeue block around the warn
rg -n "PTY paced injection write failed, re-queuing delivery" -n crates/broker/src/pty_worker.rs
sed -n '880,940p' crates/broker/src/pty_worker.rs

# Confirm the select loop body structure and what else is handled concurrently
rg -n "tokio::select!\s*\{" crates/broker/src/pty_worker.rs
sed -n '700,860p' crates/broker/src/pty_worker.rs

# Confirm write_paced sleeps per atom and uses next_atom_end; already partially shown but re-display for exactness
rg -n "pub async fn write_paced" -n crates/broker/src/pty.rs
sed -n '440,540p' crates/broker/src/pty.rs

Repository: AgentWorkforce/relay

Length of output: 24830


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the drainer that handles WriteMsg::UserInput and how it performs the actual PTY write/flush.
rg -n "enum WriteMsg|WriteMsg::UserInput|UserInput" crates/broker/src/pty.rs crates/broker/src/pty_worker.rs

# Show the write/drainer loop implementation around those matches.
rg -n "drain|drainer|WriteMsg" crates/broker/src/pty.rs

Repository: AgentWorkforce/relay

Length of output: 5120


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the tokio::select! header and the branch structure around it.
sed -n '340,520p' crates/broker/src/pty_worker.rs

# Show the rest of the select branches around where pending_injection_interval.tick() is located.
sed -n '520,760p' crates/broker/src/pty_worker.rs

# Show continuation around the pending_injection_interval.tick() branch start.
sed -n '760,940p' crates/broker/src/pty_worker.rs

# Inspect the drainer write queue implementation for how it writes and acknowledges, and what errors mean.
sed -n '180,320p' crates/broker/src/pty.rs

# Inspect the error handling inside drain_write_queue for partial write / flush failures.
sed -n '320,520p' crates/broker/src/pty.rs

Repository: AgentWorkforce/relay

Length of output: 46093


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# show write queue depth constant and pty_rx channel capacity already in context but get constants
rg -n "WRITE_QUEUE_DEPTH|mpsc::channel\\(" crates/broker/src/pty.rs
# inspect drain_write_queue around writer.write_all/flush and ack send
sed -n '200,280p' crates/broker/src/pty.rs
# inspect ack error path (full block send/recv)
sed -n '280,460p' crates/broker/src/pty.rs
# inspect tests for partial/flush failure semantics
rg -n "AlwaysFailWriter|FlushFailWriter|ack.send" crates/broker/src/pty.rs
sed -n '1160,1420p' crates/broker/src/pty.rs

Repository: AgentWorkforce/relay

Length of output: 19202


Avoid blocking tokio::select! while paced-injecting PTY input
At crates/broker/src/pty_worker.rs:909-911, pty.write_paced(...).await runs inside the main tokio::select! loop, so a long injection can stall handling of pty_rx output/control (and ping/shutdown) until the entire paced write completes (≈ INJECT_PACE_MS * atom count). Since the PTY reader uses a bounded mpsc::channel(256) and blocking_send, this can backpressure/halt PTY reads while injection is in progress.

Also, on a write_paced error the code requeues the whole pending delivery even though some atoms may already have been written to the PTY, risking duplicated (already-sent) prefixes.

  • Move paced injection off the tokio::select! loop (e.g., state machine that writes one atom per tick, or a separate task with completion/error reported back) so the worker keeps draining pty_rx.
  • On write failure, requeue only the remaining bytes/atoms (track progress from write_paced) or otherwise prevent re-sending the already-flushed prefix.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/pty_worker.rs` around lines 909 - 911, The paced PTY
injection (pty.write_paced(...).await) is blocking the main tokio::select! loop
and can stall pty_rx handling and control pings; refactor so paced writes run
outside the select loop — either spawn a dedicated injection task (which writes
atoms at INJECT_PACE_MS intervals and reports completion/error back via a
oneshot/mpsc) or implement a small state-machine in the worker that writes one
atom per tick so the select keeps draining pty_rx and control channels. Also
change the error/requeue behavior: have write_paced return progress (bytes/atoms
written) or track how many atoms were flushed and on error requeue only the
remaining suffix of pending rather than the entire pending delivery to avoid
duplicating already-sent data; update the pending handling logic to consume the
flushed prefix before re-enqueueing the remainder.

Comment on lines 912 to 918
tracing::warn!(
delivery_id = %pending.delivery.delivery_id,
error = %e,
"PTY injection write failed, re-queuing delivery"
"PTY paced injection write failed, re-queuing delivery"
);
pending_worker_injections.push_front(pending);
continue;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Re-queueing on paced-write failure can duplicate partially injected content.

At Line 917, the delivery is re-queued unconditionally. With paced atom-by-atom writes, failure can occur after some atoms are already sent, so retrying the full payload can duplicate a prefix and corrupt command text.

Consider making retries progress-aware (retry only unsent suffix) or treating post-first-atom failures as non-retryable for that delivery.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/pty_worker.rs` around lines 912 - 918, The current
unconditional re-queue on paced write failure can replay already-sent atoms and
corrupt the delivery; modify the failure handling in the paced write path (where
pending, pending.delivery.delivery_id and pending_worker_injections are used) to
be progress-aware: add or use a progress field on the pending delivery (e.g.,
pending.offset or pending.sent_bytes) that is advanced on each successful atom
write, and on error either (A) update pending to point only to the unsent suffix
and push_front that adjusted pending, or (B) if any bytes/atoms were already
written, mark the delivery non-retryable/failed instead of re-queuing. Ensure
the code that performs writes updates pending.offset/sent_bytes so the retry has
correct suffix semantics.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 2 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="crates/broker/src/pty_worker.rs">

<violation number="1" location="crates/broker/src/pty_worker.rs:910">
P1: Avoid awaiting the full paced injection inside this select-loop branch; long payloads can block the worker loop for `INJECT_PACE_MS * atom_count` and delay PTY output/control handling.</violation>
</file>

Reply with feedback, questions, or to request a fix.

Re-trigger cubic

paced.push(b'\r');
if let Err(e) = pty
.write_paced(&paced, Duration::from_millis(INJECT_PACE_MS))
.await

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Avoid awaiting the full paced injection inside this select-loop branch; long payloads can block the worker loop for INJECT_PACE_MS * atom_count and delay PTY output/control handling.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At crates/broker/src/pty_worker.rs, line 910:

<comment>Avoid awaiting the full paced injection inside this select-loop branch; long payloads can block the worker loop for `INJECT_PACE_MS * atom_count` and delay PTY output/control handling.</comment>

<file context>
@@ -890,17 +895,28 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> {
+                    paced.push(b'\r');
+                    if let Err(e) = pty
+                        .write_paced(&paced, Duration::from_millis(INJECT_PACE_MS))
+                        .await
+                    {
                         tracing::warn!(
</file context>

Copy link
Copy Markdown
Member Author

Closing without merging.

On reflection, the cost/benefit doesn't hold up:

What it gets us

  • Escape-sequence atomicity on the wire. But write_all already loops until flushed, and the child's VT parser handles fragmented reads — so this only matters if you believe the child has a read-mid-sequence bug. That's the issue's hypothesis, not a measured fact.
  • "Looks like typing" — anecdotal, not demonstrated here.

What it costs us

  • Per-injection latency goes from ~50 ms (bulk write + 50 ms sleep + \r) to 20 ms × atom_count. A 200-byte body becomes ~4 s; a 1 KB body becomes ~20 s before the trailing \r lands. That stretches verification windows and any downstream throttle timing.

The reference (headless-terminal) is a testing tool whose job is to simulate a human. The broker's job is to deliver a message — different goals, and we shouldn't pay typing-speed latency for it.

Better path: wait until we can reproduce the "Claude eats characters" bug and confirm bulk writing is actually the cause. If it is, revisit with either a much faster rate (1–2 ms or atom-by-atom with no sleep) or a flag-gated knob, with evidence to justify the trade.


Generated by Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

broker: escape-aware paced injection (steal from ht)

2 participants