Skip to content

Commit 8fbadcc

Browse files
committed
Improve lock performance when a lot of locks are waiting
1 parent 2927008 commit 8fbadcc

File tree

4 files changed

+74
-7
lines changed

4 files changed

+74
-7
lines changed

changelog.d/16840.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve lock performance when a lot of locks are all waiting for a single lock to be released.

synapse/handlers/worker_lock.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,15 @@ def _on_lock_released(
181181
if not locks:
182182
return
183183

184-
def _wake_deferred(deferred: defer.Deferred) -> None:
185-
if not deferred.called:
186-
deferred.callback(None)
187-
188-
for lock in locks:
189-
self._clock.call_later(0, _wake_deferred, lock.deferred)
184+
def _wake_all_locks(
185+
locks: Collection[Union[WaitingLock, WaitingMultiLock]]
186+
) -> None:
187+
for lock in locks:
188+
deferred = lock.deferred
189+
if not deferred.called:
190+
deferred.callback(None)
191+
192+
self._clock.call_later(0, _wake_all_locks, locks)
190193

191194
@wrap_as_background_process("_cleanup_locks")
192195
async def _cleanup_locks(self) -> None:

tests/handlers/test_worker_lock.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from tests import unittest
2828
from tests.replication._base import BaseMultiWorkerStreamTestCase
29+
from tests.utils import test_timeout
2930

3031

3132
class WorkerLockTestCase(unittest.HomeserverTestCase):
@@ -49,6 +50,28 @@ def test_wait_for_lock_locally(self) -> None:
4950
self.get_success(d2)
5051
self.get_success(lock2.__aexit__(None, None, None))
5152

53+
def test_lock_contention(self) -> None:
54+
"""Test lock contention when a lot of locks wait on a single worker"""
55+
56+
# It takes around 0.5s on a 5+ years old laptop
57+
with test_timeout(5):
58+
nb_locks = 500
59+
d = self._take_locks(nb_locks)
60+
self.assertEqual(self.get_success(d), nb_locks)
61+
62+
async def _take_locks(self, nb_locks: int) -> int:
63+
locks = [
64+
self.hs.get_worker_locks_handler().acquire_lock("test_lock", "")
65+
for _ in range(nb_locks)
66+
]
67+
68+
nb_locks_taken = 0
69+
for lock in locks:
70+
async with lock:
71+
nb_locks_taken += 1
72+
73+
return nb_locks_taken
74+
5275

5376
class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
5477
def prepare(

tests/utils.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,20 @@
2020

2121
import atexit
2222
import os
23-
from typing import Any, Callable, Dict, List, Tuple, Type, TypeVar, Union, overload
23+
import signal
24+
from types import FrameType, TracebackType
25+
from typing import (
26+
Any,
27+
Callable,
28+
Dict,
29+
List,
30+
Optional,
31+
Tuple,
32+
Type,
33+
TypeVar,
34+
Union,
35+
overload,
36+
)
2437

2538
import attr
2639
from typing_extensions import Literal, ParamSpec
@@ -380,3 +393,30 @@ def checked_cast(type: Type[T], x: object) -> T:
380393
"""
381394
assert isinstance(x, type)
382395
return x
396+
397+
398+
class TestTimeout(Exception):
399+
pass
400+
401+
402+
class test_timeout:
403+
def __init__(self, seconds: int, error_message: Optional[str] = None) -> None:
404+
if error_message is None:
405+
error_message = "test timed out after {}s.".format(seconds)
406+
self.seconds = seconds
407+
self.error_message = error_message
408+
409+
def handle_timeout(self, signum: int, frame: Optional[FrameType]) -> None:
410+
raise TestTimeout(self.error_message)
411+
412+
def __enter__(self) -> None:
413+
signal.signal(signal.SIGALRM, self.handle_timeout)
414+
signal.alarm(self.seconds)
415+
416+
def __exit__(
417+
self,
418+
exc_type: Optional[Type[BaseException]],
419+
exc_val: Optional[BaseException],
420+
exc_tb: Optional[TracebackType],
421+
) -> None:
422+
signal.alarm(0)

0 commit comments

Comments
 (0)