diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 484a0874..f1ee3426 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -59,6 +59,12 @@ public function getOptionParser(): ConsoleOptionParser { 'help' => 'PID (Process/Worker ID)', 'required' => false, ]); + $parser->addOption('force', [ + 'short' => 'f', + 'help' => 'For `clean`: remove ALL queue_processes rows regardless of last heartbeat. ' + . 'Use after container restarts when PID reuse blocks new workers from registering.', + 'boolean' => true, + ]); $parser->setDescription( 'Display, end or kill running workers.', @@ -80,7 +86,7 @@ public function execute(Arguments $args, ConsoleIo $io) { $io->out('Actions are:'); $io->out('- end: Gracefully end a worker/process, use "all"/"server" for all'); $io->out('- kill: Kill a worker/process, use "all"/"server" for all'); - $io->out('- clean: '); + $io->out('- clean: Remove stale processes (use --force to wipe all)'); $io->out(); /** @var array<\Queue\Model\Entity\QueueProcess> $processes */ @@ -112,6 +118,10 @@ public function execute(Arguments $args, ConsoleIo $io) { $io->abort('Clean action does not have a 2nd argument.'); } + if ($action === 'clean') { + return $this->clean($io, (bool)$args->getOption('force')); + } + /** @phpstan-ignore-next-line */ return $this->$action($io, $pid); } @@ -191,10 +201,21 @@ protected function kill(ConsoleIo $io, string $pid): int { /** * @param \Cake\Console\ConsoleIo $io + * @param bool $force If true, ignores the heartbeat threshold and removes + * every queue_processes row. Recovery path for container restarts where + * recycled PIDs would otherwise collide with surviving rows. * * @return int */ - protected function clean(ConsoleIo $io): int { + protected function clean(ConsoleIo $io, bool $force = false): int { + if ($force) { + $io->out('Force-deleting ALL queue_processes rows.'); + $result = $this->QueueProcesses->cleanEndedProcesses(true); + $io->success('Deleted: ' . $result); + + return static::CODE_SUCCESS; + } + $timeout = Config::defaultworkertimeout(); if (!$timeout) { $io->abort('You disabled `defaultRequeueTimeout` in config. Aborting.'); diff --git a/src/Model/Table/QueueProcessesTable.php b/src/Model/Table/QueueProcessesTable.php index 3a0b19ce..e6fbe578 100644 --- a/src/Model/Table/QueueProcessesTable.php +++ b/src/Model/Table/QueueProcessesTable.php @@ -198,9 +198,17 @@ public function remove(string $pid): void { } /** + * @param bool $force If true, removes ALL rows regardless of heartbeat. + * Use as a recovery tool after container restarts where PIDs may be + * reused and stale rows would block new workers from registering. + * * @return int */ - public function cleanEndedProcesses(): int { + public function cleanEndedProcesses(bool $force = false): int { + if ($force) { + return $this->deleteAll(['1=1']); + } + $timeout = Config::defaultworkertimeout(); $thresholdTime = (new DateTime())->subSeconds($timeout); diff --git a/tests/TestCase/Model/Table/QueueProcessesTableTest.php b/tests/TestCase/Model/Table/QueueProcessesTableTest.php index 75818581..09cf1d9f 100644 --- a/tests/TestCase/Model/Table/QueueProcessesTableTest.php +++ b/tests/TestCase/Model/Table/QueueProcessesTableTest.php @@ -238,4 +238,26 @@ public function testStaleRowsCountTowardMaxWorkersUntilCleaned() { $this->assertNotEmpty($id); } + /** + * `--force` ignores the heartbeat threshold and wipes every row. Recovery + * path for container restarts where the killed worker's row survives with + * a recent `modified` timestamp, so the normal heartbeat-based cleanup + * considers it "still alive." + * + * @return void + */ + public function testCleanEndedProcessesForceRemovesAllRows() { + Configure::write('Queue.maxworkers', 5); + + $this->QueueProcesses->deleteAll(['1=1']); + $this->QueueProcesses->add('111', 'key-111'); + $this->QueueProcesses->add('222', 'key-222'); + $this->assertSame(2, $this->QueueProcesses->find()->count()); + + $deleted = $this->QueueProcesses->cleanEndedProcesses(true); + + $this->assertSame(2, $deleted); + $this->assertSame(0, $this->QueueProcesses->find()->count()); + } + }