From 7022380f4cdc63f159b12a0ad1dc07fce6e1c4c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 09:08:50 +0100 Subject: [PATCH 1/7] Make `get_cache_dict` handle gappy streams better. --- synapse/storage/database.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0264dea61dbf..3aa55259bed9 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2030,24 +2030,29 @@ def get_cache_dict( max_value: int, limit: int = 100000, ) -> Tuple[Dict[Any, int], int]: - # Fetch a mapping of room_id -> max stream position for "recent" rooms. - # It doesn't really matter how many we get, the StreamChangeCache will - # do the right thing to ensure it respects the max size of cache. - sql = ( - "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" - " WHERE %(stream)s > ? - %(limit)s" - " GROUP BY %(entity)s" - ) % { - "table": table, - "entity": entity_column, - "stream": stream_column, - "limit": limit, - } + """Gets roughly the last N changes in the given stream table as a + map from entity to the stream ID of the most recent change. + + Also returns the minimum stream ID. + """ + + # This may return many rows for the same entity, but the `limit` is only + # a suggestion so we don't care that much. + sql = f""" + SELECT {entity_column}, {stream_column} + FROM {table} + ORDER BY {stream_column} DESC + LIMIT ? + """ txn = db_conn.cursor(txn_name="get_cache_dict") - txn.execute(sql, (int(max_value),)) + txn.execute(sql, (limit,)) - cache = {row[0]: int(row[1]) for row in txn} + # The rows come out in reverse stream ID order, so we want to keep the + # stream ID of the first row for each entity. 
+ cache = {} + for row in txn: + cache[0].setdefault(row[0], int(row[1])) txn.close() From 8cf8cf9aa13ad687a84a164fdf25607ff4c01059 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 09:12:42 +0100 Subject: [PATCH 2/7] Increase device list stream prefill cache limit --- synapse/replication/slave/storage/devices.py | 2 +- synapse/storage/databases/main/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 00a634d3a9c6..057309a84d34 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -54,7 +54,7 @@ def __init__( entity_column="user_id", stream_column="stream_id", max_value=device_list_max, - limit=1000, + limit=10000, ) self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index d4a38daa9a96..0f8e0f43b644 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -190,7 +190,7 @@ def __init__( entity_column="user_id", stream_column="stream_id", max_value=device_list_max, - limit=1000, + limit=10000, ) self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", From 273c5f3686f93e75db4d2b1ff102ca94ed4a8595 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 09:20:52 +0100 Subject: [PATCH 3/7] Define device stream change caches in one place --- synapse/replication/slave/storage/devices.py | 24 ++------------------ synapse/storage/databases/main/__init__.py | 21 ----------------- synapse/storage/databases/main/devices.py | 22 ++++++++++++++++++ 3 files changed, 24 insertions(+), 43 deletions(-) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 057309a84d34..c0eb1e1f46a5 100644 --- 
a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -33,8 +33,6 @@ def __init__( db_conn: LoggingDatabaseConnection, hs: "HomeServer", ): - super().__init__(database, db_conn, hs) - self.hs = hs self._device_list_id_gen = SlavedIdTracker( @@ -47,26 +45,8 @@ def __init__( ("device_lists_changes_in_room", "stream_id"), ], ) - device_list_max = self._device_list_id_gen.get_current_token() - device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( - db_conn, - "device_lists_stream", - entity_column="user_id", - stream_column="stream_id", - max_value=device_list_max, - limit=10000, - ) - self._device_list_stream_cache = StreamChangeCache( - "DeviceListStreamChangeCache", - min_device_list_id, - prefilled_cache=device_list_prefill, - ) - self._user_signature_stream_cache = StreamChangeCache( - "UserSignatureStreamChangeCache", device_list_max - ) - self._device_list_federation_stream_cache = StreamChangeCache( - "DeviceListFederationStreamChangeCache", device_list_max - ) + + super().__init__(database, db_conn, hs) def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 0f8e0f43b644..951031af50b0 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -183,27 +183,6 @@ def __init__( super().__init__(database, db_conn, hs) - device_list_max = self._device_list_id_gen.get_current_token() - device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( - db_conn, - "device_lists_stream", - entity_column="user_id", - stream_column="stream_id", - max_value=device_list_max, - limit=10000, - ) - self._device_list_stream_cache = StreamChangeCache( - "DeviceListStreamChangeCache", - min_device_list_id, - prefilled_cache=device_list_prefill, - ) - self._user_signature_stream_cache = StreamChangeCache( - 
"UserSignatureStreamChangeCache", device_list_max - ) - self._device_list_federation_stream_cache = StreamChangeCache( - "DeviceListFederationStreamChangeCache", device_list_max - ) - events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( db_conn, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 07eea4b3d217..aaf7444b98ac 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -46,6 +46,7 @@ from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -71,6 +72,27 @@ def __init__( ): super().__init__(database, db_conn, hs) + device_list_max = self._device_list_id_gen.get_current_token() + device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( + db_conn, + "device_lists_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=device_list_max, + limit=10000, + ) + self._device_list_stream_cache = StreamChangeCache( + "DeviceListStreamChangeCache", + min_device_list_id, + prefilled_cache=device_list_prefill, + ) + self._user_signature_stream_cache = StreamChangeCache( + "UserSignatureStreamChangeCache", device_list_max + ) + self._device_list_federation_stream_cache = StreamChangeCache( + "DeviceListFederationStreamChangeCache", device_list_max + ) + if hs.config.worker.run_background_tasks: self._clock.looping_call( self._prune_old_outbound_device_pokes, 60 * 60 * 1000 From d2fb0e7d506c64e4d786516dc2224d371ddf41b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 09:28:59 +0100 Subject: [PATCH 4/7] Prefill more stream change caches --- 
synapse/storage/databases/main/devices.py | 32 ++++++++++++++++++++-- synapse/storage/databases/main/receipts.py | 13 ++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index aaf7444b98ac..dc8009b23df5 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -86,11 +86,39 @@ def __init__( min_device_list_id, prefilled_cache=device_list_prefill, ) + + ( + user_signature_stream_prefill, + user_signature_stream_list_id, + ) = self.db_pool.get_cache_dict( + db_conn, + "user_signature_stream", + entity_column="from_user_id", + stream_column="stream_id", + max_value=device_list_max, + limit=1000, + ) self._user_signature_stream_cache = StreamChangeCache( - "UserSignatureStreamChangeCache", device_list_max + "UserSignatureStreamChangeCache", + user_signature_stream_list_id, + prefilled_cache=user_signature_stream_prefill, + ) + + ( + device_list_federation_prefill, + device_list_federation_list_id, + ) = self.db_pool.get_cache_dict( + db_conn, + "device_lists_outbound_pokes", + entity_column="destination", + stream_column="stream_id", + max_value=device_list_max, + limit=10000, ) self._device_list_federation_stream_cache = StreamChangeCache( - "DeviceListFederationStreamChangeCache", device_list_max + "DeviceListFederationStreamChangeCache", + device_list_federation_list_id, + prefilled_cache=device_list_federation_prefill, ) if hs.config.worker.run_background_tasks: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e6f97aeece08..332e901dda40 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -98,8 +98,19 @@ def __init__( super().__init__(database, db_conn, hs) + max_receipts_stream_id = self.get_max_receipt_stream_id() + receipts_stream_prefill, min_receipts_stream_id = self.db_pool.get_cache_dict( + 
db_conn, + "receipts_linearized", + entity_column="room_id", + stream_column="stream_id", + max_value=max_receipts_stream_id, + limit=10000, + ) self._receipts_stream_cache = StreamChangeCache( - "ReceiptsRoomChangeCache", self.get_max_receipt_stream_id() + "ReceiptsRoomChangeCache", + min_receipts_stream_id, + prefilled_cache=receipts_stream_prefill, ) def get_max_receipt_stream_id(self) -> int: From 61f12af73ddc50cbaa241651740683c83b9abf6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 09:49:31 +0100 Subject: [PATCH 5/7] Newsfile --- changelog.d/12372.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12372.feature diff --git a/changelog.d/12372.feature b/changelog.d/12372.feature new file mode 100644 index 000000000000..34bb60e96601 --- /dev/null +++ b/changelog.d/12372.feature @@ -0,0 +1 @@ +Reduce overhead of restarting synchrotrons. From b6c6eb04e7686b1d88d942a8df21443c42d795ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 10:09:15 +0100 Subject: [PATCH 6/7] Fixup --- synapse/replication/slave/storage/devices.py | 1 - synapse/storage/database.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index c0eb1e1f46a5..30717c2bd099 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -20,7 +20,6 @@ from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.devices import DeviceWorkerStore from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore -from synapse.util.caches.stream_change_cache import StreamChangeCache if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 3aa55259bed9..b0daadcc511d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ 
-2050,9 +2050,9 @@ def get_cache_dict( # The rows come out in reverse stream ID order, so we want to keep the # stream ID of the first row for each entity. - cache = {} + cache: Dict[Any, int] = {} for row in txn: - cache[0].setdefault(row[0], int(row[1])) + cache.setdefault(row[0], int(row[1])) txn.close() From a602604af0f1b4188aeb910390ea67214b19d720 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2022 13:34:36 +0100 Subject: [PATCH 7/7] Correctly handle tables with duplicate stream IDs --- synapse/storage/database.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index b0daadcc511d..12750d9b89d4 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2038,6 +2038,10 @@ def get_cache_dict( # This may return many rows for the same entity, but the `limit` is only # a suggestion so we don't care that much. + # + # Note: Some stream tables can have multiple rows with the same stream + # ID. Instead of handling this with complicated SQL, we simply + # add one to the returned minimum stream ID to ensure correctness. sql = f""" SELECT {entity_column}, {stream_column} FROM {table} @@ -2057,7 +2061,9 @@ def get_cache_dict( txn.close() if cache: - min_val = min(cache.values()) + # We add one here as we don't know if we have all rows for the + # minimum stream ID. + min_val = min(cache.values()) + 1 else: min_val = max_value