Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/salty-bobcats-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
"@workflow/next": patch
---

Add Next.js pages router entries handling
50 changes: 48 additions & 2 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ function writeE2EMetadata() {

async function triggerWorkflow(
workflow: string | { workflowFile: string; workflowFn: string },
args: any[]
args: any[],
options?: { usePagesRouter?: boolean }
): Promise<{ runId: string }> {
const url = new URL('/api/trigger', deploymentUrl);
const endpoint = options?.usePagesRouter
? '/api/trigger-pages'
: '/api/trigger';
const url = new URL(endpoint, deploymentUrl);
const workflowFn =
typeof workflow === 'string' ? workflow : workflow.workflowFn;
const workflowFile =
Expand Down Expand Up @@ -1292,4 +1296,46 @@ describe('e2e', () => {
expect(runData.output).toBe(300);
}
);

// ==================== PAGES ROUTER TESTS ====================
// Tests for Next.js Pages Router API endpoint (only runs for nextjs-turbopack and nextjs-webpack)
const isNextJsApp =
process.env.APP_NAME === 'nextjs-turbopack' ||
process.env.APP_NAME === 'nextjs-webpack';

describe.skipIf(!isNextJsApp)('pages router', () => {
test('addTenWorkflow via pages router', { timeout: 60_000 }, async () => {
const run = await triggerWorkflow(
{
workflowFile: 'workflows/99_e2e.ts',
workflowFn: 'addTenWorkflow',
},
[123],
{ usePagesRouter: true }
);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe(133);
});

test(
'promiseAllWorkflow via pages router',
{ timeout: 60_000 },
async () => {
const run = await triggerWorkflow('promiseAllWorkflow', [], {
usePagesRouter: true,
});
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe('ABC');
}
);

test('sleepingWorkflow via pages router', { timeout: 60_000 }, async () => {
const run = await triggerWorkflow('sleepingWorkflow', [], {
usePagesRouter: true,
});
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue.startTime).toBeLessThan(returnValue.endTime);
expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999);
});
});
});
76 changes: 53 additions & 23 deletions packages/next/src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,18 @@ export async function getNextBuilder() {

protected async getInputFiles(): Promise<string[]> {
const inputFiles = await super.getInputFiles();
return inputFiles.filter((item) =>
// non-exact pattern match to try to narrow
// down to just app route entrypoints, this will
// not be valid when pages router support is added
item.match(/[/\\](route|page|layout)\./)
);
return inputFiles.filter((item) => {
// Match App Router entrypoints: route.ts, page.ts, layout.ts in app/ or src/app/ directories
// Matches: /app/page.ts, /app/dashboard/page.ts, /src/app/route.ts, etc.
if (item.match(/(^|.*[/\\])(app|src[/\\]app)([/\\](route|page|layout)\.|[/\\].*[/\\](route|page|layout)\.)/)) {
return true;
}
// Match Pages Router entrypoints: files in pages/ or src/pages/
if (item.match(/[/\\](pages|src[/\\]pages)[/\\]/)) {
return true;
}
return false;
});
}

private async writeFunctionsConfig(outputDir: string) {
Expand Down Expand Up @@ -448,28 +454,52 @@ export async function getNextBuilder() {
private async findAppDirectory(): Promise<string> {
const appDir = resolve(this.config.workingDir, 'app');
const srcAppDir = resolve(this.config.workingDir, 'src/app');
const pagesDir = resolve(this.config.workingDir, 'pages');
const srcPagesDir = resolve(this.config.workingDir, 'src/pages');

try {
await access(appDir, constants.F_OK);
const appStats = await stat(appDir);
if (!appStats.isDirectory()) {
throw new Error(`Path exists but is not a directory: ${appDir}`);
}
return appDir;
} catch {
// Helper to check if a path exists and is a directory
const isDirectory = async (path: string): Promise<boolean> => {
try {
await access(srcAppDir, constants.F_OK);
const srcAppStats = await stat(srcAppDir);
if (!srcAppStats.isDirectory()) {
throw new Error(`Path exists but is not a directory: ${srcAppDir}`);
await access(path, constants.F_OK);
const stats = await stat(path);
if (!stats.isDirectory()) {
throw new Error(`Path exists but is not a directory: ${path}`);
}
return srcAppDir;
} catch {
throw new Error(
'Could not find Next.js app directory. Expected either "app" or "src/app" to exist.'
);
return true;
} catch (e) {
if (e instanceof Error && e.message.includes('not a directory')) {
throw e;
}
return false;
}
};

// Check if app directory exists
if (await isDirectory(appDir)) {
return appDir;
}

// Check if src/app directory exists
if (await isDirectory(srcAppDir)) {
return srcAppDir;
}

// If no app directory exists, check for pages directory and create app next to it
if (await isDirectory(pagesDir)) {
// Create app directory next to pages directory
await mkdir(appDir, { recursive: true });
return appDir;
}

if (await isDirectory(srcPagesDir)) {
// Create src/app directory next to src/pages directory
await mkdir(srcAppDir, { recursive: true });
return srcAppDir;
}

throw new Error(
'Could not find Next.js app or pages directory. Expected one of: "app", "src/app", "pages", or "src/pages" to exist.'
);
}
}

Expand Down
181 changes: 181 additions & 0 deletions workbench/nextjs-turbopack/pages/api/trigger-pages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { getRun, start } from 'workflow/api';
import {
WorkflowRunFailedError,
WorkflowRunNotCompletedError,
} from 'workflow/internal/errors';
import { hydrateWorkflowArguments } from 'workflow/internal/serialization';
import { allWorkflows } from '@/_workflows';

export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
if (req.method === 'POST') {
return handlePost(req, res);
} else if (req.method === 'GET') {
return handleGet(req, res);
} else {
res.setHeader('Allow', ['GET', 'POST']);
return res.status(405).end(`Method ${req.method} Not Allowed`);
}
}

async function handlePost(req: NextApiRequest, res: NextApiResponse) {
const workflowFile =
(req.query.workflowFile as string) || 'workflows/99_e2e.ts';
if (!workflowFile) {
return res.status(400).send('No workflowFile query parameter provided');
}
const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows];
if (!workflows) {
return res.status(400).send(`Workflow file "${workflowFile}" not found`);
}

const workflowFn = (req.query.workflowFn as string) || 'simple';
if (!workflowFn) {
return res.status(400).send('No workflow query parameter provided');
}

// Handle static method lookups (e.g., "Calculator.calculate")
let workflow: unknown;
if (workflowFn.includes('.')) {
const [className, methodName] = workflowFn.split('.');
const cls = workflows[className as keyof typeof workflows];
if (cls && typeof cls === 'function') {
workflow = (cls as Record<string, unknown>)[methodName];
}
} else {
workflow = workflows[workflowFn as keyof typeof workflows];
}
if (!workflow) {
return res.status(400).send(`Workflow "${workflowFn}" not found`);
}

let args: any[] = [];

// Args from query string
const argsParam = req.query.args as string | undefined;
if (argsParam) {
args = argsParam.split(',').map((arg) => {
const num = parseFloat(arg);
return Number.isNaN(num) ? arg.trim() : num;
});
} else {
// Args from body
args = hydrateWorkflowArguments(JSON.parse(req.body), globalThis);
Comment thread
ijjk marked this conversation as resolved.
}
console.log(`Starting "${workflowFn}" workflow with args: ${args}`);

try {
const run = await start(workflow as any, args as any);
console.log('Run', run.runId);
return res.status(200).json(run);
} catch (err) {
console.error(`Failed to start!!`, err);
throw err;
}
}

async function handleGet(req: NextApiRequest, res: NextApiResponse) {
const runId = req.query.runId as string | undefined;
if (!runId) {
return res.status(400).send('No runId provided');
}

const outputStreamParam = req.query['output-stream'] as string | undefined;
if (outputStreamParam) {
const namespace = outputStreamParam === '1' ? undefined : outputStreamParam;
const run = getRun(runId);
const stream = run.getReadable({
namespace,
});

res.setHeader('Content-Type', 'application/octet-stream');

const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const data =
value instanceof Uint8Array
? { data: Buffer.from(value).toString('base64') }
: value;
res.write(`${JSON.stringify(data)}\n`);
}
} finally {
reader.releaseLock();
}
return res.end();
}

try {
const run = getRun(runId);
const returnValue = await run.returnValue;
console.log('Return value:', returnValue);

// Include run metadata in headers
const [createdAt, startedAt, completedAt] = await Promise.all([
run.createdAt,
run.startedAt,
run.completedAt,
]);

res.setHeader('X-Workflow-Run-Created-At', createdAt?.toISOString() || '');
res.setHeader('X-Workflow-Run-Started-At', startedAt?.toISOString() || '');
res.setHeader(
'X-Workflow-Run-Completed-At',
completedAt?.toISOString() || ''
);

if (returnValue instanceof ReadableStream) {
res.setHeader('Content-Type', 'application/octet-stream');
const reader = returnValue.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
res.write(value);
}
} finally {
reader.releaseLock();
}
return res.end();
}

return res.status(200).json(returnValue);
} catch (error) {
if (error instanceof Error) {
if (WorkflowRunNotCompletedError.is(error)) {
return res.status(202).json({
...error,
name: error.name,
message: error.message,
});
}

if (WorkflowRunFailedError.is(error)) {
const cause = error.cause;
return res.status(400).json({
...error,
name: error.name,
message: error.message,
cause: {
message: cause.message,
stack: cause.stack,
code: cause.code,
},
});
}
}

console.error(
'Unexpected error while getting workflow return value:',
error
);
return res.status(500).json({
error: 'Internal server error',
});
}
}
Loading
Loading