Skip to content

Commit 9f514dd

Browse files
authored
Use _invalidate_cache_and_stream_bulk in more places. (#16616)
This takes advantage of the new bulk method in more places to invalidate caches for many keys at once (and then to stream that over replication).
1 parent ab3f1b3 commit 9f514dd

File tree

8 files changed

+72
-47
lines changed

8 files changed

+72
-47
lines changed

changelog.d/16613.feature

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
Improve the performance of claiming encryption keys in multi-worker deployments.
1+
Improve the performance of some operations in multi-worker deployments.

changelog.d/16616.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve the performance of some operations in multi-worker deployments.

synapse/storage/databases/main/account_data.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -747,8 +747,16 @@ def _add_account_data_for_user(
747747
)
748748

749749
# Invalidate the cache for any ignored users which were added or removed.
750-
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
751-
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
750+
self._invalidate_cache_and_stream_bulk(
751+
txn,
752+
self.ignored_by,
753+
[
754+
(ignored_user_id,)
755+
for ignored_user_id in (
756+
previously_ignored_users ^ currently_ignored_users
757+
)
758+
],
759+
)
752760
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
753761

754762
async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ def _remove_account_data_for_user_txn(
824832
)
825833

826834
# Invalidate the cache for ignored users which were removed.
827-
for ignored_user_id in previously_ignored_users:
828-
self._invalidate_cache_and_stream(
829-
txn, self.ignored_by, (ignored_user_id,)
830-
)
835+
self._invalidate_cache_and_stream_bulk(
836+
txn,
837+
self.ignored_by,
838+
[
839+
(ignored_user_id,)
840+
for ignored_user_id in previously_ignored_users
841+
],
842+
)
831843

832844
# Invalidate for this user the cache tracking ignored users.
833845
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))

synapse/storage/databases/main/events_bg_updates.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,14 +1222,13 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
12221222
)
12231223

12241224
# Iterate the parent IDs and invalidate caches.
1225-
for parent_id in {r[1] for r in relations_to_insert}:
1226-
cache_tuple = (parent_id,)
1227-
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
1228-
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
1229-
)
1230-
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
1231-
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
1232-
)
1225+
cache_tuples = {(r[1],) for r in relations_to_insert}
1226+
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
1227+
txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
1228+
)
1229+
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
1230+
txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
1231+
)
12331232

12341233
if results:
12351234
latest_event_id = results[-1][0]

synapse/storage/databases/main/keys.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,16 @@ def store_server_keys_response_txn(txn: LoggingTransaction) -> None:
107107
# invalidate takes a tuple corresponding to the params of
108108
# _get_server_keys_json. _get_server_keys_json only takes one
109109
# param, which is itself the 2-tuple (server_name, key_id).
110-
for key_id in verify_keys:
111-
self._invalidate_cache_and_stream(
112-
txn, self._get_server_keys_json, ((server_name, key_id),)
113-
)
114-
self._invalidate_cache_and_stream(
115-
txn, self.get_server_key_json_for_remote, (server_name, key_id)
116-
)
110+
self._invalidate_cache_and_stream_bulk(
111+
txn,
112+
self._get_server_keys_json,
113+
[((server_name, key_id),) for key_id in verify_keys],
114+
)
115+
self._invalidate_cache_and_stream_bulk(
116+
txn,
117+
self.get_server_key_json_for_remote,
118+
[(server_name, key_id) for key_id in verify_keys],
119+
)
117120

118121
await self.db_pool.runInteraction(
119122
"store_server_keys_response", store_server_keys_response_txn

synapse/storage/databases/main/presence.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -363,10 +363,11 @@ def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None:
363363
# for their user ID.
364364
value_values=[(presence_stream_id,) for _ in user_ids],
365365
)
366-
for user_id in user_ids:
367-
self._invalidate_cache_and_stream(
368-
txn, self._get_full_presence_stream_token_for_user, (user_id,)
369-
)
366+
self._invalidate_cache_and_stream_bulk(
367+
txn,
368+
self._get_full_presence_stream_token_for_user,
369+
[(user_id,) for user_id in user_ids],
370+
)
370371

371372
return await self.db_pool.runInteraction(
372373
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to

synapse/storage/databases/main/purge_events.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -295,19 +295,28 @@ def _purge_history_txn(
295295
# so make sure to keep this actually last.
296296
txn.execute("DROP TABLE events_to_purge")
297297

298-
for event_id, should_delete in event_rows:
299-
self._invalidate_cache_and_stream(
300-
txn, self._get_state_group_for_event, (event_id,)
301-
)
298+
self._invalidate_cache_and_stream_bulk(
299+
txn,
300+
self._get_state_group_for_event,
301+
[(event_id,) for event_id, _ in event_rows],
302+
)
302303

303-
# XXX: This is racy, since have_seen_events could be called between the
304-
# transaction completing and the invalidation running. On the other hand,
305-
# that's no different to calling `have_seen_events` just before the
306-
# event is deleted from the database.
304+
# XXX: This is racy, since have_seen_events could be called between the
305+
# transaction completing and the invalidation running. On the other hand,
306+
# that's no different to calling `have_seen_events` just before the
307+
# event is deleted from the database.
308+
self._invalidate_cache_and_stream_bulk(
309+
txn,
310+
self.have_seen_event,
311+
[
312+
(room_id, event_id)
313+
for event_id, should_delete in event_rows
314+
if should_delete
315+
],
316+
)
317+
318+
for event_id, should_delete in event_rows:
307319
if should_delete:
308-
self._invalidate_cache_and_stream(
309-
txn, self.have_seen_event, (room_id, event_id)
310-
)
311320
self.invalidate_get_event_cache_after_txn(txn, event_id)
312321

313322
logger.info("[purge] done")

synapse/storage/databases/main/registration.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -561,16 +561,15 @@ def set_shadow_banned_txn(txn: LoggingTransaction) -> None:
561561
updatevalues={"shadow_banned": shadow_banned},
562562
)
563563
# In order for this to apply immediately, clear the cache for this user.
564-
tokens = self.db_pool.simple_select_onecol_txn(
564+
tokens = self.db_pool.simple_select_list_txn(
565565
txn,
566566
table="access_tokens",
567567
keyvalues={"user_id": user_id},
568-
retcol="token",
568+
retcols=("token",),
569+
)
570+
self._invalidate_cache_and_stream_bulk(
571+
txn, self.get_user_by_access_token, tokens
569572
)
570-
for token in tokens:
571-
self._invalidate_cache_and_stream(
572-
txn, self.get_user_by_access_token, (token,)
573-
)
574573
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
575574

576575
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, int, Optional[str]]]:
26832682
)
26842683
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
26852684

2686-
for token, _, _ in tokens_and_devices:
2687-
self._invalidate_cache_and_stream(
2688-
txn, self.get_user_by_access_token, (token,)
2689-
)
2685+
self._invalidate_cache_and_stream_bulk(
2686+
txn,
2687+
self.get_user_by_access_token,
2688+
[(token,) for token, _, _ in tokens_and_devices],
2689+
)
26902690

26912691
txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
26922692

0 commit comments

Comments
 (0)