diff --git a/README.md b/README.md index e3affe9..2474138 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ much any API that already uses Promises. * [Cancellation](#cancellation) * [Timeout](#timeout) * [all()](#all) + * [any()](#any) * [Blocking](#blocking) * [Install](#install) * [Tests](#tests) @@ -332,6 +333,80 @@ $promise = Queue::all(10, $jobs, array($browser, 'get')); > Keep in mind that returning an array of response messages means that the whole response body has to be kept in memory. +#### any() + +The static `any(int $concurrency, array $jobs, callable $handler): PromiseInterface` method can be used to +concurrently process given jobs through the given `$handler` and resolve +with first resolution value. + +This is a convenience method which uses the `Queue` internally to +schedule all jobs while limiting concurrency to ensure no more than +`$concurrency` jobs ever run at once. It will return a promise which +resolves with the result of the first job on success and will then try +to `cancel()` all outstanding jobs. + +```php +$loop = React\EventLoop\Factory::create(); +$browser = new Clue\React\Buzz\Browser($loop); + +$promise = Queue::any(3, $urls, function ($url) use ($browser) { + return $browser->get($url); +}); + +$promise->then(function (ResponseInterface $response) { + echo 'First response: ' . $response->getBody() . PHP_EOL; +}); +``` + +If all of the jobs fail, it will reject the resulting promise. Similarly, +calling `cancel()` on the resulting promise will try to cancel all +outstanding jobs. See [promises](#promises) and +[cancellation](#cancellation) for details. + +The `$concurrency` parameter sets a new soft limit for the maximum number +of jobs to handle concurrently. Finding a good concurrency limit depends +on your particular use case. It's common to limit concurrency to a rather +small value, as doing more than a dozen of things at once may easily +overwhelm the receiving side. Using a `1` value will ensure that all jobs +are processed one after another, effectively creating a "waterfall" of +jobs. Using a value less than 1 will reject with an +`InvalidArgumentException` without processing any jobs. + +```php +// handle up to 10 jobs concurrently +$promise = Queue::any(10, $jobs, $handler); +``` + +```php +// handle each job after another without concurrency (waterfall) +$promise = Queue::any(1, $jobs, $handler); +``` + +The `$jobs` parameter must be an array with all jobs to process. Each +value in this array will be passed to the `$handler` to start one job. +The array keys have no effect, the promise will simply resolve with the +job results of the first successful job as returned by the `$handler`. +If this array is empty, this method will reject without processing any +jobs. + +The `$handler` parameter must be a valid callable that accepts your job +parameters, invokes the appropriate operation and returns a Promise as a +placeholder for its future result. If the given argument is not a valid +callable, this method will reject with an `InvalidArgumentExceptionn` +without processing any jobs. + +```php +// using a Closure as handler is usually recommended +$promise = Queue::any(10, $jobs, function ($url) use ($browser) { + return $browser->get($url); +}); +``` + +```php +// accepts any callable, so PHP's array notation is also supported +$promise = Queue::any(10, $jobs, array($browser, 'get')); +``` + #### Blocking As stated above, this library provides you a powerful, async API by default. diff --git a/composer.json b/composer.json index c108c5f..30bceae 100644 --- a/composer.json +++ b/composer.json @@ -22,7 +22,7 @@ }, "require-dev": { "clue/block-react": "^1.0", - "clue/buzz-react": "^2.0", + "clue/buzz-react": "^2.4", "phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35", "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3" } diff --git a/examples/03-http-any.php b/examples/03-http-any.php new file mode 100644 index 0000000..6259c1a --- /dev/null +++ b/examples/03-http-any.php @@ -0,0 +1,44 @@ +get($url)->then( + function (ResponseInterface $response) use ($url) { + // return only the URL for the first successful response + return $url; + } + ); +}); + +$promise->then( + function ($url) { + echo 'First successful URL is ' . $url . PHP_EOL; + }, + function ($e) { + echo 'An error occured: ' . $e->getMessage() . PHP_EOL; + } +); + +$loop->run(); diff --git a/src/Queue.php b/src/Queue.php index 700ea2d..d9882dd 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -143,6 +143,124 @@ public static function all($concurrency, array $jobs, $handler) }); } + /** + * Concurrently process given jobs through the given `$handler` and resolve + * with first resolution value. + * + * This is a convenience method which uses the `Queue` internally to + * schedule all jobs while limiting concurrency to ensure no more than + * `$concurrency` jobs ever run at once. It will return a promise which + * resolves with the result of the first job on success and will then try + * to `cancel()` all outstanding jobs. + * + * ```php + * $loop = React\EventLoop\Factory::create(); + * $browser = new Clue\React\Buzz\Browser($loop); + * + * $promise = Queue::any(3, $urls, function ($url) use ($browser) { + * return $browser->get($url); + * }); + * + * $promise->then(function (ResponseInterface $response) { + * echo 'First response: ' . $response->getBody() . PHP_EOL; + * }); + * ``` + * + * If all of the jobs fail, it will reject the resulting promise. Similarly, + * calling `cancel()` on the resulting promise will try to cancel all + * outstanding jobs. See [promises](#promises) and + * [cancellation](#cancellation) for details. + * + * The `$concurrency` parameter sets a new soft limit for the maximum number + * of jobs to handle concurrently. Finding a good concurrency limit depends + * on your particular use case. It's common to limit concurrency to a rather + * small value, as doing more than a dozen of things at once may easily + * overwhelm the receiving side. Using a `1` value will ensure that all jobs + * are processed one after another, effectively creating a "waterfall" of + * jobs. Using a value less than 1 will reject with an + * `InvalidArgumentException` without processing any jobs. + * + * ```php + * // handle up to 10 jobs concurrently + * $promise = Queue::any(10, $jobs, $handler); + * ``` + * + * ```php + * // handle each job after another without concurrency (waterfall) + * $promise = Queue::any(1, $jobs, $handler); + * ``` + * + * The `$jobs` parameter must be an array with all jobs to process. Each + * value in this array will be passed to the `$handler` to start one job. + * The array keys have no effect, the promise will simply resolve with the + * job results of the first successful job as returned by the `$handler`. + * If this array is empty, this method will reject without processing any + * jobs. + * + * The `$handler` parameter must be a valid callable that accepts your job + * parameters, invokes the appropriate operation and returns a Promise as a + * placeholder for its future result. If the given argument is not a valid + * callable, this method will reject with an `InvalidArgumentExceptionn` + * without processing any jobs. + * + * ```php + * // using a Closure as handler is usually recommended + * $promise = Queue::any(10, $jobs, function ($url) use ($browser) { + * return $browser->get($url); + * }); + * ``` + * + * ```php + * // accepts any callable, so PHP's array notation is also supported + * $promise = Queue::any(10, $jobs, array($browser, 'get')); + * ``` + * + * @param int $concurrency concurrency soft limit + * @param array $jobs + * @param callable $handler + * @return PromiseInterface Returns a Promise which resolves with a single resolution value + * or rejects when all of the operations reject. + */ + public static function any($concurrency, array $jobs, $handler) + { + // explicitly reject with empty jobs (https://github.com/reactphp/promise/pull/34) + if (!$jobs) { + return Promise\reject(new \UnderflowException('No jobs given')); + } + + try { + // limit number of concurrent operations + $q = new self($concurrency, null, $handler); + } catch (\InvalidArgumentException $e) { + // reject if $concurrency or $handler is invalid + return Promise\reject($e); + } + + // try invoking all operations and automatically queue excessive ones + $promises = array_map($q, $jobs); + + return new Promise\Promise(function ($resolve, $reject) use ($promises) { + Promise\any($promises)->then(function ($result) use ($promises, $resolve) { + // cancel all pending promises if a single result is ready + foreach (array_reverse($promises) as $promise) { + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } + } + + // resolve with original resolution value + $resolve($result); + }, $reject); + }, function () use ($promises) { + // cancel all pending promises on cancellation + foreach (array_reverse($promises) as $promise) { + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } + } + }); + } + /** * Instantiates a new queue object. * diff --git a/tests/QueueAllTest.php b/tests/QueueAllTest.php index 091c773..541ecc4 100644 --- a/tests/QueueAllTest.php +++ b/tests/QueueAllTest.php @@ -74,8 +74,9 @@ public function testCancelResultingPromiseWillCancelPendingOperation() $promise->cancel(); } - public function testPendingOperationWillBeCancelledIfOneOperationRejects22222222222() + public function testPendingOperationWillBeStartedAndCancelledIfOneOperationRejects() { + // second operation will only be started to be cancelled immediately $first = new Deferred(); $second = new Promise(function () { }, $this->expectCallableOnce()); diff --git a/tests/QueueAnyTest.php b/tests/QueueAnyTest.php new file mode 100644 index 0000000..70f3f23 --- /dev/null +++ b/tests/QueueAnyTest.php @@ -0,0 +1,125 @@ +then(null, $this->expectCallableOnce()); + } + + public function testAnyRejectsIfHandlerIsInvalid() + { + Queue::any(1, array(1), 'foobar')->then(null, $this->expectCallableOnce()); + } + + public function testAnyWillRejectWithoutInvokingHandlerWhenJobsAreEmpty() + { + $promise = Queue::any(1, array(), $this->expectCallableNever()); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testWillResolveWithSingleValueIfHandlerResolves() + { + $promise = Queue::any(1, array(1), function ($arg) { + return \React\Promise\resolve($arg); + }); + + $promise->then($this->expectCallableOnceWith(1)); + } + + public function testWillResolveWithFirstValueIfAllHandlersResolve() + { + $promise = Queue::any(1, array(1, 2, 3), function ($arg) { + return \React\Promise\resolve($arg); + }); + + $promise->then($this->expectCallableOnceWith(1)); + } + + public function testWillRejectIfSingleReject() + { + $promise = Queue::any(1, array(1), function () { + return \React\Promise\reject(new \RuntimeException()); + }); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testWillRejectIfMoreHandlersReject() + { + $promise = Queue::any(1, array(1, 2), function () { + return \React\Promise\reject(new \RuntimeException()); + }); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testCancelResultingPromiseWillCancelPendingOperation() + { + $pending = new Promise(function () { }, $this->expectCallableOnce()); + + $promise = Queue::any(1, array(1), function () use ($pending) { + return $pending; + }); + + $promise->cancel(); + } + + public function testPendingOperationWillBeStartedAndCancelledIfFirstOperationResolves() + { + // second operation will only be started to be cancelled immediately + $first = new Deferred(); + $second = new Promise(function () { }, $this->expectCallableOnce()); + + $promise = Queue::any(1, array($first->promise(), $second), function ($promise) { + return $promise; + }); + + $first->resolve(1); + + $promise->then($this->expectCallableOnceWith(1)); + } + + public function testPendingOperationWillBeCancelledIfFirstOperationResolves() + { + $first = new Deferred(); + $second = new Promise(function () { }, $this->expectCallableOnce()); + + $promise = Queue::any(2, array($first->promise(), $second), function ($promise) { + return $promise; + }); + + $first->resolve(1); + + $promise->then($this->expectCallableOnceWith(1)); + } + + public function testQueuedOperationsWillStartAndCancelOneIfOneOperationResolves() + { + $first = new Deferred(); + $second = new Promise(function () { }, function () { + throw new \RuntimeException(); + }); + $third = new Promise(function () { }, $this->expectCallableOnce()); + $fourth = new Promise(function () { }, $this->expectCallableNever()); + + $started = 0; + $promise = Queue::any(2, array($first->promise(), $second, $third, $fourth), function ($promise) use (&$started) { + ++$started; + return $promise; + }); + + $first->resolve(1); + + $this->assertEquals(3, $started); + } +}