Add live factory subscription pickup#245
Conversation
|
Warning Review limit reached
More reviews will be available in 26 minutes and 25 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more credits in the billing tab to continue. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (11)
📝 WalkthroughWalkthroughThis PR implements live subscription mode for the Factory, enabling real-time event-driven issue processing as an alternative to backfill-then-watch. It introduces configurable transport selection (subscribe, poll, or hybrid), event deduplication, cursor-based polling with in-flight guarding, arrival latency tracking, and real Linear issue validation throughout the flow. ChangesLive subscription feature
Sequence DiagramsequenceDiagram
participant Client
participant FactoryLoop
participant Subscription
participant Cache
participant Handlers
Client->>FactoryLoop: start({ mode: 'live' })
alt subscribe transport
FactoryLoop->>Subscription: subscribe(liveGlob)
Subscription->>FactoryLoop: changeEvent (real-time)
else poll transport
FactoryLoop->>FactoryLoop: schedule polling loop
FactoryLoop->>Subscription: getEvents(cursor)
end
FactoryLoop->>Cache: check dedup set for event key
alt event already seen
FactoryLoop->>FactoryLoop: skip (deduplicated)
else new event
FactoryLoop->>FactoryLoop: recordArrivalLatency
FactoryLoop->>FactoryLoop: check isRealLinearIssue
FactoryLoop->>Handlers: handleChange (if real)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a live subscription mode to the Factory orchestrator, allowing it to process issues in real-time using either event subscriptions, polling, or a combination of both. It adds configuration schema options, event deduplication, latency tracking, and robust test coverage for these live subscription flows. The review feedback highlights two key areas for improvement: preventing a potential infinite loop in #currentEventCursor when the next cursor matches the current one, and refining eventCursorAfterPage to safely handle string-based event IDs when the initial cursor is undefined.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| async #currentEventCursor(limit: number): Promise<string | undefined> { | ||
| let cursor: string | undefined | ||
| for (;;) { | ||
| const page = await this.#mount.getEvents({ cursor, limit }) | ||
| cursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) | ||
| if (!page.nextCursor) return cursor | ||
| } | ||
| } |
There was a problem hiding this comment.
The #currentEventCursor method can run into an infinite loop if page.nextCursor is returned but is equal to the current cursor (e.g., when there are no new events but the API continues to return the same cursor). To prevent this, add a check to terminate the loop when page.nextCursor === cursor, matching the safety check implemented in #pollLiveEvents.
| async #currentEventCursor(limit: number): Promise<string | undefined> { | |
| let cursor: string | undefined | |
| for (;;) { | |
| const page = await this.#mount.getEvents({ cursor, limit }) | |
| cursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) | |
| if (!page.nextCursor) return cursor | |
| } | |
| } | |
| async #currentEventCursor(limit: number): Promise<string | undefined> { | |
| let cursor: string | undefined | |
| for (;;) { | |
| const page = await this.#mount.getEvents({ cursor, limit }) | |
| const nextCursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) | |
| if (!page.nextCursor || page.nextCursor === cursor) return nextCursor | |
| cursor = nextCursor | |
| } | |
| } |
| const eventCursorAfterPage = ( | ||
| cursor: string | undefined, | ||
| events: ChangeEvent[], | ||
| nextCursor?: string | null, | ||
| ): string | undefined => { | ||
| if (nextCursor) return nextCursor | ||
| if (events.length === 0) return cursor | ||
| const numericCursor = cursor === undefined ? 0 : Number(cursor) | ||
| if (Number.isInteger(numericCursor) && numericCursor >= 0) { | ||
| return String(numericCursor + events.length) | ||
| } | ||
| return events.at(-1)?.id ?? cursor | ||
| } |
There was a problem hiding this comment.
In eventCursorAfterPage, when cursor is undefined, the code defaults numericCursor to 0 and treats it as a numeric cursor, returning String(events.length). However, if the event stream uses string-based IDs (like UUIDs) rather than numeric offsets, this will result in an invalid cursor (e.g., "5") being used for subsequent polls. It is safer to only treat the cursor as numeric if cursor is explicitly defined and is a valid integer, falling back to the last event's ID otherwise.
const eventCursorAfterPage = (
cursor: string | undefined,
events: ChangeEvent[],
nextCursor?: string | null,
): string | undefined => {
if (nextCursor) return nextCursor
if (events.length === 0) return cursor
if (cursor !== undefined) {
const numericCursor = Number(cursor)
if (Number.isInteger(numericCursor) && numericCursor >= 0) {
return String(numericCursor + events.length)
}
}
return events.at(-1)?.id ?? cursor
}|
Implemented the PR review fixes for the live subscription cursor path. Changed:
Addressed comments
Advisory NotesNone. Local validationPassed:
I did not run the macOS-only |
|
✅ pr-reviewer applied fixes — committed and pushed Implemented one real fix found during review: Also added regression coverage for normalized event-feed output and live duplicate suppression: Addressed comments
Advisory Notes
Validation run:
I did not print |
6332764 to
fb64895
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
packages/factory-sdk/src/orchestrator/factory.ts (1)
437-447: 💤 Low valueConsider consolidating the
isRealLinearIssuechecks.The function checks
isRealLinearIssueat two points:
- Line 437: early exit in live mode (when
requireRealIssueis true) before the scope check- Line 445: always checked after the scope check
While line 437 is an optimization to avoid the scope check for non-real issues in live mode, having two exit paths for the same condition can reduce clarity. Consider whether the scope check is expensive enough to justify the early exit, or consolidate to a single check.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/factory-sdk/src/orchestrator/factory.ts` around lines 437 - 447, There are two checks for isRealLinearIssue around the scope check (one gated by opts.requireRealIssue and one unconditional); consolidate them by performing a single isRealLinearIssue(issue) evaluation and gating the early return with opts.requireRealIssue only when required, or move the opts.requireRealIssue logic to combine with the scope check: replace the two separate returns with one combined conditional that uses isRealLinearIssue(issue) and opts.requireRealIssue and isInFactoryScope(issue, this.#config.safety) so the code only returns once and still preserves the short-circuit optimization when requireRealIssue is true.packages/factory-sdk/src/cli/fleet.ts (1)
196-196: ⚡ Quick winConsider making the live mode configurable via CLI flag.
The factory loop command now hardcodes
mode: 'live', removing flexibility for users who might want to choose between live and backfill modes. Consider adding a--modeflag (e.g.,--mode liveor--mode backfill) to allow runtime configuration.💡 Example implementation with CLI flag
Update the
parseFactoryCommandfunction to parse a--modeflag and pass it through the command structure, then use it here:- await factory.start({ mode: 'live' }) + await factory.start({ mode: globals.mode ?? 'live' })🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/factory-sdk/src/cli/fleet.ts` at line 196, The current call await factory.start({ mode: 'live' }) hardcodes live mode; update the CLI to accept a --mode flag and pass its value into factory.start. Modify parseFactoryCommand to parse a mode option (accepting 'live' or 'backfill') with a default of 'live', ensure the command structure exposes that mode, and replace the hardcoded object in fleet.ts so factory.start({ mode }) uses the parsed value; validate the mode and fall back to 'live' on invalid input.packages/factory-sdk/src/orchestrator/factory.test.ts (1)
449-473: Clarify that pre-start replay suppression during live polling is cursor-based, not timestamp-filtered.
- For
liveSubscription: { transport: 'poll' }, the factory sets#liveEventCursorby consuming the currentmount.getEvents()history, then subsequent polls fetchgetEvents({ cursor }); it does not useevent.occurredAtto decide which events to replay.event.occurredAtis only used for arrival-latency metrics (#recordArrivalLatency), so the “future timestamp” in the test doesn’t affect whether the pre-start event is ignored—only the cursor advancement does.- For readability, consider not hard-coding a future
occurredAtin this test unless you’re explicitly aiming to exercise latency behavior.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/factory-sdk/src/orchestrator/factory.test.ts` around lines 449 - 473, The test is misleading because live polling suppresses pre-start history by advancing the factory's internal live event cursor (`#liveEventCursor`) via consuming mount.getEvents(), not by filtering on event.occurredAt; event.occurredAt is only used for arrival-latency recording (`#recordArrivalLatency`). Update the test in factory.test.ts (the case using createFactory with liveSubscription.transport='poll' and FakeMountClient) to either remove the artificially future occurredAt timestamp on the pre-start changeEvent or add a brief comment clarifying that suppression is cursor-based and the future timestamp does not affect replay; ensure no assertions rely on occurredAt to determine replay behavior and that the test exercises cursor advancement via mount.getEvents()/getEvents({ cursor }).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/factory-sdk/src/orchestrator/factory.ts`:
- Around line 185-192: The startup delay comes from `#currentEventCursor`
paginating all historical events via this.#mount.getEvents and
eventCursorAfterPage; to fix, first check if the mount/SDK exposes a direct
cursor API (e.g. this.#mount.getCurrentCursor, this.#mount.getCursor, or
similar) and call that and return its value instead of paginating; if no direct
method exists, add an optional fast-start/skipCatchup flag or a new
this.#mount.getLatestCursor helper and use it (falling back to the existing
pagination code only if the direct API is absent), referencing
`#currentEventCursor`, this.#mount.getEvents and eventCursorAfterPage to locate
where to implement the change.
- Around line 165-175: The polling branch incorrectly checks only for
options.transport === 'poll', so the hybrid mode 'subscribe-and-poll' never
schedules polls; update the condition that sets this.#liveEventCursor and calls
this.#scheduleLivePoll(...) to run when options.transport is 'poll' OR
'subscribe-and-poll' (i.e., include both values), using the existing methods
`#currentEventCursor`(...) to initialize this.#liveEventCursor and
`#scheduleLivePoll`(0, options) to start the loop; keep the existing subscription
logic that assigns this.#subscription and calls this.#handleLiveChange(...)
unchanged.
---
Nitpick comments:
In `@packages/factory-sdk/src/cli/fleet.ts`:
- Line 196: The current call await factory.start({ mode: 'live' }) hardcodes
live mode; update the CLI to accept a --mode flag and pass its value into
factory.start. Modify parseFactoryCommand to parse a mode option (accepting
'live' or 'backfill') with a default of 'live', ensure the command structure
exposes that mode, and replace the hardcoded object in fleet.ts so
factory.start({ mode }) uses the parsed value; validate the mode and fall back
to 'live' on invalid input.
In `@packages/factory-sdk/src/orchestrator/factory.test.ts`:
- Around line 449-473: The test is misleading because live polling suppresses
pre-start history by advancing the factory's internal live event cursor
(`#liveEventCursor`) via consuming mount.getEvents(), not by filtering on
event.occurredAt; event.occurredAt is only used for arrival-latency recording
(`#recordArrivalLatency`). Update the test in factory.test.ts (the case using
createFactory with liveSubscription.transport='poll' and FakeMountClient) to
either remove the artificially future occurredAt timestamp on the pre-start
changeEvent or add a brief comment clarifying that suppression is cursor-based
and the future timestamp does not affect replay; ensure no assertions rely on
occurredAt to determine replay behavior and that the test exercises cursor
advancement via mount.getEvents()/getEvents({ cursor }).
In `@packages/factory-sdk/src/orchestrator/factory.ts`:
- Around line 437-447: There are two checks for isRealLinearIssue around the
scope check (one gated by opts.requireRealIssue and one unconditional);
consolidate them by performing a single isRealLinearIssue(issue) evaluation and
gating the early return with opts.requireRealIssue only when required, or move
the opts.requireRealIssue logic to combine with the scope check: replace the two
separate returns with one combined conditional that uses
isRealLinearIssue(issue) and opts.requireRealIssue and isInFactoryScope(issue,
this.#config.safety) so the code only returns once and still preserves the
short-circuit optimization when requireRealIssue is true.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: a72253a4-ac56-4e9d-ae8a-436988f9b1f7
📒 Files selected for processing (9)
packages/factory-sdk/src/cli/fleet.tspackages/factory-sdk/src/config/schema.tspackages/factory-sdk/src/index.tspackages/factory-sdk/src/mount/relayfile-cloud-mount-client.tspackages/factory-sdk/src/orchestrator/factory.test.tspackages/factory-sdk/src/orchestrator/factory.tspackages/factory-sdk/src/orchestrator/index.tspackages/factory-sdk/src/ports/mount.tspackages/factory-sdk/src/types.ts
| if (options.transport !== 'poll') { | ||
| this.#subscription = this.#mount.subscribe([LIVE_ISSUE_GLOB], (event) => { | ||
| void this.#handleLiveChange(event) | ||
| }, { from: 'now', coalesce: 'none' }) | ||
| } | ||
|
|
||
| if (options.transport === 'poll') { | ||
| this.#liveEventCursor = await this.#currentEventCursor(options.eventLimit) | ||
| this.#scheduleLivePoll(0, options) | ||
| } | ||
| } |
There was a problem hiding this comment.
Critical: 'subscribe-and-poll' transport doesn't enable polling.
The condition on line 171 only enables polling when transport === 'poll', but the hybrid 'subscribe-and-poll' mode should also enable polling. Currently, 'subscribe-and-poll' only subscribes (line 166) but never schedules the poll loop (lines 172-174), breaking the intended hybrid behavior.
🐛 Proposed fix
- if (options.transport === 'poll') {
+ if (options.transport === 'poll' || options.transport === 'subscribe-and-poll') {
this.#liveEventCursor = await this.#currentEventCursor(options.eventLimit)
this.#scheduleLivePoll(0, options)
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/factory-sdk/src/orchestrator/factory.ts` around lines 165 - 175, The
polling branch incorrectly checks only for options.transport === 'poll', so the
hybrid mode 'subscribe-and-poll' never schedules polls; update the condition
that sets this.#liveEventCursor and calls this.#scheduleLivePoll(...) to run
when options.transport is 'poll' OR 'subscribe-and-poll' (i.e., include both
values), using the existing methods `#currentEventCursor`(...) to initialize
this.#liveEventCursor and `#scheduleLivePoll`(0, options) to start the loop; keep
the existing subscription logic that assigns this.#subscription and calls
this.#handleLiveChange(...) unchanged.
9a693eb to
f2da103
Compare
Summary
/linear/issues/**pickup with subscribe and getEvents polling transportsstate_namevalues to known state idsTests
npx vitest run packages/factory-sdknpx tsc --noEmit -p tsconfig.node.json