Skip to content

Commit ef1f891

Browse files
committed
Improve lock performance when a lot of locks are waiting
1 parent 274f289 commit ef1f891

File tree

4 files changed

+74
-7
lines changed

4 files changed

+74
-7
lines changed

changelog.d/16840.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve lock performance when a lot of locks are all waiting for a single lock to be released.

synapse/handlers/worker_lock.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,15 @@ def _on_lock_released(
182182
if not locks:
183183
return
184184

185-
def _wake_deferred(deferred: defer.Deferred) -> None:
186-
if not deferred.called:
187-
deferred.callback(None)
188-
189-
for lock in locks:
190-
self._clock.call_later(0, _wake_deferred, lock.deferred)
185+
def _wake_all_locks(
186+
locks: Collection[Union[WaitingLock, WaitingMultiLock]]
187+
) -> None:
188+
for lock in locks:
189+
deferred = lock.deferred
190+
if not deferred.called:
191+
deferred.callback(None)
192+
193+
self._clock.call_later(0, _wake_all_locks, locks)
191194

192195
@wrap_as_background_process("_cleanup_locks")
193196
async def _cleanup_locks(self) -> None:

tests/handlers/test_worker_lock.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from tests import unittest
2929
from tests.replication._base import BaseMultiWorkerStreamTestCase
30+
from tests.utils import test_timeout
3031

3132

3233
class WorkerLockTestCase(unittest.HomeserverTestCase):
@@ -50,6 +51,28 @@ def test_wait_for_lock_locally(self) -> None:
5051
self.get_success(d2)
5152
self.get_success(lock2.__aexit__(None, None, None))
5253

54+
def test_lock_contention(self) -> None:
55+
"""Test lock contention when a lot of locks wait on a single worker"""
56+
57+
# It takes around 0.5s on a 5+ years old laptop
58+
with test_timeout(5):
59+
nb_locks = 500
60+
d = self._take_locks(nb_locks)
61+
self.assertEqual(self.get_success(d), nb_locks)
62+
63+
async def _take_locks(self, nb_locks: int) -> int:
64+
locks = [
65+
self.hs.get_worker_locks_handler().acquire_lock("test_lock", "")
66+
for _ in range(nb_locks)
67+
]
68+
69+
nb_locks_taken = 0
70+
for lock in locks:
71+
async with lock:
72+
nb_locks_taken += 1
73+
74+
return nb_locks_taken
75+
5376

5477
class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
5578
def prepare(

tests/utils.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,20 @@
2121

2222
import atexit
2323
import os
24-
from typing import Any, Callable, Dict, List, Tuple, Type, TypeVar, Union, overload
24+
import signal
25+
from types import FrameType, TracebackType
26+
from typing import (
27+
Any,
28+
Callable,
29+
Dict,
30+
List,
31+
Optional,
32+
Tuple,
33+
Type,
34+
TypeVar,
35+
Union,
36+
overload,
37+
)
2538

2639
import attr
2740
from typing_extensions import Literal, ParamSpec
@@ -381,3 +394,30 @@ def checked_cast(type: Type[T], x: object) -> T:
381394
"""
382395
assert isinstance(x, type)
383396
return x
397+
398+
399+
class TestTimeout(Exception):
400+
pass
401+
402+
403+
class test_timeout:
404+
def __init__(self, seconds: int, error_message: Optional[str] = None) -> None:
405+
if error_message is None:
406+
error_message = "test timed out after {}s.".format(seconds)
407+
self.seconds = seconds
408+
self.error_message = error_message
409+
410+
def handle_timeout(self, signum: int, frame: Optional[FrameType]) -> None:
411+
raise TestTimeout(self.error_message)
412+
413+
def __enter__(self) -> None:
414+
signal.signal(signal.SIGALRM, self.handle_timeout)
415+
signal.alarm(self.seconds)
416+
417+
def __exit__(
418+
self,
419+
exc_type: Optional[Type[BaseException]],
420+
exc_val: Optional[BaseException],
421+
exc_tb: Optional[TracebackType],
422+
) -> None:
423+
signal.alarm(0)

0 commit comments

Comments
 (0)