Add Cloudflare Flue SDK adapters to cloudflare-runtime#100
Conversation
Implements four new adapters in @agent-assistant/cloudflare-runtime that leverage Cloudflare's newly announced Flue SDK and existing platform primitives: - CfShellVfsProvider: VfsProvider backed by @cloudflare/shell, wrapping the SQLite-backed virtual filesystem for read/list/grep operations - CfKvSessionStoreAdapter: SessionStoreAdapter using KVNamespace, filling the previously missing Cloudflare adapter for the sessions package - CfWorkflowSchedulerBinding: SchedulerBinding for the proactive engine using runWorkflow(), giving watch rules durable delay and retry - CfFiberTurnExecutor: ContinuationHarnessAdapter wrapping runFiber()/stash() so resumed turns survive Worker process interruptions without re-running model or tool calls All three new package dependencies (@agent-assistant/sessions, proactive, vfs) are optional peerDependencies so the main bundle stays lean for consumers that only need a subset of adapters. 82 tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012qgiUVUBzg9YtUAPKCVwke
…ions Scaffolds a new package mirroring the structure of cloudflare-runtime, giving agent-assistant deployments on Google Cloud the same adapter coverage as the Cloudflare path. Six adapters, all tested against in-memory fakes of the GCP SDK interfaces: - GcpFirestoreSessionStoreAdapter: SessionStoreAdapter backed by Firestore — uses create() for conflict-safe inserts, where() for efficient userId/workspaceId queries, in-memory filtering for the rest - GcpFirestoreContinuationStore: ContinuationStore backed by Firestore including findByTrigger() via field path queries on waitFor.approvalId, waitFor.operationId, and waitFor.wakeUpId - GcpCloudTasksSchedulerBinding: SchedulerBinding for @agent-assistant/proactive using Cloud Tasks HTTP push tasks with scheduleTime for deferred delivery; supports OIDC auth via serviceAccountEmail - GcpCloudTasksContinuationScheduler: ContinuationSchedulerAdapter using Cloud Tasks — encodes continuationId + trigger in the task body for the consumer's resume endpoint to parse - GcpStorageVfsProvider: VfsProvider backed by Google Cloud Storage with in-memory grep for search (consumers should front with a search index for high-cardinality buckets) - GcpIdempotentTurnExecutor: ContinuationHarnessAdapter equivalent to CfFiberTurnExecutor — stashes HarnessResult to Firestore after first completion so recovered Workers never re-run model/tool calls All GCP SDK dependencies are typed via minimal local interfaces so the package ships zero GCP SDK bytes; consumers pass real SDK objects that satisfy those interfaces. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012qgiUVUBzg9YtUAPKCVwke
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
📝 WalkthroughWalkthroughExtends ChangesCloudflare runtime – new adapters and public API
New
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a5beefb810
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| const raw = await this.kv.get(indexKey); | ||
| const ids: string[] = raw ? (JSON.parse(raw) as string[]) : []; | ||
| if (!ids.includes(sessionId)) { | ||
| ids.push(sessionId); | ||
| await this.kv.put(indexKey, JSON.stringify(ids)); |
There was a problem hiding this comment.
Avoid read-modify-write session indexes
When two inserts or workspace/user updates for the same index run concurrently, both calls can read the same JSON array here and then the later put overwrites the earlier one, so one existing session is missing from fetchMany({ userId }) or fetchMany({ workspaceId }). KV only gives per-key writes in this adapter, so this index shape needs per-session index keys or serialization rather than a shared array update.
Useful? React with 👍 / 👎.
| const listed = await this.kv.list({ prefix: `${this.prefix}${SESSION_KEY_PREFIX}` }); | ||
| const sessions = await Promise.all( | ||
| listed.keys.map((key) => this.kv.get(key.name).then((raw) => (raw ? (JSON.parse(raw) as Session) : null))), |
There was a problem hiding this comment.
Page through all KV list results
When callers fetch without a userId or workspaceId (including state-only queries), scanAll() only reads the first page returned by Workers KV. Namespaces with more than one list() page will silently omit later sessions before filters and limits are applied, so this needs to follow the returned cursor until the listing is complete.
Useful? React with 👍 / 👎.
| parent, | ||
| task: { | ||
| name: taskName, | ||
| scheduleTime: { seconds: Math.floor((Date.now() + delayMs) / 1000) }, |
There was a problem hiding this comment.
Use the injected clock for task schedule time
When a caller supplies now (for tests or a controlled scheduler clock), delayMs is computed against that clock but the final scheduleTime is anchored to real Date.now(), so the Cloud Task can be scheduled too early or too late whenever the injected clock differs from wall time. Since wakeAtMs is already an absolute timestamp, derive scheduleTime from input.wakeAtMs or consistently from this.now().
Useful? React with 👍 / 👎.
|
✅ pr-reviewer applied fixes — committed and pushed This is a logic inconsistency worth noting as a review comment (not editing): All findings are documented. Here is my review. Review SummaryPR #100 is additive: it introduces a new Findings & fixes1. Build break in
Both directly cast Verification (full build/test, not just touched files)
Note: CI's Addressed comments
Advisory Notes
The build break is fixed and everything verifies green locally, but the remaining items (clock consistency, dependency-range confirmation, CI coverage) are maintainer decisions, and I cannot confirm GitHub's mergeable/CI-check state from here. I am not printing READY. |
PR #100 Review — Add
|
There was a problem hiding this comment.
Actionable comments posted: 14
🧹 Nitpick comments (6)
packages/cloudflare-runtime/src/adapters/cf-workflow-scheduler-binding.test.ts (1)
70-76: ⚡ Quick winAdd a test that unexpected cancellation errors are rethrown.
Alongside the “non-existent bindingId is no-op” case, add a case where
get()orterminate()fails with a non-benign error and assert rejection. This prevents accidental reintroduction of broad error swallowing.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/cloudflare-runtime/src/adapters/cf-workflow-scheduler-binding.test.ts` around lines 70 - 76, Add a new test case after the existing "non-existent bindingId" test in the CfWorkflowSchedulerBinding test file that verifies unexpected errors are properly rethrown. The test should mock the binding to throw an error from either the get() or terminate() methods (simulating a real failure rather than a benign not-found case), then assert that calling scheduler.cancelWakeUp() rejects with that error. This ensures the implementation doesn't accidentally swallow all errors and only allows the no-op behavior for the specific case of non-existent IDs.packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.test.ts (1)
17-23: ⚡ Quick winAdd regression coverage for cursor pagination and immutable
idupdates.The suite currently won’t fail if
scanAll()ignores KV pagination or ifupdate(sessionId, { id: ... })mutates identity. Adding these two tests will lock in expected adapter behavior.Also applies to: 63-70, 108-115
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.test.ts` around lines 17 - 23, Add two new regression tests to the test suite for the cf-kv-session-store-adapter. First, add a test that verifies the scanAll() method correctly handles cursor-based pagination by ensuring it continues iterating through pages when list_complete is false and processes all keys across multiple cursor iterations. Second, add a test that verifies calling update(sessionId, { id: newId }) does not mutate the session's identity by asserting that the id field remains unchanged after an update attempt with a different id value. These tests should be added in the test file alongside the existing test cases.packages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.test.ts (1)
79-88: ⚡ Quick winAdd a regression test for prefix-collision namespaces (
ws1vsws10).Current prefix coverage checks
read()only. Add one case that provessearch()(and optionallylist('')) excludes similarly-prefixed namespaces.🧪 Suggested test addition
describe('CfShellVfsProvider', () => { @@ it('applies prefix to all operations', async () => { @@ }); + + it('does not include similarly-prefixed namespaces in search', async () => { + const provider = new CfShellVfsProvider({ + shell: makeShell({ + 'ws1/src/a.ts': 'hello from ws1', + 'ws10/src/b.ts': 'hello from ws10', + }), + prefix: 'ws1', + }); + + const results = await provider.search('hello'); + expect(results.map((r) => r.path)).toEqual(['src/a.ts']); + });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.test.ts` around lines 79 - 88, Add a new regression test case to the test file that specifically tests prefix-collision scenarios. Create a test that sets up files in multiple workspaces with similar prefixes (such as files in both 'ws1' and 'ws10' directories) and then instantiate CfShellVfsProvider with prefix 'ws1'. In this test, call the search() method (and optionally list('')) and verify that results only include files from the 'ws1' namespace and properly exclude files from the similarly-named 'ws10' namespace. This will ensure prefix matching is exact and does not have false-positive collisions.packages/gcp-runtime/src/adapters/gcp-cloud-tasks-scheduler-binding.test.ts (1)
88-92: ⚡ Quick winAdd a cancellation test for non-ignorable delete errors.
Please add coverage that asserts
cancelWakeUprejects whendeleteTaskfails for reasons other than “missing/already fired.”Suggested test case
+ it('rethrows unexpected deleteTask failures', async () => { + const { client } = makeClient(); + client.deleteTask = async () => { + throw new Error('boom'); + }; + const scheduler = new GcpCloudTasksSchedulerBinding({ ...opts, client }); + await expect(scheduler.cancelWakeUp('any-task')).rejects.toThrow('boom'); + });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/gcp-runtime/src/adapters/gcp-cloud-tasks-scheduler-binding.test.ts` around lines 88 - 92, Add a new test case after the existing no-op test to verify that the cancelWakeUp method properly rejects when deleteTask encounters a non-ignorable error (i.e., an error that is not due to the task being missing or already fired). Set up the mock client to throw an error from the deleteTask method call, then assert that calling scheduler.cancelWakeUp with a task name results in the promise rejecting with that error, ensuring error handling is properly tested in the GcpCloudTasksSchedulerBinding class.packages/gcp-runtime/src/adapters/gcp-cloud-tasks-continuation-scheduler.test.ts (1)
72-80: ⚡ Quick winAdd regression coverage for
scheduleTimewith injectednow().Please add a test that asserts
scheduleTime.secondsis derived fromwakeAtMs(or max of wakeAtMs and now), using a fixednowprovider.Suggested test case
+ it('uses wakeAtMs when now() is injected', async () => { + const { tasks, client } = makeClient(); + const fixedNow = 1_700_000_000_000; + const wakeAtMs = fixedNow + 30_000; + const scheduler = new GcpCloudTasksContinuationScheduler({ + ...baseOpts, + client, + now: () => fixedNow, + }); + + await scheduler.scheduleWake({ continuationId: 'cont-abc', wakeAtMs }); + const task = [...tasks.values()][0] as { scheduleTime: { seconds: number } }; + expect(task.scheduleTime.seconds).toBe(Math.floor(wakeAtMs / 1000)); + });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/gcp-runtime/src/adapters/gcp-cloud-tasks-continuation-scheduler.test.ts` around lines 72 - 80, The test suite lacks coverage for verifying that scheduleTime.seconds is correctly derived from wakeAtMs using an injected now provider. Add a new test case that instantiates GcpCloudTasksContinuationScheduler with a controlled now provider (fixed timestamp), calls scheduleWake with a specific wakeAtMs value, and asserts that the returned scheduleTime.seconds matches the maximum of the injected now value and wakeAtMs. This will ensure the scheduling calculation correctly handles the timing logic.packages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.test.ts (1)
55-108: ⚡ Quick winAdd a concurrent duplicate-delivery test for the same
resumedTurnId.Current coverage is sequential; it doesn’t assert behavior when two
runResumedTurncalls race on one idempotency key. Adding that case will lock in the intended single-execution guarantee.Example test shape
+ it('executes inner at most once when two calls race on the same resumedTurnId', async () => { + let release!: () => void; + const gate = new Promise<void>((resolve) => { release = resolve; }); + const inner: ContinuationHarnessAdapter = { + runResumedTurn: vi.fn().mockImplementation(async () => { + await gate; + return successResult; + }), + }; + const stash = makeStashCollection(); + const executor = new GcpIdempotentTurnExecutor({ inner, stashCollection: stash }); + const input = makeInput('turn-race'); + + const p1 = executor.runResumedTurn(input); + const p2 = executor.runResumedTurn(input); + release(); + await Promise.allSettled([p1, p2]); + + expect(inner.runResumedTurn).toHaveBeenCalledTimes(1); + });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.test.ts` around lines 55 - 108, The test suite for GcpIdempotentTurnExecutor currently only covers sequential scenarios and lacks concurrent duplicate-delivery test coverage. Add a new test case in the describe block that verifies the idempotency behavior when two runResumedTurn calls with the same resumedTurnId are invoked concurrently (using Promise.all or similar). The test should assert that both concurrent calls complete and return the same result, while the inner harness's runResumedTurn method is called exactly once, demonstrating that the single-execution guarantee is maintained even under concurrent duplicate requests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts`:
- Around line 61-63: The code in the slice operation with query.limit does not
validate that the limit value is non-negative, which causes unexpected behavior
when negative values are passed (e.g., slice(0, -1)). Add a validation check
before the slice call to ensure query.limit is greater than or equal to 0,
either by adding a condition to only slice when limit >= 0, or by normalizing
any negative limit values to 0 before applying the slice operation.
- Around line 77-81: The update() method in the CF KV session store adapter
creates an updated session object that allows the patch parameter to override
the session id field, which can cause the session's id to diverge from the
storage key (sessionId). After creating the updated object by spreading existing
and patch, explicitly reset the id property back to the original sessionId to
prevent patch from mutating the session identity and ensure consistency between
the session's id field and its storage key.
- Around line 23-39: The insert method has a race condition where concurrent
requests can insert duplicate sessions between the conflict check at line 24 and
the actual put operation at line 34, and the addToIndex and removeFromIndex
methods perform non-atomic read-modify-write operations that can lose data under
concurrent access. Refactor the index storage pattern from shared JSON arrays to
per-membership keys (using patterns like user:<uid>:session:<sid> and
workspace:<wid>:session:<sid>) to eliminate shared mutable state and ensure each
operation is atomic, or alternatively implement coordination through a Durable
Object that handles all index mutations sequentially. Update the insert, update,
and delete methods along with the addToIndex and removeFromIndex helper methods
to use the new key structure.
- Around line 146-151: The scanAll() method is missing pagination handling for
the kv.list() API, which will silently miss sessions when the key count exceeds
the page limit of approximately 1,000 keys. Modify the scanAll() method to
implement pagination by checking the list_complete property returned from
kv.list() and using the cursor property for subsequent calls. Create a loop that
continues fetching pages by passing the cursor from the previous response to the
next kv.list() call until list_complete is true, accumulating all the keys from
every page before processing and filtering them into the final sessions array.
In `@packages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.ts`:
- Around line 55-57: The search() and strip() methods use simple string prefix
matching without verifying path segment boundaries, allowing paths like
"ws10/file.ts" to incorrectly match prefix "ws1" and causing cross-namespace
leakage. Implement segment-safe prefix validation in the search() method by
checking that the character immediately following this.prefix is either a path
separator or the end of the string before including results in the mapping.
Additionally, add filtering logic to drop any paths from the grep results that
do not match this segment-safe prefix requirement before the strip() operation
processes them, ensuring only properly scoped paths are returned.
In `@packages/cloudflare-runtime/src/adapters/cf-workflow-scheduler-binding.ts`:
- Around line 41-48: The catch block in the cancelWakeUp method currently
suppresses all errors indiscriminately with a bare catch statement, but it
should only suppress errors that indicate the instance was not found or already
completed. Modify the catch block to inspect the caught error and determine its
type: only suppress errors that represent not-found conditions or
already-completed states (check the error message or error type/code returned by
the Workflow binding), and rethrow all other errors such as network failures,
authentication errors, or service errors so that callers have visibility into
actual termination failures.
In `@packages/gcp-runtime/src/adapters/gcp-cloud-tasks-continuation-scheduler.ts`:
- Around line 80-87: The code uses two different clock sources for scheduling:
`this.now()` is used to calculate `delayMs`, but `Date.now()` is used to
calculate `scheduleTime`. This inconsistency causes incorrect scheduling because
the delay is calculated relative to the injected `this.now()` but then added to
`Date.now()`, which bypasses the injected clock behavior. Replace `Date.now()`
with `this.now()` when calculating the `scheduleTime` value in the `createTask`
method call to ensure a single consistent clock source is used throughout the
scheduling logic.
- Around line 94-100: The catch block in the cancelWake method catches and
ignores all errors indiscriminately, which hides real deletion failures. Instead
of using an empty catch block, inspect the caught error to determine if it
represents an ignorable condition (such as a task not found or NOT_FOUND error
code), and only suppress the error in those cases. For all other error types and
unexpected failures, rethrow the error so that genuine deletion problems are not
silently hidden.
In `@packages/gcp-runtime/src/adapters/gcp-cloud-tasks-scheduler-binding.ts`:
- Around line 104-109: The catch block in the cancelWakeUp method is suppressing
all errors from deleteTask indiscriminately, which hides real failures. Instead
of catching and ignoring all exceptions, inspect the error within the catch
block to determine if it represents the expected "not found" or "already
deleted" case. Only suppress that specific error condition (typically by
checking the error code or message for "not found"), and rethrow any other
errors to ensure real failures are properly surfaced to callers.
In `@packages/gcp-runtime/src/adapters/gcp-firestore-continuation-store.ts`:
- Around line 45-55: The findByTrigger method is missing a status filter in its
Firestore query, which means it can return non-pending continuation records when
it should only return pending ones. Modify the query chain in findByTrigger to
add an additional where clause that filters by the status field with a value of
pending, placing this constraint after the constraint derived from
triggerConstraint but before the limit and get calls. This will ensure the
method enforces its documented contract of finding only pending continuations.
In `@packages/gcp-runtime/src/adapters/gcp-firestore-session-store-adapter.ts`:
- Around line 84-85: The validation for query.limit at the sessions slice
operation does not check if the limit value is negative, which causes unexpected
behavior when passed to the slice method (negative indices change slice
semantics). Add a validation check after the null check for query.limit to
ensure the value is non-negative (>= 0), either by clamping it to Math.max(0,
query.limit) or by rejecting negative values with an error before performing the
slice operation.
- Around line 88-94: The update() method has a time-of-check-time-of-use
(TOCTOU) gap where the document can be deleted between the existence check and
the actual update operation, causing Firestore to throw a provider-specific gRPC
error instead of the adapter's SessionNotFoundError. Additionally, the final
fetch lacks validation that the document exists. Wrap the ref.update(patch) call
in a try-catch block to catch Firestore errors and convert NOT_FOUND errors
(gRPC status code 5) to notFoundError(sessionId). After fetching the updated
document with await ref.get(), validate that the snapshot exists before
returning updated.data() as Session, throwing notFoundError(sessionId) if the
document no longer exists.
In `@packages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.ts`:
- Around line 38-49: The runResumedTurn method in the GcpIdempotentTurnExecutor
class has a race condition where concurrent requests with the same resumedTurnId
can both miss the stash and execute the inner harness twice. Before calling
this.inner.runResumedTurn, introduce an atomic claim step using a Firestore
transaction or a create-lock pattern that atomically tries to claim exclusive
execution rights for the given resumedTurnId. Only the winner of this claim
should proceed to execute this.inner.runResumedTurn and publish the result to
the stash, while other concurrent attempts should wait for or retrieve the
result from the stashed data once the winner completes.
In `@packages/gcp-runtime/src/adapters/gcp-storage-vfs-provider.ts`:
- Around line 77-87: The search() method creates a RegExp directly from the
user-provided query parameter without escaping special regex characters, which
can cause ReDoS attacks or runtime errors. Modify the code at line 77 where new
RegExp(query) is constructed to first escape the query parameter by replacing
special regex metacharacters with their escaped equivalents, ensuring the query
is treated as literal text rather than a regex pattern. This prevents malicious
or invalid patterns from causing performance issues or exceptions.
---
Nitpick comments:
In
`@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.test.ts`:
- Around line 17-23: Add two new regression tests to the test suite for the
cf-kv-session-store-adapter. First, add a test that verifies the scanAll()
method correctly handles cursor-based pagination by ensuring it continues
iterating through pages when list_complete is false and processes all keys
across multiple cursor iterations. Second, add a test that verifies calling
update(sessionId, { id: newId }) does not mutate the session's identity by
asserting that the id field remains unchanged after an update attempt with a
different id value. These tests should be added in the test file alongside the
existing test cases.
In `@packages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.test.ts`:
- Around line 79-88: Add a new regression test case to the test file that
specifically tests prefix-collision scenarios. Create a test that sets up files
in multiple workspaces with similar prefixes (such as files in both 'ws1' and
'ws10' directories) and then instantiate CfShellVfsProvider with prefix 'ws1'.
In this test, call the search() method (and optionally list('')) and verify that
results only include files from the 'ws1' namespace and properly exclude files
from the similarly-named 'ws10' namespace. This will ensure prefix matching is
exact and does not have false-positive collisions.
In
`@packages/cloudflare-runtime/src/adapters/cf-workflow-scheduler-binding.test.ts`:
- Around line 70-76: Add a new test case after the existing "non-existent
bindingId" test in the CfWorkflowSchedulerBinding test file that verifies
unexpected errors are properly rethrown. The test should mock the binding to
throw an error from either the get() or terminate() methods (simulating a real
failure rather than a benign not-found case), then assert that calling
scheduler.cancelWakeUp() rejects with that error. This ensures the
implementation doesn't accidentally swallow all errors and only allows the no-op
behavior for the specific case of non-existent IDs.
In
`@packages/gcp-runtime/src/adapters/gcp-cloud-tasks-continuation-scheduler.test.ts`:
- Around line 72-80: The test suite lacks coverage for verifying that
scheduleTime.seconds is correctly derived from wakeAtMs using an injected now
provider. Add a new test case that instantiates
GcpCloudTasksContinuationScheduler with a controlled now provider (fixed
timestamp), calls scheduleWake with a specific wakeAtMs value, and asserts that
the returned scheduleTime.seconds matches the maximum of the injected now value
and wakeAtMs. This will ensure the scheduling calculation correctly handles the
timing logic.
In `@packages/gcp-runtime/src/adapters/gcp-cloud-tasks-scheduler-binding.test.ts`:
- Around line 88-92: Add a new test case after the existing no-op test to verify
that the cancelWakeUp method properly rejects when deleteTask encounters a
non-ignorable error (i.e., an error that is not due to the task being missing or
already fired). Set up the mock client to throw an error from the deleteTask
method call, then assert that calling scheduler.cancelWakeUp with a task name
results in the promise rejecting with that error, ensuring error handling is
properly tested in the GcpCloudTasksSchedulerBinding class.
In `@packages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.test.ts`:
- Around line 55-108: The test suite for GcpIdempotentTurnExecutor currently
only covers sequential scenarios and lacks concurrent duplicate-delivery test
coverage. Add a new test case in the describe block that verifies the
idempotency behavior when two runResumedTurn calls with the same resumedTurnId
are invoked concurrently (using Promise.all or similar). The test should assert
that both concurrent calls complete and return the same result, while the inner
harness's runResumedTurn method is called exactly once, demonstrating that the
single-execution guarantee is maintained even under concurrent duplicate
requests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: b5ed1118-4b01-486a-b172-3e59e1125b90
⛔ Files ignored due to path filters (1)
package-lock.jsonis excluded by!**/package-lock.json
📒 Files selected for processing (27)
package.jsonpackages/cloudflare-runtime/package.jsonpackages/cloudflare-runtime/src/adapters/cf-fiber-turn-executor.test.tspackages/cloudflare-runtime/src/adapters/cf-fiber-turn-executor.tspackages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.test.tspackages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.tspackages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.test.tspackages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.tspackages/cloudflare-runtime/src/adapters/cf-workflow-scheduler-binding.test.tspackages/cloudflare-runtime/src/adapters/cf-workflow-scheduler-binding.tspackages/cloudflare-runtime/src/index.tspackages/gcp-runtime/package.jsonpackages/gcp-runtime/src/adapters/gcp-cloud-tasks-continuation-scheduler.test.tspackages/gcp-runtime/src/adapters/gcp-cloud-tasks-continuation-scheduler.tspackages/gcp-runtime/src/adapters/gcp-cloud-tasks-scheduler-binding.test.tspackages/gcp-runtime/src/adapters/gcp-cloud-tasks-scheduler-binding.tspackages/gcp-runtime/src/adapters/gcp-firestore-continuation-store.test.tspackages/gcp-runtime/src/adapters/gcp-firestore-continuation-store.tspackages/gcp-runtime/src/adapters/gcp-firestore-session-store-adapter.test.tspackages/gcp-runtime/src/adapters/gcp-firestore-session-store-adapter.tspackages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.test.tspackages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.tspackages/gcp-runtime/src/adapters/gcp-storage-vfs-provider.test.tspackages/gcp-runtime/src/adapters/gcp-storage-vfs-provider.tspackages/gcp-runtime/src/index.tspackages/gcp-runtime/tsconfig.jsonpackages/gcp-runtime/vitest.config.ts
| async insert(session: Session): Promise<void> { | ||
| const existing = await this.kv.get(this.sessionKey(session.id)); | ||
| if (existing !== null) { | ||
| // Throw without importing the error class — same message shape the store expects. | ||
| const err = new Error(`Session already exists: ${session.id}`) as Error & { name: string; sessionId: string }; | ||
| err.name = 'SessionConflictError'; | ||
| err.sessionId = session.id; | ||
| throw err; | ||
| } | ||
|
|
||
| await Promise.all([ | ||
| this.kv.put(this.sessionKey(session.id), JSON.stringify(session)), | ||
| this.addToIndex(this.userIndexKey(session.userId), session.id), | ||
| session.workspaceId | ||
| ? this.addToIndex(this.workspaceIndexKey(session.workspaceId), session.id) | ||
| : Promise.resolve(), | ||
| ]); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
fd -t f 'cf-kv-session-store-adapter' --type fRepository: AgentWorkforce/agent-assistant
Length of output: 223
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.tsRepository: AgentWorkforce/agent-assistant
Length of output: 7816
Index operations and conflict detection lack atomic guarantees, creating data loss risks under concurrent access.
addToIndex and removeFromIndex (lines 126-143) perform read-modify-write cycles on shared JSON index arrays without atomicity: concurrent calls targeting the same user/workspace index will race, with one operation's changes overwriting the other and dropping session IDs. The insert method's conflict check (line 24) is similarly vulnerable—another request can insert the same session between the get and put (line 34).
In a Cloudflare Workers environment handling concurrent requests, this pattern will cause data loss. Consider using per-membership keys (user:<uid>:session:<sid>, workspace:<wid>:session:<sid>) to eliminate shared mutable state, or route all index mutations through a Durable Object for strongly-consistent coordination.
This applies to insert (lines 23-39), update (lines 83-97), and delete (lines 103-114).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts`
around lines 23 - 39, The insert method has a race condition where concurrent
requests can insert duplicate sessions between the conflict check at line 24 and
the actual put operation at line 34, and the addToIndex and removeFromIndex
methods perform non-atomic read-modify-write operations that can lose data under
concurrent access. Refactor the index storage pattern from shared JSON arrays to
per-membership keys (using patterns like user:<uid>:session:<sid> and
workspace:<wid>:session:<sid>) to eliminate shared mutable state and ensure each
operation is atomic, or alternatively implement coordination through a Durable
Object that handles all index mutations sequentially. Update the insert, update,
and delete methods along with the addToIndex and removeFromIndex helper methods
to use the new key structure.
| if (query.limit != null) { | ||
| sessions = sessions.slice(0, query.limit); | ||
| } |
There was a problem hiding this comment.
Validate query.limit before slicing.
Negative limits currently produce surprising results (slice(0, -1) etc.). Guard limit >= 0 (or normalize negatives to 0) before applying.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts`
around lines 61 - 63, The code in the slice operation with query.limit does not
validate that the limit value is non-negative, which causes unexpected behavior
when negative values are passed (e.g., slice(0, -1)). Add a validation check
before the slice call to ensure query.limit is greater than or equal to 0,
either by adding a condition to only slice when limit >= 0, or by normalizing
any negative limit values to 0 before applying the slice operation.
| const updated: Session = { ...existing, ...patch }; | ||
|
|
||
| const writes: Promise<void>[] = [ | ||
| this.kv.put(this.sessionKey(sessionId), JSON.stringify(updated)), | ||
| ]; |
There was a problem hiding this comment.
Prevent update() from mutating session identity.
patch can include id, so updated.id may diverge from the storage key (sessionId), producing inconsistent reads/indexing.
Suggested fix
- const updated: Session = { ...existing, ...patch };
+ const { id: _ignoredId, ...safePatch } = patch;
+ const updated: Session = { ...existing, ...safePatch, id: sessionId };📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const updated: Session = { ...existing, ...patch }; | |
| const writes: Promise<void>[] = [ | |
| this.kv.put(this.sessionKey(sessionId), JSON.stringify(updated)), | |
| ]; | |
| const { id: _ignoredId, ...safePatch } = patch; | |
| const updated: Session = { ...existing, ...safePatch, id: sessionId }; | |
| const writes: Promise<void>[] = [ | |
| this.kv.put(this.sessionKey(sessionId), JSON.stringify(updated)), | |
| ]; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts`
around lines 77 - 81, The update() method in the CF KV session store adapter
creates an updated session object that allows the patch parameter to override
the session id field, which can cause the session's id to diverge from the
storage key (sessionId). After creating the updated object by spreading existing
and patch, explicitly reset the id property back to the original sessionId to
prevent patch from mutating the session identity and ensure consistency between
the session's id field and its storage key.
| private async scanAll(): Promise<Session[]> { | ||
| const listed = await this.kv.list({ prefix: `${this.prefix}${SESSION_KEY_PREFIX}` }); | ||
| const sessions = await Promise.all( | ||
| listed.keys.map((key) => this.kv.get(key.name).then((raw) => (raw ? (JSON.parse(raw) as Session) : null))), | ||
| ); | ||
| return sessions.filter((s): s is Session => s !== null); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts | sed -n '140,160p'Repository: AgentWorkforce/agent-assistant
Length of output: 1126
🏁 Script executed:
rg "kv\.list\(" packages/cloudflare-runtime --type ts --type tsx -A 3Repository: AgentWorkforce/agent-assistant
Length of output: 102
🏁 Script executed:
find packages/cloudflare-runtime -name "*.ts" -o -name "*.tsx" | xargs grep -l "list_complete\|cursor" 2>/dev/nullRepository: AgentWorkforce/agent-assistant
Length of output: 151
🏁 Script executed:
rg "kv\.list" packages/cloudflare-runtime -A 3Repository: AgentWorkforce/agent-assistant
Length of output: 613
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.test.ts | head -100Repository: AgentWorkforce/agent-assistant
Length of output: 4323
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts | sed -n '115,160p'Repository: AgentWorkforce/agent-assistant
Length of output: 2298
🏁 Script executed:
rg "fetchMany" packages/cloudflare-runtime -A 5Repository: AgentWorkforce/agent-assistant
Length of output: 3816
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts | sed -n '85,115p'Repository: AgentWorkforce/agent-assistant
Length of output: 1303
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts | sed -n '50,90p'Repository: AgentWorkforce/agent-assistant
Length of output: 1621
🏁 Script executed:
cat -n packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts | sed -n '1,50p'Repository: AgentWorkforce/agent-assistant
Length of output: 2260
🏁 Script executed:
web_search
Cloudflare Workers KV API list() pagination cursor list_completeRepository: AgentWorkforce/agent-assistant
Length of output: 275
🌐 Web query:
Cloudflare Workers KV namespace list() API pagination documentation
💡 Result:
The Cloudflare Workers KV list() API uses cursor-based pagination to handle namespaces containing more keys than the single-request limit [1][2]. Key Concepts for Pagination: 1. Response Properties: The list() method returns an object containing keys, list_complete, and cursor [1]. - list_complete (boolean): Indicates whether there are more keys to fetch. If false, additional pages exist [1]. - cursor (string): An opaque token used to request the next batch of keys [1][2]. 2. Pagination Logic: You must use the list_complete property to determine if further calls are required [1]. Never rely solely on the length of the keys array, as an empty keys array may still be returned if there are more keys to fetch (e.g., due to expired or deleted keys that must be iterated through) [1][3]. 3. Implementing Pagination: - Perform an initial call to list(). - If list_complete is false, use the provided cursor in the next call to list() by passing it within the options object: NAMESPACE.list({ cursor: cursor }) [1][2]. - Repeat this process until list_complete returns true [2]. 4. Important Considerations: - Prefix usage: If you are filtering results with a prefix argument, you must include that same prefix in every subsequent paginated call [1][3]. Omitting it will cause the request to lose the filter and return incorrect results [3]. - Default Limit: The API typically returns up to 1,000 keys per request [4][5]. Example Pattern: let cursor; let listComplete = false; while (!listComplete) { const result = await NAMESPACE.list({ prefix: 'my-prefix', cursor }); // Process keys in result.keys here listComplete = result.list_complete; cursor = result.cursor; }
Citations:
- 1: https://developers.cloudflare.com/kv/api/list-keys/
- 2: https://blog.cloudflare.com/whats-new-with-workers-kv/
- 3: https://skills.sh/jezweb/claude-skills/cloudflare-kv
- 4: https://developers.cloudflare.com/api/node/resources/kv/subresources/namespaces/subresources/keys/methods/list/
- 5: https://cloudflare-cloudflare-typescript.mintlify.app/api-reference/kv
Fix missing pagination in scanAll() to handle KV namespaces with > ~1,000 keys.
kv.list() is paginated and returns list_complete and cursor properties. This implementation ignores pagination, so fetchMany({}) silently misses sessions once the key count exceeds the page limit (~1,000 keys).
Suggested fix
private async scanAll(): Promise<Session[]> {
- const listed = await this.kv.list({ prefix: `${this.prefix}${SESSION_KEY_PREFIX}` });
- const sessions = await Promise.all(
- listed.keys.map((key) => this.kv.get(key.name).then((raw) => (raw ? (JSON.parse(raw) as Session) : null))),
- );
- return sessions.filter((s): s is Session => s !== null);
+ const out: Session[] = [];
+ let cursor: string | undefined;
+ do {
+ const page = await this.kv.list({ prefix: `${this.prefix}${SESSION_KEY_PREFIX}`, cursor });
+ const sessions = await Promise.all(
+ page.keys.map((key) =>
+ this.kv.get(key.name).then((raw) => (raw ? (JSON.parse(raw) as Session) : null)),
+ ),
+ );
+ out.push(...sessions.filter((s): s is Session => s !== null));
+ cursor = page.list_complete ? undefined : page.cursor;
+ } while (cursor);
+ return out;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private async scanAll(): Promise<Session[]> { | |
| const listed = await this.kv.list({ prefix: `${this.prefix}${SESSION_KEY_PREFIX}` }); | |
| const sessions = await Promise.all( | |
| listed.keys.map((key) => this.kv.get(key.name).then((raw) => (raw ? (JSON.parse(raw) as Session) : null))), | |
| ); | |
| return sessions.filter((s): s is Session => s !== null); | |
| private async scanAll(): Promise<Session[]> { | |
| const out: Session[] = []; | |
| let cursor: string | undefined; | |
| do { | |
| const page = await this.kv.list({ prefix: `${this.prefix}${SESSION_KEY_PREFIX}`, cursor }); | |
| const sessions = await Promise.all( | |
| page.keys.map((key) => | |
| this.kv.get(key.name).then((raw) => (raw ? (JSON.parse(raw) as Session) : null)), | |
| ), | |
| ); | |
| out.push(...sessions.filter((s): s is Session => s !== null)); | |
| cursor = page.list_complete ? undefined : page.cursor; | |
| } while (cursor); | |
| return out; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/cloudflare-runtime/src/adapters/cf-kv-session-store-adapter.ts`
around lines 146 - 151, The scanAll() method is missing pagination handling for
the kv.list() API, which will silently miss sessions when the key count exceeds
the page limit of approximately 1,000 keys. Modify the scanAll() method to
implement pagination by checking the list_complete property returned from
kv.list() and using the cursor property for subsequent calls. Create a loop that
continues fetching pages by passing the cursor from the previous response to the
next kv.list() call until list_complete is true, accumulating all the keys from
every page before processing and filtering them into the final sessions array.
| const results = await this.shell.grep(query, { | ||
| path: this.prefix || undefined, | ||
| maxResults: options?.limit, |
There was a problem hiding this comment.
Harden prefix boundary handling to avoid cross-namespace path leakage.
search() uses path: this.prefix, and strip() removes any string starting with that prefix. With prefix = "ws1", paths like ws10/file.ts can be included/mangled (e.g., 0/file.ts). Use segment-safe prefix checks and drop non-matching paths before mapping.
🔧 Suggested fix
export class CfShellVfsProvider implements VfsProvider {
@@
async search(query: string, options?: VfsSearchOptions): Promise<VfsSearchResult[]> {
const results = await this.shell.grep(query, {
- path: this.prefix || undefined,
+ path: this.prefix ? `${this.prefix.replace(/\/+$/g, '')}/` : undefined,
maxResults: options?.limit,
});
- return results.map((r) => ({
- path: this.strip(r.path),
+ return results
+ .filter((r) => this.isWithinPrefix(r.path))
+ .map((r) => ({
+ path: this.strip(r.path),
type: 'file' as const,
snippet: r.content,
provider: 'cf-shell',
- }));
+ }));
}
@@
private strip(path: string): string {
if (!this.prefix) return path;
- const stripped = path.startsWith(this.prefix) ? path.slice(this.prefix.length) : path;
- return stripped.startsWith('/') ? stripped.slice(1) : stripped;
+ const normalizedPrefix = this.prefix.replace(/\/+$/g, '');
+ if (path === normalizedPrefix) return '';
+ if (!path.startsWith(`${normalizedPrefix}/`)) return path;
+ return path.slice(normalizedPrefix.length + 1);
}
+
+ private isWithinPrefix(path: string): boolean {
+ if (!this.prefix) return true;
+ const normalizedPrefix = this.prefix.replace(/\/+$/g, '');
+ return path === normalizedPrefix || path.startsWith(`${normalizedPrefix}/`);
+ }
}Also applies to: 78-81
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/cloudflare-runtime/src/adapters/cf-shell-vfs-provider.ts` around
lines 55 - 57, The search() and strip() methods use simple string prefix
matching without verifying path segment boundaries, allowing paths like
"ws10/file.ts" to incorrectly match prefix "ws1" and causing cross-namespace
leakage. Implement segment-safe prefix validation in the search() method by
checking that the character immediately following this.prefix is either a path
separator or the end of the string before including results in the mapping.
Additionally, add filtering logic to drop any paths from the grep results that
do not match this segment-safe prefix requirement before the strip() operation
processes them, ensuring only properly scoped paths are returned.
| async findByTrigger(trigger: ContinuationResumeTrigger): Promise<ContinuationRecord | null> { | ||
| const constraint = triggerConstraint(trigger); | ||
| if (!constraint) return null; | ||
|
|
||
| const snap = await (this.col as FirestoreQuery<ContinuationRecord>) | ||
| .where(constraint.field, '==', constraint.value) | ||
| .limit(1) | ||
| .get(); | ||
|
|
||
| return snap.docs[0] ? (snap.docs[0].data() as ContinuationRecord) : null; | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n packages/gcp-runtime/src/adapters/gcp-firestore-continuation-store.ts | head -80Repository: AgentWorkforce/agent-assistant
Length of output: 3108
🏁 Script executed:
rg -A 10 "type ContinuationRecord|interface ContinuationRecord" packages/gcp-runtime/Repository: AgentWorkforce/agent-assistant
Length of output: 56
🏁 Script executed:
rg "status" packages/gcp-runtime/src/adapters/gcp-firestore-continuation-store.tsRepository: AgentWorkforce/agent-assistant
Length of output: 56
🏁 Script executed:
find . -name "*continuation*" -type f | grep -E '\.(ts|tsx|js|jsx)$' | head -20Repository: AgentWorkforce/agent-assistant
Length of output: 808
🏁 Script executed:
rg "ContinuationRecord" -A 5 --max-count=5Repository: AgentWorkforce/agent-assistant
Length of output: 23132
🏁 Script executed:
cat -n packages/continuation/src/types.ts | head -100Repository: AgentWorkforce/agent-assistant
Length of output: 3475
🏁 Script executed:
rg "status.*pending" packages/continuation/src/Repository: AgentWorkforce/agent-assistant
Length of output: 1029
🏁 Script executed:
cat -n packages/continuation/src/types.ts | sed -n '100,180p'Repository: AgentWorkforce/agent-assistant
Length of output: 2803
🏁 Script executed:
rg "interface ContinuationRecord" -A 20 packages/continuation/src/types.tsRepository: AgentWorkforce/agent-assistant
Length of output: 573
findByTrigger must filter by status to enforce the pending contract.
The docstring states the method finds "a pending continuation," but the query doesn't filter by the status field. This allows non-pending records (expired, cancelled, completed, etc.) to be returned and incorrectly resumed. Add .where('status', '==', 'pending') to enforce the documented contract.
Suggested fix
const snap = await (this.col as FirestoreQuery<ContinuationRecord>)
.where(constraint.field, '==', constraint.value)
+ .where('status', '==', 'pending')
.limit(1)
.get();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async findByTrigger(trigger: ContinuationResumeTrigger): Promise<ContinuationRecord | null> { | |
| const constraint = triggerConstraint(trigger); | |
| if (!constraint) return null; | |
| const snap = await (this.col as FirestoreQuery<ContinuationRecord>) | |
| .where(constraint.field, '==', constraint.value) | |
| .limit(1) | |
| .get(); | |
| return snap.docs[0] ? (snap.docs[0].data() as ContinuationRecord) : null; | |
| } | |
| async findByTrigger(trigger: ContinuationResumeTrigger): Promise<ContinuationRecord | null> { | |
| const constraint = triggerConstraint(trigger); | |
| if (!constraint) return null; | |
| const snap = await (this.col as FirestoreQuery<ContinuationRecord>) | |
| .where(constraint.field, '==', constraint.value) | |
| .where('status', '==', 'pending') | |
| .limit(1) | |
| .get(); | |
| return snap.docs[0] ? (snap.docs[0].data() as ContinuationRecord) : null; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/gcp-runtime/src/adapters/gcp-firestore-continuation-store.ts` around
lines 45 - 55, The findByTrigger method is missing a status filter in its
Firestore query, which means it can return non-pending continuation records when
it should only return pending ones. Modify the query chain in findByTrigger to
add an additional where clause that filters by the status field with a value of
pending, placing this constraint after the constraint derived from
triggerConstraint but before the limit and get calls. This will ensure the
method enforces its documented contract of finding only pending continuations.
| if (query.limit != null) sessions = sessions.slice(0, query.limit); | ||
| return sessions; |
There was a problem hiding this comment.
Validate query.limit before slicing.
Line 84 accepts negative limits, which changes semantics (slice(0, -1)) and can return unintended results. Clamp to >= 0 (or reject invalid input).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/gcp-runtime/src/adapters/gcp-firestore-session-store-adapter.ts`
around lines 84 - 85, The validation for query.limit at the sessions slice
operation does not check if the limit value is negative, which causes unexpected
behavior when passed to the slice method (negative indices change slice
semantics). Add a validation check after the null check for query.limit to
ensure the value is non-negative (>= 0), either by clamping it to Math.max(0,
query.limit) or by rejecting negative values with an error before performing the
slice operation.
| async update(sessionId: string, patch: Partial<Session>): Promise<Session> { | ||
| const ref = this.col.doc(sessionId); | ||
| const snap = await ref.get(); | ||
| if (!snap.exists) throw notFoundError(sessionId); | ||
| await ref.update(patch as Record<string, unknown>); | ||
| const updated = await ref.get(); | ||
| return updated.data() as Session; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n packages/gcp-runtime/src/adapters/gcp-firestore-session-store-adapter.ts | head -150Repository: AgentWorkforce/agent-assistant
Length of output: 5621
🏁 Script executed:
# Find notFoundError implementation
rg "notFoundError" packages/gcp-runtime/src/adapters/ -A 3Repository: AgentWorkforce/agent-assistant
Length of output: 1048
🏁 Script executed:
# Check for Firestore error code documentation or patterns
rg "code\s*===\s*[0-9]" packages/gcp-runtime/ -B 2 -A 2Repository: AgentWorkforce/agent-assistant
Length of output: 606
🏁 Script executed:
# Look for similar error handling patterns with Firestore
rg "catch.*err" packages/gcp-runtime/src/adapters/ -A 5Repository: AgentWorkforce/agent-assistant
Length of output: 724
🏁 Script executed:
# Check if there are other references to gRPC error codes in the codebase
rg "code\s*===\s*[0-9]" --type ts --type js -B 2 -A 2Repository: AgentWorkforce/agent-assistant
Length of output: 1431
🌐 Web query:
gRPC status code 5 NOT_FOUND error definition
💡 Result:
The gRPC status code 5, defined as NOT_FOUND, indicates that some requested entity (for example, a file or directory) was not found [1][2][3]. Key characteristics and usage guidelines include: Implementation Detail: This status code is never generated by the gRPC library itself; it is only returned by application-level code [1][4]. Usage Guidelines: Server developers should use NOT_FOUND if a request is denied for an entire class of users, such as during a gradual feature rollout or when using an undocumented allowlist [1][2]. If a request is denied for only specific users within a class (e.g., due to user-based access control), developers must use PERMISSION_DENIED (status code 7) instead [1][2]. Mapping: In HTTP/1.1 or HTTP/2 mapping, this status code corresponds to HTTP 404 Not Found [2][5]. Best Practices: Services should return the most specific error code available [2]. If both NOT_FOUND and another error code (such as FAILED_PRECONDITION) apply, developers should prefer NOT_FOUND [2].
Citations:
- 1: https://grpc.github.io/grpc/core/md_doc_statuscodes
- 2: https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
- 3: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
- 4: https://github.com/grpc/grpc.io/blob/main/content/en/docs/guides/status-codes.md
- 5: https://statuscodefyi.com/grpc/5-not-found/
update() has a TOCTOU gap and can leak provider-specific errors.
Line 90 pre-checks existence, but the document can still disappear before line 92 executes. In that case, Firestore throws an error with gRPC status code 5 (NOT_FOUND), which this method would leak instead of returning the adapter's SessionNotFoundError shape. Additionally, the final fetch at line 93 lacks validation that the document still exists.
Suggested fix
async update(sessionId: string, patch: Partial<Session>): Promise<Session> {
const ref = this.col.doc(sessionId);
- const snap = await ref.get();
- if (!snap.exists) throw notFoundError(sessionId);
- await ref.update(patch as Record<string, unknown>);
+ const snap = await ref.get();
+ if (!snap.exists) throw notFoundError(sessionId);
+ try {
+ await ref.update(patch as Record<string, unknown>);
+ } catch (err) {
+ const code = (err as { code?: number })?.code;
+ if (code === 5) throw notFoundError(sessionId); // NOT_FOUND
+ throw err;
+ }
const updated = await ref.get();
- return updated.data() as Session;
+ if (!updated.exists) throw notFoundError(sessionId);
+ return updated.data() as Session;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async update(sessionId: string, patch: Partial<Session>): Promise<Session> { | |
| const ref = this.col.doc(sessionId); | |
| const snap = await ref.get(); | |
| if (!snap.exists) throw notFoundError(sessionId); | |
| await ref.update(patch as Record<string, unknown>); | |
| const updated = await ref.get(); | |
| return updated.data() as Session; | |
| async update(sessionId: string, patch: Partial<Session>): Promise<Session> { | |
| const ref = this.col.doc(sessionId); | |
| const snap = await ref.get(); | |
| if (!snap.exists) throw notFoundError(sessionId); | |
| try { | |
| await ref.update(patch as Record<string, unknown>); | |
| } catch (err) { | |
| const code = (err as { code?: number })?.code; | |
| if (code === 5) throw notFoundError(sessionId); // NOT_FOUND | |
| throw err; | |
| } | |
| const updated = await ref.get(); | |
| if (!updated.exists) throw notFoundError(sessionId); | |
| return updated.data() as Session; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/gcp-runtime/src/adapters/gcp-firestore-session-store-adapter.ts`
around lines 88 - 94, The update() method has a time-of-check-time-of-use
(TOCTOU) gap where the document can be deleted between the existence check and
the actual update operation, causing Firestore to throw a provider-specific gRPC
error instead of the adapter's SessionNotFoundError. Additionally, the final
fetch lacks validation that the document exists. Wrap the ref.update(patch) call
in a try-catch block to catch Firestore errors and convert NOT_FOUND errors
(gRPC status code 5) to notFoundError(sessionId). After fetching the updated
document with await ref.get(), validate that the snapshot exists before
returning updated.data() as Session, throwing notFoundError(sessionId) if the
document no longer exists.
| async runResumedTurn(input: ContinuationResumedTurnInput): Promise<HarnessResult> { | ||
| const ref = this.stash.doc(input.resumedTurnId); | ||
| const snap = await ref.get(); | ||
|
|
||
| if (snap.exists) { | ||
| const stashed = snap.data(); | ||
| if (stashed) return stashed.result; | ||
| } | ||
|
|
||
| const result = await this.inner.runResumedTurn(input); | ||
| await ref.set({ result }); | ||
| return result; |
There was a problem hiding this comment.
Make execution atomic per resumedTurnId to preserve idempotency.
The current get → run inner → set sequence allows two concurrent deliveries of the same resumedTurnId to both miss the stash and execute the inner harness twice. That breaks the adapter’s idempotency guarantee and can duplicate downstream model/tool side effects. Introduce an atomic claim step (transaction or create-lock/state document) before running inner, with only one winner allowed to execute and publish the final result.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/gcp-runtime/src/adapters/gcp-idempotent-turn-executor.ts` around
lines 38 - 49, The runResumedTurn method in the GcpIdempotentTurnExecutor class
has a race condition where concurrent requests with the same resumedTurnId can
both miss the stash and execute the inner harness twice. Before calling
this.inner.runResumedTurn, introduce an atomic claim step using a Firestore
transaction or a create-lock pattern that atomically tries to claim exclusive
execution rights for the given resumedTurnId. Only the winner of this claim
should proceed to execute this.inner.runResumedTurn and publish the result to
the stash, while other concurrent attempts should wait for or retrieve the
result from the stashed data once the winner completes.
| const regex = new RegExp(query); | ||
| const results: VfsSearchResult[] = []; | ||
| const limit = options?.limit ?? Infinity; | ||
|
|
||
| for (const file of files) { | ||
| if (results.length >= limit) break; | ||
| try { | ||
| const [buf] = await file.download(); | ||
| const text = buf.toString('utf-8'); | ||
| const matchLine = text.split('\n').find((l) => regex.test(l)); | ||
| if (matchLine !== undefined) { |
There was a problem hiding this comment.
Untrusted regex construction in search() can trigger ReDoS or runtime errors.
Line 77 builds a regex directly from user input. Malicious or invalid patterns can cause high CPU or throw before the loop starts.
Suggested fix (treat query as literal text)
async search(query: string, options?: VfsSearchOptions): Promise<VfsSearchResult[]> {
const [files] = await this.bucket.getFiles({ prefix: this.prefix });
- const regex = new RegExp(query);
+ const escaped = query.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
+ const regex = new RegExp(escaped);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/gcp-runtime/src/adapters/gcp-storage-vfs-provider.ts` around lines
77 - 87, The search() method creates a RegExp directly from the user-provided
query parameter without escaping special regex characters, which can cause ReDoS
attacks or runtime errors. Modify the code at line 77 where new RegExp(query) is
constructed to first escape the query parameter by replacing special regex
metacharacters with their escaped equivalents, ensuring the query is treated as
literal text rather than a regex pattern. This prevents malicious or invalid
patterns from causing performance issues or exceptions.
Source: Linters/SAST tools
|
ℹ️ pr-reviewer: review only — no file changes were applied to the PR (nothing to commit after review). The notes below are advisory and were not pushed. No code edits were needed — the diff is clean, mechanically and semantically. I made no changes to the working tree. Here is my review. PR #100 Review — Add
|
Implements four new adapters in @agent-assistant/cloudflare-runtime that
leverage Cloudflare's newly announced Flue SDK and existing platform
primitives:
the SQLite-backed virtual filesystem for read/list/grep operations
the previously missing Cloudflare adapter for the sessions package
using runWorkflow(), giving watch rules durable delay and retry
so resumed turns survive Worker process interruptions without re-running
model or tool calls
All three new package dependencies (@agent-assistant/sessions, proactive,
vfs) are optional peerDependencies so the main bundle stays lean for
consumers that only need a subset of adapters. 82 tests pass.
Co-Authored-By: Claude Sonnet 4.6 noreply@anthropic.com
Claude-Session: https://claude.ai/code/session_012qgiUVUBzg9YtUAPKCVwke