From bb8527f4b5a18dedd7f7d128f2a3cb66247429ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Thu, 6 Jun 2019 18:42:53 +0200 Subject: [PATCH] Clean up unneeded references for unwrapped streams when closing --- src/UnwrapReadableStream.php | 25 +++++++++----- src/UnwrapWritableStream.php | 6 ++-- tests/UnwrapReadableTest.php | 63 ++++++++++++++++++++++++++++++++++++ tests/UnwrapWritableTest.php | 33 +++++++++++++++++++ 4 files changed, 117 insertions(+), 10 deletions(-) diff --git a/src/UnwrapReadableStream.php b/src/UnwrapReadableStream.php index 5b93fc8..acd23be 100644 --- a/src/UnwrapReadableStream.php +++ b/src/UnwrapReadableStream.php @@ -31,7 +31,7 @@ public function __construct(PromiseInterface $promise) $this->promise = $promise->then( function ($stream) { - if (!($stream instanceof ReadableStreamInterface)) { + if (!$stream instanceof ReadableStreamInterface) { throw new InvalidArgumentException('Not a readable stream'); } return $stream; @@ -80,6 +80,9 @@ function ($e) use ($out, &$closed) { $out->emit('error', array($e, $out)); $out->close(); } + + // resume() and pause() may attach to this promise, so ensure we actually reject here + throw $e; } ); } @@ -91,16 +94,20 @@ public function isReadable() public function pause() { - $this->promise->then(function (ReadableStreamInterface $stream) { - $stream->pause(); - }); + if ($this->promise !== null) { + $this->promise->then(function (ReadableStreamInterface $stream) { + $stream->pause(); + }); + } } public function resume() { - $this->promise->then(function (ReadableStreamInterface $stream) { - $stream->resume(); - }); + if ($this->promise !== null) { + $this->promise->then(function (ReadableStreamInterface $stream) { + $stream->resume(); + }); + } } public function pipe(WritableStreamInterface $dest, array $options = array()) @@ -122,7 +129,9 @@ public function close() if ($this->promise instanceof CancellablePromiseInterface) { $this->promise->cancel(); } + $this->promise = null; - $this->emit('close', array($this)); + $this->emit('close'); + $this->removeAllListeners(); } } diff --git a/src/UnwrapWritableStream.php b/src/UnwrapWritableStream.php index 3221305..3f05444 100644 --- a/src/UnwrapWritableStream.php +++ b/src/UnwrapWritableStream.php @@ -35,7 +35,7 @@ public function __construct(PromiseInterface $promise) $this->promise = $promise->then( function ($stream) { - if (!($stream instanceof WritableStreamInterface)) { + if (!$stream instanceof WritableStreamInterface) { throw new InvalidArgumentException('Not a writable stream'); } return $stream; @@ -156,7 +156,9 @@ public function close() if ($this->promise instanceof CancellablePromiseInterface) { $this->promise->cancel(); } + $this->promise = $this->stream = null; - $this->emit('close', array($this)); + $this->emit('close'); + $this->removeAllListeners(); } } diff --git a/tests/UnwrapReadableTest.php b/tests/UnwrapReadableTest.php index 1297131..a6db4de 100644 --- a/tests/UnwrapReadableTest.php +++ b/tests/UnwrapReadableTest.php @@ -200,6 +200,30 @@ public function testForwardsPauseToInputStream() $stream->pause(); } + /** + * @doesNotPerformAssertions + */ + public function testPauseAfterCloseHasNoEffect() + { + $promise = new \React\Promise\Promise(function () { }); + $stream = Stream\unwrapReadable($promise); + + $stream->close(); + $stream->pause(); + } + + + /** + * @doesNotPerformAssertions + */ + public function testPauseAfterErrorDueToInvalidInputHasNoEffect() + { + $promise = \React\Promise\reject(new \RuntimeException()); + $stream = Stream\unwrapReadable($promise); + + $stream->pause(); + } + public function testForwardsResumeToInputStream() { $input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); @@ -211,6 +235,18 @@ public function testForwardsResumeToInputStream() $stream->resume(); } + /** + * @doesNotPerformAssertions + */ + public function testResumeAfterCloseHasNoEffect() + { + $promise = new \React\Promise\Promise(function () { }); + $stream = Stream\unwrapReadable($promise); + + $stream->close(); + $stream->resume(); + } + public function testPipingStreamWillForwardDataEvents() { $input = new ThroughStream(); @@ -279,4 +315,31 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler() $this->assertFalse($input->isReadable()); } + + public function testCloseShouldRemoveAllListenersAfterCloseEvent() + { + $promise = new \React\Promise\Promise(function () { }); + $stream = Stream\unwrapReadable($promise); + + $stream->on('close', $this->expectCallableOnce()); + $this->assertCount(1, $stream->listeners('close')); + + $stream->close(); + + $this->assertCount(0, $stream->listeners('close')); + } + + public function testCloseShouldRemoveReferenceToPromiseToAvoidGarbageReferences() + { + $promise = new \React\Promise\Promise(function () { }); + $stream = Stream\unwrapReadable($promise); + + $stream->close(); + + $ref = new \ReflectionProperty($stream, 'promise'); + $ref->setAccessible(true); + $value = $ref->getValue($stream); + + $this->assertNull($value); + } } diff --git a/tests/UnwrapWritableTest.php b/tests/UnwrapWritableTest.php index ffa115a..3b02c31 100644 --- a/tests/UnwrapWritableTest.php +++ b/tests/UnwrapWritableTest.php @@ -352,4 +352,37 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler() $this->assertFalse($input->isWritable()); } + + public function testCloseShouldRemoveAllListenersAfterCloseEvent() + { + $promise = new \React\Promise\Promise(function () { }); + $stream = Stream\unwrapWritable($promise); + + $stream->on('close', $this->expectCallableOnce()); + $this->assertCount(1, $stream->listeners('close')); + + $stream->close(); + + $this->assertCount(0, $stream->listeners('close')); + } + + public function testCloseShouldRemoveReferenceToPromiseAndStreamToAvoidGarbageReferences() + { + $promise = \React\Promise\resolve(new ThroughStream()); + $stream = Stream\unwrapWritable($promise); + + $stream->close(); + + $ref = new \ReflectionProperty($stream, 'promise'); + $ref->setAccessible(true); + $value = $ref->getValue($stream); + + $this->assertNull($value); + + $ref = new \ReflectionProperty($stream, 'stream'); + $ref->setAccessible(true); + $value = $ref->getValue($stream); + + $this->assertNull($value); + } }