Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-stream-get-runid.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-vercel': patch
---

Fix `streams.get()` to include `runId` in the request URL instead of always omitting it.
41 changes: 41 additions & 0 deletions packages/world-vercel/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,47 @@ vi.mock('./utils.js', () => ({
}),
}));

describe('streams.get', () => {
async function getStreamer() {
const { createStreamer } = await import('./streamer.js');
return createStreamer();
}

afterEach(() => {
vi.restoreAllMocks();
});

it('includes runId in the fetch URL', async () => {
const fetchSpy = vi
.spyOn(globalThis, 'fetch')
.mockImplementation(
async () => new Response(new ReadableStream(), { status: 200 })
);

const streamer = await getStreamer();
await streamer.streams.get('run-123', 'my-stream');

expect(fetchSpy).toHaveBeenCalledTimes(1);
const url = new URL(fetchSpy.mock.calls[0][0] as string);
expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream');
});

it('passes startIndex as a query parameter', async () => {
const fetchSpy = vi
.spyOn(globalThis, 'fetch')
.mockImplementation(
async () => new Response(new ReadableStream(), { status: 200 })
);

const streamer = await getStreamer();
await streamer.streams.get('run-123', 'my-stream', 5);

const url = new URL(fetchSpy.mock.calls[0][0] as string);
expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream');
expect(url.searchParams.get('startIndex')).toBe('5');
});
});

describe('writeMulti pagination', () => {
/**
* Decode length-prefixed multi-chunk body to count chunks per request.
Expand Down
19 changes: 6 additions & 13 deletions packages/world-vercel/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,10 @@ export const MAX_CHUNKS_PER_REQUEST = 1000;
// (partial writes, long-lived reads), and duplex streams are incompatible
// with undici's experimental H2 support.

function getStreamUrl(
name: string,
runId: string | undefined,
httpConfig: HttpConfig
) {
if (runId) {
return new URL(
`${httpConfig.baseUrl}/v2/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}`
);
}
return new URL(`${httpConfig.baseUrl}/v2/stream/${encodeURIComponent(name)}`);
function getStreamUrl(name: string, runId: string, httpConfig: HttpConfig) {
return new URL(
`${httpConfig.baseUrl}/v2/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}`
);
}

/**
Expand Down Expand Up @@ -188,9 +181,9 @@ export function createStreamer(config?: APIConfig): Streamer {
}
},

async get(_runId: string, name: string, startIndex?: number) {
async get(runId: string, name: string, startIndex?: number) {
const httpConfig = await getHttpConfig(config);
const url = getStreamUrl(name, undefined, httpConfig);
const url = getStreamUrl(name, runId, httpConfig);
Comment thread
TooTallNate marked this conversation as resolved.
if (typeof startIndex === 'number') {
url.searchParams.set('startIndex', String(startIndex));
}
Expand Down
Loading