Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f59bad9
Revamping the lifecycle hooks, starting with init
ericallam Mar 21, 2025
0215a8c
vibes
ericallam Mar 21, 2025
2c44e59
init.ts at the root of the trigger dir is now automatically loaded
ericallam Mar 21, 2025
17e1623
Improve init lifecycle hook types and fix tabler icons on spans
ericallam Mar 21, 2025
1777ff4
move onStart to the new lifecycle hook system
ericallam Mar 21, 2025
be676fd
onFailure
ericallam Mar 22, 2025
06f6a04
onStart
ericallam Mar 22, 2025
e816ba4
onComplete
ericallam Mar 22, 2025
aaf2ed8
onWait and onResume
ericallam Mar 22, 2025
e77e8d4
handleError
ericallam Mar 22, 2025
597b7ba
adding imports
ericallam Mar 22, 2025
e34e520
more hooks
ericallam Mar 23, 2025
dc6e659
new locals API
ericallam Mar 24, 2025
6001dfa
Add middleware types
ericallam Mar 24, 2025
2d16d66
Add middleware hooks
ericallam Mar 24, 2025
2e6d0d8
share the hook registration code
ericallam Mar 24, 2025
1167889
use new onStart
ericallam Mar 24, 2025
b974d82
use new onFailure
ericallam Mar 24, 2025
307309f
implement onComplete
ericallam Mar 24, 2025
4a8d693
a couple tweaks
ericallam Mar 24, 2025
ef871b0
starting test executor
ericallam Mar 24, 2025
2262a42
more tests and fixes
ericallam Mar 24, 2025
91225da
test on failure
ericallam Mar 24, 2025
87824a8
dry up some stuff
ericallam Mar 24, 2025
f84ddcd
test oncomplete
ericallam Mar 24, 2025
6ab74c4
implement and test handleError (now catchError)
ericallam Mar 24, 2025
7f804f3
middleware working and tests passing
ericallam Mar 25, 2025
5edbb77
use tryCatch in TaskExecutor
ericallam Mar 25, 2025
6ba2b04
more tests
ericallam Mar 25, 2025
d0a5c16
Add cleanup hook
ericallam Mar 25, 2025
30705fd
implement cleanup
ericallam Mar 25, 2025
23ae77a
handle max duration timeout errors better
ericallam Mar 25, 2025
ba520fc
Make sure and register all the config hooks
ericallam Mar 25, 2025
df9cad6
Get it all working
ericallam Mar 25, 2025
3ca5985
Hooks now all use the new types, and adding some spans
ericallam Mar 25, 2025
268c9f6
implement onWait/onResume
ericallam Mar 25, 2025
887c5ca
Add changeset
ericallam Mar 25, 2025
e745fcb
Remove console.log
ericallam Mar 25, 2025
1def8d0
Use allSettled so onWait/onResume errors don't break anything
ericallam Mar 25, 2025
6d5d10a
Support other init file names
ericallam Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implement cleanup
  • Loading branch information
ericallam committed Mar 25, 2025
commit 30705fd59bdc1d17f1e14afec2a320db072e81d3
91 changes: 84 additions & 7 deletions packages/core/src/v3/workers/taskExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ export class TaskExecutor {
);
}

await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);

return {
id: execution.run.id,
ok: false,
Expand All @@ -191,6 +193,8 @@ export class TaskExecutor {

if (outputError) {
recordSpanException(span, outputError);
await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);

return this.#internalErrorResult(
execution,
TaskRunErrorCodes.TASK_OUTPUT_ERROR,
Expand All @@ -208,6 +212,8 @@ export class TaskExecutor {

if (exportError) {
recordSpanException(span, exportError);
await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);

return this.#internalErrorResult(
execution,
TaskRunErrorCodes.TASK_OUTPUT_ERROR,
Expand Down Expand Up @@ -236,6 +242,8 @@ export class TaskExecutor {
signal
);

await this.#cleanupAndWaitUntil(payload, ctx, initOutput, signal);

return {
ok: true,
id: execution.run.id,
Expand Down Expand Up @@ -690,21 +698,90 @@ export class TaskExecutor {
);
}

async #callTaskCleanup(
async #cleanupAndWaitUntil(
payload: unknown,
ctx: TaskRunContext,
init: unknown,
initOutput: any,
signal?: AbortSignal
) {
const cleanupFn = this.task.fns.cleanup;
await this.#callCleanupFunctions(payload, ctx, initOutput, signal);
await this.#blockForWaitUntil();
}

if (!cleanupFn) {
async #callCleanupFunctions(
payload: unknown,
ctx: TaskRunContext,
initOutput: any,
signal?: AbortSignal
) {
const globalCleanupHooks = lifecycleHooks.getGlobalCleanupHooks();
const taskCleanupHook = lifecycleHooks.getTaskCleanupHook(this.task.id);

if (globalCleanupHooks.length === 0 && !taskCleanupHook) {
return;
}

return this._tracer.startActiveSpan("cleanup", async (span) => {
return await cleanupFn(payload, { ctx, init, signal });
});
return this._tracer.startActiveSpan(
"hooks.cleanup",
async (span) => {
return await runTimelineMetrics.measureMetric(
"trigger.dev/execution",
"cleanup",
async () => {
for (const hook of globalCleanupHooks) {
const [hookError] = await tryCatch(
this._tracer.startActiveSpan(
hook.name ?? "global",
async (span) => {
await hook.fn({
payload,
ctx,
signal,
task: this.task.id,
init: initOutput,
});
},
{
attributes: {
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
},
}
)
);
// Ignore errors from cleanup functions
}

if (taskCleanupHook) {
const [hookError] = await tryCatch(
this._tracer.startActiveSpan(
"task",
async (span) => {
await taskCleanupHook({
payload,
ctx,
signal,
task: this.task.id,
init: initOutput,
});
},
{
attributes: {
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
},
}
)
);
// Ignore errors from cleanup functions
}
}
);
},
{
attributes: {
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
},
}
);
}

async #blockForWaitUntil() {
Expand Down
216 changes: 215 additions & 1 deletion packages/core/test/taskExecutor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,8 @@ describe("TaskExecutor", () => {
},
});

expect((result as any).result.retry.delay).toBeCloseTo(30000, -1);
expect((result as any).result.retry.delay).toBeGreaterThan(29900);
expect((result as any).result.retry.delay).toBeLessThan(30100);
});

test("should execute middleware hooks in correct order around other hooks", async () => {
Expand Down Expand Up @@ -1203,6 +1204,219 @@ describe("TaskExecutor", () => {
},
});
});

test("should call cleanup hooks in correct order after other hooks but before middleware completion", async () => {
const executionOrder: string[] = [];

// Register global init hook
lifecycleHooks.registerGlobalInitHook({
id: "test-init",
fn: async () => {
executionOrder.push("init");
return {
foo: "bar",
};
},
});

// Register global start hook
lifecycleHooks.registerGlobalStartHook({
id: "global-start",
fn: async () => {
executionOrder.push("start");
},
});

// Register global success hook
lifecycleHooks.registerGlobalSuccessHook({
id: "global-success",
fn: async () => {
executionOrder.push("success");
},
});

// Register global complete hook
lifecycleHooks.registerGlobalCompleteHook({
id: "global-complete",
fn: async () => {
executionOrder.push("complete");
},
});

// Register global cleanup hooks
lifecycleHooks.registerGlobalCleanupHook({
id: "global-cleanup-1",
fn: async ({ init }) => {
executionOrder.push("global-cleanup-1");
// Verify we have access to init data
expect(init).toEqual({ foo: "bar" });
},
});

lifecycleHooks.registerGlobalCleanupHook({
id: "global-cleanup-2",
fn: async ({ init }) => {
executionOrder.push("global-cleanup-2");
// Verify we have access to init data
expect(init).toEqual({ foo: "bar" });
},
});

// Register task-specific cleanup hook
lifecycleHooks.registerTaskCleanupHook("test-task", {
id: "task-cleanup",
fn: async ({ init }) => {
executionOrder.push("task-cleanup");
// Verify we have access to init data
expect(init).toEqual({ foo: "bar" });
},
});

// Register middleware to verify cleanup happens before middleware completion
lifecycleHooks.registerGlobalMiddlewareHook({
id: "global-middleware",
fn: async ({ next }) => {
executionOrder.push("middleware-before");
await next();
executionOrder.push("middleware-after");
},
});

const task = {
id: "test-task",
fns: {
run: async (payload: any, params: RunFnParams<any>) => {
executionOrder.push("run");
return {
output: "test-output",
init: params.init,
};
},
},
};

const result = await executeTask(task, { test: "data" });

// Verify the execution order:
// 1. Middleware starts
// 2. Init hook
// 3. Start hook
// 4. Run function
// 5. Success hook
// 6. Complete hook
// 7. Cleanup hooks
// 8. Middleware completes
expect(executionOrder).toEqual([
"middleware-before",
"init",
"start",
"run",
"success",
"complete",
"global-cleanup-1",
"global-cleanup-2",
"task-cleanup",
"middleware-after",
]);

// Verify the final result
expect(result).toEqual({
result: {
ok: true,
id: "test-run-id",
output: '{"json":{"output":"test-output","init":{"foo":"bar"}}}',
outputType: "application/super+json",
},
});
});

test("should call cleanup hooks even when task fails", async () => {
const executionOrder: string[] = [];
const expectedError = new Error("Task failed intentionally");

// Register global init hook
lifecycleHooks.registerGlobalInitHook({
id: "test-init",
fn: async () => {
executionOrder.push("init");
return {
foo: "bar",
};
},
});

// Register failure hook
lifecycleHooks.registerGlobalFailureHook({
id: "global-failure",
fn: async () => {
executionOrder.push("failure");
},
});

// Register complete hook
lifecycleHooks.registerGlobalCompleteHook({
id: "global-complete",
fn: async () => {
executionOrder.push("complete");
},
});

// Register cleanup hooks
lifecycleHooks.registerGlobalCleanupHook({
id: "global-cleanup",
fn: async ({ init }) => {
executionOrder.push("global-cleanup");
// Verify we have access to init data even after failure
expect(init).toEqual({ foo: "bar" });
},
});

lifecycleHooks.registerTaskCleanupHook("test-task", {
id: "task-cleanup",
fn: async ({ init }) => {
executionOrder.push("task-cleanup");
// Verify we have access to init data even after failure
expect(init).toEqual({ foo: "bar" });
},
});

const task = {
id: "test-task",
fns: {
run: async () => {
executionOrder.push("run");
throw expectedError;
},
},
};

const result = await executeTask(task, { test: "data" });

// Verify cleanup hooks are called even after failure
expect(executionOrder).toEqual([
"init",
"run",
"failure",
"complete",
"global-cleanup",
"task-cleanup",
]);

// Verify the error result
expect(result).toEqual({
result: {
ok: false,
id: "test-run-id",
error: {
type: "BUILT_IN_ERROR",
message: "Task failed intentionally",
name: "Error",
stackTrace: expect.any(String),
},
skippedRetrying: false,
},
});
});
});

function executeTask(task: TaskMetadataWithFunctions, payload: any) {
Expand Down