Merged

Changes from 1 commit (23 commits in this pull request)
Commits:

d05720c: Create performance test harness for runs replication service (ericallam, Jan 9, 2026)
57861d3: improve test harness producer throughput and better organize run outputs (ericallam, Jan 9, 2026)
0bf8cfa: use a less CPU-intensive way of inserting task runs (ericallam, Jan 9, 2026)
be64766: use compact insert strategy for runs (ericallam, Jan 10, 2026)
32f758a: added back in max duration in seconds (ericallam, Jan 10, 2026)
4f946a4: cleanup the types (ericallam, Jan 10, 2026)
df12287: simplify (ericallam, Jan 10, 2026)
55ed118: much better type safety (ericallam, Jan 10, 2026)
44c5267: fixed types (ericallam, Jan 11, 2026)
284c196: fix clickhouse tests (ericallam, Jan 11, 2026)
a2fa855: really fix clickhouse tests (ericallam, Jan 11, 2026)
f8640f6: Add object-based insert functions and fix index generation (ericallam, Jan 11, 2026)
5f40360: Fix TypeScript errors in sort functions (ericallam, Jan 11, 2026)
ca37d7e: Fix sort comparators to return 0 for equal values (ericallam, Jan 11, 2026)
d1129d7: Remove performance test harness (ericallam, Jan 11, 2026)
df3ea39: Remove remaining performance harness artifacts (ericallam, Jan 11, 2026)
b9db398: Update pnpm-lock.yaml (ericallam, Jan 11, 2026)
4de8899: Stop dynamically loading superjson (ericallam, Jan 12, 2026)
777db7c: made accessing run and payload fields more type safe (ericallam, Jan 13, 2026)
8778403: speed up deduplicating of runs by making it more efficient (ericallam, Jan 13, 2026)
ec6ec2f: Fixed tests and made them less loggy (ericallam, Jan 13, 2026)
5ddabad: Fixed tests for realz (ericallam, Jan 13, 2026)
443f63b: added back in null check in getKey (ericallam, Jan 13, 2026)
use compact insert strategy for runs

ericallam committed Jan 12, 2026
commit be6476660c53ecabf37212f8a37dff470c238574
173 changes: 85 additions & 88 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -1,4 +1,5 @@
 import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
+import { TASK_RUN_COLUMNS, PAYLOAD_COLUMNS } from "@internal/clickhouse";
 import { type RedisOptions } from "@internal/redis";
 import {
   LogicalReplicationClient,
@@ -80,9 +81,7 @@ type TaskRunInsert = {

 export type RunsReplicationServiceEvents = {
   message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
-  batchFlushed: [
-    { flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
-  ];
+  batchFlushed: [{ flushId: string; taskRunInserts: any[][]; payloadInserts: any[][] }];
 };

export class RunsReplicationService {
@@ -171,12 +170,9 @@ export class RunsReplicationService {
       description: "Insert retry attempts",
     });

-    this._eventsProcessedCounter = this._meter.createCounter(
-      "runs_replication.events_processed",
-      {
-        description: "Replication events processed (inserts, updates, deletes)",
-      }
-    );
+    this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", {
+      description: "Replication events processed (inserts, updates, deletes)",
+    });

     this._flushDurationHistogram = this._meter.createHistogram(
       "runs_replication.flush_duration_ms",
@@ -581,29 +577,35 @@ export class RunsReplicationService {
       .filter(Boolean)
       // batch inserts in clickhouse are more performant if the items
       // are pre-sorted by the primary key
+      // Array indices: [0]=environment_id, [1]=organization_id, [2]=project_id, [3]=run_id, [5]=created_at
       .sort((a, b) => {
-        if (a.organization_id !== b.organization_id) {
-          return a.organization_id < b.organization_id ? -1 : 1;
+        if (a[1] !== b[1]) {
+          // organization_id
+          return a[1] < b[1] ? -1 : 1;
         }
-        if (a.project_id !== b.project_id) {
-          return a.project_id < b.project_id ? -1 : 1;
+        if (a[2] !== b[2]) {
+          // project_id
+          return a[2] < b[2] ? -1 : 1;
         }
-        if (a.environment_id !== b.environment_id) {
-          return a.environment_id < b.environment_id ? -1 : 1;
+        if (a[0] !== b[0]) {
+          // environment_id
+          return a[0] < b[0] ? -1 : 1;
         }
-        if (a.created_at !== b.created_at) {
-          return a.created_at - b.created_at;
+        if (a[5] !== b[5]) {
+          // created_at
+          return a[5] - b[5];
         }
-        return a.run_id < b.run_id ? -1 : 1;
+        return a[3] < b[3] ? -1 : 1; // run_id
       });

     const payloadInserts = preparedInserts
       .map(({ payloadInsert }) => payloadInsert)
       .filter(Boolean)
       // batch inserts in clickhouse are more performant if the items
       // are pre-sorted by the primary key
+      // Array indices: [0]=run_id
       .sort((a, b) => {
-        return a.run_id < b.run_id ? -1 : 1;
+        return a[0] < b[0] ? -1 : 1; // run_id
       });

     span.setAttribute("task_run_inserts", taskRunInserts.length);
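
The comparator above encodes the ClickHouse sorting key (organization_id, project_id, environment_id, created_at, run_id) as bare array indices. Below is a minimal sketch of how those magic numbers could be centralized; the IDX map and compareByPrimaryKey are hypothetical names, not code from this PR, and note that a comparator passed to Array.prototype.sort should return 0 for equal rows, which a later commit in this PR (ca37d7e) fixes:

type CompactRow = unknown[];

// Hypothetical name-to-index map; the authoritative order is TASK_RUN_COLUMNS.
const IDX = {
  environment_id: 0,
  organization_id: 1,
  project_id: 2,
  run_id: 3,
  created_at: 5,
} as const;

// Compare two compact rows by the table's sorting key.
function compareByPrimaryKey(a: CompactRow, b: CompactRow): number {
  for (const key of ["organization_id", "project_id", "environment_id"] as const) {
    const x = a[IDX[key]] as string;
    const y = b[IDX[key]] as string;
    if (x !== y) return x < y ? -1 : 1;
  }
  const createdA = a[IDX.created_at] as number;
  const createdB = b[IDX.created_at] as number;
  if (createdA !== createdB) return createdA - createdB;
  const runA = a[IDX.run_id] as string;
  const runB = b[IDX.run_id] as string;
  return runA < runB ? -1 : runA > runB ? 1 : 0;
}

// Usage: taskRunInserts.sort(compareByPrimaryKey);
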
@@ -633,7 +635,6 @@
           this.logger.error("Error inserting task run inserts", {
             error: taskRunError,
             flushId,
-            runIds: taskRunInserts.map((r) => r.run_id),
           });
           recordSpanError(span, taskRunError);
         }
@@ -642,7 +643,6 @@
           this.logger.error("Error inserting payload inserts", {
             error: payloadError,
             flushId,
-            runIds: payloadInserts.map((r) => r.run_id),
           });
           recordSpanError(span, payloadError);
         }
@@ -770,16 +770,14 @@
     }
   }

-  async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
+  async #insertTaskRunInserts(taskRunInserts: any[][], attempt: number) {
     return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
-      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertUnsafe(
-        taskRunInserts,
-        {
+      const [insertError, insertResult] =
+        await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
           params: {
             clickhouse_settings: this.#getClickhouseInsertSettings(),
           },
-        }
-      );
+        });

       if (insertError) {
         this.logger.error("Error inserting task run inserts attempt", {
@@ -795,16 +793,14 @@
       });
     }
   }

-  async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
+  async #insertPayloadInserts(payloadInserts: any[][], attempt: number) {
     return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
-      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloadsUnsafe(
-        payloadInserts,
-        {
+      const [insertError, insertResult] =
+        await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
           params: {
             clickhouse_settings: this.#getClickhouseInsertSettings(),
           },
-        }
-      );
+        });

       if (insertError) {
         this.logger.error("Error inserting payload inserts attempt", {
@@ -822,7 +818,7 @@

   async #prepareRunInserts(
     batchedRun: TaskRunInsert
-  ): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
+  ): Promise<{ taskRunInsert?: any[]; payloadInsert?: any[] }> {
     this.logger.debug("Preparing run", {
       batchedRun,
     });
@@ -875,66 +871,67 @@
     environmentType: string,
     event: "insert" | "update" | "delete",
     _version: bigint
-  ): Promise<TaskRunV2> {
+  ): Promise<any[]> {
     const output = await this.#prepareJson(run.output, run.outputType);

-    return {
-      environment_id: run.runtimeEnvironmentId,
-      organization_id: organizationId,
-      project_id: run.projectId,
-      run_id: run.id,
-      updated_at: run.updatedAt.getTime(),
-      created_at: run.createdAt.getTime(),
-      status: run.status,
-      environment_type: environmentType,
-      friendly_id: run.friendlyId,
-      engine: run.engine,
-      task_identifier: run.taskIdentifier,
-      queue: run.queue,
-      span_id: run.spanId,
-      trace_id: run.traceId,
-      error: { data: run.error },
-      attempt: run.attemptNumber ?? 1,
-      schedule_id: run.scheduleId ?? "",
-      batch_id: run.batchId ?? "",
-      completed_at: run.completedAt?.getTime(),
-      started_at: run.startedAt?.getTime(),
-      executed_at: run.executedAt?.getTime(),
-      delay_until: run.delayUntil?.getTime(),
-      queued_at: run.queuedAt?.getTime(),
-      expired_at: run.expiredAt?.getTime(),
-      usage_duration_ms: run.usageDurationMs,
-      cost_in_cents: run.costInCents,
-      base_cost_in_cents: run.baseCostInCents,
-      tags: run.runTags ?? [],
-      task_version: run.taskVersion ?? "",
-      sdk_version: run.sdkVersion ?? "",
-      cli_version: run.cliVersion ?? "",
-      machine_preset: run.machinePreset ?? "",
-      root_run_id: run.rootTaskRunId ?? "",
-      parent_run_id: run.parentTaskRunId ?? "",
-      depth: run.depth,
-      is_test: run.isTest,
-      idempotency_key: run.idempotencyKey ?? "",
-      expiration_ttl: run.ttl ?? "",
-      output,
-      concurrency_key: run.concurrencyKey ?? "",
-      bulk_action_group_ids: run.bulkActionGroupIds ?? [],
-      worker_queue: run.masterQueue,
-      max_duration_in_seconds: run.maxDurationInSeconds ?? undefined,
-      _version: _version.toString(),
-      _is_deleted: event === "delete" ? 1 : 0,
-    };
+    // Return array matching TASK_RUN_COLUMNS order
+    return [
+      run.runtimeEnvironmentId, // environment_id
+      organizationId, // organization_id
+      run.projectId, // project_id
+      run.id, // run_id
+      run.updatedAt.getTime(), // updated_at
+      run.createdAt.getTime(), // created_at
+      run.status, // status
+      environmentType, // environment_type
+      run.friendlyId, // friendly_id
+      run.attemptNumber ?? 1, // attempt
+      run.engine, // engine
+      run.taskIdentifier, // task_identifier
+      run.queue, // queue
+      run.scheduleId ?? "", // schedule_id
+      run.batchId ?? "", // batch_id
+      run.completedAt?.getTime() ?? null, // completed_at
+      run.startedAt?.getTime() ?? null, // started_at
+      run.executedAt?.getTime() ?? null, // executed_at
+      run.delayUntil?.getTime() ?? null, // delay_until
+      run.queuedAt?.getTime() ?? null, // queued_at
+      run.expiredAt?.getTime() ?? null, // expired_at
+      run.usageDurationMs ?? 0, // usage_duration_ms
+      run.costInCents ?? 0, // cost_in_cents
+      run.baseCostInCents ?? 0, // base_cost_in_cents
+      output, // output
+      { data: run.error }, // error
+      run.runTags ?? [], // tags
+      run.taskVersion ?? "", // task_version
+      run.sdkVersion ?? "", // sdk_version
+      run.cliVersion ?? "", // cli_version
+      run.machinePreset ?? "", // machine_preset
+      run.rootTaskRunId ?? "", // root_run_id
+      run.parentTaskRunId ?? "", // parent_run_id
+      run.depth ?? 0, // depth
+      run.spanId, // span_id
+      run.traceId, // trace_id
+      run.idempotencyKey ?? "", // idempotency_key
+      run.ttl ?? "", // expiration_ttl
+      run.isTest ?? false, // is_test
+      run.concurrencyKey ?? "", // concurrency_key
+      run.bulkActionGroupIds ?? [], // bulk_action_group_ids
+      run.masterQueue ?? "", // worker_queue
+      _version.toString(), // _version
+      event === "delete" ? 1 : 0, // _is_deleted
+    ];
   }

-  async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<RawTaskRunPayloadV1> {
+  async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<any[]> {
     const payload = await this.#prepareJson(run.payload, run.payloadType);

-    return {
-      run_id: run.id,
-      created_at: run.createdAt.getTime(),
-      payload,
-    };
+    // Return array matching PAYLOAD_COLUMNS order
+    return [
+      run.id, // run_id
+      run.createdAt.getTime(), // created_at
+      payload, // payload
+    ];
   }

async #prepareJson(
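
What "compact" buys in this commit: instead of JSONEachRow objects that repeat every column name on every row, each prepared insert is a positional array paired once with a column list. Below is a hedged sketch of that wire-level idea using the public @clickhouse/client API; the helper name, table name, and abbreviated column list are assumptions for illustration, not the repo's actual @internal/clickhouse code:

import { createClient } from "@clickhouse/client";

const client = createClient({ url: process.env.CLICKHOUSE_URL ?? "http://localhost:8123" });

// The exported column list is the positional contract for each row array
// (abbreviated here; the real TASK_RUN_COLUMNS is far longer).
const PAYLOAD_COLUMNS = ["run_id", "created_at", "payload"] as const;

async function insertPayloadsCompact(rows: unknown[][]) {
  // JSONCompactEachRow serializes each row as a positional JSON array, so
  // column names travel once per request instead of once per row, cutting
  // request size and JSON parsing work for wide tables and large batches.
  return client.insert({
    table: "raw_task_runs_payload_v1", // assumed table name
    columns: PAYLOAD_COLUMNS as unknown as [string, ...string[]],
    values: rows,
    format: "JSONCompactEachRow",
  });
}
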
25 changes: 17 additions & 8 deletions apps/webapp/test/performance/harness.ts
@@ -511,6 +511,7 @@ export class RunsReplicationHarness {
     const clickhouse = new ClickHouse({
       url: this.config.infrastructure.clickhouseUrl!,
       name: "profiling-migrator",
+      logLevel: "error", // Suppress migration spam - we handle errors ourselves
     });

try {
@@ -523,10 +524,11 @@
         .filter((f) => f.endsWith(".sql"))
         .sort(); // Ensure numeric ordering

-      console.log(`Found ${sqlFiles.length} migration files`);
+      // Suppress verbose output - only show errors
+      let successCount = 0;
+      let skippedCount = 0;

       for (const file of sqlFiles) {
-        console.log(`  Running migration: ${file}`);
         const sql = await fs.readFile(path.join(migrationsPath, file), "utf-8");

         // Parse goose migration file - only execute "up" section
@@ -567,21 +569,28 @@

       for (const statement of statements) {
         try {
-          console.log(`    Executing: ${statement.substring(0, 80)}...`);
           // Use the underlying client's command method
           await (clickhouse.writer as any).client.command({ query: statement });
-          console.log(`    ✓ Success`);
+          successCount++;
         } catch (error: any) {
-          // Ignore "already exists" errors
-          if (error.message?.includes("already exists") || error.message?.includes("ALREADY_EXISTS")) {
-            console.log(`    ⊘ Skipped (already exists)`);
+          // Ignore "already exists" errors silently
+          if (
+            error.message?.includes("already exists") ||
+            error.message?.includes("ALREADY_EXISTS") ||
+            error.code === "57"
+          ) {
+            skippedCount++;
           } else {
-            console.log(`    ✗ Error: ${error.message}`);
+            console.error(`✗ Migration error in ${file}: ${error.message}`);
             throw error;
           }
         }
       }
     }

+    if (successCount > 0 || skippedCount > 0) {
+      console.log(`Migrations: ${successCount} applied, ${skippedCount} skipped`);
+    }
} finally {
await clickhouse.close();
}
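
The harness applies goose-style migration files by hand, executing only the "up" section and tolerating "already exists" failures (ClickHouse error code 57 is TABLE_ALREADY_EXISTS, which is what the error.code === "57" check added above matches). A minimal sketch of that parsing step, assuming standard goose markers; the harness's actual parser may differ:

// Split a goose migration into executable "up" statements, honoring
// StatementBegin/StatementEnd blocks so multi-statement DDL is not cut on ";".
function parseGooseUp(sql: string): string[] {
  const statements: string[] = [];
  let inUp = false;
  let inBlock = false;
  let buf: string[] = [];

  const flush = () => {
    const stmt = buf.join("\n").trim().replace(/;\s*$/, "");
    if (stmt) statements.push(stmt);
    buf = [];
  };

  for (const line of sql.split("\n")) {
    const marker = line.trim();
    if (marker.startsWith("-- +goose Up")) { inUp = true; continue; }
    if (marker.startsWith("-- +goose Down")) break; // "up" section is over
    if (!inUp) continue;
    if (marker.startsWith("-- +goose StatementBegin")) { inBlock = true; continue; }
    if (marker.startsWith("-- +goose StatementEnd")) { inBlock = false; flush(); continue; }
    buf.push(line);
    if (!inBlock && marker.endsWith(";")) flush(); // plain statements end on ";"
  }
  flush();
  return statements;
}
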
43 changes: 20 additions & 23 deletions apps/webapp/test/runsReplicationService.part2.test.ts
@@ -889,18 +889,12 @@ describe("RunsReplicationService (part 2/2)", () => {
       await setTimeout(1000);

       expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2);
-      expect(batchFlushedEvents?.[0].taskRunInserts[0]).toEqual(
-        expect.objectContaining({
-          run_id: run.id,
-          status: "PENDING_VERSION",
-        })
-      );
-      expect(batchFlushedEvents?.[0].taskRunInserts[1]).toEqual(
-        expect.objectContaining({
-          run_id: run.id,
-          status: "COMPLETED_SUCCESSFULLY",
-        })
-      );
+      // taskRunInserts are now arrays, not objects
+      // Index 3 is run_id, Index 6 is status
+      expect(batchFlushedEvents?.[0].taskRunInserts[0][3]).toEqual(run.id); // run_id
+      expect(batchFlushedEvents?.[0].taskRunInserts[0][6]).toEqual("PENDING_VERSION"); // status
+      expect(batchFlushedEvents?.[0].taskRunInserts[1][3]).toEqual(run.id); // run_id
+      expect(batchFlushedEvents?.[0].taskRunInserts[1][6]).toEqual("COMPLETED_SUCCESSFULLY"); // status

       await runsReplicationService.stop();
     }
@@ -1065,23 +1059,24 @@
       expect(batchFlushedEvents[0]?.payloadInserts.length).toBeGreaterThan(1);

       // Verify sorting order: organization_id, project_id, environment_id, created_at, run_id
+      // taskRunInserts are now arrays: [0]=environment_id, [1]=organization_id, [2]=project_id, [3]=run_id, [5]=created_at
       for (let i = 1; i < batchFlushedEvents[0]?.taskRunInserts.length; i++) {
         const prev = batchFlushedEvents[0]?.taskRunInserts[i - 1];
         const curr = batchFlushedEvents[0]?.taskRunInserts[i];

         const prevKey = [
-          prev.organization_id,
-          prev.project_id,
-          prev.environment_id,
-          prev.created_at,
-          prev.run_id,
+          prev[1], // organization_id
+          prev[2], // project_id
+          prev[0], // environment_id
+          prev[5], // created_at
+          prev[3], // run_id
         ];
         const currKey = [
-          curr.organization_id,
-          curr.project_id,
-          curr.environment_id,
-          curr.created_at,
-          curr.run_id,
+          curr[1], // organization_id
+          curr[2], // project_id
+          curr[0], // environment_id
+          curr[5], // created_at
+          curr[3], // run_id
         ];

         const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]);
@@ -1111,7 +1106,9 @@
       for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) {
         const prev = batchFlushedEvents[0]?.payloadInserts[i - 1];
         const curr = batchFlushedEvents[0]?.payloadInserts[i];
-        expect(prev.run_id <= curr.run_id).toBeTruthy();
+        // payloadInserts are now arrays, not objects
+        // Index 0 is run_id
+        expect(prev[0] <= curr[0]).toBeTruthy();
       }

       await runsReplicationService.stop();
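
These hard-coded positions ([3] for run_id, [6] for status, and the sort-key indices above) will silently drift if the column order ever changes. One guard is to derive a name-to-index map from the exported column order, so tests and producer share a single positional contract; a sketch assuming TASK_RUN_COLUMNS is exported as a readonly tuple of column names, a direction the later type-safety commits in this PR (55ed118, 777db7c) suggest the code moved toward:

import { TASK_RUN_COLUMNS } from "@internal/clickhouse";

// Build name -> index once from the column order.
type ColumnName = (typeof TASK_RUN_COLUMNS)[number];

const IDX = Object.fromEntries(
  TASK_RUN_COLUMNS.map((name, i) => [name, i])
) as Record<ColumnName, number>;

// e.g. expect(taskRunInserts[0][IDX.run_id]).toEqual(run.id);
//      expect(taskRunInserts[0][IDX.status]).toEqual("PENDING_VERSION");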