From d9055eff36745269ef74f434197f5ffaabcd61d3 Mon Sep 17 00:00:00 2001 From: Ricky Schema Cascade Date: Tue, 23 Jun 2026 16:07:54 +0200 Subject: [PATCH 1/5] =?UTF-8?q?feat(delivery):=20add=20@agentworkforce/del?= =?UTF-8?q?ivery=20=E2=80=94=20unified=20multi-target=20messaging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Provides a configuration-driven delivery client that auto-discovers configured transports (Slack, Telegram) from persona inputs and sends to all of them through a single API. Supports non-blocking parentRef threading (zero receipt round-trips, cloud-side ordering) for the header+threaded-body pattern from x-reply-radar. Exports shared helpers (input, list, withTimeout, fetchWithTimeout) that were previously copy-pasted across every agent. - createDelivery(ctx) — auto-detects SLACK_CHANNEL/TELEGRAM_CHAT - delivery.send(text, opts?) — blocking mode (waits for receipt) - delivery.publish(text) — non-blocking parentRef mode - delivery.send(text, { replyTo, nonBlocking }) — threaded non-blocking --- packages/delivery/package.json | 38 +++++ packages/delivery/src/delivery.ts | 231 ++++++++++++++++++++++++++++++ packages/delivery/src/helpers.ts | 60 ++++++++ packages/delivery/src/index.ts | 15 ++ packages/delivery/src/types.ts | 110 ++++++++++++++ packages/delivery/tsconfig.json | 8 ++ pnpm-lock.yaml | 45 ++++++ 7 files changed, 507 insertions(+) create mode 100644 packages/delivery/package.json create mode 100644 packages/delivery/src/delivery.ts create mode 100644 packages/delivery/src/helpers.ts create mode 100644 packages/delivery/src/index.ts create mode 100644 packages/delivery/src/types.ts create mode 100644 packages/delivery/tsconfig.json diff --git a/packages/delivery/package.json b/packages/delivery/package.json new file mode 100644 index 00000000..01b88780 --- /dev/null +++ b/packages/delivery/package.json @@ -0,0 +1,38 @@ +{ + "name": "@agentworkforce/delivery", + "version": "0.1.0", + "private": false, + "type": "module", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "./package.json": "./package.json" + }, + "files": [ + "dist", + "package.json" + ], + "repository": { + "type": "git", + "url": "https://github.com/AgentWorkforce/workforce", + "directory": "packages/delivery" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "build": "tsc -p tsconfig.json", + "dev": "tsc -p tsconfig.json --watch --preserveWatchOutput", + "typecheck": "tsc -p tsconfig.json --noEmit", + "test": "tsc -p tsconfig.json && node --test dist/**/*.test.js dist/*.test.js", + "lint": "tsc -p tsconfig.json --noEmit" + }, + "dependencies": { + "@agentworkforce/runtime": "^4.0.0", + "@relayfile/relay-helpers": "^0.4.2" + } +} diff --git a/packages/delivery/src/delivery.ts b/packages/delivery/src/delivery.ts new file mode 100644 index 00000000..751acf52 --- /dev/null +++ b/packages/delivery/src/delivery.ts @@ -0,0 +1,231 @@ +import { slackClient, telegramClient } from '@relayfile/relay-helpers'; +import type { SlackClient, TelegramClient } from '@relayfile/relay-helpers'; +import type { WorkforceCtx } from '@agentworkforce/runtime'; +import { + resolveDeliveryTargets, + slackChannel, + telegramChat, + type DeliveryClient, + type DeliveryOptions, + type DeliveryResult, + type DeliveryTransports, + type SlackRef, + type TelegramRef +} from './types.js'; + +const WRITEBACK_TIMEOUT_MS = 15_000; + +/** + * Create a delivery client that auto-discovers configured transports from + * the persona context and sends to all of them. + * + * Blocking mode (the default): + * const heads = await delivery.send(header); + * await delivery.send(body, { replyTo: heads }); + * + * Non-blocking parentRef mode (zero receipt round-trips): + * const heads = await delivery.publish(header); + * await delivery.send(body, { replyTo: heads, nonBlocking: true }); + */ +export function createDelivery( + ctx: WorkforceCtx, + transports?: DeliveryTransports +): DeliveryClient { + const targets = resolveDeliveryTargets(ctx); + + // Two sets of clients: blocking (waits for receipt) and non-blocking + // (0ms timeout, returns draft refs immediately for parentRef threading). + const slackBlocking = transports?.slack ?? (targets.includes('slack') + ? slackClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS }) + : undefined); + const slackNonBlocking = targets.includes('slack') + ? slackClient({ writebackTimeoutMs: 0 }) + : undefined; + const telegramBlocking = transports?.telegram ?? (targets.includes('telegram') + ? telegramClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS }) + : undefined); + const telegramNonBlocking = targets.includes('telegram') + ? telegramClient({ writebackTimeoutMs: 0 }) + : undefined; + + return new DeliveryClientImpl(ctx, targets, { + slackBlocking, + slackNonBlocking, + telegramBlocking, + telegramNonBlocking + }); +} + +interface DeliveryTransportsInternal { + slackBlocking?: SlackClient; + slackNonBlocking?: SlackClient; + telegramBlocking?: TelegramClient; + telegramNonBlocking?: TelegramClient; +} + +class DeliveryClientImpl implements DeliveryClient { + readonly targets: ReadonlyArray<'slack' | 'telegram'>; + + private ctx: WorkforceCtx; + private t: DeliveryTransportsInternal; + + constructor( + ctx: WorkforceCtx, + targets: Array<'slack' | 'telegram'>, + transports: DeliveryTransportsInternal + ) { + this.ctx = ctx; + this.targets = targets; + this.t = transports; + } + + async send(text: string, opts?: DeliveryOptions): Promise { + const nonBlocking = opts?.nonBlocking === true; + const refs: Array = []; + const errors: string[] = []; + + const tasks: Promise[] = []; + + for (const target of this.targets) { + const parentRef = opts?.replyTo?.refs.find((r) => r.provider === target); + if (target === 'slack') { + tasks.push( + this.sendSlack(text, parentRef as SlackRef | undefined, nonBlocking) + .then((ref) => { if (ref) refs.push(ref); }) + .catch((err) => { errors.push(`slack: ${String(err)}`); }) + ); + } + if (target === 'telegram') { + tasks.push( + this.sendTelegram(text, parentRef as TelegramRef | undefined, nonBlocking) + .then((ref) => { if (ref) refs.push(ref); }) + .catch((err) => { errors.push(`telegram: ${String(err)}`); }) + ); + } + } + + await Promise.all(tasks); + + // In non-blocking mode, draft refs always succeed (no receipt wait to fail). + // Treat any ref as success. + const ok = nonBlocking + ? refs.length > 0 + : errors.length === 0 && refs.length === this.targets.length; + + if (!ok && errors.length > 0) { + this.ctx.log?.('warn', 'delivery.partial-failure', { errors, nonBlocking }); + } + if (!ok && refs.length === 0) { + throw new Error(`Delivery failed to all targets: ${errors.join('; ')}`); + } + + return { ok, refs }; + } + + async publish(text: string): Promise { + return this.send(text, { nonBlocking: true }); + } + + // ── Slack ────────────────────────────────────────────────────────────── + + private async sendSlack( + text: string, + parentRef: SlackRef | undefined, + nonBlocking: boolean + ): Promise { + const channel = slackChannel(this.ctx); + if (!channel) return null; + + if (nonBlocking) { + return this.sendSlackNonBlocking(channel, text, parentRef); + } + return this.sendSlackBlocking(channel, text, parentRef); + } + + private async sendSlackBlocking( + channel: string, + text: string, + parentRef?: SlackRef + ): Promise { + const client = this.t.slackBlocking; + if (!client) return null; + + const result = parentRef?.draftRef + ? await client.post(channel, text, { replyTo: parentRef.draftRef }) + : await client.post(channel, text); + + if (!result.ts) { + this.ctx.log?.('warn', 'delivery.slack.no-receipt', { channel }); + return null; + } + + return { + provider: 'slack', + channel: result.channel, + ts: result.ts, + draftRef: result.ref + }; + } + + /** + * Non-blocking Slack: uses messages.write() directly with writebackTimeoutMs:0. + * The parentRef is embedded in the message body so the cloud orders the message + * under the parent server-side — zero receipt round-trips. The returned draftRef + * is the relay path, usable as a parent for subsequent threaded sends. + * + * Mirrors the x-reply-radar parentRef threading pattern (internal-agents). + */ + private async sendSlackNonBlocking( + channel: string, + text: string, + parentRef?: SlackRef + ): Promise { + const client = this.t.slackNonBlocking; + if (!client) return null; + + const body: Record = { text }; + if (parentRef?.draftRef) { + // Embed parentRef in the body — the cloud lifts it from the streamed head + // and orders this message under the parent once the parent delivers. + body.parentRef = parentRef.draftRef; + } + + const result = await client.messages.write({ channelId: channel }, body); + + return { + provider: 'slack', + channel, + ts: '', // Not available yet (non-blocking) + draftRef: result.path + }; + } + + // ── Telegram ─────────────────────────────────────────────────────────── + + private async sendTelegram( + text: string, + parentRef: TelegramRef | undefined, + nonBlocking: boolean + ): Promise { + const chatId = telegramChat(this.ctx); + if (!chatId) return null; + + const client = nonBlocking ? this.t.telegramNonBlocking : this.t.telegramBlocking; + if (!client) return null; + + const result = parentRef?.messageId + ? await client.sendMessage(chatId, text, { replyToMessageId: Number(parentRef.messageId) || undefined }) + : await client.sendMessage(chatId, text); + + if (!nonBlocking && !result.ok) { + this.ctx.log?.('warn', 'delivery.telegram.no-receipt', { chatId }); + return null; + } + + return { + provider: 'telegram', + chatId: String(result.chatId), + messageId: result.ok ? result.messageId : '' + }; + } +} diff --git a/packages/delivery/src/helpers.ts b/packages/delivery/src/helpers.ts new file mode 100644 index 00000000..a9ea7a2e --- /dev/null +++ b/packages/delivery/src/helpers.ts @@ -0,0 +1,60 @@ +import type { WorkforceCtx } from '@agentworkforce/runtime'; + +/** + * Resolve a persona input value from the runtime ctx. + * + * Resolution order (mirrors persona-kit): agent-provided value → + * process env (`spec.env` or the input key) → spec default. + */ +export function input(ctx: WorkforceCtx, name: string): string | undefined { + const spec = ctx.persona.inputSpecs?.[name]; + const v = process.env[spec?.env ?? name] ?? ctx.persona.inputs?.[name] ?? spec?.default; + return v && String(v).trim() ? String(v).trim() : undefined; +} + +/** + * Split a comma-separated string into trimmed, non-empty entries. + */ +export function list(raw: string | undefined): string[] { + return (raw ?? '').split(',').map((s) => s.trim()).filter(Boolean); +} + +/** + * Race a promise against a timeout. On timeout the timer rejects with an + * Error so the caller can catch and fall back. Always clears the timer. + */ +export async function withTimeout( + p: Promise, + ms: number, + label: string +): Promise { + let timer: ReturnType; + const timeout = new Promise((_, rej) => { + timer = setTimeout(() => rej(new Error(`${label} timed out after ${ms}ms`)), ms); + }); + try { + return await Promise.race([p, timeout]); + } finally { + clearTimeout(timer!); + } +} + +/** + * Fetch with an AbortController timeout. Returns the response on success, + * or undefined on timeout/network error (never throws). + */ +export async function fetchWithTimeout( + url: string, + init: RequestInit = {}, + timeoutMs: number = 8_000 +): Promise { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + try { + return await fetch(url, { ...init, signal: controller.signal }); + } catch { + return undefined; + } finally { + clearTimeout(timer); + } +} diff --git a/packages/delivery/src/index.ts b/packages/delivery/src/index.ts new file mode 100644 index 00000000..3836b3cb --- /dev/null +++ b/packages/delivery/src/index.ts @@ -0,0 +1,15 @@ +export { createDelivery } from './delivery.js'; +export { + resolveDeliveryTargets, + slackChannel, + telegramChat, + type DeliveryClient, + type DeliveryOptions, + type DeliveryResult, + type DeliveryTransports, + type MessageRef, + type SlackRef, + type TelegramRef +} from './types.js'; + +export { input, list, withTimeout, fetchWithTimeout } from './helpers.js'; diff --git a/packages/delivery/src/types.ts b/packages/delivery/src/types.ts new file mode 100644 index 00000000..a21c3908 --- /dev/null +++ b/packages/delivery/src/types.ts @@ -0,0 +1,110 @@ +import type { WorkforceCtx } from '@agentworkforce/runtime'; +import type { SlackClient } from '@relayfile/relay-helpers'; +import type { TelegramClient } from '@relayfile/relay-helpers'; + +// ── message reference (returned by send, accepted by reply) ────────────── + +export interface SlackRef { + provider: 'slack'; + channel: string; + /** Delivered message ts (set after the writeback receipt arrives). */ + ts: string; + /** Draft ref for cloud-side replyTo threading. */ + draftRef: string; +} + +export interface TelegramRef { + provider: 'telegram'; + chatId: string; + /** Delivered message id (set after the writeback receipt arrives). */ + messageId: string; +} + +export type MessageRef = SlackRef | TelegramRef; + +// ── delivery result ───────────────────────────────────────────────────── + +export interface DeliveryResult { + ok: boolean; + refs: MessageRef[]; +} + +// ── options ────────────────────────────────────────────────────────────── + +export interface DeliveryOptions { + /** Thread the message under a prior delivery result. */ + replyTo?: DeliveryResult; + /** + * When true, don't wait for the writeback receipt. Returns draft refs + * immediately and relies on the cloud's server-side ordering for + * threading (Slack parentRef pattern, Telegram sendMessage with 0ms + * timeout). The returned refs have empty ts/messageId but valid + * draftRef for use as a parent in subsequent threaded sends. + * + * Use this for the header in a header+threaded-body pattern so the + * digest never blocks on a receipt — the cloud orders the threaded + * body under the header server-side. + */ + nonBlocking?: boolean; +} + +// ── injectable transport seam (for tests) ──────────────────────────────── + +export interface DeliveryTransports { + slack?: SlackClient; + telegram?: TelegramClient; +} + +// ── delivery client ────────────────────────────────────────────────────── + +export interface DeliveryClient { + /** + * Send a message to all configured targets. + * + * When `opts.replyTo` is set the message is threaded under the targets + * from that prior `DeliveryResult`, each using its transport's native + * threading mechanism. + * + * In blocking mode (default): waits for the writeback receipt and returns + * the delivered ts/messageId. In non-blocking mode (`opts.nonBlocking: true`): + * returns draft refs immediately with the relay path as draftRef — zero + * receipt round-trips, cloud-side server ordering handles threading. + */ + send(text: string, opts?: DeliveryOptions): Promise; + + /** + * Convenience: same as `send(text, { nonBlocking: true })`. + * Publish a message without waiting for a receipt. Returns draft refs + * immediately for use as a threading parent. + */ + publish(text: string): Promise; + + /** Which providers are configured. */ + readonly targets: ReadonlyArray<'slack' | 'telegram'>; +} + +// ── configuration discovery ────────────────────────────────────────────── + +/** + * Resolve which transport targets are configured for the given persona ctx. + * Checks ctx.persona.inputs for SLACK_CHANNEL and TELEGRAM_CHAT. + */ +export function resolveDeliveryTargets(ctx: WorkforceCtx): Array<'slack' | 'telegram'> { + const inputs = ctx.persona.inputs ?? {}; + const targets: Array<'slack' | 'telegram'> = []; + if (inputs['SLACK_CHANNEL']?.trim()) targets.push('slack'); + if (inputs['TELEGRAM_CHAT']?.trim()) targets.push('telegram'); + return targets; +} + +/** Get the configured slack channel id (bare, without `__name` suffix). */ +export function slackChannel(ctx: WorkforceCtx): string | undefined { + const raw = ctx.persona.inputs?.['SLACK_CHANNEL']; + return raw?.split('__')[0]?.trim() || undefined; +} + +/** Get the configured telegram chat id (bare, without `__title` suffix). */ +export function telegramChat(ctx: WorkforceCtx): string | undefined { + const raw = ctx.persona.inputs?.['TELEGRAM_CHAT']; + return raw?.split('__')[0]?.trim() || undefined; +} diff --git a/packages/delivery/tsconfig.json b/packages/delivery/tsconfig.json new file mode 100644 index 00000000..df59da57 --- /dev/null +++ b/packages/delivery/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist" + }, + "include": ["src/**/*.ts"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f371df5e..3d3ac4d2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -68,6 +68,15 @@ importers: specifier: ^0.185.0 version: 0.185.0 + packages/delivery: + dependencies: + '@agentworkforce/runtime': + specifier: workspace:* + version: link:../runtime + '@relayfile/relay-helpers': + specifier: ^0.4.2 + version: 0.4.2(@relayfile/sdk@0.7.40) + packages/deploy: dependencies: '@agent-relay/cloud': @@ -961,6 +970,19 @@ packages: peerDependencies: '@relayfile/sdk': '>=0.6.0 <1' + '@relayfile/adapter-core@0.4.3': + resolution: {integrity: sha512-VoGco3xYab+Ij9v2wSi6qsYg15gDfYWlZsdbiKrqM/YN+t0IhPS8Shy5dRA18kYCBKEr7bH7QdFfa8PCXj6LFg==} + engines: {node: '>=18'} + hasBin: true + peerDependencies: + '@relayfile/sdk': '>=0.6.0 <1' + + '@relayfile/adapter-linear@0.4.2': + resolution: {integrity: sha512-Yg0hv/RqEB5WJ4RA4LX9JinCxxT/01GSdd+0HeT0L9P36EX27kEEuzWmNwWEpjDHQMcYykEocruEJHuV4XXGCQ==} + engines: {node: '>=18'} + peerDependencies: + '@relayfile/sdk': '>=0.6.0 <1' + '@relayfile/core@0.7.40': resolution: {integrity: sha512-vY48SxZgahnvE0CHDyy/iny17ypnfbX5myVPtocZVNpMiz4dS1iabO70WR+uYVgSiIZR7MpKTPFSs3SxEjQWag==} engines: {node: '>=18'} @@ -969,6 +991,9 @@ packages: resolution: {integrity: sha512-h2ac/frfSFgjrKDSBCkmMHjphx19XZrHrSjaI29wlTzSblfiE65MQST47lS6Fm1+K9yap+W+msUi8JRIYYWFEw==} engines: {node: '>=18'} + '@relayfile/relay-helpers@0.4.2': + resolution: {integrity: sha512-/SnZW5vEtNBJkYslLOuw8JIMU1edYmY9AXEJF52k4H3Q1Z/I3GPAShXCbQmCry07Q42CM/+1SEk1mLzDaU5cVw==} + '@relayfile/sdk@0.7.40': resolution: {integrity: sha512-9qPQ/qSexD11x/CKZlbZUFC7KDdxJ2yYBQd1nuzcoHNqnvErplq5bDBy5Dqr1AXID9i1Q7sMxnY8M1raMTQsAg==} engines: {node: '>=18'} @@ -3283,6 +3308,19 @@ snapshots: minimatch: 10.2.5 yaml: 2.9.0 + '@relayfile/adapter-core@0.4.3(@relayfile/sdk@0.7.40)': + dependencies: + '@relayfile/sdk': 0.7.40 + '@scalar/postman-to-openapi': 0.6.3 + cheerio: 1.2.0 + minimatch: 10.2.5 + yaml: 2.9.0 + + '@relayfile/adapter-linear@0.4.2(@relayfile/sdk@0.7.40)': + dependencies: + '@relayfile/adapter-core': 0.4.3(@relayfile/sdk@0.7.40) + '@relayfile/sdk': 0.7.40 + '@relayfile/core@0.7.40': {} '@relayfile/local-mount@0.7.24': @@ -3290,6 +3328,13 @@ snapshots: '@parcel/watcher': 2.5.6 ignore: 7.0.5 + '@relayfile/relay-helpers@0.4.2(@relayfile/sdk@0.7.40)': + dependencies: + '@relayfile/adapter-core': 0.4.3(@relayfile/sdk@0.7.40) + '@relayfile/adapter-linear': 0.4.2(@relayfile/sdk@0.7.40) + transitivePeerDependencies: + - '@relayfile/sdk' + '@relayfile/sdk@0.7.40': dependencies: '@relayfile/core': 0.7.40 From 84a4c9201a8803a8241352586758f7714adc57dc Mon Sep 17 00:00:00 2001 From: Ricky Schema Cascade Date: Tue, 23 Jun 2026 19:02:02 +0200 Subject: [PATCH 2/5] =?UTF-8?q?fix(delivery):=20address=20PR=20feedback=20?= =?UTF-8?q?=E2=80=94=20input=20precedence,=20transport=20injection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix input() precedence: ctx.persona.inputs before process.env (agent value > ctx resolved > env > default). PR feedback caught that the original put process.env before ctx.persona.inputs. - Fix non-blocking transports to respect injected mocks. Previously slackNonBlocking/telegramNonBlocking always constructed real clients; now they check transports?.slack/telegram first (same as blocking). - Fix resolveDeliveryTargets/slackChannel/telegramChat to use input() helper for proper resolution order instead of direct ctx.persona.inputs access. - Fix fetchWithTimeout to preserve caller-provided abort signal by racing our abort timer against the caller's signal. --- packages/delivery/src/delivery.ts | 30 +++++++++++++++--------- packages/delivery/src/helpers.ts | 39 ++++++++++++++++++++++++++----- packages/delivery/src/types.ts | 24 ++++++++++++------- 3 files changed, 68 insertions(+), 25 deletions(-) diff --git a/packages/delivery/src/delivery.ts b/packages/delivery/src/delivery.ts index 751acf52..c764a700 100644 --- a/packages/delivery/src/delivery.ts +++ b/packages/delivery/src/delivery.ts @@ -26,6 +26,10 @@ const WRITEBACK_TIMEOUT_MS = 15_000; * Non-blocking parentRef mode (zero receipt round-trips): * const heads = await delivery.publish(header); * await delivery.send(body, { replyTo: heads, nonBlocking: true }); + * + * Pass `transports` to inject mock clients for testing — the same injected + * client is used for both blocking and non-blocking paths (tests supply + * their own mock that short-circuits the writeback). */ export function createDelivery( ctx: WorkforceCtx, @@ -33,20 +37,23 @@ export function createDelivery( ): DeliveryClient { const targets = resolveDeliveryTargets(ctx); - // Two sets of clients: blocking (waits for receipt) and non-blocking - // (0ms timeout, returns draft refs immediately for parentRef threading). - const slackBlocking = transports?.slack ?? (targets.includes('slack') + // Injected transports take priority. When not injected, construct real + // clients with appropriate timeouts. + const injectedSlack = transports?.slack; + const injectedTelegram = transports?.telegram; + + const slackBlocking = injectedSlack ?? (targets.includes('slack') ? slackClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS }) : undefined); - const slackNonBlocking = targets.includes('slack') + const slackNonBlocking = injectedSlack ?? (targets.includes('slack') ? slackClient({ writebackTimeoutMs: 0 }) - : undefined; - const telegramBlocking = transports?.telegram ?? (targets.includes('telegram') + : undefined); + const telegramBlocking = injectedTelegram ?? (targets.includes('telegram') ? telegramClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS }) : undefined); - const telegramNonBlocking = targets.includes('telegram') + const telegramNonBlocking = injectedTelegram ?? (targets.includes('telegram') ? telegramClient({ writebackTimeoutMs: 0 }) - : undefined; + : undefined); return new DeliveryClientImpl(ctx, targets, { slackBlocking, @@ -107,7 +114,7 @@ class DeliveryClientImpl implements DeliveryClient { await Promise.all(tasks); // In non-blocking mode, draft refs always succeed (no receipt wait to fail). - // Treat any ref as success. + // Treat any ref as success. In blocking mode, require all targets to succeed. const ok = nonBlocking ? refs.length > 0 : errors.length === 0 && refs.length === this.targets.length; @@ -116,7 +123,8 @@ class DeliveryClientImpl implements DeliveryClient { this.ctx.log?.('warn', 'delivery.partial-failure', { errors, nonBlocking }); } if (!ok && refs.length === 0) { - throw new Error(`Delivery failed to all targets: ${errors.join('; ')}`); + const detail = errors.length > 0 ? errors.join('; ') : 'all sends returned null (no configured targets)'; + throw new Error(`Delivery failed to all targets: ${detail}`); } return { ok, refs }; @@ -224,7 +232,7 @@ class DeliveryClientImpl implements DeliveryClient { return { provider: 'telegram', - chatId: String(result.chatId), + chatId: result.chatId != null ? String(result.chatId) : chatId, messageId: result.ok ? result.messageId : '' }; } diff --git a/packages/delivery/src/helpers.ts b/packages/delivery/src/helpers.ts index a9ea7a2e..1cf525bc 100644 --- a/packages/delivery/src/helpers.ts +++ b/packages/delivery/src/helpers.ts @@ -3,13 +3,23 @@ import type { WorkforceCtx } from '@agentworkforce/runtime'; /** * Resolve a persona input value from the runtime ctx. * - * Resolution order (mirrors persona-kit): agent-provided value → - * process env (`spec.env` or the input key) → spec default. + * Resolution order (mirrors persona-kit): ctx.persona.inputs (already + * resolved from agent value → env → default by the runtime) → fall back + * to process.env directly when running outside the full runtime. */ export function input(ctx: WorkforceCtx, name: string): string | undefined { const spec = ctx.persona.inputSpecs?.[name]; - const v = process.env[spec?.env ?? name] ?? ctx.persona.inputs?.[name] ?? spec?.default; - return v && String(v).trim() ? String(v).trim() : undefined; + // ctx.persona.inputs is already resolved by the runtime (agent value → + // env → default). Check it first since it reflects the canonical value. + const fromCtx = ctx.persona.inputs?.[name]; + if (fromCtx && String(fromCtx).trim()) return String(fromCtx).trim(); + // Fall back to raw process.env for local dev outside the full runtime. + const fromEnv = process.env[spec?.env ?? name]; + if (fromEnv && String(fromEnv).trim()) return String(fromEnv).trim(); + // Last resort: the spec default. + const def = spec?.default; + if (def != null && String(def).trim()) return String(def).trim(); + return undefined; } /** @@ -41,7 +51,8 @@ export async function withTimeout( /** * Fetch with an AbortController timeout. Returns the response on success, - * or undefined on timeout/network error (never throws). + * or undefined on timeout/network error (never throws). Preserves caller- + * provided signal by racing our abort against it. */ export async function fetchWithTimeout( url: string, @@ -50,11 +61,27 @@ export async function fetchWithTimeout( ): Promise { const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeoutMs); + const signal = init.signal + ? anySignal([controller.signal, init.signal]) + : controller.signal; try { - return await fetch(url, { ...init, signal: controller.signal }); + return await fetch(url, { ...init, signal }); } catch { return undefined; } finally { clearTimeout(timer); } } + +/** Combine multiple AbortSignals — any one firing aborts the fetch. */ +function anySignal(signals: AbortSignal[]): AbortSignal { + const controller = new AbortController(); + for (const sig of signals) { + if (sig.aborted) { + controller.abort(sig.reason); + return controller.signal; + } + sig.addEventListener('abort', () => controller.abort(sig.reason), { once: true }); + } + return controller.signal; +} diff --git a/packages/delivery/src/types.ts b/packages/delivery/src/types.ts index a21c3908..4b4aec3f 100644 --- a/packages/delivery/src/types.ts +++ b/packages/delivery/src/types.ts @@ -1,6 +1,7 @@ import type { WorkforceCtx } from '@agentworkforce/runtime'; import type { SlackClient } from '@relayfile/relay-helpers'; import type { TelegramClient } from '@relayfile/relay-helpers'; +import { input } from './helpers.js'; // ── message reference (returned by send, accepted by reply) ────────────── @@ -51,7 +52,9 @@ export interface DeliveryOptions { // ── injectable transport seam (for tests) ──────────────────────────────── export interface DeliveryTransports { + /** Injected Slack client (used for both blocking and non-blocking paths). */ slack?: SlackClient; + /** Injected Telegram client (used for both blocking and non-blocking paths). */ telegram?: TelegramClient; } @@ -87,24 +90,29 @@ export interface DeliveryClient { /** * Resolve which transport targets are configured for the given persona ctx. - * Checks ctx.persona.inputs for SLACK_CHANNEL and TELEGRAM_CHAT. + * Uses input() helper for proper resolution order (ctx → env → default). */ export function resolveDeliveryTargets(ctx: WorkforceCtx): Array<'slack' | 'telegram'> { - const inputs = ctx.persona.inputs ?? {}; const targets: Array<'slack' | 'telegram'> = []; - if (inputs['SLACK_CHANNEL']?.trim()) targets.push('slack'); - if (inputs['TELEGRAM_CHAT']?.trim()) targets.push('telegram'); + if (input(ctx, 'SLACK_CHANNEL')) targets.push('slack'); + if (input(ctx, 'TELEGRAM_CHAT')) targets.push('telegram'); return targets; } -/** Get the configured slack channel id (bare, without `__name` suffix). */ +/** + * Get the configured slack channel id (bare, without `__name` suffix). + * Uses input() for proper resolution. + */ export function slackChannel(ctx: WorkforceCtx): string | undefined { - const raw = ctx.persona.inputs?.['SLACK_CHANNEL']; + const raw = input(ctx, 'SLACK_CHANNEL'); return raw?.split('__')[0]?.trim() || undefined; } -/** Get the configured telegram chat id (bare, without `__title` suffix). */ +/** + * Get the configured telegram chat id (bare, without `__title` suffix). + * Uses input() for proper resolution. + */ export function telegramChat(ctx: WorkforceCtx): string | undefined { - const raw = ctx.persona.inputs?.['TELEGRAM_CHAT']; + const raw = input(ctx, 'TELEGRAM_CHAT'); return raw?.split('__')[0]?.trim() || undefined; } From 686eaf518dbd684e4548dc20fe96c61f785d530b Mon Sep 17 00:00:00 2001 From: Ricky Schema Cascade Date: Tue, 23 Jun 2026 19:51:59 +0200 Subject: [PATCH 3/5] fix(delivery): restore 45s writeback timeout (was 15s) --- packages/delivery/src/delivery.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/delivery/src/delivery.ts b/packages/delivery/src/delivery.ts index c764a700..1e8b3af2 100644 --- a/packages/delivery/src/delivery.ts +++ b/packages/delivery/src/delivery.ts @@ -13,7 +13,7 @@ import { type TelegramRef } from './types.js'; -const WRITEBACK_TIMEOUT_MS = 15_000; +const WRITEBACK_TIMEOUT_MS = 45_000; /** * Create a delivery client that auto-discovers configured transports from From b35373291380f94f20ed20715e1f5cb2765307ad Mon Sep 17 00:00:00 2001 From: Ricky Schema Cascade Date: Tue, 23 Jun 2026 20:48:56 +0200 Subject: [PATCH 4/5] fix(delivery): add onlyTargets parameter for transport-scoped delivery --- packages/delivery/src/delivery.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/delivery/src/delivery.ts b/packages/delivery/src/delivery.ts index 1e8b3af2..cfddcc2e 100644 --- a/packages/delivery/src/delivery.ts +++ b/packages/delivery/src/delivery.ts @@ -33,9 +33,14 @@ const WRITEBACK_TIMEOUT_MS = 45_000; */ export function createDelivery( ctx: WorkforceCtx, - transports?: DeliveryTransports + transports?: DeliveryTransports, + /** Override which transports to target (defaults to all configured). */ + onlyTargets?: ReadonlyArray<'slack' | 'telegram'> ): DeliveryClient { - const targets = resolveDeliveryTargets(ctx); + const allTargets = resolveDeliveryTargets(ctx); + const targets = onlyTargets + ? allTargets.filter((t) => (onlyTargets as readonly string[]).includes(t)) + : allTargets; // Injected transports take priority. When not injected, construct real // clients with appropriate timeouts. From 4f609e10fa0a737ac22647a6f3187184f5b28a0b Mon Sep 17 00:00:00 2001 From: Ricky Schema Cascade Date: Tue, 23 Jun 2026 20:56:46 +0200 Subject: [PATCH 5/5] fix(delivery): use workspace:* for @agentworkforce/runtime dep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lockfile records workspace:* but manifest had ^4.0.0 — CI runs pnpm install --frozen-lockfile which fails on this mismatch. --- packages/delivery/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/delivery/package.json b/packages/delivery/package.json index 01b88780..965203c7 100644 --- a/packages/delivery/package.json +++ b/packages/delivery/package.json @@ -32,7 +32,7 @@ "lint": "tsc -p tsconfig.json --noEmit" }, "dependencies": { - "@agentworkforce/runtime": "^4.0.0", + "@agentworkforce/runtime": "workspace:*", "@relayfile/relay-helpers": "^0.4.2" } }