Skip to content

Commit 1e05a05

Browse files
committed
Add Sliding Sync /sync/e2ee endpoint for To-Device messages
Based on: - MSC3575: Sliding Sync (aka Sync v3): matrix-org/matrix-spec-proposals#3575 - MSC3885: Sliding Sync Extension: To-Device messages: matrix-org/matrix-spec-proposals#3885 - MSC3884: Sliding Sync Extension: E2EE: matrix-org/matrix-spec-proposals#3884
1 parent f9e6e53 commit 1e05a05

File tree

4 files changed

+276
-25
lines changed

4 files changed

+276
-25
lines changed

synapse/handlers/sync.py

Lines changed: 96 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
# [This file includes modifications made by New Vector Limited]
1919
#
2020
#
21+
from enum import Enum
2122
import itertools
2223
import logging
2324
from typing import (
@@ -112,12 +113,21 @@
112113
SyncRequestKey = Tuple[Any, ...]
113114

114115

116+
class SyncType(Enum):
117+
"""Enum for specifying the type of sync request."""
118+
119+
# These string values are semantically significant and are used in the the metrics
120+
INITIAL_SYNC = "initial_sync"
121+
FULL_STATE_SYNC = "full_state_sync"
122+
INCREMENTAL_SYNC = "incremental_sync"
123+
E2EE_SYNC = "e2ee_sync"
124+
125+
115126
@attr.s(slots=True, frozen=True, auto_attribs=True)
116127
class SyncConfig:
117128
user: UserID
118129
filter_collection: FilterCollection
119130
is_guest: bool
120-
request_key: SyncRequestKey
121131
device_id: Optional[str]
122132

123133

@@ -263,6 +273,15 @@ def __bool__(self) -> bool:
263273
)
264274

265275

276+
@attr.s(slots=True, frozen=True, auto_attribs=True)
277+
class E2eeSyncResult:
278+
next_batch: StreamToken
279+
to_device: List[JsonDict]
280+
# device_lists: DeviceListUpdates
281+
# device_one_time_keys_count: JsonMapping
282+
# device_unused_fallback_key_types: List[str]
283+
284+
266285
class SyncHandler:
267286
def __init__(self, hs: "HomeServer"):
268287
self.hs_config = hs.config
@@ -309,13 +328,18 @@ async def wait_for_sync_for_user(
309328
self,
310329
requester: Requester,
311330
sync_config: SyncConfig,
331+
sync_type: SyncType,
332+
request_key: SyncRequestKey,
312333
since_token: Optional[StreamToken] = None,
313334
timeout: int = 0,
314335
full_state: bool = False,
315336
) -> SyncResult:
316337
"""Get the sync for a client if we have new data for it now. Otherwise
317338
wait for new data to arrive on the server. If the timeout expires, then
318339
return an empty sync result.
340+
341+
Args:
342+
request_key: The key to use for caching the response.
319343
"""
320344
# If the user is not part of the mau group, then check that limits have
321345
# not been exceeded (if not part of the group by this point, almost certain
@@ -324,9 +348,10 @@ async def wait_for_sync_for_user(
324348
await self.auth_blocking.check_auth_blocking(requester=requester)
325349

326350
res = await self.response_cache.wrap(
327-
sync_config.request_key,
351+
request_key,
328352
self._wait_for_sync_for_user,
329353
sync_config,
354+
sync_type,
330355
since_token,
331356
timeout,
332357
full_state,
@@ -338,6 +363,7 @@ async def wait_for_sync_for_user(
338363
async def _wait_for_sync_for_user(
339364
self,
340365
sync_config: SyncConfig,
366+
sync_type: SyncType,
341367
since_token: Optional[StreamToken],
342368
timeout: int,
343369
full_state: bool,
@@ -356,13 +382,6 @@ async def _wait_for_sync_for_user(
356382
Computing the body of the response begins in the next method,
357383
`current_sync_for_user`.
358384
"""
359-
if since_token is None:
360-
sync_type = "initial_sync"
361-
elif full_state:
362-
sync_type = "full_state_sync"
363-
else:
364-
sync_type = "incremental_sync"
365-
366385
context = current_context()
367386
if context:
368387
context.tag = sync_type
@@ -384,14 +403,16 @@ async def _wait_for_sync_for_user(
384403
# we are going to return immediately, so don't bother calling
385404
# notifier.wait_for_events.
386405
result: SyncResult = await self.current_sync_for_user(
387-
sync_config, since_token, full_state=full_state
406+
sync_config, sync_type, since_token, full_state=full_state
388407
)
389408
else:
390409
# Otherwise, we wait for something to happen and report it to the user.
391410
async def current_sync_callback(
392411
before_token: StreamToken, after_token: StreamToken
393412
) -> SyncResult:
394-
return await self.current_sync_for_user(sync_config, since_token)
413+
return await self.current_sync_for_user(
414+
sync_config, sync_type, since_token
415+
)
395416

396417
result = await self.notifier.wait_for_events(
397418
sync_config.user.to_string(),
@@ -423,6 +444,7 @@ async def current_sync_callback(
423444
async def current_sync_for_user(
424445
self,
425446
sync_config: SyncConfig,
447+
sync_type: SyncType,
426448
since_token: Optional[StreamToken] = None,
427449
full_state: bool = False,
428450
) -> SyncResult:
@@ -434,9 +456,25 @@ async def current_sync_for_user(
434456
"""
435457
with start_active_span("sync.current_sync_for_user"):
436458
log_kv({"since_token": since_token})
437-
sync_result = await self.generate_sync_result(
438-
sync_config, since_token, full_state
439-
)
459+
460+
# Go through the `/sync` v2 path
461+
if sync_type in {
462+
SyncType.INITIAL_SYNC,
463+
SyncType.FULL_STATE_SYNC,
464+
SyncType.INCREMENTAL_SYNC,
465+
}:
466+
sync_result = await self.generate_sync_result(
467+
sync_config, since_token, full_state
468+
)
469+
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
470+
elif sync_type == SyncType.E2EE_SYNC:
471+
sync_result = await self.generate_e2ee_sync_result(
472+
sync_config, since_token
473+
)
474+
else:
475+
raise Exception(
476+
f"Unknown sync_type (this is a Synapse problem): {sync_type}"
477+
)
440478

441479
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
442480
return sync_result
@@ -1751,6 +1789,50 @@ async def generate_sync_result(
17511789
next_batch=sync_result_builder.now_token,
17521790
)
17531791

1792+
async def generate_e2ee_sync_result(
1793+
self,
1794+
sync_config: SyncConfig,
1795+
since_token: Optional[StreamToken] = None,
1796+
) -> SyncResult:
1797+
"""Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result."""
1798+
1799+
user_id = sync_config.user.to_string()
1800+
# TODO: Should we exclude app services here? There could be an argument to allow
1801+
# them since the appservice doesn't have to make a massive initial sync.
1802+
# (related to https://github.com/matrix-org/matrix-doc/issues/1144)
1803+
1804+
# NB: The now_token gets changed by some of the generate_sync_* methods,
1805+
# this is due to some of the underlying streams not supporting the ability
1806+
# to query up to a given point.
1807+
# Always use the `now_token` in `SyncResultBuilder`
1808+
now_token = self.event_sources.get_current_token()
1809+
log_kv({"now_token": now_token})
1810+
1811+
joined_room_ids = await self.store.get_rooms_for_user(user_id)
1812+
1813+
sync_result_builder = SyncResultBuilder(
1814+
sync_config,
1815+
full_state=False,
1816+
since_token=since_token,
1817+
now_token=now_token,
1818+
joined_room_ids=joined_room_ids,
1819+
# Dummy values to fill out `SyncResultBuilder`
1820+
excluded_room_ids=frozenset({}),
1821+
forced_newly_joined_room_ids=frozenset({}),
1822+
membership_change_events=frozenset({}),
1823+
)
1824+
1825+
await self._generate_sync_entry_for_to_device(sync_result_builder)
1826+
1827+
return E2eeSyncResult(
1828+
to_device=sync_result_builder.to_device,
1829+
# to_device: List[JsonDict]
1830+
# device_lists: DeviceListUpdates
1831+
# device_one_time_keys_count: JsonMapping
1832+
# device_unused_fallback_key_types: List[str]
1833+
next_batch=sync_result_builder.now_token,
1834+
)
1835+
17541836
@measure_func("_generate_sync_entry_for_device_list")
17551837
async def _generate_sync_entry_for_device_list(
17561838
self,

synapse/rest/client/sync.py

Lines changed: 102 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
KnockedSyncResult,
4242
SyncConfig,
4343
SyncResult,
44+
SyncType,
4445
)
4546
from synapse.http.server import HttpServer
4647
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
@@ -198,14 +199,20 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
198199
user=user,
199200
filter_collection=filter_collection,
200201
is_guest=requester.is_guest,
201-
request_key=request_key,
202202
device_id=device_id,
203203
)
204204

205205
since_token = None
206206
if since is not None:
207207
since_token = await StreamToken.from_string(self.store, since)
208208

209+
if since_token is None:
210+
sync_type = SyncType.INITIAL_SYNC
211+
elif full_state:
212+
sync_type = SyncType.FULL_STATE_SYNC
213+
else:
214+
sync_type = SyncType.INCREMENTAL_SYNC
215+
209216
# send any outstanding server notices to the user.
210217
await self._server_notices_sender.on_user_syncing(user.to_string())
211218

@@ -221,6 +228,8 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
221228
sync_result = await self.sync_handler.wait_for_sync_for_user(
222229
requester,
223230
sync_config,
231+
sync_type,
232+
request_key,
224233
since_token=since_token,
225234
timeout=timeout,
226235
full_state=full_state,
@@ -554,27 +563,111 @@ async def encode_room(
554563
return result
555564

556565

557-
class SlidingSyncRestServlet(RestServlet):
566+
class SlidingSyncE2eeRestServlet(RestServlet):
558567
"""
559-
API endpoint TODO
560-
Useful for cases like TODO
568+
API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
569+
of Sliding Sync but doesn't have any sliding window component. It's just a way to
570+
get E2EE events without having to sit through a initial sync. And not have
571+
encryption events backed up by the main sync response.
572+
573+
GET parameters::
574+
timeout(int): How long to wait for new events in milliseconds.
575+
since(batch_token): Batch token when asking for incremental deltas.
576+
577+
Response JSON::
578+
{
579+
"next_batch": // batch token for the next /sync
580+
"to_device": {
581+
// list of to-device events
582+
"events": [
583+
{
584+
"content: { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
585+
"type": "m.room.encrypted",
586+
"sender": "@alice:example.com",
587+
}
588+
// ...
589+
]
590+
},
591+
"device_one_time_keys_count": {
592+
"signed_curve25519": 50
593+
},
594+
"device_lists": {
595+
"changed": ["@alice:example.com"],
596+
"left": ["@bob:example.com"]
597+
},
598+
"device_unused_fallback_key_types": [
599+
"signed_curve25519"
600+
]
601+
}
561602
"""
562603

563604
PATTERNS = (re.compile("^/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee$"),)
564605

565606
def __init__(self, hs: "HomeServer"):
566607
super().__init__()
567-
self._auth = hs.get_auth()
608+
self.auth = hs.get_auth()
568609
self.store = hs.get_datastores().main
610+
self.filtering = hs.get_filtering()
611+
self.sync_handler = hs.get_sync_handler()
569612

570613
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
571-
return 200, {
572-
"foo": "bar",
573-
}
614+
requester = await self.auth.get_user_by_req(request, allow_guest=True)
615+
user = requester.user
616+
device_id = requester.device_id
617+
618+
timeout = parse_integer(request, "timeout", default=0)
619+
since = parse_string(request, "since")
620+
621+
sync_config = SyncConfig(
622+
user=user,
623+
# Filtering doesn't apply to this endpoint so just use a default to fill in
624+
# the SyncConfig
625+
filter_collection=self.filtering.DEFAULT_FILTER_COLLECTION,
626+
is_guest=requester.is_guest,
627+
device_id=device_id,
628+
)
629+
sync_type = SyncType.E2EE_SYNC
630+
631+
since_token = None
632+
if since is not None:
633+
since_token = await StreamToken.from_string(self.store, since)
634+
635+
# Request cache key
636+
request_key = (
637+
sync_type,
638+
user,
639+
timeout,
640+
since,
641+
)
642+
643+
# Gather data for the response
644+
sync_result = await self.sync_handler.wait_for_sync_for_user(
645+
requester,
646+
sync_config,
647+
sync_type,
648+
request_key,
649+
since_token=since_token,
650+
timeout=timeout,
651+
full_state=False,
652+
)
653+
654+
# The client may have disconnected by now; don't bother to serialize the
655+
# response if so.
656+
if request._disconnected:
657+
logger.info("Client has disconnected; not serializing response.")
658+
return 200, {}
659+
660+
response: JsonDict = defaultdict(dict)
661+
response["next_batch"] = await sync_result.next_batch.to_string(self.store)
662+
663+
if sync_result.to_device:
664+
response["to_device"] = {"events": sync_result.to_device}
665+
666+
return 200, response
574667

575668

576669
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
577670
SyncRestServlet(hs).register(http_server)
578671

579672
if hs.config.experimental.msc3575_enabled:
580-
SlidingSyncRestServlet(hs).register(http_server)
673+
SlidingSyncE2eeRestServlet(hs).register(http_server)

tests/rest/client/test_sendtodevice.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ def test_user_to_user(self) -> None:
6767
}
6868
self.assertEqual(channel.json_body["to_device"], expected_result)
6969

70-
# it should re-appear if we do another sync
70+
# it should re-appear if we do another sync because the to-device message is not
71+
# deleted until we acknowledge it by sending a `?since=...` parameter in the
72+
# next sync request corresponding to the `next_batch` value from the response.
7173
channel = self.make_request("GET", "/sync", access_token=user2_tok)
7274
self.assertEqual(channel.code, 200, channel.result)
7375
self.assertEqual(channel.json_body["to_device"], expected_result)
@@ -99,7 +101,7 @@ def test_local_room_key_request(self) -> None:
99101
)
100102
self.assertEqual(chan.code, 200, chan.result)
101103

102-
# now sync: we should get two of the three
104+
# now sync: we should get two of the three (because burst_count=2)
103105
channel = self.make_request("GET", "/sync", access_token=user2_tok)
104106
self.assertEqual(channel.code, 200, channel.result)
105107
msgs = channel.json_body["to_device"]["events"]

0 commit comments

Comments
 (0)