Skip to content

CBOR Transport support#25

Merged
marcopiraccini merged 11 commits into
mainfrom
cbor-support
Apr 20, 2026
Merged

CBOR Transport support#25
marcopiraccini merged 11 commits into
mainfrom
cbor-support

Conversation

@marcopiraccini
Copy link
Copy Markdown
Contributor

@marcopiraccini marcopiraccini commented Apr 18, 2026

Summary

Adds spec version 3 support to @platformatic/world and @platformatic/workflow: CBOR queue transport + resilient start + streams.* interface compat. Brings full parity with Vercel's upstream e2e suite so Platformatic can be un-skipped in Vercel's community-worlds CI.

Follow-up to vercel/workflow#1450, which landed Platformatic as if: false pending CBOR. The upstream change that made CBOR a requirement is vercel/workflow#1627 — "Gate CBOR queue transport on specVersion" — which switched queue messages from JSON to CBOR for specVersion >= 3 so runInput.input (a Uint8Array) survives the wire. Worlds that want to handle v3 runs must speak CBOR on the queue. Community worlds opt in via vercel/workflow#1658.

Workflow SDK compatibility

Our World API is typed against @workflow/world@4.1.1 stable (which already exports SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, RunInput, QueueOptions.specVersion, World.specVersion, getStreamChunks/getStreamInfo — CBOR support was backported into the stable 4.1.x line). The same world instance also works at runtime against the @workflow/core@5.0.0-beta line.

Installed workflow SDK Works at runtime Notes
workflow@4.2.x (stable) Declared API. Streamer calls go through the flat methods (writeToStream, getStreamChunks, ...).
workflow@5.0.0-beta.x v5 SDK calls world.streams.* (nested namespace, different arg order). Exposed at runtime alongside v4 methods.

Both are exercised in CI: e2e-v5/ runs against workflow@5.0.0-beta.2 (mirrors Vercel's main-branch CI), e2e-v4/ runs against workflow@4.2.4 stable.

Scope

Client (@platformatic/world)

  • HttpClient.post(path, body, query?, encoding?) handles both JSON and CBOR bodies via an encoding: 'json' | 'cbor' parameter. encode from cbor-x is called inline.
  • queue() picks CBOR when opts.specVersion >= 3, JSON otherwise. Falls back to CBOR when opts.specVersion is missing.
  • createQueueHandler does CBOR-first decode with a JSON fallback inline (no shared Transport abstraction).
  • HttpClient validates paths at the boundary: must start with /, must not contain // (catches empty-interpolation bugs before they hit the server).
  • Streamer has a single canonical impl; v4 flat methods and v5 streams.* are thin adapters that delegate to it. Satisfies v4.1.1's Streamer type; streams is a runtime-only addition for v5 SDKs.
  • streams.get copies each chunk into a standalone ArrayBuffer via new Uint8Array(chunk) — undici's pooled buffers aren't detachable and break the SDK's byte-stream transfer.
  • storage.runs.get renames 404 errors to WorkflowRunNotFoundError so the SDK's resilient-start retry loop recognizes them as retryable.
  • World declares specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT.

Server (@platformatic/workflow)

  • Migration 002.do.sql: workflow_queue_messages gains payload_bytes BYTEA + payload_encoding TEXT ('json' | 'cbor') with an XOR constraint. Undo migration is forward-only.
  • New Fastify application/cbor content-type parser.
  • plugins/queue.ts branches on Content-Type, stores in arrival encoding.
  • queue/dispatcher.ts forwards with matching Content-Type; re-enqueues preserve encoding.
  • New stream endpoints: GET /runs/:runId/streams/:name/chunks (paginated) and /info (tailIndex + done).
  • run_started event handler: idempotent (otherwise v5 replay fails with "Unconsumed event in event log") AND bootstraps the run from eventData when no prior run_created exists (resilient-start recovery).
  • Hook list query tie-breaks on correlation_id so hooks created in the same millisecond return in workflow order.

E2E structure

  • e2e-v5/ — workbench on workflow@5.0.0-beta.2 (matches Vercel's main-branch CI). Build uses WORKFLOW_PUBLIC_MANIFEST=1 so the manifest is served at /.well-known/workflow/v1/manifest.json.
  • e2e-v4/ — workbench on workflow@4.2.4 stable. Covers the v4 runtime path beyond the basic happy path: sleep, hook resume, step retry-until-success, FatalError bubbling, output stream writes.
  • Four tests ported from packages/core/e2e/e2e.test.ts into e2e-v5/test/vercel-e2e.test.ts:
    • resilient start: addTenWorkflow completes when run_created returns 500 (line 2255)
    • outputStreamWorkflow: getTailIndex returns correct index after stream completes (line 711)
    • outputStreamWorkflow: getTailIndex returns -1 before any chunks are written (line 728)
    • outputStreamWorkflow: getChunks returns same content as reading the stream (line 745)
  • Fixed pre-existing bug: getWorld() was called without await in the queue-based health check.
  • Unskipped webhookWorkflow: HTTP-triggered resume with 3 webhook types — v5 fixes the respondWith: 'manual' cross-process issue.
  • Unskipped wellKnownAgentWorkflow: step discovery in dot-prefixed directory — side-effect import from app/api/trigger-e2e/route.ts makes the file reachable from the module graph, mirroring Vercel's own workbench.
  • New cbor-e2e.test.ts (3 tests): DB-level assertion that payload_encoding = 'cbor' after a real run — ground-truth signal CBOR is engaged.

Root scripts

  • pnpm test:e2e:v5 — v5 SDK smoke suite
  • pnpm test:e2e:v4 — v4 SDK smoke suite
  • pnpm test:e2e:vercel — full Vercel-ported suite

Rollback is forward-only: drain the queue before running 002.undo.sql.

Test plan

  • `pnpm test` — world + workflow unit tests, all green
  • `pnpm test:e2e:v5` — 5/5 pass (workflow@5.0.0-beta.2)
  • `pnpm test:e2e:v4` — 9/9 pass (workflow@4.2.4 stable)
  • `pnpm test:e2e:vercel` — 61/61 pass, 0 fail, 0 skipped
  • Lint clean across all packages + workbenches
  • `SELECT payload_encoding, COUNT(*) FROM workflow_queue_messages` shows 100% `cbor` after a v5-SDK run

Known upstream issue tracked: vercel/workflow#1735. We drain streams fully before decode, same as upstream.

marcopiraccini and others added 5 commits April 18, 2026 19:36
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Adds the server/client pieces needed for Vercel's e2e suite to pass
against us when they un-skip the Platformatic community world:

- Server: run_started now bootstraps the run from eventData when no
  prior run_created exists (resilient start path). The SDK v5 contract
  expects this so a failed run_created can be recovered via the queue
  payload.

- Client: runs.get now names 404 errors WorkflowRunNotFoundError so the
  SDK's resilient-start retry loop recognizes them as retryable.
  streams.get copies chunks into standalone ArrayBuffers — undici's
  pooled buffers aren't detachable, which broke the SDK's byte-stream
  transfer. streams.getChunks does the same for paginated chunks.

- E2E: bump next build with WORKFLOW_PUBLIC_MANIFEST=1 so the manifest
  is served from public/, matching Vercel's CI setup. Add four adapted
  tests to vercel-e2e.test.ts: resilient start (#2255), getTailIndex
  after stream completes, getTailIndex on nonexistent namespace,
  getChunks paginates. wellKnownAgentWorkflow stays skipped (upstream
  @workflow/next plugin does not discover dot-prefixed app dirs).

Verified: 61/59/0/2 (tests/pass/fail/skipped) in 220s against a clean
local run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
transport.ts is a direct adaptation of packages/world-vercel/src/queue.ts
— same wire contract, same 3 classes, same CBOR-first/JSON-fallback
logic. Add an inline comment at the top of the file pointing to NOTICE,
and list the file in NOTICE alongside the e2e suite ports. Also note
the two new vercel-e2e.test.ts ports (resilient start + outputStream
getTailIndex/getChunks) in NOTICE.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@marcopiraccini marcopiraccini changed the title CBOR support CBOR Transport support Apr 19, 2026
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
@marcopiraccini marcopiraccini marked this pull request as ready for review April 19, 2026 07:52
@marcopiraccini marcopiraccini requested a review from mcollina April 19, 2026 07:59
Comment thread packages/workflow/package.json
Copy link
Copy Markdown
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semversiness of this unclear to me

Comment thread packages/world/README.md Outdated
- Queue messages between client and server use CBOR framing. CBOR preserves `Uint8Array` natively (JSON does not), so binary workflow input survives the queue round-trip without base64 wrapping.
- `createQueueHandler` accepts both CBOR and JSON inbound via a dual transport. A v3 client can be deployed against a v2-only server during rollout; a v2 client can be deployed against a v3 server.

Peer dependency: `@workflow/world` ≥ 5.0.0-beta.1 (the first release exporting `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT`).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes this a semver-major for us too. Can we do this in a way that we support also old clients?

I would prefer if we had integration tests for both v4 and v5 too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually are still in 0.x but OK.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, a user would need to use the v5 beta of workflow?

Copy link
Copy Markdown
Contributor Author

@marcopiraccini marcopiraccini Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, now we support both v4 and v5, PTAL

Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Comment thread packages/world/src/lib/client.ts Outdated
}

async postRaw (path: string, body: Buffer, contentType: string, query?: Record<string, string | undefined>): Promise<any> {
let fullPath = `${this.#baseUrl}${path}`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you making sure that delimiters (like /) are set? I think we should validate for // checks.

Comment thread packages/world/src/lib/client.ts Outdated
async postRaw (path: string, body: Buffer, contentType: string, query?: Record<string, string | undefined>): Promise<any> {
let fullPath = `${this.#baseUrl}${path}`
if (query) {
const url = new URL(`http://localhost${fullPath}`)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we use baseUrl here?

Comment thread packages/world/src/lib/queue.ts Outdated

const cborTransport = new CborTransport()
const jsonTransport = new JsonTransport()
const dualTransport = new DualTransport()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are those globals?

Comment thread packages/world/src/lib/queue.ts Outdated
})
const result = useCbor
? await client.postRaw('/queue', transport.serialize(envelope), transport.contentType)
: await client.post('/queue', envelope)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's not better to handle this in the client itself?

Comment thread packages/world/src/lib/streamer.ts Outdated
transform (chunk, controller) {
const copy = new Uint8Array(chunk.byteLength)
copy.set(chunk)
controller.enqueue(copy)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sigh.

Comment thread packages/world/src/lib/streamer.ts Outdated
const src = Buffer.from(chunk.data, 'base64')
const copy = new Uint8Array(src.byteLength)
copy.set(src)
return { index: chunk.index, data: copy }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is allocating memory twice for no particular reason, can we avoid it?

Comment thread packages/world/src/lib/transport.ts Outdated
}
}

export async function drainStream (stream: ReadableStream<Uint8Array>): Promise<Buffer> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call it concatStream

Comment thread packages/world/src/lib/transport.ts Outdated
total += value.byteLength
}
}
const buf = Buffer.allocUnsafe(total)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not Buffer.concat

Signed-off-by: marcopiraccini <marco.piraccini@gmail.com>
Copy link
Copy Markdown
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@marcopiraccini marcopiraccini merged commit c1fe1c5 into main Apr 20, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants