From 90ae0a67f502680eb63e576c0aaa7460a13dfdd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jun 2024 17:42:22 +0100 Subject: [PATCH 1/3] Limit amount of replication we send --- synapse/storage/databases/main/devices.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 5eeca6165df..e74e94f765c 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1035,14 +1035,16 @@ def _get_all_device_list_changes_for_remotes( SELECT stream_id, user_id, hosts FROM ( SELECT stream_id, user_id, false AS hosts FROM device_lists_stream UNION ALL - SELECT DISTINCT stream_id, user_id, true AS hosts FROM device_lists_outbound_pokes + SELECT MAX(stream_id), user_id, true AS hosts FROM device_lists_outbound_pokes + WHERE ? < stream_id AND stream_id <= ? + GROUP BY user_id ) AS e WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC LIMIT ? """ - txn.execute(sql, (last_id, current_id, limit)) + txn.execute(sql, (last_id, current_id, last_id, current_id, limit)) updates = [(row[0], row[1:]) for row in txn] limited = False upto_token = current_id From 108d8093abff66422f680901da9474ec376886ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jun 2024 17:44:25 +0100 Subject: [PATCH 2/3] Newsfile --- changelog.d/17358.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17358.misc diff --git a/changelog.d/17358.misc b/changelog.d/17358.misc new file mode 100644 index 00000000000..d3ef0b37775 --- /dev/null +++ b/changelog.d/17358.misc @@ -0,0 +1 @@ +Handle device lists notifications for large accounts more efficiently in worker mode. From 5c06a83dbedae73e72cc814f21538eed49d3d80f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Jun 2024 10:43:26 +0100 Subject: [PATCH 3/3] Use one stream ID for all device list outbound pokes --- synapse/storage/databases/main/devices.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index e74e94f765c..59a035dd62a 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1035,16 +1035,14 @@ def _get_all_device_list_changes_for_remotes( SELECT stream_id, user_id, hosts FROM ( SELECT stream_id, user_id, false AS hosts FROM device_lists_stream UNION ALL - SELECT MAX(stream_id), user_id, true AS hosts FROM device_lists_outbound_pokes - WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id + SELECT DISTINCT stream_id, user_id, true AS hosts FROM device_lists_outbound_pokes ) AS e WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC LIMIT ? """ - txn.execute(sql, (last_id, current_id, last_id, current_id, limit)) + txn.execute(sql, (last_id, current_id, limit)) updates = [(row[0], row[1:]) for row in txn] limited = False upto_token = current_id @@ -2133,7 +2131,7 @@ def _add_device_outbound_poke_to_stream_txn( user_id: str, device_id: str, hosts: Collection[str], - stream_ids: List[int], + stream_id: int, context: Optional[Dict[str, str]], ) -> None: if self._device_list_federation_stream_cache: @@ -2141,11 +2139,10 @@ def _add_device_outbound_poke_to_stream_txn( txn.call_after( self._device_list_federation_stream_cache.entity_has_changed, host, - stream_ids[-1], + stream_id, ) now = self._clock.time_msec() - stream_id_iterator = iter(stream_ids) encoded_context = json_encoder.encode(context) mark_sent = not self.hs.is_mine_id(user_id) @@ -2154,7 +2151,7 @@ def _add_device_outbound_poke_to_stream_txn( ( destination, self._instance_name, - next(stream_id_iterator), + stream_id, user_id, device_id, mark_sent, @@ -2339,22 +2336,22 @@ async def add_device_list_outbound_pokes( return def add_device_list_outbound_pokes_txn( - txn: LoggingTransaction, stream_ids: List[int] + txn: LoggingTransaction, stream_id: int ) -> None: self._add_device_outbound_poke_to_stream_txn( txn, user_id=user_id, device_id=device_id, hosts=hosts, - stream_ids=stream_ids, + stream_id=stream_id, context=context, ) - async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: + async with self._device_list_id_gen.get_next() as stream_id: return await self.db_pool.runInteraction( "add_device_list_outbound_pokes", add_device_list_outbound_pokes_txn, - stream_ids, + stream_id, ) async def add_remote_device_list_to_pending(