Skip to content

[codex] fix cron_sync_sub queue payload handling#1870

Merged
riderx merged 6 commits into
mainfrom
codex/fix-cron-sync-sub-queue-payload
Mar 27, 2026
Merged

[codex] fix cron_sync_sub queue payload handling#1870
riderx merged 6 commits into
mainfrom
codex/fix-cron-sync-sub-queue-payload

Conversation

@riderx
Copy link
Copy Markdown
Member

@riderx riderx commented Mar 27, 2026

Summary (AI generated)

  • standardize process_cron_sync_sub_jobs() queue messages to use the shared { function_name, function_type, payload } envelope
  • preserve legacy top-level queue fields in queue_consumer so already-queued cron_sync_sub jobs still dispatch with orgId
  • add regression coverage for both the consumer fallback and the SQL queue payload shape

Motivation (AI generated)

cron_sync_sub was still being enqueued with orgId and customerId at the top level while queue_consumer only forwarded message.payload. That made the recurring scheduler POST {} to /triggers/cron_sync_sub, producing no_orgId errors in production logs.

Business Impact (AI generated)

This restores recurring subscription sync jobs for paying organizations, removes noisy cron failures, and reduces the chance of stale billing state affecting product behavior tied to subscription status.

Test Plan (AI generated)

  • bunx eslint supabase/functions/_backend/triggers/queue_consumer.ts tests/queue-consumer-message-shape.unit.test.ts tests/process-cron-sync-sub-jobs.test.ts
  • bun run supabase:with-env -- bunx vitest run tests/queue-consumer-message-shape.unit.test.ts tests/process-cron-sync-sub-jobs.test.ts

Generated with AI

Summary by CodeRabbit

  • New Features

    • Subscription synchronization tasks are now automatically queued for each active organization/customer pair.
  • Improvements

    • Queue processor accepts both current and legacy message formats for backward compatibility.
    • Message validation relaxed to tolerate varied envelope shapes and optional payloads.
  • Tests

    • Added tests for message format compatibility and the subscription synchronization job.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 27, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9cfd56eb-6089-4963-af82-87b872d75dd5

📥 Commits

Reviewing files that changed from the base of the PR and between 58cd3d5 and e81d623.

📒 Files selected for processing (2)
  • supabase/migrations/20260327044102_fix_cron_sync_sub_queue_payload.sql
  • tests/process-cron-sync-sub-jobs.test.ts
✅ Files skipped from review due to trivial changes (1)
  • tests/process-cron-sync-sub-jobs.test.ts

📝 Walkthrough

Walkthrough

Relaxed queue consumer message validation to accept missing payload, added a helper to normalize nested payload vs legacy top-level fields and exported test utils; introduced a DB function that enqueues cron_sync_sub messages with nested payload; added integration and unit tests validating both behaviors. (50 words)

Changes

Cohort / File(s) Summary
Queue Consumer Message Shape
supabase/functions/_backend/triggers/queue_consumer.ts
Made message.payload optional via a loose Zod schema, updated Message typings to allow legacy fields and non-undefined function_type, added extractMessageBody(message) to return nested message.payload (default {}) or a legacy body formed by stripping function_name/function_type from message.message, emit a cloudlog when falling back to legacy body, and exported __queueConsumerTestUtils__.
Cron Sync Database Function
supabase/migrations/20260327044102_fix_cron_sync_sub_queue_payload.sql
Added public.process_cron_sync_sub_jobs() which selects distinct org/customer pairs and enqueues cron_sync_sub messages to pgmq with function_name: 'cron_sync_sub', function_type: NULL, and nested payload containing orgId and customerId; sets owner to postgres, revokes broad privileges, and grants EXECUTE only to service_role.
Integration & Unit Tests
tests/process-cron-sync-sub-jobs.test.ts, tests/queue-consumer-message-shape.unit.test.ts
Added integration test asserting process_cron_sync_sub_jobs() enqueues messages with nested payload; added unit tests for extractMessageBody() covering payload-present, legacy top-level-fields fallback, and empty fallback cases.

Sequence Diagram

sequenceDiagram
    participant Cron as Cron Trigger
    participant ProcFunc as process_cron_sync_sub_jobs()
    participant Queue as pgmq Queue
    participant Consumer as Queue Consumer
    participant Helper as extractMessageBody()
    participant Handler as Message Handler

    Cron->>ProcFunc: invoke
    ProcFunc->>ProcFunc: query org/customer pairs
    loop per org/customer
        ProcFunc->>Queue: send message {function_name, function_type: NULL, payload:{orgId,customerId}}
    end

    Queue->>Consumer: dequeue message
    Consumer->>Helper: extractMessageBody(message)
    alt message has nested payload
        Helper-->>Consumer: return nested payload object
    else legacy top-level fields present
        Helper-->>Consumer: return object built from top-level legacy fields
    else no routing fields
        Helper-->>Consumer: return {}
    end
    Consumer->>Handler: invoke function with normalized body
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

codex

Poem

🐇 I hopped through queues both new and old,
Found payloads nested, some shy and cold.
I stitched the shapes so messages play nice,
Cron jobs now enqueue with payload precise.
thump-thump, carrot-coded delight!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: fixing cron_sync_sub queue payload handling by standardizing the message envelope format.
Description check ✅ Passed The PR description is well-structured with Summary, Motivation, Business Impact, and Test Plan sections that align with the required template, though it omits Checklist and Screenshots sections (acceptable for backend changes).

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch codex/fix-cron-sync-sub-queue-payload

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 SQLFluff (4.0.4)
supabase/migrations/20260327044102_fix_cron_sync_sub_queue_payload.sql

User Error: No dialect was specified. You must configure a dialect or specify one on the command line using --dialect after the command. Available dialects:
ansi, athena, bigquery, clickhouse, databricks, db2, doris, duckdb, exasol, flink, greenplum, hive, impala, mariadb, materialize, mysql, oracle, postgres, redshift, snowflake, soql, sparksql, sqlite, starrocks, teradata, trino, tsql, vertica


Comment @coderabbitai help to get the list of available commands and usage tips.

@codspeed-hq
Copy link
Copy Markdown
Contributor

codspeed-hq Bot commented Mar 27, 2026

Merging this PR will not alter performance

✅ 28 untouched benchmarks


Comparing codex/fix-cron-sync-sub-queue-payload (e81d623) with main (fb29bd1)

Open in CodSpeed

@riderx riderx marked this pull request as ready for review March 27, 2026 22:24
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@supabase/migrations/20260327044102_fix_cron_sync_sub_queue_payload.sql`:
- Around line 20-23: The SQL constructing the queued message hard-codes
function_type = 'cloudflare' for cron_sync_sub, causing all cron_sync_sub jobs
to be routed to Cloudflare; change the payload construction so function_type is
NULL or omitted for the cron_sync_sub case (i.e. remove or set to NULL the
'function_type' field in the jsonb_build_object for 'cron_sync_sub'), and update
the corresponding test assertion in tests/process-cron-sync-sub-jobs.test.ts to
expect no 'function_type' or a null value for cron_sync_sub payloads.

In `@tests/process-cron-sync-sub-jobs.test.ts`:
- Around line 4-19: The teardown only deletes rows for a single orgId; change
clearCronSyncSubMessages (and similarly getCronSyncSubMessages) to remove/select
all messages that this RPC enqueues instead of filtering by orgId — i.e., target
rows in pgmq.q_cron_sync_sub by the message type/payload that identifies the
cron_sync_sub job (for example WHERE message->>'type' = 'cron_sync_sub' or the
appropriate message->'payload' flag used by process_cron_sync_sub_jobs()) rather
than COALESCE(... orgId), so all rows inserted by the invocation are cleaned up.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 655152c0-f9df-4cca-ba7f-bd9e9925bd77

📥 Commits

Reviewing files that changed from the base of the PR and between 7bd5daf and 266c660.

📒 Files selected for processing (4)
  • supabase/functions/_backend/triggers/queue_consumer.ts
  • supabase/migrations/20260327044102_fix_cron_sync_sub_queue_payload.sql
  • tests/process-cron-sync-sub-jobs.test.ts
  • tests/queue-consumer-message-shape.unit.test.ts

Comment thread tests/process-cron-sync-sub-jobs.test.ts Outdated
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
supabase/functions/_backend/triggers/queue_consumer.ts (1)

477-479: Consider marking test helper export as internal-only.

queueConsumerTestUtils works, but an explicit internal/test-only naming convention (for example __test__) would reduce accidental production coupling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@supabase/functions/_backend/triggers/queue_consumer.ts` around lines 477 -
479, Rename the exported test helper to an internal/test-only identifier to
avoid accidental production use: change the export name queueConsumerTestUtils
(which currently exposes extractMessageBody) to an unmistakably internal name
such as __queueConsumerTestUtils__ or __test__queueConsumerUtils__, and update
any references/imports accordingly so only tests import the new symbol; ensure
extractMessageBody remains accessible via the renamed export but do not change
its implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@supabase/functions/_backend/triggers/queue_consumer.ts`:
- Around line 477-479: Rename the exported test helper to an internal/test-only
identifier to avoid accidental production use: change the export name
queueConsumerTestUtils (which currently exposes extractMessageBody) to an
unmistakably internal name such as __queueConsumerTestUtils__ or
__test__queueConsumerUtils__, and update any references/imports accordingly so
only tests import the new symbol; ensure extractMessageBody remains accessible
via the renamed export but do not change its implementation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b4bbed16-39c9-4a73-a4e9-92d7b85396ee

📥 Commits

Reviewing files that changed from the base of the PR and between 266c660 and 231f12b.

📒 Files selected for processing (1)
  • supabase/functions/_backend/triggers/queue_consumer.ts

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
supabase/functions/_backend/triggers/queue_consumer.ts (1)

40-46: Tighten payload normalization to object-only before forwarding.

Current casting allows any payload type to be treated as an object. Consider guarding for plain-object payloads and defaulting to {} otherwise, so malformed payloads don’t propagate to trigger handlers.

Suggested hardening
 function extractMessageBody(message: Message): Record<string, unknown> {
-  if (message.message?.payload !== undefined)
-    return (message.message.payload ?? {}) as Record<string, unknown>
+  if (message.message?.payload !== undefined) {
+    const payload = message.message.payload
+    if (payload && typeof payload === 'object' && !Array.isArray(payload)) {
+      return payload as Record<string, unknown>
+    }
+    return {}
+  }
 
   const { function_name: _functionName, function_type: _functionType, ...legacyBody } = message.message ?? {}
   return legacyBody
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@supabase/functions/_backend/triggers/queue_consumer.ts` around lines 40 - 46,
The extractMessageBody function currently casts any payload to an object;
instead validate that message.message.payload is a plain object before returning
it. In extractMessageBody, check that message.message?.payload is non-null,
typeof 'object', not an Array, and not a function; if it passes, return it cast
as Record<string, unknown>, otherwise return {} as the safe default; keep the
existing legacy path that destructures function_name/function_type into
legacyBody unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@supabase/functions/_backend/triggers/queue_consumer.ts`:
- Around line 40-46: The extractMessageBody function currently casts any payload
to an object; instead validate that message.message.payload is a plain object
before returning it. In extractMessageBody, check that message.message?.payload
is non-null, typeof 'object', not an Array, and not a function; if it passes,
return it cast as Record<string, unknown>, otherwise return {} as the safe
default; keep the existing legacy path that destructures
function_name/function_type into legacyBody unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5b928d7e-b89c-44fd-833d-61c9d309c95e

📥 Commits

Reviewing files that changed from the base of the PR and between 231f12b and 58cd3d5.

📒 Files selected for processing (3)
  • supabase/functions/_backend/triggers/queue_consumer.ts
  • tests/process-cron-sync-sub-jobs.test.ts
  • tests/queue-consumer-message-shape.unit.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/queue-consumer-message-shape.unit.test.ts
  • tests/process-cron-sync-sub-jobs.test.ts

@sonarqubecloud
Copy link
Copy Markdown

@riderx riderx merged commit c3d6b4f into main Mar 27, 2026
17 checks passed
@riderx riderx deleted the codex/fix-cron-sync-sub-queue-payload branch March 27, 2026 23:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant