Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/backport-streaming-polling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-local": patch
---

Add filesystem polling for cross-process stream reads
212 changes: 99 additions & 113 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,43 +272,36 @@ describe('e2e', () => {
}
);

// ReadableStream return values use the world's streaming infrastructure which
// requires in-process access. The local world's streamer uses an in-process EventEmitter
// that doesn't work cross-process (test runner ↔ workbench app).
test.skipIf(isLocalDeployment())(
'readableStreamWorkflow',
{ timeout: 120_000 },
async () => {
const run = await start(await e2e('readableStreamWorkflow'), []);
const returnValue = await run.returnValue;
expect(returnValue).toBeInstanceOf(ReadableStream);

const expected = '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n';
const decoder = new TextDecoder();
let contents = '';
// Read chunks until we have all expected content or hit a timeout.
// On Vercel, the stream close event can be delayed even after all
// chunks are delivered, so we stop once we have the expected data
// rather than waiting for the stream to end.
const reader = returnValue.getReader();
const readDeadline = Date.now() + 60_000;
try {
while (Date.now() < readDeadline) {
const { done, value } = await Promise.race([
reader.read(),
sleep(30_000).then(() => ({ done: true, value: undefined })),
]);
if (value) {
contents += decoder.decode(value, { stream: true });
}
if (done || contents.length >= expected.length) break;
test('readableStreamWorkflow', { timeout: 120_000 }, async () => {
const run = await start(await e2e('readableStreamWorkflow'), []);
const returnValue = await run.returnValue;
expect(returnValue).toBeInstanceOf(ReadableStream);

const expected = '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n';
const decoder = new TextDecoder();
let contents = '';
// Read chunks until we have all expected content or hit a timeout.
// On Vercel, the stream close event can be delayed even after all
// chunks are delivered, so we stop once we have the expected data
// rather than waiting for the stream to end.
const reader = returnValue.getReader();
const readDeadline = Date.now() + 60_000;
try {
while (Date.now() < readDeadline) {
const { done, value } = await Promise.race([
reader.read(),
sleep(30_000).then(() => ({ done: true, value: undefined })),
]);
if (value) {
contents += decoder.decode(value, { stream: true });
}
} finally {
reader.releaseLock();
if (done || contents.length >= expected.length) break;
}
expect(contents).toBe(expected);
} finally {
reader.releaseLock();
}
);
expect(contents).toBe(expected);
});

test('hookWorkflow', { timeout: 60_000 }, async () => {
const token = Math.random().toString(36).slice(2);
Expand Down Expand Up @@ -623,7 +616,7 @@ describe('e2e', () => {
// and 2 chunks to the "test" named stream:
// chunk 0: binary "Hello, named stream!"
// chunk 1: object { foo: 'bar' }
describe.skipIf(isLocalDeployment())('outputStreamWorkflow', () => {
describe('outputStreamWorkflow', () => {
const startIndexCases = [
{
name: 'no startIndex (reads all chunks)',
Expand Down Expand Up @@ -707,88 +700,85 @@ describe('e2e', () => {
}
});

describe.skipIf(isLocalDeployment())(
'outputStreamWorkflow - getTailIndex and getStreamChunks',
() => {
test(
'getTailIndex returns correct index after stream completes',
{
timeout: 60_000,
},
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);
await run.returnValue;
describe('outputStreamWorkflow - getTailIndex and getStreamChunks', () => {
test(
'getTailIndex returns correct index after stream completes',
{
timeout: 60_000,
},
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);
await run.returnValue;

const readable = run.getReadable();
const tailIndex = await readable.getTailIndex();
const readable = run.getReadable();
const tailIndex = await readable.getTailIndex();

// outputStreamWorkflow writes 2 chunks to the default stream
expect(tailIndex).toBe(1);
}
);
// outputStreamWorkflow writes 2 chunks to the default stream
expect(tailIndex).toBe(1);
}
);

test(
'getTailIndex returns -1 before any chunks are written',
{
timeout: 60_000,
},
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);

// Don't await returnValue — check immediately while stream is
// still being written (or hasn't started yet). The world should
// report tailIndex = -1 for streams with no data.
const readable = run.getReadable({ namespace: 'nonexistent' });
const tailIndex = await readable.getTailIndex();
expect(tailIndex).toBe(-1);
}
);
test(
'getTailIndex returns -1 before any chunks are written',
{
timeout: 60_000,
},
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);

test(
'getStreamChunks returns same content as reading the stream',
{
timeout: 60_000,
},
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);
await run.returnValue;
// Don't await returnValue — check immediately while the stream is
// still being written (or hasn't started yet). The 'nonexistent'
// namespace is presumably never written to by the workflow, so the
// world should report tailIndex = -1 for a stream with no data.
const readable = run.getReadable({ namespace: 'nonexistent' });
const tailIndex = await readable.getTailIndex();
expect(tailIndex).toBe(-1);
}
);

// Read all chunks via the stream
const reader = run.getReadable().getReader();
const streamChunks: unknown[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
streamChunks.push(value);
}
test(
'getStreamChunks returns same content as reading the stream',
{
timeout: 60_000,
},
async () => {
const run = await start(await e2e('outputStreamWorkflow'), []);
await run.returnValue;

// Read all chunks via the stream
const reader = run.getReadable().getReader();
const streamChunks: unknown[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
streamChunks.push(value);
}

// Read all chunks via getStreamChunks pagination
const world = getWorld();
const streamName = `${run.runId.replace('wrun_', 'strm_')}_user`;
const paginatedChunks: Uint8Array[] = [];
let cursor: string | null = null;
do {
const page = await world.getStreamChunks(streamName, run.runId, {
limit: 1, // small page size to exercise pagination
...(cursor ? { cursor } : {}),
});
for (const chunk of page.data) {
paginatedChunks.push(chunk.data);
}
cursor = page.cursor;
if (!page.hasMore) {
expect(page.done).toBe(true);
}
} while (cursor);
// Read all chunks via getStreamChunks pagination
const world = getWorld();
const streamName = `${run.runId.replace('wrun_', 'strm_')}_user`;
const paginatedChunks: Uint8Array[] = [];
let cursor: string | null = null;
do {
const page = await world.getStreamChunks(streamName, run.runId, {
limit: 1, // small page size to exercise pagination
...(cursor ? { cursor } : {}),
});
for (const chunk of page.data) {
paginatedChunks.push(chunk.data);
}
cursor = page.cursor;
if (!page.hasMore) {
expect(page.done).toBe(true);
}
} while (cursor);

// Both methods should return the same number of chunks
expect(paginatedChunks).toHaveLength(streamChunks.length);
}
);
}
);
// Both methods should return the same number of chunks
expect(paginatedChunks).toHaveLength(streamChunks.length);
}
);
});

test.skipIf(isLocalDeployment())(
test(
'outputStreamInsideStepWorkflow - getWritable() called inside step functions',
{ timeout: 60_000 },
async () => {
Expand Down Expand Up @@ -2184,10 +2174,6 @@ describe('e2e', () => {
// ============================================================
// Resilient start: run completes even when run_created fails
// ============================================================
// TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow)
// to also verify that serialization, flushing, and binary data work correctly
// over the queue boundary. Currently using addTenWorkflow to avoid the
// skipIf(isLocalDeployment()) barrier that stream tests require.
test(
'resilient start: addTenWorkflow completes when run_created returns 500',
{ timeout: 60_000 },
Expand Down
86 changes: 86 additions & 0 deletions packages/world-local/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,92 @@ describe('streamer', () => {
});
});

describe('cross-process polling', () => {
  /** Resolves after roughly `ms` milliseconds. */
  const pause = (ms: number) =>
    new Promise((resolve) => setTimeout(resolve, ms));

  it('should deliver chunks via filesystem polling when EventEmitter is bypassed', async () => {
    // Cross-process writers are simulated by dropping serialized chunk
    // files straight into the chunks directory. streamer.writeToStream is
    // never called, so (per the scenario this test describes) no
    // EventEmitter notification is involved and the reader can only learn
    // about the chunks by looking at the filesystem.
    const tmpRoot = await fs.mkdtemp(
      path.join(os.tmpdir(), 'streamer-poll-test-')
    );
    onTestFinished(async ({ task }) => {
      // The temp directory is removed only when the test recorded no
      // errors — presumably so a failing run leaves its files behind for
      // inspection.
      const hadErrors = Boolean(task.result?.errors?.length);
      if (hadErrors) return;
      await fs.rm(tmpRoot, { recursive: true, force: true });
    });

    const streamer = createStreamer(tmpRoot);
    const streamName = 'poll-test';
    const chunksDir = path.join(tmpRoot, 'streams', 'chunks');
    await fs.mkdir(chunksDir, { recursive: true });

    /**
     * Serializes `payload` and writes it directly into the chunks
     * directory under `fileName`, without going through the streamer.
     */
    const writeRawChunk = async (
      fileName: string,
      payload: Parameters<typeof serializeChunk>[0]
    ) => {
      const bytes = serializeChunk(payload);
      await fs.writeFile(path.join(chunksDir, fileName), bytes);
    };

    // Open the reader before any chunk file exists.
    const readable = await streamer.readFromStream(streamName);
    const reader = readable.getReader();
    const received: string[] = [];

    // Drain the stream in the background until it reports completion,
    // decoding every delivered chunk to a string.
    const drained = (async () => {
      for (;;) {
        const { done, value } = await reader.read();
        if (value) {
          received.push(Buffer.from(value).toString());
        }
        if (done) return;
      }
    })();

    // Give the reader a moment to get going before the first write.
    await pause(50);

    // The three file names end in lexicographically increasing ids
    // (…FAV, …FAW, …FAX); the writes are spaced out in time.
    await writeRawChunk(`${streamName}-chnk_01ARZ3NDEKTSV4RRFFQ69G5FAV.bin`, {
      eof: false,
      chunk: Buffer.from('hello'),
    });

    await pause(200);

    await writeRawChunk(`${streamName}-chnk_01ARZ3NDEKTSV4RRFFQ69G5FAW.bin`, {
      eof: false,
      chunk: Buffer.from(' world'),
    });

    await pause(200);

    // An empty chunk flagged eof closes the stream, ending the drain loop.
    await writeRawChunk(`${streamName}-chnk_01ARZ3NDEKTSV4RRFFQ69G5FAX.bin`, {
      eof: true,
      chunk: Buffer.from([]),
    });

    await drained;

    expect(received.join('')).toBe('hello world');
  }, 10000);
});

describe('integration scenarios', () => {
it('should handle complete write-close-read cycle', async () => {
const { streamer } = await setupStreamer();
Expand Down
Loading
Loading