Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/opencode/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,24 @@ export namespace Config {
.positive()
.optional()
.describe("Timeout in milliseconds for model context protocol (MCP) requests"),
tool_timeout: z
.number()
.int()
.positive()
.optional()
.describe("Maximum duration in milliseconds before the watchdog force-errors a stuck tool"),
task_timeout: z
.number()
.int()
.positive()
.optional()
.describe("Maximum duration in milliseconds before the watchdog force-errors a stuck task tool"),
idle_timeout: z
.number()
.int()
.positive()
.optional()
.describe("Duration in milliseconds of inactivity before the watchdog cancels an idle session"),
})
.optional(),
})
Expand Down
215 changes: 215 additions & 0 deletions packages/opencode/src/project/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ import { Command } from "../command"
import { Instance } from "./instance"
import { Log } from "@/util/log"
import { ShareNext } from "@/share/share-next"
import { Database, sql } from "../storage/db"
import { PartTable, SessionTable } from "../session/session.sql"
import { SessionPrompt } from "../session/prompt"
import { SessionActivity } from "../session/activity"
import { SessionID } from "../session/schema"
import { Config } from "../config/config"

const log = Log.create({ service: "bootstrap" })

const WATCHDOG_INTERVAL = 60_000
const MAX_RUNNING = 45 * 60 * 1_000
const DEFAULT_IDLE = 5 * 60 * 1_000

export async function InstanceBootstrap() {
Log.Default.info("bootstrapping", { directory: Instance.directory })
Expand All @@ -22,10 +34,213 @@ export async function InstanceBootstrap() {
FileWatcher.init()
Vcs.init()
Snapshot.init()
SessionActivity.init()
cleanupOrphanedParts()
watchdog()

Bus.subscribe(Command.Event.Executed, async (payload) => {
if (payload.properties.name === Command.Default.INIT) {
Project.setInitialized(Instance.project.id)
}
})
}

/**
* Mark any tool parts left in "running" state from a previous process as errored.
* When the process exits (crash or clean shutdown), in-flight tool executions
* are lost but their DB state remains "running" forever. This recovers them.
*/
function cleanupOrphanedParts() {
const now = Date.now()
Database.use((db) => {
const orphaned = db
.select({ id: PartTable.id })
.from(PartTable)
.where(
sql`json_extract(${PartTable.data}, '$.type') = 'tool'
AND json_extract(${PartTable.data}, '$.state.status') = 'running'`,
)
.all()
if (orphaned.length === 0) return
log.info("cleaning up orphaned tool parts", { count: orphaned.length })
db.update(PartTable)
.set({
data: sql`json_set(
json_set(
json_set(${PartTable.data}, '$.state.status', 'error'),
'$.state.error', 'Tool execution orphaned by process restart'
),
'$.state.time.end', ${now}
)`,
})
.where(
sql`json_extract(${PartTable.data}, '$.type') = 'tool'
AND json_extract(${PartTable.data}, '$.state.status') = 'running'`,
)
.run()
})
}

/**
* Single watchdog tick: find tool parts stuck in "running" beyond the cutoff,
* filter to leaf-level tools, cancel their sessions, and force-error the
* DB rows as a safety net.
*
* Only cancels "leaf" stuck tools — i.e. non-task tools that are the actual
* root cause. Task tools that are waiting on a child session with its own
* stuck tool are left alone so the normal error-propagation path can run:
* child cancel → task tool resolves → parent LLM processes the error.
*
* Exported for testing.
*/
export function watchdogTick(cutoff: number, idle?: number, taskCutoff?: number) {
Database.use((db) => {
const stuck = db
.select({
id: PartTable.id,
session_id: PartTable.session_id,
tool: sql<string>`json_extract(${PartTable.data}, '$.tool')`,
child: sql<string | null>`json_extract(${PartTable.data}, '$.state.metadata.sessionId')`,
start: sql<number>`json_extract(${PartTable.data}, '$.state.time.start')`,
})
.from(PartTable)
.where(
sql`json_extract(${PartTable.data}, '$.type') = 'tool'
AND json_extract(${PartTable.data}, '$.state.status') = 'running'
AND json_extract(${PartTable.data}, '$.state.time.start') < ${Math.max(cutoff, taskCutoff ?? cutoff)}`,
)
.all()
// Apply per-tool-type cutoff: task tools use taskCutoff, others use cutoff
.filter((r) => r.start < (r.tool === "task" ? (taskCutoff ?? cutoff) : cutoff))

const cancelled = new Set<SessionID>()

if (stuck.length > 0) {
// Sessions that contain at least one stuck tool
const stuckSessions = new Set(stuck.map((r) => SessionID.make(r.session_id)))

// A task tool whose child session also has stuck tools is just
// waiting — it will resolve once the child is cancelled.
// Everything else (non-task tools, or task tools whose child has
// no stuck tools) is a leaf that we must force-error.
const leaf = stuck.filter((r) => {
if (r.tool !== "task") return true
if (!r.child) return true
return !stuckSessions.has(SessionID.make(r.child))
})

log.warn("watchdog: found stuck tool parts", {
total: stuck.length,
leaf: leaf.length,
ids: stuck.map((r) => r.id),
})

if (leaf.length > 0) {
// For task-tool leaves, cancel the *child* session so the task tool's
// normal error-propagation path runs: child cancel → SessionPrompt.prompt()
// resolves → task tool returns structured TIMEOUT to the parent LLM.
// For non-task leaves, cancel the owning session directly.
for (const r of leaf) {
if (r.tool === "task" && r.child) {
const sid = SessionID.make(r.child)
if (cancelled.has(sid)) continue
cancelled.add(sid)
log.warn("watchdog: cancelling stuck child session", { child: r.child, parent: r.session_id })
SessionPrompt.cancel(sid).catch(() => {})
} else {
const sid = SessionID.make(r.session_id)
if (cancelled.has(sid)) continue
cancelled.add(sid)
log.warn("watchdog: cancelling stuck session", { sessionID: r.session_id })
SessionPrompt.cancel(sid).catch(() => {})
}
}

// DB update as redundant safety net — only for leaf tools
const now = Date.now()
for (const r of leaf) {
db.update(PartTable)
.set({
data: sql`json_set(
json_set(
json_set(${PartTable.data}, '$.state.status', 'error'),
'$.state.error', 'Tool execution exceeded maximum allowed duration (watchdog)'
),
'$.state.time.end', ${now}
)`,
})
.where(
sql`${PartTable.id} = ${r.id}
AND json_extract(${PartTable.data}, '$.state.status') = 'running'`,
)
.run()
}
}
}

// --- Independent idle detection sweep ---
// Runs on every tick when idle param is provided, regardless of
// whether any stuck tool parts were found above.
// Only targets child (subagent) sessions — root sessions are never
// idle-cancelled since the user controls their lifecycle.
if (idle) {
const stale = Object.entries(SessionActivity.list())
.filter(([id]) => {
if (cancelled.has(SessionID.make(id))) return false
return SessionActivity.stale(id, idle)
})
.map(([id]) => id)
if (stale.length > 0) {
// Batch-check which stale sessions are children (have parent_id)
const children = new Set(
db
.select({ id: SessionTable.id })
.from(SessionTable)
.where(
sql`${SessionTable.id} IN (${sql.join(
stale.map((id) => sql`${id}`),
sql`, `,
)})
AND ${SessionTable.parent_id} IS NOT NULL`,
)
.all()
.map((r) => r.id),
)
for (const id of stale) {
const sid = SessionID.make(id)
if (!children.has(sid)) continue
const ts = SessionActivity.last(id)
log.warn("watchdog: idle session detected", {
sessionID: id,
last: ts,
threshold: idle,
})
cancelled.add(sid)
SessionPrompt.cancel(sid).catch(() => {})
}
}
}
})
}

/**
* Periodic scan for tool parts stuck in "running" beyond the configured timeout.
* Safety net for cases where the bash hard-stop or abort signal also fails.
* Respects both tool_timeout and task_timeout config to avoid killing
* long-running but healthy Task tool executions.
*/
function watchdog() {
const timer = setInterval(async () => {
try {
const cfg = await Config.get()
const tool = cfg.experimental?.tool_timeout ?? MAX_RUNNING
const task = cfg.experimental?.task_timeout ?? 1_800_000
const idle = cfg.experimental?.idle_timeout ?? DEFAULT_IDLE
const now = Date.now()
watchdogTick(now - tool, idle, now - (task + 60_000))
} catch {
watchdogTick(Date.now() - MAX_RUNNING)
}
}, WATCHDOG_INTERVAL)
timer.unref()
}
78 changes: 78 additions & 0 deletions packages/opencode/src/session/activity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Bus } from "@/bus"
import { Instance } from "@/project/instance"
import { MessageV2 } from "./message-v2"
import { Log } from "@/util/log"

const log = Log.create({ service: "session.activity" })

/**
* Tracks per-session last-activity timestamps via Bus events.
*
* Activity signals:
* - message.part.delta (token streaming — highest frequency)
* - message.part.updated (tool state changes, text completions)
*
* The watchdog queries `stale(sessionID, threshold)` to detect sessions
* that have had no activity for longer than the threshold, distinguishing
* "genuinely stuck / idle" from "actively streaming or executing tools".
*
* Root sessions (no parentID) are never subject to idle detection — only
* subagent sessions spawned by the task tool are monitored.
*/
export namespace SessionActivity {
const state = Instance.state(() => {
const data: Record<string, number> = {}
return data
})

/** Update the last-activity timestamp for a session. */
export function touch(id: string, now = Date.now()) {
state()[id] = now
}

/** Return the last-activity timestamp, or undefined if never recorded. */
export function last(id: string): number | undefined {
return state()[id]
}

/**
* True when the session has had no activity for longer than `threshold` ms.
* Returns false for sessions with no recorded activity (they haven't
* started yet, so they aren't stale).
*/
export function stale(id: string, threshold: number): boolean {
const ts = state()[id]
if (ts === undefined) return false
return Date.now() - ts > threshold
}

/** Remove tracking for a session (cleanup on cancel / completion). */
export function remove(id: string) {
delete state()[id]
}

/** Snapshot of all tracked sessions — for diagnostics / logging. */
export function list() {
return { ...state() }
}

/**
* Subscribe to Bus events that indicate session activity.
* Call once during bootstrap (idempotent per Instance lifecycle
* since Instance.state is scoped to the current instance).
*/
export function init() {
log.info("init")

// Token deltas — fires on every chunk from the LLM stream.
// This is the highest-frequency signal and acts as a natural heartbeat.
Bus.subscribe(MessageV2.Event.PartDelta, (evt) => {
touch(evt.properties.sessionID)
})

// Part state changes — tool start/complete/error, text start/end, etc.
Bus.subscribe(MessageV2.Event.PartUpdated, (evt) => {
touch(evt.properties.part.sessionID)
})
}
}
Loading
Loading