diff --git a/.agentworkforce/workforce/personas/slack-comms.json b/.agentworkforce/workforce/personas/slack-comms.json index 7ab9abdd..1b581ee9 100644 --- a/.agentworkforce/workforce/personas/slack-comms.json +++ b/.agentworkforce/workforce/personas/slack-comms.json @@ -38,6 +38,9 @@ "harness": "claude", "model": "claude-sonnet-4-6", "systemPrompt": "$TASK_DESCRIPTION", + "permissions": { + "mode": "bypassPermissions" + }, "harnessSettings": { "reasoning": "medium", "timeoutSeconds": 600 diff --git a/AGENTS.md b/AGENTS.md index d21e150e..1db8bc4d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -12,3 +12,7 @@ Pear broker work must treat duplicate delivery as a normal failure mode. Rendere - Do not post integration or launch metadata on reused broker sessions. Notify agents only after a real broker start, reconnect, or state transition, and make repeated payloads no-ops when possible. - Add regression tests when touching broker start, event streaming, PTY buffering, spawned personas, or integration notifications. Include duplicate/replay cases, not just the happy path. - Add low-noise telemetry for suppressed duplicates and missing event identity so replay issues are visible without flooding the terminal. + +## Terminal Screen Convergence + +The broker daemon's PTY emulation (served as attach snapshots, observable via `agent-relay-broker dump-pty`) is the ground truth for what a worker terminal shows. The renderer's xterm grid can diverge from it (e.g. xterm reflow-scrolls on width resizes; PTY-side emulators don't), and diff-painting TUIs like Claude Code then *preserve* the divergence forever — they skip cells they believe unchanged, so stale glyphs bleed through new rows and stacked repaint frames accumulate. `src/renderer/src/lib/terminal-reconciler.ts` closes the loop: when a terminal is quiet it compares the viewport to the broker's `plain` snapshot and repaints from the self-framing `ansi` snapshot on confirmed divergence. Do not remove it after fixing any individual corruption vector — it is the convergence backstop for the whole class, and its gating invariants (quiet window, activity-serial recheck, dimension match, confirm-twice, rate limit) each guard a real re-corruption path documented in the module header. Repairs log `[terminal] viewport diverged from broker screen` — that line firing is the signal a new creation vector exists and is worth hunting. diff --git a/src/main/broker.ts b/src/main/broker.ts index 84d9d5ad..74a9e37b 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -48,6 +48,8 @@ import type { BrokerEventStreamDiagnostic, BrokerReconciledChatMessage, BrokerReconcileMessagesInput, + BrokerTerminalSnapshot, + BrokerTerminalSnapshotFormat, WorkforcePersona } from '../shared/types/ipc' import { @@ -3409,6 +3411,50 @@ export class BrokerManager { } } + // Side-effect-free read of the broker's authoritative PTY screen, used by + // the renderer's quiet-time terminal reconciler (terminal-reconciler.ts). + // Unlike attachTerminal this never resets input streams, never touches the + // delivery mode, and degrades to null on every failure — a skipped + // reconcile check is always safe, a thrown one floods the IPC log on a + // periodic poll. Polled read ⇒ wedge recovery, same as getPending. + async snapshotTerminal( + projectId: string | undefined, + name: string, + format: BrokerTerminalSnapshotFormat + ): Promise { + const trimmedName = name.trim() + if (!trimmedName) return null + let session: BrokerSession + try { + session = this.getSessionForAgent(trimmedName, projectId) + } catch { + return null + } + return this.withWedgeRecovery( + session, + 'snapshotTerminal', + null, + async (current) => { + try { + const snapshot = await current.client.snapshot(trimmedName, format) + return { + rows: snapshot.rows, + cols: snapshot.cols, + cursor: snapshot.cursor, + screen: + format === 'ansi' + ? Buffer.from(snapshot.screen, 'base64').toString('utf-8') + : snapshot.screen + } + } catch (err) { + if (isMissingAgentError(err)) return null + throw err + } + }, + { degradeOnTimeout: true } + ) + } + async sendMessage(projectId: string | undefined, input: SendMessageInput): Promise { const session = input.to.startsWith('#') ? this.getSessionForProject(projectId || '') diff --git a/src/main/ipc-handlers.ts b/src/main/ipc-handlers.ts index e465d0c5..da2eff02 100644 --- a/src/main/ipc-handlers.ts +++ b/src/main/ipc-handlers.ts @@ -338,6 +338,10 @@ export function registerIpcHandlers(): void { await brokerManager.resizePty(projectId, name, rows, cols) }) + ipcMain.handle('broker:snapshot-terminal', async (_, projectId: string | undefined, name: string, format: 'plain' | 'ansi') => { + return brokerManager.snapshotTerminal(projectId, name, format) + }) + ipcMain.handle('broker:input-srtt', (_, projectId: string | undefined, name: string) => { return brokerManager.getInputSrtt(projectId, name) }) diff --git a/src/preload/index.ts b/src/preload/index.ts index 1e3cfbc0..3f130d65 100644 --- a/src/preload/index.ts +++ b/src/preload/index.ts @@ -22,6 +22,8 @@ import type { BrokerSpawnAgentInput, BrokerSpawnAgentResult, BrokerStatusEvent, + BrokerTerminalSnapshot, + BrokerTerminalSnapshotFormat, BurnAgentBreakdown, BurnAgentInput, BurnAgentSummary, @@ -258,6 +260,8 @@ const api = { invoke<{ flushed: number }>('broker:flush-pending', projectId, name), resizePty: (projectId: string | undefined, name: string, rows: number, cols: number) => invoke('broker:resize-pty', projectId, name, rows, cols), + snapshotTerminal: (projectId: string | undefined, name: string, format: BrokerTerminalSnapshotFormat) => + invoke('broker:snapshot-terminal', projectId, name, format), inputSrtt: (projectId: string | undefined, name: string) => invoke('broker:input-srtt', projectId, name), sendMessage: (projectId: string | undefined, input: BrokerSendMessageInput) => diff --git a/src/renderer/src/components/issues/AttentionInbox.tsx b/src/renderer/src/components/issues/AttentionInbox.tsx index abb46f77..e5dc0a2b 100644 --- a/src/renderer/src/components/issues/AttentionInbox.tsx +++ b/src/renderer/src/components/issues/AttentionInbox.tsx @@ -23,7 +23,7 @@ import { detectRepo } from '@/lib/issue-scoping' import { jumpToIssueWork } from '@/lib/issue-navigation' import { spawnTeamForIssue, type TeamComposition } from '@/lib/spawn-agent' import { useAgentStore, type ChatMessage } from '@/stores/agent-store' -import { useIssuesStore, type IssueBand, type IssueGithubLink, type IssueViewModel } from '@/stores/issues-store' +import { useIssuesStore, type IssueBand, type IssueGithubLink, type IssueViewModel, type IssueWorkflowState } from '@/stores/issues-store' import { useProjectStore } from '@/stores/project-store' import { useUIStore } from '@/stores/ui-store' @@ -360,11 +360,15 @@ function IssueGroupSection({ function DetailPanel({ issue, narration, - onOpenLiveProject + availableStates, + onOpenLiveProject, + onChangeState }: { issue: IssueViewModel | null narration: string + availableStates: IssueWorkflowState[] onOpenLiveProject: () => void + onChangeState: (issue: IssueViewModel, state: IssueWorkflowState) => void }): React.ReactNode { if (!issue) { return ( @@ -421,7 +425,34 @@ function DetailPanel({
Status detail
-
+
+
Status
+
+ {issue.stateId && issue.issueRemotePath && availableStates.length > 0 ? ( + + ) : ( + + {issue.stage} + + )} +
Owner
{issue.assignedAgentName || issue.assigneeName || 'Unassigned'}
Actor
@@ -529,6 +560,8 @@ export function AttentionInbox({ const lastLoadedAt = useIssuesStore((s) => s.lastLoadedAt) const load = useIssuesStore((s) => s.load) const subscribe = useIssuesStore((s) => s.subscribe) + const setIssueState = useIssuesStore((s) => s.setIssueState) + const workflowStates = useIssuesStore((s) => s.workflowStates) const [expandedBands, setExpandedBands] = useState>({ 'needs-you': true, 'ready-for-agent': true, @@ -571,6 +604,22 @@ export function AttentionInbox({ () => orderStages(Array.from(new Set(issues.map((issue) => issue.stage)))), [issues] ) + // Prefer the authoritative `/linear/states` list (every workflow state, in + // board order). Fall back to states inferred from loaded issues when the + // states mount isn't live yet — those carry real stateIds too, but only cover + // states currently in use. + const availableStates = useMemo(() => { + if (workflowStates.length > 0) return workflowStates + const byId = new Map() + for (const issue of issues) { + if (issue.stateId && !byId.has(issue.stateId)) { + byId.set(issue.stateId, { id: issue.stateId, name: issue.stage, type: issue.stageType }) + } + } + return orderStages(Array.from(byId.values()).map((state) => state.name)).map( + (name) => Array.from(byId.values()).find((state) => state.name === name) as IssueWorkflowState + ) + }, [workflowStates, issues]) const activeStatusFilter = statusFilter && availableStages.includes(statusFilter) ? statusFilter : null const visibleIssues = activeStatusFilter ? issues.filter((issue) => issue.stage === activeStatusFilter) @@ -613,6 +662,19 @@ export function AttentionInbox({ setNavNotice(result.message) } + async function handleChangeState(issue: IssueViewModel, state: IssueWorkflowState): Promise { + if (!resolvedProjectId) { + setNavNotice('No project for status change') + return + } + try { + await setIssueState(resolvedProjectId, issue.id, state) + setNavNotice(`Moved ${issue.identifier} → ${state.name}`) + } catch (error) { + setNavNotice(error instanceof Error ? error.message : 'Failed to change status') + } + } + async function handleSpawnTeam(issue: IssueViewModel): Promise { const project = resolvedProjectId ? useProjectStore.getState().projects.find((candidate) => candidate.id === resolvedProjectId) : undefined if (!project) { @@ -794,7 +856,9 @@ export function AttentionInbox({ selectedIssue && void openLiveProject(selectedIssue)} + onChangeState={handleChangeState} /> {navNotice && ( diff --git a/src/renderer/src/lib/ipc-mock.ts b/src/renderer/src/lib/ipc-mock.ts index bc5af514..dbaa8608 100644 --- a/src/renderer/src/lib/ipc-mock.ts +++ b/src/renderer/src/lib/ipc-mock.ts @@ -605,11 +605,23 @@ const mockGithubRecords: Record> = { } } +// Mirrors the materialized `/linear/states` resource (adapter-linear `states`). +const mockLinearStates: Array> = [ + { id: 'state-planning', name: 'Planning', type: 'unstarted', color: '#9aa0aa', position: 0 }, + { id: 'state-to-do', name: 'To do', type: 'unstarted', color: '#9aa0aa', position: 1 }, + { id: 'state-ready-for-agent', name: 'Ready for Agent', type: 'unstarted', color: '#b083f0', position: 2 }, + { id: 'state-in-progress', name: 'In Progress', type: 'started', color: '#5a8de6', position: 3 }, + { id: 'state-in-review', name: 'In review', type: 'started', color: '#e6d78d', position: 4 }, + { id: 'state-merged', name: 'Merged', type: 'completed', color: '#6cc36c', position: 5 }, + { id: 'state-done', name: 'Done', type: 'completed', color: '#6cc36c', position: 6 } +] + const mockRemoteFiles: Record | string> = Object.fromEntries([ ...mockLinearIssues.map((issue) => [ `/linear/issues/${String(issue.identifier)}.json`, issue ]), + ...mockLinearStates.map((state) => [`/linear/states/${String(state.id)}.json`, state]), ...Object.entries(mockGithubRecords) ]) @@ -897,6 +909,9 @@ export const pearMock: PearAPI = { getPending: async (): Promise => [], flushPending: async () => ({ flushed: 0 }), resizePty: async () => undefined, + // No authoritative PTY emulation behind the mock: the reconciler treats + // null as "skip this check", so it stays dormant in web/harness builds. + snapshotTerminal: async () => null, inputSrtt: async () => mockInputSrttMs, sendMessage: async (projectId: string | undefined, input: BrokerSendMessageInput) => { handleInjectedBrokerEvent({ @@ -1085,6 +1100,14 @@ export const pearMock: PearAPI = { }) } + if (normalized === '/linear/states') { + return mockLinearStates.map((state) => ({ + name: `${String(state.id)}.json`, + path: `/linear/states/${String(state.id)}.json`, + type: 'file' + })) + } + if (normalized === '/github/repos/AgentWorkforce/pear/issues') { return Object.keys(mockGithubRecords).map((path) => ({ name: path.split('/').at(-1) || path, @@ -1105,6 +1128,25 @@ export const pearMock: PearAPI = { writeRemoteFile: async (_projectId: string, remotePath: string, content: string): Promise => { const normalized = normalizeMockRemotePath(remotePath) console.log('[ipc-mock] writeRemoteFile', normalized, content.length, 'bytes') + // Mirror the adapter's Edit/PATCH semantics for canonical issue records: + // merge the included mutable fields (e.g. { stateId }) into the existing + // record and re-derive the embedded `state` so a reload reflects the move. + const existing = mockRemoteFiles[normalized] + const issueMatch = /^\/linear\/issues\/[^/]+\.json$/.test(normalized) + if (issueMatch && existing && typeof existing !== 'string') { + try { + const patch = JSON.parse(content) as Record + const merged = { ...existing, ...patch } + if (typeof patch.stateId === 'string') { + const state = mockLinearStates.find((candidate) => candidate.id === patch.stateId) + if (state) merged.state = state + } + mockRemoteFiles[normalized] = merged + return + } catch { + // fall through to raw write on unparseable payloads + } + } mockRemoteFiles[normalized] = content }, readMountPreview: async (): Promise => ({ kind: 'missing', content: '', size: 0 }), diff --git a/src/renderer/src/lib/terminal-reconciler.test.ts b/src/renderer/src/lib/terminal-reconciler.test.ts new file mode 100644 index 00000000..2b669d6d --- /dev/null +++ b/src/renderer/src/lib/terminal-reconciler.test.ts @@ -0,0 +1,226 @@ +import { describe, expect, it, vi } from 'vitest' +import { + createTerminalReconciler, + screensMatch, + RECONCILE_CONFIRM_CHECKS, + RECONCILE_MIN_REPAIR_GAP_MS, + type ReconcileSnapshot, + type ReconcileViewport, + type TerminalReconcilerDeps +} from './terminal-reconciler' + +interface Harness { + deps: TerminalReconcilerDeps + repairs: string[] + state: { + plain: ReconcileSnapshot | null + ansi: ReconcileSnapshot | null + viewport: ReconcileViewport | null + quiet: boolean + serial: number + now: number + flushBumpsSerial: boolean + onFetch?: (format: 'plain' | 'ansi') => void + } +} + +function makeHarness(): Harness { + const repairs: string[] = [] + const state: Harness['state'] = { + plain: { rows: 2, cols: 10, screen: 'row one\nrow two' }, + ansi: { rows: 2, cols: 10, screen: '\x1b[0m\x1b[H\x1b[2J\x1b[1;1Hrow one\x1b[2;1Hrow two' }, + viewport: { rows: 2, cols: 10, lines: ['row one', 'row two'] }, + quiet: true, + serial: 0, + now: 1_000_000, + flushBumpsSerial: false + } + const deps: TerminalReconcilerDeps = { + fetchSnapshot: async (format) => { + state.onFetch?.(format) + return format === 'plain' ? state.plain : state.ansi + }, + readViewport: () => state.viewport, + writeRepair: (ansi) => repairs.push(ansi), + isQuiet: () => state.quiet, + activitySerial: () => state.serial, + flushPending: () => { + if (state.flushBumpsSerial) state.serial += 1 + }, + now: () => state.now + } + return { deps, repairs, state } +} + +function diverge(state: Harness['state']): void { + state.viewport = { rows: 2, cols: 10, lines: ['row one', 'GARBAGE tw'] } +} + +async function confirmCycles(reconciler: { checkNow(): Promise }): Promise { + for (let i = 0; i < RECONCILE_CONFIRM_CHECKS; i += 1) { + await reconciler.checkNow() + } +} + +describe('screensMatch', () => { + it('matches identical screens and ignores trailing whitespace + blank tail rows', () => { + expect(screensMatch('a\nb', ['a', 'b'])).toBe(true) + expect(screensMatch('a \nb', ['a', 'b '])).toBe(true) + expect(screensMatch('a\nb\n\n', ['a', 'b'])).toBe(true) + expect(screensMatch('a\nb', ['a', 'b', '', ''])).toBe(true) + }) + + it('detects divergent rows', () => { + expect(screensMatch('a\nb', ['a', 'x'])).toBe(false) + expect(screensMatch('a\nb', ['a'])).toBe(false) + // Stale glyphs bleeding through spaces — the real-world signature. + expect(screensMatch('Do you want to proceed?', ['Do3youowant to proceed?'])).toBe(false) + }) +}) + +describe('createTerminalReconciler', () => { + it('does not repair when screens match', async () => { + const { deps, repairs } = makeHarness() + const reconciler = createTerminalReconciler(deps) + await confirmCycles(reconciler) + expect(repairs).toEqual([]) + reconciler.dispose() + }) + + it('repairs a divergence after the confirmation streak, exactly once', async () => { + const { deps, repairs, state } = makeHarness() + const reconciler = createTerminalReconciler(deps) + diverge(state) + await reconciler.checkNow() + expect(repairs).toEqual([]) // first sighting: not yet confirmed + await reconciler.checkNow() + expect(repairs).toEqual([state.ansi!.screen]) + expect(reconciler.repairs()).toBe(1) + // Streak reset: the next mismatch sighting starts over. + await reconciler.checkNow() + expect(repairs).toHaveLength(1) + reconciler.dispose() + }) + + it('never fetches while not quiet', async () => { + const { deps, state } = makeHarness() + const fetches: string[] = [] + state.onFetch = (format) => fetches.push(format) + state.quiet = false + const reconciler = createTerminalReconciler(deps) + diverge(state) + await confirmCycles(reconciler) + expect(fetches).toEqual([]) + reconciler.dispose() + }) + + it('aborts when output arrives during the plain fetch', async () => { + const { deps, repairs, state } = makeHarness() + state.onFetch = (format) => { + if (format === 'plain') state.serial += 1 + } + const reconciler = createTerminalReconciler(deps) + diverge(state) + await confirmCycles(reconciler) + expect(repairs).toEqual([]) + reconciler.dispose() + }) + + it('aborts when output arrives during the ansi fetch', async () => { + const { deps, repairs, state } = makeHarness() + const reconciler = createTerminalReconciler(deps) + diverge(state) + await reconciler.checkNow() + state.onFetch = (format) => { + if (format === 'ansi') state.serial += 1 + } + await reconciler.checkNow() + expect(repairs).toEqual([]) + reconciler.dispose() + }) + + it('aborts when the pre-write flush surfaces staged chunks', async () => { + const { deps, repairs, state } = makeHarness() + state.flushBumpsSerial = true + const reconciler = createTerminalReconciler(deps) + diverge(state) + await confirmCycles(reconciler) + expect(repairs).toEqual([]) + reconciler.dispose() + }) + + it('skips while snapshot and grid dimensions disagree (resize in flight)', async () => { + const { deps, repairs, state } = makeHarness() + const reconciler = createTerminalReconciler(deps) + diverge(state) + state.viewport!.rows = 3 + state.viewport!.lines.push('') + await confirmCycles(reconciler) + expect(repairs).toEqual([]) + reconciler.dispose() + }) + + it('rate-limits repairs', async () => { + const { deps, repairs, state } = makeHarness() + const reconciler = createTerminalReconciler(deps) + diverge(state) + await confirmCycles(reconciler) + expect(repairs).toHaveLength(1) + // Still divergent (repair write didn't fix the fake viewport): confirm + // again — inside the gap window nothing happens, past it a second + // repair lands. + await confirmCycles(reconciler) + expect(repairs).toHaveLength(1) + state.now += RECONCILE_MIN_REPAIR_GAP_MS + 1 + await confirmCycles(reconciler) + expect(repairs).toHaveLength(2) + reconciler.dispose() + }) + + it('treats a null snapshot as a skipped check', async () => { + const { deps, repairs, state } = makeHarness() + state.plain = null + const reconciler = createTerminalReconciler(deps) + diverge(state) + await confirmCycles(reconciler) + expect(repairs).toEqual([]) + reconciler.dispose() + }) + + it('stops checking after dispose', async () => { + const { deps, state } = makeHarness() + const fetches: string[] = [] + state.onFetch = (format) => fetches.push(format) + const reconciler = createTerminalReconciler(deps) + reconciler.dispose() + await reconciler.checkNow() + expect(fetches).toEqual([]) + }) + + it('runs on an interval without overlapping checks', async () => { + vi.useFakeTimers() + try { + const { deps, state } = makeHarness() + let inFlight = 0 + let maxInFlight = 0 + let release: (() => void) | null = null + deps.fetchSnapshot = async () => { + inFlight += 1 + maxInFlight = Math.max(maxInFlight, inFlight) + await new Promise((resolve) => { + release = resolve + }) + inFlight -= 1 + return state.plain + } + const reconciler = createTerminalReconciler(deps) + await vi.advanceTimersByTimeAsync(4_000) + await vi.advanceTimersByTimeAsync(4_000) + expect(maxInFlight).toBe(1) + release?.() + reconciler.dispose() + } finally { + vi.useRealTimers() + } + }) +}) diff --git a/src/renderer/src/lib/terminal-reconciler.ts b/src/renderer/src/lib/terminal-reconciler.ts new file mode 100644 index 00000000..b1828943 --- /dev/null +++ b/src/renderer/src/lib/terminal-reconciler.ts @@ -0,0 +1,169 @@ +// Quiet-time screen reconciliation against the broker's authoritative PTY +// emulation. +// +// Why this exists: the rendering pipeline has had repeated rounds of +// corruption fixes, each closing one *creation* vector (dropped chunks, +// resize gating, echo ordering, prediction strands). But the class has a +// nastier property than any single vector: modern TUIs (Claude Code's +// renderer in particular) repaint by DIFFING against their own model of the +// screen — they skip cells they believe unchanged, using cursor-forward +// moves instead of rewriting. So a single divergence between the renderer's +// xterm grid and the PTY-side truth (e.g. an xterm reflow scroll during a +// width resize that the PTY-side emulator doesn't perform) is never healed +// by subsequent output: every diff-repaint preserves the stale cells, and +// the corruption compounds (stacked panels, old glyphs bleeding through the +// spaces of new rows). +// +// The broker daemon maintains its own emulation of every worker PTY — the +// same screen it serves as the attach snapshot, observable via +// `agent-relay-broker dump-pty`. That emulation consumes the byte stream +// in-order, atomically with resizes, and is the ground truth the renderer +// must converge to. This module polls it when the terminal is QUIET and the +// screen is suspect, and repaints the viewport from the broker's ANSI +// reproduction stream when a divergence is confirmed — mosh-style state +// convergence layered over the existing event stream. +// +// Safety invariants (each guards a real re-corruption vector): +// - Only check while quiet: no server output for RECONCILE_QUIET_MS, window +// visible (a hidden window stalls the rAF chunk flush, so "no output" is +// not trustworthy there), no outstanding optimistic-echo predictions. +// - A repair write only lands if the activity serial is unchanged across +// the snapshot fetch AND after a forced flush of staged chunks — a chunk +// that raced the fetch may already be inside the snapshot; replaying it on +// top of the repaired screen would double-apply bytes. +// - Dimensions must match exactly (snapshot rows/cols == grid rows/cols); +// a mismatch means a resize is still propagating and a repaint would be +// framed for the wrong grid. +// - A divergence must be confirmed on two consecutive checks before a +// repair, and repairs are rate-limited; this thing must never flap. +// +// The repair payload is the broker's ANSI snapshot, which is self-framing +// (leading reset + home + erase-display + absolute row addressing + cursor +// restore), so writing it onto a dirty grid fully replaces the viewport +// without touching scrollback. + +export const RECONCILE_CHECK_INTERVAL_MS = 4_000 +export const RECONCILE_QUIET_MS = 1_500 +export const RECONCILE_MIN_REPAIR_GAP_MS = 15_000 +// Confirmations required on consecutive checks before repairing. +export const RECONCILE_CONFIRM_CHECKS = 2 + +export interface ReconcileSnapshot { + rows: number + cols: number + /** Row text for `plain`; ANSI reproduction stream for `ansi`. */ + screen: string +} + +export interface ReconcileViewport { + rows: number + cols: number + /** Right-trimmed text of each viewport row. */ + lines: string[] +} + +export interface TerminalReconcilerDeps { + fetchSnapshot(format: 'plain' | 'ansi'): Promise + readViewport(): ReconcileViewport | null + /** Write the ANSI repair stream to the live terminal (ordered sink). */ + writeRepair(ansi: string): void + /** All quiet-gate conditions: see module doc. */ + isQuiet(): boolean + /** Monotonic counter, bumped once per server-output delivery. */ + activitySerial(): number + /** Force rAF-staged chunks out so the serial reflects everything received. */ + flushPending(): void + log?(message: string): void + now?(): number +} + +export interface TerminalReconciler { + /** Run one check cycle now (the interval calls this internally). */ + checkNow(): Promise + repairs(): number + dispose(): void +} + +export function createTerminalReconciler(deps: TerminalReconcilerDeps): TerminalReconciler { + const now = deps.now ?? Date.now + const log = deps.log ?? ((): void => undefined) + let disposed = false + let checking = false + let mismatchStreak = 0 + let lastRepairAt = 0 + let repairCount = 0 + + const timer = setInterval(() => { + void check() + }, RECONCILE_CHECK_INTERVAL_MS) + + const check = async (): Promise => { + if (disposed || checking) return + if (!deps.isQuiet()) return + checking = true + try { + await checkInner() + } finally { + checking = false + } + } + + const checkInner = async (): Promise => { + const serial = deps.activitySerial() + const plain = await deps.fetchSnapshot('plain') + if (disposed || !plain) return + // Output during the fetch (or a quiet-gate change) invalidates the + // comparison: the two screens were captured at different stream points. + if (deps.activitySerial() !== serial || !deps.isQuiet()) return + const viewport = deps.readViewport() + if (!viewport) return + if (plain.rows !== viewport.rows || plain.cols !== viewport.cols) return + if (screensMatch(plain.screen, viewport.lines)) { + mismatchStreak = 0 + return + } + mismatchStreak += 1 + if (mismatchStreak < RECONCILE_CONFIRM_CHECKS) return + if (now() - lastRepairAt < RECONCILE_MIN_REPAIR_GAP_MS) return + + const ansi = await deps.fetchSnapshot('ansi') + if (disposed || !ansi) return + if (deps.activitySerial() !== serial || !deps.isQuiet()) return + if (ansi.rows !== viewport.rows || ansi.cols !== viewport.cols) return + // Final gate, synchronous with the write: force any staged chunks out. + // If one lands, its bytes may already be inside the snapshot we are + // about to paint — abort and let the next cycle re-verify. + deps.flushPending() + if (disposed || deps.activitySerial() !== serial) return + deps.writeRepair(ansi.screen) + lastRepairAt = now() + mismatchStreak = 0 + repairCount += 1 + log( + `[terminal] viewport diverged from broker screen; repainted from snapshot (repair #${repairCount})` + ) + } + + return { + checkNow: check, + repairs: () => repairCount, + dispose(): void { + disposed = true + clearInterval(timer) + } + } +} + +// Plain snapshots are newline-joined rows; xterm viewport lines come in +// right-trimmed. Compare row-by-row, right-trimmed, treating absent rows as +// blank — trailing blank rows are representational noise, not divergence. +export function screensMatch(plainScreen: string, viewportLines: string[]): boolean { + const snapshotLines = plainScreen.split('\n') + const rows = Math.max(snapshotLines.length, viewportLines.length) + for (let row = 0; row < rows; row += 1) { + const expected = (snapshotLines[row] ?? '').replace(/\s+$/, '') + const actual = (viewportLines[row] ?? '').replace(/\s+$/, '') + if (expected !== actual) return false + } + return true +} diff --git a/src/renderer/src/lib/terminal-runtime-registry.ts b/src/renderer/src/lib/terminal-runtime-registry.ts index add889d2..a6c7c0ff 100644 --- a/src/renderer/src/lib/terminal-runtime-registry.ts +++ b/src/renderer/src/lib/terminal-runtime-registry.ts @@ -34,6 +34,7 @@ import { recordChunkEchoed } from '@/lib/typing-trace' import { createPredictiveEcho, type PredictiveEchoWithStatus } from '@/lib/predictive-echo' import { createPtySizeSync } from '@/lib/pty-size-sync' import { buildModelSeedFromTerminal, createEchoRouter } from '@/lib/echo-router' +import { createTerminalReconciler, RECONCILE_QUIET_MS } from '@/lib/terminal-reconciler' import type { PredictiveEcho } from '@agent-relay/harness-driver/predictive-echo' import { awaitFontSettle } from '@/lib/font-settle' import type { Theme } from '@/stores/ui-store' @@ -302,6 +303,11 @@ function createRuntime( // attach snapshot was painted. Chunks at or below this total are already // on screen (inside the snapshot); only chunks past it get replayed. let writtenTotal = 0 + // Reconciler activity tracking: serial bumps once per server-output + // delivery; lastOutputAt gates the quiet window. Both are read by the + // quiet-time screen reconciler below. + let activitySerial = 0 + let lastOutputAt = 0 let attachSeeded = false let attachInFlight = false let pendingInitFrame: number | null = null @@ -334,6 +340,40 @@ function createRuntime( isViewportPinned: () => (term ? isViewportPinnedToBottom(term) : false), scrollToBottom: () => term?.scrollToBottom() }) + // Quiet-time convergence to the broker's authoritative screen. Catches the + // divergence class no creation-vector fix can: once the grid and a diffing + // TUI's model disagree (e.g. an xterm reflow scroll during a width + // resize), every subsequent diff-repaint preserves the stale cells. See + // terminal-reconciler.ts for the gating invariants. + const reconciler = createTerminalReconciler({ + fetchSnapshot: (format) => pear.broker.snapshotTerminal(opts.projectId, opts.agentName, format), + readViewport: () => { + if (!term || !opened) return null + const buffer = term.buffer.active + const lines: string[] = [] + for (let row = 0; row < term.rows; row += 1) { + const line = buffer.getLine(buffer.baseY + row) + lines.push(line ? line.translateToString(true) : '') + } + return { rows: term.rows, cols: term.cols, lines } + }, + writeRepair: (ansi) => { + // Through the router so the repair is ordered behind any queued engine + // writes and, on the engine route, also repairs the engine's model. + echoRouter.onServerOutput(ansi) + }, + isQuiet: () => { + if (disposed || !attachSeeded || currentToken === null || !opened) return false + // A hidden window stalls the rAF chunk flush, so "no recent output" + // says nothing about what is actually pending — never reconcile there. + if (document.visibilityState !== 'visible') return false + if (predictiveEcho?.hasPredictions) return false + return Date.now() - lastOutputAt >= RECONCILE_QUIET_MS + }, + activitySerial: () => activitySerial, + flushPending: () => flushPtyChunksNow(key), + log: (message) => console.warn(message) + }) const cancelPendingInit = (): void => { if (pendingInitFrame !== null) { @@ -405,6 +445,8 @@ function createRuntime( const writeChunks = (newChunks: string[]): void => { if (disposed || !term) return if (newChunks.length === 0) return + activitySerial += 1 + lastOutputAt = Date.now() // Optional diagnostic, gated on localStorage.PEAR_DIAG_PTY === '1'. // See pty-buffer-store.ts for the enable instructions. Flag is // cached to avoid a per-batch localStorage read. @@ -642,6 +684,7 @@ function createRuntime( // writeFromBuffer notification triggered by clearPtyBuffer (with []) // runs while the closure is still consistent. cancelPendingInit() + reconciler.dispose() sizeSync.dispose() echoRouter.dispose() clearPtyBuffer(key) diff --git a/src/renderer/src/stores/issues-store.ts b/src/renderer/src/stores/issues-store.ts index 15a154b5..ec62b1c1 100644 --- a/src/renderer/src/stores/issues-store.ts +++ b/src/renderer/src/stores/issues-store.ts @@ -22,10 +22,15 @@ export interface IssueViewModel { description: string stage: string stageType?: string + stateId?: string + /** Canonical mount path of the issue file, used for writeback (state changes, comments). */ + issueRemotePath?: string actor: 'agent' | 'pairing' | 'human' | 'unknown' labels: string[] assigneeName?: string assignedAgentName?: string + teamId?: string + teamKey?: string teamName?: string projectName?: string priority?: number @@ -40,14 +45,32 @@ export interface IssueViewModel { githubLinks: IssueGithubLink[] } +export interface IssueWorkflowState { + id: string + name: string + type?: string +} + interface IssuesState { issues: IssueViewModel[] + /** + * Authoritative workflow states from the materialized `/linear/states` mount, + * when available. Empty until the states resource is in the project scope and + * synced — callers should fall back to states derived from loaded issues. + */ + workflowStates: IssueWorkflowState[] loading: boolean error: string | null loadedProjectId: string | null lastLoadedAt: number | null load: (projectId: string, options?: { force?: boolean }) => Promise subscribe: (projectId: string) => () => void + /** + * Change an issue's Linear workflow state via writeback. Writes the target + * `stateId` to the canonical issue file path and optimistically updates the + * local view model; the resulting `relayfile-change` event reconciles it. + */ + setIssueState: (projectId: string, issueId: string, state: IssueWorkflowState) => Promise } const STAGE_ORDER = ['Backlog', 'Planning', 'To do', 'In Progress', 'In review', 'Merged', 'Done'] @@ -221,7 +244,11 @@ async function readGithubLink(projectId: string, sync: Record): } } -async function normalizeIssue(projectId: string, raw: Record): Promise { +async function normalizeIssue( + projectId: string, + raw: Record, + remotePath?: string +): Promise { const issue = payloadRecord(raw) const state = readRecord(issue.state) const attention = readRecord(issue.attention) @@ -247,10 +274,14 @@ async function normalizeIssue(projectId: string, raw: Record): description: readString(issue.description) || '', stage, stageType: readString(state.type), + stateId: readString(state.id) || readString(issue.stateId) || readString(issue.state_id), + issueRemotePath: remotePath, actor: resolveActor(labels, !!assigneeName), labels, assigneeName, assignedAgentName, + teamId: readString(team.id) || readString(issue.teamId) || readString(issue.team_id), + teamKey: readString(team.key), teamName: readString(team.name) || readString(team.key), projectName: readString(project.name), priority: readNumber(issue.priority), @@ -270,6 +301,51 @@ async function normalizeIssue(projectId: string, raw: Record): } } +/** + * Build the writeback payload for an issue state change. + * + * The `@relayfile/adapter-linear` `issues` resource (discovery/linear/.adapter.md) + * defines Edit semantics as: "write the resource update payload to the canonical + * resource path; included mutable fields PATCH; fields marked readOnly in + * .schema.json are rejected." The schema is `additionalProperties: false`, so we + * send ONLY the single mutable field we intend to change — `stateId`. Including + * identity/`state`/`team` objects (readOnly or non-schema) would be rejected. + */ +function buildStateWritebackPayload(state: IssueWorkflowState): Record { + return { stateId: state.id } +} + +/** + * Load the authoritative workflow-state list from the materialized + * `/linear/states` mount (adapter-linear `states` resource). Returns [] if the + * resource is out of scope, not yet synced, or unreadable — callers fall back to + * states derived from loaded issues. + */ +async function loadWorkflowStates(projectId: string): Promise { + try { + const entries = await pear.integrations.listRemoteDir(projectId, '/linear/states') + const parsed: Array<{ id: string; name: string; type?: string; position: number }> = [] + await Promise.all( + entries.filter(isIssueFile).map(async (entry) => { + const preview = await pear.integrations.readRemoteFile(projectId, entry.path) + if (preview.kind !== 'text') return + const record = parseJsonPreview(preview.content, entry.path) + if (!record) return + const data = payloadRecord(record) + const id = readString(data.id) + const name = readString(data.name) + if (!id || !name) return + parsed.push({ id, name, type: readString(data.type), position: readNumber(data.position) ?? Number.MAX_SAFE_INTEGER }) + }) + ) + return parsed + .sort((a, b) => a.position - b.position) + .map(({ id, name, type }) => ({ id, name, type })) + } catch { + return [] + } +} + function isIssueFile(entry: FsDirEntry): boolean { return entry.type === 'file' && entry.path.endsWith('.json') && !entry.path.endsWith('/_index.json') } @@ -296,6 +372,7 @@ function scheduleRefresh(projectId: string, generation: number, load: IssuesStat export const useIssuesStore = create((set, get) => ({ issues: [], + workflowStates: [], loading: false, error: null, loadedProjectId: null, @@ -309,21 +386,26 @@ export const useIssuesStore = create((set, get) => ({ const promise = (async () => { set({ loading: true, error: null }) try { - const entries = await pear.integrations.listRemoteDir(projectId, '/linear/issues') + const [entries, workflowStates] = await Promise.all([ + pear.integrations.listRemoteDir(projectId, '/linear/issues'), + loadWorkflowStates(projectId) + ]) const records = await Promise.all( entries.filter(isIssueFile).map(async (entry) => { const preview = await pear.integrations.readRemoteFile(projectId, entry.path) - return preview.kind === 'text' ? parseJsonPreview(preview.content, entry.path) : null + const record = preview.kind === 'text' ? parseJsonPreview(preview.content, entry.path) : null + return record ? { path: entry.path, record } : null }) ) const issues = await Promise.all( records - .filter((record): record is Record => !!record) - .map((record) => normalizeIssue(projectId, record)) + .filter((entry): entry is { path: string; record: Record } => !!entry) + .map(({ path, record }) => normalizeIssue(projectId, record, path)) ) set({ issues: sortIssues(issues), + workflowStates, loading: false, error: null, loadedProjectId: projectId, @@ -370,6 +452,34 @@ export const useIssuesStore = create((set, get) => ({ refreshTimer = null } } + }, + + setIssueState: async (projectId, issueId, state) => { + const issue = get().issues.find((candidate) => candidate.id === issueId) + if (!issue) throw new Error(`Issue ${issueId} not found`) + if (!issue.issueRemotePath) throw new Error('Issue is missing its mount path; cannot change status') + if (!state.id) throw new Error('Target workflow state id is required') + if (state.id === issue.stateId) return + + const payload = buildStateWritebackPayload(state) + await pear.integrations.writeRemoteFile(projectId, issue.issueRemotePath, JSON.stringify(payload, null, 2)) + + // Optimistic update; the resulting relayfile-change event reconciles via load(). + set((current) => ({ + issues: sortIssues( + current.issues.map((candidate) => + candidate.id === issueId + ? { + ...candidate, + stage: state.name, + stageType: state.type ?? candidate.stageType, + stateId: state.id, + band: classifyIssue(state.name, state.type, candidate.labels, {}) + } + : candidate + ) + ) + })) } })) diff --git a/src/shared/types/ipc.ts b/src/shared/types/ipc.ts index b18eb5a2..d4b478c2 100644 --- a/src/shared/types/ipc.ts +++ b/src/shared/types/ipc.ts @@ -383,6 +383,16 @@ export interface BrokerSetTerminalModeResult { pending: number } +export type BrokerTerminalSnapshotFormat = 'plain' | 'ansi' + +export interface BrokerTerminalSnapshot { + rows: number + cols: number + cursor: [number, number] + /** Row text for `plain`; decoded ANSI reproduction byte stream for `ansi`. */ + screen: string +} + export interface BrokerSendMessageInput { to: string text: string @@ -877,6 +887,11 @@ export interface PearAPI { getPending: (projectId: string | undefined, name: string) => Promise flushPending: (projectId: string | undefined, name: string) => Promise<{ flushed: number }> resizePty: (projectId: string | undefined, name: string, rows: number, cols: number) => Promise + snapshotTerminal: ( + projectId: string | undefined, + name: string, + format: BrokerTerminalSnapshotFormat + ) => Promise inputSrtt: (projectId: string | undefined, name: string) => Promise sendMessage: (projectId: string | undefined, input: BrokerSendMessageInput) => Promise reconcileMessages: (input: BrokerReconcileMessagesInput) => Promise