From be64f3ec6f9a4d89b1da84ca8f250b9e01248dde Mon Sep 17 00:00:00 2001 From: Writeback Reliability Bot Date: Wed, 6 May 2026 10:59:54 +0200 Subject: [PATCH 1/4] docs: add onWrite trigger SDK design MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spec for onWrite(pattern, handler) — a thin SDK wrapper over the existing WebSocket event stream that lets customers react to file changes in two lines of code. v1 covers TypeScript + Python SDKs, glob path matching, shared WS connection, lazy reconnect, error isolation. Outbound webhook subscriptions and replay/DLQ are explicitly out of scope for v1. This is the customer-facing primitive that puts Relayfile architecturally above Composio (reactive only), Pipedream (GUI triggers), and Nango (webhooks but you write the handler service). Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/onwrite-trigger-design.md | 314 +++++++++++++++++++++++++++++++++ 1 file changed, 314 insertions(+) create mode 100644 docs/onwrite-trigger-design.md diff --git a/docs/onwrite-trigger-design.md b/docs/onwrite-trigger-design.md new file mode 100644 index 00000000..bacf6504 --- /dev/null +++ b/docs/onwrite-trigger-design.md @@ -0,0 +1,314 @@ +# onWrite Trigger SDK + +**Status:** Proposed +**Affects:** `packages/sdk/typescript`, `packages/sdk/python`, `packages/core` (types) +**Depends on:** existing WebSocket event stream (`RelayFileClient.connectWebSocket`), file-observer event feed +**Pairs with:** `sdk-setup-client.md`, `relayfile-v1-spec.md` + +--- + +## Problem + +Today, a customer who wants to react to a relayfile change must: +1. Open a `RelayFileClient.connectWebSocket()` connection, +2. Subscribe to the raw event stream, +3. Filter events by path manually, +4. Handle reconnection, backoff, and dispatch errors themselves. + +This is several dozen lines of glue code per consumer, and there is no canonical surface — every team writes their own wrapper. 
The customer-facing pitch ("trigger an agent when a file changes") doesn't have a one-liner SDK call. + +It also means the most differentiated half of the Relayfile pitch — *event-driven, not just synced* — has no developer surface. Composio is reactive (no triggers). Pipedream has triggers but only via GUI. Nango has webhooks but you still write the handler service. The thing that puts Relayfile architecturally above all three is a customer-facing primitive that doesn't currently exist as one. + +--- + +## Goal + +A developer can subscribe to changes on a path pattern in two lines of code, with no infrastructure to host, no webhook URL to register, no reconnection logic to write. + +```ts +import { onWrite } from '@relayfile/sdk' + +onWrite('/notion/pages/calls/*/transcript', async (event) => { + const callId = event.path.split('/')[4] + await spawnFollowupAgents(callId) +}) +``` + +The same shape in Python: + +```python +from relayfile import on_write + +@on_write('/notion/pages/calls/*/transcript') +async def handle(event): + call_id = event.path.split('/')[4] + await spawn_followup_agents(call_id) +``` + +That snippet — and only that snippet — is the public surface for the v1. + +--- + +## Non-Goals (v1) + +- **Outbound webhook subscription registry on the server.** v1 uses the existing WebSocket event stream and assumes a long-running consumer process. Serverless / Lambda-shaped consumers are a v2 concern. +- **Replay / dead-letter queue.** v1 logs handler errors via `client.recordHandlerError`; durable replay is v2. +- **Cross-workspace subscriptions.** v1 scopes to one workspace per `onWrite` registration (the workspace bound to the SDK client). +- **Event deduplication / exactly-once semantics.** v1 is at-least-once. Handler should be idempotent. +- **GUI for subscription management.** v1 is code-only. file-observer can list active WS subscriptions but not configure them. 
+ +--- + +## API surface + +### TypeScript + +```ts +// packages/sdk/typescript/src/onWrite.ts + +import type { RelayFileClient } from './client' +import type { WriteEvent } from '@relayfile/core' + +export type OnWriteHandler = (event: WriteEvent) => void | Promise<void> + +export interface OnWriteOptions { + /** Client to subscribe through. Defaults to the module-level singleton. */ + client?: RelayFileClient + /** Operations to subscribe to. Defaults to ['create', 'update']. */ + operations?: Array<WriteEvent['operation']> + /** AbortSignal to stop the subscription. */ + signal?: AbortSignal +} + +/** + * Subscribe to write events on a path pattern. Returns an unsubscribe function. + * + * Pattern syntax (glob-like; `*` matches one segment, `**` is allowed only as the trailing segment): + * /notion/pages/calls/* /transcript (* matches one path segment) + * /linear/issues/** (** matches any number of segments) + * /github/repos/acme/api/pulls/* (* matches one segment) + * + * Multiple wildcards are supported. No regex. + */ +export function onWrite( + pattern: string, + handler: OnWriteHandler, + options?: OnWriteOptions, +): () => void +``` + +Behavior contract: +- **Returns synchronously** with an unsubscribe function. The actual WS connection is established lazily on the first registration if the client doesn't already have one. +- **Connection is shared** across all `onWrite` calls on the same client — one socket, many registrations. The dispatcher fans events out to matching handlers in registration order. +- **Handler errors do not crash** the dispatcher. They are caught, logged via `client.recordHandlerError({ pattern, path, error })`, and the next handler runs. +- **Backpressure**: handlers run sequentially per pattern (v1 has no bounded or configurable per-handler queue). If a handler is slow, subsequent events for that pattern accumulate in an unbounded in-memory queue. v2 adds a configurable concurrency limit and bounded queue. +- **Reconnection**: on WS disconnect, the SDK reconnects with exponential backoff (capped at 30s).
On reconnect, the SDK resubscribes; events missed during the gap are *not* replayed by the SDK in v1 (at most, the server's existing replay-on-resume applies). The customer-facing docs must state this clearly. +- **Filtering**: pattern matching happens client-side in v1. v2 will push the pattern to the server so the WS only receives matching events. + +### Python + +Decorator + non-decorator forms: + +```python +# packages/sdk/python/relayfile/on_write.py +from typing import Callable, Awaitable, Iterable, Optional +from .types import WriteEvent +from .client import RelayFileClient + +OnWriteHandler = Callable[[WriteEvent], Awaitable[None]] + +def on_write( + pattern: str, + handler: Optional[OnWriteHandler] = None, + *, + client: Optional[RelayFileClient] = None, + operations: Iterable[str] = ('create', 'update'), +): + """Subscribe to write events on a path pattern. + + Usable as decorator or function: + + @on_write('/notion/pages/calls/*/transcript') + async def handle(event): ... + + on_write('/linear/issues/*', handle) + """ + ... +``` + +Implementation mirrors the TS version: shared WS, lazy connection, client-side glob matching, error isolation, reconnect with backoff. + +--- + +## Path-pattern matching + +Single grammar, both languages: + +| Token | Matches | +|---|---| +| Literal segment (`calls`) | Exactly that segment | +| `*` | Exactly one path segment, any characters | +| `**` | Zero or more path segments (greedy); only valid as the final segment | +| Trailing `/` | Optional in pattern; ignored in path | + +**Examples:** + +| Pattern | Path | Match?
| +|---|---|---| +| `/notion/pages/calls/*/transcript` | `/notion/pages/calls/2026-05-08/transcript` | ✅ | +| `/notion/pages/calls/*/transcript` | `/notion/pages/calls/2026-05-08/notes/transcript` | ❌ (`*` is single-segment) | +| `/linear/issues/**` | `/linear/issues/PROJ-441/comments/c-1` | ✅ | +| `/github/repos/*/*/pulls/*` | `/github/repos/acme/api/pulls/42` | ✅ | +| `/hubspot/deals/*/stage` | `/hubspot/deals/4471/stage` | ✅ | + +`**` is convenient but expensive for client-side filtering — document that v2 server-side filtering will short-circuit `**` patterns more efficiently. + +No regex. No alternation (`{a,b}`). No character classes. If a customer needs them, they filter inside the handler. + +Reference impl: a small recursive matcher (~30 LOC) that walks segments. Library options (`micromatch`, `picomatch`, Python `fnmatch`) are tempting but bring weight; this rule set is simple enough to own. + +--- + +## Event payload + +Add to `@relayfile/core` (or wherever `WriteEvent` lives today — confirm against existing WS message schema and reuse): + +```ts +export interface WriteEvent { + /** Workspace the change happened in. */ + workspaceId: string + /** Canonical VFS path. */ + path: string + /** What happened. */ + operation: 'create' | 'update' | 'delete' + /** New revision id. */ + revision: string + /** Previous revision id (null on create). */ + previousRevision: string | null + /** ISO 8601 timestamp. */ + timestamp: string + /** Where the write originated. */ + source: 'webhook' | 'agent' | 'sync' | 'api' | 'cli' + /** + * Parsed contents for json/yaml files when small enough (<256 KB). + * For larger files or non-parseable formats, undefined — handler must + * `client.readFile(path)` to fetch. + */ + value?: unknown + /** Identity that performed the write (agent name, user id, sync provider). 
*/ + actor?: { type: 'agent' | 'user' | 'system'; id: string } +} +``` + +Reusing the existing internal event schema (whatever WS emits today) is preferred — only add fields that aren't already there. Audit `packages/file-observer` and `packages/core` first. + +--- + +## Error model + +Three failure modes, each with explicit handling: + +1. **Handler throws.** Caught by the dispatcher. Recorded via `client.recordHandlerError({ pattern, path, error, retryable: false })`. Other handlers for the same event still run. The handler is *not* automatically retried (idempotency is the customer's responsibility). + +2. **WS disconnect.** Reconnect with exponential backoff (1s → 2s → 4s → 8s → 16s → 30s, then steady at 30s). Emit a `reconnecting` event (observable via `client.on('reconnecting', ...)`) so file-observer-style consumers can show connection state. Resubscribe on connect. + +3. **Pattern is invalid.** Throw synchronously at registration time, not on first event. Validate up-front (`**` is allowed only as the final segment, no empty segments, and the pattern must start with `/`). + +--- + +## Concurrency + +v1 — sequential per pattern, parallel across patterns. If two handlers are registered on the same pattern they run in registration order, sequentially. + +```ts +onWrite('/x/*', a) // runs first +onWrite('/x/*', b) // runs after a completes for the same event +onWrite('/y/*', c) // runs in parallel with the /x/* chain +``` + +Rationale: most customers register one handler per pattern; sequential within a pattern preserves observable order; cross-pattern parallelism gives concurrency where it's safe. + +v2 may add `{ concurrency: number }` per registration.
+ +--- + +## Observability + +Each registered handler is reported up to file-observer (or any future dashboard) via the existing client telemetry channel: + +- `subscription.registered` — pattern, registered-at +- `subscription.event-dispatched` — pattern, path, latency-from-write +- `subscription.handler-error` — pattern, path, error message, error type +- `subscription.reconnect` — gap duration, missed-event count (if knowable) + +These should reuse whatever telemetry the WebSocket layer already emits — don't add a new channel if the existing event feed can carry them. + +--- + +## Implementation plan + +### Phase 1 — TypeScript SDK (~2 days) + +1. Confirm the existing `WriteEvent` shape on the WS — reuse, don't reinvent. +2. Implement `pathMatches(pattern, path)` with the grammar above. Unit tests covering the table in the §Path-pattern matching section. +3. Implement `onWrite()` with lazy connection, shared dispatcher, reconnect, error isolation. +4. Add `client.recordHandlerError()` if it doesn't exist (likely just a logger call). +5. Tests: + - Glob matching unit tests + - Dispatcher fan-out (multiple registrations, multiple events) + - Reconnect behavior (with mock WS) + - Handler error doesn't break dispatcher +6. Export from `@relayfile/sdk` index. +7. Add a 30-line example to `packages/sdk/typescript/examples/onWrite.ts`. + +### Phase 2 — Python SDK (~1 day) + +Mirror Phase 1. Reuse the same matcher rules (port the implementation; same test cases). + +### Phase 3 — Docs + examples (~0.5 day) + +- Update `docs/sdk-improvements.md` with `onWrite` entry. +- Add a recipe to `docs/guides/` — "Trigger an agent when a customer call lands in Notion." +- Update the homepage hello-world to use `onWrite` instead of polling. + +### Phase 4 (post-v1) — server-side outbound webhooks + +For serverless consumers. Design separately. Out of scope here. 
+ +--- + +## Acceptance + +A consumer can paste this into a Node script, run it, and receive events: + +```ts +import { RelayFileClient, onWrite } from '@relayfile/sdk' + +const client = new RelayFileClient({ token: process.env.RELAYFILE_TOKEN! }) + +onWrite('/notion/pages/calls/*/transcript', async (event) => { + console.log('call transcript saved:', event.path, 'rev', event.revision) +}, { client }) + +// Process stays alive on the WS subscription. +``` + +Manually save a Notion page that syncs to that path; the handler logs the event within ~2s of the WS message being received (no retry storms, no missed reconnects after a `kill -STOP`/`kill -CONT` cycle, no crash when a handler throws). + +--- + +## Open questions + +1. **Reuse existing WS event types or define new ones?** Audit `packages/core` and `packages/sdk/typescript` for the current shape. If the WS already emits a structured `FileEvent`, alias `WriteEvent` to it (or extend). Don't proliferate types. +2. **Should `delete` be in default operations?** The proposed default is `['create', 'update']`. Delete-as-trigger has different semantics (file no longer readable from the handler) and is rarer. Make it opt-in. +3. **Bounded or unbounded handler queue per pattern?** v1 is unbounded, with a documented memory caveat. v2 adds a configurable cap. Decide before merge. +4. **`onWrite` vs `onChange`?** "write" matches the operation taxonomy in the WS schema today; "change" is friendlier for non-engineer readers. Pick one and stick with it. Recommendation: `onWrite` (matches `relayfile:fs:write:*` scopes in relayauth — symmetry). +5. **Where does the example live?** `packages/sdk/typescript/examples/` exists for `sdk-setup-client.md`. Same place. + +--- + +## Risk + +The biggest risk is that this becomes the most-demoed primitive in the SDK and we under-spec the failure modes. Customers will copy the two-line example and put it in a Lambda where it can't hold a WS open, and then complain.
The mitigation is two-fold: (1) Phase 4 (outbound webhooks) needs to be on the public roadmap from day one of `onWrite` shipping, and (2) the docs lead with "this is for long-running consumers; for serverless, use [forthcoming webhook subscriptions]." + +A secondary risk: if `**` patterns are common, client-side filtering of high-volume workspaces (millions of events/day) will saturate the WS bandwidth. Track WS bytes-received per consumer in file-observer telemetry; if any consumer exceeds X MB/min, recommend they tighten their pattern or wait for v2 server-side filtering. From 0c84fabfecf2abed10120310d0f444e11d881b8c Mon Sep 17 00:00:00 2001 From: Writeback Reliability Bot Date: Wed, 6 May 2026 14:04:30 +0200 Subject: [PATCH 2/4] feat(sdk): add onWrite/on_write trigger primitive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ships the SDK-level onWrite primitive specified in docs/onwrite-trigger-design.md for both TypeScript and Python clients. Two-line subscribe to file changes: onWrite('/notion/pages/calls/*/transcript', async (event) => { ... }) @on_write('/notion/pages/calls/*/transcript') def handle(event): ... v1 covers: glob path matching with `*` (single segment) and trailing `**` (deep), shared lazy WebSocket per client, reconnect backoff (1s→2s→4s→8s→16s→30s capped), per-pattern sequential dispatch, and handler error isolation that records to `client.recordHandlerError` (or logs) and never propagates back into the event loop. Core (`@relayfile/core`): - `WriteEvent`, `WriteEventOperation`, `WriteEventSource`, `WriteEventActor` types — re-exported by both SDK packages. TypeScript (`@relayfile/sdk`): - `onWrite(pattern, handler, options)` and `pathMatches(pattern, path)`. - `OnWriteDispatcher` shares a single `RelayFileSync` per client, lazily starts/stops it when registrations come and go. - 10 vitest tests covering matcher, fan-out, error isolation, reconnect backoff schedule. 
Python (`relayfile`): - `on_write` (callable + decorator) and `path_matches`. - Per-client `_OnWriteDispatcher` with a daemon thread, identical reconnect schedule, per-pattern lock for sequential dispatch, awaits coroutine handlers via `asyncio.run`. - 5 pytest tests; pyproject adds `[tool.pytest.ini_options] pythonpath`. Examples and tests demonstrate three real surfaces: Notion call transcripts, Linear issue subtrees, GitHub PR file events. Out of scope for v1 (called out in the design doc): outbound webhook subscriptions, replay/DLQ, server-side filtering. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/core/src/events.ts | 20 + packages/sdk/python/pyproject.toml | 3 + packages/sdk/python/src/relayfile/__init__.py | 3 + packages/sdk/python/src/relayfile/on_write.py | 362 ++++++++++++++++++ packages/sdk/python/src/relayfile/types.py | 19 + packages/sdk/python/tests/test_on_write.py | 222 +++++++++++ packages/sdk/typescript/examples/onWrite.ts | 45 +++ packages/sdk/typescript/src/index.ts | 9 + packages/sdk/typescript/src/onWrite.test.ts | 210 ++++++++++ packages/sdk/typescript/src/onWrite.ts | 309 +++++++++++++++ 10 files changed, 1202 insertions(+) create mode 100644 packages/sdk/python/src/relayfile/on_write.py create mode 100644 packages/sdk/python/tests/test_on_write.py create mode 100644 packages/sdk/typescript/examples/onWrite.ts create mode 100644 packages/sdk/typescript/src/onWrite.test.ts create mode 100644 packages/sdk/typescript/src/onWrite.ts diff --git a/packages/core/src/events.ts b/packages/core/src/events.ts index 9ba49d0a..b17303ae 100644 --- a/packages/core/src/events.ts +++ b/packages/core/src/events.ts @@ -19,6 +19,26 @@ export interface CreateEventInput { timestamp?: string; } +export type WriteEventOperation = "create" | "update" | "delete"; +export type WriteEventSource = "webhook" | "agent" | "sync" | "api" | "cli"; + +export interface WriteEventActor { + type: "agent" | "user" | "system"; + id: string; +} + +export interface 
WriteEvent { + workspaceId: string; + path: string; + operation: WriteEventOperation; + revision: string; + previousRevision: string | null; + timestamp: string; + source: WriteEventSource; + value?: unknown; + actor?: WriteEventActor; +} + export function createEvent(storage: StorageAdapter, input: CreateEventInput): EventRow { const event: EventRow = { eventId: storage.nextEventId(), diff --git a/packages/sdk/python/pyproject.toml b/packages/sdk/python/pyproject.toml index 7798e46b..b7440d92 100644 --- a/packages/sdk/python/pyproject.toml +++ b/packages/sdk/python/pyproject.toml @@ -14,5 +14,8 @@ dependencies = ["httpx>=0.27,<1"] [project.optional-dependencies] dev = ["pytest>=8", "pytest-asyncio>=0.24", "respx>=0.22"] +[tool.pytest.ini_options] +pythonpath = ["src"] + [tool.hatch.build.targets.wheel] packages = ["src/relayfile"] diff --git a/packages/sdk/python/src/relayfile/__init__.py b/packages/sdk/python/src/relayfile/__init__.py index 5e8b67f4..cb866732 100644 --- a/packages/sdk/python/src/relayfile/__init__.py +++ b/packages/sdk/python/src/relayfile/__init__.py @@ -13,6 +13,7 @@ WebhookInput, compute_canonical_path, ) +from .on_write import on_write, path_matches __all__ = [ "RelayFileClient", @@ -28,4 +29,6 @@ "ListProviderFilesOptions", "WatchProviderEventsOptions", "compute_canonical_path", + "on_write", + "path_matches", ] diff --git a/packages/sdk/python/src/relayfile/on_write.py b/packages/sdk/python/src/relayfile/on_write.py new file mode 100644 index 00000000..74237cfd --- /dev/null +++ b/packages/sdk/python/src/relayfile/on_write.py @@ -0,0 +1,362 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import os +import threading +import time +import weakref +from collections.abc import Callable, Iterable +from typing import Any, Protocol, TypeVar +from urllib.parse import quote + +from .client import DEFAULT_RELAYFILE_BASE_URL, RelayFileClient +from .types import WriteEvent + +OnWriteHandler = Callable[[WriteEvent], 
Any] +Unsubscribe = Callable[[], None] +THandler = TypeVar("THandler", bound=OnWriteHandler) + +RECONNECT_DELAYS_SECONDS = (1.0, 2.0, 4.0, 8.0, 16.0, 30.0) +DEFAULT_OPERATIONS = ("create", "update") + + +class OnWriteSocket(Protocol): + def recv(self) -> str: ... + def close(self) -> None: ... + + +WebSocketFactory = Callable[[str], OnWriteSocket] +SleepFn = Callable[[float], None] + +_dispatchers: weakref.WeakKeyDictionary[RelayFileClient, "_OnWriteDispatcher"] = weakref.WeakKeyDictionary() +_default_client: RelayFileClient | None = None + + +def path_matches(pattern: str, path: str) -> bool: + pattern_segments = _normalize_pattern(pattern) + path_segments = _normalize_path(path) + return _match_segments(pattern_segments, path_segments) + + +def on_write( + pattern: str, + handler: OnWriteHandler | None = None, + *, + client: RelayFileClient | None = None, + workspace_id: str | None = None, + operations: Iterable[str] = DEFAULT_OPERATIONS, + base_url: str | None = None, + token: str | None = None, + websocket_factory: WebSocketFactory | None = None, + _sleep: SleepFn = time.sleep, +) -> Unsubscribe | Callable[[THandler], THandler]: + """Subscribe to write events on a path pattern. + + Usable as a decorator or as a direct function call. 
+ """ + + normalized_pattern = "/" + "/".join(_normalize_pattern(pattern)) + operation_set = set(operations) + invalid_operations = operation_set - {"create", "update", "delete"} + if invalid_operations: + raise ValueError(f"Invalid on_write operation: {sorted(invalid_operations)[0]}") + + def register(actual_handler: THandler) -> THandler: + _register( + normalized_pattern, + actual_handler, + client=client, + workspace_id=workspace_id, + operations=operation_set, + base_url=base_url, + token=token, + websocket_factory=websocket_factory, + sleep=_sleep, + ) + return actual_handler + + if handler is None: + return register + + return _register( + normalized_pattern, + handler, + client=client, + workspace_id=workspace_id, + operations=operation_set, + base_url=base_url, + token=token, + websocket_factory=websocket_factory, + sleep=_sleep, + ) + + +def _register( + pattern: str, + handler: OnWriteHandler, + *, + client: RelayFileClient | None, + workspace_id: str | None, + operations: set[str], + base_url: str | None, + token: str | None, + websocket_factory: WebSocketFactory | None, + sleep: SleepFn, +) -> Unsubscribe: + if not callable(handler): + raise TypeError("on_write handler must be callable") + + resolved_client = client or _get_default_client() + resolved_workspace_id = workspace_id or os.getenv("RELAYFILE_WORKSPACE_ID") + if not resolved_workspace_id: + raise ValueError("on_write requires workspace_id or RELAYFILE_WORKSPACE_ID") + + dispatcher = _dispatchers.get(resolved_client) + if dispatcher is None: + dispatcher = _OnWriteDispatcher(resolved_client) + _dispatchers[resolved_client] = dispatcher + + return dispatcher.register( + pattern, + handler, + workspace_id=resolved_workspace_id, + operations=operations, + base_url=base_url, + token=token, + websocket_factory=websocket_factory, + sleep=sleep, + ) + + +class _Registration: + def __init__(self, registration_id: int, pattern: str, operations: set[str], handler: OnWriteHandler) -> None: + self.id = 
registration_id + self.pattern = pattern + self.operations = operations + self.handler = handler + + +class _OnWriteDispatcher: + def __init__(self, client: RelayFileClient) -> None: + self._client = client + self._registrations: list[_Registration] = [] + self._lock = threading.RLock() + self._pattern_locks: dict[str, threading.Lock] = {} + self._next_id = 1 + self._socket: OnWriteSocket | None = None + self._thread: threading.Thread | None = None + self._stop = threading.Event() + + def register( + self, + pattern: str, + handler: OnWriteHandler, + *, + workspace_id: str, + operations: set[str], + base_url: str | None, + token: str | None, + websocket_factory: WebSocketFactory | None, + sleep: SleepFn, + ) -> Unsubscribe: + with self._lock: + registration = _Registration(self._next_id, pattern, operations, handler) + self._next_id += 1 + self._registrations.append(registration) + if self._thread is None: + self._start( + workspace_id=workspace_id, + base_url=base_url, + token=token, + websocket_factory=websocket_factory, + sleep=sleep, + ) + + def unsubscribe() -> None: + self.unregister(registration.id) + + return unsubscribe + + def unregister(self, registration_id: int) -> None: + with self._lock: + self._registrations = [registration for registration in self._registrations if registration.id != registration_id] + if not self._registrations: + self._stop.set() + if self._socket is not None: + self._socket.close() + + def _start( + self, + *, + workspace_id: str, + base_url: str | None, + token: str | None, + websocket_factory: WebSocketFactory | None, + sleep: SleepFn, + ) -> None: + factory = websocket_factory or _default_websocket_factory + resolved_token = token or os.getenv("RELAYFILE_TOKEN") + if not resolved_token: + raise ValueError("on_write requires token or RELAYFILE_TOKEN") + + url = _build_websocket_url(base_url or os.getenv("RELAYFILE_BASE_URL") or DEFAULT_RELAYFILE_BASE_URL, workspace_id, resolved_token) + self._thread = threading.Thread( + 
target=self._run, + kwargs={"url": url, "factory": factory, "sleep": sleep}, + name="relayfile-on-write", + daemon=True, + ) + self._thread.start() + + def _run(self, *, url: str, factory: WebSocketFactory, sleep: SleepFn) -> None: + reconnect_attempt = 0 + while not self._stop.is_set(): + try: + self._socket = factory(url) + while not self._stop.is_set(): + raw = self._socket.recv() + if raw: + self._dispatch_raw(raw) + reconnect_attempt = 0 + except Exception: + if self._stop.is_set(): + return + delay = RECONNECT_DELAYS_SECONDS[min(reconnect_attempt, len(RECONNECT_DELAYS_SECONDS) - 1)] + reconnect_attempt += 1 + sleep(delay) + finally: + self._socket = None + + def _dispatch_raw(self, raw: str) -> None: + try: + payload = json.loads(raw) + except json.JSONDecodeError: + return + if not isinstance(payload, dict): + return + + event = _to_write_event(payload) + if event is None: + return + + with self._lock: + registrations = list(self._registrations) + + for registration in registrations: + if event.operation not in registration.operations or not path_matches(registration.pattern, event.path): + continue + pattern_lock = self._pattern_locks.setdefault(registration.pattern, threading.Lock()) + with pattern_lock: + self._run_handler(registration, event) + + def _run_handler(self, registration: _Registration, event: WriteEvent) -> None: + try: + result = registration.handler(event) + if hasattr(result, "__await__"): + asyncio.run(result) + except Exception as exc: + self._record_handler_error(registration.pattern, event.path, exc) + + def _record_handler_error(self, pattern: str, path: str, error: Exception) -> None: + payload = {"pattern": pattern, "path": path, "error": error, "retryable": False} + recorder = getattr(self._client, "record_handler_error", None) or getattr(self._client, "recordHandlerError", None) + if callable(recorder): + result = recorder(payload) + if hasattr(result, "__await__"): + asyncio.run(result) + return + 
logging.getLogger(__name__).exception("Relayfile on_write handler error", exc_info=error) + + +def _normalize_pattern(pattern: str) -> list[str]: + if not isinstance(pattern, str) or not pattern: + raise ValueError("on_write pattern must be a non-empty string") + if not pattern.startswith("/"): + raise ValueError("on_write pattern must start with '/'") + if "//" in pattern: + raise ValueError("on_write pattern cannot contain empty path segments") + segments = _normalize_path(pattern) + if "**" in segments and segments.index("**") != len(segments) - 1: + raise ValueError("on_write pattern only supports '**' as the trailing segment") + return segments + + +def _normalize_path(path: str) -> list[str]: + normalized = path if path.startswith("/") else f"/{path}" + trimmed = normalized.rstrip("/") + if not trimmed: + return [] + return [segment for segment in trimmed.split("/") if segment] + + +def _match_segments(pattern: list[str], path: list[str]) -> bool: + if pattern and pattern[-1] == "**": + prefix = pattern[:-1] + return len(path) >= len(prefix) and all(segment == "*" or segment == path[index] for index, segment in enumerate(prefix)) + return len(pattern) == len(path) and all(segment == "*" or segment == path[index] for index, segment in enumerate(pattern)) + + +def _to_write_event(payload: dict[str, Any]) -> WriteEvent | None: + event_type = payload.get("type") + operation = _operation_from_type(event_type) + path = payload.get("path") + if operation is None or not isinstance(path, str): + return None + return WriteEvent( + workspace_id=str(payload.get("workspaceId") or payload.get("workspace_id") or ""), + path=path, + operation=operation, + revision=str(payload.get("revision") or ""), + previous_revision=payload.get("previousRevision") or payload.get("previous_revision"), + timestamp=str(payload.get("timestamp") or payload.get("ts") or ""), + source=str(payload.get("source") or _source_from_origin(payload.get("origin"))), + value=payload.get("value"), + 
actor=payload.get("actor"), + ) + + +def _operation_from_type(event_type: Any) -> str | None: + if event_type == "file.created": + return "create" + if event_type == "file.updated": + return "update" + if event_type == "file.deleted": + return "delete" + return None + + +def _source_from_origin(origin: Any) -> str: + if origin == "agent_write": + return "agent" + if origin == "provider_sync": + return "sync" + return "api" + + +def _build_websocket_url(base_url: str, workspace_id: str, token: str) -> str: + clean_base = base_url.rstrip("/") + if clean_base.startswith("https://"): + clean_base = "wss://" + clean_base[len("https://") :] + elif clean_base.startswith("http://"): + clean_base = "ws://" + clean_base[len("http://") :] + return f"{clean_base}/v1/workspaces/{quote(workspace_id, safe='')}/fs/ws?token={quote(token, safe='')}" + + +def _default_websocket_factory(url: str) -> OnWriteSocket: + try: + import websocket # type: ignore[import-not-found] + except ImportError as exc: + raise RuntimeError("Install websocket-client or pass websocket_factory to use on_write.") from exc + return websocket.create_connection(url) + + +def _get_default_client() -> RelayFileClient: + global _default_client + if _default_client is None: + token = os.getenv("RELAYFILE_TOKEN") + if not token: + raise ValueError("on_write requires client or RELAYFILE_TOKEN") + _default_client = RelayFileClient(os.getenv("RELAYFILE_BASE_URL") or DEFAULT_RELAYFILE_BASE_URL, token) + return _default_client diff --git a/packages/sdk/python/src/relayfile/types.py b/packages/sdk/python/src/relayfile/types.py index dc0dc926..63455071 100644 --- a/packages/sdk/python/src/relayfile/types.py +++ b/packages/sdk/python/src/relayfile/types.py @@ -242,6 +242,25 @@ class FilesystemEvent: provider: str | None = None +@dataclass +class WriteEventActor: + type: str # "agent" | "user" | "system" + id: str + + +@dataclass +class WriteEvent: + workspace_id: str + path: str + operation: str # "create" | "update" | 
"delete" + revision: str + previous_revision: str | None + timestamp: str + source: str # "webhook" | "agent" | "sync" | "api" | "cli" + value: Any | None = None + actor: WriteEventActor | dict[str, Any] | None = None + + @dataclass class EventFeedResponse: events: list[FilesystemEvent] diff --git a/packages/sdk/python/tests/test_on_write.py b/packages/sdk/python/tests/test_on_write.py new file mode 100644 index 00000000..22474c16 --- /dev/null +++ b/packages/sdk/python/tests/test_on_write.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import json +import queue +import threading +import time +from typing import Any + +import pytest + +from relayfile import RelayFileClient, on_write, path_matches + + +BASE = "https://relay.test" + + +class FakeSocket: + def __init__(self) -> None: + self.messages: queue.Queue[str | BaseException] = queue.Queue() + self.closed = False + + def recv(self) -> str: + item = self.messages.get(timeout=1) + if isinstance(item, BaseException): + raise item + return item + + def close(self) -> None: + self.closed = True + self.messages.put(ConnectionError("closed")) + + def emit(self, payload: dict[str, Any]) -> None: + self.messages.put(json.dumps(payload)) + + def fail(self) -> None: + self.messages.put(ConnectionError("dropped")) + + +def event(path: str, event_type: str = "file.updated") -> dict[str, Any]: + return { + "eventId": f"evt:{path}", + "type": event_type, + "path": path, + "revision": "rev_1", + "timestamp": "2026-05-06T10:00:00Z", + "origin": "provider_sync", + } + + +def wait_for(condition: Any, timeout: float = 1.0) -> None: + deadline = time.time() + timeout + while time.time() < deadline: + if condition(): + return + time.sleep(0.01) + raise AssertionError("condition was not met before timeout") + + +class RecordingClient(RelayFileClient): + def __init__(self) -> None: + super().__init__(BASE, "tok_test") + self.handler_errors: list[dict[str, Any]] = [] + + def record_handler_error(self, payload: dict[str, 
Any]) -> None: + self.handler_errors.append(payload) + + +def test_path_matches_requested_patterns() -> None: + assert path_matches("/notion/pages/calls/*/transcript", "/notion/pages/calls/2026-05-08/transcript") + assert not path_matches("/notion/pages/calls/*/transcript", "/notion/pages/calls/2026-05-08/notes/transcript") + assert path_matches("/linear/issues/**", "/linear/issues/PROJ-441/comments/c-1") + assert path_matches("/linear/issues/**", "/linear/issues") + assert path_matches("/github/repos/acme/api/pulls/*", "/github/repos/acme/api/pulls/42") + assert not path_matches("/github/repos/acme/api/pulls/*", "/github/repos/acme/api/pulls/42/files") + + +def test_invalid_pattern_throws_synchronously() -> None: + with pytest.raises(ValueError, match="start with"): + path_matches("linear/issues/**", "/linear/issues/PROJ-1") + with pytest.raises(ValueError, match="trailing"): + path_matches("/linear/**/comments", "/linear/PROJ-1/comments") + with pytest.raises(ValueError, match="empty"): + on_write("/linear//issues/*", lambda event: None, client=RecordingClient(), workspace_id="ws_acme") + + +def test_on_write_dispatches_matching_events_and_shares_socket() -> None: + client = RecordingClient() + sockets: list[FakeSocket] = [] + calls: list[str] = [] + socket_ready = threading.Event() + + def factory(url: str) -> FakeSocket: + assert url == "wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=tok_test" + socket = FakeSocket() + sockets.append(socket) + socket_ready.set() + return socket + + unsub_transcript = on_write( + "/notion/pages/calls/*/transcript", + lambda evt: calls.append(f"transcript:{evt.path}:{evt.operation}:{evt.source}"), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + ) + unsub_linear = on_write( + "/linear/issues/**", + lambda evt: calls.append(f"linear:{evt.path}"), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + ) + unsub_pull = on_write( + 
"/github/repos/acme/api/pulls/*", + lambda evt: calls.append(f"pull:{evt.path}"), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + ) + + assert socket_ready.wait(1) + assert len(sockets) == 1 + sockets[0].emit(event("/notion/pages/calls/call-1/transcript")) + sockets[0].emit(event("/notion/pages/calls/call-1/notes/transcript")) + sockets[0].emit(event("/linear/issues/PROJ-441/comments/c-1")) + sockets[0].emit(event("/github/repos/acme/api/pulls/42", "file.created")) + + wait_for(lambda: len(calls) == 3) + assert calls == [ + "transcript:/notion/pages/calls/call-1/transcript:update:sync", + "linear:/linear/issues/PROJ-441/comments/c-1", + "pull:/github/repos/acme/api/pulls/42", + ] + + unsub_transcript() + unsub_linear() + unsub_pull() + + +def test_handler_error_isolated_and_recorded() -> None: + client = RecordingClient() + sockets: list[FakeSocket] = [] + calls: list[str] = [] + socket_ready = threading.Event() + + def factory(url: str) -> FakeSocket: + socket = FakeSocket() + sockets.append(socket) + socket_ready.set() + return socket + + def boom(_: Any) -> None: + raise RuntimeError("boom") + + unsub_1 = on_write( + "/linear/issues/**", + boom, + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + ) + unsub_2 = on_write( + "/linear/issues/**", + lambda evt: calls.append(evt.path), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + ) + + assert socket_ready.wait(1) + sockets[0].emit(event("/linear/issues/PROJ-1")) + + wait_for(lambda: len(calls) == 1 and len(client.handler_errors) == 1) + assert calls == ["/linear/issues/PROJ-1"] + assert client.handler_errors[0]["pattern"] == "/linear/issues/**" + assert client.handler_errors[0]["path"] == "/linear/issues/PROJ-1" + assert client.handler_errors[0]["retryable"] is False + + unsub_1() + unsub_2() + + +def test_reconnect_backoff_uses_one_then_two_seconds() -> None: + client = RecordingClient() + 
sockets: list[FakeSocket] = [] + delays: list[float] = [] + observed = threading.Event() + + def factory(_: str) -> FakeSocket: + socket = FakeSocket() + sockets.append(socket) + socket.fail() + return socket + + def sleep(delay: float) -> None: + delays.append(delay) + if len(delays) >= 2: + observed.set() + time.sleep(0.01) + + unsubscribe = on_write( + "/github/repos/acme/api/pulls/*", + lambda event: None, + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + _sleep=sleep, + ) + + assert observed.wait(1) + unsubscribe() + assert delays[:2] == [1.0, 2.0] + assert len(sockets) >= 2 diff --git a/packages/sdk/typescript/examples/onWrite.ts b/packages/sdk/typescript/examples/onWrite.ts new file mode 100644 index 00000000..6ec922de --- /dev/null +++ b/packages/sdk/typescript/examples/onWrite.ts @@ -0,0 +1,45 @@ +import { RelayFileClient, onWrite } from "@relayfile/sdk"; + +const token = process.env.RELAYFILE_TOKEN; +const workspaceId = process.env.RELAYFILE_WORKSPACE_ID; + +if (!token || !workspaceId) { + throw new Error("Set RELAYFILE_TOKEN and RELAYFILE_WORKSPACE_ID before running this example."); +} + +const client = new RelayFileClient({ + token, + baseUrl: process.env.RELAYFILE_BASE_URL +}); + +const stopTranscriptWatcher = onWrite( + "/notion/pages/calls/*/transcript", + async (event) => { + const callId = event.path.split("/")[4]; + console.log(`Call transcript changed for ${callId} at ${event.revision}`); + }, + { client, workspaceId } +); + +const stopIssueWatcher = onWrite( + "/linear/issues/**", + async (event) => { + console.log(`Linear issue tree changed: ${event.path}`); + }, + { client, workspaceId } +); + +const stopPullWatcher = onWrite( + "/github/repos/acme/api/pulls/*", + async (event) => { + const pullNumber = event.path.split("/").at(-1); + console.log(`API pull request ${pullNumber} changed via ${event.source}`); + }, + { client, workspaceId } +); + +process.once("SIGINT", () => { + 
stopTranscriptWatcher(); + stopIssueWatcher(); + stopPullWatcher(); +}); diff --git a/packages/sdk/typescript/src/index.ts b/packages/sdk/typescript/src/index.ts index ed3d1b12..147f0c6f 100644 --- a/packages/sdk/typescript/src/index.ts +++ b/packages/sdk/typescript/src/index.ts @@ -52,6 +52,14 @@ export { type RelayFileSyncSocket, type RelayFileSyncState } from "./sync.js"; +export { + onWrite, + pathMatches, + type OnWriteClient, + type OnWriteHandler, + type OnWriteHandlerError, + type OnWriteOptions +} from "./onWrite.js"; export { InvalidStateError, PayloadTooLargeError, @@ -147,6 +155,7 @@ export type { WriteQueuedResponse } from "./types.js"; export type { ForkHandle, ForkOptions } from "@relayfile/core"; +export type { WriteEvent, WriteEventActor, WriteEventOperation, WriteEventSource } from "@relayfile/core"; export { WritebackConsumer } from "./writeback-consumer.js"; export type { WritebackHandler, WritebackConsumerOptions } from "./writeback-consumer.js"; diff --git a/packages/sdk/typescript/src/onWrite.test.ts b/packages/sdk/typescript/src/onWrite.test.ts new file mode 100644 index 00000000..962cac85 --- /dev/null +++ b/packages/sdk/typescript/src/onWrite.test.ts @@ -0,0 +1,210 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import type { RelayFileClient } from "./client.js"; +import { onWrite, pathMatches, type OnWriteClient } from "./onWrite.js"; + +class MockWebSocket { + readonly url: string; + sent: string[] = []; + private readonly listeners = new Map void>>(); + + constructor(url: string) { + this.url = url; + } + + addEventListener(type: string, handler: (event: any) => void): void { + if (!this.listeners.has(type)) { + this.listeners.set(type, new Set()); + } + this.listeners.get(type)!.add(handler); + } + + send(data: string): void { + this.sent.push(data); + } + + close(code?: number, reason?: string): void { + this.emit("close", { code: code ?? 1000, reason: reason ?? 
"closed" }); + } + + emit(type: string, event: any): void { + for (const handler of this.listeners.get(type) ?? []) { + handler(event); + } + } +} + +function makeClient(): OnWriteClient { + return { + getEvents: vi.fn().mockResolvedValue({ events: [], nextCursor: null }), + recordHandlerError: vi.fn().mockResolvedValue(undefined) + } as unknown as OnWriteClient; +} + +function emitFilesystemEvent(socket: MockWebSocket, path: string, type = "file.updated"): void { + socket.emit("message", { + data: JSON.stringify({ + eventId: `evt:${path}`, + type, + path, + revision: "rev_1", + timestamp: "2026-05-06T10:00:00Z", + origin: "provider_sync" + }) + }); +} + +async function flushPromises(): Promise { + for (let index = 0; index < 8; index += 1) { + await Promise.resolve(); + } +} + +describe("pathMatches", () => { + it.each([ + ["/notion/pages/calls/*/transcript", "/notion/pages/calls/2026-05-08/transcript", true], + ["/notion/pages/calls/*/transcript", "/notion/pages/calls/2026-05-08/notes/transcript", false], + ["/linear/issues/**", "/linear/issues/PROJ-441/comments/c-1", true], + ["/linear/issues/**", "/linear/issues", true], + ["/github/repos/acme/api/pulls/*", "/github/repos/acme/api/pulls/42", true], + ["/github/repos/acme/api/pulls/*", "/github/repos/acme/api/pulls/42/files", false] + ])("matches %s against %s as %s", (pattern, path, expected) => { + expect(pathMatches(pattern, path)).toBe(expected); + }); + + it("throws synchronously for invalid patterns", () => { + expect(() => pathMatches("linear/issues/**", "/linear/issues/PROJ-1")).toThrow("start with"); + expect(() => pathMatches("/linear/**/comments", "/linear/PROJ-1/comments")).toThrow("trailing"); + expect(() => onWrite("/linear//issues/*", () => undefined, { client: makeClient(), workspaceId: "ws_acme" })).toThrow("empty"); + }); +}); + +describe("onWrite", () => { + beforeEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("fans out matching events and ignores non-matching paths", 
async () => { + const client = makeClient(); + const sockets: MockWebSocket[] = []; + const calls: string[] = []; + + const unsubscribeTranscript = onWrite( + "/notion/pages/calls/*/transcript", + (event) => calls.push(`transcript:${event.path}:${event.operation}:${event.source}`), + { + client, + workspaceId: "ws_acme", + token: "tok_test", + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + sockets.push(socket); + return socket; + } + } + ); + const unsubscribeLinear = onWrite( + "/linear/issues/**", + (event) => calls.push(`linear:${event.path}`), + { client, workspaceId: "ws_acme", token: "tok_test" } + ); + const unsubscribePull = onWrite( + "/github/repos/acme/api/pulls/*", + (event) => calls.push(`pull:${event.path}`), + { client, workspaceId: "ws_acme", token: "tok_test" } + ); + + expect(sockets).toHaveLength(1); + expect(sockets[0]!.url).toBe("wss://api.relayfile.dev/v1/workspaces/ws_acme/fs/ws?token=tok_test"); + + sockets[0]!.emit("open", {}); + emitFilesystemEvent(sockets[0]!, "/notion/pages/calls/call-1/transcript"); + emitFilesystemEvent(sockets[0]!, "/notion/pages/calls/call-1/notes/transcript"); + emitFilesystemEvent(sockets[0]!, "/linear/issues/PROJ-441/comments/c-1"); + emitFilesystemEvent(sockets[0]!, "/github/repos/acme/api/pulls/42", "file.created"); + + await flushPromises(); + + expect(calls).toEqual([ + "transcript:/notion/pages/calls/call-1/transcript:update:sync", + "linear:/linear/issues/PROJ-441/comments/c-1", + "pull:/github/repos/acme/api/pulls/42" + ]); + + unsubscribeTranscript(); + unsubscribeLinear(); + unsubscribePull(); + }); + + it("isolates handler errors and records them", async () => { + const client = makeClient(); + const sockets: MockWebSocket[] = []; + const calls: string[] = []; + + onWrite( + "/linear/issues/**", + () => { + throw new Error("boom"); + }, + { + client, + workspaceId: "ws_acme", + token: "tok_test", + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + 
sockets.push(socket); + return socket; + } + } + ); + onWrite("/linear/issues/**", (event) => calls.push(event.path), { + client, + workspaceId: "ws_acme", + token: "tok_test" + }); + + emitFilesystemEvent(sockets[0]!, "/linear/issues/PROJ-1"); + await flushPromises(); + + expect(calls).toEqual(["/linear/issues/PROJ-1"]); + expect(client.recordHandlerError).toHaveBeenCalledWith( + expect.objectContaining({ + pattern: "/linear/issues/**", + path: "/linear/issues/PROJ-1", + retryable: false + }) + ); + }); + + it("reconnects with the 1s then 2s backoff schedule", async () => { + vi.useFakeTimers(); + const client = makeClient(); + const sockets: MockWebSocket[] = []; + + onWrite("/github/repos/acme/api/pulls/*", () => undefined, { + client, + workspaceId: "ws_acme", + token: "tok_test", + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + sockets.push(socket); + return socket; + } + }); + + sockets[0]!.emit("close", { code: 1006, reason: "dropped" }); + await vi.advanceTimersByTimeAsync(999); + expect(sockets).toHaveLength(1); + await vi.advanceTimersByTimeAsync(1); + expect(sockets).toHaveLength(2); + + sockets[1]!.emit("close", { code: 1006, reason: "dropped again" }); + await vi.advanceTimersByTimeAsync(1999); + expect(sockets).toHaveLength(2); + await vi.advanceTimersByTimeAsync(1); + expect(sockets).toHaveLength(3); + + vi.useRealTimers(); + }); +}); diff --git a/packages/sdk/typescript/src/onWrite.ts b/packages/sdk/typescript/src/onWrite.ts new file mode 100644 index 00000000..f74cd672 --- /dev/null +++ b/packages/sdk/typescript/src/onWrite.ts @@ -0,0 +1,309 @@ +import type { WriteEvent, WriteEventOperation, WriteEventSource } from "@relayfile/core"; + +import { RelayFileClient, DEFAULT_RELAYFILE_BASE_URL } from "./client.js"; +import { RelayFileSync, type RelayFileSyncSocket } from "./sync.js"; +import type { FilesystemEvent } from "./types.js"; + +export type OnWriteHandler = (event: WriteEvent) => void | Promise; + +export interface 
OnWriteHandlerError { + pattern: string; + path: string; + error: unknown; + retryable: false; +} + +export type OnWriteClient = RelayFileClient & { + recordHandlerError?(error: OnWriteHandlerError): void | Promise; +}; + +export interface OnWriteOptions { + client?: OnWriteClient; + workspaceId?: string; + operations?: WriteEventOperation[]; + signal?: AbortSignal; + baseUrl?: string; + token?: string; + webSocketFactory?: (url: string) => RelayFileSyncSocket; +} + +interface OnWriteRegistration { + id: number; + pattern: string; + operations: Set; + handler: OnWriteHandler; +} + +interface EnvironmentLike { + process?: { + env?: Record; + }; +} + +const DEFAULT_OPERATIONS: WriteEventOperation[] = ["create", "update"]; +const DEFAULT_RECONNECT_MIN_DELAY_MS = 1000; +const DEFAULT_RECONNECT_MAX_DELAY_MS = 30000; +const dispatchers = new WeakMap(); +let nextRegistrationId = 1; +let defaultClient: OnWriteClient | undefined; + +export function pathMatches(pattern: string, path: string): boolean { + const patternSegments = normalizePattern(pattern); + const pathSegments = normalizePath(path); + return matchSegments(patternSegments, pathSegments); +} + +export function onWrite( + pattern: string, + handler: OnWriteHandler, + options: OnWriteOptions = {} +): () => void { + const normalizedPattern = `/${normalizePattern(pattern).join("/")}`; + if (typeof handler !== "function") { + throw new Error("onWrite handler must be a function."); + } + + const client = options.client ?? getDefaultClient(); + const workspaceId = options.workspaceId ?? readEnv("RELAYFILE_WORKSPACE_ID"); + if (!workspaceId) { + throw new Error("onWrite requires options.workspaceId or RELAYFILE_WORKSPACE_ID."); + } + + const operations = new Set(options.operations ?? 
DEFAULT_OPERATIONS); + for (const operation of operations) { + if (operation !== "create" && operation !== "update" && operation !== "delete") { + throw new Error(`Invalid onWrite operation: ${operation}`); + } + } + + let dispatcher = dispatchers.get(client); + if (!dispatcher) { + dispatcher = new OnWriteDispatcher(client); + dispatchers.set(client, dispatcher); + } + + const registration: OnWriteRegistration = { + id: nextRegistrationId++, + pattern: normalizedPattern, + operations, + handler + }; + + dispatcher.register(registration, { + workspaceId, + signal: options.signal, + baseUrl: options.baseUrl, + token: options.token, + webSocketFactory: options.webSocketFactory + }); + + return () => { + dispatcher?.unregister(registration.id); + }; +} + +class OnWriteDispatcher { + private readonly client: OnWriteClient; + private readonly registrations: OnWriteRegistration[] = []; + private readonly patternChains = new Map>(); + private sync?: RelayFileSync; + + constructor(client: OnWriteClient) { + this.client = client; + } + + register( + registration: OnWriteRegistration, + options: Required> & Pick + ): void { + this.registrations.push(registration); + if (options.signal) { + if (options.signal.aborted) { + this.unregister(registration.id); + return; + } + options.signal.addEventListener("abort", () => this.unregister(registration.id), { once: true }); + } + this.ensureSync(options); + } + + unregister(id: number): void { + const index = this.registrations.findIndex((registration) => registration.id === id); + if (index >= 0) { + this.registrations.splice(index, 1); + } + if (this.registrations.length === 0 && this.sync) { + void this.sync.stop(); + this.sync = undefined; + } + } + + private ensureSync(options: Required> & Pick): void { + if (this.sync) { + return; + } + + this.sync = RelayFileSync.connect({ + client: this.client, + workspaceId: options.workspaceId, + baseUrl: options.baseUrl ?? readEnv("RELAYFILE_BASE_URL") ?? 
DEFAULT_RELAYFILE_BASE_URL, + token: options.token ?? readEnv("RELAYFILE_TOKEN"), + reconnect: { + minDelayMs: DEFAULT_RECONNECT_MIN_DELAY_MS, + maxDelayMs: DEFAULT_RECONNECT_MAX_DELAY_MS + }, + webSocketFactory: options.webSocketFactory, + onEvent: (event) => { + void this.dispatch(event); + } + }); + } + + private async dispatch(event: FilesystemEvent): Promise { + const writeEvent = toWriteEvent(event); + if (!writeEvent) { + return; + } + + for (const registration of [...this.registrations]) { + if (!registration.operations.has(writeEvent.operation) || !pathMatches(registration.pattern, writeEvent.path)) { + continue; + } + + const previous = this.patternChains.get(registration.pattern) ?? Promise.resolve(); + const next = previous + .catch(() => undefined) + .then(() => this.runHandler(registration, writeEvent)); + this.patternChains.set(registration.pattern, next); + void next.finally(() => { + if (this.patternChains.get(registration.pattern) === next) { + this.patternChains.delete(registration.pattern); + } + }); + } + } + + private async runHandler(registration: OnWriteRegistration, event: WriteEvent): Promise { + try { + await registration.handler(event); + } catch (error) { + await this.recordHandlerError({ + pattern: registration.pattern, + path: event.path, + error, + retryable: false + }); + } + } + + private async recordHandlerError(error: OnWriteHandlerError): Promise { + if (typeof this.client.recordHandlerError === "function") { + await this.client.recordHandlerError(error); + return; + } + if (typeof console !== "undefined" && typeof console.error === "function") { + console.error("Relayfile onWrite handler error", error); + } + } +} + +function normalizePattern(pattern: string): string[] { + if (typeof pattern !== "string" || pattern.length === 0) { + throw new Error("onWrite pattern must be a non-empty string."); + } + if (!pattern.startsWith("/")) { + throw new Error("onWrite pattern must start with '/'."); + } + if (pattern.includes("//")) { + 
throw new Error("onWrite pattern cannot contain empty path segments."); + } + const segments = normalizePath(pattern); + const recursiveIndex = segments.indexOf("**"); + if (recursiveIndex >= 0 && recursiveIndex !== segments.length - 1) { + throw new Error("onWrite pattern only supports '**' as the trailing segment."); + } + return segments; +} + +function normalizePath(path: string): string[] { + if (!path.startsWith("/")) { + return normalizePath(`/${path}`); + } + const trimmed = path.replace(/\/+$/, ""); + if (trimmed === "") { + return []; + } + return trimmed.split("/").filter(Boolean); +} + +function matchSegments(pattern: string[], path: string[]): boolean { + if (pattern.length > 0 && pattern[pattern.length - 1] === "**") { + const prefix = pattern.slice(0, -1); + return path.length >= prefix.length && prefix.every((segment, index) => segment === "*" || segment === path[index]); + } + + if (pattern.length !== path.length) { + return false; + } + return pattern.every((segment, index) => segment === "*" || segment === path[index]); +} + +function toWriteEvent(event: FilesystemEvent): WriteEvent | null { + const operation = operationFromEventType(event.type); + if (!operation) { + return null; + } + const raw = event as FilesystemEvent & Partial & { previousRevision?: string | null }; + return { + workspaceId: raw.workspaceId ?? "", + path: event.path, + operation, + revision: event.revision, + previousRevision: raw.previousRevision ?? null, + timestamp: event.timestamp, + source: raw.source ?? 
sourceFromOrigin(event.origin), + value: raw.value, + actor: raw.actor + }; +} + +function operationFromEventType(type: FilesystemEvent["type"]): WriteEventOperation | null { + if (type === "file.created") { + return "create"; + } + if (type === "file.updated") { + return "update"; + } + if (type === "file.deleted") { + return "delete"; + } + return null; +} + +function sourceFromOrigin(origin: FilesystemEvent["origin"]): WriteEventSource { + if (origin === "agent_write") { + return "agent"; + } + if (origin === "provider_sync") { + return "sync"; + } + return "api"; +} + +function getDefaultClient(): OnWriteClient { + if (!defaultClient) { + const token = readEnv("RELAYFILE_TOKEN"); + if (!token) { + throw new Error("onWrite requires options.client or RELAYFILE_TOKEN."); + } + defaultClient = new RelayFileClient({ + baseUrl: readEnv("RELAYFILE_BASE_URL") ?? DEFAULT_RELAYFILE_BASE_URL, + token + }) as OnWriteClient; + } + return defaultClient; +} + +function readEnv(name: string): string | undefined { + return (globalThis as EnvironmentLike).process?.env?.[name]; +} From 387df4efb61c11c2ea14d273e2c55f18678c7363 Mon Sep 17 00:00:00 2001 From: Writeback Reliability Bot Date: Wed, 6 May 2026 14:14:24 +0200 Subject: [PATCH 3/4] fix(sdk): address Codex review feedback on PR #85 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three P2 bugs Codex identified: 1. **TS — workspaceId dropped on every emitted event.** RelayFileSync normalizes the FilesystemEvent shape and does not surface workspaceId, so toWriteEvent's `raw.workspaceId ?? ""` fallback meant every WriteEvent handler received `event.workspaceId === ""`. Fix: capture the subscribed workspace on the dispatcher at register-time and thread it into toWriteEvent. 2. 
**Python — dispatcher could not restart after a full drain.** When the last registration was removed we set _stop and closed the socket but never reset the thread bookkeeping, so a subsequent on_write() on the same client appended a registration and skipped _start() (because _thread was still non-None) and the new handler never received events. Fix: each worker thread now owns its own threading.Event captured at _start; unregister() detaches that reference once drained so the next register() spins up a fresh thread + fresh stop event. 3. **Python — websocket-client was an undeclared runtime dependency.** The default factory imports `websocket` (websocket-client), but pyproject only declared httpx, so a vanilla `pip install relayfile` user hit a RuntimeError on first on_write() call. Fix: declare websocket-client>=1.7,<2 as a runtime dependency. Adds two regression tests (one TS, one Python) covering #1 and #2. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/sdk/python/pyproject.toml | 2 +- packages/sdk/python/src/relayfile/on_write.py | 30 ++++++++--- packages/sdk/python/tests/test_on_write.py | 53 +++++++++++++++++++ packages/sdk/typescript/src/onWrite.test.ts | 30 +++++++++++ packages/sdk/typescript/src/onWrite.ts | 12 +++-- 5 files changed, 116 insertions(+), 11 deletions(-) diff --git a/packages/sdk/python/pyproject.toml b/packages/sdk/python/pyproject.toml index b7440d92..64b7009e 100644 --- a/packages/sdk/python/pyproject.toml +++ b/packages/sdk/python/pyproject.toml @@ -9,7 +9,7 @@ description = "Python SDK for the RelayFile virtual filesystem API" readme = "README.md" license = "Apache-2.0" requires-python = ">=3.10" -dependencies = ["httpx>=0.27,<1"] +dependencies = ["httpx>=0.27,<1", "websocket-client>=1.7,<2"] [project.optional-dependencies] dev = ["pytest>=8", "pytest-asyncio>=0.24", "respx>=0.22"] diff --git a/packages/sdk/python/src/relayfile/on_write.py b/packages/sdk/python/src/relayfile/on_write.py index 74237cfd..1ef45203 100644 --- 
a/packages/sdk/python/src/relayfile/on_write.py +++ b/packages/sdk/python/src/relayfile/on_write.py @@ -147,7 +147,14 @@ def __init__(self, client: RelayFileClient) -> None: self._next_id = 1 self._socket: OnWriteSocket | None = None self._thread: threading.Thread | None = None - self._stop = threading.Event() + # Each spawned worker thread owns its own stop event (captured at + # _start). When the last registration is removed we set the active + # thread's event and detach our reference so the next register() call + # can start a fresh thread with a fresh event. Keeping a single shared + # event across thread lifetimes meant a drained dispatcher stayed + # permanently stopped — re-subscribing on the same client appended a + # registration but never produced events. + self._stop: threading.Event | None = None def register( self, @@ -183,9 +190,16 @@ def unregister(self, registration_id: int) -> None: with self._lock: self._registrations = [registration for registration in self._registrations if registration.id != registration_id] if not self._registrations: - self._stop.set() + if self._stop is not None: + self._stop.set() if self._socket is not None: self._socket.close() + # Detach: the worker thread owns its captured stop event and + # will exit on its own. Future register() calls start a fresh + # thread. 
+ self._thread = None + self._stop = None + self._socket = None def _start( self, @@ -202,26 +216,28 @@ def _start( raise ValueError("on_write requires token or RELAYFILE_TOKEN") url = _build_websocket_url(base_url or os.getenv("RELAYFILE_BASE_URL") or DEFAULT_RELAYFILE_BASE_URL, workspace_id, resolved_token) + stop_event = threading.Event() + self._stop = stop_event self._thread = threading.Thread( target=self._run, - kwargs={"url": url, "factory": factory, "sleep": sleep}, + kwargs={"url": url, "factory": factory, "sleep": sleep, "stop": stop_event}, name="relayfile-on-write", daemon=True, ) self._thread.start() - def _run(self, *, url: str, factory: WebSocketFactory, sleep: SleepFn) -> None: + def _run(self, *, url: str, factory: WebSocketFactory, sleep: SleepFn, stop: threading.Event) -> None: reconnect_attempt = 0 - while not self._stop.is_set(): + while not stop.is_set(): try: self._socket = factory(url) - while not self._stop.is_set(): + while not stop.is_set(): raw = self._socket.recv() if raw: self._dispatch_raw(raw) reconnect_attempt = 0 except Exception: - if self._stop.is_set(): + if stop.is_set(): return delay = RECONNECT_DELAYS_SECONDS[min(reconnect_attempt, len(RECONNECT_DELAYS_SECONDS) - 1)] reconnect_attempt += 1 diff --git a/packages/sdk/python/tests/test_on_write.py b/packages/sdk/python/tests/test_on_write.py index 22474c16..157248e7 100644 --- a/packages/sdk/python/tests/test_on_write.py +++ b/packages/sdk/python/tests/test_on_write.py @@ -220,3 +220,56 @@ def sleep(delay: float) -> None: unsubscribe() assert delays[:2] == [1.0, 2.0] assert len(sockets) >= 2 + + +def test_dispatcher_restarts_after_full_drain() -> None: + """Regression: re-subscribing on the same client after the last unsubscribe + must spin up a fresh worker thread and deliver events. Previously the + dispatcher's stop event stayed permanently set and `_thread` stayed + non-None, so subsequent registrations silently never received anything. 
+ """ + client = RecordingClient() + sockets: list[FakeSocket] = [] + factory_signal = threading.Event() + + def factory(_: str) -> FakeSocket: + socket = FakeSocket() + sockets.append(socket) + factory_signal.set() + return socket + + calls_one: list[str] = [] + unsub_one = on_write( + "/linear/issues/**", + lambda evt: calls_one.append(evt.path), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + ) + assert factory_signal.wait(1) + sockets[0].emit(event("/linear/issues/PROJ-1")) + wait_for(lambda: calls_one == ["/linear/issues/PROJ-1"]) + + unsub_one() # drain — dispatcher should reset internal state + + # Re-subscribe on the same client. + factory_signal.clear() + calls_two: list[str] = [] + unsub_two = on_write( + "/linear/issues/**", + lambda evt: calls_two.append(evt.path), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + ) + assert factory_signal.wait(1), "second registration did not start a worker" + assert len(sockets) == 2 + + sockets[1].emit(event("/linear/issues/PROJ-2")) + wait_for(lambda: calls_two == ["/linear/issues/PROJ-2"]) + + unsub_two() diff --git a/packages/sdk/typescript/src/onWrite.test.ts b/packages/sdk/typescript/src/onWrite.test.ts index 962cac85..d2a1b3da 100644 --- a/packages/sdk/typescript/src/onWrite.test.ts +++ b/packages/sdk/typescript/src/onWrite.test.ts @@ -177,6 +177,36 @@ describe("onWrite", () => { ); }); + it("stamps the subscribed workspaceId onto emitted events", async () => { + // Regression: RelayFileSync strips workspaceId from the FilesystemEvent + // shape, so toWriteEvent used to fall back to "" and break handlers that + // route by workspace. The dispatcher must thread the registered workspace + // through to the WriteEvent it hands to user code. 
+ const client = makeClient(); + const sockets: MockWebSocket[] = []; + const received: string[] = []; + + onWrite( + "/notion/pages/calls/*/transcript", + (event) => received.push(event.workspaceId), + { + client, + workspaceId: "ws_acme", + token: "tok_test", + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + sockets.push(socket); + return socket; + } + } + ); + + emitFilesystemEvent(sockets[0]!, "/notion/pages/calls/call-1/transcript"); + await flushPromises(); + + expect(received).toEqual(["ws_acme"]); + }); + it("reconnects with the 1s then 2s backoff schedule", async () => { vi.useFakeTimers(); const client = makeClient(); diff --git a/packages/sdk/typescript/src/onWrite.ts b/packages/sdk/typescript/src/onWrite.ts index f74cd672..c37b505f 100644 --- a/packages/sdk/typescript/src/onWrite.ts +++ b/packages/sdk/typescript/src/onWrite.ts @@ -107,6 +107,11 @@ class OnWriteDispatcher { private readonly registrations: OnWriteRegistration[] = []; private readonly patternChains = new Map>(); private sync?: RelayFileSync; + // Captured at first registration. RelayFileSync normalizes the FilesystemEvent + // shape and does not surface workspaceId on emitted events, so we thread the + // subscribed workspaceId through here and stamp it onto every WriteEvent we + // hand to user handlers. 
+ private workspaceId?: string; constructor(client: OnWriteClient) { this.client = client; @@ -116,6 +121,7 @@ class OnWriteDispatcher { registration: OnWriteRegistration, options: Required> & Pick ): void { + this.workspaceId = options.workspaceId; this.registrations.push(registration); if (options.signal) { if (options.signal.aborted) { @@ -160,7 +166,7 @@ class OnWriteDispatcher { } private async dispatch(event: FilesystemEvent): Promise { - const writeEvent = toWriteEvent(event); + const writeEvent = toWriteEvent(event, this.workspaceId); if (!writeEvent) { return; } @@ -248,14 +254,14 @@ function matchSegments(pattern: string[], path: string[]): boolean { return pattern.every((segment, index) => segment === "*" || segment === path[index]); } -function toWriteEvent(event: FilesystemEvent): WriteEvent | null { +function toWriteEvent(event: FilesystemEvent, workspaceId?: string): WriteEvent | null { const operation = operationFromEventType(event.type); if (!operation) { return null; } const raw = event as FilesystemEvent & Partial & { previousRevision?: string | null }; return { - workspaceId: raw.workspaceId ?? "", + workspaceId: raw.workspaceId ?? workspaceId ?? "", path: event.path, operation, revision: event.revision, From 62d0b3c361fca3560f3241cca2e082a15ac7742f Mon Sep 17 00:00:00 2001 From: Writeback Reliability Bot Date: Wed, 6 May 2026 14:24:33 +0200 Subject: [PATCH 4/4] fix(sdk): address second round of PR #85 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three real bugs from coderabbitai's review, plus one push-back: 1. **Mixed-workspace per-client cache (TS + Python).** The dispatcher cache keyed by client meant `onWrite(..., { client, workspaceId: 'ws_b' })` after a `ws_a` registration silently attached to the existing `ws_a` socket and never received `ws_b` events. 
The design doc scopes a client to one workspace (out-of-scope: cross-workspace subscriptions), so we throw a clear error with remediation rather than route to a wrong feed.

2. **recordHandlerError can break dispatch isolation (TS + Python).** The "handler errors do not propagate" guarantee covered user handlers but not the recorder itself — a throwing or rejecting recordHandlerError would propagate out of the dispatch chain and skip subsequent handlers for the pattern. Wrap the recorder call in try/catch (TS) / try/except (Python) and fall back to console.error / logging.exception. In Python, the asyncio.run() call for awaitable recorders now sits inside the same try/except, so a failure on that path can no longer escape isolation either.

3. **toWriteEvent advertised fields the wire format never preserves (TS + Python).** RelayFileSync's normalizeFilesystemEvent only surfaces eventId/type/path/revision/origin/provider/correlationId/timestamp. The cast to Partial<WriteEvent> with previousRevision/value/actor reads was misleading — those fields were always undefined/null in practice. Drop the cast; emit explicit null/undefined for the not-yet-wired fields with a comment noting wire-format extension is a follow-up.

Push-back: coderabbitai twice flagged trailing `**` matching zero segments as a contradiction with the PR objectives. The design doc explicitly says "any number of segments" (i.e. zero-or-more, like gitignore and standard glob); the existing test case `["/linear/issues/**", "/linear/issues", true]` locked this in. Added an explicit code comment on both matchers and a row in the design-doc match table to make the zero-or-more semantics unambiguous so future readers don't re-flag it.

Adds four regression tests (two TS, two Python) covering the mixed-workspace rejection and the recorder-isolation paths.
Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/onwrite-trigger-design.md | 3 +- packages/sdk/python/src/relayfile/on_write.py | 59 ++++++++++--- packages/sdk/python/tests/test_on_write.py | 87 +++++++++++++++++++ packages/sdk/typescript/src/onWrite.test.ts | 78 +++++++++++++++++ packages/sdk/typescript/src/onWrite.ts | 60 +++++++++---- 5 files changed, 257 insertions(+), 30 deletions(-) diff --git a/docs/onwrite-trigger-design.md b/docs/onwrite-trigger-design.md index bacf6504..2995d9db 100644 --- a/docs/onwrite-trigger-design.md +++ b/docs/onwrite-trigger-design.md @@ -85,7 +85,7 @@ export interface OnWriteOptions { * * Pattern syntax (glob-like, single-segment wildcards only): * /notion/pages/calls/* /transcript (* matches one path segment) - * /linear/issues/** (** matches any number of segments) + * /linear/issues/** (** matches zero or more trailing segments — including the collection root) * /github/repos/acme/api/pulls/* (* matches one segment) * * Multiple wildcards are supported. No regex. 
@@ -158,6 +158,7 @@ Single grammar, both languages: | `/notion/pages/calls/*/transcript` | `/notion/pages/calls/2026-05-08/transcript` | ✅ | | `/notion/pages/calls/*/transcript` | `/notion/pages/calls/2026-05-08/notes/transcript` | ❌ (`*` is single-segment) | | `/linear/issues/**` | `/linear/issues/PROJ-441/comments/c-1` | ✅ | +| `/linear/issues/**` | `/linear/issues` | ✅ (`**` includes the collection root) | | `/github/repos/*/*/pulls/*` | `/github/repos/acme/api/pulls/42` | ✅ | | `/hubspot/deals/*/stage` | `/hubspot/deals/4471/stage` | ✅ | diff --git a/packages/sdk/python/src/relayfile/on_write.py b/packages/sdk/python/src/relayfile/on_write.py index 1ef45203..b1ee18e5 100644 --- a/packages/sdk/python/src/relayfile/on_write.py +++ b/packages/sdk/python/src/relayfile/on_write.py @@ -113,9 +113,19 @@ def _register( if not resolved_workspace_id: raise ValueError("on_write requires workspace_id or RELAYFILE_WORKSPACE_ID") + # The dispatcher cache is keyed by client; a single shared WebSocket is + # bound to one workspace. v1 scopes a client to a single workspace per the + # design doc (out-of-scope: cross-workspace subscriptions). Reject + # mismatched workspace_id rather than silently attaching to the wrong feed. dispatcher = _dispatchers.get(resolved_client) + if dispatcher is not None and dispatcher.workspace_id != resolved_workspace_id: + raise ValueError( + f"on_write registrations on the same client must use the same workspace_id. " + f'Existing="{dispatcher.workspace_id}", new="{resolved_workspace_id}". ' + "Construct a separate RelayFileClient per workspace." 
+ ) if dispatcher is None: - dispatcher = _OnWriteDispatcher(resolved_client) + dispatcher = _OnWriteDispatcher(resolved_client, resolved_workspace_id) _dispatchers[resolved_client] = dispatcher return dispatcher.register( @@ -139,8 +149,13 @@ def __init__(self, registration_id: int, pattern: str, operations: set[str], han class _OnWriteDispatcher: - def __init__(self, client: RelayFileClient) -> None: + def __init__(self, client: RelayFileClient, workspace_id: str) -> None: self._client = client + # Captured at construction. The wire format does not surface + # workspaceId on emitted events, so we stamp this onto every WriteEvent + # we hand to user handlers. It also gates registrations: see the + # cross-workspace check in _register(). + self.workspace_id = workspace_id self._registrations: list[_Registration] = [] self._lock = threading.RLock() self._pattern_locks: dict[str, threading.Lock] = {} @@ -253,7 +268,7 @@ def _dispatch_raw(self, raw: str) -> None: if not isinstance(payload, dict): return - event = _to_write_event(payload) + event = _to_write_event(payload, self.workspace_id) if event is None: return @@ -276,13 +291,20 @@ def _run_handler(self, registration: _Registration, event: WriteEvent) -> None: self._record_handler_error(registration.pattern, event.path, exc) def _record_handler_error(self, pattern: str, path: str, error: Exception) -> None: + # The "handler errors do not propagate" guarantee covers the recorder + # too: if the customer's record_handler_error implementation raises, + # fall back to the logger rather than letting the exception bubble up + # the dispatch loop and break sequential dispatch for the pattern. 
payload = {"pattern": pattern, "path": path, "error": error, "retryable": False} recorder = getattr(self._client, "record_handler_error", None) or getattr(self._client, "recordHandlerError", None) if callable(recorder): - result = recorder(payload) - if hasattr(result, "__await__"): - asyncio.run(result) - return + try: + result = recorder(payload) + if hasattr(result, "__await__"): + asyncio.run(result) + return + except Exception: + logging.getLogger(__name__).exception("Relayfile on_write handler-error reporter failed") logging.getLogger(__name__).exception("Relayfile on_write handler error", exc_info=error) @@ -308,28 +330,39 @@ def _normalize_path(path: str) -> list[str]: def _match_segments(pattern: list[str], path: list[str]) -> bool: + # Trailing ``**`` matches **zero or more** trailing segments — same as + # gitignore and standard glob conventions, and what the design doc + # specifies ("any number of segments"). ``/linear/issues/**`` therefore + # matches both ``/linear/issues`` (the collection root) and + # ``/linear/issues/PROJ-1/comments``. ``*`` matches exactly one segment; + # ``**`` is only valid as the last segment. 
if pattern and pattern[-1] == "**": prefix = pattern[:-1] return len(path) >= len(prefix) and all(segment == "*" or segment == path[index] for index, segment in enumerate(prefix)) return len(pattern) == len(path) and all(segment == "*" or segment == path[index] for index, segment in enumerate(pattern)) -def _to_write_event(payload: dict[str, Any]) -> WriteEvent | None: +def _to_write_event(payload: dict[str, Any], workspace_id: str) -> WriteEvent | None: event_type = payload.get("type") operation = _operation_from_type(event_type) path = payload.get("path") if operation is None or not isinstance(path, str): return None + # The wire format currently surfaces only: + # eventId, type, path, revision, origin, provider, correlationId, timestamp + # Fields the wider WriteEvent contract advertises but the wire format does + # not yet preserve — previous_revision, value, actor — are intentionally + # left as None here. Wiring them through the wire format is a follow-up. return WriteEvent( - workspace_id=str(payload.get("workspaceId") or payload.get("workspace_id") or ""), + workspace_id=workspace_id, path=path, operation=operation, revision=str(payload.get("revision") or ""), - previous_revision=payload.get("previousRevision") or payload.get("previous_revision"), + previous_revision=None, timestamp=str(payload.get("timestamp") or payload.get("ts") or ""), - source=str(payload.get("source") or _source_from_origin(payload.get("origin"))), - value=payload.get("value"), - actor=payload.get("actor"), + source=_source_from_origin(payload.get("origin")), + value=None, + actor=None, ) diff --git a/packages/sdk/python/tests/test_on_write.py b/packages/sdk/python/tests/test_on_write.py index 157248e7..794b3fb7 100644 --- a/packages/sdk/python/tests/test_on_write.py +++ b/packages/sdk/python/tests/test_on_write.py @@ -222,6 +222,93 @@ def sleep(delay: float) -> None: assert len(sockets) >= 2 +def test_rejects_mixed_workspace_on_same_client() -> None: + """A single RelayFileClient is 
bound to one workspace. The second on_write + must not silently attach to the first workspace's socket — reject it. + """ + client = RecordingClient() + sockets: list[FakeSocket] = [] + + def factory(_: str) -> FakeSocket: + socket = FakeSocket() + sockets.append(socket) + return socket + + unsub = on_write( + "/notion/pages/calls/*/transcript", + lambda evt: None, + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + ) + + with pytest.raises(ValueError, match="same workspace_id"): + on_write( + "/linear/issues/**", + lambda evt: None, + client=client, + workspace_id="ws_other", + base_url=BASE, + token="tok_test", + ) + + assert len(sockets) == 1, "second registration must not have started a socket" + unsub() + + +def test_isolates_dispatch_when_recorder_raises() -> None: + """recordHandlerError implementations that raise must not break sequential + dispatch for the pattern. + """ + + class ExplodingClient(RelayFileClient): + def __init__(self) -> None: + super().__init__(BASE, "tok_test") + + def record_handler_error(self, payload: dict[str, Any]) -> None: + raise RuntimeError("telemetry exploded") + + client = ExplodingClient() + sockets: list[FakeSocket] = [] + survived: list[str] = [] + socket_ready = threading.Event() + + def factory(_: str) -> FakeSocket: + socket = FakeSocket() + sockets.append(socket) + socket_ready.set() + return socket + + unsub_one = on_write( + "/linear/issues/**", + lambda _: (_ for _ in ()).throw(RuntimeError("user handler boom")), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + websocket_factory=factory, + ) + unsub_two = on_write( + "/linear/issues/**", + lambda evt: survived.append(evt.path), + client=client, + workspace_id="ws_acme", + base_url=BASE, + token="tok_test", + ) + + assert socket_ready.wait(1) + sockets[0].emit(event("/linear/issues/PROJ-1")) + + # The reporter raised, but the second handler still ran for this path. 
+ wait_for(lambda: survived == ["/linear/issues/PROJ-1"]) + + unsub_one() + unsub_two() + + def test_dispatcher_restarts_after_full_drain() -> None: """Regression: re-subscribing on the same client after the last unsubscribe must spin up a fresh worker thread and deliver events. Previously the diff --git a/packages/sdk/typescript/src/onWrite.test.ts b/packages/sdk/typescript/src/onWrite.test.ts index d2a1b3da..e5ef9be8 100644 --- a/packages/sdk/typescript/src/onWrite.test.ts +++ b/packages/sdk/typescript/src/onWrite.test.ts @@ -207,6 +207,84 @@ describe("onWrite", () => { expect(received).toEqual(["ws_acme"]); }); + it("rejects a second registration on the same client with a different workspaceId", () => { + const client = makeClient(); + const sockets: MockWebSocket[] = []; + + onWrite( + "/notion/pages/calls/*/transcript", + () => undefined, + { + client, + workspaceId: "ws_acme", + token: "tok_test", + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + sockets.push(socket); + return socket; + } + } + ); + + expect(() => + onWrite("/linear/issues/**", () => undefined, { + client, + workspaceId: "ws_other", + token: "tok_test" + }) + ).toThrow(/same workspaceId/); + + // The original socket remains the only one — we did not silently attach a + // ws_other registration to the ws_acme feed. 
+ expect(sockets).toHaveLength(1); + }); + + it("isolates dispatch when the customer recordHandlerError implementation rejects", async () => { + const client = { + getEvents: vi.fn().mockResolvedValue({ events: [], nextCursor: null }), + recordHandlerError: vi.fn().mockRejectedValue(new Error("telemetry exploded")) + } as unknown as OnWriteClient; + const sockets: MockWebSocket[] = []; + const survivedCalls: string[] = []; + const consoleErrors: unknown[][] = []; + const errorSpy = vi.spyOn(console, "error").mockImplementation((...args) => { + consoleErrors.push(args); + }); + + onWrite( + "/linear/issues/**", + () => { + throw new Error("user handler boom"); + }, + { + client, + workspaceId: "ws_acme", + token: "tok_test", + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + sockets.push(socket); + return socket; + } + } + ); + onWrite( + "/linear/issues/**", + (event) => survivedCalls.push(event.path), + { client, workspaceId: "ws_acme", token: "tok_test" } + ); + + emitFilesystemEvent(sockets[0]!, "/linear/issues/PROJ-1"); + await flushPromises(); + + // The reporter rejected, but the second handler still ran for the same path. + expect(survivedCalls).toEqual(["/linear/issues/PROJ-1"]); + // We logged both the reporter failure and the original handler error. + const flat = consoleErrors.flat().map(String).join(" | "); + expect(flat).toMatch(/reporter failed/); + expect(flat).toMatch(/handler error/); + errorSpy.mockRestore(); + }); + it("reconnects with the 1s then 2s backoff schedule", async () => { vi.useFakeTimers(); const client = makeClient(); diff --git a/packages/sdk/typescript/src/onWrite.ts b/packages/sdk/typescript/src/onWrite.ts index c37b505f..0c5c650a 100644 --- a/packages/sdk/typescript/src/onWrite.ts +++ b/packages/sdk/typescript/src/onWrite.ts @@ -76,9 +76,18 @@ export function onWrite( } } + // The dispatcher cache is keyed by client; a single shared WebSocket is + // bound to one workspace. 
v1 scopes a client to a single workspace per the + // design doc (Out-of-scope: "Cross-workspace subscriptions"). Reject + // mismatched workspaceId rather than silently attaching to the wrong feed. let dispatcher = dispatchers.get(client); + if (dispatcher && dispatcher.workspaceId !== workspaceId) { + throw new Error( + `onWrite registrations on the same client must use the same workspaceId. Existing="${dispatcher.workspaceId}", new="${workspaceId}". Construct a separate RelayFileClient per workspace.` + ); + } if (!dispatcher) { - dispatcher = new OnWriteDispatcher(client); + dispatcher = new OnWriteDispatcher(client, workspaceId); dispatchers.set(client, dispatcher); } @@ -104,24 +113,24 @@ export function onWrite( class OnWriteDispatcher { private readonly client: OnWriteClient; + // Captured at construction. RelayFileSync normalizes the FilesystemEvent + // shape and does not surface workspaceId on emitted events, so we stamp the + // subscribed workspaceId onto every WriteEvent we hand to user handlers. + // It also gates registrations: see the cross-workspace check in onWrite(). + readonly workspaceId: string; private readonly registrations: OnWriteRegistration[] = []; private readonly patternChains = new Map>(); private sync?: RelayFileSync; - // Captured at first registration. RelayFileSync normalizes the FilesystemEvent - // shape and does not surface workspaceId on emitted events, so we thread the - // subscribed workspaceId through here and stamp it onto every WriteEvent we - // hand to user handlers. 
- private workspaceId?: string; - constructor(client: OnWriteClient) { + constructor(client: OnWriteClient, workspaceId: string) { this.client = client; + this.workspaceId = workspaceId; } register( registration: OnWriteRegistration, options: Required> & Pick ): void { - this.workspaceId = options.workspaceId; this.registrations.push(registration); if (options.signal) { if (options.signal.aborted) { @@ -202,10 +211,20 @@ class OnWriteDispatcher { } } + // The "handler errors do not propagate" guarantee covers the recorder too: + // if the customer's recordHandlerError implementation throws or rejects, fall + // back to console.error rather than letting the rejection bubble up the + // dispatch chain (which would skip subsequent handlers for the same pattern). private async recordHandlerError(error: OnWriteHandlerError): Promise { if (typeof this.client.recordHandlerError === "function") { - await this.client.recordHandlerError(error); - return; + try { + await this.client.recordHandlerError(error); + return; + } catch (reportingError) { + if (typeof console !== "undefined" && typeof console.error === "function") { + console.error("Relayfile onWrite handler-error reporter failed", reportingError); + } + } } if (typeof console !== "undefined" && typeof console.error === "function") { console.error("Relayfile onWrite handler error", error); @@ -242,6 +261,11 @@ function normalizePath(path: string): string[] { return trimmed.split("/").filter(Boolean); } +// Trailing `**` matches **zero or more** trailing segments — same as gitignore +// and standard glob conventions, and what the design doc specifies ("any +// number of segments"). `/linear/issues/**` therefore matches both +// `/linear/issues` (the collection root) and `/linear/issues/PROJ-1/comments`. +// `*` matches exactly one segment; `**` is only valid as the last segment. 
function matchSegments(pattern: string[], path: string[]): boolean { if (pattern.length > 0 && pattern[pattern.length - 1] === "**") { const prefix = pattern.slice(0, -1); @@ -259,17 +283,21 @@ function toWriteEvent(event: FilesystemEvent, workspaceId?: string): WriteEvent if (!operation) { return null; } - const raw = event as FilesystemEvent & Partial & { previousRevision?: string | null }; + // RelayFileSync currently surfaces only: + // eventId, type, path, revision, origin, provider, correlationId, timestamp + // Fields the wider WriteEvent contract advertises but the wire format does + // not yet preserve — previousRevision, value, actor — are intentionally + // omitted/null here. Wiring them through the wire format and + // normalizeFilesystemEvent is a follow-up; v1 callers should treat + // previousRevision/value/actor as not-yet-populated. return { - workspaceId: raw.workspaceId ?? workspaceId ?? "", + workspaceId: workspaceId ?? "", path: event.path, operation, revision: event.revision, - previousRevision: raw.previousRevision ?? null, + previousRevision: null, timestamp: event.timestamp, - source: raw.source ?? sourceFromOrigin(event.origin), - value: raw.value, - actor: raw.actor + source: sourceFromOrigin(event.origin) }; }