Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add queue size limit guard on triggering tasks
  • Loading branch information
ericallam committed Oct 3, 2024
commit 62a0a287483848cc6861459b823179d55b92a079
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ const EnvironmentSchema = z.object({
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB

MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ export class MarQS {
return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey));
}

public async lengthOfEnvQueue(env: AuthenticatedEnvironment) {
return this.redis.zcard(this.keys.envQueueKey(env));
}

public async oldestMessageInQueue(
env: AuthenticatedEnvironment,
queue: string,
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
return `${constants.ENV_PART}:${envId}:${constants.QUEUE_PART}`;
}

envQueueKey(env: AuthenticatedEnvironment): string {
return [constants.ENV_PART, this.shortId(env.id), constants.QUEUE_PART].join(":");
}

messageKey(messageId: string) {
return `${constants.MESSAGE_PART}:${messageId}`;
}
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/marqs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface MarQSKeyProducer {
envConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string;
envQueueKey(env: AuthenticatedEnvironment): string;
envSharedQueueKey(env: AuthenticatedEnvironment): string;
sharedQueueKey(): string;
sharedQueueScanPattern(): string;
Expand Down
40 changes: 40 additions & 0 deletions apps/webapp/app/v3/queueSizeLimits.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { env } from "~/env.server";
import { MarQS } from "./marqs/index.server";

export type QueueSizeGuardResult = {
isWithinLimits: boolean;
maximumSize?: number;
queueSize?: number;
};

export async function guardQueueSizeLimitsForEnv(
environment: AuthenticatedEnvironment,
marqs?: MarQS
): Promise<QueueSizeGuardResult> {
const maximumSize = getMaximumSizeForEnvironment(environment);

if (typeof maximumSize === "undefined") {
return { isWithinLimits: true };
}

if (!marqs) {
return { isWithinLimits: true, maximumSize };
}

const queueSize = await marqs.lengthOfEnvQueue(environment);

return {
isWithinLimits: queueSize < maximumSize,
maximumSize,
queueSize,
};
}

function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined {
if (environment.type === "DEVELOPMENT") {
return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE;
} else {
return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE;
}
}
19 changes: 19 additions & 0 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
import { handleMetadataPacket } from "~/utils/packets";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -82,6 +83,24 @@ export class TriggerTaskService extends BaseService {
}
}

const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs);

logger.debug("Queue size guard result", {
queueSizeGuard,
environment: {
id: environment.id,
type: environment.type,
organization: environment.organization,
project: environment.project,
},
});

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
);
}

if (
body.options?.tags &&
typeof body.options.tags !== "string" &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Organization" ADD COLUMN "maximumDeployedQueueSize" INTEGER,
ADD COLUMN "maximumDevQueueSize" INTEGER;
3 changes: 3 additions & 0 deletions packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ model Organization {
/// This is deprecated and will be removed in the future
maximumSchedulesLimit Int @default(5)

maximumDevQueueSize Int?
maximumDeployedQueueSize Int?

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
deletedAt DateTime?
Expand Down
20 changes: 20 additions & 0 deletions references/v3-catalog/src/trigger/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import "server-only";
import { logger, SubtaskUnwrapError, task, tasks, wait } from "@trigger.dev/sdk/v3";
import { traceAsync } from "@/telemetry.js";
import { HeaderGenerator } from "header-generator";
import { setTimeout as setTimeoutP } from "node:timers/promises";

let headerGenerator = new HeaderGenerator({
browsers: [{ name: "firefox", minVersion: 90 }, { name: "chrome", minVersion: 110 }, "safari"],
Expand Down Expand Up @@ -215,3 +216,22 @@ export const retryTask = task({
throw new Error("This task will always fail");
},
});

export const maximumQueueDepthParent = task({
id: "maximum-queue-depth-parent",
run: async (payload: any) => {
await maximumQueueDepthChild.trigger({});
await maximumQueueDepthChild.trigger({});
await maximumQueueDepthChild.trigger({});
},
});

export const maximumQueueDepthChild = task({
id: "maximum-queue-depth-child",
queue: {
concurrencyLimit: 1,
},
run: async (payload: any) => {
await setTimeoutP(10_000);
},
});
2 changes: 1 addition & 1 deletion references/v3-catalog/trigger.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export default defineConfig({
instrumentations: [new OpenAIInstrumentation()],
additionalFiles: ["wrangler/wrangler.toml"],
retries: {
enabledInDev: true,
enabledInDev: false,
default: {
maxAttempts: 10,
minTimeoutInMs: 5_000,
Expand Down