Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/Model/Table/QueueProcessesTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,24 @@ public function findActive(): SelectQuery {
* @return int
*/
public function add(string $pid, string $key): int {
$server = $this->buildServerString();

// Evict any stale row holding our (pid, server) slot. Common after
// container restarts: the killed worker's row survives, and the
// unique index on (pid, server) would otherwise reject the insert.
// Only rows whose heartbeat is older than `staleHeartbeatThreshold`
// are removed, so a live worker on the same PID is never disturbed.
$staleThreshold = (new DateTime())->subSeconds(Config::staleHeartbeatThreshold());
$this->deleteAll([
'pid' => $pid,
'server IS' => $server,
'workerkey !=' => $key,
'modified <' => $staleThreshold,
]);

$data = [
'pid' => $pid,
'server' => $this->buildServerString(),
'server' => $server,
'workerkey' => $key,
];

Expand Down
16 changes: 16 additions & 0 deletions src/Queue/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ public static function sleeptime(): int {
return Configure::read('Queue.sleeptime', 10);
}

/**
* Threshold in seconds after which a queue_processes row whose `modified`
* timestamp is older is considered stale by a starting worker. Workers
* heartbeat (refresh `modified`) on every loop iteration, so a row not
* refreshed in ~90s almost certainly belongs to a dead worker — typically
* a container that was force-restarted. This is intentionally much shorter
* than `defaultRequeueTimeout` (which governs in-flight job requeueing).
*
* @return int
*/
public static function staleHeartbeatThreshold(): int {
$threshold = Configure::read('Queue.staleHeartbeatThreshold');

return $threshold ?? 90;
}

/**
* @return int
*/
Expand Down
68 changes: 68 additions & 0 deletions tests/TestCase/Model/Table/QueueProcessesTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Queue\Test\TestCase\Model\Table;

use Cake\Core\Configure;
use Cake\Database\Exception\QueryException;
use Cake\I18n\DateTime;
use Cake\ORM\Exception\PersistenceFailedException;
use Cake\ORM\TableRegistry;
Expand Down Expand Up @@ -260,4 +261,71 @@ public function testCleanEndedProcessesForceRemovesAllRows() {
$this->assertSame(0, $this->QueueProcesses->find()->count());
}

/**
* Container restart scenario: a worker died without cleaning up its
* queue_processes row. A new worker starts with the same recycled PID.
* The stale row's heartbeat is older than `staleHeartbeatThreshold`, so
* `add()` evicts it before inserting — no duplicate-key crash.
*
* @return void
*/
public function testAddEvictsStaleRowOnSamePidServer() {
Configure::write('Queue.maxworkers', 5);
$this->QueueProcesses->deleteAll(['1=1']);

$staleId = $this->QueueProcesses->add('8', 'old-workerkey');
$this->QueueProcesses->updateAll(
['modified' => (new DateTime())->subSeconds(180)->toDateTimeString()],
['id' => $staleId],
);

$newId = $this->QueueProcesses->add('8', 'new-workerkey');
$this->assertNotEmpty($newId);
$this->assertNotSame($staleId, $newId);

$survivors = $this->QueueProcesses->find()->where(['pid' => '8'])->all()->toArray();
$this->assertCount(1, $survivors);
$this->assertSame('new-workerkey', $survivors[0]->workerkey);
}

/**
* If the existing row is fresh (recent heartbeat), it represents a real,
* live worker — eviction would be wrong. The duplicate-key error is the
* correct outcome. `Queue.maxworkers` is raised so `validateCount` cannot
* mask the real source of failure: the (pid, server) unique index fires
* at the DB level as a `QueryException`, not validation.
*
* @return void
*/
public function testAddDoesNotEvictRecentRowOnSamePidServer() {
Configure::write('Queue.maxworkers', 5);
$this->QueueProcesses->deleteAll(['1=1']);
$this->QueueProcesses->add('8', 'first-workerkey');

$this->expectException(QueryException::class);
$this->QueueProcesses->add('8', 'second-workerkey');
}

/**
* Operators can tune the threshold for unusual heartbeat intervals.
*
* @return void
*/
public function testAddRespectsCustomStaleHeartbeatThreshold() {
Configure::write('Queue.maxworkers', 5);
Configure::write('Queue.staleHeartbeatThreshold', 30);
$this->QueueProcesses->deleteAll(['1=1']);

$staleId = $this->QueueProcesses->add('8', 'old-workerkey');
$this->QueueProcesses->updateAll(
['modified' => (new DateTime())->subSeconds(60)->toDateTimeString()],
['id' => $staleId],
);

$newId = $this->QueueProcesses->add('8', 'new-workerkey');
$this->assertNotEmpty($newId);

Configure::delete('Queue.staleHeartbeatThreshold');
}

}
Loading