Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .agentworkforce/workforce/personas/slack-comms.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
"harness": "claude",
"model": "claude-sonnet-4-6",
"systemPrompt": "$TASK_DESCRIPTION",
"permissions": {
"mode": "bypassPermissions"
},
"harnessSettings": {
"reasoning": "medium",
"timeoutSeconds": 600
Expand Down
4 changes: 4 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ Pear broker work must treat duplicate delivery as a normal failure mode. Rendere
- Do not post integration or launch metadata on reused broker sessions. Notify agents only after a real broker start, reconnect, or state transition, and make repeated payloads no-ops when possible.
- Add regression tests when touching broker start, event streaming, PTY buffering, spawned personas, or integration notifications. Include duplicate/replay cases, not just the happy path.
- Add low-noise telemetry for suppressed duplicates and missing event identity so replay issues are visible without flooding the terminal.

## Terminal Screen Convergence

The broker daemon's PTY emulation (served as attach snapshots, observable via `agent-relay-broker dump-pty`) is the ground truth for what a worker terminal shows. The renderer's xterm grid can diverge from it (e.g. xterm reflow-scrolls on width resizes; PTY-side emulators don't), and diff-painting TUIs like Claude Code then *preserve* the divergence forever — they skip cells they believe unchanged, so stale glyphs bleed through new rows and stacked repaint frames accumulate. `src/renderer/src/lib/terminal-reconciler.ts` closes the loop: when a terminal is quiet it compares the viewport to the broker's `plain` snapshot and repaints from the self-framing `ansi` snapshot on confirmed divergence. Do not remove it after fixing any individual corruption vector — it is the convergence backstop for the whole class, and its gating invariants (quiet window, activity-serial recheck, dimension match, confirm-twice, rate limit) each guard a real re-corruption path documented in the module header. Repairs log `[terminal] viewport diverged from broker screen` — that line firing is the signal a new creation vector exists and is worth hunting.
46 changes: 46 additions & 0 deletions src/main/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
BrokerEventStreamDiagnostic,
BrokerReconciledChatMessage,
BrokerReconcileMessagesInput,
BrokerTerminalSnapshot,
BrokerTerminalSnapshotFormat,
WorkforcePersona
} from '../shared/types/ipc'
import {
Expand Down Expand Up @@ -2926,7 +2928,7 @@
): Promise<void> {
let output = ''
let timer: ReturnType<typeof setTimeout> | undefined
let livenessTimer: ReturnType<typeof setInterval> | undefined

Check warning on line 2931 in src/main/broker.ts

View workflow job for this annotation

GitHub Actions / checks

'livenessTimer' is never reassigned. Use 'const' instead
let settled = false
let resolveReady: (() => void) | undefined
let rejectReady: ((error: Error) => void) | undefined
Expand Down Expand Up @@ -3409,6 +3411,50 @@
}
}

// Side-effect-free read of the broker's authoritative PTY screen, used by
// the renderer's quiet-time terminal reconciler (terminal-reconciler.ts).
// Unlike attachTerminal this never resets input streams, never touches the
// delivery mode, and degrades to null on every failure — a skipped
// reconcile check is always safe, a thrown one floods the IPC log on a
// periodic poll. Polled read ⇒ wedge recovery, same as getPending.
async snapshotTerminal(
projectId: string | undefined,
name: string,
format: BrokerTerminalSnapshotFormat
): Promise<BrokerTerminalSnapshot | null> {
const trimmedName = name.trim()
if (!trimmedName) return null
let session: BrokerSession
try {
session = this.getSessionForAgent(trimmedName, projectId)
} catch {
return null
}
return this.withWedgeRecovery<BrokerTerminalSnapshot | null>(
session,
'snapshotTerminal',
null,
async (current) => {
try {
const snapshot = await current.client.snapshot(trimmedName, format)
return {
rows: snapshot.rows,
cols: snapshot.cols,
cursor: snapshot.cursor,
screen:
format === 'ansi'
? Buffer.from(snapshot.screen, 'base64').toString('utf-8')
: snapshot.screen
}
} catch (err) {
if (isMissingAgentError(err)) return null
throw err
}
},
{ degradeOnTimeout: true }
)
}

async sendMessage(projectId: string | undefined, input: SendMessageInput): Promise<void> {
const session = input.to.startsWith('#')
? this.getSessionForProject(projectId || '')
Expand Down
4 changes: 4 additions & 0 deletions src/main/ipc-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ export function registerIpcHandlers(): void {
await brokerManager.resizePty(projectId, name, rows, cols)
})

ipcMain.handle('broker:snapshot-terminal', async (_, projectId: string | undefined, name: string, format: 'plain' | 'ansi') => {
return brokerManager.snapshotTerminal(projectId, name, format)
})

ipcMain.handle('broker:input-srtt', (_, projectId: string | undefined, name: string) => {
return brokerManager.getInputSrtt(projectId, name)
})
Expand Down
4 changes: 4 additions & 0 deletions src/preload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import type {
BrokerSpawnAgentInput,
BrokerSpawnAgentResult,
BrokerStatusEvent,
BrokerTerminalSnapshot,
BrokerTerminalSnapshotFormat,
BurnAgentBreakdown,
BurnAgentInput,
BurnAgentSummary,
Expand Down Expand Up @@ -258,6 +260,8 @@ const api = {
invoke<{ flushed: number }>('broker:flush-pending', projectId, name),
resizePty: (projectId: string | undefined, name: string, rows: number, cols: number) =>
invoke<void>('broker:resize-pty', projectId, name, rows, cols),
snapshotTerminal: (projectId: string | undefined, name: string, format: BrokerTerminalSnapshotFormat) =>
invoke<BrokerTerminalSnapshot | null>('broker:snapshot-terminal', projectId, name, format),
inputSrtt: (projectId: string | undefined, name: string) =>
invoke<number | null>('broker:input-srtt', projectId, name),
sendMessage: (projectId: string | undefined, input: BrokerSendMessageInput) =>
Expand Down
70 changes: 67 additions & 3 deletions src/renderer/src/components/issues/AttentionInbox.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { detectRepo } from '@/lib/issue-scoping'
import { jumpToIssueWork } from '@/lib/issue-navigation'
import { spawnTeamForIssue, type TeamComposition } from '@/lib/spawn-agent'
import { useAgentStore, type ChatMessage } from '@/stores/agent-store'
import { useIssuesStore, type IssueBand, type IssueGithubLink, type IssueViewModel } from '@/stores/issues-store'
import { useIssuesStore, type IssueBand, type IssueGithubLink, type IssueViewModel, type IssueWorkflowState } from '@/stores/issues-store'
import { useProjectStore } from '@/stores/project-store'
import { useUIStore } from '@/stores/ui-store'

Expand Down Expand Up @@ -360,11 +360,15 @@ function IssueGroupSection({
function DetailPanel({
issue,
narration,
onOpenLiveProject
availableStates,
onOpenLiveProject,
onChangeState
}: {
issue: IssueViewModel | null
narration: string
availableStates: IssueWorkflowState[]
onOpenLiveProject: () => void
onChangeState: (issue: IssueViewModel, state: IssueWorkflowState) => void
}): React.ReactNode {
if (!issue) {
return (
Expand Down Expand Up @@ -421,7 +425,34 @@ function DetailPanel({

<div className="mt-5">
<div className="text-[12px] font-semibold uppercase tracking-[0.12em] text-[var(--pear-text-faint)]">Status detail</div>
<dl className="mt-2 grid grid-cols-[88px_1fr] gap-x-3 gap-y-2 text-sm">
<dl className="mt-2 grid grid-cols-[88px_1fr] items-center gap-x-3 gap-y-2 text-sm">
<dt className="text-[var(--pear-text-faint)]">Status</dt>
<dd className="text-[var(--pear-text)]">
{issue.stateId && issue.issueRemotePath && availableStates.length > 0 ? (
<select
value={issue.stateId}
onChange={(event) => {
const next = availableStates.find((state) => state.id === event.target.value)
if (next && next.id !== issue.stateId) onChangeState(issue, next)
}}
className="w-full rounded-md border border-[var(--pear-border)] bg-[var(--pear-bg-surface)] px-2 py-1 text-[13px] font-medium text-[var(--pear-text)] focus:border-[var(--pear-purple)] focus:outline-none"
title="Change Linear status"
>
{!availableStates.some((state) => state.id === issue.stateId) && (
<option value={issue.stateId}>{issue.stage}</option>
)}
{availableStates.map((state) => (
<option key={state.id} value={state.id}>
{state.name}
</option>
))}
</select>
) : (
<span className={`inline-block rounded-full border px-2 py-0.5 text-[11px] font-semibold ${stageClass(issue.stage)}`}>
{issue.stage}
</span>
)}
</dd>
<dt className="text-[var(--pear-text-faint)]">Owner</dt>
<dd className="text-[var(--pear-text)]">{issue.assignedAgentName || issue.assigneeName || 'Unassigned'}</dd>
<dt className="text-[var(--pear-text-faint)]">Actor</dt>
Expand Down Expand Up @@ -529,6 +560,8 @@ export function AttentionInbox({
const lastLoadedAt = useIssuesStore((s) => s.lastLoadedAt)
const load = useIssuesStore((s) => s.load)
const subscribe = useIssuesStore((s) => s.subscribe)
const setIssueState = useIssuesStore((s) => s.setIssueState)
const workflowStates = useIssuesStore((s) => s.workflowStates)
const [expandedBands, setExpandedBands] = useState<Record<IssueBand, boolean>>({
'needs-you': true,
'ready-for-agent': true,
Expand Down Expand Up @@ -571,6 +604,22 @@ export function AttentionInbox({
() => orderStages(Array.from(new Set(issues.map((issue) => issue.stage)))),
[issues]
)
// Prefer the authoritative `/linear/states` list (every workflow state, in
// board order). Fall back to states inferred from loaded issues when the
// states mount isn't live yet — those carry real stateIds too, but only cover
// states currently in use.
const availableStates = useMemo<IssueWorkflowState[]>(() => {
if (workflowStates.length > 0) return workflowStates
const byId = new Map<string, IssueWorkflowState>()
for (const issue of issues) {
if (issue.stateId && !byId.has(issue.stateId)) {
byId.set(issue.stateId, { id: issue.stateId, name: issue.stage, type: issue.stageType })
}
}
return orderStages(Array.from(byId.values()).map((state) => state.name)).map(
(name) => Array.from(byId.values()).find((state) => state.name === name) as IssueWorkflowState
)
}, [workflowStates, issues])
const activeStatusFilter = statusFilter && availableStages.includes(statusFilter) ? statusFilter : null
const visibleIssues = activeStatusFilter
? issues.filter((issue) => issue.stage === activeStatusFilter)
Expand Down Expand Up @@ -613,6 +662,19 @@ export function AttentionInbox({
setNavNotice(result.message)
}

async function handleChangeState(issue: IssueViewModel, state: IssueWorkflowState): Promise<void> {
if (!resolvedProjectId) {
setNavNotice('No project for status change')
return
}
try {
await setIssueState(resolvedProjectId, issue.id, state)
setNavNotice(`Moved ${issue.identifier} → ${state.name}`)
} catch (error) {
setNavNotice(error instanceof Error ? error.message : 'Failed to change status')
}
}

async function handleSpawnTeam(issue: IssueViewModel): Promise<void> {
const project = resolvedProjectId ? useProjectStore.getState().projects.find((candidate) => candidate.id === resolvedProjectId) : undefined
if (!project) {
Expand Down Expand Up @@ -794,7 +856,9 @@ export function AttentionInbox({
<DetailPanel
issue={selectedIssue}
narration={selectedIssue ? narrations.get(selectedIssue.id) || selectedIssue.agentNarration || '' : ''}
availableStates={availableStates}
onOpenLiveProject={() => selectedIssue && void openLiveProject(selectedIssue)}
onChangeState={handleChangeState}
/>

{navNotice && (
Expand Down
42 changes: 42 additions & 0 deletions src/renderer/src/lib/ipc-mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -605,11 +605,23 @@ const mockGithubRecords: Record<string, Record<string, unknown>> = {
}
}

// Mirrors the materialized `/linear/states` resource (adapter-linear `states`).
const mockLinearStates: Array<Record<string, unknown>> = [
{ id: 'state-planning', name: 'Planning', type: 'unstarted', color: '#9aa0aa', position: 0 },
{ id: 'state-to-do', name: 'To do', type: 'unstarted', color: '#9aa0aa', position: 1 },
{ id: 'state-ready-for-agent', name: 'Ready for Agent', type: 'unstarted', color: '#b083f0', position: 2 },
{ id: 'state-in-progress', name: 'In Progress', type: 'started', color: '#5a8de6', position: 3 },
{ id: 'state-in-review', name: 'In review', type: 'started', color: '#e6d78d', position: 4 },
{ id: 'state-merged', name: 'Merged', type: 'completed', color: '#6cc36c', position: 5 },
{ id: 'state-done', name: 'Done', type: 'completed', color: '#6cc36c', position: 6 }
]

const mockRemoteFiles: Record<string, Record<string, unknown> | string> = Object.fromEntries([
...mockLinearIssues.map((issue) => [
`/linear/issues/${String(issue.identifier)}.json`,
issue
]),
...mockLinearStates.map((state) => [`/linear/states/${String(state.id)}.json`, state]),
...Object.entries(mockGithubRecords)
])

Expand Down Expand Up @@ -897,6 +909,9 @@ export const pearMock: PearAPI = {
getPending: async (): Promise<PendingRelayMessage[]> => [],
flushPending: async () => ({ flushed: 0 }),
resizePty: async () => undefined,
// No authoritative PTY emulation behind the mock: the reconciler treats
// null as "skip this check", so it stays dormant in web/harness builds.
snapshotTerminal: async () => null,
inputSrtt: async () => mockInputSrttMs,
sendMessage: async (projectId: string | undefined, input: BrokerSendMessageInput) => {
handleInjectedBrokerEvent({
Expand Down Expand Up @@ -1085,6 +1100,14 @@ export const pearMock: PearAPI = {
})
}

if (normalized === '/linear/states') {
return mockLinearStates.map((state) => ({
name: `${String(state.id)}.json`,
path: `/linear/states/${String(state.id)}.json`,
type: 'file'
}))
}

if (normalized === '/github/repos/AgentWorkforce/pear/issues') {
return Object.keys(mockGithubRecords).map((path) => ({
name: path.split('/').at(-1) || path,
Expand All @@ -1105,6 +1128,25 @@ export const pearMock: PearAPI = {
writeRemoteFile: async (_projectId: string, remotePath: string, content: string): Promise<void> => {
const normalized = normalizeMockRemotePath(remotePath)
console.log('[ipc-mock] writeRemoteFile', normalized, content.length, 'bytes')
// Mirror the adapter's Edit/PATCH semantics for canonical issue records:
// merge the included mutable fields (e.g. { stateId }) into the existing
// record and re-derive the embedded `state` so a reload reflects the move.
const existing = mockRemoteFiles[normalized]
const issueMatch = /^\/linear\/issues\/[^/]+\.json$/.test(normalized)
if (issueMatch && existing && typeof existing !== 'string') {
try {
const patch = JSON.parse(content) as Record<string, unknown>
const merged = { ...existing, ...patch }
if (typeof patch.stateId === 'string') {
const state = mockLinearStates.find((candidate) => candidate.id === patch.stateId)
if (state) merged.state = state
}
mockRemoteFiles[normalized] = merged
return
} catch {
// fall through to raw write on unparseable payloads
}
}
mockRemoteFiles[normalized] = content
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
readMountPreview: async (): Promise<FsReadPreviewResult> => ({ kind: 'missing', content: '', size: 0 }),
Expand Down
Loading
Loading