Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Reimplement get_rooms_for_user and get_rooms_for_users
This avoids the join on `events` to get stream ordering that is mostly
unused.
  • Loading branch information
Fizzadar committed Sep 13, 2022
commit 10081061d783621f6bd02ed176605a67a388a21e
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
self.get_rooms_for_user.invalidate(
(data.state_key,)
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ def _update_current_state_txn(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)
txn.call_after(
self.store.get_rooms_for_user.invalidate,
(member,),
)

self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
Expand Down
79 changes: 72 additions & 7 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import logging
from typing import (
TYPE_CHECKING,
Callable,
Collection,
Dict,
FrozenSet,
Expand Down Expand Up @@ -52,7 +51,6 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -670,19 +668,86 @@ def _get_users_server_still_shares_room_with_txn(
_get_users_server_still_shares_room_with_txn,
)

@cancellable
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
self, user_id: str
) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.

If a remote user only returns rooms this server is currently
participating in.
"""
rooms = await self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(user_id)
if rooms:
return frozenset(r.room_id for r in rooms)

return await self.db_pool.runInteraction(
"get_rooms_for_user",
self._get_rooms_for_user_txn,
user_id,
)
return frozenset(r.room_id for r in rooms)

def _get_rooms_for_user_txn(
self, txn: LoggingTransaction, user_id: str
) -> FrozenSet[str]:
sql = """
SELECT room_id
FROM current_state_events AS c
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may as well just use simple_select_onecol here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(txn)

@cachedList(
cached_method_name="get_rooms_for_user",
list_name="user_ids",
)
async def get_rooms_for_users(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
"""A batched version of `get_rooms_for_user`.

Returns:
Map from user_id to set of rooms that is currently in.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_users",
self._get_rooms_for_users_txn,
user_ids,
)

def _get_rooms_for_users_txn(
self, txn: LoggingTransaction, user_ids: Collection[str]
) -> Dict[str, FrozenSet[str]]:

clause, args = make_in_list_sql_clause(
self.database_engine,
"c.state_key",
user_ids,
)

sql = f"""
SELECT c.state_key, room_id
FROM current_state_events AS c
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, I think we can use simple_select_many_batch to simplify things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""

txn.execute(sql, [Membership.JOIN] + args)

result: Dict[str, Set[str]] = {
user_id: set() for user_id in user_ids
}
for user_id, room_id in txn:
result[user_id].add(room_id)

return {user_id: frozenset(v) for user_id, v in result.items()}

@cached(max_entries=10000)
async def does_pair_of_users_share_a_room(
Expand Down