Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c9bdfa4
feat: anon piece selection and retrieval
dennis-tra Apr 21, 2026
96c82c6
refactor(anon): only use clickhouse
dennis-tra Apr 29, 2026
81a38b1
feat(retrieval-anon): track ipni metrics
dennis-tra Apr 29, 2026
072a096
test(retrieval-anon): new ipni fields
dennis-tra Apr 29, 2026
1fcee60
refactor(retrieval-anon): function signatures
dennis-tra Apr 29, 2026
4527d29
refactor(retrieval-anon): cleanup
dennis-tra Apr 29, 2026
a797c15
chore: format code
dennis-tra Apr 29, 2026
54cc487
fix: biome checks
dennis-tra Apr 30, 2026
fcfe569
fix(ipni): return actual verified/unverified counts
dennis-tra Apr 30, 2026
fb45bd0
refactor: store anon retrieval data primarily in postgres
dennis-tra Apr 30, 2026
92c40a8
Revert "refactor: store anon retrieval data primarily in postgres"
dennis-tra May 4, 2026
d4f7d80
refactor(retrieval-anon): introduce IpniCheckStatus enum and drop red…
dennis-tra May 4, 2026
ab3748a
remove(retrieval-anon): dedup window logic
dennis-tra May 4, 2026
beffac7
revert(ipni): sequential block CID verification
dennis-tra May 4, 2026
f26744b
docs(retrieval-anon): flow description and metrics definitions
dennis-tra May 4, 2026
5cee3ee
docs: add missing anonymous retrieval env vars
dennis-tra May 4, 2026
95a2dff
docs: fix obsolete reference to the pdp-explorer-owned subgraph
dennis-tra May 4, 2026
cff3171
improve: clarity around piece fetch status and commp validation
dennis-tra May 4, 2026
3c2a698
refactor: let two subgraph endpoints coexist
dennis-tra May 5, 2026
d82222f
refactor: reduce pr diff noise
dennis-tra May 5, 2026
527283f
remove: residual references to a pdp subgraph in the subgraph module
dennis-tra May 5, 2026
8dfb3ca
Apply suggestion from @BigLep
dennis-tra May 15, 2026
b8a2621
chore: align pnpm-lock.yaml with main
dennis-tra May 15, 2026
70af7c0
fix: wrong reference to an old maximum anon retrieval piece size
dennis-tra May 15, 2026
b003d78
docs: improve anon retrieval documentation
dennis-tra May 15, 2026
21b4f2d
docs: fix accidental changes to untouched event descriptions
dennis-tra May 15, 2026
a4f0b38
rename: metric anonRetrievalStatus to anonPieceRetrievalStatus
dennis-tra May 15, 2026
1a32373
fix: interpret abort signal as timed out for metric
dennis-tra May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ coverage/
# per-package lockfiles are stray
apps/*/pnpm-lock.yaml
!pnpm-lock.yaml

.tool-versions
18 changes: 15 additions & 3 deletions apps/backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ WALLET_ADDRESS=0x0000000000000000000000000000000000000000
WALLET_PRIVATE_KEY=your_private_key_here
CHECK_DATASET_CREATION_FEES=true
USE_ONLY_APPROVED_PROVIDERS=true
# Upstream pdp-explorer subgraph — drives the data-retention / overdue-periods path.
PDP_SUBGRAPH_ENDPOINT=https://api.thegraph.com/subgraphs/filecoin/pdp
# Dealbot-owned subgraph on Goldsky (see apps/subgraph/README.md) — drives only
# the new anonymous-retrieval candidate-piece query for now.
SUBGRAPH_ENDPOINT=https://api.goldsky.com/api/public/<project>/subgraphs/dealbot-subgraph/<version>/gn

# Minimum number of datasets per SP (default: 1). When > 1, a separate data_set_creation job provisions extra datasets.
MIN_NUM_DATASETS_FOR_CHECKS=1
Expand Down Expand Up @@ -52,6 +56,9 @@ DEALBOT_MAINTENANCE_WINDOW_MINUTES=20
DEALS_PER_SP_PER_HOUR=2
DATASET_CREATIONS_PER_SP_PER_HOUR=1
RETRIEVALS_PER_SP_PER_HOUR=1
RETRIEVALS_ANON_PER_SP_PER_HOUR=
ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT=5
METRICS_PER_HOUR=2
PG_BOSS_LOCAL_CONCURRENCY=20
JOB_SCHEDULER_POLL_SECONDS=300
JOB_WORKER_POLL_SECONDS=60
Expand All @@ -60,6 +67,7 @@ JOB_SCHEDULE_PHASE_SECONDS=0
JOB_ENQUEUE_JITTER_SECONDS=0
DEAL_JOB_TIMEOUT_SECONDS=360 # 6m: Max runtime for deal jobs (TODO: reduce default to 3m)
RETRIEVAL_JOB_TIMEOUT_SECONDS=60 # 1m: Max runtime for retrieval jobs (TODO: reduce default to 30s)
ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS=360 # 6m: Max runtime for anon retrieval jobs (pieces up to ~500 MiB)
IPFS_BLOCK_FETCH_CONCURRENCY=6 # Parallel block fetches when validating IPFS DAGs
DEALBOT_PGBOSS_POOL_MAX=1
DEALBOT_PGBOSS_SCHEDULER_ENABLED=true
Expand All @@ -73,9 +81,13 @@ PROXY_LIST=http://username:password@host:port,http://username:password@host:port
PROXY_LOCATIONS=l1,l2

# Timeout Configuration (in milliseconds)
CONNECT_TIMEOUT_MS=10000 # 10s: Initial connection timeout
HTTP_REQUEST_TIMEOUT_MS=240000 # 4m: Total transfer timeout for HTTP/1.1 (10MiB @ 170KB/s + overhead)
HTTP2_REQUEST_TIMEOUT_MS=240000 # 4m: Total transfer timeout for HTTP/2 (10MiB @ 170KB/s + overhead)
CONNECT_TIMEOUT_MS=10000 # 10s: Connection + response-headers timeout (scoped to the header phase only)
# HTTP_REQUEST_TIMEOUT_MS and HTTP2_REQUEST_TIMEOUT_MS default to the longest job timeout above
# (max of DEAL_/RETRIEVAL_/ANON_RETRIEVAL_/DATA_SET_CREATION_/MAX_PIECE_CLEANUP_ * 1000 ms) so the
# HTTP-level ceiling never pre-empts a job-scoped AbortSignal. Only override when you have a non-job
# caller of HttpClientService that needs a specific deadline.
# HTTP_REQUEST_TIMEOUT_MS=360000
# HTTP2_REQUEST_TIMEOUT_MS=360000

# SP Blocklists configuration
# BLOCKED_SP_IDS=1234,5678
Expand Down
1 change: 1 addition & 0 deletions apps/backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ All configuration is done via environment variables in `.env`.
| `ENABLE_IPNI_TESTING` | IPNI testing mode (`disabled`/`random`/`always`) | `always` |
| `USE_ONLY_APPROVED_PROVIDERS` | Only use approved storage providers | `true` |
| `PDP_SUBGRAPH_ENDPOINT` | PDP subgraph API endpoint for PDP proof-set/data-retention | `https://api.thegraph.com/subgraphs/filecoin/pdp` |
| `SUBGRAPH_ENDPOINT` | Subgraph GraphQL endpoint for anon-retrieval queries | `https://api.goldsky.com/api/public/<project>/subgraphs/dealbot-subgraph/<version>/gn` |

### Scheduling Configuration (pg-boss)

Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { JobsModule } from "./jobs/jobs.module.js";
import { MetricsPrometheusModule } from "./metrics-prometheus/metrics-prometheus.module.js";
import { ProvidersModule } from "./providers/providers.module.js";
import { RetrievalModule } from "./retrieval/retrieval.module.js";
import { RetrievalAnonModule } from "./retrieval-anon/retrieval-anon.module.js";

@Module({
imports: [
Expand All @@ -28,6 +29,7 @@ import { RetrievalModule } from "./retrieval/retrieval.module.js";
JobsModule,
DealModule,
RetrievalModule,
RetrievalAnonModule,
DataSourceModule,
ProvidersModule,
...(process.env.ENABLE_DEV_MODE === "true" ? [DevToolsModule] : []),
Expand Down
44 changes: 44 additions & 0 deletions apps/backend/src/clickhouse/clickhouse.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,50 @@ export function buildMigrations(database: string): string[] {
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

`CREATE TABLE IF NOT EXISTS ${database}.anon_retrieval_checks
(
timestamp DateTime64(3, 'UTC'), -- when the check completed
probe_location LowCardinality(String), -- dealbot location
sp_address String, -- storage provider address (lowercased)
sp_id Nullable(UInt64), -- storage provider numeric id
sp_name Nullable(String), -- storage provider name

retrieval_id UUID, -- per-event correlation id (log/Prometheus join)

piece_cid String, -- piece CID (v2/CommP) sampled from the subgraph
data_set_id UInt64, -- on-chain data set id
piece_id UInt64, -- on-chain piece id within the data set
raw_size UInt64, -- raw (unpadded) piece size, bytes
with_ipfs_indexing Bool, -- whether the piece advertises IPNI metadata
ipfs_root_cid Nullable(String), -- root CID of the contained DAG; null when not IPFS-indexed

service_type LowCardinality(String), -- 'direct_sp' (only mode for anon retrievals today)
retrieval_endpoint String, -- URL probed (e.g. {spBaseUrl}/piece/{pieceCid})

piece_fetch_status LowCardinality(String), -- 'success' | 'failed' — HTTP transport outcome of GET /piece/<pieceCid> (HTTP 2xx). CommP validity, CAR/IPNI/block-fetch outcomes live in their own columns.
http_response_code Nullable(UInt16), -- raw HTTP status; null on transport failure
first_byte_ms Nullable(Float64), -- time to first response byte
last_byte_ms Nullable(Float64), -- time to last response byte
bytes_retrieved Nullable(UInt64), -- bytes received from /piece/{cid}
throughput_bps Nullable(UInt64), -- effective throughput, bytes per second
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is data that can be easily derived. Also is it (ttlb-ttfb)/bytes or simply ttlb/bytes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's http response size / total time of the HTTP request

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it could easily be derived, I agree


commp_valid Nullable(Bool), -- null when retrieval failed before CommP could be hashed
car_parseable Nullable(Bool), -- null when CAR validation was skipped (no IPFS indexing or piece fetch failed); true if bytes parsed as a CAR
car_block_count Nullable(UInt32), -- total number of blocks observed inside the CAR; null when skipped or unparseable
block_fetch_endpoint Nullable(String), -- gateway base URL probed for block fetch (e.g. {spBaseUrl}/ipfs/); null when skipped
block_fetch_valid Nullable(Bool), -- null when skipped; true if all sampled blocks fetched + hash-verified
block_fetch_sampled_count Nullable(UInt32), -- number of blocks sampled and probed via /ipfs/<cid>?format=raw
block_fetch_failed_count Nullable(UInt32), -- number of sampled blocks that failed (non-2xx, hash mismatch, unsupported codec, or transport error)

ipni_status LowCardinality(String), -- 'valid' | 'invalid' | 'skipped' | 'error' — all-or-nothing across the root CID and the sampled child CIDs (filecoin-pin verifies them as a single batch)
ipni_verify_ms Nullable(Float64), -- IPNI verification duration; null when skipped

error_message Nullable(String) -- failure reason; null on success
) ENGINE MergeTree()
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

`CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges
(
timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods
Expand Down
91 changes: 83 additions & 8 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,16 @@ export const configValidationSchema = Joi.object({
USE_ONLY_APPROVED_PROVIDERS: Joi.boolean().default(true),
DEALBOT_DATASET_VERSION: Joi.string().optional(),
MIN_NUM_DATASETS_FOR_CHECKS: Joi.number().integer().min(1).default(1),
// Two subgraph endpoints coexist intentionally to limit blast radius while we
// migrate off the upstream pdp-explorer subgraph:
// - PDP_SUBGRAPH_ENDPOINT drives the established overdue-periods / data
// retention path against the existing pdp-explorer subgraph.
// - SUBGRAPH_ENDPOINT drives only the new anonymous-retrieval candidate
// piece query against the dealbot-owned subgraph.
// Once the dealbot-owned subgraph has soaked in production we can drop
// PDP_SUBGRAPH_ENDPOINT and route everything through SUBGRAPH_ENDPOINT.
PDP_SUBGRAPH_ENDPOINT: Joi.string().uri().optional().allow(""),
SUBGRAPH_ENDPOINT: Joi.string().uri().optional().allow(""),

// Scheduling
PROVIDERS_REFRESH_INTERVAL_SECONDS: Joi.number().default(4 * 3600),
Expand All @@ -80,6 +89,7 @@ export const configValidationSchema = Joi.object({
DEALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(4),
DATASET_CREATIONS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(1),
RETRIEVALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(2),
RETRIEVALS_ANON_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).optional(),
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RETRIEVALS_ANON_PER_SP_PER_HOUR is declared as an optional number, but an empty-string value (as in the .env.example placeholder) will fail Joi number coercion and can prevent the app from booting. Consider adding .empty("") (or .allow("") with normalization) and/or providing a default in the schema to match loadConfig()’s fallback behavior.

Suggested change
RETRIEVALS_ANON_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).optional(),
RETRIEVALS_ANON_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).empty("").optional(),

Copilot uses AI. Check for mistakes.
// Polling interval for pg-boss scheduler (lower = more responsive, higher = less DB chatter).
JOB_SCHEDULER_POLL_SECONDS: Joi.number().min(60).default(300),
JOB_WORKER_POLL_SECONDS: Joi.number().min(5).default(60),
Expand All @@ -91,8 +101,10 @@ export const configValidationSchema = Joi.object({
JOB_ENQUEUE_JITTER_SECONDS: Joi.number().min(0).default(0),
DEAL_JOB_TIMEOUT_SECONDS: Joi.number().min(120).default(360), // 6 minutes max runtime for data storage jobs (TODO: reduce default to 3 minutes)
RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(60), // 1 minute max runtime for retrieval jobs (TODO: reduce default to 30 seconds)
ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(360), // 6 minutes max runtime for anon retrieval jobs (pieces can be up to 500 MiB)
DATA_SET_CREATION_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for dataset creation jobs
IPFS_BLOCK_FETCH_CONCURRENCY: Joi.number().integer().min(1).max(32).default(6),
ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT: Joi.number().integer().min(1).max(50).default(5),

// Piece Cleanup
MAX_DATASET_STORAGE_SIZE_BYTES: Joi.number()
Expand Down Expand Up @@ -131,8 +143,9 @@ export const configValidationSchema = Joi.object({

// Timeouts (in milliseconds)
CONNECT_TIMEOUT_MS: Joi.number().min(1000).default(10000), // 10 seconds to establish connection/receive headers
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP requests (10MiB @ 170KB/s + overhead)
HTTP2_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP/2 requests (10MiB @ 170KB/s + overhead)
// Defaults intentionally omitted so loadConfig can derive them from the longest job timeout.
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).optional(),
HTTP2_REQUEST_TIMEOUT_MS: Joi.number().min(1000).optional(),
IPNI_VERIFICATION_TIMEOUT_MS: Joi.number().min(1000).default(60000), // 60 seconds max time to wait for IPNI verification
IPNI_VERIFICATION_POLLING_MS: Joi.number().min(250).default(2000), // 2 seconds between IPNI verification polls

Expand Down Expand Up @@ -174,6 +187,7 @@ export interface IBlockchainConfig {
dealbotDataSetVersion?: string;
minNumDataSetsForChecks: number;
pdpSubgraphEndpoint?: string;
subgraphEndpoint?: string; // Endpoint of the dealbot-owned subgraph. Eventually replaces `pdpSubgraphEndpoint`
}

export interface ISchedulingConfig {
Expand Down Expand Up @@ -264,6 +278,14 @@ export interface IJobsConfig {
* Uses AbortController to actively cancel job execution.
*/
retrievalJobTimeoutSeconds: number;
/**
* Maximum runtime (seconds) for anonymous retrieval jobs before forced abort.
*
* Anonymous retrievals fetch arbitrary pieces (up to ~500 MiB), so this is
* typically larger than `retrievalJobTimeoutSeconds`. Uses AbortController
* to actively cancel job execution while still persisting partial metrics.
*/
anonRetrievalJobTimeoutSeconds: number;
/**
* Target number of piece cleanup runs per storage provider per hour.
*
Expand All @@ -278,6 +300,12 @@ export interface IJobsConfig {
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
maxPieceCleanupRuntimeSeconds: number;

/**
* Target number of anonymous retrieval tests per storage provider per hour.
* Defaults to retrievalsPerSpPerHour when not set.
*/
retrievalsAnonPerSpPerHour: number;
}

export interface IDatasetConfig {
Expand All @@ -295,6 +323,10 @@ export interface ITimeoutConfig {

export interface IRetrievalConfig {
ipfsBlockFetchConcurrency: number;
/**
* Number of CAR blocks to sample for IPNI + block-fetch validation.
*/
anonBlockSampleCount: number;
}

export interface IPieceCleanupConfig {
Expand Down Expand Up @@ -336,6 +368,43 @@ export interface IConfig {
}

export function loadConfig(): IConfig {
const jobTimeoutSeconds = {
deal: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrieval: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
anonRetrieval: Number.parseInt(process.env.ANON_RETRIEVAL_JOB_TIMEOUT_SECONDS || "360", 10),
dataSetCreation: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10),
pieceCleanup: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10),
};

// HTTP-level request timeouts default to the longest job timeout so the
// per-request ceiling never caps below the per-job budget. Any job-scoped
// AbortSignal fires first and is authoritative; the HTTP timer only kicks
// in for callers that do not pass a parent signal.
const longestJobTimeoutMs = Math.max(...Object.values(jobTimeoutSeconds)) * 1000;

const httpRequestTimeoutMs = Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || String(longestJobTimeoutMs), 10);
const http2RequestTimeoutMs = Number.parseInt(
process.env.HTTP2_REQUEST_TIMEOUT_MS || String(longestJobTimeoutMs),
10,
);

// Misconfiguration guard: if someone explicitly sets an HTTP timeout below
// the longest job timeout, the HTTP-level timer will abort in-flight work
// before the job signal has a chance to report it. Warn loudly so this is
// caught at boot rather than inferred from short-timeout incidents later.
for (const [name, value] of [
["HTTP_REQUEST_TIMEOUT_MS", httpRequestTimeoutMs],
["HTTP2_REQUEST_TIMEOUT_MS", http2RequestTimeoutMs],
] as const) {
if (value < longestJobTimeoutMs) {
// eslint-disable-next-line no-console
console.warn(
`[config] ${name}=${value}ms is lower than the longest job timeout (${longestJobTimeoutMs}ms). ` +
`HTTP requests may abort before the job signal fires, producing short, unexplained timeouts.`,
);
}
}

return {
app: {
env: process.env.NODE_ENV || "development",
Expand Down Expand Up @@ -379,6 +448,7 @@ export function loadConfig(): IConfig {
dealbotDataSetVersion: process.env.DEALBOT_DATASET_VERSION,
minNumDataSetsForChecks: Number.parseInt(process.env.MIN_NUM_DATASETS_FOR_CHECKS || "1", 10),
pdpSubgraphEndpoint: process.env.PDP_SUBGRAPH_ENDPOINT || "",
subgraphEndpoint: process.env.SUBGRAPH_ENDPOINT || "",
},
scheduling: {
providersRefreshIntervalSeconds: Number.parseInt(process.env.PROVIDERS_REFRESH_INTERVAL_SECONDS || "14400", 10),
Expand All @@ -401,11 +471,15 @@ export function loadConfig(): IConfig {
catchupMaxEnqueue: Number.parseInt(process.env.JOB_CATCHUP_MAX_ENQUEUE || "10", 10),
schedulePhaseSeconds: Number.parseInt(process.env.JOB_SCHEDULE_PHASE_SECONDS || "0", 10),
enqueueJitterSeconds: Number.parseInt(process.env.JOB_ENQUEUE_JITTER_SECONDS || "0", 10),
dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
dataSetCreationJobTimeoutSeconds: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10),
dealJobTimeoutSeconds: jobTimeoutSeconds.deal,
retrievalJobTimeoutSeconds: jobTimeoutSeconds.retrieval,
anonRetrievalJobTimeoutSeconds: jobTimeoutSeconds.anonRetrieval,
retrievalsAnonPerSpPerHour: Number.parseFloat(
process.env.RETRIEVALS_ANON_PER_SP_PER_HOUR || process.env.RETRIEVALS_PER_SP_PER_HOUR || "2",
),
dataSetCreationJobTimeoutSeconds: jobTimeoutSeconds.dataSetCreation,
pieceCleanupPerSpPerHour: Number.parseFloat(process.env.JOB_PIECE_CLEANUP_PER_SP_PER_HOUR || String(1 / 24)),
maxPieceCleanupRuntimeSeconds: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10),
maxPieceCleanupRuntimeSeconds: jobTimeoutSeconds.pieceCleanup,
},
dataset: {
localDatasetsPath: process.env.DEALBOT_LOCAL_DATASETS_PATH || DEFAULT_LOCAL_DATASETS_PATH,
Expand All @@ -427,13 +501,14 @@ export function loadConfig(): IConfig {
},
timeouts: {
connectTimeoutMs: Number.parseInt(process.env.CONNECT_TIMEOUT_MS || "10000", 10),
httpRequestTimeoutMs: Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || "240000", 10),
http2RequestTimeoutMs: Number.parseInt(process.env.HTTP2_REQUEST_TIMEOUT_MS || "240000", 10),
httpRequestTimeoutMs,
http2RequestTimeoutMs,
ipniVerificationTimeoutMs: Number.parseInt(process.env.IPNI_VERIFICATION_TIMEOUT_MS || "60000", 10),
ipniVerificationPollingMs: Number.parseInt(process.env.IPNI_VERIFICATION_POLLING_MS || "2000", 10),
},
retrieval: {
ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10),
anonBlockSampleCount: Number.parseInt(process.env.ANON_RETRIEVAL_BLOCK_SAMPLE_COUNT || "5", 10),
},
clickhouse: {
url: process.env.CLICKHOUSE_URL || undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ describe("DataRetentionService", () => {
expect(incCalls).toEqual(expect.arrayContaining([[10], [25]]));
});

it("reloads baselines from DB on every poll", async () => {
it("only loads baselines from DB once across multiple polls", async () => {
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValue([makeProvider()]);

await service.pollDataRetention();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, Update
export type JobType =
| "deal"
| "retrieval"
| "retrieval_anon"
| "data_set_creation"
| "metrics" // legacy: no longer scheduled; see RemoveMetricsJobScheduleRows migration. TODO(#457): remove.
| "metrics_cleanup" // legacy: no longer scheduled; see RemoveMetricsJobScheduleRows migration. TODO(#457): remove.
Expand Down
7 changes: 7 additions & 0 deletions apps/backend/src/database/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ export enum IpniStatus {
FAILED = "failed",
}

/**
 * Outcome of the IPNI verification step of an anonymous retrieval check.
 *
 * Values mirror the `ipni_status` LowCardinality(String) column of the
 * ClickHouse `anon_retrieval_checks` table ('valid' | 'invalid' | 'skipped'
 * | 'error'). Verification is all-or-nothing across the root CID and the
 * sampled child CIDs — they are verified as a single batch.
 */
export enum IpniCheckStatus {
  /** The whole batch (root CID + sampled child CIDs) verified successfully. */
  VALID = "valid",
  /** Verification ran to completion and the batch failed. */
  INVALID = "invalid",
  /** Verification was not attempted — presumably when the piece is not IPFS-indexed or the piece fetch failed; confirm against the caller. */
  SKIPPED = "skipped",
  /** Verification aborted with an error before reaching a verdict. */
  ERROR = "error",
}

/**
* Metadata schema for deal storage and retrieval
*/
Expand Down
Loading