Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 4d6e5a5

Browse files
Use a database table to hold the users that should have full presence sent to them, instead of something in-memory (#9823)
1 parent 206a7b5 commit 4d6e5a5

File tree

11 files changed

+479
-158
lines changed

11 files changed

+479
-158
lines changed

changelog.d/9823.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`.

docs/presence_router_module.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
2828
which can be given a list of local or remote MXIDs to broadcast known, online user
2929
presence to (for those users that the receiving user is considered interested in).
3030
It does not include state for users who are currently offline, and it can only be
31-
called on workers that support sending federation.
31+
called on workers that support sending federation. Additionally, this method must
32+
only be called from the process that has been configured to write to the
33+
the [presence stream](https://github.com/matrix-org/synapse/blob/master/docs/workers.md#stream-writers).
34+
By default, this is the main process, but another worker can be configured to do
35+
so.
3236

3337
### Module structure
3438

synapse/handlers/presence.py

Lines changed: 110 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,21 @@ async def current_state_for_users(
222222

223223
@abc.abstractmethod
224224
async def set_state(
225-
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
225+
self,
226+
target_user: UserID,
227+
state: JsonDict,
228+
ignore_status_msg: bool = False,
229+
force_notify: bool = False,
226230
) -> None:
227-
"""Set the presence state of the user. """
231+
"""Set the presence state of the user.
232+
233+
Args:
234+
target_user: The ID of the user to set the presence state of.
235+
state: The presence state as a JSON dictionary.
236+
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
237+
If False, the user's current status will be updated.
238+
force_notify: Whether to force notification of the update to clients.
239+
"""
228240

229241
@abc.abstractmethod
230242
async def bump_presence_active_time(self, user: UserID):
@@ -296,6 +308,51 @@ async def maybe_send_presence_to_interested_destinations(
296308
for destinations, states in hosts_and_states:
297309
self._federation.send_presence_to_destinations(states, destinations)
298310

311+
async def send_full_presence_to_users(self, user_ids: Collection[str]):
312+
"""
313+
Adds to the list of users who should receive a full snapshot of presence
314+
upon their next sync. Note that this only works for local users.
315+
316+
Then, grabs the current presence state for a given set of users and adds it
317+
to the top of the presence stream.
318+
319+
Args:
320+
user_ids: The IDs of the local users to send full presence to.
321+
"""
322+
# Retrieve one of the users from the given set
323+
if not user_ids:
324+
raise Exception(
325+
"send_full_presence_to_users must be called with at least one user"
326+
)
327+
user_id = next(iter(user_ids))
328+
329+
# Mark all users as receiving full presence on their next sync
330+
await self.store.add_users_to_send_full_presence_to(user_ids)
331+
332+
# Add a new entry to the presence stream. Since we use stream tokens to determine whether a
333+
# local user should receive a full snapshot of presence when they sync, we need to bump the
334+
# presence stream so that subsequent syncs with no presence activity in between won't result
335+
# in the client receiving multiple full snapshots of presence.
336+
#
337+
# If we bump the stream ID, then the user will get a higher stream token next sync, and thus
338+
# correctly won't receive a second snapshot.
339+
340+
# Get the current presence state for one of the users (defaults to offline if not found)
341+
current_presence_state = await self.get_state(UserID.from_string(user_id))
342+
343+
# Convert the UserPresenceState object into a serializable dict
344+
state = {
345+
"presence": current_presence_state.state,
346+
"status_message": current_presence_state.status_msg,
347+
}
348+
349+
# Copy the presence state to the tip of the presence stream.
350+
351+
# We set force_notify=True here so that this presence update is guaranteed to
352+
# increment the presence stream ID (which resending the current user's presence
353+
# otherwise would not do).
354+
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
355+
299356

300357
class _NullContextManager(ContextManager[None]):
301358
"""A context manager which does nothing."""
@@ -480,8 +537,17 @@ async def set_state(
480537
target_user: UserID,
481538
state: JsonDict,
482539
ignore_status_msg: bool = False,
540+
force_notify: bool = False,
483541
) -> None:
484-
"""Set the presence state of the user."""
542+
"""Set the presence state of the user.
543+
544+
Args:
545+
target_user: The ID of the user to set the presence state of.
546+
state: The presence state as a JSON dictionary.
547+
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
548+
If False, the user's current status will be updated.
549+
force_notify: Whether to force notification of the update to clients.
550+
"""
485551
presence = state["presence"]
486552

487553
valid_presence = (
@@ -508,6 +574,7 @@ async def set_state(
508574
user_id=user_id,
509575
state=state,
510576
ignore_status_msg=ignore_status_msg,
577+
force_notify=force_notify,
511578
)
512579

513580
async def bump_presence_active_time(self, user: UserID) -> None:
@@ -677,13 +744,19 @@ async def _persist_unpersisted_changes(self) -> None:
677744
[self.user_to_current_state[user_id] for user_id in unpersisted]
678745
)
679746

680-
async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
747+
async def _update_states(
748+
self, new_states: Iterable[UserPresenceState], force_notify: bool = False
749+
) -> None:
681750
"""Updates presence of users. Sets the appropriate timeouts. Pokes
682751
the notifier and federation if and only if the changed presence state
683752
should be sent to clients/servers.
684753
685754
Args:
686755
new_states: The new user presence state updates to process.
756+
force_notify: Whether to force notifying clients of this presence state update,
757+
even if it doesn't change the state of a user's presence (e.g online -> online).
758+
This is currently used to bump the max presence stream ID without changing any
759+
user's presence (see PresenceHandler.add_users_to_send_full_presence_to).
687760
"""
688761
now = self.clock.time_msec()
689762

@@ -720,6 +793,9 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
720793
now=now,
721794
)
722795

796+
if force_notify:
797+
should_notify = True
798+
723799
self.user_to_current_state[user_id] = new_state
724800

725801
if should_notify:
@@ -1058,9 +1134,21 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
10581134
await self._update_states(updates)
10591135

10601136
async def set_state(
1061-
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
1137+
self,
1138+
target_user: UserID,
1139+
state: JsonDict,
1140+
ignore_status_msg: bool = False,
1141+
force_notify: bool = False,
10621142
) -> None:
1063-
"""Set the presence state of the user."""
1143+
"""Set the presence state of the user.
1144+
1145+
Args:
1146+
target_user: The ID of the user to set the presence state of.
1147+
state: The presence state as a JSON dictionary.
1148+
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
1149+
If False, the user's current status will be updated.
1150+
force_notify: Whether to force notification of the update to clients.
1151+
"""
10641152
status_msg = state.get("status_msg", None)
10651153
presence = state["presence"]
10661154

@@ -1091,7 +1179,9 @@ async def set_state(
10911179
):
10921180
new_fields["last_active_ts"] = self.clock.time_msec()
10931181

1094-
await self._update_states([prev_state.copy_and_replace(**new_fields)])
1182+
await self._update_states(
1183+
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
1184+
)
10951185

10961186
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
10971187
"""Returns whether a user can see another user's presence."""
@@ -1389,11 +1479,10 @@ def __init__(self, hs: "HomeServer"):
13891479
#
13901480
# Presence -> Notifier -> PresenceEventSource -> Presence
13911481
#
1392-
# Same with get_module_api, get_presence_router
1482+
# Same with get_presence_router:
13931483
#
13941484
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
13951485
self.get_presence_handler = hs.get_presence_handler
1396-
self.get_module_api = hs.get_module_api
13971486
self.get_presence_router = hs.get_presence_router
13981487
self.clock = hs.get_clock()
13991488
self.store = hs.get_datastore()
@@ -1424,16 +1513,21 @@ async def get_new_events(
14241513
stream_change_cache = self.store.presence_stream_cache
14251514

14261515
with Measure(self.clock, "presence.get_new_events"):
1427-
if user_id in self.get_module_api()._send_full_presence_to_local_users:
1428-
# This user has been specified by a module to receive all current, online
1429-
# user presence. Removing from_key and setting include_offline to false
1430-
# will do effectively this.
1431-
from_key = None
1432-
include_offline = False
1433-
14341516
if from_key is not None:
14351517
from_key = int(from_key)
14361518

1519+
# Check if this user should receive all current, online user presence. We only
1520+
# bother to do this if from_key is set, as otherwise the user will receive all
1521+
# user presence anyways.
1522+
if await self.store.should_user_receive_full_presence_with_token(
1523+
user_id, from_key
1524+
):
1525+
# This user has been specified by a module to receive all current, online
1526+
# user presence. Removing from_key and setting include_offline to false
1527+
# will do effectively this.
1528+
from_key = None
1529+
include_offline = False
1530+
14371531
max_token = self.store.get_current_presence_token()
14381532
if from_key == max_token:
14391533
# This is necessary as due to the way stream ID generators work
@@ -1467,12 +1561,6 @@ async def get_new_events(
14671561
user_id, include_offline, from_key
14681562
)
14691563

1470-
# Remove the user from the list of users to receive all presence
1471-
if user_id in self.get_module_api()._send_full_presence_to_local_users:
1472-
self.get_module_api()._send_full_presence_to_local_users.remove(
1473-
user_id
1474-
)
1475-
14761564
return presence_updates, max_token
14771565

14781566
# Make mypy happy. users_interested_in should now be a set
@@ -1522,10 +1610,6 @@ async def get_new_events(
15221610
)
15231611
presence_updates = list(users_to_state.values())
15241612

1525-
# Remove the user from the list of users to receive all presence
1526-
if user_id in self.get_module_api()._send_full_presence_to_local_users:
1527-
self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
1528-
15291613
if not include_offline:
15301614
# Filter out offline presence states
15311615
presence_updates = self._filter_offline_presence_state(presence_updates)

synapse/module_api/__init__.py

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,6 @@ def __init__(self, hs, auth_handler):
5656
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
5757
self._public_room_list_manager = PublicRoomListManager(hs)
5858

59-
# The next time these users sync, they will receive the current presence
60-
# state of all local users. Users are added by send_local_online_presence_to,
61-
# and removed after a successful sync.
62-
#
63-
# We make this a private variable to deter modules from accessing it directly,
64-
# though other classes in Synapse will still do so.
65-
self._send_full_presence_to_local_users = set()
66-
6759
@property
6860
def http_client(self):
6961
"""Allows making outbound HTTP requests to remote resources.
@@ -405,39 +397,44 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
405397
Updates to remote users will be sent immediately, whereas local users will receive
406398
them on their next sync attempt.
407399
408-
Note that this method can only be run on the main or federation_sender worker
409-
processes.
400+
Note that this method can only be run on the process that is configured to write to the
401+
presence stream. By default this is the main process.
410402
"""
411-
if not self._hs.should_send_federation():
403+
if self._hs._instance_name not in self._hs.config.worker.writers.presence:
412404
raise Exception(
413405
"send_local_online_presence_to can only be run "
414-
"on processes that send federation",
406+
"on the process that is configured to write to the "
407+
"presence stream (by default this is the main process)",
415408
)
416409

410+
local_users = set()
411+
remote_users = set()
417412
for user in users:
418413
if self._hs.is_mine_id(user):
419-
# Modify SyncHandler._generate_sync_entry_for_presence to call
420-
# presence_source.get_new_events with an empty `from_key` if
421-
# that user's ID were in a list modified by ModuleApi somewhere.
422-
# That user would then get all presence state on next incremental sync.
423-
424-
# Force a presence initial_sync for this user next time
425-
self._send_full_presence_to_local_users.add(user)
414+
local_users.add(user)
426415
else:
427-
# Retrieve presence state for currently online users that this user
428-
# is considered interested in
429-
presence_events, _ = await self._presence_stream.get_new_events(
430-
UserID.from_string(user), from_key=None, include_offline=False
431-
)
432-
433-
# Send to remote destinations.
434-
435-
# We pull out the presence handler here to break a cyclic
436-
# dependency between the presence router and module API.
437-
presence_handler = self._hs.get_presence_handler()
438-
await presence_handler.maybe_send_presence_to_interested_destinations(
439-
presence_events
440-
)
416+
remote_users.add(user)
417+
418+
# We pull out the presence handler here to break a cyclic
419+
# dependency between the presence router and module API.
420+
presence_handler = self._hs.get_presence_handler()
421+
422+
if local_users:
423+
# Force a presence initial_sync for these users next time they sync.
424+
await presence_handler.send_full_presence_to_users(local_users)
425+
426+
for user in remote_users:
427+
# Retrieve presence state for currently online users that this user
428+
# is considered interested in.
429+
presence_events, _ = await self._presence_stream.get_new_events(
430+
UserID.from_string(user), from_key=None, include_offline=False
431+
)
432+
433+
# Send to remote destinations.
434+
destination = UserID.from_string(user).domain
435+
presence_handler.get_federation_queue().send_presence_to_destinations(
436+
presence_events, destination
437+
)
441438

442439

443440
class PublicRoomListManager:

synapse/replication/http/presence.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
7373
{
7474
"state": { ... },
7575
"ignore_status_msg": false,
76+
"force_notify": false
7677
}
7778
7879
200 OK
@@ -91,17 +92,23 @@ def __init__(self, hs: "HomeServer"):
9192
self._presence_handler = hs.get_presence_handler()
9293

9394
@staticmethod
94-
async def _serialize_payload(user_id, state, ignore_status_msg=False):
95+
async def _serialize_payload(
96+
user_id, state, ignore_status_msg=False, force_notify=False
97+
):
9598
return {
9699
"state": state,
97100
"ignore_status_msg": ignore_status_msg,
101+
"force_notify": force_notify,
98102
}
99103

100104
async def _handle_request(self, request, user_id):
101105
content = parse_json_object_from_request(request)
102106

103107
await self._presence_handler.set_state(
104-
UserID.from_string(user_id), content["state"], content["ignore_status_msg"]
108+
UserID.from_string(user_id),
109+
content["state"],
110+
content["ignore_status_msg"],
111+
content["force_notify"],
105112
)
106113

107114
return (

synapse/rest/admin/server_notice_servlet.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ def __init__(self, hs: "HomeServer"):
5454
self.hs = hs
5555
self.auth = hs.get_auth()
5656
self.txns = HttpTransactionCache(hs)
57-
self.snm = hs.get_server_notices_manager()
5857

5958
def register(self, json_resource: HttpServer):
6059
PATTERN = "/send_server_notice"
@@ -77,15 +76,18 @@ async def on_POST(
7776
event_type = body.get("type", EventTypes.Message)
7877
state_key = body.get("state_key")
7978

80-
if not self.snm.is_enabled():
79+
# We grab the server notices manager here as its initialisation has a check for worker processes,
80+
# but worker processes still need to initialise SendServerNoticeServlet (as it is part of the
81+
# admin api).
82+
if not self.hs.get_server_notices_manager().is_enabled():
8183
raise SynapseError(400, "Server notices are not enabled on this server")
8284

8385
user_id = body["user_id"]
8486
UserID.from_string(user_id)
8587
if not self.hs.is_mine_id(user_id):
8688
raise SynapseError(400, "Server notices can only be sent to local users")
8789

88-
event = await self.snm.send_notice(
90+
event = await self.hs.get_server_notices_manager().send_notice(
8991
user_id=body["user_id"],
9092
type=event_type,
9193
state_key=state_key,

0 commit comments

Comments
 (0)