From 9b028e30ae985a2e010cd6b4b527fae644c25514 Mon Sep 17 00:00:00 2001 From: mscherer Date: Fri, 8 May 2026 16:52:46 +0200 Subject: [PATCH] Auto-evict stale queue_processes rows on worker startup Adds Queue.staleHeartbeatThreshold config (default 90s). QueueProcessesTable::add() now evicts a stale (pid, server) row whose `modified` timestamp is older than the threshold *before* attempting the insert. Fixes the crash loop seen on container restarts (FrankenPHP / Docker), where killed workers leave orphan rows and the new worker - getting the same recycled PID - fails the unique-index insert with a QueryException that the existing PersistenceFailedException catch in Processor::run() doesn't handle. The threshold is intentionally separate from defaultRequeueTimeout (default 10min): that one governs in-flight job requeueing and is deliberately long; workers heartbeat far more often, so a row not refreshed in 90s almost certainly belongs to a dead worker. --- src/Model/Table/QueueProcessesTable.php | 17 ++++- src/Queue/Config.php | 16 +++++ .../Model/Table/QueueProcessesTableTest.php | 68 +++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/src/Model/Table/QueueProcessesTable.php b/src/Model/Table/QueueProcessesTable.php index 3a0b19ce..68cf1e39 100644 --- a/src/Model/Table/QueueProcessesTable.php +++ b/src/Model/Table/QueueProcessesTable.php @@ -148,9 +148,24 @@ public function findActive(): SelectQuery { * @return int */ public function add(string $pid, string $key): int { + $server = $this->buildServerString(); + + // Evict any stale row holding our (pid, server) slot. Common after + // container restarts: the killed worker's row survives, and the + // unique index on (pid, server) would otherwise reject the insert. + // Only rows whose heartbeat is older than `staleHeartbeatThreshold` + // are removed, so a live worker on the same PID is never disturbed. 
+ $staleThreshold = (new DateTime())->subSeconds(Config::staleHeartbeatThreshold()); + $this->deleteAll([ + 'pid' => $pid, + 'server IS' => $server, + 'workerkey !=' => $key, + 'modified <' => $staleThreshold, + ]); + $data = [ 'pid' => $pid, - 'server' => $this->buildServerString(), + 'server' => $server, 'workerkey' => $key, ]; diff --git a/src/Queue/Config.php b/src/Queue/Config.php index 3bc3d32c..baa4a2c5 100644 --- a/src/Queue/Config.php +++ b/src/Queue/Config.php @@ -74,6 +74,22 @@ public static function sleeptime(): int { return Configure::read('Queue.sleeptime', 10); } + /** + * Threshold in seconds after which a queue_processes row whose `modified` + * timestamp is older is considered stale by a starting worker. Workers + * heartbeat (refresh `modified`) on every loop iteration, so a row not + * refreshed in ~90s almost certainly belongs to a dead worker — typically + * a container that was force-restarted. This is intentionally much shorter + * than `defaultRequeueTimeout` (which governs in-flight job requeueing). + * + * @return int + */ + public static function staleHeartbeatThreshold(): int { + $threshold = Configure::read('Queue.staleHeartbeatThreshold'); + + return $threshold ?? 90; + } + /** * @return int */ diff --git a/tests/TestCase/Model/Table/QueueProcessesTableTest.php b/tests/TestCase/Model/Table/QueueProcessesTableTest.php index 75818581..fe2bd89b 100644 --- a/tests/TestCase/Model/Table/QueueProcessesTableTest.php +++ b/tests/TestCase/Model/Table/QueueProcessesTableTest.php @@ -4,6 +4,7 @@ namespace Queue\Test\TestCase\Model\Table; use Cake\Core\Configure; +use Cake\Database\Exception\QueryException; use Cake\I18n\DateTime; use Cake\ORM\Exception\PersistenceFailedException; use Cake\ORM\TableRegistry; @@ -238,4 +239,71 @@ public function testStaleRowsCountTowardMaxWorkersUntilCleaned() { $this->assertNotEmpty($id); } + /** + * Container restart scenario: a worker died without cleaning up its + * queue_processes row. 
A new worker starts with the same recycled PID. + * The stale row's heartbeat is older than `staleHeartbeatThreshold`, so + * `add()` evicts it before inserting — no duplicate-key crash. + * + * @return void + */ + public function testAddEvictsStaleRowOnSamePidServer() { + Configure::write('Queue.maxworkers', 5); + $this->QueueProcesses->deleteAll(['1=1']); + + $staleId = $this->QueueProcesses->add('8', 'old-workerkey'); + $this->QueueProcesses->updateAll( + ['modified' => (new DateTime())->subSeconds(180)->toDateTimeString()], + ['id' => $staleId], + ); + + $newId = $this->QueueProcesses->add('8', 'new-workerkey'); + $this->assertNotEmpty($newId); + $this->assertNotSame($staleId, $newId); + + $survivors = $this->QueueProcesses->find()->where(['pid' => '8'])->all()->toArray(); + $this->assertCount(1, $survivors); + $this->assertSame('new-workerkey', $survivors[0]->workerkey); + } + + /** + * If the existing row is fresh (recent heartbeat), it represents a real, + * live worker — eviction would be wrong. The duplicate-key error is the + * correct outcome. `Queue.maxworkers` is raised so `validateCount` cannot + * mask the real source of failure: the (pid, server) unique index fires + * at the DB level as a `QueryException`, not validation. + * + * @return void + */ + public function testAddDoesNotEvictRecentRowOnSamePidServer() { + Configure::write('Queue.maxworkers', 5); + $this->QueueProcesses->deleteAll(['1=1']); + $this->QueueProcesses->add('8', 'first-workerkey'); + + $this->expectException(QueryException::class); + $this->QueueProcesses->add('8', 'second-workerkey'); + } + + /** + * Operators can tune the threshold for unusual heartbeat intervals. 
+ * + * @return void + */ + public function testAddRespectsCustomStaleHeartbeatThreshold() { + Configure::write('Queue.maxworkers', 5); + Configure::write('Queue.staleHeartbeatThreshold', 30); + $this->QueueProcesses->deleteAll(['1=1']); + + $staleId = $this->QueueProcesses->add('8', 'old-workerkey'); + $this->QueueProcesses->updateAll( + ['modified' => (new DateTime())->subSeconds(60)->toDateTimeString()], + ['id' => $staleId], + ); + + $newId = $this->QueueProcesses->add('8', 'new-workerkey'); + $this->assertNotEmpty($newId); + + Configure::delete('Queue.staleHeartbeatThreshold'); + } + }