From 2ef454dd8864cde8bd41d06c40887fdb376bd3cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Sep 2020 11:32:30 +0100 Subject: [PATCH 1/4] Fix table scan of events on worker startup. This happened because we assumed "new" writers had an initial stream position of 0, so the replication code tried to fetch all events written by the instance between 0 and the current position. Instead, set the initial position of new writers to the current persisted up to position, on the assumption that new writers won't have written anything before that point. --- synapse/storage/util/id_generators.py | 13 ++++++++++++- tests/storage/test_id_generators.py | 11 +++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 4fd7573e260d..7f126ab2051a 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -453,11 +453,22 @@ def get_current_token_for_writer(self, instance_name: str) -> int: """Returns the position of the given writer. """ + # If we don't have an entry for the given instance name, we assume its a + # new writer. + # + # For new writers we assume their initial position to be the current + # persisted up to position. This stops Synapse from doing a full table + # scan when a new writer announces itself over replication. with self._lock: - return self._return_factor * self._current_positions.get(instance_name, 0) + return self._return_factor * self._current_positions.get( + instance_name, self._persisted_upto_position + ) def get_positions(self) -> Dict[str, int]: """Get a copy of the current positon map. + + Note that this won't necessarily include all configured writers if some + writers haven't written anything yet. """ with self._lock: diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 4558bee7be85..d2507ac07580 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -390,17 +390,28 @@ def test_writer_config_change(self): # Initial config has two writers id_gen = self._create_id_generator("first", writers=["first", "second"]) self.assertEqual(id_gen.get_persisted_upto_position(), 3) + self.assertEqual(id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(id_gen.get_current_token_for_writer("second"), 5) # New config removes one of the configs. Note that if the writer is # removed from config we assume that it has been shut down and has # finished persisting, hence why the persisted upto position is 5. id_gen_2 = self._create_id_generator("second", writers=["second"]) self.assertEqual(id_gen_2.get_persisted_upto_position(), 5) + self.assertEqual(id_gen_2.get_current_token_for_writer("second"), 5) # This config points to a single, previously unused writer. id_gen_3 = self._create_id_generator("third", writers=["third"]) self.assertEqual(id_gen_3.get_persisted_upto_position(), 5) + # For new writers we assume their initial position to be the current + # persisted up to position. This stops Synapse from doing a full table + # scan when a new writer comes along. + self.assertEqual(id_gen_3.get_current_token_for_writer("third"), 5) + + id_gen_4 = self._create_id_generator("fourth", writers=["third"]) + self.assertEqual(id_gen_4.get_current_token_for_writer("third"), 5) + # Check that we get a sane next stream ID with this new config. async def _get_next_async(): From 55834dadc0616c209f1f0f30bb48b1c9ee2e78a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Sep 2020 11:55:52 +0100 Subject: [PATCH 2/4] Consider old writers coming back as "new". Otherwise we'd try and fetch entries between the old stale token and the current position, even though it won't have written any rows. --- synapse/storage/util/id_generators.py | 13 +++++++++++++ tests/storage/test_id_generators.py | 7 +++++++ 2 files changed, 20 insertions(+) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 7f126ab2051a..5c315e137eb0 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -273,6 +273,19 @@ def _load_current_ids( # Load the current positions of all writers for the stream. if self._writers: + # We delete any stale entries in the positions table. This is + # important if we add back a writer after a long time; we want to + # consider that a "new" writer, rather than using the old stale + # entry here. + sql = """ + DELETE FROM stream_positions + WHERE + stream_name = ? + AND instance_name != ALL(?) + """ + sql = self._db.engine.convert_param_style(sql) + cur.execute(sql, (self._stream_name, self._writers)) + sql = """ SELECT instance_name, stream_id FROM stream_positions WHERE stream_name = ? diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index d2507ac07580..392b08832b05 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -421,6 +421,13 @@ async def _get_next_async(): self.get_success(_get_next_async()) self.assertEqual(id_gen_3.get_persisted_upto_position(), 6) + # If we add back the old "first" then we shouldn't see the persisted up + # to position revert back to 3. + id_gen_5 = self._create_id_generator("five", writers=["first", "third"]) + self.assertEqual(id_gen_5.get_persisted_upto_position(), 6) + self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6) + self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6) + def test_sequence_consistency(self): """Test that we error out if the table and sequence diverges. """ From 36750fb646eed3bf043f705ac395989ffa72126c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Sep 2020 12:00:31 +0100 Subject: [PATCH 3/4] Newsfile --- changelog.d/8419.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8419.feature diff --git a/changelog.d/8419.feature b/changelog.d/8419.feature new file mode 100644 index 000000000000..b363e929ea8c --- /dev/null +++ b/changelog.d/8419.feature @@ -0,0 +1 @@ +Add experimental support for sharding event persister. From 491cb5fdb9561bc7dff8a686156fb5daa5d324cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Sep 2020 15:48:51 +0100 Subject: [PATCH 4/4] Fix typo Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- synapse/storage/util/id_generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 5c315e137eb0..02fbb656e81c 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -466,7 +466,7 @@ def get_current_token_for_writer(self, instance_name: str) -> int: """Returns the position of the given writer. """ - # If we don't have an entry for the given instance name, we assume its a + # If we don't have an entry for the given instance name, we assume it's a # new writer. # # For new writers we assume their initial position to be the current