Fix WebSocket subscription catch-up contract#239
Conversation
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
CodeAnt AI is reviewing your PR. |
|
Caution Review failedPull request was closed or merged during review 📝 WalkthroughWalkthroughThis PR implements configurable WebSocket event subscriptions for RelayFile, enabling clients to control catch-up behavior via ChangesWebSocket Subscriptions with Cursor, from=now, and Path Filtering
Sequence Diagram(s)sequenceDiagram
participant Client as SDK Client
participant StreamMgr as StreamManager
participant Sync as RelayFileSync
participant WS as WebSocket
participant Server as Backend Server
participant Store as Store
Client->>StreamMgr: subscribe(pathScope)
StreamMgr->>StreamMgr: restartIfPathScopeChanged()
alt path scope differs
StreamMgr->>Sync: stop()
end
StreamMgr->>StreamMgr: ensureStarted()
StreamMgr->>StreamMgr: compute serverPathFilters
StreamMgr->>Sync: start(from, cursor, paths)
Sync->>Sync: normalize paths
Sync->>WS: connect with URL params<br/>(cursor or from=now, path)
WS->>Server: GET /fs/ws?cursor=evt_5&path=/a&path=/b
Server->>Server: parseWebSocketSubscriptionOptions
Server->>Store: GetEventsAfterCursor(cursor)
Store-->>Server: recent events
Server->>Server: filter by paths
Server-->>WS: catch-up events (matching only)
WS-->>Sync: message{eventId, path, origin}
Sync->>Sync: advance cursor from eventId
Sync->>Sync: emit if path matches
Sync-->>Client: FilesystemEvent
Server->>Server: subscribe to live
loop live delivery
Server->>Server: filter by paths
Server-->>WS: live events (matching only)
WS-->>Sync: message{eventId, path}
Sync->>Sync: emit if path matches
Sync-->>Client: FilesystemEvent
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Relayfile Eval ReviewRun: Passed: 4 | Needs human: 0 | Reviewable: 0 | Missing output: 0 | Failed: 0 | Skipped: 0 Human Review CasesNo reviewable human-review cases captured Relayfile output. |
kjgbot
left a comment
There was a problem hiding this comment.
Review: WS subscription contract (relayfile#238 / pear#82 Track C) — scope + acceptance + backward compat
Scope conformance: ✅ Every change maps to the #238 contract (server: from=now/cursor/path params, bounded cursor catch-up, filtered fanout, enriched frames; SDK: option surface + cursor tracking + scoped restart; OpenAPI + changelog). No creep.
Contract verification at head 3efab05:
from=nowskips backfill on subscribe and reconnect: ✅ server honors it inwebSocketCatchUpEvents; test proves no backfill before pong on both initial connect and reconnect, and live events still flow.- Exclusive cursor resume, capped at 100: ✅
GetEventsAfterCursorbinary-searches the monotonicevt_<n>ordinal, starts strictly after the cursor, caps at 100. SDK updatesthis.cursorfrom every framedeventId, so reconnects resume rather than replay. - Server-side path filters before fanout: ✅ applied to both catch-up and the live loop before frame writes — actual server-side constraint, not client-side discard. Verified the Go matcher and the SDK's
matchChangeSegmentshave identical semantics (trailing**+ single-segment*), so no server/client filter asymmetry. - Frames carry
eventId/origin/provider/correlationId: ✅ additive JSON fields. TheTestFileEventsWebSocketWritebackMaterializationCarriesAgentWriteOrigintest closes the end-to-end dependency flagged in pear#101's review — Pear'sagent_writeself-echo filter is now backed by a server guarantee. - Backward compatibility: ✅ legacy no-query clients get the exact pre-change
GetRecentEvents(workspaceID, 100)path; new frame fields are additive (old SDK 0.7.23 ignores them); subscribe-before-catch-up ordering + dedup is preserved, so no missed-event window at connect. The SDK'sfrom=nowdefault only takes effect on upgrade, which is the intended contract and is changelogged.
Tests: ran locally at head — go test ./... all pass; SDK vitest 185/185. CI fully green (Go Build/Test, SDK Typecheck, E2E, contract surface, provider evals). My local SDK tsc run hit TS2307 on @relayfile/core, but that's a hand-linked node_modules artifact of my review sandbox — green CI SDK Typecheck is authoritative.
Unbounded-dataflow check: catch-up bounded (100), fanout narrowed by filters, no new polling or recursive scans. (ws.Events grows unbounded in memory, but that's pre-existing and out of scope here.)
Findings (non-blocking, mostly contract-documentation follow-ups)
- Silent gap on unknown/expired cursor —
GetEventsAfterCursorreturns[]when the cursor isn't found (e.g. server restart resets the in-memory event log). The client resumes live believing it's caught up, with no signal. There's also a generation-ambiguity hazard:eventCounterrestarts after a server restart, so an old-generationevt_Ncan exactly match an unrelated new-generation event. Fine for Pear's no-replay use case; integrity-sensitive consumers need acursor-reset/gap signal frame eventually. Suggest tracking as a follow-up issue. - Silent truncation at the 100-event cap — >100 events past the cursor → client gets the first 100 plus live-from-subscribe-time; the middle window is dropped with no "truncated" indicator and no client paging. Bounded-by-design per #82, but the contract should say so (or emit a gap marker).
- SDK restart-on-path-scope-change loses the live cursor —
restartIfPathScopeChangedrebuildsRelayFileSyncwith the originalstartOptions.cursor, not the last-seen cursor the stopped sync had accumulated; with thefrom=nowdefault, events during the restart window are lost. Suggest threading the old sync's current cursor into the replacement. - Doc nit: path filters support trailing
**and single-segment*only (mid-pattern**compares literally, identically on both sides). Worth one line in the OpenAPI description so consumers don't pass mid-globs expecting full glob semantics.
Verdict
APPROVE — contract gap closed with bidirectional test coverage and clean backward compat. Findings 1–3 are good candidates for a follow-up issue on the subscription contract; none block this PR. After release, Pear's Track C consumes this via the SDK bump as planned.
|
Implemented a PR-specific fix in the TypeScript SDK:
Validated stale bot findings locally:
Verification run:
|
| function normalizeWebSocketPathFilters(paths: string[] | undefined): string[] { | ||
| const seen = new Set<string>(); | ||
| const normalized: string[] = []; | ||
| for (const value of paths ?? []) { | ||
| const path = typeof value === "string" ? value.trim() : ""; | ||
| if (!path) { | ||
| continue; | ||
| } | ||
| const absolute = path.startsWith("/") ? path : `/${path}`; | ||
| if (seen.has(absolute)) { | ||
| continue; | ||
| } | ||
| seen.add(absolute); | ||
| normalized.push(absolute); | ||
| } | ||
| return normalized; | ||
| } |
There was a problem hiding this comment.
Suggestion: Path filters are accepted without validating glob grammar, but matching logic only supports ** as a trailing segment. Invalid filters like /a/**/b are silently accepted and then never match, causing all events to be dropped unexpectedly. Reject unsupported patterns during normalization (same behavior as onWrite pattern validation) to avoid silent non-delivery. [incorrect condition logic]
Severity Level: Major ⚠️
- ❌ RelayFileSync subscriptions can silently drop all scoped events.
- ⚠️ WebSocket path filters behave inconsistently with onWrite patterns.
- ⚠️ Difficult debugging when invalid filters yield empty streams.Steps of Reproduction ✅
1. A consumer constructs `RelayFileSync` directly with an invalid glob-style path filter
such as `paths: ["/a/**/b"]` in `RelayFileSyncOptions` (type defined at
`packages/sdk/typescript/src/sync.ts:27-44`), then calls `sync.start()`.
2. In the `RelayFileSync` constructor (`packages/sdk/typescript/src/sync.ts:73-90`), the
`paths` option is normalized via `normalizeWebSocketPathFilters` at lines 66-82. This
helper only trims whitespace, enforces a leading `/`, and deduplicates values; it does not
validate glob grammar or restrict `**` to the trailing segment, so `"/a/**/b"` is silently
accepted and stored in `this.paths`.
3. On connect, `openWebSocketWithToken` at `packages/sdk/typescript/src/sync.ts:249-259`
appends each filter from `this.paths` as `path` query parameters, so the server receives
`path=/a/**/b`. The server parses these in `parseWebSocketSubscriptionOptions`
(`internal/httpapi/websocket.go:122-128`) using its own `normalizeWebSocketPathFilters`,
then filters events via `webSocketEventMatchesPaths` and `webSocketPathMatches` at
`internal/httpapi/websocket.go:161-199`. `webSocketPathMatches` only treats `**` as a
wildcard when it is the *final* segment; for a pattern like `/a/**/b` and an event path
like `/a/x/b`, the match fails and the server never writes that event to the WebSocket.
4. Even if an event with a non-matching path slipped through, the client-side
`emitFilesystemEvent` at `packages/sdk/typescript/src/sync.ts:33-38` re-applies filters
using `pathMatchesAnyFilter`, which calls `matchPathSegments` at lines 101-109. This uses
the same semantics as the server (only trailing `**` is special), so `/a/**/b` never
matches `/a/x/b` here either. In contrast, `onWrite` patterns are validated up-front in
`normalizePattern` at `packages/sdk/typescript/src/onWrite.ts:74-88`, which throws when
`**` appears anywhere but the last segment. Because `RelayFileSync` path filters lack
equivalent validation, callers who pass patterns like `/a/**/b` will see their
subscription receive no events from paths they expect to match, with no error or warning.Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** packages/sdk/typescript/src/sync.ts
**Line:** 66:82
**Comment:**
*Incorrect Condition Logic: Path filters are accepted without validating glob grammar, but matching logic only supports `**` as a trailing segment. Invalid filters like `/a/**/b` are silently accepted and then never match, causing all events to be dropped unexpectedly. Reject unsupported patterns during normalization (same behavior as `onWrite` pattern validation) to avoid silent non-delivery.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix| baseUrl: this.baseUrl, | ||
| token: this.token, | ||
| from: this.startOptions.from, | ||
| cursor: this.startOptions.cursor, |
There was a problem hiding this comment.
Suggestion: The stream restart path re-creates RelayFileSync using only the original start cursor, so any cursor progress made by the running sync is discarded. When path-scope changes trigger a restart, the new socket starts from stale state (from=now by default) and can silently miss events emitted during the restart window. Persist and reuse the latest delivered cursor when rebuilding the sync instance. [stale reference]
Severity Level: Critical 🚨
- ❌ Subscribe-based change streams can silently miss workspace events.
- ❌ Proactive runtimes may skip changes during path-scope updates.
- ⚠️ Downstream consumers cannot reliably advance event cursors.Steps of Reproduction ✅
1. In a proactive runtime, start a change stream via `RelayFileClient.subscribe()` at
`packages/sdk/typescript/src/client.ts:1453-1470` without passing `from` or `cursor`, so
`SubscribeOptions.from`/`cursor` are undefined and `getStreamManager()` at
`client.ts:766-787` constructs a `RelayFileChangeStreamManager` with
`startOptions.from`/`cursor` unset.
2. The manager's `ensureStarted()` at `client.ts:664-305` creates a `RelayFileSync` with
`from: this.startOptions.from` and `cursor: this.startOptions.cursor` (line 675), so
inside `RelayFileSync`'s constructor at `packages/sdk/typescript/src/sync.ts:83-100` the
`from` defaults to `"now"` and the internal `cursor` is initially undefined.
3. As filesystem events arrive over the WebSocket, `RelayFileSync.handleSocketMessage()`
at `sync.ts:251-41` parses each event, updates its internal `this.cursor = parsed.eventId`
(line 37), and emits events to the manager, so forward progress is tracked only inside the
running `RelayFileSync` instance.
4. Later, update the subscription path scopes (for example, by adding another subscription
with a different `pathScope` in `RelayFileChangeStreamManager.addSubscription()` at
`client.ts:206-219`), which calls `restartIfPathScopeChanged()` at `client.ts:307-321`,
stops the current `RelayFileSync`, and immediately calls `ensureStarted()` again. The new
`RelayFileSync` is constructed with `cursor: this.startOptions.cursor` (still the
original, often `undefined`) and `from` defaulting to `"now"`, so
`openWebSocketWithToken()` at `sync.ts:259-16` sends either the stale initial cursor or
`from=now` on the new `/fs/ws` URL. Any events appended after the old stream stopped and
before the new socket's `from=now` snapshot are not backfilled (per the server contract
described in the PR) and are never delivered to subscribers.Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** packages/sdk/typescript/src/client.ts
**Line:** 675:675
**Comment:**
*Stale Reference: The stream restart path re-creates `RelayFileSync` using only the original start cursor, so any cursor progress made by the running sync is discarded. When path-scope changes trigger a restart, the new socket starts from stale state (`from=now` by default) and can silently miss events emitted during the restart window. Persist and reuse the latest delivered cursor when rebuilding the sync instance.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix| const sync = this.sync; | ||
| this.sync = undefined; | ||
| void sync.stop(); | ||
| if (this.openHandleCount > 0 || this.subscriptions.size > 0) { | ||
| this.ensureStarted(); |
There was a problem hiding this comment.
Suggestion: This restart logic stops the current sync asynchronously and immediately starts a new one without waiting for shutdown to finish. That creates an overlap window where the old sync can still emit events (especially in polling mode) while the new sync is already active, causing duplicate/out-of-scope delivery races. Serialize restart by awaiting stop completion (or gate old emissions) before starting the replacement. [race condition]
Severity Level: Major ⚠️
- ⚠️ Duplicate change events during path-scope reconfiguration windows.
- ⚠️ Subscribers may see events outside updated path scopes.
- ⚠️ Proactive runtimes may mis-dedupe overlapping event deliveries.Steps of Reproduction ✅
1. Configure `RelayFileSync` to prefer polling (e.g., by running without a `baseUrl` or
with `preferPolling: true`), so `RelayFileSync.start()` at
`packages/sdk/typescript/src/sync.ts:147-159` calls `startPolling()`, which launches
`pollLoop()` at `sync.ts:166-235` and begins periodically fetching events via
`client.getEvents()`.
2. Attach one or more subscribe handlers through `RelayFileClient.subscribe()` at
`packages/sdk/typescript/src/client.ts:1453-1470`, which uses `getStreamManager()` at
`client.ts:766-787` to obtain a shared `RelayFileChangeStreamManager` and then
`addSubscription()` at `client.ts:206-219` to register `RelayFileChangeSubscription`
instances that consume `RelayFileSync`'s `event` notifications.
3. While the stream is in polling mode and delivering events, change the subscription path
filters (for example, by adding or removing a subscription with a different `pathScope`),
which causes `addSubscription()`/`unsubscribe()` to call `restartIfPathScopeChanged()` at
`client.ts:307-321`. That method captures the current `sync`, sets `this.sync =
undefined`, then calls `void sync.stop();` followed immediately by `this.ensureStarted()`
if there are still open handles or subscriptions.
4. Because `sync.stop()` is asynchronous, its `pollLoop()` continues the current iteration
until it next checks `this.stopped` in its `while (!this.stopped)` condition at
`sync.ts:166-235`, processing any already-fetched events and calling
`emitFilesystemEvent()` → `emit("event", ...)` even after the manager has created and
started a new `RelayFileSync` instance. During this overlap window, subscribers attached
via `RelayFileChangeSubscription.push()` at `client.ts:95-176` can receive events from
both the old and new syncs, leading to duplicate deliveries or events filtered using
outdated path-scopes.Fix in Cursor | Fix in VSCode Claude
(Use Cmd/Ctrl + Click for best experience)
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** packages/sdk/typescript/src/client.ts
**Line:** 715:719
**Comment:**
*Race Condition: This restart logic stops the current sync asynchronously and immediately starts a new one without waiting for shutdown to finish. That creates an overlap window where the old sync can still emit events (especially in polling mode) while the new sync is already active, causing duplicate/out-of-scope delivery races. Serialize restart by awaiting stop completion (or gate old emissions) before starting the replacement.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Once fix is implemented, also check other comments on the same PR, and ask user if the user wants to fix the rest of the comments as well. if said yes, then fetch all the comments validate the correctness and implement a minimal fix|
CodeAnt AI finished reviewing your PR. |
|
Reviewed PR #239 and made one focused fix. Changed:
Validated:
|
User description
Summary
Fixes #238 and the upstream contract gap identified from AgentWorkforce/pear#82 / AgentWorkforce/pear#82 (comment).
This changes the WebSocket event subscription contract so updated SDK clients no longer receive historical workspace events as fresh live events by default:
/fs/wsnow acceptsfrom=now, exclusivecursor=<eventId>, and repeatedpath=<filter>query params.from=nowby sending no historical backfill, honorscursorby returning only events after that cursor, and keeps the catch-up bounded to 100 events.eventId,origin,provider, andcorrelationIdso consumers can update cursors and suppress self-echoes reliably.RelayFileSync,RelayFileClient.connectWebSocket,subscribe,open,onWrite) default tofrom=now;cursorand path filters are surfaced in the public options.Pear SDK bump note
Pear currently pins
@relayfile/sdk@0.7.23. After this PR is released, Pear needs to bump to the first published@relayfile/sdkversion that includes this PR, then use the SDK's defaultfrom=nowWebSocket behavior pluspathScope/pathsfor server-side scoped subscriptions.Validation
go test ./internal/httpapi -run 'TestFileEventsWebSocket'go test ./internal/httpapi ./internal/relayfilego test ./...npm run build --workspace=packages/corenpm run build --workspace=packages/sdk/typescriptnpm run typecheck --workspace=packages/sdk/typescriptnpm run test --workspace=packages/sdk/typescript -- --run src/sync.test.ts src/client.test.tsnpm run test --workspace=packages/sdk/typescriptscripts/check-contract-surface.shNote: the first SDK build/typecheck attempt failed in the fresh worktree because
@relayfile/corehad not been built yet. Afternpm run build --workspace=packages/core, SDK build/typecheck passed.CodeAnt-AI Description
Fix WebSocket catch-up and filtering for file event subscriptions
What Changed
from=now,cursor, and repeatedpathfilters when opening or reconnecting WebSocket subscriptions.Impact
✅ Fewer duplicate events after reconnect✅ Cleaner live-only subscriptions✅ More precise file change feeds💡 Usage Guide
Checking Your Pull Request
Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.
Talking to CodeAnt AI
Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:
This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.
Example
Preserve Org Learnings with CodeAnt
You can record team preferences so CodeAnt AI applies them in future reviews. Reply directly to the specific CodeAnt AI suggestion (in the same thread) and replace "Your feedback here" with your input:
This helps CodeAnt AI learn and adapt to your team's coding style and standards.
Example
Retrigger review
Ask CodeAnt AI to review the PR again, by typing:
Check Your Repository Health
To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.