Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
quic: convert openStream to Promise
Although most of the time openStream will be able to create the stream
immediately, when a stream is opened before the handshake is complete
we have to wait for the handshake to be complete before continuing.
  • Loading branch information
jasnell committed Jul 23, 2020
commit 3aefa5e9791062ff794c05fe10a9a6c335c65f0d
94 changes: 62 additions & 32 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const {
Promise,
PromiseAll,
PromiseReject,
PromiseResolve,
RegExp,
Set,
Symbol,
Expand Down Expand Up @@ -213,13 +214,13 @@ const kDestroy = Symbol('kDestroy');
const kEndpointBound = Symbol('kEndpointBound');
const kEndpointClose = Symbol('kEndpointClose');
const kHandshake = Symbol('kHandshake');
const kHandshakeComplete = Symbol('kHandshakeComplete');
const kHandshakePost = Symbol('kHandshakePost');
const kHeaders = Symbol('kHeaders');
const kInternalState = Symbol('kInternalState');
const kInternalClientState = Symbol('kInternalClientState');
const kInternalServerState = Symbol('kInternalServerState');
const kListen = Symbol('kListen');
const kMakeStream = Symbol('kMakeStream');
const kMaybeBind = Symbol('kMaybeBind');
const kOnFileOpened = Symbol('kOnFileOpened');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
Expand Down Expand Up @@ -1651,6 +1652,9 @@ class QuicSession extends EventEmitter {
destroyed: false,
earlyData: false,
handshakeComplete: false,
handshakeCompletePromise: undefined,
handshakeCompletePromiseResolve: undefined,
handshakeCompletePromiseReject: undefined,
idleTimeout: false,
maxPacketLength: NGTCP2_DEFAULT_MAX_PKTLEN,
servername: undefined,
Expand Down Expand Up @@ -1715,6 +1719,26 @@ class QuicSession extends EventEmitter {
});
}

[kHandshakeComplete]() {
const state = this[kInternalState];
if (state.handshakeComplete)
return PromiseResolve();

if (state.handshakeCompletePromise !== undefined)
return state.handshakeCompletePromise;

state.handshakeCompletePromise = new Promise((resolve, reject) => {
state.handshakeCompletePromiseResolve = resolve;
state.handshakeCompletePromiseReject = reject;
}).finally(() => {
state.handshakeCompletePromise = undefined;
state.handshakeCompletePromiseReject = undefined;
state.handshakeCompletePromiseResolve = undefined;
});

return state.handshakeCompletePromise;
}

// Sets the internal handle for the QuicSession instance. For
// server QuicSessions, this is called immediately as the
// handle is created before the QuicServerSession JS object.
Expand Down Expand Up @@ -1827,8 +1851,18 @@ class QuicSession extends EventEmitter {
state.verifyErrorReason = verifyErrorReason;
state.verifyErrorCode = verifyErrorCode;
state.earlyData = earlyData;
if (!this[kHandshakePost]())

if (!this[kHandshakePost]()) {
if (typeof state.handshakeCompletePromiseReject === 'function') {
// TODO(@jasnell): Proper error
state.handshakeCompletePromiseReject(
new ERR_OPERATION_FAILED('Handshake failed'));
}
return;
}

if (typeof state.handshakeCompletePromiseResolve === 'function')
state.handshakeCompletePromiseResolve();

process.nextTick(() => {
try {
Expand Down Expand Up @@ -1971,6 +2005,12 @@ class QuicSession extends EventEmitter {
} else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

if (typeof state.handshakeCompletePromiseReject === 'function') {
// TODO(@jasnell): Proper error
state.handshakeCompletePromiseReject(
new ERR_OPERATION_FAILED('Handshake failed'));
}

process.nextTick(emit.bind(this, 'close'));
}

Expand Down Expand Up @@ -2113,8 +2153,7 @@ class QuicSession extends EventEmitter {
return this[kInternalState].statelessReset;
}

openStream(options) {
const state = this[kInternalState];
async openStream(options) {
if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
Expand All @@ -2123,51 +2162,42 @@ class QuicSession extends EventEmitter {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}

const {
halfOpen, // Unidirectional or Bidirectional
highWaterMark,
defaultEncoding,
} = validateQuicStreamOptions(options);

const stream = new QuicStream({
highWaterMark,
defaultEncoding,
readable: !halfOpen
}, this);
await this[kHandshakeComplete]();

state.pendingStreams.add(stream);

// If early data is being used, we can create the internal QuicStream on the
// ready event, that is immediately after the internal QuicSession handle
// has been created. Otherwise, we have to wait until the secure event
// signaling the completion of the TLS handshake.
const makeStream = QuicSession[kMakeStream].bind(this, stream, halfOpen);
let deferred = false;
if (!this.handshakeComplete) {
deferred = true;
this.once('secure', makeStream);
if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}
if (this.closing) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}

if (!deferred)
makeStream(stream, halfOpen);

return stream;
}

static [kMakeStream](stream, halfOpen) {
this[kInternalState].pendingStreams.delete(stream);
const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);

if (handle === undefined) {
stream.destroy(new ERR_OPERATION_FAILED('Unable to create QuicStream'));
return;
}
if (handle === undefined)
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');

const stream = new QuicStream({
highWaterMark,
defaultEncoding,
readable: !halfOpen
}, this);

stream[kSetHandle](handle);
this[kAddStream](stream.id, stream);

return stream;
}

get duration() {
Expand Down
8 changes: 3 additions & 5 deletions test/parallel/test-quic-client-connect-multiple-parallel.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ async function connect(server, client) {
for (let i = 0; i < kCount; i++) {
const server = createQuicSocket({ server: options });

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall(() => {
const stream = session.openStream({ halfOpen: true });
stream.end('Hi!');
}));
server.on('session', common.mustCall(async (session) => {
const stream = await session.openStream({ halfOpen: true });
stream.end('Hi!');
}));

server.on('close', common.mustCall());
Expand Down
8 changes: 3 additions & 5 deletions test/parallel/test-quic-client-connect-multiple-sequential.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@ async function connect(server, client) {
for (let i = 0; i < kCount; i++) {
const server = createQuicSocket({ server: options });

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall(() => {
const stream = session.openStream({ halfOpen: true });
stream.end('Hi!');
}));
server.on('session', common.mustCall(async (session) => {
const stream = await session.openStream({ halfOpen: true });
stream.end('Hi!');
}));

server.on('close', common.mustCall());
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-quic-client-empty-preferred-address.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const options = { key, cert, ca, alpn: 'zzz' };
preferredAddressPolicy: 'accept',
});

const stream = clientSession.openStream();
const stream = await clientSession.openStream();
stream.end('hello');

await Promise.all([
Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-quic-client-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
});
}));

session.on('secure', common.mustCall((servername, alpn, cipher) => {
session.on('secure', common.mustCall(async (servername, alpn, cipher) => {
debug('QuicServerSession TLS Handshake Complete');
debug(' Server name: %s', servername);
debug(' ALPN: %s', alpn);
Expand All @@ -143,7 +143,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
assert(session.authenticated);
assert.strictEqual(session.authenticationError, undefined);

const uni = session.openStream({ halfOpen: true });
const uni = await session.openStream({ halfOpen: true });
assert(uni.unidirectional);
assert(!uni.bidirectional);
assert(uni.serverInitiated);
Expand Down Expand Up @@ -221,8 +221,8 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
name: 'Error'
};
assert.throws(() => session.ping(), err);
assert.throws(() => session.openStream(), err);
assert.throws(() => session.updateKey(), err);
assert.rejects(() => session.openStream(), err);
}));
}));

Expand Down Expand Up @@ -264,7 +264,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
debug(' Params: %s', params.toString('hex'));
}, 2));

req.on('secure', common.mustCall((servername, alpn, cipher) => {
req.on('secure', common.mustCall(async (servername, alpn, cipher) => {
debug('QuicClientSession TLS Handshake Complete');
debug(' Server name: %s', servername);
debug(' ALPN: %s', alpn);
Expand Down Expand Up @@ -308,7 +308,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
}

const file = fs.createReadStream(__filename);
const stream = req.openStream();
const stream = await req.openStream();
file.pipe(stream);
let data = '';
stream.resume();
Expand Down
59 changes: 31 additions & 28 deletions test/parallel/test-quic-errors-quicsession-openstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ if (!common.hasQuic)
// Test errors thrown when openStream is called incorrectly
// or is not permitted

const { once } = require('events');
const { createHook } = require('async_hooks');
const assert = require('assert');
const { createQuicSocket } = require('net');
Expand All @@ -18,18 +19,12 @@ createHook({
}
}).enable();

const Countdown = require('../common/countdown');
const { key, cert, ca } = require('../common/quic');

const options = { key, cert, ca, alpn: 'zzz', maxStreamsUni: 0 };
const server = createQuicSocket({ server: options });
const client = createQuicSocket({ client: options });

const countdown = new Countdown(1, () => {
server.close();
client.close();
});

server.on('close', common.mustCall());
client.on('close', common.mustCall());

Expand All @@ -44,40 +39,48 @@ client.on('close', common.mustCall());
port: server.endpoints[0].address.port
});

['z', 1, {}, [], null, Infinity, 1n].forEach((i) => {
assert.throws(
() => req.openStream({ halfOpen: i }),
{ code: 'ERR_INVALID_ARG_TYPE' }
);
});
for (const halfOpen of ['z', 1, {}, [], null, Infinity, 1n]) {
await assert.rejects(req.openStream({ halfOpen }), {
code: 'ERR_INVALID_ARG_TYPE'
});
}

['', 1n, {}, [], false, 'zebra'].forEach((defaultEncoding) => {
assert.throws(() => req.openStream({ defaultEncoding }), {
for (const defaultEncoding of ['', 1n, {}, [], false, 'zebra']) {
await assert.rejects(req.openStream({ defaultEncoding }), {
code: 'ERR_INVALID_ARG_VALUE'
});
});
}

[-1, Number.MAX_SAFE_INTEGER + 1].forEach((highWaterMark) => {
assert.throws(() => req.openStream({ highWaterMark }), {
for (const highWaterMark of [-1, Number.MAX_SAFE_INTEGER + 1]) {
await assert.rejects(req.openStream({ highWaterMark }), {
code: 'ERR_OUT_OF_RANGE'
});
});
}

['a', 1n, [], {}, false].forEach((highWaterMark) => {
assert.throws(() => req.openStream({ highWaterMark }), {
for (const highWaterMark of ['a', 1n, [], {}, false]) {
await assert.rejects(req.openStream({ highWaterMark }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
}

// Unidirectional streams are not allowed. openStream will succeeed
// but the stream will be destroyed immediately. The underlying
// QuicStream C++ handle will not be created.
req.openStream({
halfOpen: true,
highWaterMark: 10,
defaultEncoding: 'utf16le'
}).on('error', common.expectsError({
code: 'ERR_OPERATION_FAILED'
})).on('error', common.mustCall(() => countdown.dec()));
await assert.rejects(
req.openStream({
halfOpen: true,
highWaterMark: 10,
defaultEncoding: 'utf16le'
}), {
code: 'ERR_OPERATION_FAILED'
});

server.close();
client.close();

await Promise.all([
once(server, 'close'),
once(client, 'close')
]);

})().then(common.mustCall());
Loading