Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public function getOptionParser(): ConsoleOptionParser {
'help' => 'PID (Process/Worker ID)',
'required' => false,
]);
$parser->addOption('force', [
'short' => 'f',
'help' => 'For `clean`: remove ALL queue_processes rows regardless of last heartbeat. '
. 'Use after container restarts when PID reuse blocks new workers from registering.',
'boolean' => true,
]);

$parser->setDescription(
'Display, end or kill running workers.',
Expand All @@ -80,7 +86,7 @@ public function execute(Arguments $args, ConsoleIo $io) {
$io->out('Actions are:');
$io->out('- end: Gracefully end a worker/process, use "all"/"server" for all');
$io->out('- kill: Kill a worker/process, use "all"/"server" for all');
$io->out('- clean: ');
$io->out('- clean: Remove stale processes (use --force to wipe all)');
$io->out();

/** @var array<\Queue\Model\Entity\QueueProcess> $processes */
Expand Down Expand Up @@ -112,6 +118,10 @@ public function execute(Arguments $args, ConsoleIo $io) {
$io->abort('Clean action does not have a 2nd argument.');
}

if ($action === 'clean') {
return $this->clean($io, (bool)$args->getOption('force'));
}

/** @phpstan-ignore-next-line */
return $this->$action($io, $pid);
}
Expand Down Expand Up @@ -191,10 +201,21 @@ protected function kill(ConsoleIo $io, string $pid): int {

/**
* @param \Cake\Console\ConsoleIo $io
* @param bool $force If true, ignores the heartbeat threshold and removes
* every queue_processes row. Recovery path for container restarts where
* recycled PIDs would otherwise collide with surviving rows.
*
* @return int
*/
protected function clean(ConsoleIo $io): int {
protected function clean(ConsoleIo $io, bool $force = false): int {
if ($force) {
$io->out('Force-deleting ALL queue_processes rows.');
$result = $this->QueueProcesses->cleanEndedProcesses(true);
$io->success('Deleted: ' . $result);

return static::CODE_SUCCESS;
}

$timeout = Config::defaultworkertimeout();
if (!$timeout) {
$io->abort('You disabled `defaultRequeueTimeout` in config. Aborting.');
Expand Down
10 changes: 9 additions & 1 deletion src/Model/Table/QueueProcessesTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,17 @@ public function remove(string $pid): void {
}

/**
* @param bool $force If true, removes ALL rows regardless of heartbeat.
* Use as a recovery tool after container restarts where PIDs may be
* reused and stale rows would block new workers from registering.
*
* @return int
*/
public function cleanEndedProcesses(): int {
public function cleanEndedProcesses(bool $force = false): int {
if ($force) {
return $this->deleteAll(['1=1']);
}

$timeout = Config::defaultworkertimeout();
$thresholdTime = (new DateTime())->subSeconds($timeout);

Expand Down
22 changes: 22 additions & 0 deletions tests/TestCase/Model/Table/QueueProcessesTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,26 @@ public function testStaleRowsCountTowardMaxWorkersUntilCleaned() {
$this->assertNotEmpty($id);
}

/**
* `--force` ignores the heartbeat threshold and wipes every row. Recovery
* path for container restarts where the killed worker's row survives with
* a recent `modified` timestamp, so the normal heartbeat-based cleanup
* considers it "still alive."
*
* @return void
*/
public function testCleanEndedProcessesForceRemovesAllRows() {
Configure::write('Queue.maxworkers', 5);

$this->QueueProcesses->deleteAll(['1=1']);
$this->QueueProcesses->add('111', 'key-111');
$this->QueueProcesses->add('222', 'key-222');
$this->assertSame(2, $this->QueueProcesses->find()->count());

$deleted = $this->QueueProcesses->cleanEndedProcesses(true);

$this->assertSame(2, $deleted);
$this->assertSame(0, $this->QueueProcesses->find()->count());
}

}
Loading