diff --git a/packages/delivery/package.json b/packages/delivery/package.json new file mode 100644 index 0000000..965203c --- /dev/null +++ b/packages/delivery/package.json @@ -0,0 +1,38 @@ +{ + "name": "@agentworkforce/delivery", + "version": "0.1.0", + "private": false, + "type": "module", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "./package.json": "./package.json" + }, + "files": [ + "dist", + "package.json" + ], + "repository": { + "type": "git", + "url": "https://github.com/AgentWorkforce/workforce", + "directory": "packages/delivery" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "build": "tsc -p tsconfig.json", + "dev": "tsc -p tsconfig.json --watch --preserveWatchOutput", + "typecheck": "tsc -p tsconfig.json --noEmit", + "test": "tsc -p tsconfig.json && node --test dist/**/*.test.js dist/*.test.js", + "lint": "tsc -p tsconfig.json --noEmit" + }, + "dependencies": { + "@agentworkforce/runtime": "workspace:*", + "@relayfile/relay-helpers": "^0.4.2" + } +} diff --git a/packages/delivery/src/delivery.ts b/packages/delivery/src/delivery.ts new file mode 100644 index 0000000..cfddcc2 --- /dev/null +++ b/packages/delivery/src/delivery.ts @@ -0,0 +1,244 @@ +import { slackClient, telegramClient } from '@relayfile/relay-helpers'; +import type { SlackClient, TelegramClient } from '@relayfile/relay-helpers'; +import type { WorkforceCtx } from '@agentworkforce/runtime'; +import { + resolveDeliveryTargets, + slackChannel, + telegramChat, + type DeliveryClient, + type DeliveryOptions, + type DeliveryResult, + type DeliveryTransports, + type SlackRef, + type TelegramRef +} from './types.js'; + +const WRITEBACK_TIMEOUT_MS = 45_000; + +/** + * Create a delivery client that auto-discovers configured transports from + * the persona context and sends to all of them. + * + * Blocking mode (the default): + * const heads = await delivery.send(header); + * await delivery.send(body, { replyTo: heads }); + * + * Non-blocking parentRef mode (zero receipt round-trips): + * const heads = await delivery.publish(header); + * await delivery.send(body, { replyTo: heads, nonBlocking: true }); + * + * Pass `transports` to inject mock clients for testing — the same injected + * client is used for both blocking and non-blocking paths (tests supply + * their own mock that short-circuits the writeback). + */ +export function createDelivery( + ctx: WorkforceCtx, + transports?: DeliveryTransports, + /** Override which transports to target (defaults to all configured). */ + onlyTargets?: ReadonlyArray<'slack' | 'telegram'> +): DeliveryClient { + const allTargets = resolveDeliveryTargets(ctx); + const targets = onlyTargets + ? allTargets.filter((t) => (onlyTargets as readonly string[]).includes(t)) + : allTargets; + + // Injected transports take priority. When not injected, construct real + // clients with appropriate timeouts. + const injectedSlack = transports?.slack; + const injectedTelegram = transports?.telegram; + + const slackBlocking = injectedSlack ?? (targets.includes('slack') + ? slackClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS }) + : undefined); + const slackNonBlocking = injectedSlack ?? (targets.includes('slack') + ? slackClient({ writebackTimeoutMs: 0 }) + : undefined); + const telegramBlocking = injectedTelegram ?? (targets.includes('telegram') + ? telegramClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS }) + : undefined); + const telegramNonBlocking = injectedTelegram ?? (targets.includes('telegram') + ? telegramClient({ writebackTimeoutMs: 0 }) + : undefined); + + return new DeliveryClientImpl(ctx, targets, { + slackBlocking, + slackNonBlocking, + telegramBlocking, + telegramNonBlocking + }); +} + +interface DeliveryTransportsInternal { + slackBlocking?: SlackClient; + slackNonBlocking?: SlackClient; + telegramBlocking?: TelegramClient; + telegramNonBlocking?: TelegramClient; +} + +class DeliveryClientImpl implements DeliveryClient { + readonly targets: ReadonlyArray<'slack' | 'telegram'>; + + private ctx: WorkforceCtx; + private t: DeliveryTransportsInternal; + + constructor( + ctx: WorkforceCtx, + targets: Array<'slack' | 'telegram'>, + transports: DeliveryTransportsInternal + ) { + this.ctx = ctx; + this.targets = targets; + this.t = transports; + } + + async send(text: string, opts?: DeliveryOptions): Promise { + const nonBlocking = opts?.nonBlocking === true; + const refs: Array = []; + const errors: string[] = []; + + const tasks: Promise[] = []; + + for (const target of this.targets) { + const parentRef = opts?.replyTo?.refs.find((r) => r.provider === target); + if (target === 'slack') { + tasks.push( + this.sendSlack(text, parentRef as SlackRef | undefined, nonBlocking) + .then((ref) => { if (ref) refs.push(ref); }) + .catch((err) => { errors.push(`slack: ${String(err)}`); }) + ); + } + if (target === 'telegram') { + tasks.push( + this.sendTelegram(text, parentRef as TelegramRef | undefined, nonBlocking) + .then((ref) => { if (ref) refs.push(ref); }) + .catch((err) => { errors.push(`telegram: ${String(err)}`); }) + ); + } + } + + await Promise.all(tasks); + + // In non-blocking mode, draft refs always succeed (no receipt wait to fail). + // Treat any ref as success. In blocking mode, require all targets to succeed. + const ok = nonBlocking + ? refs.length > 0 + : errors.length === 0 && refs.length === this.targets.length; + + if (!ok && errors.length > 0) { + this.ctx.log?.('warn', 'delivery.partial-failure', { errors, nonBlocking }); + } + if (!ok && refs.length === 0) { + const detail = errors.length > 0 ? errors.join('; ') : 'all sends returned null (no configured targets)'; + throw new Error(`Delivery failed to all targets: ${detail}`); + } + + return { ok, refs }; + } + + async publish(text: string): Promise { + return this.send(text, { nonBlocking: true }); + } + + // ── Slack ────────────────────────────────────────────────────────────── + + private async sendSlack( + text: string, + parentRef: SlackRef | undefined, + nonBlocking: boolean + ): Promise { + const channel = slackChannel(this.ctx); + if (!channel) return null; + + if (nonBlocking) { + return this.sendSlackNonBlocking(channel, text, parentRef); + } + return this.sendSlackBlocking(channel, text, parentRef); + } + + private async sendSlackBlocking( + channel: string, + text: string, + parentRef?: SlackRef + ): Promise { + const client = this.t.slackBlocking; + if (!client) return null; + + const result = parentRef?.draftRef + ? await client.post(channel, text, { replyTo: parentRef.draftRef }) + : await client.post(channel, text); + + if (!result.ts) { + this.ctx.log?.('warn', 'delivery.slack.no-receipt', { channel }); + return null; + } + + return { + provider: 'slack', + channel: result.channel, + ts: result.ts, + draftRef: result.ref + }; + } + + /** + * Non-blocking Slack: uses messages.write() directly with writebackTimeoutMs:0. + * The parentRef is embedded in the message body so the cloud orders the message + * under the parent server-side — zero receipt round-trips. The returned draftRef + * is the relay path, usable as a parent for subsequent threaded sends. + * + * Mirrors the x-reply-radar parentRef threading pattern (internal-agents). + */ + private async sendSlackNonBlocking( + channel: string, + text: string, + parentRef?: SlackRef + ): Promise { + const client = this.t.slackNonBlocking; + if (!client) return null; + + const body: Record = { text }; + if (parentRef?.draftRef) { + // Embed parentRef in the body — the cloud lifts it from the streamed head + // and orders this message under the parent once the parent delivers. + body.parentRef = parentRef.draftRef; + } + + const result = await client.messages.write({ channelId: channel }, body); + + return { + provider: 'slack', + channel, + ts: '', // Not available yet (non-blocking) + draftRef: result.path + }; + } + + // ── Telegram ─────────────────────────────────────────────────────────── + + private async sendTelegram( + text: string, + parentRef: TelegramRef | undefined, + nonBlocking: boolean + ): Promise { + const chatId = telegramChat(this.ctx); + if (!chatId) return null; + + const client = nonBlocking ? this.t.telegramNonBlocking : this.t.telegramBlocking; + if (!client) return null; + + const result = parentRef?.messageId + ? await client.sendMessage(chatId, text, { replyToMessageId: Number(parentRef.messageId) || undefined }) + : await client.sendMessage(chatId, text); + + if (!nonBlocking && !result.ok) { + this.ctx.log?.('warn', 'delivery.telegram.no-receipt', { chatId }); + return null; + } + + return { + provider: 'telegram', + chatId: result.chatId != null ? String(result.chatId) : chatId, + messageId: result.ok ? result.messageId : '' + }; + } +} diff --git a/packages/delivery/src/helpers.ts b/packages/delivery/src/helpers.ts new file mode 100644 index 0000000..1cf525b --- /dev/null +++ b/packages/delivery/src/helpers.ts @@ -0,0 +1,87 @@ +import type { WorkforceCtx } from '@agentworkforce/runtime'; + +/** + * Resolve a persona input value from the runtime ctx. + * + * Resolution order (mirrors persona-kit): ctx.persona.inputs (already + * resolved from agent value → env → default by the runtime) → fall back + * to process.env directly when running outside the full runtime. + */ +export function input(ctx: WorkforceCtx, name: string): string | undefined { + const spec = ctx.persona.inputSpecs?.[name]; + // ctx.persona.inputs is already resolved by the runtime (agent value → + // env → default). Check it first since it reflects the canonical value. + const fromCtx = ctx.persona.inputs?.[name]; + if (fromCtx && String(fromCtx).trim()) return String(fromCtx).trim(); + // Fall back to raw process.env for local dev outside the full runtime. + const fromEnv = process.env[spec?.env ?? name]; + if (fromEnv && String(fromEnv).trim()) return String(fromEnv).trim(); + // Last resort: the spec default. + const def = spec?.default; + if (def != null && String(def).trim()) return String(def).trim(); + return undefined; +} + +/** + * Split a comma-separated string into trimmed, non-empty entries. + */ +export function list(raw: string | undefined): string[] { + return (raw ?? '').split(',').map((s) => s.trim()).filter(Boolean); +} + +/** + * Race a promise against a timeout. On timeout the timer rejects with an + * Error so the caller can catch and fall back. Always clears the timer. + */ +export async function withTimeout( + p: Promise, + ms: number, + label: string +): Promise { + let timer: ReturnType; + const timeout = new Promise((_, rej) => { + timer = setTimeout(() => rej(new Error(`${label} timed out after ${ms}ms`)), ms); + }); + try { + return await Promise.race([p, timeout]); + } finally { + clearTimeout(timer!); + } +} + +/** + * Fetch with an AbortController timeout. Returns the response on success, + * or undefined on timeout/network error (never throws). Preserves caller- + * provided signal by racing our abort against it. + */ +export async function fetchWithTimeout( + url: string, + init: RequestInit = {}, + timeoutMs: number = 8_000 +): Promise { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + const signal = init.signal + ? anySignal([controller.signal, init.signal]) + : controller.signal; + try { + return await fetch(url, { ...init, signal }); + } catch { + return undefined; + } finally { + clearTimeout(timer); + } +} + +/** Combine multiple AbortSignals — any one firing aborts the fetch. */ +function anySignal(signals: AbortSignal[]): AbortSignal { + const controller = new AbortController(); + for (const sig of signals) { + if (sig.aborted) { + controller.abort(sig.reason); + return controller.signal; + } + sig.addEventListener('abort', () => controller.abort(sig.reason), { once: true }); + } + return controller.signal; +} diff --git a/packages/delivery/src/index.ts b/packages/delivery/src/index.ts new file mode 100644 index 0000000..3836b3c --- /dev/null +++ b/packages/delivery/src/index.ts @@ -0,0 +1,15 @@ +export { createDelivery } from './delivery.js'; +export { + resolveDeliveryTargets, + slackChannel, + telegramChat, + type DeliveryClient, + type DeliveryOptions, + type DeliveryResult, + type DeliveryTransports, + type MessageRef, + type SlackRef, + type TelegramRef +} from './types.js'; + +export { input, list, withTimeout, fetchWithTimeout } from './helpers.js'; diff --git a/packages/delivery/src/types.ts b/packages/delivery/src/types.ts new file mode 100644 index 0000000..4b4aec3 --- /dev/null +++ b/packages/delivery/src/types.ts @@ -0,0 +1,118 @@ +import type { WorkforceCtx } from '@agentworkforce/runtime'; +import type { SlackClient } from '@relayfile/relay-helpers'; +import type { TelegramClient } from '@relayfile/relay-helpers'; +import { input } from './helpers.js'; + +// ── message reference (returned by send, accepted by reply) ────────────── + +export interface SlackRef { + provider: 'slack'; + channel: string; + /** Delivered message ts (set after the writeback receipt arrives). */ + ts: string; + /** Draft ref for cloud-side replyTo threading. */ + draftRef: string; +} + +export interface TelegramRef { + provider: 'telegram'; + chatId: string; + /** Delivered message id (set after the writeback receipt arrives). */ + messageId: string; +} + +export type MessageRef = SlackRef | TelegramRef; + +// ── delivery result ───────────────────────────────────────────────────── + +export interface DeliveryResult { + ok: boolean; + refs: MessageRef[]; +} + +// ── options ────────────────────────────────────────────────────────────── + +export interface DeliveryOptions { + /** Thread the message under a prior delivery result. */ + replyTo?: DeliveryResult; + /** + * When true, don't wait for the writeback receipt. Returns draft refs + * immediately and relies on the cloud's server-side ordering for + * threading (Slack parentRef pattern, Telegram sendMessage with 0ms + * timeout). The returned refs have empty ts/messageId but valid + * draftRef for use as a parent in subsequent threaded sends. + * + * Use this for the header in a header+threaded-body pattern so the + * digest never blocks on a receipt — the cloud orders the threaded + * body under the header server-side. + */ + nonBlocking?: boolean; +} + +// ── injectable transport seam (for tests) ──────────────────────────────── + +export interface DeliveryTransports { + /** Injected Slack client (used for both blocking and non-blocking paths). */ + slack?: SlackClient; + /** Injected Telegram client (used for both blocking and non-blocking paths). */ + telegram?: TelegramClient; +} + +// ── delivery client ────────────────────────────────────────────────────── + +export interface DeliveryClient { + /** + * Send a message to all configured targets. + * + * When `opts.replyTo` is set the message is threaded under the targets + * from that prior `DeliveryResult`, each using its transport's native + * threading mechanism. + * + * In blocking mode (default): waits for the writeback receipt and returns + * the delivered ts/messageId. In non-blocking mode (`opts.nonBlocking: true`): + * returns draft refs immediately with the relay path as draftRef — zero + * receipt round-trips, cloud-side server ordering handles threading. + */ + send(text: string, opts?: DeliveryOptions): Promise; + + /** + * Convenience: same as `send(text, { nonBlocking: true })`. + * Publish a message without waiting for a receipt. Returns draft refs + * immediately for use as a threading parent. + */ + publish(text: string): Promise; + + /** Which providers are configured. */ + readonly targets: ReadonlyArray<'slack' | 'telegram'>; +} + +// ── configuration discovery ────────────────────────────────────────────── + +/** + * Resolve which transport targets are configured for the given persona ctx. + * Uses input() helper for proper resolution order (ctx → env → default). + */ +export function resolveDeliveryTargets(ctx: WorkforceCtx): Array<'slack' | 'telegram'> { + const targets: Array<'slack' | 'telegram'> = []; + if (input(ctx, 'SLACK_CHANNEL')) targets.push('slack'); + if (input(ctx, 'TELEGRAM_CHAT')) targets.push('telegram'); + return targets; +} + +/** + * Get the configured slack channel id (bare, without `__name` suffix). + * Uses input() for proper resolution. + */ +export function slackChannel(ctx: WorkforceCtx): string | undefined { + const raw = input(ctx, 'SLACK_CHANNEL'); + return raw?.split('__')[0]?.trim() || undefined; +} + +/** + * Get the configured telegram chat id (bare, without `__title` suffix). + * Uses input() for proper resolution. + */ +export function telegramChat(ctx: WorkforceCtx): string | undefined { + const raw = input(ctx, 'TELEGRAM_CHAT'); + return raw?.split('__')[0]?.trim() || undefined; +} diff --git a/packages/delivery/tsconfig.json b/packages/delivery/tsconfig.json new file mode 100644 index 0000000..df59da5 --- /dev/null +++ b/packages/delivery/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist" + }, + "include": ["src/**/*.ts"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f371df5..3d3ac4d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -68,6 +68,15 @@ importers: specifier: ^0.185.0 version: 0.185.0 + packages/delivery: + dependencies: + '@agentworkforce/runtime': + specifier: workspace:* + version: link:../runtime + '@relayfile/relay-helpers': + specifier: ^0.4.2 + version: 0.4.2(@relayfile/sdk@0.7.40) + packages/deploy: dependencies: '@agent-relay/cloud': @@ -961,6 +970,19 @@ packages: peerDependencies: '@relayfile/sdk': '>=0.6.0 <1' + '@relayfile/adapter-core@0.4.3': + resolution: {integrity: sha512-VoGco3xYab+Ij9v2wSi6qsYg15gDfYWlZsdbiKrqM/YN+t0IhPS8Shy5dRA18kYCBKEr7bH7QdFfa8PCXj6LFg==} + engines: {node: '>=18'} + hasBin: true + peerDependencies: + '@relayfile/sdk': '>=0.6.0 <1' + + '@relayfile/adapter-linear@0.4.2': + resolution: {integrity: sha512-Yg0hv/RqEB5WJ4RA4LX9JinCxxT/01GSdd+0HeT0L9P36EX27kEEuzWmNwWEpjDHQMcYykEocruEJHuV4XXGCQ==} + engines: {node: '>=18'} + peerDependencies: + '@relayfile/sdk': '>=0.6.0 <1' + '@relayfile/core@0.7.40': resolution: {integrity: sha512-vY48SxZgahnvE0CHDyy/iny17ypnfbX5myVPtocZVNpMiz4dS1iabO70WR+uYVgSiIZR7MpKTPFSs3SxEjQWag==} engines: {node: '>=18'} @@ -969,6 +991,9 @@ packages: resolution: {integrity: sha512-h2ac/frfSFgjrKDSBCkmMHjphx19XZrHrSjaI29wlTzSblfiE65MQST47lS6Fm1+K9yap+W+msUi8JRIYYWFEw==} engines: {node: '>=18'} + '@relayfile/relay-helpers@0.4.2': + resolution: {integrity: sha512-/SnZW5vEtNBJkYslLOuw8JIMU1edYmY9AXEJF52k4H3Q1Z/I3GPAShXCbQmCry07Q42CM/+1SEk1mLzDaU5cVw==} + '@relayfile/sdk@0.7.40': resolution: {integrity: sha512-9qPQ/qSexD11x/CKZlbZUFC7KDdxJ2yYBQd1nuzcoHNqnvErplq5bDBy5Dqr1AXID9i1Q7sMxnY8M1raMTQsAg==} engines: {node: '>=18'} @@ -3283,6 +3308,19 @@ snapshots: minimatch: 10.2.5 yaml: 2.9.0 + '@relayfile/adapter-core@0.4.3(@relayfile/sdk@0.7.40)': + dependencies: + '@relayfile/sdk': 0.7.40 + '@scalar/postman-to-openapi': 0.6.3 + cheerio: 1.2.0 + minimatch: 10.2.5 + yaml: 2.9.0 + + '@relayfile/adapter-linear@0.4.2(@relayfile/sdk@0.7.40)': + dependencies: + '@relayfile/adapter-core': 0.4.3(@relayfile/sdk@0.7.40) + '@relayfile/sdk': 0.7.40 + '@relayfile/core@0.7.40': {} '@relayfile/local-mount@0.7.24': @@ -3290,6 +3328,13 @@ snapshots: '@parcel/watcher': 2.5.6 ignore: 7.0.5 + '@relayfile/relay-helpers@0.4.2(@relayfile/sdk@0.7.40)': + dependencies: + '@relayfile/adapter-core': 0.4.3(@relayfile/sdk@0.7.40) + '@relayfile/adapter-linear': 0.4.2(@relayfile/sdk@0.7.40) + transitivePeerDependencies: + - '@relayfile/sdk' + '@relayfile/sdk@0.7.40': dependencies: '@relayfile/core': 0.7.40