Skip to content

Commit 61c774f

Browse files
viratttclaude
andcommitted
Add cron system and consolidate heartbeat into it
New cron subsystem (src/cron/) lets the agent create, manage, and execute scheduled tasks. Jobs run as isolated agent turns with full tool access and deliver results via WhatsApp. Features: - 3 schedule types: at (one-shot), every (interval), cron (expression) - Fulfillment modes: keep (ongoing), once (auto-disable after alert), ask - Active hours gating, error backoff, HEARTBEAT_OK suppression - Agent-facing cron tool: list/add/update/remove/run The heartbeat runner is removed — on gateway startup, the heartbeat config is migrated into a cron job named "Heartbeat". The heartbeat tool syncs HEARTBEAT.md edits to the cron job. One scheduler handles everything. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 649e1b5 commit 61c774f

File tree

14 files changed

+13131
-205
lines changed

14 files changed

+13131
-205
lines changed

package-lock.json

Lines changed: 12224 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"@mozilla/readability": "^0.6.0",
3030
"@whiskeysockets/baileys": "7.0.0-rc.9",
3131
"better-sqlite3": "^12.8.0",
32+
"croner": "^9.1.0",
3233
"diff": "^8.0.4",
3334
"dotenv": "^17.3.1",
3435
"exa-js": "^2.10.1",

src/cron/executor.ts

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
import { appendFileSync } from 'node:fs';
2+
import { runAgentForMessage } from '../gateway/agent-runner.js';
3+
import {
4+
evaluateSuppression,
5+
HEARTBEAT_OK_TOKEN,
6+
type SuppressionState,
7+
} from '../gateway/heartbeat/suppression.js';
8+
import { assertOutboundAllowed, sendMessageWhatsApp } from '../gateway/channels/whatsapp/index.js';
9+
import { resolveSessionStorePath, loadSessionStore, type SessionEntry } from '../gateway/sessions/store.js';
10+
import { cleanMarkdownForWhatsApp } from '../gateway/utils.js';
11+
import { getSetting } from '../utils/config.js';
12+
import { dexterPath } from '../utils/paths.js';
13+
import { saveCronStore } from './store.js';
14+
import { computeNextRunAtMs } from './schedule.js';
15+
import type { ActiveHours, CronJob, CronStore } from './types.js';
16+
17+
const LOG_PATH = dexterPath('gateway-debug.log');
18+
19+
function debugLog(msg: string) {
20+
appendFileSync(LOG_PATH, `${new Date().toISOString()} ${msg}\n`);
21+
}
22+
23+
// Per-job suppression state (in memory, resets on process restart)
24+
const suppressionStates = new Map<string, SuppressionState>();
25+
26+
const BACKOFF_SCHEDULE_MS = [
27+
30_000, // 1st error → 30s
28+
60_000, // 2nd → 1 min
29+
5 * 60_000, // 3rd → 5 min
30+
15 * 60_000, // 4th → 15 min
31+
60 * 60_000, // 5th+ → 60 min
32+
];
33+
34+
const MAX_AT_RETRIES = 3;
35+
const SCHEDULE_ERROR_DISABLE_THRESHOLD = 3;
36+
37+
function getSuppressionState(jobId: string): SuppressionState {
38+
let state = suppressionStates.get(jobId);
39+
if (!state) {
40+
state = { lastMessageText: null, lastMessageAt: null };
41+
suppressionStates.set(jobId, state);
42+
}
43+
return state;
44+
}
45+
46+
/**
47+
* Check if the current time is within configured active hours and days.
48+
*/
49+
function isWithinActiveHours(activeHours?: ActiveHours): boolean {
50+
if (!activeHours) return true;
51+
52+
const tz = activeHours.timezone ?? 'America/New_York';
53+
const now = new Date();
54+
55+
const allowedDays = activeHours.daysOfWeek ?? [1, 2, 3, 4, 5];
56+
const dayFormatter = new Intl.DateTimeFormat('en-US', { timeZone: tz, weekday: 'short' });
57+
const dayStr = dayFormatter.format(now);
58+
const dayMap: Record<string, number> = { Sun: 0, Mon: 1, Tue: 2, Wed: 3, Thu: 4, Fri: 5, Sat: 6 };
59+
const currentDay = dayMap[dayStr] ?? now.getDay();
60+
if (!allowedDays.includes(currentDay)) return false;
61+
62+
const timeFormatter = new Intl.DateTimeFormat('en-US', {
63+
timeZone: tz,
64+
hour: '2-digit',
65+
minute: '2-digit',
66+
hour12: false,
67+
});
68+
const currentTime = timeFormatter.format(now);
69+
return currentTime >= activeHours.start && currentTime <= activeHours.end;
70+
}
71+
72+
function errorBackoffMs(consecutiveErrors: number): number {
73+
const idx = Math.min(consecutiveErrors - 1, BACKOFF_SCHEDULE_MS.length - 1);
74+
return BACKOFF_SCHEDULE_MS[Math.max(0, idx)];
75+
}
76+
77+
/**
78+
* Find the most recently updated session with a delivery target.
79+
* Same pattern as heartbeat runner.
80+
*/
81+
function findTargetSession(): SessionEntry | null {
82+
const storePath = resolveSessionStorePath('default');
83+
const store = loadSessionStore(storePath);
84+
const entries = Object.values(store).filter((e) => e.lastTo);
85+
if (entries.length === 0) return null;
86+
entries.sort((a, b) => b.updatedAt - a.updatedAt);
87+
return entries[0];
88+
}
89+
90+
/**
91+
* Execute a single cron job: run isolated agent, evaluate suppression,
92+
* deliver via WhatsApp, apply fulfillment mode, update state.
93+
*/
94+
export async function executeCronJob(
95+
job: CronJob,
96+
store: CronStore,
97+
_params: { configPath?: string },
98+
): Promise<void> {
99+
const startedAt = Date.now();
100+
101+
// 0. Check active hours
102+
if (!isWithinActiveHours(job.activeHours)) {
103+
debugLog(`[cron] job ${job.id}: outside active hours, skipping`);
104+
scheduleNextRun(job, store);
105+
return;
106+
}
107+
108+
debugLog(`[cron] executing job "${job.name}" (${job.id})`);
109+
110+
// 1. Find WhatsApp delivery target
111+
const session = findTargetSession();
112+
if (!session?.lastTo || !session?.lastAccountId) {
113+
debugLog(`[cron] job ${job.id}: no delivery target, skipping`);
114+
scheduleNextRun(job, store);
115+
return;
116+
}
117+
118+
// 2. Verify outbound allowed
119+
try {
120+
assertOutboundAllowed({ to: session.lastTo, accountId: session.lastAccountId });
121+
} catch {
122+
debugLog(`[cron] job ${job.id}: outbound blocked, skipping`);
123+
scheduleNextRun(job, store);
124+
return;
125+
}
126+
127+
// 3. Resolve model
128+
const model = job.payload.model ?? (getSetting('modelId', 'gpt-5.4') as string);
129+
const modelProvider = job.payload.modelProvider ?? (getSetting('provider', 'openai') as string);
130+
131+
// 4. Build query
132+
let query = `[CRON JOB: ${job.name}]\n\n${job.payload.message}`;
133+
if (job.fulfillment === 'ask') {
134+
query += '\n\nIf you find something noteworthy, also ask the user if they want to continue monitoring this.';
135+
}
136+
query += `\n\n## Instructions\n- If nothing noteworthy, respond with exactly: ${HEARTBEAT_OK_TOKEN}\n- Do NOT send a message just to say "everything is fine"\n- Keep alerts brief and focused — lead with the key finding`;
137+
138+
// 5. Run agent
139+
let answer: string;
140+
try {
141+
answer = await runAgentForMessage({
142+
sessionKey: `cron:${job.id}`,
143+
query,
144+
model,
145+
modelProvider,
146+
maxIterations: 6,
147+
isolatedSession: true,
148+
channel: 'whatsapp',
149+
});
150+
} catch (err) {
151+
handleJobError(job, store, err, startedAt);
152+
return;
153+
}
154+
155+
const durationMs = Date.now() - startedAt;
156+
157+
// 6. Evaluate suppression
158+
const suppState = getSuppressionState(job.id);
159+
const suppResult = evaluateSuppression(answer, suppState);
160+
161+
// 7. Update job state
162+
job.state.lastRunAtMs = startedAt;
163+
job.state.lastDurationMs = durationMs;
164+
job.state.consecutiveErrors = 0;
165+
166+
if (suppResult.shouldSuppress) {
167+
job.state.lastRunStatus = 'suppressed';
168+
debugLog(`[cron] job ${job.id}: suppressed (${suppResult.reason})`);
169+
} else {
170+
job.state.lastRunStatus = 'ok';
171+
172+
// Deliver via WhatsApp
173+
const cleaned = cleanMarkdownForWhatsApp(suppResult.cleanedText);
174+
await sendMessageWhatsApp({
175+
to: session.lastTo,
176+
body: cleaned,
177+
accountId: session.lastAccountId,
178+
});
179+
debugLog(`[cron] job ${job.id}: delivered to ${session.lastTo}`);
180+
181+
// Update suppression state for duplicate detection
182+
suppState.lastMessageText = suppResult.cleanedText;
183+
suppState.lastMessageAt = Date.now();
184+
185+
// Apply fulfillment mode
186+
if (job.fulfillment === 'once') {
187+
job.enabled = false;
188+
job.state.nextRunAtMs = undefined;
189+
debugLog(`[cron] job ${job.id}: auto-disabled (fulfillment=once)`);
190+
job.updatedAtMs = Date.now();
191+
saveCronStore(store);
192+
return;
193+
}
194+
}
195+
196+
scheduleNextRun(job, store);
197+
}
198+
199+
function scheduleNextRun(job: CronJob, store: CronStore): void {
200+
const now = Date.now();
201+
202+
try {
203+
const nextRun = computeNextRunAtMs(job.schedule, now);
204+
if (nextRun === undefined) {
205+
// One-shot expired or invalid schedule
206+
job.enabled = false;
207+
job.state.nextRunAtMs = undefined;
208+
} else {
209+
job.state.nextRunAtMs = nextRun;
210+
}
211+
job.state.scheduleErrorCount = 0;
212+
} catch {
213+
job.state.scheduleErrorCount += 1;
214+
if (job.state.scheduleErrorCount >= SCHEDULE_ERROR_DISABLE_THRESHOLD) {
215+
job.enabled = false;
216+
job.state.nextRunAtMs = undefined;
217+
debugLog(`[cron] job ${job.id}: disabled after ${SCHEDULE_ERROR_DISABLE_THRESHOLD} schedule errors`);
218+
}
219+
}
220+
221+
job.updatedAtMs = Date.now();
222+
saveCronStore(store);
223+
}
224+
225+
function handleJobError(job: CronJob, store: CronStore, err: unknown, startedAt: number): void {
226+
const errorMsg = err instanceof Error ? err.message : String(err);
227+
job.state.lastRunAtMs = startedAt;
228+
job.state.lastDurationMs = Date.now() - startedAt;
229+
job.state.lastRunStatus = 'error';
230+
job.state.lastError = errorMsg;
231+
job.state.consecutiveErrors += 1;
232+
233+
debugLog(`[cron] job ${job.id}: error #${job.state.consecutiveErrors}: ${errorMsg}`);
234+
235+
const now = Date.now();
236+
237+
if (job.schedule.kind === 'at') {
238+
// One-shot: retry up to MAX_AT_RETRIES, then disable
239+
if (job.state.consecutiveErrors >= MAX_AT_RETRIES) {
240+
job.enabled = false;
241+
job.state.nextRunAtMs = undefined;
242+
debugLog(`[cron] job ${job.id}: disabled after ${MAX_AT_RETRIES} retries (at job)`);
243+
} else {
244+
job.state.nextRunAtMs = now + errorBackoffMs(job.state.consecutiveErrors);
245+
}
246+
} else {
247+
// Recurring: apply exponential backoff
248+
const normalNext = computeNextRunAtMs(job.schedule, now);
249+
const backoff = now + errorBackoffMs(job.state.consecutiveErrors);
250+
job.state.nextRunAtMs = normalNext ? Math.max(normalNext, backoff) : backoff;
251+
}
252+
253+
job.updatedAtMs = Date.now();
254+
saveCronStore(store);
255+
}

src/cron/heartbeat-migration.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import { randomBytes } from 'node:crypto';
2+
import { loadGatewayConfig } from '../gateway/config.js';
3+
import { buildHeartbeatQuery } from '../gateway/heartbeat/prompt.js';
4+
import { loadCronStore, saveCronStore } from './store.js';
5+
import { computeNextRunAtMs } from './schedule.js';
6+
import type { CronJob } from './types.js';
7+
8+
const HEARTBEAT_JOB_NAME = 'Heartbeat';
9+
10+
/**
11+
* Ensure a cron job exists for the heartbeat config.
12+
* Called once at gateway startup. If heartbeat is enabled in gateway.json
13+
* and no cron job named "Heartbeat" exists, creates one.
14+
* If the heartbeat config changed (interval, active hours, model),
15+
* updates the existing job.
16+
*/
17+
export async function ensureHeartbeatCronJob(configPath?: string): Promise<void> {
18+
const cfg = loadGatewayConfig(configPath);
19+
const hb = cfg.gateway.heartbeat;
20+
21+
if (!hb?.enabled) return;
22+
23+
const store = loadCronStore();
24+
const existing = store.jobs.find((j) => j.name === HEARTBEAT_JOB_NAME);
25+
26+
// Build the heartbeat query from HEARTBEAT.md (or defaults)
27+
const query = await buildHeartbeatQuery();
28+
if (query === null) return; // HEARTBEAT.md exists but is empty
29+
30+
const everyMs = (hb.intervalMinutes ?? 30) * 60 * 1000;
31+
32+
if (existing) {
33+
// Update existing job to match current config
34+
existing.schedule = { kind: 'every', everyMs };
35+
existing.payload.message = query;
36+
existing.payload.model = hb.model;
37+
existing.payload.modelProvider = hb.modelProvider;
38+
existing.activeHours = hb.activeHours
39+
? {
40+
start: hb.activeHours.start ?? '09:30',
41+
end: hb.activeHours.end ?? '16:00',
42+
timezone: hb.activeHours.timezone,
43+
daysOfWeek: hb.activeHours.daysOfWeek,
44+
}
45+
: undefined;
46+
if (!existing.enabled) {
47+
existing.enabled = true;
48+
existing.state.consecutiveErrors = 0;
49+
existing.state.scheduleErrorCount = 0;
50+
}
51+
if (!existing.state.nextRunAtMs) {
52+
existing.state.nextRunAtMs = computeNextRunAtMs(existing.schedule, Date.now());
53+
}
54+
existing.updatedAtMs = Date.now();
55+
saveCronStore(store);
56+
return;
57+
}
58+
59+
// Create new heartbeat cron job
60+
const now = Date.now();
61+
const job: CronJob = {
62+
id: randomBytes(8).toString('hex'),
63+
name: HEARTBEAT_JOB_NAME,
64+
description: 'Periodic heartbeat check from HEARTBEAT.md',
65+
enabled: true,
66+
createdAtMs: now,
67+
updatedAtMs: now,
68+
schedule: { kind: 'every', everyMs },
69+
payload: {
70+
message: query,
71+
model: hb.model,
72+
modelProvider: hb.modelProvider,
73+
},
74+
fulfillment: 'keep',
75+
activeHours: hb.activeHours
76+
? {
77+
start: hb.activeHours.start ?? '09:30',
78+
end: hb.activeHours.end ?? '16:00',
79+
timezone: hb.activeHours.timezone,
80+
daysOfWeek: hb.activeHours.daysOfWeek,
81+
}
82+
: undefined,
83+
state: {
84+
nextRunAtMs: computeNextRunAtMs({ kind: 'every', everyMs }, now),
85+
consecutiveErrors: 0,
86+
scheduleErrorCount: 0,
87+
},
88+
};
89+
90+
store.jobs.push(job);
91+
saveCronStore(store);
92+
}

0 commit comments

Comments
 (0)