From 3efab05fe2ccd76fc34e0b5eb53675a5105e501b Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 5 Jun 2026 16:48:24 +0200 Subject: [PATCH] fix websocket subscription catch-up contract --- internal/httpapi/server_test.go | 272 ++++++++++++++++++++ internal/httpapi/websocket.go | 122 ++++++++- internal/relayfile/store.go | 45 ++++ openapi/relayfile-v1.openapi.yaml | 31 +++ packages/sdk/typescript/CHANGELOG.md | 6 +- packages/sdk/typescript/src/client.test.ts | 22 +- packages/sdk/typescript/src/client.ts | 83 +++++- packages/sdk/typescript/src/index.ts | 1 + packages/sdk/typescript/src/onWrite.test.ts | 2 +- packages/sdk/typescript/src/sync.test.ts | 43 +++- packages/sdk/typescript/src/sync.ts | 75 +++++- packages/sdk/typescript/src/types.ts | 4 + 12 files changed, 681 insertions(+), 25 deletions(-) diff --git a/internal/httpapi/server_test.go b/internal/httpapi/server_test.go index 533414eb..a6563393 100644 --- a/internal/httpapi/server_test.go +++ b/internal/httpapi/server_test.go @@ -147,6 +147,278 @@ func TestFileEventsWebSocketCatchUpAndPingPong(t *testing.T) { if live["path"] != "/notion/Docs/Two.md" { t.Fatalf("unexpected live event path: %v", live["path"]) } + if live["origin"] != "agent_write" { + t.Fatalf("expected live write event origin=agent_write, got %v", live["origin"]) + } +} + +func TestFileEventsWebSocketFromNowSkipsBackfillOnSubscribeAndReconnect(t *testing.T) { + store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true}) + t.Cleanup(store.Close) + + if _, err := store.WriteFile(relayfile.WriteRequest{ + WorkspaceID: "ws_socket_now", + Path: "/slack/channels/C1/messages/old.json", + IfMatch: "0", + ContentType: "application/json", + Content: "{}", + }); err != nil { + t.Fatalf("seed write failed: %v", err) + } + + server := httptest.NewServer(NewServer(store)) + defer server.Close() + + token := mustTestJWT(t, "dev-secret", "ws_socket_now", "Worker1", []string{"fs:read"}, time.Now().Add(time.Hour)) + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/v1/workspaces/ws_socket_now/fs/ws?token=" + token + "&from=now" + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("websocket dial failed: %v", err) + } + if err := wsjson.Write(ctx, conn, map[string]any{"type": "ping"}); err != nil { + t.Fatalf("write ping failed: %v", err) + } + var pong map[string]any + if err := wsjson.Read(ctx, conn, &pong); err != nil { + t.Fatalf("read pong failed: %v", err) + } + if pong["type"] != "pong" { + t.Fatalf("from=now must not backfill seeded event before pong, got %+v", pong) + } + _ = conn.Close(websocket.StatusNormalClosure, "") + + if _, err := store.WriteFile(relayfile.WriteRequest{ + WorkspaceID: "ws_socket_now", + Path: "/slack/channels/C1/messages/disconnected.json", + IfMatch: "0", + ContentType: "application/json", + Content: "{}", + }); err != nil { + t.Fatalf("disconnected write failed: %v", err) + } + + conn, _, err = websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("reconnect websocket dial failed: %v", err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + if err := wsjson.Write(ctx, conn, map[string]any{"type": "ping"}); err != nil { + t.Fatalf("write reconnect ping failed: %v", err) + } + pong = map[string]any{} + if err := wsjson.Read(ctx, conn, &pong); err != nil { + t.Fatalf("read reconnect pong failed: %v", err) + } + if pong["type"] != "pong" { + t.Fatalf("from=now reconnect must not backfill disconnected event before pong, got %+v", pong) + } + + if _, err := store.WriteFile(relayfile.WriteRequest{ + WorkspaceID: "ws_socket_now", + Path: "/slack/channels/C1/messages/live.json", + IfMatch: "0", + ContentType: "application/json", + Content: "{}", + }); err != nil { + t.Fatalf("live write failed: %v", err) + } + var live map[string]any + if err := wsjson.Read(ctx, conn, &live); err != nil { + t.Fatalf("read live event failed: %v", err) + } + if live["path"] != "/slack/channels/C1/messages/live.json" { + t.Fatalf("unexpected live path after from=now reconnect: %+v", live) + } +} + +func TestFileEventsWebSocketCursorCatchUpIsExclusive(t *testing.T) { + store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true}) + t.Cleanup(store.Close) + + for _, path := range []string{"/docs/one.md", "/docs/two.md", "/docs/three.md"} { + if _, err := store.WriteFile(relayfile.WriteRequest{ + WorkspaceID: "ws_socket_cursor", + Path: path, + IfMatch: "0", + ContentType: "text/markdown", + Content: "# doc", + }); err != nil { + t.Fatalf("seed write %s failed: %v", path, err) + } + } + feed, err := store.GetEvents("ws_socket_cursor", "", "", 100) + if err != nil { + t.Fatalf("get events failed: %v", err) + } + if len(feed.Events) != 3 { + t.Fatalf("expected 3 seeded events, got %d", len(feed.Events)) + } + + server := httptest.NewServer(NewServer(store)) + defer server.Close() + + token := mustTestJWT(t, "dev-secret", "ws_socket_cursor", "Worker1", []string{"fs:read"}, time.Now().Add(time.Hour)) + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/v1/workspaces/ws_socket_cursor/fs/ws?token=" + token + "&cursor=" + url.QueryEscape(feed.Events[0].EventID) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("websocket dial failed: %v", err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + + var event map[string]any + if err := wsjson.Read(ctx, conn, &event); err != nil { + t.Fatalf("read first cursor catch-up event failed: %v", err) + } + if event["eventId"] != feed.Events[1].EventID || event["path"] != "/docs/two.md" { + t.Fatalf("expected exclusive cursor catch-up to start at second event, got %+v", event) + } + if err := wsjson.Read(ctx, conn, &event); err != nil { + t.Fatalf("read second cursor catch-up event failed: %v", err) + } + if event["eventId"] != feed.Events[2].EventID || event["path"] != "/docs/three.md" { + t.Fatalf("expected second catch-up event after cursor, got %+v", event) + } + if err := wsjson.Write(ctx, conn, map[string]any{"type": "ping"}); err != nil { + t.Fatalf("write ping failed: %v", err) + } + var pong map[string]any + if err := wsjson.Read(ctx, conn, &pong); err != nil { + t.Fatalf("read pong failed: %v", err) + } + if pong["type"] != "pong" { + t.Fatalf("expected no additional catch-up events before pong, got %+v", pong) + } +} + +func TestFileEventsWebSocketPathFilterConstrainsServerFanout(t *testing.T) { + store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{DisableWorkers: true}) + t.Cleanup(store.Close) + + server := httptest.NewServer(NewServer(store)) + defer server.Close() + + token := mustTestJWT(t, "dev-secret", "ws_socket_path", "Worker1", []string{"fs:read"}, time.Now().Add(time.Hour)) + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/v1/workspaces/ws_socket_path/fs/ws?token=" + token + "&from=now&path=" + url.QueryEscape("/slack/channels/C1/**") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("websocket dial failed: %v", err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + + if _, err := store.WriteFile(relayfile.WriteRequest{ + WorkspaceID: "ws_socket_path", + Path: "/slack/channels/C2/messages/ignored.json", + IfMatch: "0", + ContentType: "application/json", + Content: "{}", + }); err != nil { + t.Fatalf("non-matching write failed: %v", err) + } + type readResult struct { + event map[string]any + err error + } + readCh := make(chan readResult, 1) + go func() { + var event map[string]any + err := wsjson.Read(ctx, conn, &event) + readCh <- readResult{event: event, err: err} + }() + select { + case result := <-readCh: + if result.err != nil { + t.Fatalf("read after non-matching write failed: %v", result.err) + } + t.Fatalf("path filter must suppress non-matching event, got %+v", result.event) + case <-time.After(150 * time.Millisecond): + } + + if _, err := store.WriteFile(relayfile.WriteRequest{ + WorkspaceID: "ws_socket_path", + Path: "/slack/channels/C1/messages/live.json", + IfMatch: "0", + ContentType: "application/json", + Content: "{}", + }); err != nil { + t.Fatalf("matching write failed: %v", err) + } + var event map[string]any + select { + case result := <-readCh: + if result.err != nil { + t.Fatalf("read matching event failed: %v", result.err) + } + event = result.event + case <-ctx.Done(): + t.Fatalf("timed out waiting for matching event: %v", ctx.Err()) + } + if event["path"] != "/slack/channels/C1/messages/live.json" { + t.Fatalf("expected matching path event, got %+v", event) + } +} + +func TestFileEventsWebSocketWritebackMaterializationCarriesAgentWriteOrigin(t *testing.T) { + store := relayfile.NewStoreWithOptions(relayfile.StoreOptions{ + ProviderWriteAction: func(action relayfile.WritebackAction) error { + return nil + }, + }) + t.Cleanup(store.Close) + + server := httptest.NewServer(NewServer(store)) + defer server.Close() + + token := mustTestJWT(t, "dev-secret", "ws_socket_writeback_origin", "Worker1", []string{"fs:read", "fs:write"}, time.Now().Add(time.Hour)) + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/v1/workspaces/ws_socket_writeback_origin/fs/ws?token=" + token + "&from=now&path=" + url.QueryEscape("/external/**") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("websocket dial failed: %v", err) + } + defer conn.Close(websocket.StatusNormalClosure, "") + + writeResp := doRequest(t, NewServer(store), request{ + method: http.MethodPut, + path: "/v1/workspaces/ws_socket_writeback_origin/fs/file?path=/external/Writeback.md", + headers: map[string]string{ + "Authorization": "Bearer " + token, + "X-Correlation-Id": "corr_ws_writeback_origin", + "If-Match": "0", + }, + body: map[string]any{ + "contentType": "text/markdown", + "content": "# writeback", + }, + }) + if writeResp.Code != http.StatusAccepted { + t.Fatalf("expected 202 write, got %d (%s)", writeResp.Code, writeResp.Body.String()) + } + + var event map[string]any + if err := wsjson.Read(ctx, conn, &event); err != nil { + t.Fatalf("read writeback materialization event failed: %v", err) + } + if event["type"] != "file.created" || event["path"] != "/external/Writeback.md" { + t.Fatalf("expected writeback materialization file.created event, got %+v", event) + } + if event["origin"] != "agent_write" { + t.Fatalf("expected writeback materialization origin=agent_write, got %+v", event) + } } func TestLifecycleAndConflicts(t *testing.T) { diff --git a/internal/httpapi/websocket.go b/internal/httpapi/websocket.go index 7cd6c49e..105fd083 100644 --- a/internal/httpapi/websocket.go +++ b/internal/httpapi/websocket.go @@ -13,17 +13,27 @@ import ( ) type fileEventMessage struct { - Type string `json:"type"` - Path string `json:"path,omitempty"` - Revision string `json:"revision,omitempty"` - ContentHash string `json:"contentHash,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + EventID string `json:"eventId,omitempty"` + Type string `json:"type"` + Path string `json:"path,omitempty"` + Revision string `json:"revision,omitempty"` + ContentHash string `json:"contentHash,omitempty"` + Origin string `json:"origin,omitempty"` + Provider string `json:"provider,omitempty"` + CorrelationID string `json:"correlationId,omitempty"` + Timestamp string `json:"timestamp,omitempty"` } type websocketClientMessage struct { Type string `json:"type"` } +type websocketSubscriptionOptions struct { + From string + Cursor string + Paths []string +} + func (s *Server) handleFileEventsWebSocket(w http.ResponseWriter, r *http.Request, workspaceID string) { claims, authErr := authorizeBearer("Bearer "+strings.TrimSpace(r.URL.Query().Get("token")), s.bearerVerifier, workspaceID, "fs:read", "", time.Now().UTC()) if authErr != nil { @@ -47,13 +57,14 @@ func (s *Server) handleFileEventsWebSocket(w http.ResponseWriter, r *http.Reques defer conn.Close(websocket.StatusNormalClosure, "") ctx := r.Context() + options := parseWebSocketSubscriptionOptions(r) // Subscribe FIRST, then catch up, so no events are missed in between. subscriptionCh := make(chan relayfile.Event, 256) unsubscribe := s.store.Subscribe(workspaceID, subscriptionCh) defer unsubscribe() - catchUp, err := s.store.GetRecentEvents(workspaceID, 100) + catchUp, err := s.webSocketCatchUpEvents(workspaceID, options) if err != nil { _ = conn.Close(websocket.StatusInternalError, "failed to load catch-up events") return @@ -64,6 +75,9 @@ func (s *Server) handleFileEventsWebSocket(w http.ResponseWriter, r *http.Reques if event.EventID != "" { catchUpIDs[event.EventID] = struct{}{} } + if !webSocketEventMatchesPaths(event, options.Paths) { + continue + } if err := s.writeWebSocketEvent(ctx, conn, event); err != nil { return } @@ -88,6 +102,9 @@ func (s *Server) handleFileEventsWebSocket(w http.ResponseWriter, r *http.Reques return } case event := <-subscriptionCh: + if !webSocketEventMatchesPaths(event, options.Paths) { + continue + } // Skip events already sent during catch-up to avoid duplicates if event.EventID != "" { if _, dup := catchUpIDs[event.EventID]; dup { @@ -102,6 +119,85 @@ func (s *Server) handleFileEventsWebSocket(w http.ResponseWriter, r *http.Reques } } +func parseWebSocketSubscriptionOptions(r *http.Request) websocketSubscriptionOptions { + query := r.URL.Query() + return websocketSubscriptionOptions{ + From: strings.ToLower(strings.TrimSpace(query.Get("from"))), + Cursor: strings.TrimSpace(query.Get("cursor")), + Paths: normalizeWebSocketPathFilters(query["path"]), + } +} + +func (s *Server) webSocketCatchUpEvents(workspaceID string, options websocketSubscriptionOptions) ([]relayfile.Event, error) { + if options.Cursor != "" { + return s.store.GetEventsAfterCursor(workspaceID, options.Cursor, 100) + } + if options.From == "now" { + return []relayfile.Event{}, nil + } + return s.store.GetRecentEvents(workspaceID, 100) +} + +func normalizeWebSocketPathFilters(values []string) []string { + seen := map[string]struct{}{} + paths := make([]string, 0, len(values)) + for _, value := range values { + path := strings.TrimSpace(value) + if path == "" { + continue + } + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + if _, ok := seen[path]; ok { + continue + } + seen[path] = struct{}{} + paths = append(paths, path) + } + return paths +} + +func webSocketEventMatchesPaths(event relayfile.Event, filters []string) bool { + if len(filters) == 0 { + return true + } + for _, filter := range filters { + if webSocketPathMatches(filter, event.Path) { + return true + } + } + return false +} + +func webSocketPathMatches(pattern, eventPath string) bool { + pattern = strings.TrimSpace(pattern) + eventPath = strings.TrimSpace(eventPath) + if pattern == "" || eventPath == "" { + return false + } + if pattern == eventPath { + return true + } + patternSegments := strings.Split(strings.Trim(pattern, "/"), "/") + pathSegments := strings.Split(strings.Trim(eventPath, "/"), "/") + for index, segment := range patternSegments { + if segment == "**" && index == len(patternSegments)-1 { + return true + } + if index >= len(pathSegments) { + return false + } + if segment == "*" { + continue + } + if segment != pathSegments[index] { + return false + } + } + return len(patternSegments) == len(pathSegments) +} + func (s *Server) readWebSocketMessages(ctx context.Context, conn *websocket.Conn, controlCh chan<- fileEventMessage, readErrCh chan<- error) { defer close(readErrCh) for { @@ -133,10 +229,14 @@ func (s *Server) readWebSocketMessages(ctx context.Context, conn *websocket.Conn func (s *Server) writeWebSocketEvent(ctx context.Context, conn *websocket.Conn, event relayfile.Event) error { return wsjson.Write(ctx, conn, fileEventMessage{ - Type: event.Type, - Path: event.Path, - Revision: event.Revision, - ContentHash: event.ContentHash, - Timestamp: event.Timestamp, + EventID: event.EventID, + Type: event.Type, + Path: event.Path, + Revision: event.Revision, + ContentHash: event.ContentHash, + Origin: event.Origin, + Provider: event.Provider, + CorrelationID: event.CorrelationID, + Timestamp: event.Timestamp, }) } diff --git a/internal/relayfile/store.go b/internal/relayfile/store.go index 1e221c92..df61ac98 100644 --- a/internal/relayfile/store.go +++ b/internal/relayfile/store.go @@ -2043,6 +2043,51 @@ func (s *Store) GetRecentEvents(workspaceID string, limit int) ([]Event, error) return append([]Event(nil), ws.Events[start:]...), nil } +func (s *Store) GetEventsAfterCursor(workspaceID, cursor string, limit int) ([]Event, error) { + if workspaceID == "" || strings.TrimSpace(cursor) == "" { + return []Event{}, ErrInvalidInput + } + if limit <= 0 { + return []Event{}, nil + } + s.mu.RLock() + defer s.mu.RUnlock() + + ws, ok := s.workspaces[workspaceID] + if !ok || len(ws.Events) == 0 { + return []Event{}, nil + } + cursorOrdinal, ok := parseEventIDOrdinal(cursor) + if !ok { + return []Event{}, nil + } + index := sort.Search(len(ws.Events), func(i int) bool { + ordinal, ok := parseEventIDOrdinal(ws.Events[i].EventID) + return ok && ordinal >= cursorOrdinal + }) + if index >= len(ws.Events) || ws.Events[index].EventID != cursor { + return []Event{}, nil + } + start := index + 1 + if start >= len(ws.Events) { + return []Event{}, nil + } + end := start + limit + if end > len(ws.Events) { + end = len(ws.Events) + } + return append([]Event(nil), ws.Events[start:end]...), nil +} + +func parseEventIDOrdinal(eventID string) (uint64, bool) { + value := strings.TrimPrefix(strings.TrimSpace(eventID), "evt_") + if value == "" || value == eventID { + return 0, false + } + ordinal, err := strconv.ParseUint(value, 10, 64) + return ordinal, err == nil +} + func (s *Store) Subscribe(workspaceID string, ch chan<- Event) func() { workspaceID = strings.TrimSpace(workspaceID) if workspaceID == "" || ch == nil { diff --git a/openapi/relayfile-v1.openapi.yaml b/openapi/relayfile-v1.openapi.yaml index 8cd7a544..0e62317d 100644 --- a/openapi/relayfile-v1.openapi.yaml +++ b/openapi/relayfile-v1.openapi.yaml @@ -119,6 +119,13 @@ paths: The server sends JSON messages with `type`, `path`, `revision`, and `timestamp` fields. The client may send `{"type":"ping"}` to keep the connection alive; the server responds with `{"type":"pong"}`. + + Updated clients should pass `from=now` for live-only subscriptions, or + an exclusive `cursor` event id to catch up only events after that + cursor. Repeated `path` query parameters restrict both catch-up and + live delivery to exact path/glob filters. Legacy clients that omit + `from`, `cursor`, and `path` keep the historical recent-event catch-up + behavior. security: - BearerAuth: [fs:read] parameters: @@ -129,6 +136,30 @@ paths: description: JWT token for authentication (same as Bearer token value) schema: type: string + - name: from + in: query + required: false + description: Use `now` to skip historical catch-up and subscribe only to live events. + schema: + type: string + enum: [now] + - name: cursor + in: query + required: false + description: Exclusive filesystem event id cursor for bounded reconnect catch-up. + schema: + type: string + - name: path + in: query + required: false + description: Exact path or trailing-`**`/single-segment-`*` path filter. Repeat to subscribe to multiple paths. + schema: + type: array + items: + type: string + pattern: '^/.*' + style: form + explode: true responses: '101': description: WebSocket upgrade successful diff --git a/packages/sdk/typescript/CHANGELOG.md b/packages/sdk/typescript/CHANGELOG.md index 15ccd403..4ba8985f 100644 --- a/packages/sdk/typescript/CHANGELOG.md +++ b/packages/sdk/typescript/CHANGELOG.md @@ -6,7 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -_No unreleased changes._ +### Added + +- WebSocket subscriptions now send `from=now` by default, support exclusive + `cursor` resume, and forward exact `path` filters for server-side scoped + catch-up and live delivery. ## [0.8.9] - 2026-06-02 diff --git a/packages/sdk/typescript/src/client.test.ts b/packages/sdk/typescript/src/client.test.ts index 6573a360..be8a9972 100644 --- a/packages/sdk/typescript/src/client.test.ts +++ b/packages/sdk/typescript/src/client.test.ts @@ -259,8 +259,11 @@ describe("RelayFileClient — existing methods", () => { { coalesce: "none" }, ); - const socket = await waitForWebSocket(); - expect(ProactiveMockWebSocket.instances).toHaveLength(1); + await waitForExpectation(() => { + const lastSocket = ProactiveMockWebSocket.instances[ProactiveMockWebSocket.instances.length - 1]; + expect(lastSocket?.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ3b3Jrc3BhY2VfaWQiOiJ3c19hY21lIiwiYWdlbnRfbmFtZSI6InN1cHBvcnQtYWdlbnQiLCJhdWQiOlsicmVsYXlmaWxlIl19.sig&from=now&path=%2Flinear%2F**&path=%2Flinear%2Fissues%2F**"); + }); + const socket = ProactiveMockWebSocket.instances[ProactiveMockWebSocket.instances.length - 1]!; socket.emit("open", {}); socket.emit("message", { data: JSON.stringify({ @@ -1385,7 +1388,20 @@ describe("RelayFileClient — existing methods", () => { client.connectWebSocket("ws_acme", { token: "ws_token" }); expect(MockWebSocket.instances).toHaveLength(1); - expect(MockWebSocket.instances[0]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token"); + expect(MockWebSocket.instances[0]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token&from=now"); + }); + + it("forwards cursor and path filters to the WebSocket endpoint", () => { + const client = makeClient(mockFetch({ path: "/", entries: [], nextCursor: null })); + + client.connectWebSocket("ws_acme", { + token: "ws_token", + cursor: "evt_42", + paths: ["/slack/channels/C1/**", "/github/repos/acme/api/pulls/*"] + }); + + expect(MockWebSocket.instances).toHaveLength(1); + expect(MockWebSocket.instances[0]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token&cursor=evt_42&path=%2Fslack%2Fchannels%2FC1%2F**&path=%2Fgithub%2Frepos%2Facme%2Fapi%2Fpulls%2F*"); }); it("emits parsed filesystem events to the event handler", () => { diff --git a/packages/sdk/typescript/src/client.ts b/packages/sdk/typescript/src/client.ts index 4adcc76b..cdd734d8 100644 --- a/packages/sdk/typescript/src/client.ts +++ b/packages/sdk/typescript/src/client.ts @@ -139,6 +139,9 @@ export interface WebSocketConnection { export interface ConnectWebSocketOptions { token?: string; + from?: "now" | "legacy"; + cursor?: string; + paths?: string[]; onEvent?: (event: FilesystemEvent) => void; } @@ -156,6 +159,11 @@ const CLIENT_TOKEN_STREAM_KEY = "__client__"; type JsonObject = Record; +interface StreamStartOptions { + from?: "now" | "legacy"; + cursor?: string; +} + interface JwtClaimsShape { workspace_id?: unknown; agent_name?: unknown; @@ -526,6 +534,15 @@ class RelayFileChangeSubscription { await drain; } + serverPathFilters(): string[] { + const filters = new Set(); + const patterns = this.pathScopes ?? this.globPatterns; + for (const pattern of patterns) { + filters.add(`/${pattern.join("/")}`); + } + return Array.from(filters); + } + private matches(path: string): boolean { const pathSegments = normalizeChangePath(path); const matchesGlob = this.globPatterns.some((pattern) => matchChangeSegments(pattern, pathSegments)); @@ -562,6 +579,7 @@ class RelayFileChangeStreamManager { private readonly subscriptions = new Set(); private openHandleCount = 0; private sync?: RelayFileSync; + private activePathFilterKey?: string; private readyResolved = false; private readonly readyInternal: Promise; private resolveReady!: () => void; @@ -571,7 +589,8 @@ class RelayFileChangeStreamManager { private readonly client: RelayFileClient, private readonly workspaceId: string, private readonly token: string | undefined, - private readonly baseUrl: string + private readonly baseUrl: string, + private readonly startOptions: StreamStartOptions ) { this.readyInternal = new Promise((resolve, reject) => { this.resolveReady = resolve; @@ -586,11 +605,13 @@ class RelayFileChangeStreamManager { addSubscription(globs: string[], onChange: (event: ChangeEvent) => void, options?: SubscribeOptions): Subscription { const subscription = new RelayFileChangeSubscription(this, globs, onChange, options); this.subscriptions.add(subscription); + this.restartIfPathScopeChanged(); this.ensureStarted(); return { unsubscribe: async () => { this.subscriptions.delete(subscription); await subscription.close(); + this.restartIfPathScopeChanged(); await this.maybeStop(); } }; @@ -598,11 +619,13 @@ class RelayFileChangeStreamManager { open(): ChangeStreamConnection { this.openHandleCount += 1; + this.restartIfPathScopeChanged(); this.ensureStarted(); return { ready: this.ready, unsubscribe: async () => { this.openHandleCount = Math.max(0, this.openHandleCount - 1); + this.restartIfPathScopeChanged(); await this.maybeStop(); } }; @@ -641,11 +664,16 @@ class RelayFileChangeStreamManager { if (this.sync) { return; } + const paths = this.serverPathFilters(); + this.activePathFilterKey = paths.join("\n"); const sync = new RelayFileSync({ client: this.client, workspaceId: this.workspaceId, baseUrl: this.baseUrl, token: this.token, + from: this.startOptions.from, + cursor: this.startOptions.cursor, + paths, onPollingFallback: () => { this.resolveReadyOnce(); } @@ -675,6 +703,36 @@ class RelayFileChangeStreamManager { } } + private restartIfPathScopeChanged(): void { + if (!this.sync) { + return; + } + const nextKey = this.serverPathFilters().join("\n"); + const currentKey = this.activePathFilterKey; + if (nextKey === currentKey) { + return; + } + const sync = this.sync; + this.sync = undefined; + void sync.stop(); + if (this.openHandleCount > 0 || this.subscriptions.size > 0) { + this.ensureStarted(); + } + } + + private serverPathFilters(): string[] { + if (this.openHandleCount > 0) { + return []; + } + const filters = new Set(); + for (const subscription of this.subscriptions) { + for (const path of subscription.serverPathFilters()) { + filters.add(path); + } + } + return Array.from(filters).sort(); + } + private resolveReadyOnce(): void { if (this.readyResolved) { return; @@ -690,6 +748,7 @@ class RelayFileChangeStreamManager { if (this.sync) { const sync = this.sync; this.sync = undefined; + this.activePathFilterKey = undefined; await sync.stop(); } const managers = changeStreamManagers.get(this.client); @@ -708,19 +767,23 @@ function getStreamManager( client: RelayFileClient, workspaceId: string, token: string | undefined, - baseUrl: string + baseUrl: string, + startOptions: StreamStartOptions = {} ): RelayFileChangeStreamManager { let managers = changeStreamManagers.get(client); if (!managers) { managers = new Map(); changeStreamManagers.set(client, managers); } - const key = `${workspaceId}:${token ?? CLIENT_TOKEN_STREAM_KEY}`; + const key = `${workspaceId}:${token ?? CLIENT_TOKEN_STREAM_KEY}:${startOptions.from ?? "now"}:${startOptions.cursor ?? ""}`; const existing = managers.get(key); if (existing) { return existing; } - const manager = new RelayFileChangeStreamManager(client, workspaceId, token, baseUrl); + const manager = new RelayFileChangeStreamManager(client, workspaceId, token, baseUrl, { + from: startOptions.from, + cursor: startOptions.cursor + }); managers.set(key, manager); return manager; } @@ -1400,7 +1463,7 @@ export class RelayFileClient { ): Subscription { const setup = this.resolveWorkspaceId(options?.aclToken) .then((workspaceId) => { - const manager = getStreamManager(this, workspaceId, options?.aclToken, this.baseUrl); + const manager = getStreamManager(this, workspaceId, options?.aclToken, this.baseUrl, options); return manager.addSubscription(globs, onChange, options); }); return { @@ -1412,7 +1475,7 @@ export class RelayFileClient { } open(options: ChangeStreamConnectionOptions): ChangeStreamConnection { - const manager = getStreamManager(this, options.workspaceId, options.aclToken, this.baseUrl); + const manager = getStreamManager(this, options.workspaceId, options.aclToken, this.baseUrl, options); const connection = manager.open(); const replay = this.primeReplayCache(options).catch((error) => { if (typeof console !== "undefined" && typeof console.error === "function") { @@ -1535,6 +1598,14 @@ export class RelayFileClient { const url = new URL(`${this.baseUrl}/v1/workspaces/${encodeURIComponent(workspaceId)}/fs/ws`); url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; url.searchParams.set("token", token); + if (options.cursor) { + url.searchParams.set("cursor", options.cursor); + } else if ((options.from ?? "now") === "now") { + url.searchParams.set("from", "now"); + } + for (const path of options.paths ?? []) { + url.searchParams.append("path", path); + } const socket = new WebSocket(url.toString()); return new RelayFileWebSocketConnection(socket, options.onEvent); diff --git a/packages/sdk/typescript/src/index.ts b/packages/sdk/typescript/src/index.ts index 9d70b355..94560166 100644 --- a/packages/sdk/typescript/src/index.ts +++ b/packages/sdk/typescript/src/index.ts @@ -73,6 +73,7 @@ export { type RelayFileSyncPong, type RelayFileSyncReconnectOptions, type RelayFileSyncSocket, + type RelayFileSyncStart, type RelayFileSyncState, type RelayFileSyncTokenProvider } from "./sync.js"; diff --git a/packages/sdk/typescript/src/onWrite.test.ts b/packages/sdk/typescript/src/onWrite.test.ts index 387cac12..2391ffef 100644 --- a/packages/sdk/typescript/src/onWrite.test.ts +++ b/packages/sdk/typescript/src/onWrite.test.ts @@ -119,7 +119,7 @@ describe("onWrite", () => { ); expect(sockets).toHaveLength(1); - expect(sockets[0]!.url).toBe("wss://api.relayfile.dev/v1/workspaces/ws_acme/fs/ws?token=tok_test"); + expect(sockets[0]!.url).toBe("wss://api.relayfile.dev/v1/workspaces/ws_acme/fs/ws?token=tok_test&from=now"); sockets[0]!.emit("open", {}); emitFilesystemEvent(sockets[0]!, "/notion/pages/calls/call-1/transcript"); diff --git a/packages/sdk/typescript/src/sync.test.ts b/packages/sdk/typescript/src/sync.test.ts index 781ccba2..e295b614 100644 --- a/packages/sdk/typescript/src/sync.test.ts +++ b/packages/sdk/typescript/src/sync.test.ts @@ -68,7 +68,7 @@ describe("RelayFileSync", () => { sync.start(); expect(sockets).toHaveLength(1); - expect(sockets[0]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token"); + expect(sockets[0]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token&from=now"); sockets[0]!.emit("open", {}); sockets[0]!.emit("message", { @@ -103,6 +103,47 @@ describe("RelayFileSync", () => { await sync.stop(); }); + it("sends exclusive cursor and path filters on WebSocket connect and reconnect", async () => { + vi.useFakeTimers(); + const sockets: MockWebSocket[] = []; + const sync = new RelayFileSync({ + client: makeClient(), + workspaceId: "ws_acme", + baseUrl: "https://relay.test", + token: "ws_token", + cursor: "evt_10", + paths: ["/slack/channels/C1/**"], + reconnect: { minDelayMs: 1, maxDelayMs: 1 }, + webSocketFactory: (url) => { + const socket = new MockWebSocket(url); + sockets.push(socket); + return socket; + } + }); + + sync.start(); + expect(sockets[0]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token&cursor=evt_10&path=%2Fslack%2Fchannels%2FC1%2F**"); + sockets[0]!.emit("open", {}); + sockets[0]!.emit("message", { + data: JSON.stringify({ + eventId: "evt_11", + type: "file.updated", + path: "/slack/channels/C1/messages/1.json", + revision: "rev_11", + timestamp: "2026-03-26T00:00:00Z" + }) + }); + sockets[0]!.emit("close", { code: 1006, reason: "lost" }); + + await vi.advanceTimersByTimeAsync(1); + + expect(sockets).toHaveLength(2); + expect(sockets[1]!.url).toBe("wss://relay.test/v1/workspaces/ws_acme/fs/ws?token=ws_token&cursor=evt_11&path=%2Fslack%2Fchannels%2FC1%2F**"); + + await sync.stop(); + vi.useRealTimers(); + }); + it("falls back to polling when preferred, seeds to the live cursor, then emits only new events", async () => { const getEvents = vi.fn(); getEvents diff --git a/packages/sdk/typescript/src/sync.ts b/packages/sdk/typescript/src/sync.ts index c29a6645..75aff030 100644 --- a/packages/sdk/typescript/src/sync.ts +++ b/packages/sdk/typescript/src/sync.ts @@ -11,6 +11,7 @@ import type { EventOrigin, FilesystemEvent } from "./types.js"; export type RelayFileSyncTokenProvider = string | (() => string | undefined | Promise); export type RelayFileSyncState = "idle" | "connecting" | "open" | "polling" | "reconnecting" | "closed"; +export type RelayFileSyncStart = "now" | "legacy"; export interface RelayFileSyncPong { type: "pong"; @@ -37,7 +38,9 @@ export interface RelayFileSyncOptions { * normally NOT pass this and let it inherit from the client. */ token?: RelayFileSyncTokenProvider; + from?: RelayFileSyncStart; cursor?: string; + paths?: string[]; preferPolling?: boolean; pollIntervalMs?: number; pingIntervalMs?: number; @@ -60,6 +63,52 @@ export interface RelayFileSyncOptions { onPollingFallback?: (info: { reason: string; cause?: unknown }) => void; } +function normalizeWebSocketPathFilters(paths: string[] | undefined): string[] { + const seen = new Set(); + const normalized: string[] = []; + for (const value of paths ?? []) { + const path = typeof value === "string" ? value.trim() : ""; + if (!path) { + continue; + } + const absolute = path.startsWith("/") ? path : `/${path}`; + if (seen.has(absolute)) { + continue; + } + seen.add(absolute); + normalized.push(absolute); + } + return normalized; +} + +function pathMatchesAnyFilter(filters: string[], path: string): boolean { + if (filters.length === 0) { + return true; + } + const pathSegments = normalizePathSegments(path); + return filters.some((filter) => matchPathSegments(normalizePathSegments(filter), pathSegments)); +} + +function normalizePathSegments(path: string): string[] { + const absolute = path.startsWith("/") ? path : `/${path}`; + const trimmed = absolute.replace(/\/+$/, ""); + if (!trimmed) { + return []; + } + return trimmed.split("/").filter(Boolean); +} + +function matchPathSegments(pattern: string[], path: string[]): boolean { + if (pattern.length > 0 && pattern[pattern.length - 1] === "**") { + const prefix = pattern.slice(0, -1); + return path.length >= prefix.length && prefix.every((segment, index) => segment === "*" || segment === path[index]); + } + if (pattern.length !== path.length) { + return false; + } + return pattern.every((segment, index) => segment === "*" || segment === path[index]); +} + export interface RelayFileSyncSocket { addEventListener(type: "open", handler: (event: Event) => void): void; addEventListener(type: "message", handler: (event: MessageEvent) => void): void; @@ -249,6 +298,8 @@ export class RelayFileSync { private state: RelayFileSyncState = "idle"; private cursor?: string; + private readonly from: RelayFileSyncStart; + private readonly paths: string[]; private readonly polledEventIds: Set = new Set(); private readonly polledEventOrder: string[] = []; private firstPollComplete = false; @@ -293,7 +344,9 @@ export class RelayFileSync { const literal = options.token; this.tokenProvider = () => literal; } + this.from = options.from ?? "now"; this.cursor = options.cursor; + this.paths = normalizeWebSocketPathFilters(options.paths); this.onPollingFallback = options.onPollingFallback; this.pollIntervalMs = Math.max(1, Math.floor(options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS)); this.pingIntervalMs = Math.max(1, Math.floor(options.pingIntervalMs ?? DEFAULT_PING_INTERVAL_MS)); @@ -464,6 +517,14 @@ export class RelayFileSync { if (token) { url.searchParams.set("token", token); } + if (this.cursor) { + url.searchParams.set("cursor", this.cursor); + } else if (this.from === "now") { + url.searchParams.set("from", "now"); + } + for (const path of this.paths) { + url.searchParams.append("path", path); + } let socket: RelayFileSyncSocket; try { @@ -665,7 +726,7 @@ export class RelayFileSync { this.firstPollComplete = true; } else { for (const event of pending) { - this.emit("event", event); + this.emitFilesystemEvent(event); } } await this.sleep(this.pollIntervalMs); @@ -739,8 +800,18 @@ export class RelayFileSync { } const normalized = normalizeFilesystemEvent(parsed); + if (parsed.eventId) { + this.cursor = parsed.eventId; + } debugLog("event", { type: normalized.type, path: normalized.path, revision: normalized.revision }); - this.emit("event", normalized); + this.emitFilesystemEvent(normalized); + } + + private emitFilesystemEvent(event: FilesystemEvent): void { + if (!pathMatchesAnyFilter(this.paths, event.path)) { + return; + } + this.emit("event", event); } private startPingLoop(socket: RelayFileSyncSocket): void { diff --git a/packages/sdk/typescript/src/types.ts b/packages/sdk/typescript/src/types.ts index 2d35a181..15d0f5d8 100644 --- a/packages/sdk/typescript/src/types.ts +++ b/packages/sdk/typescript/src/types.ts @@ -310,6 +310,8 @@ export interface SubscribeOptions { coalesce?: "none" | "fire-once"; coalesceMs?: number; pathScope?: string[]; + from?: "now" | "legacy"; + cursor?: string; aclToken?: string; drainMs?: number; } @@ -326,6 +328,8 @@ export type ReplayOptions = export type ChangeStreamConnectionOptions = ReplayOptions & { workspaceId: string; aclToken?: string; + from?: "now" | "legacy"; + cursor?: string; }; export interface ChangeStreamConnection extends Subscription {