Skip to content

Commit 8695150

Browse files
d-phnicolas-grekas
authored andcommitted
[Messenger] Move PostgreSQL LISTEN/NOTIFY blocking to worker idle event listener
1 parent b4ef904 commit 8695150

File tree

7 files changed

+420
-11
lines changed

7 files changed

+420
-11
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
8.1
5+
---
6+
7+
* Add `PostgreSqlNotifyOnIdleListener` to properly support LISTEN/NOTIFY with multi-queue workers
8+
49
7.3
510
---
611

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\EventListener;
13+
14+
use Psr\Log\LoggerInterface;
15+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
16+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
17+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
18+
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
19+
20+
/**
21+
* When the worker is idle, blocks on PostgreSQL LISTEN/NOTIFY instead of
22+
* polling. This allows instant wake-up when a new message arrives while
23+
* properly supporting workers that consume from multiple queues.
24+
*
25+
* @author d-ph <dph03292@gmail.com>
26+
*/
27+
class PostgreSqlNotifyOnIdleListener implements EventSubscriberInterface
28+
{
29+
/** @var array<string, PostgreSqlConnection> */
30+
private array $connections = [];
31+
private ?PostgreSqlConnection $activeConnection = null;
32+
33+
public function __construct(
34+
private ?LoggerInterface $logger = null,
35+
) {
36+
}
37+
38+
/**
39+
* Registers a PostgreSQL connection candidate for LISTEN/NOTIFY.
40+
*
41+
* Called by {@see \Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory}
42+
* during transport creation.
43+
*/
44+
public function addConnection(string $transportName, PostgreSqlConnection $connection): void
45+
{
46+
$this->connections[$transportName] = $connection;
47+
}
48+
49+
public function onWorkerStarted(WorkerStartedEvent $event): void
50+
{
51+
$this->activeConnection = null;
52+
53+
foreach ($event->getWorker()->getMetadata()->getTransportNames() as $transportName) {
54+
if ($connection = $this->connections[$transportName] ?? null) {
55+
$connection->listen();
56+
$this->activeConnection ??= $connection;
57+
}
58+
}
59+
}
60+
61+
public function onWorkerRunning(WorkerRunningEvent $event): void
62+
{
63+
if (!$event->isWorkerIdle() || !$this->activeConnection) {
64+
return;
65+
}
66+
67+
$config = $this->activeConnection->getConfiguration();
68+
69+
if (0 >= $timeout = $config['get_notify_timeout'] ?: $config['check_delayed_interval']) {
70+
return;
71+
}
72+
73+
$this->logger?->debug('Worker waiting for PostgreSQL LISTEN/NOTIFY wake-up.');
74+
75+
$this->activeConnection->waitForNotify($timeout);
76+
}
77+
78+
public static function getSubscribedEvents(): array
79+
{
80+
return [
81+
WorkerStartedEvent::class => 'onWorkerStarted',
82+
WorkerRunningEvent::class => 'onWorkerRunning',
83+
];
84+
}
85+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\EventListener;
13+
14+
use Doctrine\DBAL\Connection;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Bridge\Doctrine\EventListener\PostgreSqlNotifyOnIdleListener;
17+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
18+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
19+
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
20+
use Symfony\Component\Messenger\Worker;
21+
use Symfony\Component\Messenger\WorkerMetadata;
22+
23+
class PostgreSqlNotifyOnIdleListenerTest extends TestCase
24+
{
25+
private function createPostgreSqlConnection(array $additionalConfig = []): PostgreSqlConnection
26+
{
27+
$driverConnection = $this->createStub(Connection::class);
28+
$driverConnection->method('executeStatement')->willReturn(1);
29+
30+
return new PostgreSqlConnection($additionalConfig + ['table_name' => 'queue_table'], $driverConnection);
31+
}
32+
33+
private function createWorkerWithTransports(array $transportNames): Worker
34+
{
35+
$worker = $this->createStub(Worker::class);
36+
$worker->method('getMetadata')->willReturn(new WorkerMetadata(['transportNames' => $transportNames]));
37+
38+
return $worker;
39+
}
40+
41+
public function testListenIsCalledOnWorkerStarted()
42+
{
43+
$connection = $this->createPostgreSqlConnection();
44+
45+
$listener = new PostgreSqlNotifyOnIdleListener();
46+
$listener->addConnection('async', $connection);
47+
48+
$this->assertFalse($connection->isListening());
49+
50+
$listener->onWorkerStarted(new WorkerStartedEvent($this->createWorkerWithTransports(['async'])));
51+
52+
$this->assertTrue($connection->isListening());
53+
}
54+
55+
public function testListenIsNotCalledForUnknownTransport()
56+
{
57+
$connection = $this->createPostgreSqlConnection();
58+
59+
$listener = new PostgreSqlNotifyOnIdleListener();
60+
$listener->addConnection('async', $connection);
61+
62+
$listener->onWorkerStarted(new WorkerStartedEvent($this->createWorkerWithTransports(['other'])));
63+
64+
$this->assertFalse($connection->isListening());
65+
}
66+
67+
public function testNoWaitWhenWorkerIsNotIdle()
68+
{
69+
$connection = $this->createPostgreSqlConnection();
70+
71+
$listener = new PostgreSqlNotifyOnIdleListener();
72+
$listener->addConnection('async', $connection);
73+
74+
$worker = $this->createWorkerWithTransports(['async']);
75+
$listener->onWorkerStarted(new WorkerStartedEvent($worker));
76+
77+
// isIdle=false, should not try to wait
78+
$listener->onWorkerRunning(new WorkerRunningEvent($worker, false));
79+
80+
$this->assertTrue(true);
81+
}
82+
83+
public function testNoWaitWhenNoPostgreSqlConnection()
84+
{
85+
$listener = new PostgreSqlNotifyOnIdleListener();
86+
$worker = $this->createWorkerWithTransports(['other']);
87+
88+
$listener->onWorkerStarted(new WorkerStartedEvent($worker));
89+
$listener->onWorkerRunning(new WorkerRunningEvent($worker, true));
90+
91+
$this->assertTrue(true);
92+
}
93+
94+
public function testNoWaitWhenTimeoutsAreZero()
95+
{
96+
$connection = $this->createPostgreSqlConnection([
97+
'get_notify_timeout' => 0,
98+
'check_delayed_interval' => 0,
99+
]);
100+
101+
$listener = new PostgreSqlNotifyOnIdleListener();
102+
$listener->addConnection('async', $connection);
103+
104+
$worker = $this->createWorkerWithTransports(['async']);
105+
$listener->onWorkerStarted(new WorkerStartedEvent($worker));
106+
107+
// Both timeouts are 0: should return immediately without blocking
108+
$listener->onWorkerRunning(new WorkerRunningEvent($worker, true));
109+
110+
$this->assertTrue(true);
111+
}
112+
113+
public function testMultipleTransportsListenOnAllConnections()
114+
{
115+
$conn1 = $this->createPostgreSqlConnection();
116+
$conn2 = $this->createPostgreSqlConnection();
117+
118+
$listener = new PostgreSqlNotifyOnIdleListener();
119+
$listener->addConnection('high', $conn1);
120+
$listener->addConnection('low', $conn2);
121+
122+
$listener->onWorkerStarted(new WorkerStartedEvent($this->createWorkerWithTransports(['high', 'low'])));
123+
124+
$this->assertTrue($conn1->isListening());
125+
$this->assertTrue($conn2->isListening());
126+
}
127+
128+
public function testGetSubscribedEvents()
129+
{
130+
$this->assertSame([
131+
WorkerStartedEvent::class => 'onWorkerStarted',
132+
WorkerRunningEvent::class => 'onWorkerRunning',
133+
], PostgreSqlNotifyOnIdleListener::getSubscribedEvents());
134+
}
135+
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
1616
use Doctrine\Persistence\ConnectionRegistry;
1717
use PHPUnit\Framework\TestCase;
18+
use Symfony\Component\Messenger\Bridge\Doctrine\EventListener\PostgreSqlNotifyOnIdleListener;
1819
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
1920
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2021
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
@@ -89,4 +90,61 @@ public function testCreateTransportMustThrowAnExceptionIfManagerIsNotFound()
8990
$factory = new DoctrineTransportFactory($registry);
9091
$factory->createTransport('doctrine://default', [], $this->createStub(SerializerInterface::class));
9192
}
93+
94+
public function testCreateTransportRegistersConnectionWithListener()
95+
{
96+
$driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class);
97+
$platform = $this->createStub(PostgreSQLPlatform::class);
98+
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
99+
$driverConnection->method('executeStatement')->willReturn(1);
100+
101+
$registry = $this->createStub(ConnectionRegistry::class);
102+
$registry->method('getConnection')->willReturn($driverConnection);
103+
104+
$listener = $this->createMock(PostgreSqlNotifyOnIdleListener::class);
105+
$listener->expects($this->once())
106+
->method('addConnection')
107+
->with('my_transport', $this->isInstanceOf(PostgreSqlConnection::class));
108+
109+
$factory = new DoctrineTransportFactory($registry, $listener);
110+
$serializer = $this->createStub(SerializerInterface::class);
111+
112+
$factory->createTransport('doctrine://default', ['transport_name' => 'my_transport'], $serializer);
113+
}
114+
115+
public function testCreateTransportDoesNotRegisterWithoutTransportName()
116+
{
117+
$driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class);
118+
$platform = $this->createStub(PostgreSQLPlatform::class);
119+
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
120+
$driverConnection->method('executeStatement')->willReturn(1);
121+
122+
$registry = $this->createStub(ConnectionRegistry::class);
123+
$registry->method('getConnection')->willReturn($driverConnection);
124+
125+
$listener = $this->createMock(PostgreSqlNotifyOnIdleListener::class);
126+
$listener->expects($this->never())->method('addConnection');
127+
128+
$factory = new DoctrineTransportFactory($registry, $listener);
129+
$serializer = $this->createStub(SerializerInterface::class);
130+
131+
$factory->createTransport('doctrine://default', [], $serializer);
132+
}
133+
134+
public function testCreateTransportWorksWithoutListener()
135+
{
136+
$driverConnection = $this->createStub(\Doctrine\DBAL\Connection::class);
137+
$platform = $this->createStub(PostgreSQLPlatform::class);
138+
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
139+
$driverConnection->method('executeStatement')->willReturn(1);
140+
141+
$registry = $this->createStub(ConnectionRegistry::class);
142+
$registry->method('getConnection')->willReturn($driverConnection);
143+
144+
$factory = new DoctrineTransportFactory($registry);
145+
$serializer = $this->createStub(SerializerInterface::class);
146+
147+
$transport = $factory->createTransport('doctrine://default', ['transport_name' => 'my_transport'], $serializer);
148+
$this->assertInstanceOf(DoctrineTransport::class, $transport);
149+
}
92150
}

0 commit comments

Comments
 (0)