|
16 | 16 | import logging |
17 | 17 | import random |
18 | 18 | from http import HTTPStatus |
19 | | -from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple |
| 19 | +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple |
20 | 20 |
|
21 | 21 | from synapse import types |
22 | 22 | from synapse.api.constants import ( |
|
38 | 38 | from synapse.events import EventBase |
39 | 39 | from synapse.events.snapshot import EventContext |
40 | 40 | from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN |
| 41 | +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler |
41 | 42 | from synapse.logging import opentracing |
| 43 | +from synapse.metrics import event_processing_positions |
| 44 | +from synapse.metrics.background_process_metrics import run_as_background_process |
42 | 45 | from synapse.module_api import NOT_SPAM |
43 | 46 | from synapse.types import ( |
44 | 47 | JsonDict, |
@@ -280,9 +283,25 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: |
280 | 283 | """ |
281 | 284 | raise NotImplementedError() |
282 | 285 |
|
283 | | - @abc.abstractmethod |
284 | 286 | async def forget(self, user: UserID, room_id: str) -> None: |
285 | | - raise NotImplementedError() |
| 287 | + user_id = user.to_string() |
| 288 | + |
| 289 | + member = await self._storage_controllers.state.get_current_state_event( |
| 290 | + room_id=room_id, event_type=EventTypes.Member, state_key=user_id |
| 291 | + ) |
| 292 | + membership = member.membership if member else None |
| 293 | + |
| 294 | + if membership is not None and membership not in [ |
| 295 | + Membership.LEAVE, |
| 296 | + Membership.BAN, |
| 297 | + ]: |
| 298 | + raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) |
| 299 | + |
| 300 | + # In normal case this call is only required if `membership` is not `None`. |
| 301 | + # But: After the last member had left the room, the background update |
| 302 | + # `_background_remove_left_rooms` is deleting rows related to this room from |
| 303 | + # the table `current_state_events` and `get_current_state_events` is `None`. |
| 304 | + await self.store.forget(user_id, room_id) |
286 | 305 |
|
287 | 306 | async def ratelimit_multiple_invites( |
288 | 307 | self, |
@@ -2046,25 +2065,141 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: |
2046 | 2065 | """Implements RoomMemberHandler._user_left_room""" |
2047 | 2066 | user_left_room(self.distributor, target, room_id) |
2048 | 2067 |
|
2049 | | - async def forget(self, user: UserID, room_id: str) -> None: |
2050 | | - user_id = user.to_string() |
2051 | 2068 |
|
2052 | | - member = await self._storage_controllers.state.get_current_state_event( |
2053 | | - room_id=room_id, event_type=EventTypes.Member, state_key=user_id |
2054 | | - ) |
2055 | | - membership = member.membership if member else None |
| 2069 | +class RoomForgetterHandler(StateDeltasHandler): |
| 2070 | + """Forgets rooms when they are left, when enabled in the homeserver config. |
2056 | 2071 |
|
2057 | | - if membership is not None and membership not in [ |
2058 | | - Membership.LEAVE, |
2059 | | - Membership.BAN, |
2060 | | - ]: |
2061 | | - raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) |
| 2072 | + For the purposes of this feature, kicks, bans and "leaves" via state resolution |
| 2073 | + weirdness are all considered to be leaves. |
2062 | 2074 |
|
2063 | | - # In normal case this call is only required if `membership` is not `None`. |
2064 | | - # But: After the last member had left the room, the background update |
2065 | | - # `_background_remove_left_rooms` is deleting rows related to this room from |
2066 | | - # the table `current_state_events` and `get_current_state_events` is `None`. |
2067 | | - await self.store.forget(user_id, room_id) |
| 2075 | + Derived from `StatsHandler` and `UserDirectoryHandler`. |
| 2076 | + """ |
| 2077 | + |
| 2078 | + def __init__(self, hs: "HomeServer"): |
| 2079 | + super().__init__(hs) |
| 2080 | + |
| 2081 | + self._hs = hs |
| 2082 | + self._store = hs.get_datastores().main |
| 2083 | + self._storage_controllers = hs.get_storage_controllers() |
| 2084 | + self._clock = hs.get_clock() |
| 2085 | + self._notifier = hs.get_notifier() |
| 2086 | + self._room_member_handler = hs.get_room_member_handler() |
| 2087 | + |
| 2088 | + # The current position in the current_state_delta stream |
| 2089 | + self.pos: Optional[int] = None |
| 2090 | + |
| 2091 | + # Guard to ensure we only process deltas one at a time |
| 2092 | + self._is_processing = False |
| 2093 | + |
| 2094 | + if hs.config.worker.run_background_tasks: |
| 2095 | + self._notifier.add_replication_callback(self.notify_new_event) |
| 2096 | + |
| 2097 | + # We kick this off to pick up outstanding work from before the last restart. |
| 2098 | + self._clock.call_later(0, self.notify_new_event) |
| 2099 | + |
| 2100 | + def notify_new_event(self) -> None: |
| 2101 | + """Called when there may be more deltas to process""" |
| 2102 | + if self._is_processing: |
| 2103 | + return |
| 2104 | + |
| 2105 | + self._is_processing = True |
| 2106 | + |
| 2107 | + async def process() -> None: |
| 2108 | + try: |
| 2109 | + await self._unsafe_process() |
| 2110 | + finally: |
| 2111 | + self._is_processing = False |
| 2112 | + |
| 2113 | + run_as_background_process("room_forgetter.notify_new_event", process) |
| 2114 | + |
| 2115 | + async def _unsafe_process(self) -> None: |
| 2116 | + # If self.pos is None then means we haven't fetched it from DB |
| 2117 | + if self.pos is None: |
| 2118 | + self.pos = await self._store.get_room_forgetter_stream_pos() |
| 2119 | + room_max_stream_ordering = self._store.get_room_max_stream_ordering() |
| 2120 | + if self.pos > room_max_stream_ordering: |
| 2121 | + # apparently, we've processed more events than exist in the database! |
| 2122 | + # this can happen if events are removed with history purge or similar. |
| 2123 | + logger.warning( |
| 2124 | + "Event stream ordering appears to have gone backwards (%i -> %i): " |
| 2125 | + "rewinding room forgetter processor", |
| 2126 | + self.pos, |
| 2127 | + room_max_stream_ordering, |
| 2128 | + ) |
| 2129 | + self.pos = room_max_stream_ordering |
| 2130 | + |
| 2131 | + if not self._hs.config.room.forget_on_leave: |
| 2132 | + # Update the processing position, so that if the server admin turns the |
| 2133 | + # feature on at a later date, we don't decide to forget every room that |
| 2134 | + # has ever been left in the past. |
| 2135 | + self.pos = self._store.get_room_max_stream_ordering() |
| 2136 | + await self._store.update_room_forgetter_stream_pos(self.pos) |
| 2137 | + return |
| 2138 | + |
| 2139 | + # Loop round handling deltas until we're up to date |
| 2140 | + |
| 2141 | + while True: |
| 2142 | + # Be sure to read the max stream_ordering *before* checking if there are any outstanding |
| 2143 | + # deltas, since there is otherwise a chance that we could miss updates which arrive |
| 2144 | + # after we check the deltas. |
| 2145 | + room_max_stream_ordering = self._store.get_room_max_stream_ordering() |
| 2146 | + if self.pos == room_max_stream_ordering: |
| 2147 | + break |
| 2148 | + |
| 2149 | + logger.debug( |
| 2150 | + "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering |
| 2151 | + ) |
| 2152 | + ( |
| 2153 | + max_pos, |
| 2154 | + deltas, |
| 2155 | + ) = await self._storage_controllers.state.get_current_state_deltas( |
| 2156 | + self.pos, room_max_stream_ordering |
| 2157 | + ) |
| 2158 | + |
| 2159 | + logger.debug("Handling %d state deltas", len(deltas)) |
| 2160 | + await self._handle_deltas(deltas) |
| 2161 | + |
| 2162 | + self.pos = max_pos |
| 2163 | + |
| 2164 | + # Expose current event processing position to prometheus |
| 2165 | + event_processing_positions.labels("room_forgetter").set(max_pos) |
| 2166 | + |
| 2167 | + await self._store.update_room_forgetter_stream_pos(max_pos) |
| 2168 | + |
| 2169 | + async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: |
| 2170 | + """Called with the state deltas to process""" |
| 2171 | + for delta in deltas: |
| 2172 | + typ = delta["type"] |
| 2173 | + state_key = delta["state_key"] |
| 2174 | + room_id = delta["room_id"] |
| 2175 | + event_id = delta["event_id"] |
| 2176 | + prev_event_id = delta["prev_event_id"] |
| 2177 | + |
| 2178 | + if typ != EventTypes.Member: |
| 2179 | + continue |
| 2180 | + |
| 2181 | + if not self._hs.is_mine_id(state_key): |
| 2182 | + continue |
| 2183 | + |
| 2184 | + change = await self._get_key_change( |
| 2185 | + prev_event_id, |
| 2186 | + event_id, |
| 2187 | + key_name="membership", |
| 2188 | + public_value=Membership.JOIN, |
| 2189 | + ) |
| 2190 | + is_leave = change is MatchChange.now_false |
| 2191 | + |
| 2192 | + if is_leave: |
| 2193 | + try: |
| 2194 | + await self._room_member_handler.forget( |
| 2195 | + UserID.from_string(state_key), room_id |
| 2196 | + ) |
| 2197 | + except SynapseError as e: |
| 2198 | + if e.code == 400: |
| 2199 | + # The user is back in the room. |
| 2200 | + pass |
| 2201 | + else: |
| 2202 | + raise |
2068 | 2203 |
|
2069 | 2204 |
|
2070 | 2205 | def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: |
|
0 commit comments