Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next commit
drive run heartbeats from child process
  • Loading branch information
nicktrn committed Apr 16, 2025
commit 0d5bb9786d19880368dac1941720476cf5b3425b
2 changes: 1 addition & 1 deletion packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
for await (const _ of setInterval(heartbeatInterval)) {
if (_isRunning && _execution) {
try {
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id });
} catch (err) {
console.error("Failed to send HEARTBEAT message", err);
}
Expand Down
60 changes: 44 additions & 16 deletions packages/cli-v3/src/entryPoints/managed/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js";
import { RunnerEnv } from "./env.js";
import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
import { setTimeout as sleep } from "timers/promises";
import { RunExecutionHeartbeat } from "./heartbeat.js";
import { RunExecutionSnapshotPoller } from "./poller.js";
import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
import { MetadataClient } from "./overrides.js";
Expand Down Expand Up @@ -63,7 +62,6 @@ export class RunExecution {
private restoreCount: number;

private taskRunProcess?: TaskRunProcess;
private runHeartbeat?: RunExecutionHeartbeat;
private snapshotPoller?: RunExecutionSnapshotPoller;

constructor(opts: RunExecutionOptions) {
Expand Down Expand Up @@ -105,7 +103,7 @@ export class RunExecution {
envVars: Record<string, string>;
isWarmStart?: boolean;
}) {
return new TaskRunProcess({
const taskRunProcess = new TaskRunProcess({
workerManifest: this.workerManifest,
env: {
...envVars,
Expand All @@ -123,6 +121,29 @@ export class RunExecution {
},
isWarmStart,
}).initialize();

// Forward heartbeats reported through taskRunProcess.onTaskRunHeartbeat to onHeartbeat(),
// but only when the reported run ID matches the run this execution is tracking.
taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
// No run ID is set on this execution, so there is nothing to compare against: log and drop.
if (!this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId });
return;
}

// The heartbeat names a different run than the one tracked here: log both IDs and drop it.
if (runId !== this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", {
heartbeatRunId: runId,
expectedRunId: this.runFriendlyId,
});
return;
}

// tryCatch yields the error (if any) as the first tuple element; it is logged below
// rather than rethrown from this handler.
const [error] = await tryCatch(this.onHeartbeat());

if (error) {
this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message });
}
});

return taskRunProcess;
}

/**
Expand Down Expand Up @@ -229,7 +250,6 @@ export class RunExecution {
this.currentSnapshotId = snapshot.friendlyId;

// Update services
this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId);
this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId);

switch (snapshot.executionStatus) {
Expand Down Expand Up @@ -450,13 +470,6 @@ export class RunExecution {
this.podScheduledAt = runOpts.podScheduledAt;

// Create and start services
this.runHeartbeat = new RunExecutionHeartbeat({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
httpClient: this.httpClient,
logger: this.logger,
heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS,
});
this.snapshotPoller = new RunExecutionSnapshotPoller({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
Expand All @@ -466,7 +479,6 @@ export class RunExecution {
handleSnapshotChange: this.handleSnapshotChange.bind(this),
});

this.runHeartbeat.start();
this.snapshotPoller.start();

const [startError, start] = await tryCatch(
Expand Down Expand Up @@ -839,9 +851,6 @@ export class RunExecution {
this.env.override(overrides);

// Update services with new values
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000);
}
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
}
Expand All @@ -857,6 +866,26 @@ export class RunExecution {
}
}

/**
 * Sends one heartbeat for the current run and snapshot through the HTTP client.
 *
 * If either the run ID or the snapshot ID is not set, a debug log is written and no
 * request is made. An unsuccessful response is logged, not thrown; a rejection from
 * the HTTP call itself propagates to the caller.
 */
private async onHeartbeat() {
  const runId = this.runFriendlyId;
  if (!runId) {
    this.sendDebugLog("Heartbeat: missing run ID");
    return;
  }

  const snapshotId = this.currentSnapshotId;
  if (!snapshotId) {
    this.sendDebugLog("Heartbeat: missing snapshot ID");
    return;
  }

  this.sendDebugLog("Heartbeat: started");

  const result = await this.httpClient.heartbeatRun(runId, snapshotId);
  if (result.success) {
    return;
  }

  // Only the unsuccessful branch of the response carries an error to report.
  this.sendDebugLog("Heartbeat: failed", { error: result.error });
}

sendDebugLog(
message: string,
properties?: SendDebugLogOptions["properties"],
Expand Down Expand Up @@ -917,7 +946,6 @@ export class RunExecution {
}

/**
 * Stops the background services held by this execution.
 *
 * Each service reference is accessed with optional chaining, so a service that was
 * never created is skipped.
 */
private stopServices() {
this.runHeartbeat?.stop();
this.snapshotPoller?.stop();
}
}