feat(sdk): multiplexed subscribe, @self DM routing, stable event ids#128
Conversation
- AgentClient.subscribe(channels, onMessage): Subscription multiplexes a single websocket per agent, auto-resubscribes on reconnect, and filters the @self sentinel to DM-only delivery in the SDK. - Server resolves to: "@self" on DM send from the authenticated agentId rather than trusting client-side substitution. - WebSocket message events now carry a top-level UUID id derived deterministically from the underlying message id, so clients can dedupe across reconnects. - RelayError distinguishes rate_limited and backpressure from generic transport failures. Originally implemented as Track A of the proactive-runtime M3 workflow; the worker reported COMPLETE and ran the @relaycast/types, /sdk, and /server vitest suites green but never opened a PR. Re-verified locally: 38 + 294 + 437 tests pass on this branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (9)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (5)
📝 WalkthroughWalkthroughThis PR adds deterministic stableRelaycastEventId generation and applies it to message-like events, implements handler-based SDK subscriptions with managed WebSocket channel sync and subscription handles, supports server-side ChangesHandler-based subscriptions with stable event IDs and
🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
packages/sdk-typescript/src/__tests__/errors.test.ts (1)
67-73: ⚡ Quick winAdd direct normalization tests for the two new backpressure aliases.
Line 67 covers
backpressure, butqueue_overloadedandworkspace_stream_backpressurewere also added in the map and are currently unverified in this suite.Suggested test additions
it('maps backpressure to backpressure', () => { expect(normalizeRelayErrorCode('backpressure')).toBe('backpressure'); }); + + it('maps queue_overloaded to backpressure', () => { + expect(normalizeRelayErrorCode('queue_overloaded')).toBe('backpressure'); + }); + + it('maps workspace_stream_backpressure to backpressure', () => { + expect(normalizeRelayErrorCode('workspace_stream_backpressure')).toBe('backpressure'); + });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/sdk-typescript/src/__tests__/errors.test.ts` around lines 67 - 73, Add two direct unit tests to the errors.test.ts suite verifying that normalizeRelayErrorCode maps 'queue_overloaded' and 'workspace_stream_backpressure' to 'backpressure' (similar to the existing test for 'backpressure'); locate the test block that currently asserts normalizeRelayErrorCode('backpressure') and add sibling it(...) cases for the two new aliases to ensure they're covered by the normalizeRelayErrorCode mapping.packages/sdk-typescript/src/subscription.ts (1)
1-3: ⚡ Quick winMake
channelsdeeply readonly in the public subscription contract.
readonly channels: string[]still allows mutation viapush/splice. Use a readonly array type to protect the API surface.Proposed change
export interface Subscription { - readonly channels: string[]; + readonly channels: readonly string[]; unsubscribe(): void; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/sdk-typescript/src/subscription.ts` around lines 1 - 3, The public Subscription interface exposes channels as a mutable array; change its type to a readonly array (e.g., use ReadonlyArray<string> or the readonly string[] syntax) so consumers cannot call push/splice on the returned channels. Update the Subscription interface declaration (the channels property) and any implementing classes/functions that construct or return channels to satisfy the new readonly type (convert mutable arrays to readonly when returning or cast appropriately). Ensure the unsubscribe() signature remains unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/sdk-typescript/src/agent.ts`:
- Around line 265-270: The invoke function calls the async/possibly-throwing
handler onMessage without handling rejections (risking unhandled promise
rejections); update invoke (which uses matchesSubscription,
ensureRelaycastMessageEventId, and channelSet) to ensure the returned promise is
observed and failures are handled by attaching a .catch that logs/handles the
error (use this.logger.error if available or console.error) or forwards it to an
existing error handler; do not leave the promise unhandled.
In `@packages/types/src/event-id.ts`:
- Line 2: The current serialization uses parts.join(':') (assigned to variable
input) which can alias distinct tuples (e.g., ['a','b:c'] vs ['a:b','c']);
replace this unsafe join with a delimiter-safe serializer such as
JSON.stringify(parts) (or an escaping scheme) wherever parts.join(':') is used
(including the other occurrences around lines 49-59) so that each tuple maps to
a unique input string before hashing/ID generation (update the code that assigns
input and any callers that assume the old format).
---
Nitpick comments:
In `@packages/sdk-typescript/src/__tests__/errors.test.ts`:
- Around line 67-73: Add two direct unit tests to the errors.test.ts suite
verifying that normalizeRelayErrorCode maps 'queue_overloaded' and
'workspace_stream_backpressure' to 'backpressure' (similar to the existing test
for 'backpressure'); locate the test block that currently asserts
normalizeRelayErrorCode('backpressure') and add sibling it(...) cases for the
two new aliases to ensure they're covered by the normalizeRelayErrorCode
mapping.
In `@packages/sdk-typescript/src/subscription.ts`:
- Around line 1-3: The public Subscription interface exposes channels as a
mutable array; change its type to a readonly array (e.g., use
ReadonlyArray<string> or the readonly string[] syntax) so consumers cannot call
push/splice on the returned channels. Update the Subscription interface
declaration (the channels property) and any implementing classes/functions that
construct or return channels to satisfy the new readonly type (convert mutable
arrays to readonly when returning or cast appropriately). Ensure the
unsubscribe() signature remains unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 56e47baa-525e-43da-8399-b14443fc485a
📒 Files selected for processing (21)
README.mdopenapi.yamlpackages/sdk-typescript/src/__tests__/agent-messaging.test.tspackages/sdk-typescript/src/__tests__/agent-ws.test.tspackages/sdk-typescript/src/__tests__/errors.test.tspackages/sdk-typescript/src/agent.tspackages/sdk-typescript/src/errors.tspackages/sdk-typescript/src/event-id.tspackages/sdk-typescript/src/index.tspackages/sdk-typescript/src/subscription.tspackages/sdk-typescript/src/types.tspackages/sdk-typescript/src/ws.tspackages/server/src/engine/__tests__/wsTransform.test.tspackages/server/src/engine/dm.tspackages/server/src/engine/event-id.tspackages/server/src/engine/wsTransform.tspackages/server/src/routes/__tests__/dm.test.tspackages/server/src/routes/__tests__/fanout.test.tspackages/types/src/event-id.tspackages/types/src/events.tspackages/types/src/index.ts
|
Preview deployed!
This preview shares the staging database and will be cleaned up when the PR is merged or closed. Run E2E testsnpm run e2e -- https://pr128-api.relaycast.dev --ciOpen observer dashboard |
| this.ws.disconnect(); | ||
| this.ws = null; | ||
| } | ||
| this.activeWsChannels.clear(); | ||
| // Always send the HTTP disconnect — it works even without a WS and | ||
| // serves as the authoritative presence update. | ||
| await this.client.post('/v1/agents/disconnect', {}).catch(() => {}); | ||
| } |
There was a problem hiding this comment.
🟡 disconnect() does not clear managedSubscriptions, causing phantom channel subscriptions with dead handlers after reconnect
After disconnect() sets this.ws = null, the managedSubscriptions map is not cleared. If the client later reconnects (via a new subscribe(channels, handler) call or explicit connect()), desiredWsChannels() at packages/sdk-typescript/src/agent.ts:536-551 still includes channels from old managed subscriptions. These channels are re-subscribed on the new WebSocket, but the old handlers were registered on the previous WsClient instance (now disconnected and nulled). Events arriving for those channels are received by the client but silently discarded because no handler on the active WsClient processes them. This wastes server-side bandwidth and causes silent event loss.
Scenario trace
sub = agent.subscribe(['general'], handler)→ creates WsClient A, registers handler on Aawait agent.disconnect()→ destroys WsClient A, setsthis.ws = null, butmanagedSubscriptionsretains the entrysub2 = agent.subscribe(['dev'], handler2)→ creates WsClient B, registers handler2 on B- On WsClient B open,
desiredWsChannels()returns{'general', 'dev'}, so both are subscribed on the wire - Server sends
message.createdforgeneral→ arrives at WsClient B → only handler2's listeners fire → handler2 filters it out (it only watchesdev) → old handler was on WsClient A → event silently dropped
(Refers to lines 225-241)
Prompt for agents
In AgentClient.disconnect(), the managedSubscriptions and manualSubscriptions maps are not cleared. This means that if the client reconnects later (via connect() or a new subscribe(channels, handler) call), desiredWsChannels() still includes channels from old managed subscriptions whose handlers are registered on the now-dead WsClient instance. Events for those channels arrive but no handler fires.
The fix should clear managed subscriptions during disconnect(). For each managed subscription, call its stop functions (to clean up handlers on the old WsClient) and then clear the map. Also clear manualSubscriptions. This ensures a clean state after disconnect.
Relevant locations:
- AgentClient.disconnect() at packages/sdk-typescript/src/agent.ts:225-241
- managedSubscriptions map at packages/sdk-typescript/src/agent.ts:143
- manualSubscriptions set at packages/sdk-typescript/src/agent.ts:142
- desiredWsChannels() at packages/sdk-typescript/src/agent.ts:536-551
Note: after this fix, users who call disconnect() and then re-subscribe would need to create new subscriptions (which is the expected pattern). The auto-reconnect path (WsClient internal reconnect) is NOT affected since it preserves the same WsClient instance.
Was this helpful? React with 👍 or 👎 to provide feedback.
| export function stableRelaycastEventId(...parts: Array<string | null | undefined>): string { | ||
| const normalized = parts | ||
| .map((part) => part?.trim()) | ||
| .filter((part): part is string => typeof part === 'string' && part.length > 0); | ||
|
|
||
| return formatUuidFromWords(hashStringParts(['relaycast', ...normalized])); | ||
| } |
There was a problem hiding this comment.
🟡 stableRelaycastEventId handles empty input differently in types package vs SDK/server copies
The stableRelaycastEventId function is duplicated in three packages with inconsistent empty-input handling. The packages/types/src/event-id.ts:54-56 version has a guard for empty normalized parts that returns hash(['relaycast', 'empty-event']), while the copies in packages/sdk-typescript/src/event-id.ts:54 and packages/server/src/engine/event-id.ts:54 lack this guard and would return hash(['relaycast']) for the same input. Since @relaycast/types is already a dependency of both the SDK and server, these packages could import from @relaycast/types instead of maintaining divergent copies. The inconsistency means calling stableRelaycastEventId() with no valid parts produces different UUIDs depending on which copy is used.
| export function stableRelaycastEventId(...parts: Array<string | null | undefined>): string { | |
| const normalized = parts | |
| .map((part) => part?.trim()) | |
| .filter((part): part is string => typeof part === 'string' && part.length > 0); | |
| return formatUuidFromWords(hashStringParts(['relaycast', ...normalized])); | |
| } | |
| export function stableRelaycastEventId(...parts: Array<string | null | undefined>): string { | |
| const normalized = parts | |
| .map((part) => part?.trim()) | |
| .filter((part): part is string => typeof part === 'string' && part.length > 0); | |
| if (normalized.length === 0) { | |
| return formatUuidFromWords(hashStringParts(['relaycast', 'empty-event'])); | |
| } | |
| return formatUuidFromWords(hashStringParts(['relaycast', ...normalized])); | |
| } |
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
AgentClient.subscribe(channels, onMessage): Subscriptionmultiplexes one websocket per agent, auto-resubscribes on reconnect, and treats@selfas a DM-only filter in the high-level SDK.to: "@self"on DM send from the authenticatedagentId(no client-side substitution).idderived from the message id, so clients can dedupe across reconnects.RelayErrordistinguishesrate_limitedandbackpressurefrom generic transport failures.Origin
Implemented as Track A of the proactive-runtime M3 swarm workflow on 2026-05-12. The worker (
impl-relaycast-work-b37a7cf5) reportedOWNER_DECISION: COMPLETEto the WorkflowRunner with all three workspace test suites green, then self-terminated without committing or opening a PR. The changes have been sitting uncommitted on a local rust-SDK branch since then. This PR commits that work against currentmainafter re-verifying it locally.Test plan
npm test --workspace @relaycast/types— 38 testsnpm test --workspace @relaycast/sdk— 294 testsnpm test --workspace @relaycast/server— 437 tests['general', '@self']with two agents, verify cross-agent DMs land on@selfonly and channel posts land on the named channelevent.idis stable across a reconnect+replay sequenceRelayError.code === 'rate_limited' | 'backpressure'is surfaced when the server emits the corresponding close framesDownstream
Tracks B–G of M3 (in
cloud-runtime-runandrelayfile-adapters) consume this SDK surface and likely have the same uncommitted-but-COMPLETE status — their PRs should follow this one, with a fresh@relaycast/sdkrelease bumped after merge.🤖 Generated with Claude Code