From bf0911065e14f3c419a9c4d1cc0f60ade1b1906a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 09:52:43 +0100 Subject: [PATCH 01/10] Fix bug where sync could get stuck when using workers This is because we serialized the token wrong if the instance map contained entries from before the minimum token. --- synapse/types/__init__.py | 26 +++++++++++++++++----- tests/test_types.py | 47 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index b22a13ef016..a8af17e18cb 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -20,6 +20,7 @@ # # import abc +import logging import re import string from enum import Enum @@ -74,6 +75,9 @@ from synapse.storage.databases.main import DataStore, PurgeEventsStore from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore + +logger = logging.getLogger(__name__) + # Define a state map type from type/state_key to T (usually an event ID or # event) T = TypeVar("T") @@ -666,7 +670,10 @@ async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken except CancelledError: raise except Exception: - pass + # We log an exception here as even though this *might* be a client + # handing a bad token, it's more likely that Synapse returned a bad + # token (and we really want to catch those!). 
+ logger.exception("Failed to parse stream token: %r", string) raise SynapseError(400, "Invalid room stream token %r" % (string,)) @classmethod @@ -727,8 +734,10 @@ async def to_string(self, store: "DataStore") -> str: instance_id = await store.get_id_for_instance(name) entries.append(f"{instance_id}.{pos}") - encoded_map = "~".join(entries) - return f"m{self.stream}~{encoded_map}" + if entries: + encoded_map = "~".join(entries) + return f"m{self.stream}~{encoded_map}" + return f"s{self.stream}" else: return "s%d" % (self.stream,) @@ -770,7 +779,10 @@ async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken except CancelledError: raise except Exception: - pass + # We log an exception here as even though this *might* be a client + # handing a bad token, it's more likely that Synapse returned a bad + # token (and we really want to catch those!). + logger.exception("Failed to parse stream token: %r", string) raise SynapseError(400, "Invalid stream token %r" % (string,)) async def to_string(self, store: "DataStore") -> str: @@ -786,8 +798,10 @@ async def to_string(self, store: "DataStore") -> str: instance_id = await store.get_id_for_instance(name) entries.append(f"{instance_id}.{pos}") - encoded_map = "~".join(entries) - return f"m{self.stream}~{encoded_map}" + if entries: + encoded_map = "~".join(entries) + return f"m{self.stream}~{encoded_map}" + return str(self.stream) else: return str(self.stream) diff --git a/tests/test_types.py b/tests/test_types.py index 944aa784fc7..498eea40a98 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -19,9 +19,14 @@ # # +from unittest import skipUnless + +from immutabledict import immutabledict + from synapse.api.errors import SynapseError from synapse.types import ( RoomAlias, + RoomStreamToken, UserID, get_domain_from_id, get_localpart_from_id, @@ -29,6 +34,7 @@ ) from tests import unittest +from tests.utils import USE_POSTGRES_FOR_TESTS class IsMineIDTests(unittest.HomeserverTestCase): @@ -127,3 
+133,44 @@ def test_non_ascii(self) -> None: # this should work with either a unicode or a bytes self.assertEqual(map_username_to_mxid_localpart("têst"), "t=c3=aast") self.assertEqual(map_username_to_mxid_localpart("têst".encode()), "t=c3=aast") + + +class RoomStreamTokenTestCase(unittest.HomeserverTestCase): + def test_basic_token(self) -> None: + """Test that a simple stream token be serialized and unserialized""" + store = self.hs.get_datastores().main + + token = RoomStreamToken(stream=5) + + string_token = self.get_success(token.to_string(store)) + self.assertEqual(string_token, "s5") + + parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) + self.assertEqual(parsed_token, token) + + @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres") + def test_instance_map(self) -> None: + """Test for stream token with instance map""" + store = self.hs.get_datastores().main + + token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 6})) + + string_token = self.get_success(token.to_string(store)) + self.assertEqual(string_token, "m5~1.6") + + parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) + self.assertEqual(parsed_token, token) + + @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres") + def test_instance_map_behind(self) -> None: + """Test for stream token with instance map, where instance map entries + are from before stream token.""" + store = self.hs.get_datastores().main + + token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4})) + + string_token = self.get_success(token.to_string(store)) + self.assertEqual(string_token, "s5") + + parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) + self.assertEqual(parsed_token, RoomStreamToken(stream=5)) From d2c8d4817d2633e5138c8add01d14ce3336b2645 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 10:42:21 +0100 Subject: [PATCH 02/10] Add assertion to MultiWriterTokens --- synapse/types/__init__.py | 13 
+++++++++++++ tests/test_types.py | 17 +++++------------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index a8af17e18cb..d21638f96b7 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -458,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): represented by a default `stream` attribute and a map of instance name to stream position of any writers that are ahead of the default stream position. + + The values in `instance_map` must be greater than the `stream` attribute. """ stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True) @@ -472,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): kw_only=True, ) + def __attrs_post_init__(self) -> None: + # Enforce that all instances have a value greater than the min stream + # position. + for v in self.instance_map.values(): + if v < self.stream: + raise ValueError( + "'instance_map' includes a stream position before the main 'stream' attribute" + ) + @classmethod @abc.abstractmethod async def parse(cls, store: "DataStore", string: str) -> "Self": @@ -641,6 +652,8 @@ def __attrs_post_init__(self) -> None: "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'." 
) + super().__attrs_post_init__() + @classmethod async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken": try: diff --git a/tests/test_types.py b/tests/test_types.py index 498eea40a98..3af05eb7f0c 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -161,16 +161,9 @@ def test_instance_map(self) -> None: parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) self.assertEqual(parsed_token, token) - @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres") - def test_instance_map_behind(self) -> None: - """Test for stream token with instance map, where instance map entries - are from before stream token.""" - store = self.hs.get_datastores().main + def test_instance_map_assertion(self) -> None: + """Test that we assert values in the instance map are greater than the + min stream position""" - token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4})) - - string_token = self.get_success(token.to_string(store)) - self.assertEqual(string_token, "s5") - - parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) - self.assertEqual(parsed_token, RoomStreamToken(stream=5)) + with self.assertRaises(ValueError): + RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4})) From 6d86303c39dc26777e5320a49ef2ebca0f66b5dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 10:52:41 +0100 Subject: [PATCH 03/10] Newsfile --- changelog.d/17438.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17438.bugfix diff --git a/changelog.d/17438.bugfix b/changelog.d/17438.bugfix new file mode 100644 index 00000000000..154080b817e --- /dev/null +++ b/changelog.d/17438.bugfix @@ -0,0 +1 @@ +Fix rare bug where the `/sync` would break for a user when using workers with multiple stream writers. 
From 31e6508626ef790122fddfea2b91cad4cae08af4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 11:40:08 +0100 Subject: [PATCH 04/10] Handle existing dodgy tokens --- synapse/types/__init__.py | 10 ++++++++++ tests/test_types.py | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index d21638f96b7..8b96fd35a0b 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -668,6 +668,11 @@ async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken instance_map = {} for part in parts[1:]: + if not part: + # Handle tokens of the form `m5~`, which were created by + # a bug + continue + key, value = part.split(".") instance_id = int(key) pos = int(value) @@ -778,6 +783,11 @@ async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken instance_map = {} for part in parts[1:]: + if not part: + # Handle tokens of the form `m5~`, which were created by + # a bug + continue + key, value = part.split(".") instance_id = int(key) pos = int(value) diff --git a/tests/test_types.py b/tests/test_types.py index 3af05eb7f0c..ef1933fc2cd 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -167,3 +167,11 @@ def test_instance_map_assertion(self) -> None: with self.assertRaises(ValueError): RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4})) + + def test_parse_bad_token(self) -> None: + """Test that we can parse tokens produced by a bug in Synapse of the + form `m5~`""" + store = self.hs.get_datastores().main + + parsed_token = self.get_success(RoomStreamToken.parse(store, "m5~")) + self.assertEqual(parsed_token, RoomStreamToken(stream=5)) From 940b644ca9f865ffa842d5036bd926ee8d61284e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 11:46:39 +0100 Subject: [PATCH 05/10] Test both types of token --- tests/test_types.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff 
--git a/tests/test_types.py b/tests/test_types.py index ef1933fc2cd..34906a3349f 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -19,12 +19,16 @@ # # +from typing import Type from unittest import skipUnless from immutabledict import immutabledict +from parameterized import parameterized_class from synapse.api.errors import SynapseError from synapse.types import ( + AbstractMultiWriterStreamToken, + MultiWriterStreamToken, RoomAlias, RoomStreamToken, UserID, @@ -135,17 +139,33 @@ def test_non_ascii(self) -> None: self.assertEqual(map_username_to_mxid_localpart("têst".encode()), "t=c3=aast") -class RoomStreamTokenTestCase(unittest.HomeserverTestCase): +@parameterized_class( + ("token_type",), + [ + (MultiWriterStreamToken,), + (RoomStreamToken,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{params_dict['token_type'].__name__}", +) +class MultiWriterTokenTestCase(unittest.HomeserverTestCase): + """Tests for the different types of multi writer tokens.""" + + token_type: Type[AbstractMultiWriterStreamToken] + def test_basic_token(self) -> None: """Test that a simple stream token be serialized and unserialized""" store = self.hs.get_datastores().main - token = RoomStreamToken(stream=5) + token = self.token_type(stream=5) string_token = self.get_success(token.to_string(store)) - self.assertEqual(string_token, "s5") - parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) + if isinstance(token, RoomStreamToken): + self.assertEqual(string_token, "s5") + else: + self.assertEqual(string_token, "5") + + parsed_token = self.get_success(self.token_type.parse(store, string_token)) self.assertEqual(parsed_token, token) @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres") @@ -153,12 +173,12 @@ def test_instance_map(self) -> None: """Test for stream token with instance map""" store = self.hs.get_datastores().main - token = RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 6})) + token = 
self.token_type(stream=5, instance_map=immutabledict({"foo": 6})) string_token = self.get_success(token.to_string(store)) self.assertEqual(string_token, "m5~1.6") - parsed_token = self.get_success(RoomStreamToken.parse(store, string_token)) + parsed_token = self.get_success(self.token_type.parse(store, string_token)) self.assertEqual(parsed_token, token) def test_instance_map_assertion(self) -> None: @@ -166,12 +186,12 @@ def test_instance_map_assertion(self) -> None: min stream position""" with self.assertRaises(ValueError): - RoomStreamToken(stream=5, instance_map=immutabledict({"foo": 4})) + self.token_type(stream=5, instance_map=immutabledict({"foo": 4})) def test_parse_bad_token(self) -> None: """Test that we can parse tokens produced by a bug in Synapse of the form `m5~`""" store = self.hs.get_datastores().main - parsed_token = self.get_success(RoomStreamToken.parse(store, "m5~")) - self.assertEqual(parsed_token, RoomStreamToken(stream=5)) + parsed_token = self.get_success(self.token_type.parse(store, "m5~")) + self.assertEqual(parsed_token, self.token_type(stream=5)) From 4bf6b069adf29169c809a962b8c723adf7aedf92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 13:30:50 +0100 Subject: [PATCH 06/10] Apply suggestions from code review Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/17438.bugfix | 2 +- tests/test_types.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog.d/17438.bugfix b/changelog.d/17438.bugfix index 154080b817e..cff6eecd480 100644 --- a/changelog.d/17438.bugfix +++ b/changelog.d/17438.bugfix @@ -1 +1 @@ -Fix rare bug where the `/sync` would break for a user when using workers with multiple stream writers. +Fix rare bug where `/sync` would break for a user when using workers with multiple stream writers. 
diff --git a/tests/test_types.py b/tests/test_types.py index 34906a3349f..4fd6123c32b 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -153,7 +153,7 @@ class MultiWriterTokenTestCase(unittest.HomeserverTestCase): token_type: Type[AbstractMultiWriterStreamToken] def test_basic_token(self) -> None: - """Test that a simple stream token be serialized and unserialized""" + """Test that a simple stream token can be serialized and unserialized""" store = self.hs.get_datastores().main token = self.token_type(stream=5) From 0ccc29591dfa28f64ba2f340c526fdca72030e4b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 13:43:25 +0100 Subject: [PATCH 07/10] Review comments --- synapse/types/__init__.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 8b96fd35a0b..42d4ee61ed7 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -477,10 +477,10 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): def __attrs_post_init__(self) -> None: # Enforce that all instances have a value greater than the min stream # position. - for v in self.instance_map.values(): - if v < self.stream: + for i, v in self.instance_map.items(): + if v <= self.stream: raise ValueError( - "'instance_map' includes a stream position before the main 'stream' attribute" + f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}" ) @classmethod @@ -509,6 +509,9 @@ def copy_and_advance(self, other: "Self") -> "Self": for instance in set(self.instance_map).union(other.instance_map) } + # Filter out any redundant entries. 
+ instance_map = {i: s for i, s in instance_map.items() if s > max_stream} + return attr.evolve( self, stream=max_stream, instance_map=immutabledict(instance_map) ) @@ -738,6 +741,8 @@ def get_stream_pos_for_instance(self, instance_name: str) -> int: return self.instance_map.get(instance_name, self.stream) async def to_string(self, store: "DataStore") -> str: + """See class level docstring for information about the format.""" + if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) elif self.instance_map: @@ -809,6 +814,8 @@ async def parse(cls, store: "DataStore", string: str) -> "MultiWriterStreamToken raise SynapseError(400, "Invalid stream token %r" % (string,)) async def to_string(self, store: "DataStore") -> str: + """See class level docstring for information about the format.""" + if self.instance_map: entries = [] for name, pos in self.instance_map.items(): From 2a49675813e581198f7e98b18b0d6ba2bbabb204 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 13:46:07 +0100 Subject: [PATCH 08/10] Add test --- tests/test_types.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_types.py b/tests/test_types.py index 4fd6123c32b..00adc65a5a9 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -188,6 +188,9 @@ def test_instance_map_assertion(self) -> None: with self.assertRaises(ValueError): self.token_type(stream=5, instance_map=immutabledict({"foo": 4})) + with self.assertRaises(ValueError): + self.token_type(stream=5, instance_map=immutabledict({"foo": 5})) + def test_parse_bad_token(self) -> None: """Test that we can parse tokens produced by a bug in Synapse of the form `m5~`""" From 224a739085ee79f63e2d47ed89c31a1c96dfc785 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 13:58:01 +0100 Subject: [PATCH 09/10] Fix sliding sync --- synapse/handlers/sliding_sync.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py 
b/synapse/handlers/sliding_sync.py index 904787ced3e..387cf787ba9 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -640,10 +640,17 @@ async def get_sync_room_ids_for_user( instance_to_max_stream_ordering_map[instance_name] = stream_ordering # Then assemble the `RoomStreamToken` + min_stream_pos = min(instance_to_max_stream_ordering_map.values()) membership_snapshot_token = RoomStreamToken( # Minimum position in the `instance_map` - stream=min(instance_to_max_stream_ordering_map.values()), - instance_map=immutabledict(instance_to_max_stream_ordering_map), + stream=min_stream_pos, + instance_map=immutabledict( + { + instance_name: stream_pos + for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() + if stream_pos > min_stream_pos + } + ), ) # Since we fetched the users room list at some point in time after the from/to From 546c2a8f74aa382a7f1a581ab61b055e70dd78bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Jul 2024 13:59:26 +0100 Subject: [PATCH 10/10] Fix bound_stream_token --- synapse/types/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 42d4ee61ed7..3962ecc9960 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -557,10 +557,15 @@ def is_before_or_eq(self, other_token: Self) -> bool: def bound_stream_token(self, max_stream: int) -> "Self": """Bound the stream positions to a maximum value""" + min_pos = min(self.stream, max_stream) return type(self)( - stream=min(self.stream, max_stream), + stream=min_pos, instance_map=immutabledict( - {k: min(s, max_stream) for k, s in self.instance_map.items()} + { + k: min(s, max_stream) + for k, s in self.instance_map.items() + if min(s, max_stream) > min_pos + } ), )