Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions loadtest/sse-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import http2 from 'node:http2';
import { createH2Fetch } from '../src/lib/h2-transport/index.ts';
import { execSync } from 'node:child_process';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';

// Generate self-signed certs for the test server
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'sse-test-'));
const keyPath = path.join(tmpDir, 'key.pem');
const certPath = path.join(tmpDir, 'cert.pem');

execSync(
`openssl req -x509 -newkey rsa:2048 -keyout ${keyPath} -out ${certPath} ` +
`-days 1 -nodes -subj "/CN=localhost" 2>/dev/null`,
);

process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';

async function startSSEServer(): Promise<{ port: number; close: () => void }> {
return new Promise((resolve) => {
const server = http2.createSecureServer({
key: fs.readFileSync(keyPath),
cert: fs.readFileSync(certPath),
});

server.on('stream', (stream: http2.ServerHttp2Stream, headers) => {
if (headers[':path'] === '/v1/test/sse') {
stream.respond({
':status': 200,
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
});

const events = [
'data: {"id":1,"message":"hello"}\n\n',
'data: {"id":2,"message":"world"}\n\n',
'data: {"id":3,"message":"streaming done"}\n\n',
];

let i = 0;
const interval = setInterval(() => {
if (i < events.length) {
stream.write(events[i]);
i++;
} else {
clearInterval(interval);
stream.end();
}
}, 50);
} else {
stream.respond({ ':status': 200, 'content-type': 'application/json' });
stream.end(JSON.stringify({ ok: true }));
}
});

server.listen(0, () => {
const port = (server.address() as any).port;
resolve({ port, close: () => server.close() });
});
});
}

async function main() {
const { port, close } = await startSSEServer();
console.log('SSE test server on port', port);

const h2Fetch = createH2Fetch();

// Test 1: Normal JSON request
console.log('\n--- Test 1: Normal JSON request ---');
const jsonResp = (await h2Fetch(`https://localhost:${port}/v1/test/json`, {
method: 'GET',
headers: {},
})) as any;
console.log('Status:', jsonResp.status);
const jsonBody = await jsonResp.json();
console.log('Body:', jsonBody);
console.log('PASS');

// Test 2: SSE streaming via getReader
console.log('\n--- Test 2: SSE streaming (getReader) ---');
const sseResp = (await h2Fetch(`https://localhost:${port}/v1/test/sse`, {
method: 'GET',
headers: { accept: 'text/event-stream' },
})) as any;
console.log('Status:', sseResp.status);
console.log('Content-Type:', sseResp.headers.get('content-type'));

const reader = sseResp.body.getReader();
const decoder = new TextDecoder();
const events: string[] = [];

while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
events.push(text);
console.log(' Chunk:', JSON.stringify(text));
}
reader.releaseLock();
console.log('Chunks received:', events.length);
console.log('PASS');

// Test 3: SSE via async iteration (Node 18+ fast path)
console.log('\n--- Test 3: SSE streaming (async iteration) ---');
const sseResp2 = (await h2Fetch(`https://localhost:${port}/v1/test/sse`, {
method: 'GET',
headers: { accept: 'text/event-stream' },
})) as any;

const iterEvents: string[] = [];
for await (const chunk of sseResp2.body) {
const text = decoder.decode(chunk, { stream: true });
iterEvents.push(text);
console.log(' Chunk:', JSON.stringify(text));
}
console.log('Chunks received:', iterEvents.length);
console.log('PASS');

console.log('\n=== All SSE tests PASSED ===');
await h2Fetch.close();
close();

// Cleanup
fs.rmSync(tmpDir, { recursive: true });
process.exit(0);
}

main().catch((err) => {
console.error('FAIL:', err);
process.exit(1);
});
29 changes: 29 additions & 0 deletions src/lib/h2-transport/headers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type http2 from 'node:http2';

/**
* Minimal Headers implementation satisfying the SDK's usage:
* response.headers.get('content-type')
* response.headers.entries()
*/
export class H2Headers {
private readonly map: Map<string, string>;

constructor(raw: http2.IncomingHttpHeaders) {
this.map = new Map();
for (const [key, value] of Object.entries(raw)) {
if (key.startsWith(':')) continue;
const normalized = Array.isArray(value) ? value.join(', ') : value;
if (normalized !== undefined) {
this.map.set(key.toLowerCase(), normalized);
}
}
}

get(name: string): string | null {
return this.map.get(name.toLowerCase()) ?? null;
}

entries(): IterableIterator<[string, string]> {
return this.map.entries();
}
}
139 changes: 139 additions & 0 deletions src/lib/h2-transport/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* A high-performance HTTP/2 fetch adapter built on Node's native `node:http2`.
*
* Replaces undici for HTTP/2 transport. Manages a pool of persistent H2 sessions
* per origin, multiplexing requests as concurrent streams. Each session handles up
* to the server-advertised MAX_CONCURRENT_STREAMS; the pool auto-scales between
* minConnections and maxConnections as load requires.
*
* Usage:
* const fetch = createH2Fetch({ maxConnections: 20 });
* const response = await fetch('https://api.example.com/v1/items', {
* method: 'POST',
* headers: { 'content-type': 'application/json' },
* body: JSON.stringify({ name: 'test' }),
* });
*/

import { Readable } from 'node:stream';
import { H2Pool, type H2PoolOptions } from './pool';
import { MultipartBody } from '../../_shims/MultipartBody';
import { type Fetch } from '../../core';

const MIN_NODE_MAJOR = 18;

function checkNodeVersion(): void {
const match = process.versions?.node?.match(/^(\d+)/);
if (!match?.[1]) return; // non-Node runtime, let it fail naturally
const major = parseInt(match[1], 10);
if (major < MIN_NODE_MAJOR) {
throw new Error(
`h2-transport requires Node.js ${MIN_NODE_MAJOR} or later (found ${process.versions.node}). ` +
`The HTTP/2 transport depends on node:stream/web which is not available in older versions.`,
);
}
}

export type { H2PoolOptions as H2FetchOptions };

export type H2Fetch = Fetch & {
close: () => Promise<void>;
/**
* Pre-establish the minimum HTTP/2 sessions for the URL's origin.
* Path and query components are ignored.
*/
warmUp: (originOrUrl: string | URL) => Promise<void>;
};

function normalizeBody(body: unknown): string | Buffer | null {
if (body == null) return null;
if (typeof body === 'string') return body;
if (Buffer.isBuffer(body)) return body;
if (body instanceof MultipartBody) return normalizeBody((body as MultipartBody).body);
if (body instanceof Readable) {
// Multipart bodies arrive as Node Readable. For H2, we need to buffer them
// since session.request() takes string | Buffer. The Readable is a
// FormDataEncoder stream with a known content-length, so buffering is safe.
// Streaming uploads could be added later if needed.
throw new Error(
'h2-transport: streaming request bodies (Readable) are not yet supported. ' +
'Use a string or Buffer body.',
);
}
if (ArrayBuffer.isView(body)) return Buffer.from(body.buffer, body.byteOffset, body.byteLength);
if (body instanceof ArrayBuffer) return Buffer.from(body);
return String(body);
}

function parseURL(url: unknown): URL {
if (typeof url === 'string') return new URL(url);
if (url instanceof URL) return url;

const requestUrl = (url as { url?: unknown })?.url;
if (typeof requestUrl === 'string') return new URL(requestUrl);

return new URL(String(url));
}

/**
* Build a fetch adapter backed by native HTTP/2 connection pools.
*
* Compatible with the SDK's `makeHttp2Fetch` interface: called with no arguments
* for `http2: true`, or with options for `http2: <H2FetchOptions>`.
*/
export function createH2Fetch(options?: H2PoolOptions): H2Fetch {
checkNodeVersion();
const pools = new Map<string, H2Pool>();

function getPool(origin: string): H2Pool {
let pool = pools.get(origin);
if (!pool) {
pool = new H2Pool(origin, options);
pools.set(origin, pool);
}
return pool;
}

const h2Fetch = (async (url, init) => {
const {
agent: _ignored, // node-fetch artifact injected by core.ts
body: rawBody,
signal,
method = 'GET',
headers: rawHeaders,
} = (init ?? {}) as any;

const parsed = parseURL(url);
const path = parsed.pathname + parsed.search;
const body = normalizeBody(rawBody);

const reqHeaders: Record<string, string> = {};
if (rawHeaders) {
if (typeof rawHeaders.entries === 'function') {
for (const [k, v] of rawHeaders.entries()) {
reqHeaders[k.toLowerCase()] = v;
}
} else {
for (const [k, v] of Object.entries(rawHeaders as Record<string, string>)) {
reqHeaders[k.toLowerCase()] = v;
}
}
}

const pool = getPool(parsed.origin);
return pool.request(path, method.toUpperCase(), reqHeaders, body, signal) as any;
}) as H2Fetch;

h2Fetch.close = async () => {
const closeTasks = [...pools.values()].map((pool) => pool.close());
pools.clear();
await Promise.all(closeTasks);
};

h2Fetch.warmUp = async (originOrUrl) => {
const parsed = parseURL(originOrUrl);
await getPool(parsed.origin).warmUp();
};

return h2Fetch;
}
Loading