Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deps/undici/src/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,9 @@ undici-fetch.js

# File generated by /test/request-timeout.js
test/request-timeout.10mb.bin

# Local agent configuration
CLAUDE.md
AGENTS.md
.pi/
.claude/
3 changes: 3 additions & 0 deletions deps/undici/src/docs/docs/api/Client.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ Returns: `Client`
* **keepAliveTimeoutThreshold** `number | null` (optional) - Default: `2e3` - A number of milliseconds subtracted from server *keep-alive* hints when overriding `keepAliveTimeout` to account for timing inaccuracies caused by e.g. transport latency. Defaults to 2 seconds.
* **maxHeaderSize** `number | null` (optional) - Default: `--max-http-header-size` or `16384` - The maximum length of request headers in bytes. Defaults to Node.js' --max-http-header-size or 16KiB.
* **maxResponseSize** `number | null` (optional) - Default: `-1` - The maximum length of response body in bytes. Set to `-1` to disable.
* **webSocket** `WebSocketOptions` (optional) - WebSocket-specific configuration options.
* **maxFragments** `number` (optional) - Default: `131072` - Maximum number of fragments in a message. Set to 0 to disable the limit.
* **maxPayloadSize** `number` (optional) - Default: `134217728` (128 MB) - Maximum allowed payload size in bytes for WebSocket messages. Applied to uncompressed messages, compressed frame payloads, and decompressed (permessage-deflate) messages. Set to 0 to disable the limit.
* **pipelining** `number | null` (optional) - Default: `1` - The amount of concurrent requests to be sent over the single TCP/TLS connection according to [RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). Carefully consider your workload and environment before enabling concurrent requests as pipelining may reduce performance if used incorrectly. Pipelining is sensitive to network stack settings as well as head of line blocking caused by e.g. long running requests. Set to `0` to disable keep-alive connections.
* **connect** `ConnectOptions | Function | null` (optional) - Default: `null`.
* **strictContentLength** `Boolean` (optional) - Default: `true` - Whether to treat request content length mismatches as errors. If true, an error is thrown when the request content-length header doesn't match the length of the request body.
Expand Down
4 changes: 2 additions & 2 deletions deps/undici/src/lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ function defaultFactory (origin, opts) {

class Agent extends DispatcherBase {
constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
super()

if (typeof factory !== 'function') {
throw new InvalidArgumentError('factory must be a function.')
}
Expand All @@ -38,6 +36,8 @@ class Agent extends DispatcherBase {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

super(options)

if (connect && typeof connect !== 'function') {
connect = { ...connect }
}
Expand Down
163 changes: 140 additions & 23 deletions deps/undici/src/lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const EMPTY_BUF = Buffer.alloc(0)
const FastBuffer = Buffer[Symbol.species]
const addListener = util.addListener
const removeAllListeners = util.removeAllListeners
const kIdleSocketValidation = Symbol('kIdleSocketValidation')
const kIdleSocketValidationTimeout = Symbol('kIdleSocketValidationTimeout')
const kSocketUsed = Symbol('kSocketUsed')

let extractBody

Expand Down Expand Up @@ -279,29 +282,71 @@ class Parser {

const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr

if (ret === constants.ERROR.PAUSED_UPGRADE) {
this.onUpgrade(data.slice(offset))
} else if (ret === constants.ERROR.PAUSED) {
this.paused = true
socket.unshift(data.slice(offset))
} else if (ret !== constants.ERROR.OK) {
const ptr = llhttp.llhttp_get_error_reason(this.ptr)
let message = ''
/* istanbul ignore else: difficult to make a test case for */
if (ptr) {
const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
message =
'Response does not match the HTTP/1.1 protocol (' +
Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
')'
if (ret !== constants.ERROR.OK) {
const body = data.subarray(offset)

if (ret === constants.ERROR.PAUSED_UPGRADE) {
this.onUpgrade(body)
} else if (ret === constants.ERROR.PAUSED) {
this.paused = true
socket.unshift(body)
} else {
throw this.createError(ret, body)
}
throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset))
}
} catch (err) {
util.destroy(socket, err)
}
}

finish () {
assert(currentParser === null)
assert(this.ptr != null)
assert(!this.paused)

const { llhttp } = this

let ret

try {
currentParser = this
ret = llhttp.llhttp_finish(this.ptr)
} finally {
currentParser = null
}

if (ret === constants.ERROR.OK) {
return null
}

if (ret === constants.ERROR.PAUSED || ret === constants.ERROR.PAUSED_UPGRADE) {
this.paused = true
return null
}

return this.createError(ret, EMPTY_BUF)
}

createError (ret, data) {
const { llhttp, contentLength, bytesRead } = this

if (contentLength && bytesRead !== parseInt(contentLength, 10)) {
return new ResponseContentLengthMismatchError()
}

const ptr = llhttp.llhttp_get_error_reason(this.ptr)
let message = ''
if (ptr) {
const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
message =
'Response does not match the HTTP/1.1 protocol (' +
Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
')'
}

return new HTTPParserError(message, constants.ERROR[ret], data)
}

destroy () {
assert(this.ptr != null)
assert(currentParser == null)
Expand Down Expand Up @@ -329,6 +374,11 @@ class Parser {
return -1
}

if (client[kRunning] === 0) {
util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
return -1
}

const request = client[kQueue][client[kRunningIdx]]
if (!request) {
return -1
Expand Down Expand Up @@ -432,6 +482,11 @@ class Parser {
return -1
}

if (client[kRunning] === 0) {
util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
return -1
}

const request = client[kQueue][client[kRunningIdx]]

/* istanbul ignore next: difficult to make a test case for */
Expand Down Expand Up @@ -605,6 +660,7 @@ class Parser {
request.onComplete(headers)

client[kQueue][client[kRunningIdx]++] = null
socket[kSocketUsed] = true

if (socket[kWriting]) {
assert(client[kRunning] === 0)
Expand Down Expand Up @@ -663,6 +719,9 @@ async function connectH1 (client, socket) {
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kIdleSocketValidation] = 0
socket[kIdleSocketValidationTimeout] = null
socket[kSocketUsed] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)

addListener(socket, 'error', function (err) {
Expand All @@ -673,8 +732,11 @@ async function connectH1 (client, socket) {
// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
const parserErr = parser.finish()
if (parserErr) {
this[kError] = parserErr
this[kClient][kOnError](parserErr)
}
return
}

Expand All @@ -693,8 +755,10 @@ async function connectH1 (client, socket) {
const parser = this[kParser]

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
const parserErr = parser.finish()
if (parserErr) {
util.destroy(this, parserErr)
}
return
}

Expand All @@ -704,10 +768,11 @@ async function connectH1 (client, socket) {
const client = this[kClient]
const parser = this[kParser]

clearIdleSocketValidation(this)

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
this[kError] = parser.finish() || this[kError]
}

this[kParser].destroy()
Expand Down Expand Up @@ -770,7 +835,7 @@ async function connectH1 (client, socket) {
return socket.destroyed
},
busy (request) {
if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
if (socket[kWriting] || socket[kReset] || socket[kBlocking] || socket[kIdleSocketValidation] === 1) {
return true
}

Expand Down Expand Up @@ -808,6 +873,31 @@ async function connectH1 (client, socket) {
}
}

function clearIdleSocketValidation (socket) {
if (socket[kIdleSocketValidationTimeout]) {
clearTimeout(socket[kIdleSocketValidationTimeout])
socket[kIdleSocketValidationTimeout] = null
}

socket[kIdleSocketValidation] = 0
}

function scheduleIdleSocketValidation (client, socket) {
socket[kIdleSocketValidation] = 1
socket[kIdleSocketValidationTimeout] = setTimeout(() => {
socket[kIdleSocketValidationTimeout] = null
socket[kIdleSocketValidation] = 2

if (client[kSocket] === socket && !socket.destroyed) {
client[kResume]()
}
}, 0)
socket[kIdleSocketValidationTimeout].unref?.()
}

/**
* @param {import('./client.js')} client
*/
function resumeH1 (client) {
const socket = client[kSocket]

Expand All @@ -822,6 +912,32 @@ function resumeH1 (client) {
socket[kNoRef] = false
}

if (client[kRunning] === 0 && client[kPending] > 0 && socket[kSocketUsed]) {
if (socket[kIdleSocketValidation] === 0) {
scheduleIdleSocketValidation(client, socket)
socket[kParser].readMore()
if (socket.destroyed) {
return
}
return
}

if (socket[kIdleSocketValidation] === 1) {
socket[kParser].readMore()
if (socket.destroyed) {
return
}
return
}
}

if (client[kRunning] === 0) {
socket[kParser].readMore()
if (socket.destroyed) {
return
}
}

if (client[kSize] === 0) {
if (socket[kParser].timeoutType !== TIMEOUT_KEEP_ALIVE) {
socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_KEEP_ALIVE)
Expand Down Expand Up @@ -915,6 +1031,7 @@ function writeH1 (client, request) {
}

const socket = client[kSocket]
clearIdleSocketValidation(socket)

const abort = (err) => {
if (request.aborted || request.completed) {
Expand Down
5 changes: 3 additions & 2 deletions deps/undici/src/lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ class Client extends DispatcherBase {
autoSelectFamilyAttemptTimeout,
// h2
maxConcurrentStreams,
allowH2
allowH2,
webSocket
} = {}) {
super()
super({ webSocket })

if (keepAlive !== undefined) {
throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
Expand Down
11 changes: 10 additions & 1 deletion deps/undici/src/lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,24 @@ const { kDestroy, kClose, kClosed, kDestroyed, kDispatch, kInterceptors } = requ
const kOnDestroyed = Symbol('onDestroyed')
const kOnClosed = Symbol('onClosed')
const kInterceptedDispatch = Symbol('Intercepted Dispatch')
const kWebSocketOptions = Symbol('webSocketOptions')

class DispatcherBase extends Dispatcher {
constructor () {
constructor (opts) {
super()

this[kDestroyed] = false
this[kOnDestroyed] = null
this[kClosed] = false
this[kOnClosed] = []
this[kWebSocketOptions] = opts?.webSocket ?? {}
}

get webSocketOptions () {
return {
maxFragments: this[kWebSocketOptions].maxFragments ?? 131072,
maxPayloadSize: this[kWebSocketOptions].maxPayloadSize ?? 128 * 1024 * 1024
}
}

get destroyed () {
Expand Down
4 changes: 2 additions & 2 deletions deps/undici/src/lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ const kRemoveClient = Symbol('remove client')
const kStats = Symbol('stats')

class PoolBase extends DispatcherBase {
constructor () {
super()
constructor (opts) {
super(opts)

this[kQueue] = new FixedQueue()
this[kClients] = []
Expand Down
4 changes: 2 additions & 2 deletions deps/undici/src/lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class Pool extends PoolBase {
allowH2,
...options
} = {}) {
super()

if (connections != null && (!Number.isFinite(connections) || connections < 0)) {
throw new InvalidArgumentError('invalid connections')
}
Expand All @@ -63,6 +61,8 @@ class Pool extends PoolBase {
})
}

super(options)

this[kInterceptors] = options.interceptors?.Pool && Array.isArray(options.interceptors.Pool)
? options.interceptors.Pool
: []
Expand Down
Loading
Loading