Skip to content

Commit 6e1ac55

Browse files
authored
Expose MSC4354 Sticky Events over the legacy (v3) /sync API. (#19487)
Follows: #19365 Part of: MSC4354 whose experimental feature tracking issue is #19409 Partially supersedes: #18968 --------- Signed-off-by: Olivier 'reivilibre' <oliverw@matrix.org>
1 parent 16125ce commit 6e1ac55

File tree

5 files changed

+749
-11
lines changed

5 files changed

+749
-11
lines changed

changelog.d/19487.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Expose [MSC4354 Sticky Events](https://github.com/matrix-org/matrix-spec-proposals/pull/4354) over the legacy (v3) /sync API.

synapse/handlers/sync.py

Lines changed: 117 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
EventContentFields,
3838
EventTypes,
3939
Membership,
40+
StickyEvent,
4041
)
4142
from synapse.api.filtering import FilterCollection
4243
from synapse.api.presence import UserPresenceState
@@ -147,6 +148,7 @@ class JoinedSyncResult:
147148
state: StateMap[EventBase]
148149
ephemeral: list[JsonDict]
149150
account_data: list[JsonDict]
151+
sticky: list[EventBase]
150152
unread_notifications: JsonDict
151153
unread_thread_notifications: JsonDict
152154
summary: JsonDict | None
@@ -157,7 +159,11 @@ def __bool__(self) -> bool:
157159
to tell if room needs to be part of the sync result.
158160
"""
159161
return bool(
160-
self.timeline or self.state or self.ephemeral or self.account_data
162+
self.timeline
163+
or self.state
164+
or self.ephemeral
165+
or self.account_data
166+
or self.sticky
161167
# nb the notification count does not, er, count: if there's nothing
162168
# else in the result, we don't need to send it.
163169
)
@@ -601,6 +607,41 @@ async def ephemeral_by_room(
601607

602608
return now_token, ephemeral_by_room
603609

610+
async def sticky_events_by_room(
611+
self,
612+
sync_result_builder: "SyncResultBuilder",
613+
now_token: StreamToken,
614+
since_token: StreamToken | None = None,
615+
) -> tuple[StreamToken, dict[str, list[str]]]:
616+
"""Get the sticky events for each room the user is in
617+
Args:
618+
sync_result_builder
619+
now_token: Where the server is currently up to.
620+
since_token: Where the server was when the client last synced.
621+
Returns:
622+
A tuple of the now StreamToken, updated to reflect the which sticky
623+
events are included, and a dict mapping from room_id to a list
624+
of sticky event IDs for that room (in sticky event stream order).
625+
"""
626+
now = self.clock.time_msec()
627+
with Measure(
628+
self.clock, name="sticky_events_by_room", server_name=self.server_name
629+
):
630+
from_id = since_token.sticky_events_key if since_token else 0
631+
632+
room_ids = sync_result_builder.joined_room_ids
633+
634+
to_id, sticky_by_room = await self.store.get_sticky_events_in_rooms(
635+
room_ids,
636+
from_id=from_id,
637+
to_id=now_token.sticky_events_key,
638+
now=now,
639+
limit=StickyEvent.MAX_EVENTS_IN_SYNC,
640+
)
641+
now_token = now_token.copy_and_replace(StreamKeyType.STICKY_EVENTS, to_id)
642+
643+
return now_token, sticky_by_room
644+
604645
async def _load_filtered_recents(
605646
self,
606647
room_id: str,
@@ -2177,11 +2218,43 @@ async def _generate_sync_entry_for_rooms(
21772218
)
21782219
sync_result_builder.now_token = now_token
21792220

2221+
sticky_by_room: dict[str, list[str]] = {}
2222+
if self.hs_config.experimental.msc4354_enabled:
2223+
now_token, sticky_by_room = await self.sticky_events_by_room(
2224+
sync_result_builder, now_token, since_token
2225+
)
2226+
sync_result_builder.now_token = now_token
2227+
21802228
# 2. We check up front if anything has changed, if it hasn't then there is
21812229
# no point in going further.
2230+
#
2231+
# If this is an initial sync (no since_token), then of course we can't skip
2232+
# the sync entry, as we have no base to use as a comparison for the question
2233+
# 'has anything changed' (this is the client's first time 'seeing' anything).
2234+
#
2235+
# Otherwise, for incremental syncs, we consider skipping the sync entry,
2236+
# doing cheap checks first:
2237+
#
2238+
# - are there any per-room EDUs;
2239+
# - is there any Room Account Data; or
2240+
# - are there any sticky events in the rooms; or
2241+
# - might the rooms have changed
2242+
# (using in-memory event stream change caches, which can
2243+
# only answer either 'Not changed' or 'Possibly changed')
2244+
#
2245+
# If none of those cheap checks give us a reason to continue generating the sync entry,
2246+
# we finally query the database to check for changed room tags.
2247+
# If there are also no changed tags, we can short-circuit return an empty sync entry.
21822248
if not sync_result_builder.full_state:
2183-
if since_token and not ephemeral_by_room and not account_data_by_room:
2184-
have_changed = await self._have_rooms_changed(sync_result_builder)
2249+
# Cheap checks first
2250+
if (
2251+
since_token
2252+
and not ephemeral_by_room
2253+
and not account_data_by_room
2254+
and not sticky_by_room
2255+
):
2256+
# This is also a cheap check, but we log the answer
2257+
have_changed = self._may_have_rooms_changed(sync_result_builder)
21852258
log_kv({"rooms_have_changed": have_changed})
21862259
if not have_changed:
21872260
tags_by_room = await self.store.get_updated_tags(
@@ -2225,6 +2298,7 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
22252298
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
22262299
tags=tags_by_room.get(room_entry.room_id),
22272300
account_data=account_data_by_room.get(room_entry.room_id, {}),
2301+
sticky_event_ids=sticky_by_room.get(room_entry.room_id, []),
22282302
always_include=sync_result_builder.full_state,
22292303
)
22302304
logger.debug("Generated room entry for %s", room_entry.room_id)
@@ -2237,11 +2311,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
22372311

22382312
return set(newly_joined_rooms), set(newly_left_rooms)
22392313

2240-
async def _have_rooms_changed(
2241-
self, sync_result_builder: "SyncResultBuilder"
2242-
) -> bool:
2314+
def _may_have_rooms_changed(self, sync_result_builder: "SyncResultBuilder") -> bool:
22432315
"""Returns whether there may be any new events that should be sent down
2244-
the sync. Returns True if there are.
2316+
the sync. Returns True if there **may** be.
22452317
22462318
Does not modify the `sync_result_builder`.
22472319
"""
@@ -2611,6 +2683,7 @@ async def _generate_room_entry(
26112683
ephemeral: list[JsonDict],
26122684
tags: Mapping[str, JsonMapping] | None,
26132685
account_data: Mapping[str, JsonMapping],
2686+
sticky_event_ids: list[str],
26142687
always_include: bool = False,
26152688
) -> None:
26162689
"""Populates the `joined` and `archived` section of `sync_result_builder`
@@ -2640,6 +2713,8 @@ async def _generate_room_entry(
26402713
tags: List of *all* tags for room, or None if there has been
26412714
no change.
26422715
account_data: List of new account data for room
2716+
sticky_event_ids: MSC4354 sticky events in the room, if any.
2717+
In sticky event stream order.
26432718
always_include: Always include this room in the sync response,
26442719
even if empty.
26452720
"""
@@ -2650,7 +2725,13 @@ async def _generate_room_entry(
26502725
events = room_builder.events
26512726

26522727
# We want to shortcut out as early as possible.
2653-
if not (always_include or account_data or ephemeral or full_state):
2728+
if not (
2729+
always_include
2730+
or account_data
2731+
or ephemeral
2732+
or full_state
2733+
or sticky_event_ids
2734+
):
26542735
if events == [] and tags is None:
26552736
return
26562737

@@ -2742,6 +2823,7 @@ async def _generate_room_entry(
27422823
or account_data_events
27432824
or ephemeral
27442825
or full_state
2826+
or sticky_event_ids
27452827
):
27462828
return
27472829

@@ -2788,6 +2870,32 @@ async def _generate_room_entry(
27882870

27892871
if room_builder.rtype == "joined":
27902872
unread_notifications: dict[str, int] = {}
2873+
sticky_events: list[EventBase] = []
2874+
if sticky_event_ids:
2875+
# As per MSC4354:
2876+
# Remove sticky events that are already in the timeline, else we will needlessly duplicate
2877+
# events.
2878+
# There is no purpose in including sticky events in the sticky section if they're already in
2879+
# the timeline, as either way the client becomes aware of them.
2880+
# This is particularly important given the risk of sticky events spam since
2881+
# anyone can send sticky events, so halving the bandwidth on average for each sticky
2882+
# event is helpful.
2883+
timeline_event_id_set = {ev.event_id for ev in batch.events}
2884+
# Must preserve sticky event stream order
2885+
sticky_event_ids = [
2886+
e for e in sticky_event_ids if e not in timeline_event_id_set
2887+
]
2888+
if sticky_event_ids:
2889+
# Fetch and filter the sticky events
2890+
sticky_events = await filter_and_transform_events_for_client(
2891+
self._storage_controllers,
2892+
sync_result_builder.sync_config.user.to_string(),
2893+
await self.store.get_events_as_list(sticky_event_ids),
2894+
# As per MSC4354:
2895+
# > History visibility checks MUST NOT be applied to sticky events.
2896+
# > Any joined user is authorised to see sticky events for the duration they remain sticky.
2897+
always_include_ids=frozenset(sticky_event_ids),
2898+
)
27912899
room_sync = JoinedSyncResult(
27922900
room_id=room_id,
27932901
timeline=batch,
@@ -2798,6 +2906,7 @@ async def _generate_room_entry(
27982906
unread_thread_notifications={},
27992907
summary=summary,
28002908
unread_count=0,
2909+
sticky=sticky_events,
28012910
)
28022911

28032912
if room_sync or always_include:

synapse/rest/client/sync.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,14 @@ async def encode_room(
619619
ephemeral_events = room.ephemeral
620620
result["ephemeral"] = {"events": ephemeral_events}
621621
result["unread_notifications"] = room.unread_notifications
622+
if room.sticky:
623+
# The sticky events have already been deduplicated so that events
624+
# appearing in the timeline won't appear again here
625+
result["msc4354_sticky"] = {
626+
"events": await self._event_serializer.serialize_events(
627+
room.sticky, time_now, config=serialize_options
628+
)
629+
}
622630
if room.unread_thread_notifications:
623631
result["unread_thread_notifications"] = room.unread_thread_notifications
624632
if self._msc3773_enabled:

synapse/storage/databases/main/sticky_events.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
import logging
1414
import random
1515
from dataclasses import dataclass
16-
from typing import (
17-
TYPE_CHECKING,
18-
)
16+
from typing import TYPE_CHECKING, Collection, cast
1917

2018
from twisted.internet.defer import Deferred
2119

@@ -25,6 +23,7 @@
2523
DatabasePool,
2624
LoggingDatabaseConnection,
2725
LoggingTransaction,
26+
make_in_list_sql_clause,
2827
)
2928
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
3029
from synapse.storage.databases.main.state import StateGroupWorkerStore
@@ -138,6 +137,98 @@ def get_max_sticky_events_stream_id(self) -> int:
138137
def get_sticky_events_stream_id_generator(self) -> MultiWriterIdGenerator:
139138
return self._sticky_events_id_gen
140139

140+
async def get_sticky_events_in_rooms(
141+
self,
142+
room_ids: Collection[str],
143+
*,
144+
from_id: int,
145+
to_id: int,
146+
now: int,
147+
limit: int | None,
148+
) -> tuple[int, dict[str, list[str]]]:
149+
"""
150+
Fetch all the sticky events' IDs in the given rooms, with sticky stream IDs satisfying
151+
from_id < sticky stream ID <= to_id.
152+
153+
The events are returned ordered by the sticky events stream.
154+
155+
Args:
156+
room_ids: The room IDs to return sticky events in.
157+
from_id: The sticky stream ID that sticky events should be returned from (exclusive).
158+
to_id: The sticky stream ID that sticky events should end at (inclusive).
159+
now: The current time in unix millis, used for skipping expired events.
160+
limit: Max sticky events to return, or None to apply no limit.
161+
Returns:
162+
to_id, dict[room_id, list[event_ids]]
163+
"""
164+
sticky_events_rows = await self.db_pool.runInteraction(
165+
"get_sticky_events_in_rooms",
166+
self._get_sticky_events_in_rooms_txn,
167+
room_ids,
168+
from_id=from_id,
169+
to_id=to_id,
170+
now=now,
171+
limit=limit,
172+
)
173+
174+
if not sticky_events_rows:
175+
return to_id, {}
176+
177+
# Get stream_id of the last row, which is the highest
178+
new_to_id, _, _ = sticky_events_rows[-1]
179+
180+
# room ID -> event IDs
181+
room_id_to_event_ids: dict[str, list[str]] = {}
182+
for _, room_id, event_id in sticky_events_rows:
183+
events = room_id_to_event_ids.setdefault(room_id, [])
184+
events.append(event_id)
185+
186+
return (new_to_id, room_id_to_event_ids)
187+
188+
def _get_sticky_events_in_rooms_txn(
189+
self,
190+
txn: LoggingTransaction,
191+
room_ids: Collection[str],
192+
*,
193+
from_id: int,
194+
to_id: int,
195+
now: int,
196+
limit: int | None,
197+
) -> list[tuple[int, str, str]]:
198+
if len(room_ids) == 0:
199+
return []
200+
room_id_in_list_clause, room_id_in_list_values = make_in_list_sql_clause(
201+
txn.database_engine, "se.room_id", room_ids
202+
)
203+
limit_clause = ""
204+
limit_params: tuple[int, ...] = ()
205+
if limit is not None:
206+
limit_clause = "LIMIT ?"
207+
limit_params = (limit,)
208+
209+
if isinstance(self.database_engine, PostgresEngine):
210+
expr_soft_failed = "COALESCE(((ej.internal_metadata::jsonb)->>'soft_failed')::boolean, FALSE)"
211+
else:
212+
expr_soft_failed = "COALESCE(ej.internal_metadata->>'soft_failed', FALSE)"
213+
214+
txn.execute(
215+
f"""
216+
SELECT se.stream_id, se.room_id, event_id
217+
FROM sticky_events se
218+
INNER JOIN event_json ej USING (event_id)
219+
WHERE
220+
NOT {expr_soft_failed}
221+
AND ? < expires_at
222+
AND ? < stream_id
223+
AND stream_id <= ?
224+
AND {room_id_in_list_clause}
225+
ORDER BY stream_id ASC
226+
{limit_clause}
227+
""",
228+
(now, from_id, to_id, *room_id_in_list_values, *limit_params),
229+
)
230+
return cast(list[tuple[int, str, str]], txn.fetchall())
231+
141232
async def get_updated_sticky_events(
142233
self, *, from_id: int, to_id: int, limit: int
143234
) -> list[StickyEventUpdate]:

0 commit comments

Comments
 (0)