@@ -52,6 +52,11 @@ class RoomMember:
5252FEDERATION_PING_INTERVAL = 40 * 1000
5353
5454
55+ # How long to remember a typing notification happened in a room before
56+ # forgetting about it.
57+ FORGET_TIMEOUT = 10 * 60 * 1000
58+
59+
5560class FollowerTypingHandler :
5661 """A typing handler on a different process than the writer that is updated
5762 via replication.
@@ -83,7 +88,10 @@ def __init__(self, hs: "HomeServer"):
8388 self .wheel_timer : WheelTimer [RoomMember ] = WheelTimer (bucket_size = 5000 )
8489 self ._latest_room_serial = 0
8590
91+ self ._rooms_updated : Set [str ] = set ()
92+
8693 self .clock .looping_call (self ._handle_timeouts , 5000 )
94+ self .clock .looping_call (self ._prune_old_typing , FORGET_TIMEOUT )
8795
8896 def _reset (self ) -> None :
8997 """Reset the typing handler's data caches."""
@@ -92,6 +100,8 @@ def _reset(self) -> None:
92100 # map room IDs to sets of users currently typing
93101 self ._room_typing = {}
94102
103+ self ._rooms_updated = set ()
104+
95105 self ._member_last_federation_poke = {}
96106 self .wheel_timer = WheelTimer (bucket_size = 5000 )
97107
@@ -178,6 +188,7 @@ def process_replication_rows(
178188 prev_typing = self ._room_typing .get (row .room_id , set ())
179189 now_typing = set (row .user_ids )
180190 self ._room_typing [row .room_id ] = now_typing
191+ self ._rooms_updated .add (row .room_id )
181192
182193 if self .federation :
183194 run_as_background_process (
@@ -209,6 +220,19 @@ async def _send_changes_in_typing_to_remotes(
209220 def get_current_token (self ) -> int :
210221 return self ._latest_room_serial
211222
223+ def _prune_old_typing (self ) -> None :
224+ """Prune rooms that haven't seen typing updates since last time.
225+
226+ This is safe to do as clients should time out old typing notifications.
227+ """
228+ stale_rooms = self ._room_serials .keys () - self ._rooms_updated
229+
230+ for room_id in stale_rooms :
231+ self ._room_serials .pop (room_id , None )
232+ self ._room_typing .pop (room_id , None )
233+
234+ self ._rooms_updated = set ()
235+
212236
213237class TypingWriterHandler (FollowerTypingHandler ):
214238 def __init__ (self , hs : "HomeServer" ):
@@ -388,6 +412,7 @@ def _push_update_local(self, member: RoomMember, typing: bool) -> None:
388412 self ._typing_stream_change_cache .entity_has_changed (
389413 member .room_id , self ._latest_room_serial
390414 )
415+ self ._rooms_updated .add (member .room_id )
391416
392417 self .notifier .on_new_event (
393418 StreamKeyType .TYPING , self ._latest_room_serial , rooms = [member .room_id ]
0 commit comments