Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php
declare(strict_types=1);

use Migrations\BaseMigration;

class MigrationQueueProcessesDropPidUniqueIndex extends BaseMigration {

/**
* @return void
*/
public function up(): void {
$this->table('queue_processes')
->removeIndexByName('pid')
->addIndex(['pid', 'server'], ['name' => 'pid_server'])
->save();
}

/**
* @return void
*/
public function down(): void {
$this->table('queue_processes')
->removeIndexByName('pid_server')
->addIndex(['pid', 'server'], ['name' => 'pid', 'unique' => true])
->save();
}

}
41 changes: 22 additions & 19 deletions src/Model/Table/QueueProcessesTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -161,40 +161,35 @@ public function add(string $pid, string $key): int {
}

/**
* @param string $pid
* Heartbeat update for the worker identified by `$workerkey`. Workerkey
* is the per-process random hash assigned at startup; PID alone is not
* a stable worker identity (the OS recycles low PIDs across container
* restarts), so all heartbeat / removal paths look up by workerkey.
*
* @param string $workerkey
*
* @throws \Queue\Model\ProcessEndingException
*
* @return void
*/
public function update(string $pid): void {
$conditions = [
'pid' => $pid,
'server IS' => $this->buildServerString(),
];

public function update(string $workerkey): void {
/** @var \Queue\Model\Entity\QueueProcess $queueProcess */
$queueProcess = $this->find()->where($conditions)->firstOrFail();
$queueProcess = $this->find()->where(['workerkey' => $workerkey])->firstOrFail();
if ($queueProcess->terminate) {
throw new ProcessEndingException('PID terminated: ' . $pid);
throw new ProcessEndingException('Worker terminated: ' . $workerkey);
}

$queueProcess->modified = new DateTime();
$this->saveOrFail($queueProcess);
}

/**
* @param string $pid
* @param string $workerkey
*
* @return void
*/
public function remove(string $pid): void {
$conditions = [
'pid' => $pid,
'server IS' => $this->buildServerString(),
];

$this->deleteAll($conditions);
public function remove(string $workerkey): void {
$this->deleteAll(['workerkey' => $workerkey]);
}

/**
Expand Down Expand Up @@ -276,7 +271,12 @@ public function getProcesses(bool $forThisServer = false): array {
}

/**
* Soft ending of a running job, e.g. when migration is starting
* Soft ending of a running job, e.g. when migration is starting.
*
* If multiple rows share this PID (possible since the unique
* `(pid, server)` index was dropped), the most recently heartbeated
* row is chosen — that is the live worker; older rows are stale
* leftovers that will be cleaned up separately.
*
* @param string $pid
*
Expand All @@ -288,7 +288,10 @@ public function endProcess(string $pid): void {
}

/** @var \Queue\Model\Entity\QueueProcess $queuedProcess */
$queuedProcess = $this->find()->where(['pid' => $pid])->firstOrFail();
$queuedProcess = $this->find()
->where(['pid' => $pid])
->orderByDesc('modified')
->firstOrFail();
$queuedProcess->terminate = true;
$this->saveOrFail($queuedProcess);
}
Expand Down
28 changes: 20 additions & 8 deletions src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ class Processor {
*/
protected ?string $pid = null;

/**
* Random per-process identifier. Stable across the worker's lifetime;
* unlike `pid`, never reused. Used for heartbeat and shutdown lookups.
*
* @var string|null
*/
protected ?string $workerkey = null;

/**
* @var \Queue\Model\Table\QueuedJobsTable
*/
Expand Down Expand Up @@ -507,6 +515,7 @@ protected function initPid(): string {
$this->QueueProcesses->add($pid, $key);

$this->pid = $pid;
$this->workerkey = $key;

return $pid;
}
Expand All @@ -525,12 +534,16 @@ protected function retrievePid(): string {
}

/**
* @param string $pid
* @param string $pid Kept for log context; the actual lookup uses
* the per-process workerkey since PIDs are not unique identities.
*
* @return void
*/
protected function updatePid(string $pid): void {
$this->QueueProcesses->update($pid);
if ($this->workerkey === null) {
return;
}
$this->QueueProcesses->update($this->workerkey);
}

/**
Expand All @@ -548,19 +561,18 @@ protected function memoryUsage(): string {
}

/**
* @param string|null $pid
* @param string|null $pid Kept for backwards-compatible signature; the
* actual lookup uses the per-process workerkey since PIDs are not
* unique identities.
*
* @return void
*/
protected function deletePid(?string $pid): void {
if (!$pid) {
$pid = $this->pid;
}
if (!$pid) {
if ($this->workerkey === null) {
return;
}

$this->QueueProcesses->remove($pid);
$this->QueueProcesses->remove($this->workerkey);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion tests/Fixture/QueueProcessesFixture.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class QueueProcessesFixture extends TestFixture {
'_constraints' => [
'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []],
'workerkey' => ['type' => 'unique', 'columns' => ['workerkey'], 'length' => []],
'pid' => ['type' => 'unique', 'columns' => ['pid', 'server'], 'length' => []],
],
'_indexes' => [
'pid_server' => ['type' => 'index', 'columns' => ['pid', 'server']],
],
'_options' => [
'engine' => 'InnoDB',
Expand Down
33 changes: 27 additions & 6 deletions tests/TestCase/Model/Table/QueueProcessesTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ public function testAddMaxCount() {
* @return void
*/
public function testUpdate() {
$pid = '123';
$id = $this->QueueProcesses->add($pid, '456');
$workerkey = '456';
$id = $this->QueueProcesses->add('123', $workerkey);
$this->assertNotEmpty($id);

$this->QueueProcesses->update($pid);
$this->QueueProcesses->update($workerkey);

$queueProcess = $this->QueueProcesses->get($id);
$this->assertFalse($queueProcess->terminate);
Expand All @@ -111,16 +111,37 @@ public function testUpdate() {
* @return void
*/
public function testRemove() {
$pid = '123';
$queueProcessId = $this->QueueProcesses->add($pid, '456');
$workerkey = '456';
$queueProcessId = $this->QueueProcesses->add('123', $workerkey);
$this->assertNotEmpty($queueProcessId);

$this->QueueProcesses->remove($pid);
$this->QueueProcesses->remove($workerkey);

$result = $this->QueueProcesses->find()->where(['id' => $queueProcessId])->first();
$this->assertNull($result);
}

/**
* After dropping the unique `(pid, server)` index, two rows can coexist
* with the same PID on the same server. Mostly happens transiently after
* a container restart with PID reuse: one stale row + the new live one.
* `workerkey` remains the canonical identity (still uniquely indexed).
*
* @return void
*/
public function testAddAllowsDuplicatePidServer() {
Configure::write('Queue.maxworkers', 5);
$this->QueueProcesses->deleteAll(['1=1']);

$firstId = $this->QueueProcesses->add('8', 'first-workerkey');
$secondId = $this->QueueProcesses->add('8', 'second-workerkey');

$this->assertNotEmpty($firstId);
$this->assertNotEmpty($secondId);
$this->assertNotSame($firstId, $secondId);
$this->assertSame(2, $this->QueueProcesses->find()->where(['pid' => '8'])->count());
}

/**
* @return void
*/
Expand Down
Loading