diff --git a/packages/runtime/src/cloud-api-token-refresh.test.ts b/packages/runtime/src/cloud-api-token-refresh.test.ts new file mode 100644 index 0000000..bb23bce --- /dev/null +++ b/packages/runtime/src/cloud-api-token-refresh.test.ts @@ -0,0 +1,231 @@ +import test, { mock } from 'node:test'; +import assert from 'node:assert/strict'; +import { + ensureFreshCloudApiToken, + startCloudApiTokenRefresher, + CloudApiTokenHorizonError, + resetInFlightRefreshForTests, + type CloudApiTokenEnv +} from './cloud-api-token-refresh.js'; + +/** Let queued microtasks (fetch + .json()) settle after a mocked timer fires. */ +const flush = async () => { + await new Promise((r) => setImmediate(r)); + await new Promise((r) => setImmediate(r)); +}; + +const NOW = Date.parse('2026-06-19T00:00:00.000Z'); + +interface CapturedRequest { + url: string; + body: Record; +} + +function stubFetch( + response: { status?: number; payload?: unknown; rawBody?: string }, + captured: CapturedRequest[] = [] +): typeof fetch { + return (async (input: RequestInfo | URL, init?: RequestInit) => { + captured.push({ + url: String(input), + body: JSON.parse(String(init?.body ?? '{}')) as Record + }); + const status = response.status ?? 200; + const body = response.rawBody ?? JSON.stringify(response.payload ?? {}); + return new Response(body, { status, headers: { 'content-type': 'application/json' } }); + }) as typeof fetch; +} + +function freshPair(suffix: string) { + return { + accessToken: `access-${suffix}`, + refreshToken: `refresh-${suffix}`, + accessTokenExpiresAt: new Date(NOW + 2 * 60 * 60 * 1000).toISOString(), + refreshTokenExpiresAt: new Date(NOW + 30 * 24 * 60 * 60 * 1000).toISOString() + }; +} + +function refreshableEnv(overrides: Partial = {}): CloudApiTokenEnv { + return { + CLOUD_API_ACCESS_TOKEN: 'access-old', + CLOUD_API_TOKEN: 'access-old', + CLOUD_API_REFRESH_TOKEN: 'refresh-old', + CLOUD_API_ACCESS_TOKEN_EXPIRES_AT: new Date(NOW + 60 * 1000).toISOString(), // ~1min → within skew + CLOUD_API_REFRESH_URL: 'https://api.relayauth.dev/v1/tokens/refresh', + ...overrides + }; +} + +test('returns the current token untouched when it is comfortably fresh', async () => { + resetInFlightRefreshForTests(); + const captured: CapturedRequest[] = []; + const env = refreshableEnv({ + CLOUD_API_ACCESS_TOKEN_EXPIRES_AT: new Date(NOW + 60 * 60 * 1000).toISOString() // +1h + }); + const token = await ensureFreshCloudApiToken(env, { + now: NOW, + fetchImpl: stubFetch({ payload: freshPair('new') }, captured) + }); + assert.equal(token, 'access-old'); + assert.equal(captured.length, 0, 'no refresh call when fresh'); +}); + +test('refreshes within the skew window and rotates-and-persists all vars', async () => { + resetInFlightRefreshForTests(); + const captured: CapturedRequest[] = []; + const env = refreshableEnv(); + const token = await ensureFreshCloudApiToken(env, { + now: NOW, + fetchImpl: stubFetch({ payload: freshPair('new') }, captured) + }); + assert.equal(token, 'access-new'); + assert.equal(captured.length, 1); + assert.equal(captured[0]?.url, 'https://api.relayauth.dev/v1/tokens/refresh'); + assert.equal(captured[0]?.body.refreshToken, 'refresh-old'); + // both token vars carry the new access token; refresh token rotated + assert.equal(env.CLOUD_API_ACCESS_TOKEN, 'access-new'); + assert.equal(env.CLOUD_API_TOKEN, 'access-new'); + assert.equal(env.CLOUD_API_REFRESH_TOKEN, 'refresh-new'); + assert.equal(env.CLOUD_API_ACCESS_TOKEN_EXPIRES_AT, freshPair('new').accessTokenExpiresAt); +}); + +test('refreshes when the access token is already expired', async () => { + resetInFlightRefreshForTests(); + const captured: CapturedRequest[] = []; + const env = refreshableEnv({ + CLOUD_API_ACCESS_TOKEN_EXPIRES_AT: new Date(NOW - 60 * 1000).toISOString() // expired + }); + const token = await ensureFreshCloudApiToken(env, { + now: NOW, + fetchImpl: stubFetch({ payload: freshPair('new') }, captured) + }); + assert.equal(token, 'access-new'); + assert.equal(captured.length, 1); +}); + +test('returns the current token when no refresh material is present', async () => { + resetInFlightRefreshForTests(); + const captured: CapturedRequest[] = []; + const env = refreshableEnv({ CLOUD_API_REFRESH_TOKEN: '', CLOUD_API_REFRESH_URL: '' }); + const token = await ensureFreshCloudApiToken(env, { + now: NOW, + fetchImpl: stubFetch({ payload: freshPair('new') }, captured) + }); + assert.equal(token, 'access-old'); + assert.equal(captured.length, 0, 'cannot refresh without token+url'); +}); + +test('serializes concurrent callers into a single refresh (no double-use)', async () => { + resetInFlightRefreshForTests(); + const captured: CapturedRequest[] = []; + const env = refreshableEnv(); + const fetchImpl = stubFetch({ payload: freshPair('new') }, captured); + const [a, b, c] = await Promise.all([ + ensureFreshCloudApiToken(env, { now: NOW, fetchImpl }), + ensureFreshCloudApiToken(env, { now: NOW, fetchImpl }), + ensureFreshCloudApiToken(env, { now: NOW, fetchImpl }) + ]); + assert.equal(a, 'access-new'); + assert.equal(b, 'access-new'); + assert.equal(c, 'access-new'); + assert.equal(captured.length, 1, 'single rotation shared across concurrent callers'); +}); + +test('throws CloudApiTokenHorizonError on 401 (horizon elapsed)', async () => { + resetInFlightRefreshForTests(); + const env = refreshableEnv(); + await assert.rejects( + ensureFreshCloudApiToken(env, { + now: NOW, + fetchImpl: stubFetch({ status: 401, rawBody: 'refresh token revoked' }) + }), + (err: unknown) => err instanceof CloudApiTokenHorizonError + ); + // env left unchanged on failure + assert.equal(env.CLOUD_API_ACCESS_TOKEN, 'access-old'); +}); + +test('throws on a non-401 refresh failure', async () => { + resetInFlightRefreshForTests(); + const env = refreshableEnv(); + await assert.rejects( + ensureFreshCloudApiToken(env, { + now: NOW, + fetchImpl: stubFetch({ status: 500, rawBody: 'boom' }) + }), + (err: unknown) => err instanceof Error && !(err instanceof CloudApiTokenHorizonError) + ); +}); + +test('background refresher rotates the token on a tick within the skew window', async () => { + resetInFlightRefreshForTests(); + mock.timers.enable({ apis: ['setTimeout'] }); + try { + const captured: CapturedRequest[] = []; + const env = refreshableEnv(); + const handle = startCloudApiTokenRefresher({ + env, + intervalMs: 1000, + now: NOW, + fetchImpl: stubFetch({ payload: freshPair('loop') }, captured) + }); + mock.timers.tick(1000); + await flush(); + handle.stop(); + assert.equal(env.CLOUD_API_ACCESS_TOKEN, 'access-loop'); + assert.equal(env.CLOUD_API_TOKEN, 'access-loop'); + assert.equal(env.CLOUD_API_REFRESH_TOKEN, 'refresh-loop'); + assert.equal(captured.length, 1); + } finally { + mock.timers.reset(); + } +}); + +test('stop() halts the refresher before any tick fires', async () => { + resetInFlightRefreshForTests(); + mock.timers.enable({ apis: ['setTimeout'] }); + try { + const captured: CapturedRequest[] = []; + const env = refreshableEnv(); + const handle = startCloudApiTokenRefresher({ + env, + intervalMs: 1000, + now: NOW, + fetchImpl: stubFetch({ payload: freshPair('x') }, captured) + }); + handle.stop(); + mock.timers.tick(5000); + await flush(); + assert.equal(captured.length, 0, 'no refresh after stop()'); + } finally { + mock.timers.reset(); + } +}); + +test('refresher stops and signals onHorizonElapsed on a 401', async () => { + resetInFlightRefreshForTests(); + mock.timers.enable({ apis: ['setTimeout'] }); + try { + const captured: CapturedRequest[] = []; + const env = refreshableEnv(); + let horizonErr: unknown = null; + startCloudApiTokenRefresher({ + env, + intervalMs: 1000, + now: NOW, + fetchImpl: stubFetch({ status: 401, rawBody: 'revoked' }, captured), + onHorizonElapsed: (err) => { + horizonErr = err; + } + }); + mock.timers.tick(1000); + await flush(); + assert.ok(horizonErr instanceof CloudApiTokenHorizonError, 'horizon callback fired'); + // loop stopped → a later tick triggers no further refresh attempt + mock.timers.tick(5000); + await flush(); + assert.equal(captured.length, 1, 'single attempt, then halted'); + } finally { + mock.timers.reset(); + } +}); diff --git a/packages/runtime/src/cloud-api-token-refresh.ts b/packages/runtime/src/cloud-api-token-refresh.ts new file mode 100644 index 0000000..09450b2 --- /dev/null +++ b/packages/runtime/src/cloud-api-token-refresh.ts @@ -0,0 +1,235 @@ +// Keeps the proactive sandbox's cloud API bearer fresh. +// +// The sandbox's `CLOUD_API_ACCESS_TOKEN` is a relayfile-audience JWT minted +// once at launch with a short TTL (≈2h) and no in-sandbox refresh. A +// long-running proactive agent that posts to Slack after the TTL lapses sends +// an expired-but-validly-signed bearer, the cloud verifier returns null, and +// the request 401s wholesale (cloud#2307). +// +// relayauth's refresh contract is single-use rotating: presenting an already +// rotated refresh token cascade-revokes the whole session. So refresh here is +// SERIALIZED (single in-flight) and ROTATES-AND-PERSISTS every CLOUD_API_* env +// var in place. We persist both `CLOUD_API_ACCESS_TOKEN` (read by +// `ctx.cloudApi.token`) and `CLOUD_API_TOKEN` (the relayflows Slack adapter's +// env fallback, adapter.ts:31) so neither consumer path goes stale. +// +// Because the access token is re-minted via the derived `/v1/tokens/agent` +// pair endpoint, the refreshed token keeps `aud:["relayfile"]` (the audience +// is persisted in token meta and replayed on rotation) — a plain `/v1/tokens` +// mint would silently downgrade the audience on refresh. + +/** Refresh once the access token is within this window of expiry (or past it). */ +const DEFAULT_REFRESH_SKEW_MS = 5 * 60 * 1000; + +/** Env vars rotated together. `CLOUD_API_REFRESH_URL` points at relayauth's + * `/v1/tokens/refresh`, NOT the cloud API. */ +export interface CloudApiTokenEnv { + CLOUD_API_ACCESS_TOKEN?: string; + CLOUD_API_TOKEN?: string; + CLOUD_API_REFRESH_TOKEN?: string; + CLOUD_API_ACCESS_TOKEN_EXPIRES_AT?: string; + CLOUD_API_REFRESH_URL?: string; +} + +interface RefreshTokenResponse { + accessToken: string; + refreshToken: string; + accessTokenExpiresAt: string; + refreshTokenExpiresAt?: string; +} + +/** Thrown when relayauth rejects the refresh token (401) — the delegation + * horizon has elapsed or the session was revoked. The caller must re-mint + * (e.g. recycle the sandbox); retrying the refresh will not recover. */ +export class CloudApiTokenHorizonError extends Error { + constructor(message: string) { + super(message); + this.name = 'CloudApiTokenHorizonError'; + } +} + +export interface EnsureFreshOptions { + /** Override the clock (tests). */ + now?: number; + /** Override the pre-expiry refresh window. */ + skewMs?: number; + /** Override fetch (tests). */ + fetchImpl?: typeof fetch; +} + +// Single in-flight refresh shared across concurrent callers in this process. +// The sandbox holds one cloud API identity, so a process-wide guard is correct: +// concurrent posters await the SAME rotation rather than each spending the +// single-use refresh token (which would cascade-revoke the session). +let inFlightRefresh: Promise | null = null; + +function trimmed(value: string | undefined): string { + return value?.trim() ?? ''; +} + +/** + * Returns a non-expired cloud API access token, refreshing in place if the + * current one is within `skewMs` of expiry. Persists the rotated token pair + * back onto `env` so subsequently-read consumers (`ctx.cloudApi`, the + * relayflows env fallback) see the fresh value. + * + * Returns the current token unchanged when it is still fresh, or when no + * refresh material is present (the caller then handles any 401 itself). + * Throws {@link CloudApiTokenHorizonError} when refresh is rejected at the + * delegation horizon. + */ +export async function ensureFreshCloudApiToken( + env: CloudApiTokenEnv = process.env as CloudApiTokenEnv, + opts: EnsureFreshOptions = {} +): Promise { + const now = opts.now ?? Date.now(); + const skewMs = opts.skewMs ?? DEFAULT_REFRESH_SKEW_MS; + const accessToken = trimmed(env.CLOUD_API_ACCESS_TOKEN); + const expiresAt = trimmed(env.CLOUD_API_ACCESS_TOKEN_EXPIRES_AT); + + // Still comfortably valid → leave everything untouched. + if (accessToken && expiresAt) { + const expiresMs = Date.parse(expiresAt); + if (Number.isFinite(expiresMs) && expiresMs - now > skewMs) { + return accessToken; + } + } + + const refreshToken = trimmed(env.CLOUD_API_REFRESH_TOKEN); + const refreshUrl = trimmed(env.CLOUD_API_REFRESH_URL); + if (!refreshToken || !refreshUrl) { + // No way to refresh in-sandbox; hand back whatever we have. + return accessToken; + } + + if (!inFlightRefresh) { + const fetchImpl = opts.fetchImpl ?? fetch; + inFlightRefresh = refreshAndPersist(env, refreshUrl, refreshToken, fetchImpl).finally(() => { + inFlightRefresh = null; + }); + } + return inFlightRefresh; +} + +async function refreshAndPersist( + env: CloudApiTokenEnv, + refreshUrl: string, + refreshToken: string, + fetchImpl: typeof fetch +): Promise { + const response = await fetchImpl(refreshUrl, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ refreshToken }) + }); + + if (response.status === 401) { + const body = await response.text().catch(() => ''); + throw new CloudApiTokenHorizonError( + `cloud API token refresh rejected (401) — delegation horizon elapsed or session revoked; re-mint required${body ? `: ${body}` : ''}` + ); + } + if (!response.ok) { + const body = await response.text().catch(() => ''); + throw new Error(`cloud API token refresh failed: ${response.status}${body ? ` ${body}` : ''}`); + } + + const pair = (await response.json()) as RefreshTokenResponse; + const nextAccess = trimmed(pair.accessToken); + const nextRefresh = trimmed(pair.refreshToken); + if (!nextAccess || !nextRefresh) { + throw new Error('cloud API token refresh response missing accessToken/refreshToken'); + } + + // Rotate-and-persist. Both token vars carry the same value so the + // `ctx.cloudApi.token` reader and the relayflows env fallback stay in sync. + env.CLOUD_API_ACCESS_TOKEN = nextAccess; + env.CLOUD_API_TOKEN = nextAccess; + env.CLOUD_API_REFRESH_TOKEN = nextRefresh; + env.CLOUD_API_ACCESS_TOKEN_EXPIRES_AT = trimmed(pair.accessTokenExpiresAt); + + return nextAccess; +} + +/** Default background tick. Must be well under the refresh skew so no Slack + * post lands in a gap where the token has expired but no tick has fired. */ +const DEFAULT_REFRESHER_INTERVAL_MS = 60 * 1000; + +export interface RefresherHandle { + stop(): void; +} + +export interface StartRefresherOptions extends EnsureFreshOptions { + /** Env bag to keep fresh. Defaults to `process.env`. */ + env?: CloudApiTokenEnv; + /** Tick interval; default 60s (< the 5-min skew). */ + intervalMs?: number; + /** Transient refresh failure (network/5xx). The loop keeps ticking. */ + onError?(err: unknown): void; + /** Delegation horizon elapsed (refresh 401). The loop STOPS — only a + * re-mint (new sandbox token) can recover, which is out of band. */ + onHorizonElapsed?(err: CloudApiTokenHorizonError): void; +} + +/** + * Starts a background loop that keeps the cloud API token fresh in `env`. + * + * Each tick is a no-op until the access token is within the refresh skew of + * expiry, then it rotates-and-persists in place. Because relayflows rebuilds + * its Slack adapter per step and reads `env.CLOUD_API_TOKEN` at that point + * (adapter.ts:31), keeping `process.env` fresh in the same process means every + * per-step client picks up a live token — no per-call hook needed. + * + * NOTE: only effective when the relayflows SDK runs in THIS process (shared + * `process.env`). If the SDK runs in a subprocess, the refresher must run there + * instead. + * + * Ticks are scheduled with `setTimeout` (not `setInterval`) so a slow refresh + * never overlaps the next tick, and the timer is `unref`'d so it never keeps + * the process alive. + */ +export function startCloudApiTokenRefresher(opts: StartRefresherOptions = {}): RefresherHandle { + const env = opts.env ?? (process.env as CloudApiTokenEnv); + const intervalMs = opts.intervalMs ?? DEFAULT_REFRESHER_INTERVAL_MS; + let stopped = false; + let timer: ReturnType | null = null; + + function stop(): void { + stopped = true; + if (timer) { + clearTimeout(timer); + timer = null; + } + } + + function schedule(): void { + if (stopped) return; + timer = setTimeout(() => { + void tick(); + }, intervalMs); + (timer as unknown as { unref?: () => void }).unref?.(); + } + + async function tick(): Promise { + if (stopped) return; + try { + await ensureFreshCloudApiToken(env, opts); + } catch (err) { + if (err instanceof CloudApiTokenHorizonError) { + opts.onHorizonElapsed?.(err); + stop(); + return; + } + opts.onError?.(err); + } + schedule(); + } + + schedule(); + return { stop }; +} + +/** Test hook: reset the process-wide in-flight guard between cases. */ +export function resetInFlightRefreshForTests(): void { + inFlightRefresh = null; +} diff --git a/packages/runtime/src/runner.ts b/packages/runtime/src/runner.ts index 995c315..288d9ec 100644 --- a/packages/runtime/src/runner.ts +++ b/packages/runtime/src/runner.ts @@ -3,6 +3,7 @@ import { createCloudRuntimeDefaults } from './cloud-defaults.js'; import { buildCtx, type CtxBuildOptions } from './ctx.js'; import { getTrajectoryRecorder, type TrajectoryRecorder } from './trajectory.js'; import { isWorkforceHandler } from './handler.js'; +import { startCloudApiTokenRefresher } from './cloud-api-token-refresh.js'; import { type RawGatewayEnvelope } from './shim.js'; import { envelopeToAgentEvent } from './to-agent-event.js'; import { isCronTickEvent } from '@agent-relay/events'; @@ -141,18 +142,38 @@ export async function startRunner(options: StartRunnerOptions): Promise { integrations: options.persona.integrations ? Object.keys(options.persona.integrations) : [] }); - const recorder = getTrajectoryRecorder(ctx); - const stream = options.envelopes ?? readEnvelopesFromStdin(); - for await (const raw of stream) { - const event = envelopeToAgentEvent(raw); - if (!event) { - ctx.log('warn', 'runner.envelope.unsupported', { rawId: raw.id, rawType: raw.type }); - continue; + // Keep the cloud API bearer fresh for the lifetime of this runner subprocess. + // The persona's Slack client (relayflows) is rebuilt per step in THIS process + // and reads `process.env.CLOUD_API_TOKEN`, so rotating it in place here keeps + // long-running proactive posts authenticated (cloud#2307). No-op when the + // refresh material is absent (local/dev), so guard on the refresh URL. + const tokenRefresher = process.env.CLOUD_API_REFRESH_URL + ? startCloudApiTokenRefresher({ + onHorizonElapsed: (err) => + ctx.log('warn', 'runner.cloud-api-token.horizon-elapsed', { error: err.message }), + onError: (err) => + ctx.log('warn', 'runner.cloud-api-token.refresh-failed', { + error: err instanceof Error ? err.message : String(err) + }) + }) + : null; + + try { + const recorder = getTrajectoryRecorder(ctx); + const stream = options.envelopes ?? readEnvelopesFromStdin(); + for await (const raw of stream) { + const event = envelopeToAgentEvent(raw); + if (!event) { + ctx.log('warn', 'runner.envelope.unsupported', { rawId: raw.id, rawType: raw.type }); + continue; + } + await dispatch(ctx, handlerFn, event, recorder); } - await dispatch(ctx, handlerFn, event, recorder); - } - ctx.log('info', 'runner.envelope-stream.ended', { persona: options.persona.id }); + ctx.log('info', 'runner.envelope-stream.ended', { persona: options.persona.id }); + } finally { + tokenRefresher?.stop(); + } } function defaultRunnerLog(level: 'debug' | 'info' | 'warn' | 'error', message: string, attrs?: Record): void {