|
28 | 28 |
|
29 | 29 | import attr |
30 | 30 | from constantly import NamedConstant, Names |
| 31 | +from prometheus_client import Gauge |
31 | 32 | from typing_extensions import Literal |
32 | 33 |
|
33 | 34 | from twisted.internet import defer |
|
81 | 82 | EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events |
82 | 83 |
|
83 | 84 |
|
| 85 | +event_fetch_ongoing_gauge = Gauge( |
| 86 | + "synapse_event_fetch_ongoing", |
| 87 | + "The number of event fetchers that are running", |
| 88 | +) |
| 89 | + |
| 90 | + |
84 | 91 | @attr.s(slots=True, auto_attribs=True) |
85 | 92 | class _EventCacheEntry: |
86 | 93 | event: EventBase |
@@ -222,6 +229,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): |
222 | 229 | self._event_fetch_lock = threading.Condition() |
223 | 230 | self._event_fetch_list = [] |
224 | 231 | self._event_fetch_ongoing = 0 |
| 232 | + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) |
225 | 233 |
|
226 | 234 | # We define this sequence here so that it can be referenced from both |
227 | 235 | # the DataStore and PersistEventStore. |
@@ -732,28 +740,31 @@ def _do_fetch(self, conn: Connection) -> None: |
732 | 740 | """Takes a database connection and waits for requests for events from |
733 | 741 | the _event_fetch_list queue. |
734 | 742 | """ |
735 | | - i = 0 |
736 | | - while True: |
737 | | - with self._event_fetch_lock: |
738 | | - event_list = self._event_fetch_list |
739 | | - self._event_fetch_list = [] |
740 | | - |
741 | | - if not event_list: |
742 | | - single_threaded = self.database_engine.single_threaded |
743 | | - if ( |
744 | | - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING |
745 | | - or single_threaded |
746 | | - or i > EVENT_QUEUE_ITERATIONS |
747 | | - ): |
748 | | - self._event_fetch_ongoing -= 1 |
749 | | - return |
750 | | - else: |
751 | | - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) |
752 | | - i += 1 |
753 | | - continue |
754 | | - i = 0 |
755 | | - |
756 | | - self._fetch_event_list(conn, event_list) |
| 743 | + try: |
| 744 | + i = 0 |
| 745 | + while True: |
| 746 | + with self._event_fetch_lock: |
| 747 | + event_list = self._event_fetch_list |
| 748 | + self._event_fetch_list = [] |
| 749 | + |
| 750 | + if not event_list: |
| 751 | + single_threaded = self.database_engine.single_threaded |
| 752 | + if ( |
| 753 | + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING |
| 754 | + or single_threaded |
| 755 | + or i > EVENT_QUEUE_ITERATIONS |
| 756 | + ): |
| 757 | + break |
| 758 | + else: |
| 759 | + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) |
| 760 | + i += 1 |
| 761 | + continue |
| 762 | + i = 0 |
| 763 | + |
| 764 | + self._fetch_event_list(conn, event_list) |
| 765 | + finally: |
| 766 | + self._event_fetch_ongoing -= 1 |
| 767 | + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) |
757 | 768 |
|
758 | 769 | def _fetch_event_list( |
759 | 770 | self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] |
@@ -977,6 +988,7 @@ async def _enqueue_events(self, events: Iterable[str]) -> Dict[str, _EventRow]: |
977 | 988 |
|
978 | 989 | if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: |
979 | 990 | self._event_fetch_ongoing += 1 |
| 991 | + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) |
980 | 992 | should_start = True |
981 | 993 | else: |
982 | 994 | should_start = False |
|
0 commit comments