diff --git a/config/Migrations/20260508000000_MigrationQueueProcessesDropPidUniqueIndex.php b/config/Migrations/20260508000000_MigrationQueueProcessesDropPidUniqueIndex.php new file mode 100644 index 00000000..310bbb44 --- /dev/null +++ b/config/Migrations/20260508000000_MigrationQueueProcessesDropPidUniqueIndex.php @@ -0,0 +1,28 @@ +table('queue_processes') + ->removeIndexByName('pid') + ->addIndex(['pid', 'server'], ['name' => 'pid_server']) + ->save(); + } + + /** + * @return void + */ + public function down(): void { + $this->table('queue_processes') + ->removeIndexByName('pid_server') + ->addIndex(['pid', 'server'], ['name' => 'pid', 'unique' => true]) + ->save(); + } + +} diff --git a/src/Model/Table/QueueProcessesTable.php b/src/Model/Table/QueueProcessesTable.php index e6fbe578..361c9785 100644 --- a/src/Model/Table/QueueProcessesTable.php +++ b/src/Model/Table/QueueProcessesTable.php @@ -161,22 +161,22 @@ public function add(string $pid, string $key): int { } /** - * @param string $pid + * Heartbeat update for the worker identified by `$workerkey`. Workerkey + * is the per-process random hash assigned at startup; PID alone is not + * a stable worker identity (the OS recycles low PIDs across container + * restarts), so all heartbeat / removal paths look up by workerkey. + * + * @param string $workerkey * * @throws \Queue\Model\ProcessEndingException * * @return void */ - public function update(string $pid): void { - $conditions = [ - 'pid' => $pid, - 'server IS' => $this->buildServerString(), - ]; - + public function update(string $workerkey): void { /** @var \Queue\Model\Entity\QueueProcess $queueProcess */ - $queueProcess = $this->find()->where($conditions)->firstOrFail(); + $queueProcess = $this->find()->where(['workerkey' => $workerkey])->firstOrFail(); if ($queueProcess->terminate) { - throw new ProcessEndingException('PID terminated: ' . $pid); + throw new ProcessEndingException('Worker terminated: ' . $workerkey); } $queueProcess->modified = new DateTime(); @@ -184,17 +184,12 @@ public function update(string $pid): void { } /** - * @param string $pid + * @param string $workerkey * * @return void */ - public function remove(string $pid): void { - $conditions = [ - 'pid' => $pid, - 'server IS' => $this->buildServerString(), - ]; - - $this->deleteAll($conditions); + public function remove(string $workerkey): void { + $this->deleteAll(['workerkey' => $workerkey]); } /** @@ -276,7 +271,12 @@ public function getProcesses(bool $forThisServer = false): array { } /** - * Soft ending of a running job, e.g. when migration is starting + * Soft ending of a running job, e.g. when migration is starting. + * + * If multiple rows share this PID (possible since the unique + * `(pid, server)` index was dropped), the most recently heartbeated + * row is chosen — that is the live worker; older rows are stale + * leftovers that will be cleaned up separately. * * @param string $pid * @@ -288,7 +288,10 @@ public function endProcess(string $pid): void { } /** @var \Queue\Model\Entity\QueueProcess $queuedProcess */ - $queuedProcess = $this->find()->where(['pid' => $pid])->firstOrFail(); + $queuedProcess = $this->find() + ->where(['pid' => $pid]) + ->orderByDesc('modified') + ->firstOrFail(); $queuedProcess->terminate = true; $this->saveOrFail($queuedProcess); } diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index ead711e5..321405e4 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -72,6 +72,14 @@ class Processor { */ protected ?string $pid = null; + /** + * Random per-process identifier. Stable across the worker's lifetime; + * unlike `pid`, never reused. Used for heartbeat and shutdown lookups. + * + * @var string|null + */ + protected ?string $workerkey = null; + /** * @var \Queue\Model\Table\QueuedJobsTable */ @@ -507,6 +515,7 @@ protected function initPid(): string { $this->QueueProcesses->add($pid, $key); $this->pid = $pid; + $this->workerkey = $key; return $pid; } @@ -525,12 +534,16 @@ protected function retrievePid(): string { } /** - * @param string $pid + * @param string $pid Kept for log context; the actual lookup uses + * the per-process workerkey since PIDs are not unique identities. * * @return void */ protected function updatePid(string $pid): void { - $this->QueueProcesses->update($pid); + if ($this->workerkey === null) { + return; + } + $this->QueueProcesses->update($this->workerkey); } /** @@ -548,19 +561,18 @@ protected function memoryUsage(): string { } /** - * @param string|null $pid + * @param string|null $pid Kept for backwards-compatible signature; the + * actual lookup uses the per-process workerkey since PIDs are not + * unique identities. * * @return void */ protected function deletePid(?string $pid): void { - if (!$pid) { - $pid = $this->pid; - } - if (!$pid) { + if ($this->workerkey === null) { return; } - $this->QueueProcesses->remove($pid); + $this->QueueProcesses->remove($this->workerkey); } /** diff --git a/tests/Fixture/QueueProcessesFixture.php b/tests/Fixture/QueueProcessesFixture.php index 2b90545d..12589586 100644 --- a/tests/Fixture/QueueProcessesFixture.php +++ b/tests/Fixture/QueueProcessesFixture.php @@ -25,7 +25,9 @@ class QueueProcessesFixture extends TestFixture { '_constraints' => [ 'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []], 'workerkey' => ['type' => 'unique', 'columns' => ['workerkey'], 'length' => []], - 'pid' => ['type' => 'unique', 'columns' => ['pid', 'server'], 'length' => []], + ], + '_indexes' => [ + 'pid_server' => ['type' => 'index', 'columns' => ['pid', 'server']], ], '_options' => [ 'engine' => 'InnoDB', diff --git a/tests/TestCase/Model/Table/QueueProcessesTableTest.php b/tests/TestCase/Model/Table/QueueProcessesTableTest.php index 09cf1d9f..ba367f73 100644 --- a/tests/TestCase/Model/Table/QueueProcessesTableTest.php +++ b/tests/TestCase/Model/Table/QueueProcessesTableTest.php @@ -97,11 +97,11 @@ public function testAddMaxCount() { * @return void */ public function testUpdate() { - $pid = '123'; - $id = $this->QueueProcesses->add($pid, '456'); + $workerkey = '456'; + $id = $this->QueueProcesses->add('123', $workerkey); $this->assertNotEmpty($id); - $this->QueueProcesses->update($pid); + $this->QueueProcesses->update($workerkey); $queueProcess = $this->QueueProcesses->get($id); $this->assertFalse($queueProcess->terminate); @@ -111,16 +111,37 @@ public function testUpdate() { * @return void */ public function testRemove() { - $pid = '123'; - $queueProcessId = $this->QueueProcesses->add($pid, '456'); + $workerkey = '456'; + $queueProcessId = $this->QueueProcesses->add('123', $workerkey); $this->assertNotEmpty($queueProcessId); - $this->QueueProcesses->remove($pid); + $this->QueueProcesses->remove($workerkey); $result = $this->QueueProcesses->find()->where(['id' => $queueProcessId])->first(); $this->assertNull($result); } + /** + * After dropping the unique `(pid, server)` index, two rows can coexist + * with the same PID on the same server. Mostly happens transiently after + * a container restart with PID reuse: one stale row + the new live one. + * `workerkey` remains the canonical identity (still uniquely indexed). + * + * @return void + */ + public function testAddAllowsDuplicatePidServer() { + Configure::write('Queue.maxworkers', 5); + $this->QueueProcesses->deleteAll(['1=1']); + + $firstId = $this->QueueProcesses->add('8', 'first-workerkey'); + $secondId = $this->QueueProcesses->add('8', 'second-workerkey'); + + $this->assertNotEmpty($firstId); + $this->assertNotEmpty($secondId); + $this->assertNotSame($firstId, $secondId); + $this->assertSame(2, $this->QueueProcesses->find()->where(['pid' => '8'])->count()); + } + /** * @return void */