Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7ec44c2
WIP batch trigger v2
ericallam Nov 22, 2024
e9f2f8c
Fix for the DateField being one month out… getUTCMonth() is zero inde…
matt-aitken Nov 20, 2024
5ee6b8e
Added a custom date range filter
matt-aitken Nov 21, 2024
b4ed220
Deal with closing the custom date range
matt-aitken Nov 21, 2024
bd14761
Child runs filter
matt-aitken Nov 21, 2024
6097160
Fix for the clear button untoggling the child runs
matt-aitken Nov 21, 2024
4aa5845
WIP batchTriggerV2
ericallam Nov 25, 2024
d69abd1
Finished removing rate limit from the webapp
ericallam Nov 25, 2024
a664859
Added an index TaskRun to make useRealtimeBatch performant
ericallam Nov 26, 2024
a0dfd2d
Renamed the period filter labels to be “Last X mins”
matt-aitken Nov 25, 2024
daec8f4
Denormalize background worker columns into TaskRun
ericallam Nov 26, 2024
aa82f8d
Use the runTags column on TaskRun
matt-aitken Nov 25, 2024
976b275
Add TaskRun ("projectId", "id" DESC) index
matt-aitken Nov 26, 2024
1674d33
Improved the v2 batch trigger endpoint to process items in parallel a…
ericallam Nov 26, 2024
e8cfe88
Added a runId filter, and WIP for batchId filter
matt-aitken Nov 26, 2024
b0c3c41
WIP triggerAll
ericallam Nov 26, 2024
4b181fa
Add new batch methods for triggering multiple different tasks in a si…
ericallam Nov 27, 2024
fd18784
Disabled switch styling
matt-aitken Nov 26, 2024
26980f0
Batch filtering, force child runs to show if filtering by batch/run
matt-aitken Nov 26, 2024
492fb79
Added schedule ID filtering
matt-aitken Nov 26, 2024
0060a64
Force child runs to show when filtering by scheduleId, for consistency
matt-aitken Nov 26, 2024
f1919b1
realtime: allow setting enabled: false on useApiClient
ericallam Nov 27, 2024
1b34daf
Batches page
matt-aitken Nov 27, 2024
2ebc4e5
Always complete batches, not only batchTriggerAndWait in deployed tasks
ericallam Nov 27, 2024
658f400
Add batch.retrieve and allow filtering by batch in runs.list
ericallam Nov 27, 2024
ff65310
Renamed pending to “In progress”
matt-aitken Nov 27, 2024
5f5cbe2
Tidied up the table a bit
matt-aitken Nov 27, 2024
e60b75b
Deal with old batches: “Legacy batch”
matt-aitken Nov 27, 2024
eac6aa7
Added the Batch to the run inspector
matt-aitken Nov 27, 2024
4b2932a
Fixed the migration that created the new idempotency key index on Bat…
ericallam Nov 27, 2024
4070c6a
Fixed the name of the idempotencyKeyExpiresAt option and now default …
ericallam Nov 27, 2024
f97d55e
Timezone fix: wrong month in Usage page dropdown
matt-aitken Nov 27, 2024
78e3a56
The DateField now defaults to local time, but can be overriden to use…
matt-aitken Nov 27, 2024
f95569a
Don’t allow the task icon to get squished
matt-aitken Nov 27, 2024
b7599ed
BatchFilters removed unused imports
matt-aitken Nov 27, 2024
99a3137
In the batch filtering, use `id` instead of `batchId` in the URL
matt-aitken Nov 27, 2024
681e75a
BatchFilters: we don’t need a child tasks hidden input field
matt-aitken Nov 27, 2024
bc07b52
Creates some common filter components/functions
matt-aitken Nov 27, 2024
b822915
Fix for batchVersion check when filtering by batch status
matt-aitken Nov 27, 2024
4c5fd2a
Add additional logging around telemetry and more attributes for trigg…
ericallam Nov 27, 2024
fc821ad
Show clear button for specific id filters
matt-aitken Nov 27, 2024
d5c1bda
Batch list: only allow environments that are part of this project
matt-aitken Nov 27, 2024
69f3dca
Unnecessary optional chain
matt-aitken Nov 27, 2024
153af67
Add JSDocs
ericallam Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add new batch methods for triggering multiple different tasks in a si…
…ngle batch
  • Loading branch information
ericallam committed Nov 27, 2024
commit 4b181fa953de25999c1cfe31012023dffc3ea94e
80 changes: 80 additions & 0 deletions .changeset/perfect-onions-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
---
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Added new batch.trigger and batch.triggerByTask methods that allows triggering multiple different tasks in a single batch:

```ts
import { batch } from '@trigger.dev/sdk/v3';
import type { myTask1, myTask2 } from './trigger/tasks';

// Somewhere in your backend code
const response = await batch.trigger<typeof myTask1 | typeof myTask2>([
{ id: 'task1', payload: { foo: 'bar' } },
{ id: 'task2', payload: { baz: 'qux' } },
]);

for (const run of response.runs) {
if (run.ok) {
console.log(run.output);
} else {
console.error(run.error);
}
}
```

Or if you are inside of a task, you can use `triggerByTask`:

```ts
import { batch, task, runs } from '@trigger.dev/sdk/v3';

export const myParentTask = task({
id: 'myParentTask',
run: async () => {
const response = await batch.triggerByTask([
{ task: myTask1, payload: { foo: 'bar' } },
{ task: myTask2, payload: { baz: 'qux' } },
]);

const run1 = await runs.retrieve(response.runs[0]);
console.log(run1.output) // typed as { foo: string }

const run2 = await runs.retrieve(response.runs[1]);
console.log(run2.output) // typed as { baz: string }

const response2 = await batch.triggerByTaskAndWait([
{ task: myTask1, payload: { foo: 'bar' } },
{ task: myTask2, payload: { baz: 'qux' } },
]);

if (response2.runs[0].ok) {
console.log(response2.runs[0].output) // typed as { foo: string }
}

if (response2.runs[1].ok) {
console.log(response2.runs[1].output) // typed as { baz: string }
}
}
});

export const myTask1 = task({
id: 'myTask1',
run: async () => {
return {
foo: 'bar'
}
}
});

export const myTask2 = task({
id: 'myTask2',
run: async () => {
return {
baz: 'qux'
}
}
});

```
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,13 +1038,15 @@ class SharedQueueTasks {
id: attempt.taskRun.friendlyId,
output: attempt.output ?? undefined,
outputType: attempt.outputType,
taskIdentifier: attempt.taskRun.taskIdentifier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure attempt.taskRun is defined before accessing taskIdentifier

In both instances, attempt.taskRun.taskIdentifier is accessed without checking if attempt.taskRun is defined. If attempt.taskRun is undefined, this could result in a runtime error. Consider adding null checks or using optional chaining.

Modify the code to safely access taskIdentifier:

-            taskIdentifier: attempt.taskRun.taskIdentifier,
+            taskIdentifier: attempt.taskRun?.taskIdentifier,

Also applies to: 1049-1049

};
return success;
} else {
const failure: TaskRunFailedExecutionResult = {
ok,
id: attempt.taskRun.friendlyId,
error: attempt.error as TaskRunError,
taskIdentifier: attempt.taskRun.taskIdentifier,
};
return failure;
}
Expand Down
1 change: 1 addition & 0 deletions packages/cli-v3/src/dev/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ export class BackgroundWorker {
ok: false,
retry: undefined,
error: TaskRunProcess.parseExecuteError(e),
taskIdentifier: payload.execution.task.id,
};
}
}
Expand Down
7 changes: 7 additions & 0 deletions packages/cli-v3/src/entryPoints/deploy-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand Down Expand Up @@ -246,6 +247,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand Down Expand Up @@ -277,6 +279,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand All @@ -303,6 +306,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand Down Expand Up @@ -357,6 +361,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
taskIdentifier: execution.task.id,
},
});
}
Expand All @@ -380,6 +385,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
taskIdentifier: execution.task.id,
},
});
}
Expand All @@ -402,6 +408,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});
}
Expand Down
6 changes: 6 additions & 0 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand Down Expand Up @@ -222,6 +223,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand All @@ -247,6 +249,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand All @@ -273,6 +276,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
taskIdentifier: execution.task.id,
},
});

Expand Down Expand Up @@ -324,6 +328,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
taskIdentifier: execution.task.id,
},
});
}
Expand All @@ -347,6 +352,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
taskIdentifier: execution.task.id,
},
});
}
Expand Down
3 changes: 0 additions & 3 deletions packages/core/src/package.json

This file was deleted.

122 changes: 109 additions & 13 deletions packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,24 @@ export class SubtaskUnwrapError extends Error {
}
}

export class TaskRunPromise<T> extends Promise<TaskRunResult<T>> {
export class TaskRunPromise<TIdentifier extends string, TOutput> extends Promise<
TaskRunResult<TIdentifier, TOutput>
> {
constructor(
executor: (
resolve: (value: TaskRunResult<T> | PromiseLike<TaskRunResult<T>>) => void,
resolve: (
value:
| TaskRunResult<TIdentifier, TOutput>
| PromiseLike<TaskRunResult<TIdentifier, TOutput>>
) => void,
reject: (reason?: any) => void
) => void,
private readonly taskId: string
private readonly taskId: TIdentifier
) {
super(executor);
}

unwrap(): Promise<T> {
unwrap(): Promise<TOutput> {
return this.then((result) => {
if (result.ok) {
return result.output;
Expand Down Expand Up @@ -415,38 +421,101 @@ export type RunHandleTaskIdentifier<TRunHandle> = TRunHandle extends RunHandle<
? TTaskIdentifier
: never;

export type TaskRunResult<TOutput = any> =
export type TaskRunResult<TIdentifier extends string, TOutput = any> =
| {
ok: true;
id: string;
taskIdentifier: string;
taskIdentifier: TIdentifier;
output: TOutput;
}
| {
ok: false;
id: string;
taskIdentifier: string;
taskIdentifier: TIdentifier;
error: unknown;
};

export type BatchResult<TOutput = any> = {
export type AnyTaskRunResult = TaskRunResult<string, any>;

export type TaskRunResultFromTask<TTask extends AnyTask> = TTask extends Task<
infer TIdentifier,
any,
infer TOutput
>
? TaskRunResult<TIdentifier, TOutput>
: never;

export type BatchResult<TIdentifier extends string, TOutput = any> = {
id: string;
runs: TaskRunResult<TIdentifier, TOutput>[];
};

export type BatchByIdResult<TTask extends AnyTask> = {
id: string;
runs: Array<TaskRunResultFromTask<TTask>>;
};

export type BatchByTaskResult<TTasks extends readonly AnyTask[]> = {
id: string;
runs: TaskRunResult<TOutput>[];
runs: {
[K in keyof TTasks]: TaskRunResultFromTask<TTasks[K]>;
};
};

/**
* A BatchRunHandle can be used to retrieve the runs of a batch trigger in a typesafe manner.
*/
// export type BatchTasksRunHandle<TTasks extends readonly AnyTask[]> = BrandedRun<
// {
// batchId: string;
// isCached: boolean;
// idempotencyKey?: string;
// runs: {
// [K in keyof TTasks]: BatchedRunHandle<
// TaskIdentifier<TTasks[K]>,
// TaskPayload<TTasks[K]>,
// TaskOutput<TTasks[K]>
// >;
// };
// publicAccessToken: string;
// },
// any,
// any
// >;

export type BatchTasksResult<TTasks extends readonly AnyTask[]> = BatchTasksRunHandle<TTasks>;

export type BatchItem<TInput> = { payload: TInput; options?: TriggerOptions };

export type BatchTriggerAndWaitItem<TInput> = {
payload: TInput;
options?: TriggerAndWaitOptions;
};

export type BatchAllItem<TRunTypes extends AnyRunTypes> = {
task: TRunTypes["taskIdentifier"];
export type BatchByIdItem<TRunTypes extends AnyRunTypes> = {
id: TRunTypes["taskIdentifier"];
payload: TRunTypes["payload"];
options?: TriggerOptions;
};

export type BatchByIdAndWaitItem<TRunTypes extends AnyRunTypes> = {
id: TRunTypes["taskIdentifier"];
payload: TRunTypes["payload"];
options?: TriggerAndWaitOptions;
};

export type BatchByTaskItem<TTask extends AnyTask> = {
task: TTask;
payload: TaskPayload<TTask>;
options?: TriggerOptions;
};

export type BatchByTaskAndWaitItem<TTask extends AnyTask> = {
task: TTask;
payload: TaskPayload<TTask>;
options?: TriggerAndWaitOptions;
};

export interface Task<TIdentifier extends string, TInput = void, TOutput = any> {
/**
* The id of the task.
Expand Down Expand Up @@ -497,7 +566,10 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
* }
* ```
*/
triggerAndWait: (payload: TInput, options?: TriggerAndWaitOptions) => TaskRunPromise<TOutput>;
triggerAndWait: (
payload: TInput,
options?: TriggerAndWaitOptions
) => TaskRunPromise<TIdentifier, TOutput>;
Comment on lines +569 to +572
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Breaking change: Removal of idempotencyKey from triggerAndWait options impacts existing code

The verification reveals this is indeed a breaking change:

  • Multiple examples in documentation (docs/idempotency.mdx, docs/triggering.mdx, docs/how-it-works.mdx) show active usage of idempotencyKey with triggerAndWait
  • The codebase contains several examples in CHANGELOG.md files showing this as an established pattern
  • The type definition shows TriggerAndWaitOptions explicitly omits idempotencyKey from TriggerOptions, where it was previously available
  • Real usage examples found in packages/core/src/v3/idempotencyKeys.ts demonstrate this is a core feature being used

This change will break existing code that relies on idempotency control in triggerAndWait operations.

🔗 Analysis chain

Verify impact of removing idempotencyKey from triggerAndWait options.

The removal of idempotencyKey from TriggerAndWaitOptions is a breaking change that might affect existing code.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find usages of triggerAndWait with idempotencyKey to assess impact

# Search for triggerAndWait calls with idempotencyKey
rg -A 3 'triggerAndWait.*idempotencyKey'

# Search for batchTriggerAndWait calls with idempotencyKey
rg -A 3 'batchTriggerAndWait.*idempotencyKey'

Length of output: 4171


Script:

#!/bin/bash
# Let's check the git history to see if this is a new or existing feature
git log -p packages/core/src/v3/types/tasks.ts | grep -A 5 -B 5 'TriggerAndWaitOptions'

# Also check the current definition of TriggerAndWaitOptions
rg -A 10 'export.*TriggerAndWaitOptions' packages/core/src/v3/types/tasks.ts

# And check for any interface that extends TriggerAndWaitOptions
ast-grep --pattern 'interface $_ extends TriggerAndWaitOptions'

Length of output: 573


Script:

#!/bin/bash
# Let's check the definition of TriggerOptions to understand what's being omitted
rg -A 10 'export.*TriggerOptions\s*=' packages/core/src/v3/types/tasks.ts

# Also check if there are any recent changes to TriggerAndWaitOptions in the changelog
rg -B 5 'TriggerAndWaitOptions' packages/core/CHANGELOG.md

Length of output: 979


/**
* Batch trigger multiple task runs with the given payloads, and wait for the results. Returns the results of the task runs.
Expand All @@ -521,7 +593,7 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
*/
batchTriggerAndWait: (
items: Array<BatchTriggerAndWaitItem<TInput>>
) => Promise<BatchResult<TOutput>>;
) => Promise<BatchResult<TIdentifier, TOutput>>;
}

export interface TaskWithSchema<
Expand Down Expand Up @@ -758,3 +830,27 @@ export type RunHandleFromTypes<TRunTypes extends AnyRunTypes> = RunHandle<
export type BatchRunHandleFromTypes<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
? BatchRunHandle<TRunTypes["taskIdentifier"], TRunTypes["payload"], TRunTypes["output"]>
: never;

/**
* A BatchRunHandle can be used to retrieve the runs of a batch trigger in a typesafe manner.
*/
export type BatchTasksRunHandle<TTasks extends readonly AnyTask[]> = BrandedRun<
{
batchId: string;
isCached: boolean;
idempotencyKey?: string;
runs: {
[K in keyof TTasks]: BatchedRunHandle<
TaskIdentifier<TTasks[K]>,
TaskPayload<TTasks[K]>,
TaskOutput<TTasks[K]>
>;
};
publicAccessToken: string;
},
any,
any
>;

export type BatchTasksRunHandleFromTypes<TTasks extends readonly AnyTask[]> =
BatchTasksRunHandle<TTasks>;
Loading