diff --git a/.changeset/salty-bobcats-sort.md b/.changeset/salty-bobcats-sort.md new file mode 100644 index 0000000000..1475ec8d2a --- /dev/null +++ b/.changeset/salty-bobcats-sort.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"@workflow/next": patch +--- + +Add Next.js pages router entries handling diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index aee736050c..25a3fac7bb 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -48,9 +48,13 @@ function writeE2EMetadata() { async function triggerWorkflow( workflow: string | { workflowFile: string; workflowFn: string }, - args: any[] + args: any[], + options?: { usePagesRouter?: boolean } ): Promise<{ runId: string }> { - const url = new URL('/api/trigger', deploymentUrl); + const endpoint = options?.usePagesRouter + ? '/api/trigger-pages' + : '/api/trigger'; + const url = new URL(endpoint, deploymentUrl); const workflowFn = typeof workflow === 'string' ? workflow : workflow.workflowFn; const workflowFile = @@ -1292,4 +1296,46 @@ describe('e2e', () => { expect(runData.output).toBe(300); } ); + + // ==================== PAGES ROUTER TESTS ==================== + // Tests for Next.js Pages Router API endpoint (only runs for nextjs-turbopack and nextjs-webpack) + const isNextJsApp = + process.env.APP_NAME === 'nextjs-turbopack' || + process.env.APP_NAME === 'nextjs-webpack'; + + describe.skipIf(!isNextJsApp)('pages router', () => { + test('addTenWorkflow via pages router', { timeout: 60_000 }, async () => { + const run = await triggerWorkflow( + { + workflowFile: 'workflows/99_e2e.ts', + workflowFn: 'addTenWorkflow', + }, + [123], + { usePagesRouter: true } + ); + const returnValue = await getWorkflowReturnValue(run.runId); + expect(returnValue).toBe(133); + }); + + test( + 'promiseAllWorkflow via pages router', + { timeout: 60_000 }, + async () => { + const run = await triggerWorkflow('promiseAllWorkflow', [], { + usePagesRouter: true, + }); + const returnValue = await getWorkflowReturnValue(run.runId); + expect(returnValue).toBe('ABC'); + } + ); + + test('sleepingWorkflow via pages router', { timeout: 60_000 }, async () => { + const run = await triggerWorkflow('sleepingWorkflow', [], { + usePagesRouter: true, + }); + const returnValue = await getWorkflowReturnValue(run.runId); + expect(returnValue.startTime).toBeLessThan(returnValue.endTime); + expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); + }); + }); }); diff --git a/packages/next/src/builder.ts b/packages/next/src/builder.ts index 5ec2ffe5a0..ebe0156345 100644 --- a/packages/next/src/builder.ts +++ b/packages/next/src/builder.ts @@ -350,12 +350,18 @@ export async function getNextBuilder() { protected async getInputFiles(): Promise { const inputFiles = await super.getInputFiles(); - return inputFiles.filter((item) => - // non-exact pattern match to try to narrow - // down to just app route entrypoints, this will - // not be valid when pages router support is added - item.match(/[/\\](route|page|layout)\./) - ); + return inputFiles.filter((item) => { + // Match App Router entrypoints: route.ts, page.ts, layout.ts in app/ or src/app/ directories + // Matches: /app/page.ts, /app/dashboard/page.ts, /src/app/route.ts, etc. + if (item.match(/(^|.*[/\\])(app|src[/\\]app)([/\\](route|page|layout)\.|[/\\].*[/\\](route|page|layout)\.)/)) { + return true; + } + // Match Pages Router entrypoints: files in pages/ or src/pages/ + if (item.match(/[/\\](pages|src[/\\]pages)[/\\]/)) { + return true; + } + return false; + }); } private async writeFunctionsConfig(outputDir: string) { @@ -448,28 +454,52 @@ export async function getNextBuilder() { private async findAppDirectory(): Promise { const appDir = resolve(this.config.workingDir, 'app'); const srcAppDir = resolve(this.config.workingDir, 'src/app'); + const pagesDir = resolve(this.config.workingDir, 'pages'); + const srcPagesDir = resolve(this.config.workingDir, 'src/pages'); - try { - await access(appDir, constants.F_OK); - const appStats = await stat(appDir); - if (!appStats.isDirectory()) { - throw new Error(`Path exists but is not a directory: ${appDir}`); - } - return appDir; - } catch { + // Helper to check if a path exists and is a directory + const isDirectory = async (path: string): Promise => { try { - await access(srcAppDir, constants.F_OK); - const srcAppStats = await stat(srcAppDir); - if (!srcAppStats.isDirectory()) { - throw new Error(`Path exists but is not a directory: ${srcAppDir}`); + await access(path, constants.F_OK); + const stats = await stat(path); + if (!stats.isDirectory()) { + throw new Error(`Path exists but is not a directory: ${path}`); } - return srcAppDir; - } catch { - throw new Error( - 'Could not find Next.js app directory. Expected either "app" or "src/app" to exist.' - ); + return true; + } catch (e) { + if (e instanceof Error && e.message.includes('not a directory')) { + throw e; + } + return false; } + }; + + // Check if app directory exists + if (await isDirectory(appDir)) { + return appDir; + } + + // Check if src/app directory exists + if (await isDirectory(srcAppDir)) { + return srcAppDir; + } + + // If no app directory exists, check for pages directory and create app next to it + if (await isDirectory(pagesDir)) { + // Create app directory next to pages directory + await mkdir(appDir, { recursive: true }); + return appDir; + } + + if (await isDirectory(srcPagesDir)) { + // Create src/app directory next to src/pages directory + await mkdir(srcAppDir, { recursive: true }); + return srcAppDir; } + + throw new Error( + 'Could not find Next.js app or pages directory. Expected one of: "app", "src/app", "pages", or "src/pages" to exist.' + ); } } diff --git a/workbench/nextjs-turbopack/pages/api/trigger-pages.ts b/workbench/nextjs-turbopack/pages/api/trigger-pages.ts new file mode 100644 index 0000000000..34502caff3 --- /dev/null +++ b/workbench/nextjs-turbopack/pages/api/trigger-pages.ts @@ -0,0 +1,181 @@ +import type { NextApiRequest, NextApiResponse } from 'next'; +import { getRun, start } from 'workflow/api'; +import { + WorkflowRunFailedError, + WorkflowRunNotCompletedError, +} from 'workflow/internal/errors'; +import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; +import { allWorkflows } from '@/_workflows'; + +export default async function handler( + req: NextApiRequest, + res: NextApiResponse +) { + if (req.method === 'POST') { + return handlePost(req, res); + } else if (req.method === 'GET') { + return handleGet(req, res); + } else { + res.setHeader('Allow', ['GET', 'POST']); + return res.status(405).end(`Method ${req.method} Not Allowed`); + } +} + +async function handlePost(req: NextApiRequest, res: NextApiResponse) { + const workflowFile = + (req.query.workflowFile as string) || 'workflows/99_e2e.ts'; + if (!workflowFile) { + return res.status(400).send('No workflowFile query parameter provided'); + } + const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; + if (!workflows) { + return res.status(400).send(`Workflow file "${workflowFile}" not found`); + } + + const workflowFn = (req.query.workflowFn as string) || 'simple'; + if (!workflowFn) { + return res.status(400).send('No workflow query parameter provided'); + } + + // Handle static method lookups (e.g., "Calculator.calculate") + let workflow: unknown; + if (workflowFn.includes('.')) { + const [className, methodName] = workflowFn.split('.'); + const cls = workflows[className as keyof typeof workflows]; + if (cls && typeof cls === 'function') { + workflow = (cls as Record)[methodName]; + } + } else { + workflow = workflows[workflowFn as keyof typeof workflows]; + } + if (!workflow) { + return res.status(400).send(`Workflow "${workflowFn}" not found`); + } + + let args: any[] = []; + + // Args from query string + const argsParam = req.query.args as string | undefined; + if (argsParam) { + args = argsParam.split(',').map((arg) => { + const num = parseFloat(arg); + return Number.isNaN(num) ? arg.trim() : num; + }); + } else { + // Args from body + args = hydrateWorkflowArguments(JSON.parse(req.body), globalThis); + } + console.log(`Starting "${workflowFn}" workflow with args: ${args}`); + + try { + const run = await start(workflow as any, args as any); + console.log('Run', run.runId); + return res.status(200).json(run); + } catch (err) { + console.error(`Failed to start!!`, err); + throw err; + } +} + +async function handleGet(req: NextApiRequest, res: NextApiResponse) { + const runId = req.query.runId as string | undefined; + if (!runId) { + return res.status(400).send('No runId provided'); + } + + const outputStreamParam = req.query['output-stream'] as string | undefined; + if (outputStreamParam) { + const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; + const run = getRun(runId); + const stream = run.getReadable({ + namespace, + }); + + res.setHeader('Content-Type', 'application/octet-stream'); + + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + const data = + value instanceof Uint8Array + ? { data: Buffer.from(value).toString('base64') } + : value; + res.write(`${JSON.stringify(data)}\n`); + } + } finally { + reader.releaseLock(); + } + return res.end(); + } + + try { + const run = getRun(runId); + const returnValue = await run.returnValue; + console.log('Return value:', returnValue); + + // Include run metadata in headers + const [createdAt, startedAt, completedAt] = await Promise.all([ + run.createdAt, + run.startedAt, + run.completedAt, + ]); + + res.setHeader('X-Workflow-Run-Created-At', createdAt?.toISOString() || ''); + res.setHeader('X-Workflow-Run-Started-At', startedAt?.toISOString() || ''); + res.setHeader( + 'X-Workflow-Run-Completed-At', + completedAt?.toISOString() || '' + ); + + if (returnValue instanceof ReadableStream) { + res.setHeader('Content-Type', 'application/octet-stream'); + const reader = returnValue.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + res.write(value); + } + } finally { + reader.releaseLock(); + } + return res.end(); + } + + return res.status(200).json(returnValue); + } catch (error) { + if (error instanceof Error) { + if (WorkflowRunNotCompletedError.is(error)) { + return res.status(202).json({ + ...error, + name: error.name, + message: error.message, + }); + } + + if (WorkflowRunFailedError.is(error)) { + const cause = error.cause; + return res.status(400).json({ + ...error, + name: error.name, + message: error.message, + cause: { + message: cause.message, + stack: cause.stack, + code: cause.code, + }, + }); + } + } + + console.error( + 'Unexpected error while getting workflow return value:', + error + ); + return res.status(500).json({ + error: 'Internal server error', + }); + } +} diff --git a/workbench/nextjs-webpack/pages/api/trigger-pages.ts b/workbench/nextjs-webpack/pages/api/trigger-pages.ts new file mode 100644 index 0000000000..34502caff3 --- /dev/null +++ b/workbench/nextjs-webpack/pages/api/trigger-pages.ts @@ -0,0 +1,181 @@ +import type { NextApiRequest, NextApiResponse } from 'next'; +import { getRun, start } from 'workflow/api'; +import { + WorkflowRunFailedError, + WorkflowRunNotCompletedError, +} from 'workflow/internal/errors'; +import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; +import { allWorkflows } from '@/_workflows'; + +export default async function handler( + req: NextApiRequest, + res: NextApiResponse +) { + if (req.method === 'POST') { + return handlePost(req, res); + } else if (req.method === 'GET') { + return handleGet(req, res); + } else { + res.setHeader('Allow', ['GET', 'POST']); + return res.status(405).end(`Method ${req.method} Not Allowed`); + } +} + +async function handlePost(req: NextApiRequest, res: NextApiResponse) { + const workflowFile = + (req.query.workflowFile as string) || 'workflows/99_e2e.ts'; + if (!workflowFile) { + return res.status(400).send('No workflowFile query parameter provided'); + } + const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; + if (!workflows) { + return res.status(400).send(`Workflow file "${workflowFile}" not found`); + } + + const workflowFn = (req.query.workflowFn as string) || 'simple'; + if (!workflowFn) { + return res.status(400).send('No workflow query parameter provided'); + } + + // Handle static method lookups (e.g., "Calculator.calculate") + let workflow: unknown; + if (workflowFn.includes('.')) { + const [className, methodName] = workflowFn.split('.'); + const cls = workflows[className as keyof typeof workflows]; + if (cls && typeof cls === 'function') { + workflow = (cls as Record)[methodName]; + } + } else { + workflow = workflows[workflowFn as keyof typeof workflows]; + } + if (!workflow) { + return res.status(400).send(`Workflow "${workflowFn}" not found`); + } + + let args: any[] = []; + + // Args from query string + const argsParam = req.query.args as string | undefined; + if (argsParam) { + args = argsParam.split(',').map((arg) => { + const num = parseFloat(arg); + return Number.isNaN(num) ? arg.trim() : num; + }); + } else { + // Args from body + args = hydrateWorkflowArguments(JSON.parse(req.body), globalThis); + } + console.log(`Starting "${workflowFn}" workflow with args: ${args}`); + + try { + const run = await start(workflow as any, args as any); + console.log('Run', run.runId); + return res.status(200).json(run); + } catch (err) { + console.error(`Failed to start!!`, err); + throw err; + } +} + +async function handleGet(req: NextApiRequest, res: NextApiResponse) { + const runId = req.query.runId as string | undefined; + if (!runId) { + return res.status(400).send('No runId provided'); + } + + const outputStreamParam = req.query['output-stream'] as string | undefined; + if (outputStreamParam) { + const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; + const run = getRun(runId); + const stream = run.getReadable({ + namespace, + }); + + res.setHeader('Content-Type', 'application/octet-stream'); + + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + const data = + value instanceof Uint8Array + ? { data: Buffer.from(value).toString('base64') } + : value; + res.write(`${JSON.stringify(data)}\n`); + } + } finally { + reader.releaseLock(); + } + return res.end(); + } + + try { + const run = getRun(runId); + const returnValue = await run.returnValue; + console.log('Return value:', returnValue); + + // Include run metadata in headers + const [createdAt, startedAt, completedAt] = await Promise.all([ + run.createdAt, + run.startedAt, + run.completedAt, + ]); + + res.setHeader('X-Workflow-Run-Created-At', createdAt?.toISOString() || ''); + res.setHeader('X-Workflow-Run-Started-At', startedAt?.toISOString() || ''); + res.setHeader( + 'X-Workflow-Run-Completed-At', + completedAt?.toISOString() || '' + ); + + if (returnValue instanceof ReadableStream) { + res.setHeader('Content-Type', 'application/octet-stream'); + const reader = returnValue.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + res.write(value); + } + } finally { + reader.releaseLock(); + } + return res.end(); + } + + return res.status(200).json(returnValue); + } catch (error) { + if (error instanceof Error) { + if (WorkflowRunNotCompletedError.is(error)) { + return res.status(202).json({ + ...error, + name: error.name, + message: error.message, + }); + } + + if (WorkflowRunFailedError.is(error)) { + const cause = error.cause; + return res.status(400).json({ + ...error, + name: error.name, + message: error.message, + cause: { + message: cause.message, + stack: cause.stack, + code: cause.code, + }, + }); + } + } + + console.error( + 'Unexpected error while getting workflow return value:', + error + ); + return res.status(500).json({ + error: 'Internal server error', + }); + } +}