Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
stream: error queued callbacks after active write has completed
  • Loading branch information
ronag committed Dec 7, 2019
commit 771a0559d94bec28d0b7fc775cfb0ed294fa0a8f
20 changes: 12 additions & 8 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ function onwriteError(stream, state, er, cb) {
// not enabled. Passing `er` here doesn't make sense since
// it's related to one specific write, not to the buffered
// writes.
errorBuffer(state, new ERR_STREAM_DESTROYED('write'), false);
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
// This can emit error, but error must always follow cb.
errorOrDestroy(stream, er);
}
Expand Down Expand Up @@ -543,19 +543,23 @@ function afterWrite(stream, state, count, cb) {
cb();
}

if (state.destroyed) {
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
}

finishMaybe(stream, state);
}

// If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state, err, sync) {
function errorBuffer(state, err) {
if (state.writing || !state.bufferedRequest) {
return;
}

for (let entry = state.bufferedRequest; entry; entry = entry.next) {
const len = state.objectMode ? 1 : entry.chunk.length;
state.length -= len;
if (sync) {
process.nextTick(entry.callback, err);
} else {
entry.callback(err);
}
entry.callback(err);
}
state.bufferedRequest = null;
state.lastBufferedRequest = null;
Expand Down Expand Up @@ -838,7 +842,7 @@ const destroy = destroyImpl.destroy;
Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;
if (!state.destroyed) {
errorBuffer(state, new ERR_STREAM_DESTROYED('write'), true);
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
}
destroy.call(this, err, cb);
return this;
Expand Down
21 changes: 21 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,24 @@ const assert = require('assert');
}));
write.uncork();
}

{
// Ensure callback order.

let state = 0;
const write = new Writable({
write(chunk, enc, cb) {
// `setImmediate()` is used on purpose to ensure the callback is called
// after `process.nextTick()` callbacks.
setImmediate(cb);
}
});
write.write('asd', common.mustCall(() => {
assert.strictEqual(state++, 0);
}));
write.write('asd', common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
assert.strictEqual(state++, 1);
}));
write.destroy();
}