Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions packages/delivery/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "@agentworkforce/delivery",
"version": "0.1.0",
"private": false,
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./package.json": "./package.json"
},
"files": [
"dist",
"package.json"
],
"repository": {
"type": "git",
"url": "https://github.com/AgentWorkforce/workforce",
"directory": "packages/delivery"
},
"publishConfig": {
"access": "public"
},
"scripts": {
"build": "tsc -p tsconfig.json",
"dev": "tsc -p tsconfig.json --watch --preserveWatchOutput",
"typecheck": "tsc -p tsconfig.json --noEmit",
"test": "tsc -p tsconfig.json && node --test dist/**/*.test.js dist/*.test.js",
"lint": "tsc -p tsconfig.json --noEmit"
},
"dependencies": {
"@agentworkforce/runtime": "workspace:*",
"@relayfile/relay-helpers": "^0.4.2"
}
}
244 changes: 244 additions & 0 deletions packages/delivery/src/delivery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
import { slackClient, telegramClient } from '@relayfile/relay-helpers';
import type { SlackClient, TelegramClient } from '@relayfile/relay-helpers';
import type { WorkforceCtx } from '@agentworkforce/runtime';
import {
resolveDeliveryTargets,
slackChannel,
telegramChat,
type DeliveryClient,
type DeliveryOptions,
type DeliveryResult,
type DeliveryTransports,
type SlackRef,
type TelegramRef
} from './types.js';

const WRITEBACK_TIMEOUT_MS = 45_000;

/**
* Create a delivery client that auto-discovers configured transports from
* the persona context and sends to all of them.
*
* Blocking mode (the default):
* const heads = await delivery.send(header);
* await delivery.send(body, { replyTo: heads });
*
* Non-blocking parentRef mode (zero receipt round-trips):
* const heads = await delivery.publish(header);
* await delivery.send(body, { replyTo: heads, nonBlocking: true });
*
* Pass `transports` to inject mock clients for testing — the same injected
* client is used for both blocking and non-blocking paths (tests supply
* their own mock that short-circuits the writeback).
*/
export function createDelivery(
ctx: WorkforceCtx,
transports?: DeliveryTransports,
/** Override which transports to target (defaults to all configured). */
onlyTargets?: ReadonlyArray<'slack' | 'telegram'>
): DeliveryClient {
const allTargets = resolveDeliveryTargets(ctx);
const targets = onlyTargets
? allTargets.filter((t) => (onlyTargets as readonly string[]).includes(t))
: allTargets;

// Injected transports take priority. When not injected, construct real
// clients with appropriate timeouts.
const injectedSlack = transports?.slack;
const injectedTelegram = transports?.telegram;

const slackBlocking = injectedSlack ?? (targets.includes('slack')
? slackClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS })
: undefined);
const slackNonBlocking = injectedSlack ?? (targets.includes('slack')
? slackClient({ writebackTimeoutMs: 0 })
: undefined);
const telegramBlocking = injectedTelegram ?? (targets.includes('telegram')
? telegramClient({ writebackTimeoutMs: WRITEBACK_TIMEOUT_MS })
: undefined);
const telegramNonBlocking = injectedTelegram ?? (targets.includes('telegram')
? telegramClient({ writebackTimeoutMs: 0 })
: undefined);

return new DeliveryClientImpl(ctx, targets, {
slackBlocking,
slackNonBlocking,
telegramBlocking,
telegramNonBlocking
});
}

interface DeliveryTransportsInternal {
slackBlocking?: SlackClient;
slackNonBlocking?: SlackClient;
telegramBlocking?: TelegramClient;
telegramNonBlocking?: TelegramClient;
}

class DeliveryClientImpl implements DeliveryClient {
readonly targets: ReadonlyArray<'slack' | 'telegram'>;

private ctx: WorkforceCtx;
private t: DeliveryTransportsInternal;

constructor(
ctx: WorkforceCtx,
targets: Array<'slack' | 'telegram'>,
transports: DeliveryTransportsInternal
) {
this.ctx = ctx;
this.targets = targets;
this.t = transports;
}

async send(text: string, opts?: DeliveryOptions): Promise<DeliveryResult> {
const nonBlocking = opts?.nonBlocking === true;
Comment on lines +94 to +95

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If this.targets is empty (i.e., neither Slack nor Telegram is configured), the send method behaves inconsistently:

  • In blocking mode (nonBlocking = false), it returns { ok: true, refs: [] }.
  • In non-blocking mode (nonBlocking = true), it throws an error: Delivery failed to all targets: (with an empty error list) because refs.length > 0 is false.

We should check if this.targets.length === 0 at the beginning of send and throw a clear, consistent error.

Suggested change
async send(text: string, opts?: DeliveryOptions): Promise<DeliveryResult> {
const nonBlocking = opts?.nonBlocking === true;
async send(text: string, opts?: DeliveryOptions): Promise<DeliveryResult> {
if (this.targets.length === 0) {
throw new Error('No delivery targets configured');
}
const nonBlocking = opts?.nonBlocking === true;

const refs: Array<SlackRef | TelegramRef> = [];
const errors: string[] = [];

const tasks: Promise<void>[] = [];

for (const target of this.targets) {
const parentRef = opts?.replyTo?.refs.find((r) => r.provider === target);
if (target === 'slack') {
tasks.push(
this.sendSlack(text, parentRef as SlackRef | undefined, nonBlocking)
.then((ref) => { if (ref) refs.push(ref); })
.catch((err) => { errors.push(`slack: ${String(err)}`); })
);
}
if (target === 'telegram') {
tasks.push(
this.sendTelegram(text, parentRef as TelegramRef | undefined, nonBlocking)
.then((ref) => { if (ref) refs.push(ref); })
.catch((err) => { errors.push(`telegram: ${String(err)}`); })
);
}
}

await Promise.all(tasks);

// In non-blocking mode, draft refs always succeed (no receipt wait to fail).
// Treat any ref as success. In blocking mode, require all targets to succeed.
const ok = nonBlocking
? refs.length > 0
: errors.length === 0 && refs.length === this.targets.length;

if (!ok && errors.length > 0) {
this.ctx.log?.('warn', 'delivery.partial-failure', { errors, nonBlocking });
}
if (!ok && refs.length === 0) {
const detail = errors.length > 0 ? errors.join('; ') : 'all sends returned null (no configured targets)';
throw new Error(`Delivery failed to all targets: ${detail}`);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return { ok, refs };
}

async publish(text: string): Promise<DeliveryResult> {
return this.send(text, { nonBlocking: true });
}

// ── Slack ──────────────────────────────────────────────────────────────

private async sendSlack(
text: string,
parentRef: SlackRef | undefined,
nonBlocking: boolean
): Promise<SlackRef | null> {
const channel = slackChannel(this.ctx);
if (!channel) return null;

if (nonBlocking) {
return this.sendSlackNonBlocking(channel, text, parentRef);
}
return this.sendSlackBlocking(channel, text, parentRef);
}

private async sendSlackBlocking(
channel: string,
text: string,
parentRef?: SlackRef
): Promise<SlackRef | null> {
const client = this.t.slackBlocking;
if (!client) return null;

const result = parentRef?.draftRef
? await client.post(channel, text, { replyTo: parentRef.draftRef })
: await client.post(channel, text);

if (!result.ts) {
this.ctx.log?.('warn', 'delivery.slack.no-receipt', { channel });
return null;
}

return {
provider: 'slack',
channel: result.channel,
ts: result.ts,
draftRef: result.ref
};
}

/**
* Non-blocking Slack: uses messages.write() directly with writebackTimeoutMs:0.
* The parentRef is embedded in the message body so the cloud orders the message
* under the parent server-side — zero receipt round-trips. The returned draftRef
* is the relay path, usable as a parent for subsequent threaded sends.
*
* Mirrors the x-reply-radar parentRef threading pattern (internal-agents).
*/
private async sendSlackNonBlocking(
channel: string,
text: string,
parentRef?: SlackRef
): Promise<SlackRef | null> {
const client = this.t.slackNonBlocking;
if (!client) return null;

const body: Record<string, unknown> = { text };
if (parentRef?.draftRef) {
// Embed parentRef in the body — the cloud lifts it from the streamed head
// and orders this message under the parent once the parent delivers.
body.parentRef = parentRef.draftRef;
}

const result = await client.messages.write({ channelId: channel }, body);

return {
provider: 'slack',
channel,
ts: '', // Not available yet (non-blocking)
draftRef: result.path
};
}

// ── Telegram ───────────────────────────────────────────────────────────

private async sendTelegram(
text: string,
parentRef: TelegramRef | undefined,
nonBlocking: boolean
): Promise<TelegramRef | null> {
const chatId = telegramChat(this.ctx);
if (!chatId) return null;

const client = nonBlocking ? this.t.telegramNonBlocking : this.t.telegramBlocking;
if (!client) return null;

const result = parentRef?.messageId
? await client.sendMessage(chatId, text, { replyToMessageId: Number(parentRef.messageId) || undefined })
: await client.sendMessage(chatId, text);

if (!nonBlocking && !result.ok) {
this.ctx.log?.('warn', 'delivery.telegram.no-receipt', { chatId });
return null;
}

return {
provider: 'telegram',
chatId: result.chatId != null ? String(result.chatId) : chatId,
messageId: result.ok ? result.messageId : ''

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve a Telegram parent ref when publishing non-blocking

In the documented publish(header) then send(body, { replyTo: heads, nonBlocking: true }) flow for Telegram, the 0ms writeback path has no receipt, so result.ok is false and this stores an empty messageId. The next Telegram send only sets replyToMessageId when parentRef.messageId is truthy, so the body is posted as a top-level Telegram message instead of under the header; carry the draft result.ref/parentRef or avoid claiming non-blocking Telegram threading.

Useful? React with 👍 / 👎.

Comment on lines +229 to +241

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -nP -C4 '\bsendMessage\b|replyToMessageId|messageId|chatId' $(fd -p 'relay-helpers' -t d node_modules 2>/dev/null) 2>/dev/null

Repository: AgentWorkforce/workforce

Length of output: 162


🏁 Script executed:

# First, locate the delivery.ts file and examine the context around lines 216-228
fd -type f -name "delivery.ts" packages/delivery/src/

Repository: AgentWorkforce/workforce

Length of output: 333


🏁 Script executed:

# Locate the delivery.ts file
fd -t f -name "delivery.ts" packages/delivery/src/

Repository: AgentWorkforce/workforce

Length of output: 393


🏁 Script executed:

# Find the delivery.ts file using find command
find . -name "delivery.ts" -path "*/packages/delivery/src/*"

Repository: AgentWorkforce/workforce

Length of output: 198


🏁 Script executed:

# Read the delivery.ts file to see the context around lines 216-228
cat -n ./packages/delivery/src/delivery.ts | sed -n '200,240p'

Repository: AgentWorkforce/workforce

Length of output: 1296


🏁 Script executed:

# Now let's find where the telegram client is defined/imported
grep -n "telegramBlocking\|telegramNonBlocking\|telegram.*Client" ./packages/delivery/src/delivery.ts | head -20

Repository: AgentWorkforce/workforce

Length of output: 729


🏁 Script executed:

# Find where TelegramClient is defined - likely in relay-helpers
find . -name "*.ts" -o -name "*.d.ts" | xargs grep -l "TelegramClient" | head -5

Repository: AgentWorkforce/workforce

Length of output: 231


🏁 Script executed:

# Check the types.ts file for TelegramClient definition
cat -n ./packages/delivery/src/types.ts

Repository: AgentWorkforce/workforce

Length of output: 4884


🏁 Script executed:

# Search for relay-helpers to understand what sendMessage returns
find . -name "relay-helpers" -o -path "*/node_modules/@relayfile/relay-helpers*" 2>/dev/null | head -5

Repository: AgentWorkforce/workforce

Length of output: 162


🏁 Script executed:

# Search for test files or usage of sendMessage in the codebase
find . -name "*.test.ts" -o -name "*.spec.ts" | xargs grep -l "sendMessage\|sendTelegram" 2>/dev/null | head -5

Repository: AgentWorkforce/workforce

Length of output: 162


🏁 Script executed:

# Let's look at the Slack implementation for comparison
cat -n ./packages/delivery/src/delivery.ts | sed -n '140,200p'

Repository: AgentWorkforce/workforce

Length of output: 2470


🏁 Script executed:

# Let's see the full send method to understand the flow
cat -n ./packages/delivery/src/delivery.ts | sed -n '100,145p'

Repository: AgentWorkforce/workforce

Length of output: 1871


🏁 Script executed:

# Check package.json to see relay-helpers version/repo
find . -name "package.json" -path "*/packages/delivery/*" -exec cat {} \;

Repository: AgentWorkforce/workforce

Length of output: 1112


🏁 Script executed:

# Search for relay-helpers documentation or type definitions in node_modules or package info
grep -r "interface.*TelegramClient\|sendMessage.*return\|type.*sendMessage" . --include="*.ts" --include="*.d.ts" 2>/dev/null | head -20

Repository: AgentWorkforce/workforce

Length of output: 162


🏁 Script executed:

# Let's search for comments or documentation about sendMessage behavior in non-blocking mode
grep -n "writebackTimeoutMs\|sendMessage\|non-blocking\|nonBlocking" ./packages/delivery/src/delivery.ts

Repository: AgentWorkforce/workforce

Length of output: 1558


🌐 Web query:

relay-helpers telegram client sendMessage non-blocking writebackTimeoutMs documentation

💡 Result:

There is no standardized Telegram client API, library, or documentation that defines parameters named "relay-helpers," "sendMessage," or "writebackTimeoutMs" as a unified, interconnected feature set [1][2][3][4][5][6][7]. Based on current technical documentation and repository patterns: 1. Relay Helpers: The term "relay" often refers to custom, project-specific middleware or wrappers designed to forward messages between systems (e.g., Claude Code, local notification services, or HTTP-to-Telegram bridges) [1][3][4][8]. These are typically bespoke implementations rather than a shared, standardized library. 2. writebackTimeoutMs: This parameter does not exist in standard Telegram Bot API documentation [4][9][10]. It is likely a custom configuration option within a specific, private, or project-specific relay implementation intended to manage asynchronous write operations or response timeouts for bridged messages [1][3]. 3. Non-blocking sendMessage: In the context of Telegram clients (such as Telethon or python-telegram-bot), "non-blocking" behavior is typically achieved by using native asynchronous programming patterns (e.g., async/await in Python or JavaScript) rather than a specific parameter named "non-blocking" [5][7][11]. If you are encountering this in a specific codebase, it is likely a proprietary wrapper created to handle rate limits, retries, or asynchronous message queuing specific to that application [2][3]. You should consult the README.md or local docs/ folder within that specific project repository for documentation regarding its internal configuration [1][7].

Citations:


🏁 Script executed:

# Let's look for any TypeScript type definitions or interfaces related to sendMessage result
grep -r "result\.ok\|result\.messageId\|result\.chatId" ./packages/delivery/src/ --include="*.ts"

Repository: AgentWorkforce/workforce

Length of output: 396


Telegram non-blocking threading is broken; parentRef threading relies on the previous messageId which is empty.

In non-blocking mode, sendMessage returns messageId: '' (line 228, when result.ok is false). On the next threaded send, parentRef?.messageId is '' (falsy), so line 217 skips the threading call—Slack avoids this by embedding parentRef in the message body for cloud-side ordering (lines 186–190), but Telegram has no equivalent. The documented behavior (types.ts lines 38–46) claims Telegram supports cloud-side ordering with writebackTimeoutMs: 0, but the implementation only applies this to Slack.

Fix by either:

  1. Implementing an embedded parentRef mechanism for Telegram (mirroring Slack's pattern), or
  2. Documenting that Telegram non-blocking mode does not support threading.

Additionally, String(result.chatId) on line 227 yields 'undefined' if chatId is missing from the result.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/delivery/src/delivery.ts` around lines 216 - 228, In the Telegram
delivery function, when non-blocking mode is enabled and sendMessage fails
(result.ok is false), the returned messageId is an empty string. This breaks
threading on subsequent messages because parentRef.messageId becomes empty and
the condition on line 217 fails to apply replyToMessageId. Fix this by either
implementing an embedded parentRef mechanism in the message text (similar to the
Slack pattern on lines 186-190) to preserve threading context without relying on
messageId, or add a comment documenting that Telegram non-blocking mode does not
support threading. Additionally, on line 227 where String(result.chatId) is
called, add a check to ensure result.chatId exists before converting it to a
string to prevent returning the literal string 'undefined'.

};
}
}
87 changes: 87 additions & 0 deletions packages/delivery/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type { WorkforceCtx } from '@agentworkforce/runtime';

/**
* Resolve a persona input value from the runtime ctx.
*
* Resolution order (mirrors persona-kit): ctx.persona.inputs (already
* resolved from agent value → env → default by the runtime) → fall back
* to process.env directly when running outside the full runtime.
*/
export function input(ctx: WorkforceCtx, name: string): string | undefined {
const spec = ctx.persona.inputSpecs?.[name];
// ctx.persona.inputs is already resolved by the runtime (agent value →
// env → default). Check it first since it reflects the canonical value.
const fromCtx = ctx.persona.inputs?.[name];
if (fromCtx && String(fromCtx).trim()) return String(fromCtx).trim();
// Fall back to raw process.env for local dev outside the full runtime.
const fromEnv = process.env[spec?.env ?? name];
if (fromEnv && String(fromEnv).trim()) return String(fromEnv).trim();
// Last resort: the spec default.
const def = spec?.default;
if (def != null && String(def).trim()) return String(def).trim();
return undefined;
}

/**
* Split a comma-separated string into trimmed, non-empty entries.
*/
export function list(raw: string | undefined): string[] {
return (raw ?? '').split(',').map((s) => s.trim()).filter(Boolean);
}

/**
* Race a promise against a timeout. On timeout the timer rejects with an
* Error so the caller can catch and fall back. Always clears the timer.
*/
export async function withTimeout<T>(
p: Promise<T>,
ms: number,
label: string
): Promise<T> {
let timer: ReturnType<typeof setTimeout>;
const timeout = new Promise<never>((_, rej) => {
timer = setTimeout(() => rej(new Error(`${label} timed out after ${ms}ms`)), ms);
});
try {
return await Promise.race([p, timeout]);
} finally {
clearTimeout(timer!);
}
}

/**
* Fetch with an AbortController timeout. Returns the response on success,
* or undefined on timeout/network error (never throws). Preserves caller-
* provided signal by racing our abort against it.
*/
export async function fetchWithTimeout(
url: string,
init: RequestInit = {},
timeoutMs: number = 8_000
): Promise<Response | undefined> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
const signal = init.signal
? anySignal([controller.signal, init.signal])
: controller.signal;
try {
return await fetch(url, { ...init, signal });
} catch {
return undefined;
} finally {
clearTimeout(timer);
}
}
Comment on lines +57 to +74

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In fetchWithTimeout, if the caller passes a custom AbortSignal in init.signal, it is completely overwritten and ignored by controller.signal.

To support defensive programming and prevent ignoring caller-driven aborts, we should listen to the caller's signal and abort our internal controller if the caller's signal aborts.

export async function fetchWithTimeout(
  url: string,
  init: RequestInit = {},
  timeoutMs: number = 8_000
): Promise<Response | undefined> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);

  if (init.signal) {
    if (init.signal.aborted) {
      controller.abort();
    } else {
      init.signal.addEventListener('abort', () => controller.abort());
    }
  }

  try {
    return await fetch(url, { ...init, signal: controller.signal });
  } catch {
    return undefined;
  } finally {
    clearTimeout(timer);
  }
}


/** Combine multiple AbortSignals — any one firing aborts the fetch. */
function anySignal(signals: AbortSignal[]): AbortSignal {
const controller = new AbortController();
for (const sig of signals) {
if (sig.aborted) {
controller.abort(sig.reason);
return controller.signal;
}
sig.addEventListener('abort', () => controller.abort(sig.reason), { once: true });
}
return controller.signal;
}
15 changes: 15 additions & 0 deletions packages/delivery/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export { createDelivery } from './delivery.js';
export {
resolveDeliveryTargets,
slackChannel,
telegramChat,
type DeliveryClient,
type DeliveryOptions,
type DeliveryResult,
type DeliveryTransports,
type MessageRef,
type SlackRef,
type TelegramRef
} from './types.js';

export { input, list, withTimeout, fetchWithTimeout } from './helpers.js';
Loading
Loading