feat: add pull flow testing#504
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a new “pull check” job/check to validate the Curio/SP pull-to-park pathway by hosting a temporary piece under /api/piece/:pieceCid, asking the SP to pullPieces, polling for completion, committing on-chain, and validating by direct /piece/:pieceCid fetch integrity. This extends DealBot’s check suite and metrics/docs to cover the pull flow described in #300.
Changes:
- Introduces backend Pull Check implementation (service, registry, controller) plus Vitest coverage and Prometheus metrics.
- Wires
pull_checkinto pg-boss scheduling/worker handling and job schedule maintenance, with new config/env vars. - Adds/updates documentation for the new check, metrics, environment variables, and runbooks.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/runbooks/jobs.md | Updates SQL runbook snippets to include pull_check pausing/resuming and adds a “run pull check” snippet. |
| docs/jobs.md | Documents pull_check as a per-SP job on sp.work and includes it in capacity formulas and env var list. |
| docs/environment-variables.md | Adds Pull Check env vars and documents DEALBOT_API_PUBLIC_URL. |
| docs/checks/README.md | Adds Pull Check to the checks index and terminology. |
| docs/checks/pull-check.md | New source-of-truth doc describing pull check lifecycle, assertions, API, and metrics. |
| docs/checks/production-configuration-and-approval-methodology.md | Notes pull check is not yet an approval criterion and estimates production load impact. |
| docs/checks/events-and-metrics.md | Adds pull-check event model and metric definitions; extends checkType list. |
| apps/backend/src/worker.module.ts | Imports PullCheckModule into worker app. |
| apps/backend/src/wallet-sdk/wallet-sdk.service.ts | Exposes underlying Synapse viem client via getSynapseClient(). |
| apps/backend/src/pull-check/pull-check.types.ts | Defines hosted piece registration/prepared types. |
| apps/backend/src/pull-check/pull-check.service.ts | Implements pull check lifecycle, hosted piece preparation/cleanup, polling, commit, and direct fetch validation. |
| apps/backend/src/pull-check/pull-check.service.spec.ts | Unit tests for pull check service behavior and metric recording. |
| apps/backend/src/pull-check/pull-check.module.ts | New Nest module wiring PullCheckService, registry, and controller. |
| apps/backend/src/pull-check/piece-source.controller.ts | Adds unauthenticated /api/piece/:pieceCid streaming endpoint with 404/410 behavior and first-byte capture. |
| apps/backend/src/pull-check/piece-source.controller.spec.ts | Tests controller status codes, headers, streaming, and error handling. |
| apps/backend/src/pull-check/hosted-piece.registry.ts | In-memory registry for hosted pieces with TTL/cleanup markers and idempotent timestamps. |
| apps/backend/src/pull-check/hosted-piece.registry.spec.ts | Unit tests for registry lifecycle and idempotency. |
| apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts | Registers new pull-check histograms/counters. |
| apps/backend/src/metrics-prometheus/check-metrics.service.ts | Adds PullCheckCheckMetrics wrapper for new metrics. |
| apps/backend/src/metrics-prometheus/check-metric-labels.ts | Extends CheckType union with pullCheck. |
| apps/backend/src/jobs/repositories/job-schedule.repository.ts | Includes pull_check in schedule cleanup deletion queries. |
| apps/backend/src/jobs/jobs.service.ts | Adds pull_check handling, scheduling interval computation, and schedule upsert. |
| apps/backend/src/jobs/jobs.service.spec.ts | Updates schedule-row tests for the additional per-SP schedule upsert. |
| apps/backend/src/jobs/jobs.module.ts | Imports PullCheckModule so JobsService can run pull checks. |
| apps/backend/src/database/entities/job-schedule-state.entity.ts | Adds pull_check to the JobType union. |
| apps/backend/src/config/app.config.ts | Adds env vars + config fields for pull check and DEALBOT_API_PUBLIC_URL. |
| apps/backend/src/app.module.ts | Imports PullCheckModule into the API app. |
| apps/backend/.env.example | Documents new pull check env vars and optional DEALBOT_API_PUBLIC_URL. |
SgtPooki
left a comment
There was a problem hiding this comment.
I haven't looked through it all, but i'm mainly concerned about the mention of on-chain check which is why i'm submitting this review early.
the concern i have is the flow implemented from #300 w.r.t. this step:
verify the piece is actually on SP: this is tricky because the SP can claim it has the piece but do we believe it? We have two options: either download the piece back from the SP and check that the bytes are what we want (hash or just byte compare to original), or just by proceeding to AddPieces and make the SP prove it we get a level of assurance that's probably acceptable
did you look into doing "download the piece back from the SP and check that the bytes are what we want"?
or is the "addpieces" simpler?
| jobsQueuedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[8], | ||
| jobsRetryScheduledGauge: { set: vi.fn() } as unknown as JobsServiceDeps[9], | ||
| oldestQueuedAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[10], | ||
| oldestInFlightAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[11], | ||
| jobsInFlightGauge: { set: vi.fn() } as unknown as JobsServiceDeps[12], | ||
| jobsEnqueueAttemptsCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[13], | ||
| jobsStartedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[14], | ||
| jobsCompletedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[15], | ||
| jobsPausedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[16], | ||
| jobDuration: { observe: vi.fn() } as unknown as JobsServiceDeps[17], | ||
| storageProvidersActive: { set: vi.fn() } as unknown as JobsServiceDeps[18], | ||
| storageProvidersTested: { set: vi.fn() } as unknown as JobsServiceDeps[19], | ||
| jobsQueuedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[9], | ||
| jobsRetryScheduledGauge: { set: vi.fn() } as unknown as JobsServiceDeps[10], | ||
| oldestQueuedAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[11], | ||
| oldestInFlightAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[12], | ||
| jobsInFlightGauge: { set: vi.fn() } as unknown as JobsServiceDeps[13], | ||
| jobsEnqueueAttemptsCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[14], | ||
| jobsStartedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[15], | ||
| jobsCompletedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[16], | ||
| jobsPausedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[17], | ||
| jobDuration: { observe: vi.fn() } as unknown as JobsServiceDeps[18], | ||
| storageProvidersActive: { set: vi.fn() } as unknown as JobsServiceDeps[19], | ||
| storageProvidersTested: { set: vi.fn() } as unknown as JobsServiceDeps[20], |
There was a problem hiding this comment.
us having to update this every time we add something new is quite annoying.
| **When to update**: | ||
|
|
||
| - Set to the public URL of your Dealbot deployment whenever pull checks are enabled and SPs cannot reach the bind address directly (the typical production case) | ||
| - Leave unset for local development where SPs reach Dealbot on `localhost` |
There was a problem hiding this comment.
- Leave unset for local development where SPs reach Dealbot on
localhost
for local development, SPs can't reach us on localhost. Do we have a method for testing the pull_check locally that will work?
There was a problem hiding this comment.
Yeah, SPs can’t reach us on localhost, so I tested it on a vps with dealbot exposed via an nginx proxy.
Actually, after the commit, I download the piece back from the SP, recompute the piece cid, and verify that it matches the original piece cid uploaded via dealbot. So I’ll skip the commit for now. |
SgtPooki
left a comment
There was a problem hiding this comment.
only a few changes required before I think I can give a ship-it..
| await this.repo.upsert( | ||
| { | ||
| pieceCid: registration.pieceCid, | ||
| providerAddress: registration.providerAddress, | ||
| key: registration.key, | ||
| size: registration.size, | ||
| pullSubmittedAt: null, | ||
| firstByteAt: null, | ||
| }, | ||
| ["pieceCid"], | ||
| ); |
There was a problem hiding this comment.
its possible this could persist in the DB for a while if a pull job fails mid-flight. we might want an "expires_at" field and an explicit cleanup
Resolves #519 (retention double-count) via auto-merge. Also guards openPullPieceStream against serving expired registrations: public /api/piece/:pieceCid endpoint now returns 404 when expires_at has passed, regardless of when sweep runs.
|
@silent-cipher : apologies for delay. I will look at the updated docs during my 2026-05-12 so this can get merged. |
BigLep
left a comment
There was a problem hiding this comment.
Thanks for making the updates. I had a couple of small comments. Feel free to merge after reviewing and incorporating where appropraite.
|
|
||
| ### 2. Submit the pull request | ||
|
|
||
| Dealbot calls `pullPieces` from `@filoz/synapse-core/sp` with the pieceCid, the source URL, and either the SP's existing `dataSetId`/`clientDataSetId` or the SP `payee` for new-dataset flows. The submission timestamp is stamped on the registration so it can later be subtracted from the first-byte event. |
There was a problem hiding this comment.
Ok, that's a good detail to maybe add to this section? Basically when we make the pullPieces call, we pass Dealbot as the payee to pass Curio's validation... ?
|
I've incorporated documentation comments in - 70a8539. |
closes #300