diff --git a/README.md b/README.md index 770fc47..7647c22 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,19 @@ If the given promise is already settled and does not resolve with an instance of `ReadableStreamInterface`, then you will not be able to receive the `error` event. +You can `close()` the resulting stream at any time, which will either try to +`cancel()` the pending promise or try to `close()` the underlying stream. + +```php +$promise = startDownloadStream($uri); + +$stream = Stream\unwrapReadable($promise); + +$loop->addTimer(2.0, function () use ($stream) { + $stream->close(); +}); +``` + ### unwrapWritable() The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap @@ -211,6 +224,19 @@ If the given promise is already settled and does not resolve with an instance of `WritableStreamInterface`, then you will not be able to receive the `error` event. +You can `close()` the resulting stream at any time, which will either try to +`cancel()` the pending promise or try to `close()` the underlying stream. + +```php +$promise = startUploadStream($uri); + +$stream = Stream\unwrapWritable($promise); + +$loop->addTimer(2.0, function () use ($stream) { + $stream->close(); +}); +``` + ## Install The recommended way to install this library is [through Composer](https://getcomposer.org). diff --git a/src/UnwrapReadableStream.php b/src/UnwrapReadableStream.php index edf8a04..5b93fc8 100644 --- a/src/UnwrapReadableStream.php +++ b/src/UnwrapReadableStream.php @@ -75,9 +75,11 @@ function (ReadableStreamInterface $stream) use ($out, &$closed) { return $stream; }, - function ($e) use ($out) { - $out->emit('error', array($e, $out)); - $out->close(); + function ($e) use ($out, &$closed) { + if (!$closed) { + $out->emit('error', array($e, $out)); + $out->close(); + } } ); } diff --git a/src/UnwrapWritableStream.php b/src/UnwrapWritableStream.php index acc7c73..d833f4c 100644 --- a/src/UnwrapWritableStream.php +++ b/src/UnwrapWritableStream.php @@ -88,9 +88,11 @@ function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$endin return $stream; }, - function ($e) use ($out) { - $out->emit('error', array($e, $out)); - $out->close(); + function ($e) use ($out, &$closed) { + if (!$closed) { + $out->emit('error', array($e, $out)); + $out->close(); + } } ); } diff --git a/tests/UnwrapReadableTest.php b/tests/UnwrapReadableTest.php index c0b5846..a5b5268 100644 --- a/tests/UnwrapReadableTest.php +++ b/tests/UnwrapReadableTest.php @@ -31,12 +31,28 @@ public function testClosingStreamMakesItNotReadable() $stream->on('close', $this->expectCallableOnce()); $stream->on('end', $this->expectCallableNever()); + $stream->on('error', $this->expectCallableNever()); $stream->close(); $this->assertFalse($stream->isReadable()); } + public function testClosingRejectingStreamMakesItNotReadable() + { + $promise = Timer\reject(0.001, $this->loop); + $stream = Stream\unwrapReadable($promise); + + $stream->on('close', $this->expectCallableOnce()); + $stream->on('end', $this->expectCallableNever()); + $stream->on('error', $this->expectCallableNever()); + + $stream->close(); + $this->loop->run(); + + $this->assertFalse($stream->isReadable()); + } + public function testClosingStreamWillCancelInputPromiseAndMakeStreamNotReadable() { $promise = new \React\Promise\Promise(function () { }, $this->expectCallableOnce()); diff --git a/tests/UnwrapWritableTest.php b/tests/UnwrapWritableTest.php index e21500e..5a8d093 100644 --- a/tests/UnwrapWritableTest.php +++ b/tests/UnwrapWritableTest.php @@ -30,12 +30,27 @@ public function testClosingStreamMakesItNotWritable() $stream = Stream\unwrapWritable($promise); $stream->on('close', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); $stream->close(); $this->assertFalse($stream->isWritable()); } + public function testClosingRejectingStreamMakesItNotWritable() + { + $promise = Timer\reject(0.001, $this->loop); + $stream = Stream\unwrapWritable($promise); + + $stream->on('close', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); + + $stream->close(); + $this->loop->run(); + + $this->assertFalse($stream->isWritable()); + } + public function testClosingStreamWillCancelInputPromiseAndMakeStreamNotWritable() { $promise = new \React\Promise\Promise(function () { }, $this->expectCallableOnce()); @@ -247,6 +262,7 @@ public function testEmitsCloseOnlyOnceWhenClosingStreamMultipleTimes() $stream = Stream\unwrapWritable($promise); $stream->on('close', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); $stream->close(); $stream->close();