diff --git a/src/Model/Table/QueueProcessesTable.php b/src/Model/Table/QueueProcessesTable.php index e6fbe578..8eb685f0 100644 --- a/src/Model/Table/QueueProcessesTable.php +++ b/src/Model/Table/QueueProcessesTable.php @@ -148,9 +148,24 @@ public function findActive(): SelectQuery { * @return int */ public function add(string $pid, string $key): int { + $server = $this->buildServerString(); + + // Evict any stale row holding our (pid, server) slot. Common after + // container restarts: the killed worker's row survives, and the + // unique index on (pid, server) would otherwise reject the insert. + // Only rows whose heartbeat is older than `staleHeartbeatThreshold` + // are removed, so a live worker on the same PID is never disturbed. + $staleThreshold = (new DateTime())->subSeconds(Config::staleHeartbeatThreshold()); + $this->deleteAll([ + 'pid' => $pid, + 'server IS' => $server, + 'workerkey !=' => $key, + 'modified <' => $staleThreshold, + ]); + $data = [ 'pid' => $pid, - 'server' => $this->buildServerString(), + 'server' => $server, 'workerkey' => $key, ]; diff --git a/src/Queue/Config.php b/src/Queue/Config.php index 3bc3d32c..baa4a2c5 100644 --- a/src/Queue/Config.php +++ b/src/Queue/Config.php @@ -74,6 +74,22 @@ public static function sleeptime(): int { return Configure::read('Queue.sleeptime', 10); } + /** + * Threshold in seconds after which a queue_processes row whose `modified` + * timestamp is older is considered stale by a starting worker. Workers + * heartbeat (refresh `modified`) on every loop iteration, so a row not + * refreshed in ~90s almost certainly belongs to a dead worker — typically + * a container that was force-restarted. This is intentionally much shorter + * than `defaultRequeueTimeout` (which governs in-flight job requeueing). + * + * @return int + */ + public static function staleHeartbeatThreshold(): int { + $threshold = Configure::read('Queue.staleHeartbeatThreshold'); + + return $threshold ?? 90; + } + /** * @return int */ diff --git a/tests/TestCase/Model/Table/QueueProcessesTableTest.php b/tests/TestCase/Model/Table/QueueProcessesTableTest.php index 09cf1d9f..b3b9adf0 100644 --- a/tests/TestCase/Model/Table/QueueProcessesTableTest.php +++ b/tests/TestCase/Model/Table/QueueProcessesTableTest.php @@ -4,6 +4,7 @@ namespace Queue\Test\TestCase\Model\Table; use Cake\Core\Configure; +use Cake\Database\Exception\QueryException; use Cake\I18n\DateTime; use Cake\ORM\Exception\PersistenceFailedException; use Cake\ORM\TableRegistry; @@ -260,4 +261,71 @@ public function testCleanEndedProcessesForceRemovesAllRows() { $this->assertSame(0, $this->QueueProcesses->find()->count()); } + /** + * Container restart scenario: a worker died without cleaning up its + * queue_processes row. A new worker starts with the same recycled PID. + * The stale row's heartbeat is older than `staleHeartbeatThreshold`, so + * `add()` evicts it before inserting — no duplicate-key crash. + * + * @return void + */ + public function testAddEvictsStaleRowOnSamePidServer() { + Configure::write('Queue.maxworkers', 5); + $this->QueueProcesses->deleteAll(['1=1']); + + $staleId = $this->QueueProcesses->add('8', 'old-workerkey'); + $this->QueueProcesses->updateAll( + ['modified' => (new DateTime())->subSeconds(180)->toDateTimeString()], + ['id' => $staleId], + ); + + $newId = $this->QueueProcesses->add('8', 'new-workerkey'); + $this->assertNotEmpty($newId); + $this->assertNotSame($staleId, $newId); + + $survivors = $this->QueueProcesses->find()->where(['pid' => '8'])->all()->toArray(); + $this->assertCount(1, $survivors); + $this->assertSame('new-workerkey', $survivors[0]->workerkey); + } + + /** + * If the existing row is fresh (recent heartbeat), it represents a real, + * live worker — eviction would be wrong. The duplicate-key error is the + * correct outcome. `Queue.maxworkers` is raised so `validateCount` cannot + * mask the real source of failure: the (pid, server) unique index fires + * at the DB level as a `QueryException`, not validation. + * + * @return void + */ + public function testAddDoesNotEvictRecentRowOnSamePidServer() { + Configure::write('Queue.maxworkers', 5); + $this->QueueProcesses->deleteAll(['1=1']); + $this->QueueProcesses->add('8', 'first-workerkey'); + + $this->expectException(QueryException::class); + $this->QueueProcesses->add('8', 'second-workerkey'); + } + + /** + * Operators can tune the threshold for unusual heartbeat intervals. + * + * @return void + */ + public function testAddRespectsCustomStaleHeartbeatThreshold() { + Configure::write('Queue.maxworkers', 5); + Configure::write('Queue.staleHeartbeatThreshold', 30); + $this->QueueProcesses->deleteAll(['1=1']); + + $staleId = $this->QueueProcesses->add('8', 'old-workerkey'); + $this->QueueProcesses->updateAll( + ['modified' => (new DateTime())->subSeconds(60)->toDateTimeString()], + ['id' => $staleId], + ); + + $newId = $this->QueueProcesses->add('8', 'new-workerkey'); + $this->assertNotEmpty($newId); + + Configure::delete('Queue.staleHeartbeatThreshold'); + } + }