Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/app.example.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
'workerLifetime' => 60, // 1 minutes
// Legacy: 'workermaxruntime' is deprecated but still supported

// optional random offset (0-N seconds) added per worker to stagger shutdowns in a fleet (0 = disabled)
'workerLifetimeJitter' => 0,

// seconds of running time after which the PHP process will terminate, null uses workerLifetime * 2
'workerPhpTimeout' => null,
// Legacy: 'workertimeout' is deprecated but still supported
Expand Down
8 changes: 8 additions & 0 deletions docs/sections/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ You may create a file called `app_queue.php` inside your `config` folder (NOT th
bin/cake queue run --max-runtime 300 # Run for 5 minutes
```

- Optional per-worker jitter (in seconds) added to the worker lifetime:

```php
$config['Queue']['workerLifetimeJitter'] = 30; // up to +30s random offset per worker
```

Each worker picks a random offset in `[0, workerLifetimeJitter]` at startup and adds it to its effective lifetime. Useful when many workers are spawned simultaneously (e.g. ECS/Kubernetes) to stagger shutdowns and avoid a thundering herd of concurrent restarts. Defaults to `0` (no jitter). If `workerLifetime` or the `--max-runtime` override is `0` (unlimited), this setting has no effect.

- Seconds of running time after which the PHP process of the worker will terminate:

```php
Expand Down
53 changes: 47 additions & 6 deletions src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,11 @@ public function run(array $args): int {
$this->exit = false;

$startTime = time();
$jitterOffset = $this->computeLifetimeJitterOffset();
$maxRuntime = $this->resolveMaxRuntime($config['maxruntime'], $jitterOffset);

while (!$this->exit) {
$this->setPhpTimeout($config['maxruntime']);
$this->setPhpTimeout($maxRuntime);

try {
$this->updatePid($pid);
Expand Down Expand Up @@ -232,11 +234,6 @@ public function run(array $args): int {
sleep(Config::sleeptime());
}

$workerLifetime = Configure::read('Queue.workerLifetime') ?? Configure::read('Queue.workermaxruntime');
if ($workerLifetime === null && $config['maxruntime'] === null) {
throw new RuntimeException('Queue.workerLifetime (or deprecated workermaxruntime) config is required');
}
$maxRuntime = $config['maxruntime'] ?? (int)$workerLifetime;
// check if we are over the maximum runtime and end processing if so.
if ($maxRuntime > 0 && (time() - $startTime) >= $maxRuntime) {
$this->exit = true;
Expand Down Expand Up @@ -626,6 +623,50 @@ protected function setPhpTimeout(?int $maxruntime): void {
set_time_limit($timeLimit);
}

/**
* Compute the per-worker lifetime jitter offset in seconds.
*
* Returns a random integer in [0, Queue.workerLifetimeJitter]. Used to stagger
* worker shutdowns so a fleet spawned at the same moment does not all exit
* on the same tick (thundering herd).
*
* @return int
*/
protected function computeLifetimeJitterOffset(): int {
$jitter = (int)Configure::read('Queue.workerLifetimeJitter', 0);
if ($jitter <= 0) {
return 0;
}

return mt_rand(0, $jitter);
}

/**
* Resolve the effective worker runtime, applying jitter only to bounded workers.
*
* @param int|null $maxruntime Max runtime in seconds if set via CLI option.
* @param int $jitterOffset Per-worker random offset in seconds.
*
* @throws \RuntimeException
*
* @return int
*/
protected function resolveMaxRuntime(?int $maxruntime, int $jitterOffset): int {
$workerLifetime = Configure::read('Queue.workerLifetime') ?? Configure::read('Queue.workermaxruntime');
if ($workerLifetime === null && $maxruntime === null) {
throw new RuntimeException('Queue.workerLifetime (or deprecated workermaxruntime) config is required');
}

$resolvedMaxRuntime = $maxruntime ?? (int)$workerLifetime;
if ($resolvedMaxRuntime <= 0 || $jitterOffset <= 0) {
return (int)$resolvedMaxRuntime;
}

$this->io->out('Applying worker lifetime jitter: +' . $jitterOffset . ' seconds');

return (int)$resolvedMaxRuntime + $jitterOffset;
}

/**
* @param array<string, mixed> $args
*
Expand Down
69 changes: 69 additions & 0 deletions tests/TestCase/Queue/ProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,28 @@ public function testMemoryUsage() {
$this->assertMatchesRegularExpression('/^\d+MB/', $result, 'Should be e.g. `17MB` or `17MB/1GB` etc.');
}

/**
* @return void
*/
public function testResolveMaxRuntimeAppliesJitterToBoundedWorkers() {
$this->Processor = new Processor(new Io(new ConsoleIo()), new NullLogger());

$result = $this->invokeMethod($this->Processor, 'resolveMaxRuntime', [30, 7]);

$this->assertSame(37, $result);
}

/**
* @return void
*/
public function testResolveMaxRuntimeDoesNotApplyJitterToUnlimitedWorkers() {
$this->Processor = new Processor(new Io(new ConsoleIo()), new NullLogger());

$result = $this->invokeMethod($this->Processor, 'resolveMaxRuntime', [0, 7]);

$this->assertSame(0, $result);
}

/**
* @return void
*/
Expand Down Expand Up @@ -442,4 +464,51 @@ public function testSetPhpTimeoutWithDeprecatedConfig() {
Configure::delete('Queue.workertimeout');
}

/**
* @return void
*/
public function testComputeLifetimeJitterOffsetDefaultsToZero() {
$processor = new Processor(new Io(new ConsoleIo()), new NullLogger());

Configure::delete('Queue.workerLifetimeJitter');
$result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset');
$this->assertSame(0, $result);

Configure::write('Queue.workerLifetimeJitter', 0);
$result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset');
$this->assertSame(0, $result);

Configure::delete('Queue.workerLifetimeJitter');
}

/**
* @return void
*/
public function testComputeLifetimeJitterOffsetWithinBounds() {
$processor = new Processor(new Io(new ConsoleIo()), new NullLogger());

Configure::write('Queue.workerLifetimeJitter', 15);
for ($i = 0; $i < 50; $i++) {
$result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset');
$this->assertIsInt($result);
$this->assertGreaterThanOrEqual(0, $result);
$this->assertLessThanOrEqual(15, $result);
}

Configure::delete('Queue.workerLifetimeJitter');
}

/**
* @return void
*/
public function testComputeLifetimeJitterOffsetIgnoresNegative() {
$processor = new Processor(new Io(new ConsoleIo()), new NullLogger());

Configure::write('Queue.workerLifetimeJitter', -10);
$result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset');
$this->assertSame(0, $result);

Configure::delete('Queue.workerLifetimeJitter');
}

}
Loading