Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions packages/delivery/src/delivery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import {
telegramChat,
type DeliveryClient,
type DeliveryOptions,
type DeliveryProvider,
type DeliveryResult,
type DeliveryTransports,
type RelaycastRef,
type RelaycastSender,
type SlackRef,
type TelegramRef
} from './types.js';
import { defaultRelaycastSender } from './relaycast.js';

const WRITEBACK_TIMEOUT_MS = 45_000;

Expand All @@ -35,9 +39,12 @@ export function createDelivery(
ctx: WorkforceCtx,
transports?: DeliveryTransports,
/** Override which transports to target (defaults to all configured). */
onlyTargets?: ReadonlyArray<'slack' | 'telegram'>
onlyTargets?: ReadonlyArray<DeliveryProvider>
): DeliveryClient {
const allTargets = resolveDeliveryTargets(ctx);
// Slack/Telegram are config-driven (persona inputs). Relaycast is event-driven
// — it's a target only when the caller supplies a reply address in transports.
const allTargets: DeliveryProvider[] = [...resolveDeliveryTargets(ctx)];
if (transports?.relaycast?.to) allTargets.push('relaycast');
const targets = onlyTargets
? allTargets.filter((t) => (onlyTargets as readonly string[]).includes(t))
: allTargets;
Expand All @@ -60,11 +67,21 @@ export function createDelivery(
? telegramClient({ writebackTimeoutMs: 0 })
: undefined);

// Relaycast reply: address from the inbound event, client from the injected
// sender or the default env-backed one (POST /v1/dm with RELAY_API_KEY).
const relaycast = targets.includes('relaycast') && transports?.relaycast?.to
? {
to: transports.relaycast.to,
sender: transports.relaycast.sender ?? defaultRelaycastSender(ctx)
}
: undefined;

return new DeliveryClientImpl(ctx, targets, {
slackBlocking,
slackNonBlocking,
telegramBlocking,
telegramNonBlocking
telegramNonBlocking,
relaycast
});
}

Expand All @@ -73,17 +90,18 @@ interface DeliveryTransportsInternal {
slackNonBlocking?: SlackClient;
telegramBlocking?: TelegramClient;
telegramNonBlocking?: TelegramClient;
relaycast?: { to: string; sender: RelaycastSender };
}

class DeliveryClientImpl implements DeliveryClient {
readonly targets: ReadonlyArray<'slack' | 'telegram'>;
readonly targets: ReadonlyArray<DeliveryProvider>;

private ctx: WorkforceCtx;
private t: DeliveryTransportsInternal;

constructor(
ctx: WorkforceCtx,
targets: Array<'slack' | 'telegram'>,
targets: Array<DeliveryProvider>,
transports: DeliveryTransportsInternal
) {
this.ctx = ctx;
Expand All @@ -93,7 +111,7 @@ class DeliveryClientImpl implements DeliveryClient {

async send(text: string, opts?: DeliveryOptions): Promise<DeliveryResult> {
const nonBlocking = opts?.nonBlocking === true;
const refs: Array<SlackRef | TelegramRef> = [];
const refs: Array<SlackRef | TelegramRef | RelaycastRef> = [];
const errors: string[] = [];

const tasks: Promise<void>[] = [];
Expand All @@ -114,6 +132,20 @@ class DeliveryClientImpl implements DeliveryClient {
.catch((err) => { errors.push(`telegram: ${String(err)}`); })
);
}
if (target === 'relaycast') {
// Relaycast replies are a single blocking DM — there's no draft-ref /
// server-ordered threading pattern, so they don't participate in
// publish()/non-blocking sends (which promise no receipt wait).
if (nonBlocking) {
this.ctx.log?.('debug', 'delivery.relaycast.skip-nonblocking', { to: this.t.relaycast?.to });
continue;
}
tasks.push(
this.sendRelaycast(text)
.then((ref) => { if (ref) refs.push(ref); })
.catch((err) => { errors.push(`relaycast: ${String(err)}`); })
);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

await Promise.all(tasks);
Expand Down Expand Up @@ -213,6 +245,22 @@ class DeliveryClientImpl implements DeliveryClient {
};
}

// ── Relaycast (agent-to-agent) ───────────────────────────────────────────

private async sendRelaycast(text: string): Promise<RelaycastRef | null> {
const rc = this.t.relaycast;
if (!rc) return null;
const res = await rc.sender.dm(rc.to, text);
// Treat a missing message id as a failed delivery (matches slack/telegram,
// which return null on a missing receipt) — don't report success with an
// unusable ref.
if (!res.ok || !res.messageId) {
this.ctx.log?.('warn', 'delivery.relaycast.no-receipt', { to: rc.to });
return null;
}
return { provider: 'relaycast', to: rc.to, messageId: res.messageId };
}

// ── Telegram ───────────────────────────────────────────────────────────

private async sendTelegram(
Expand Down
6 changes: 6 additions & 0 deletions packages/delivery/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ export {
telegramChat,
type DeliveryClient,
type DeliveryOptions,
type DeliveryProvider,
type DeliveryResult,
type DeliveryTransports,
type MessageRef,
type RelaycastRef,
type RelaycastSender,
type RelaycastTarget,
type SlackRef,
type TelegramRef
} from './types.js';

export { DEFAULT_RELAYCAST_URL, resolveRelaycastUrl, defaultRelaycastSender } from './relaycast.js';

export { input, list, withTimeout, fetchWithTimeout } from './helpers.js';
88 changes: 88 additions & 0 deletions packages/delivery/src/relaycast.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import assert from 'node:assert/strict';
import test from 'node:test';

import { createDelivery } from './delivery.js';
import { resolveRelaycastUrl, DEFAULT_RELAYCAST_URL } from './relaycast.js';
import type { WorkforceCtx } from '@agentworkforce/runtime';
import type { RelaycastSender } from './types.js';

function makeCtx(inputs: Record<string, string> = {}): WorkforceCtx {
return { persona: { inputs, inputSpecs: {} }, log: () => {} } as unknown as WorkforceCtx;
}

test('DEFAULT_RELAYCAST_URL is cast.agentrelay.com', () => {
assert.equal(DEFAULT_RELAYCAST_URL, 'https://cast.agentrelay.com');
});

test('resolveRelaycastUrl: default, RELAY_BASE_URL, then RELAYCAST_URL precedence (trailing slash trimmed)', () => {
const saved = { u: process.env.RELAYCAST_URL, b: process.env.RELAY_BASE_URL };
try {
delete process.env.RELAYCAST_URL;
delete process.env.RELAY_BASE_URL;
assert.equal(resolveRelaycastUrl(), 'https://cast.agentrelay.com');

process.env.RELAY_BASE_URL = 'https://relay.example.com/';
assert.equal(resolveRelaycastUrl(), 'https://relay.example.com');

process.env.RELAYCAST_URL = 'https://cast.example.com';
assert.equal(resolveRelaycastUrl(), 'https://cast.example.com'); // RELAYCAST_URL wins
} finally {
saved.u === undefined ? delete process.env.RELAYCAST_URL : (process.env.RELAYCAST_URL = saved.u);
saved.b === undefined ? delete process.env.RELAY_BASE_URL : (process.env.RELAY_BASE_URL = saved.b);
}
});

test('relaycast target DMs the inbound sender and returns a RelaycastRef', async () => {
const sent: Array<{ to: string; text: string }> = [];
const sender: RelaycastSender = {
async dm(to, text) {
sent.push({ to, text });
return { ok: true, messageId: 'm1' };
}
};
const delivery = createDelivery(makeCtx(), { relaycast: { to: 'local-tester', sender } });

assert.deepEqual([...delivery.targets], ['relaycast']);
const res = await delivery.send('hello over relay');
assert.equal(res.ok, true);
assert.deepEqual(sent, [{ to: 'local-tester', text: 'hello over relay' }]);
assert.deepEqual(res.refs, [{ provider: 'relaycast', to: 'local-tester', messageId: 'm1' }]);
});

test('relaycast is NOT a target unless a reply address is supplied (event-driven, not config)', () => {
assert.equal(createDelivery(makeCtx(), {}).targets.includes('relaycast'), false);
// Even with slack configured, relaycast only appears when transports.relaycast.to is set.
assert.deepEqual([...createDelivery(makeCtx({ SLACK_CHANNEL: 'C1' }), {}).targets], ['slack']);
});

test('onlyTargets can scope delivery to relaycast (origin-only reply)', () => {
const sender: RelaycastSender = { async dm() { return { ok: true, messageId: 'x' }; } };
const delivery = createDelivery(
makeCtx({ SLACK_CHANNEL: 'C1' }),
{ relaycast: { to: 'peer', sender } },
['relaycast']
);
assert.deepEqual([...delivery.targets], ['relaycast']); // slack filtered out
});

test('relaycast-only send failure surfaces (matches slack/telegram all-targets-failed contract)', async () => {
const sender: RelaycastSender = { async dm() { return { ok: false }; } };
const delivery = createDelivery(makeCtx(), { relaycast: { to: 'peer', sender } });
await assert.rejects(() => delivery.send('x'), /Delivery failed to all targets/);
});

test('relaycast ok:true with no messageId is treated as a failed delivery', async () => {
const sender: RelaycastSender = { async dm() { return { ok: true }; } }; // no messageId
const delivery = createDelivery(makeCtx(), { relaycast: { to: 'peer', sender } });
await assert.rejects(() => delivery.send('x'), /Delivery failed to all targets/);
});

test('publish()/non-blocking does not invoke the relaycast sender', async () => {
let calls = 0;
const sender: RelaycastSender = { async dm() { calls++; return { ok: true, messageId: 'm' }; } };
const delivery = createDelivery(makeCtx(), { relaycast: { to: 'peer', sender } });
// relaycast is the only target → nothing delivered in non-blocking mode → throws,
// and crucially the sender is never called (no draft-ref/threading path for relay).
await assert.rejects(() => delivery.publish('x'), /Delivery failed to all targets/);
assert.equal(calls, 0);
});
82 changes: 82 additions & 0 deletions packages/delivery/src/relaycast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { WorkforceCtx } from '@agentworkforce/runtime';
import type { RelaycastSender } from './types.js';
Comment on lines +1 to +2

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Import 'fetchWithTimeout' from './helpers.js' to enable making the relaycast HTTP request with a timeout, preventing potential hanging.

Suggested change
import type { WorkforceCtx } from '@agentworkforce/runtime';
import type { RelaycastSender } from './types.js';
import type { WorkforceCtx } from '@agentworkforce/runtime';
import type { RelaycastSender } from './types.js';
import { fetchWithTimeout } from './helpers.js';

import { fetchWithTimeout } from './helpers.js';

/**
* Canonical relaycast gateway — SINGLE SOURCE OF TRUTH. Change this one
* constant to move the default gateway. Per-env override via `RELAYCAST_URL`
* (preferred) or `RELAY_BASE_URL`.
*/
export const DEFAULT_RELAYCAST_URL = 'https://cast.agentrelay.com';

/** Resolve the relaycast base URL: RELAYCAST_URL > RELAY_BASE_URL > default. */
export function resolveRelaycastUrl(): string {
const raw =
process.env.RELAYCAST_URL?.trim() ||
process.env.RELAY_BASE_URL?.trim() ||
DEFAULT_RELAYCAST_URL;
return raw.replace(/\/+$/, '');
}

/**
* Resolve the token to authenticate relaycast agent actions (DMs). `/v1/dm` is
* secured with the AGENT token, not the workspace key — so prefer the agent
* token and only fall back to the workspace `RELAY_API_KEY` (which lets tests
* and single-identity boxes still work). Mirrors the runtime's agent-token
* resolution order.
*/
function resolveRelayAgentToken(): string | undefined {
return (
process.env.WORKFORCE_AGENT_TOKEN?.trim() ||
process.env.RELAY_AGENT_TOKEN?.trim() ||
process.env.RELAY_API_KEY?.trim() ||
undefined
);
}

/**
* Default relaycast sender — DMs a peer agent via `POST /v1/dm` using the box's
* injected agent token. Bounded by `fetchWithTimeout` and never throws: returns
* `{ ok: false }` (logged) on missing token, timeout, or non-2xx, so a relay
* reply degrades gracefully rather than crashing the handler.
*/
export function defaultRelaycastSender(ctx: WorkforceCtx): RelaycastSender {
const token = resolveRelayAgentToken();
const baseUrl = resolveRelaycastUrl();
return {
async dm(to: string, text: string): Promise<{ ok: boolean; messageId?: string }> {
if (!token) {
ctx.log?.('warn', 'delivery.relaycast.no-token', {
reason: 'no agent token (WORKFORCE_AGENT_TOKEN/RELAY_AGENT_TOKEN/RELAY_API_KEY) in the agent box'
});
return { ok: false };
}
const res = await fetchWithTimeout(`${baseUrl}/v1/dm`, {
method: 'POST',
headers: { authorization: `Bearer ${token}`, 'content-type': 'application/json' },
body: JSON.stringify({ to, text })
});
if (!res) {
ctx.log?.('warn', 'delivery.relaycast.send-failed', { to, reason: 'timeout or network error' });
return { ok: false };
}
if (!res.ok) {
ctx.log?.('warn', 'delivery.relaycast.send-failed', { to, status: res.status });
return { ok: false };
}
// Relaycast REST wraps success as `{ ok, data }`; the /dm message id lives
// under `data.message.id` (or legacy `data.id`). Unwrap before reading.
const json = (await res.json().catch(() => null)) as Record<string, unknown> | null;
const data =
json && typeof json.data === 'object' && json.data !== null
? (json.data as Record<string, unknown>)
: json;
const message =
data && typeof data.message === 'object' && data.message !== null
? (data.message as Record<string, unknown>)
: undefined;
const rawId = message?.id ?? data?.messageId ?? data?.id;
return { ok: true, messageId: rawId != null ? String(rawId) : undefined };
}
};
}
45 changes: 43 additions & 2 deletions packages/delivery/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,18 @@ export interface TelegramRef {
messageId: string;
}

export type MessageRef = SlackRef | TelegramRef;
export interface RelaycastRef {
provider: 'relaycast';
/** The agent the reply was DM'd to (the inbound message's sender). */
to: string;
/** Delivered relaycast message id, when the send returns one. */
messageId: string;
}

export type MessageRef = SlackRef | TelegramRef | RelaycastRef;

/** A delivery target provider. */
export type DeliveryProvider = 'slack' | 'telegram' | 'relaycast';

// ── delivery result ─────────────────────────────────────────────────────

Expand Down Expand Up @@ -49,13 +60,43 @@ export interface DeliveryOptions {
nonBlocking?: boolean;
}

// ── relaycast (agent-to-agent) transport ──────────────────────────────────

/**
* Minimal seam for sending a relaycast DM back to a peer agent. The default
* implementation posts `POST /v1/dm` with the box's injected `RELAY_API_KEY`;
* tests inject a mock. Unlike Slack/Telegram (config-driven via persona
* inputs), the relaycast reply address is EVENT-driven — `to` is the inbound
* message's sender, supplied by the caller.
*/
export interface RelaycastSender {
dm(to: string, text: string): Promise<{ ok: boolean; messageId?: string }>;
}

/**
* Relaycast target config. Present iff the agent is replying to a relay DM:
* `to` is the inbound sender to reply to; `sender` overrides the default
* env-backed client (for tests).
*/
export interface RelaycastTarget {
to: string;
sender?: RelaycastSender;
}

// ── injectable transport seam (for tests) ────────────────────────────────

export interface DeliveryTransports {
/** Injected Slack client (used for both blocking and non-blocking paths). */
slack?: SlackClient;
/** Injected Telegram client (used for both blocking and non-blocking paths). */
telegram?: TelegramClient;
/**
* Relaycast reply target. When set, `relaycast` becomes a delivery target
* and `send()`/`publish()` DM the inbound sender over the relay. Event-driven
* (the `to` address comes from the inbound message), so it is NOT discovered
* by `resolveDeliveryTargets(ctx)`.
*/
relaycast?: RelaycastTarget;
}

// ── delivery client ──────────────────────────────────────────────────────
Expand Down Expand Up @@ -83,7 +124,7 @@ export interface DeliveryClient {
publish(text: string): Promise<DeliveryResult>;

/** Which providers are configured. */
readonly targets: ReadonlyArray<'slack' | 'telegram'>;
readonly targets: ReadonlyArray<DeliveryProvider>;
}

// ── configuration discovery ──────────────────────────────────────────────
Expand Down
Loading