Skip to content

Commit b934636

Browse files
author
kalenikalexander
committed
streams: pipeline error if any stream is destroyed
Fixes: nodejs#36674 Refs: nodejs#29227 (comment)
1 parent 2da3611 commit b934636

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

lib/internal/streams/pipeline.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,11 @@ function pipeline(...streams) {
274274
throw new ERR_INVALID_ARG_TYPE(
275275
name, ['Stream', 'Function'], stream);
276276
}
277+
278+
if (isStream(ret) && ret.destroyed) {
279+
finish(new ERR_STREAM_DESTROYED(reading ? 'pipe' : 'write'));
280+
break;
281+
}
277282
}
278283

279284
// TODO(ronag): Consider returning a Duplex proxy if the first argument

test/parallel/test-stream-pipeline.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,3 +1301,32 @@ const net = require('net');
13011301
assert.strictEqual(res, '123');
13021302
}));
13031303
}
1304+
1305+
{
1306+
const r = new Readable();
1307+
const t = new PassThrough();
1308+
const w = new Writable();
1309+
1310+
w.destroy();
1311+
1312+
pipeline([r, t, w], common.mustCall((err) => {
1313+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
1314+
assert.strictEqual(err.message, 'Cannot call write after a stream was destroyed');
1315+
assert.strictEqual(r.destroyed, true);
1316+
assert.strictEqual(t.destroyed, true);
1317+
}));
1318+
}
1319+
1320+
{
1321+
const r = new Readable();
1322+
const t = new PassThrough();
1323+
const w = new Writable();
1324+
1325+
t.destroy();
1326+
1327+
pipeline([r, t, w], common.mustCall((err) => {
1328+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
1329+
assert.strictEqual(err.message, 'Cannot call pipe after a stream was destroyed');
1330+
assert.strictEqual(r.destroyed, true);
1331+
}));
1332+
}

0 commit comments

Comments
 (0)