diff --git a/.travis.yml b/.travis.yml index 5789f69..ff544ca 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ php: - 7.0 - 7.1 - 7.2 - - hhvm + - hhvm # ignore errors, see below # lock distro so new future defaults will not break the build dist: trusty @@ -17,6 +17,8 @@ matrix: include: - php: 5.3 dist: precise + allow_failures: + - php: hhvm sudo: false diff --git a/src/Queue.php b/src/Queue.php index a5fde79..350b1b8 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -3,6 +3,7 @@ namespace Clue\React\Mq; use React\Promise; +use React\Promise\CancellablePromiseInterface; use React\Promise\Deferred; use React\Promise\PromiseInterface; @@ -134,30 +135,26 @@ public function __invoke() end($queue); $id = key($queue); - $deferred = new Deferred(function ($_, $reject) use (&$queue, $id) { - // queued promise cancelled before its handler is invoked - // remove from queue and reject explicitly - unset($queue[$id]); - $reject(new \RuntimeException('Cancelled queued job before processing started')); + $deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$deferred) { + // forward cancellation to pending operation if it is currently executing + if (isset($deferred->pending) && $deferred->pending instanceof CancellablePromiseInterface) { + $deferred->pending->cancel(); + } + unset($deferred->pending); + + if (isset($deferred->args)) { + // queued promise cancelled before its handler is invoked + // remove from queue and reject explicitly + unset($queue[$id], $deferred->args); + $reject(new \RuntimeException('Cancelled queued job before processing started')); + } }); // queue job to process if number of pending jobs is below concurrency limit again + $deferred->args = func_get_args(); $queue[$id] = $deferred; - // once number of pending jobs is below concurrency limit again: - // await this situation, invoke handler and await its resolution before invoking next queued job - $handler = $this->handler; - $args = func_get_args(); - $that = $this; - $pending =& $this->pending; - return $deferred->promise()->then(function () use ($handler, $args, $that, &$pending) { - ++$pending; - - // invoke handler and await its resolution before invoking next queued job - return $that->await( - call_user_func_array($handler, $args) - ); - }); + return $deferred->promise(); } public function count() @@ -193,9 +190,28 @@ public function processQueue() return; } - $first = reset($this->queue); + /* @var $deferred Deferred */ + $deferred = reset($this->queue); unset($this->queue[key($this->queue)]); - $first->resolve(); + // once number of pending jobs is below concurrency limit again: + // await this situation, invoke handler and await its resolution before invoking next queued job + ++$this->pending; + + $promise = call_user_func_array($this->handler, $deferred->args); + $deferred->pending = $promise; + unset($deferred->args); + + // invoke handler and await its resolution before invoking next queued job + $this->await($promise)->then( + function ($result) use ($deferred) { + unset($deferred->pending); + $deferred->resolve($result); + }, + function ($e) use ($deferred) { + unset($deferred->pending); + $deferred->reject($e); + } + ); } } diff --git a/tests/QueueTest.php b/tests/QueueTest.php index 6daea81..669ccfe 100644 --- a/tests/QueueTest.php +++ b/tests/QueueTest.php @@ -211,4 +211,73 @@ public function testCancelPendingWillRejectPromiseAndRemoveJobFromQueue() $second->then(null, $this->expectCallableOnce()); $this->assertCount(1, $q); } + + public function testCancelPendingOperationThatWasPreviousQueuedShouldInvokeItsCancellationHandler() + { + $q = new Queue(1, null, function ($promise) { + return $promise; + }); + + $deferred = new Deferred(); + $first = $q($deferred->promise()); + + $second = $q(new Promise(function () { }, $this->expectCallableOnce())); + + $deferred->resolve(); + $second->cancel(); + } + + public function testCancelPendingOperationThatWasPreviouslyQueuedShouldRejectWithCancellationResult() + { + $q = new Queue(1, null, function ($promise) { + return $promise; + }); + + $deferred = new Deferred(); + $first = $q($deferred->promise()); + + $second = $q(new Promise(function () { }, function () { throw new \BadMethodCallException(); })); + + $deferred->resolve(); + $second->cancel(); + + $second->then(null, $this->expectCallableOnceWith($this->isInstanceOf('BadMethodCallException'))); + } + + public function testCancelPendingOperationThatWasPreviouslyQueuedShouldNotRejectIfCancellationHandlerDoesNotReject() + { + $q = new Queue(1, null, function ($promise) { + return $promise; + }); + + $deferred = new Deferred(); + $first = $q($deferred->promise()); + + $second = $q(new Promise(function () { }, function () { })); + + $deferred->resolve(); + $second->cancel(); + + $second->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testCancelNextOperationFromFirstOperationShouldInvokeCancellationHandler() + { + $q = new Queue(1, null, function () { + return new Promise(function () { }, function () { + throw new \RuntimeException(); + }); + }); + + $first = $q(); + $second = $q(); + + $first->then(null, function () use ($second) { + $second->cancel(); + }); + + $first->cancel(); + + $second->then(null, $this->expectCallableOnce()); + } }