From d203d22a6f07bcd2c445419caae6911f4b2d3687 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Nov 2021 18:43:08 -0600 Subject: [PATCH 1/6] Refactor backfilled behavior into specific function parameters Part of https://github.com/matrix-org/synapse/issues/11300 --- synapse/handlers/federation_event.py | 12 +++++--- synapse/replication/http/federation.py | 17 ++++++----- synapse/storage/databases/main/events.py | 39 ++++++++++++++++++------ synapse/storage/persist_events.py | 10 ++++-- 4 files changed, 54 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9917613298c6..ea625692a389 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1821,7 +1821,8 @@ async def persist_events_and_notify( self, room_id: str, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], - backfilled: bool = False, + *, + inhibit_push_notifications: bool = False, ) -> int: """Persists events and tells the notifier/pushers about them, if necessary. @@ -1831,8 +1832,9 @@ async def persist_events_and_notify( event_and_contexts: Sequence of events with their associated context that should be persisted. All events must belong to the same room. - backfilled: Whether these events are a result of - backfilling or not + inhibit_push_notifications: Whether to stop the notifiers/pushers + from knowing about the event. Usually this is done for any backfilled + event. Returns: The stream ID after which all events have been persisted. @@ -1850,7 +1852,7 @@ async def persist_events_and_notify( store=self._store, room_id=room_id, event_and_contexts=batch, - backfilled=backfilled, + inhibit_push_notifications=inhibit_push_notifications, ) return result["max_stream_id"] else: @@ -1867,7 +1869,7 @@ async def persist_events_and_notify( # If there's an expiry timestamp on the event, schedule its expiry. 
self._message_handler.maybe_schedule_expiry(event) - if not backfilled: # Never notify for backfilled events + if not inhibit_push_notifications: # Never notify for backfilled events for event in events: await self._notify_persisted_event(event, max_stream_token) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 5ed535c90dea..d6be52ab711c 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, }], - "backfilled": false + "inhibit_push_notifications": false } 200 OK @@ -69,14 +69,17 @@ def __init__(self, hs: "HomeServer"): self.federation_event_handler = hs.get_federation_event_handler() @staticmethod - async def _serialize_payload(store, room_id, event_and_contexts, backfilled): + async def _serialize_payload( + store, room_id, event_and_contexts, inhibit_push_notifications: bool = False + ): """ Args: store room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) - backfilled (bool): Whether or not the events are the result of - backfilling + inhibit_push_notifications (bool): Whether to stop the notifiers/pushers + from knowing about the event. Usually this is done for any backfilled + event. 
""" event_payloads = [] for event, context in event_and_contexts: @@ -96,7 +99,7 @@ async def _serialize_payload(store, room_id, event_and_contexts, backfilled): payload = { "events": event_payloads, - "backfilled": backfilled, + "inhibit_push_notifications": inhibit_push_notifications, "room_id": room_id, } @@ -107,7 +110,7 @@ async def _handle_request(self, request): content = parse_json_object_from_request(request) room_id = content["room_id"] - backfilled = content["backfilled"] + inhibit_push_notifications = content["inhibit_push_notifications"] event_payloads = content["events"] @@ -132,7 +135,7 @@ async def _handle_request(self, request): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_event_handler.persist_events_and_notify( - room_id, event_and_contexts, backfilled + room_id, event_and_contexts, inhibit_push_notifications ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 120e4807d161..65ba2963d8a1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -121,10 +121,11 @@ def __init__( async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], - backfilled: bool = False, + use_negative_stream_ordering: bool = False, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -137,7 +138,9 @@ async def _persist_events_and_state_updates( room state new_forward_extremities: Map from room_id to list of event IDs that are the new forward extremities of the room. - backfilled + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. 
Usually this is done for any + backfilled event. Returns: Resolves when the events have been persisted @@ -159,7 +162,7 @@ async def _persist_events_and_state_updates( # # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. - if backfilled: + if use_negative_stream_ordering: stream_ordering_manager = self._backfill_id_gen.get_next_mult( len(events_and_contexts) ) @@ -182,7 +185,8 @@ async def _persist_events_and_state_updates( ) persist_event_counter.inc(len(events_and_contexts)) - if not backfilled: + # TODO: test that this actually works + if stream < 0: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. synapse.metrics.event_persisted_position.set( @@ -1200,7 +1204,8 @@ def _update_room_depths_txn( self, txn, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + *, + update_room_forward_stream_ordering: bool = True, ): """Update min_depth for each room @@ -1208,13 +1213,16 @@ def _update_room_depths_txn( txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - backfilled (bool): True if the events were backfilled + update_room_forward_stream_ordering (bool): Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. 
""" depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) - if not backfilled: + if update_room_forward_stream_ordering: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, @@ -1638,8 +1646,19 @@ def _store_event_reference_hashes_txn(self, txn, events): txn, table="event_reference_hashes", values=vals ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database.""" + def _store_room_members_txn( + self, txn, events, *, inhibit_local_membership_updates: bool = False + ): + """ + Store a room member in the database. + + Args: + txn: The transaction to use. + events: List of events to store. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + """ def non_null_str_or_none(val: Any) -> Optional[str]: return val if isinstance(val, str) and "\u0000" not in val else None @@ -1682,7 +1701,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]: # band membership", like a remote invite or a rejection of a remote invite. 
if ( self.is_mine_id(event.state_key) - and not backfilled + and not inhibit_local_membership_updates and event.internal_metadata.is_outlier() and event.internal_metadata.is_out_of_band_membership() ): diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 402f134d894b..dc8d3e8effd3 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -379,13 +379,19 @@ async def persist_event( async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool = False, + should_calculate_state_and_forward_extrems: bool = True, ) -> Dict[str, str]: """Callback for the _event_persist_queue Calculates the change to current state and forward extremities, and persists the given events and with those updates. + Args: + events_and_contexts: + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + Returns: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. @@ -448,7 +454,7 @@ async def _persist_event_batch( # device lists as stale. potentially_left_users: Set[str] = set() - if not backfilled: + if should_calculate_state_and_forward_extrems: with Measure(self._clock, "_calculate_state_and_extrem"): # Work out the new "current state" for each room. 
# We do this by working out what the new extremities are and then From b8c60b9ad2d13815e868b6ea6159d5b77621d861 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Nov 2021 20:09:38 -0600 Subject: [PATCH 2/6] Pipe arguments across the function stack --- synapse/handlers/federation_event.py | 42 +++++++++- synapse/handlers/message.py | 18 ++-- synapse/replication/http/federation.py | 53 +++++++++++- synapse/storage/databases/main/events.py | 44 ++++++++-- synapse/storage/persist_events.py | 82 +++++++++++++++++-- .../replication/slave/storage/test_events.py | 7 +- 6 files changed, 217 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ea625692a389..46606775d399 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1809,7 +1809,24 @@ async def _run_push_actions_and_persist_event( try: await self.persist_events_and_notify( - event.room_id, [(event, context)], backfilled=backfilled + event.room_id, + [(event, context)], + # We should not send notifications about backfilled events. + inhibit_push_notifications=backfilled, + # We don't need to calculate the state for backfilled events and + # there is no need to update the forward extrems because we + # already know this event happened in the past if it was + # backfilled. + should_calculate_state_and_forward_extrems=not backfilled, + # Backfilled events get a negative stream ordering so they don't + # come down incremental `/sync` + use_negative_stream_ordering=backfilled, + # Backfilled events do not affect the current local state + inhibit_local_membership_updates=backfilled, + # Backfilled events have negative stream ordering and happened + # in the past so we know that we don't need to update the + # stream_ordering tip for the room. 
+ update_room_forward_stream_ordering=not backfilled, ) except Exception: run_in_background( @@ -1823,6 +1840,10 @@ async def persist_events_and_notify( event_and_contexts: Sequence[Tuple[EventBase, EventContext]], *, inhibit_push_notifications: bool = False, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> int: """Persists events and tells the notifier/pushers about them, if necessary. @@ -1835,6 +1856,19 @@ async def persist_events_and_notify( inhibit_push_notifications: Whether to stop the notifiers/pushers from knowing about the event. Usually this is done for any backfilled event. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: The stream ID after which all events have been persisted. @@ -1861,7 +1895,11 @@ async def persist_events_and_notify( # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. 
events, max_stream_token = await self._storage.persistence.persist_events( - event_and_contexts, backfilled=backfilled + event_and_contexts, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) if self._ephemeral_messages_enabled: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22dd4cf5fd3f..cd3c64d2d6d6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1565,12 +1565,6 @@ async def persist_and_notify_client_event( errcode=Codes.INVALID_PARAM, ) - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True - # Note that this returns the event that was persisted, which may not be # the same as we passed in if it was deduplicated due transaction IDs. ( @@ -1578,7 +1572,17 @@ async def persist_and_notify_client_event( event_pos, max_stream_token, ) = await self.storage.persistence.persist_event( - event, context=context, backfilled=backfilled + event, + context=context, + # Make any historical messages behave like backfilled events + should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(), + # We use a negative `stream_ordering` for historical messages so + # they don't come down an incremental `/sync` and have the proper + # decrementing `stream_ordering` as we import so they sort + # as expected between two depths. 
+ use_negative_stream_ordering=event.internal_metadata.is_historical(), + inhibit_local_membership_updates=event.internal_metadata.is_historical(), + update_room_forward_stream_ordering=not event.internal_metadata.is_historical(), ) if self._ephemeral_events_enabled: diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index d6be52ab711c..68c6a470f729 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -70,16 +70,37 @@ def __init__(self, hs: "HomeServer"): @staticmethod async def _serialize_payload( - store, room_id, event_and_contexts, inhibit_push_notifications: bool = False + store, + room_id, + event_and_contexts, + *, + inhibit_push_notifications: bool = False, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ): """ Args: store room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) - inhibit_push_notifications (bool): Whether to stop the notifiers/pushers + inhibit_push_notifications: Whether to stop the notifiers/pushers from knowing about the event. Usually this is done for any backfilled event. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. 
""" event_payloads = [] for event, context in event_and_contexts: @@ -100,6 +121,10 @@ async def _serialize_payload( payload = { "events": event_payloads, "inhibit_push_notifications": inhibit_push_notifications, + "should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems, + "use_negative_stream_ordering": use_negative_stream_ordering, + "inhibit_local_membership_updates": inhibit_local_membership_updates, + "update_room_forward_stream_ordering": update_room_forward_stream_ordering, "room_id": room_id, } @@ -111,6 +136,22 @@ async def _handle_request(self, request): room_id = content["room_id"] inhibit_push_notifications = content["inhibit_push_notifications"] + should_calculate_state_and_forward_extrems = content[ + "should_calculate_state_and_forward_extrems" + ] + use_negative_stream_ordering = content["use_negative_stream_ordering"] + inhibit_local_membership_updates = content[ + "inhibit_local_membership_updates" + ] + update_room_forward_stream_ordering = content[ + "update_room_forward_stream_ordering" + ] + + assert inhibit_push_notifications is not None + assert should_calculate_state_and_forward_extrems is not None + assert use_negative_stream_ordering is not None + assert inhibit_local_membership_updates is not None + assert update_room_forward_stream_ordering is not None event_payloads = content["events"] @@ -135,7 +176,13 @@ async def _handle_request(self, request): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_event_handler.persist_events_and_notify( - room_id, event_and_contexts, inhibit_push_notifications + room_id, + event_and_contexts, + inhibit_push_notifications=inhibit_push_notifications, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + 
update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 65ba2963d8a1..1e1fd8e42572 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -126,6 +126,8 @@ async def _persist_events_and_state_updates( state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -141,6 +143,13 @@ async def _persist_events_and_state_updates( use_negative_stream_ordering: Whether to start stream_ordering on the negative side and decrement. Usually this is done for any backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. 
Returns: Resolves when the events have been persisted @@ -179,7 +188,8 @@ async def _persist_events_and_state_updates( "persist_events", self._persist_events_txn, events_and_contexts=events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) @@ -320,8 +330,10 @@ def _get_prevs_before_rejected_txn(txn, batch): def _persist_events_txn( self, txn: LoggingTransaction, + *, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -335,11 +347,18 @@ def _persist_events_txn( txn events_and_contexts: events to persist backfilled: True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. state_delta_for_room: The current-state delta for each room. - new_forward_extremetie: The new forward extremities for each room. + new_forward_extremeties: The new forward extremities for each room. For each room, a list of the event ids which are the forward extremities. 
@@ -368,7 +387,9 @@ def _persist_events_txn( ) self._update_room_depths_txn( - txn, events_and_contexts=events_and_contexts, backfilled=backfilled + txn, + events_and_contexts=events_and_contexts, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) # _update_outliers_txn filters out any events which have already been @@ -402,7 +423,7 @@ def _persist_events_txn( txn, events_and_contexts=events_and_contexts, all_events_and_contexts=all_events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # We call this last as it assumes we've inserted the events into @@ -1435,7 +1456,12 @@ def _store_rejected_events_txn(self, txn, events_and_contexts): return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _update_metadata_tables_txn( - self, txn, events_and_contexts, all_events_and_contexts, backfilled + self, + txn, + *, + events_and_contexts, + all_events_and_contexts, + inhibit_local_membership_updates: bool = False, ): """Update all the miscellaneous tables for new events @@ -1447,7 +1473,9 @@ def _update_metadata_tables_txn( events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. - backfilled (bool): True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. """ # Insert all the push actions into the event_push_actions table. @@ -1521,7 +1549,7 @@ def _update_metadata_tables_txn( for event, _ in events_and_contexts if event.type == EventTypes.Member ], - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # Insert event_reference_hashes table. 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index dc8d3e8effd3..101b469bb41f 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -296,15 +296,29 @@ def __init__(self, hs: "HomeServer", stores: Databases): async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], - backfilled: bool = False, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Tuple[List[EventBase], RoomStreamToken]: """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) - backfilled: Whether the results are retrieved from federation - via backfill or not. Used to determine if they're "new" events - which might update the current state etc. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: List of events persisted, the current position room stream position. 
@@ -320,7 +334,12 @@ async def persist_events( async def enqueue(item): room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( - room_id, evs_ctxs, backfilled=backfilled + room_id, + evs_ctxs, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) @@ -350,9 +369,35 @@ async def enqueue(item): @opentracing.trace async def persist_event( - self, event: EventBase, context: EventContext, backfilled: bool = False + self, + event: EventBase, + context: EventContext, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: """ + Write a single event to the database. + + Args: + event: + context: + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. + Returns: The event, stream ordering of `event`, and the stream ordering of the latest persisted event. 
The returned event may not match the given @@ -363,7 +408,12 @@ async def persist_event( # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events.) replaced_events = await self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], backfilled=backfilled + event.room_id, + [(event, context)], + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) replaced_event = replaced_events.get(event.event_id) if replaced_event: @@ -379,7 +429,11 @@ async def persist_event( async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Dict[str, str]: """Callback for the _event_persist_queue @@ -391,6 +445,16 @@ async def _persist_event_batch( should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. 
Returns: A dictionary of event ID to event ID we didn't persist as we already @@ -589,7 +653,9 @@ async def _persist_event_batch( current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, - backfilled=backfilled, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) await self._handle_potentially_left_users(potentially_left_users) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index b25a06b4271a..dd54d8908103 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -323,7 +323,12 @@ def persist(self, backfill=False, **kwargs): if backfill: self.get_success( self.storage.persistence.persist_events( - [(event, context)], backfilled=True + [(event, context)], + # Backfilled event + should_calculate_state_and_forward_extrems=False, + use_negative_stream_ordering=True, + inhibit_local_membership_updates=True, + update_room_forward_stream_ordering=False, ) ) else: From d44fdcc8e60097eaa1258fa3946b07bf2074293c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Nov 2021 00:39:02 -0600 Subject: [PATCH 3/6] Fix event queue --- synapse/storage/persist_events.py | 54 +++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 101b469bb41f..18b991d60748 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -35,6 +35,7 @@ ) import attr +from mypy_extensions import DefaultNamedArg from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -112,7 +113,10 @@ @attr.s(auto_attribs=True, slots=True) class _EventPersistQueueItem: events_and_contexts: 
List[Tuple[EventBase, EventContext]] - backfilled: bool + should_calculate_state_and_forward_extrems: bool + use_negative_stream_ordering: bool + inhibit_local_membership_updates: bool + update_room_forward_stream_ordering: bool deferred: ObservableDeferred parent_opentracing_span_contexts: List = attr.ib(factory=list) @@ -133,7 +137,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]): def __init__( self, per_item_callback: Callable[ - [List[Tuple[EventBase, EventContext]], bool], + [ + List[Tuple[EventBase, EventContext]], + DefaultNamedArg( + bool, "should_calculate_state_and_forward_extrems" # noqa: F821 + ), + DefaultNamedArg(bool, "use_negative_stream_ordering"), # noqa: F821 + DefaultNamedArg(bool, "inhibit_local_membership_updates"), # noqa: F821 + DefaultNamedArg( + bool, "update_room_forward_stream_ordering" # noqa: F821 + ), + ], Awaitable[_PersistResult], ], ): @@ -150,7 +164,11 @@ async def add_to_queue( self, room_id: str, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], - backfilled: bool, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> _PersistResult: """Add events to the queue, with the given persist_event options. @@ -160,7 +178,19 @@ async def add_to_queue( Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): - backfilled (bool): + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. 
+ update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: the result returned by the `_per_item_callback` passed to @@ -170,7 +200,10 @@ async def add_to_queue( # if the last item in the queue has the same `backfilled` setting, # we can just add these new events to that item. - if queue and queue[-1].backfilled == backfilled: + if ( + queue + and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering + ): end_item = queue[-1] else: # need to make a new queue item @@ -180,7 +213,10 @@ async def add_to_queue( end_item = _EventPersistQueueItem( events_and_contexts=[], - backfilled=backfilled, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, deferred=deferred, ) queue.append(end_item) @@ -241,7 +277,11 @@ async def handle_queue_loop(): item.opentracing_span_context = scope.span.context ret = await self._per_item_callback( - item.events_and_contexts, item.backfilled + item.events_and_contexts, + should_calculate_state_and_forward_extrems=item.should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=item.use_negative_stream_ordering, + inhibit_local_membership_updates=item.inhibit_local_membership_updates, + update_room_forward_stream_ordering=item.update_room_forward_stream_ordering, ) except Exception: with PreserveLoggingContext(): From ec82407b498d19a980a335ca3fb213610263e13e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Nov 2021 00:43:32 -0600 Subject: [PATCH 4/6] Add changelog --- changelog.d/11396.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11396.misc diff --git a/changelog.d/11396.misc 
b/changelog.d/11396.misc new file mode 100644 index 000000000000..de9a45fc9993 --- /dev/null +++ b/changelog.d/11396.misc @@ -0,0 +1 @@ +Refactor `backfilled` into specific behavior function arguments (`persist_events_and_notify` and downstream calls). From 56ea69fecfa4f3d5e079da865ef6a91fc581c52e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Nov 2021 01:51:05 -0600 Subject: [PATCH 5/6] Fix incompatible type ``` synapse/replication/http/federation.py:72: error: Signature of "_serialize_payload" incompatible with supertype "ReplicationEndpoint" [override] ``` --- synapse/replication/http/federation.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 68c6a470f729..ff5f919161d9 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -73,12 +73,11 @@ async def _serialize_payload( store, room_id, event_and_contexts, - *, - inhibit_push_notifications: bool = False, - should_calculate_state_and_forward_extrems: bool = True, - use_negative_stream_ordering: bool = False, - inhibit_local_membership_updates: bool = False, - update_room_forward_stream_ordering: bool = True, + inhibit_push_notifications, + should_calculate_state_and_forward_extrems, + use_negative_stream_ordering, + inhibit_local_membership_updates, + update_room_forward_stream_ordering, ): """ Args: From f01d506fda95651eb9c0129dcd22bd0829f3cd14 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Nov 2021 02:23:24 -0600 Subject: [PATCH 6/6] Update comments and docstrings to explain the why --- synapse/handlers/federation_event.py | 31 ++++++--- synapse/replication/http/federation.py | 27 +++++--- synapse/storage/databases/main/events.py | 45 +++++++----- synapse/storage/persist_events.py | 88 ++++++++++++++++-------- 4 files changed, 126 insertions(+), 65 deletions(-) diff --git a/synapse/handlers/federation_event.py 
b/synapse/handlers/federation_event.py index 46606775d399..fc1570f5acfe 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1814,7 +1814,7 @@ async def _run_push_actions_and_persist_event( # We should not send notifications about backfilled events. inhibit_push_notifications=backfilled, # We don't need to calculate the state for backfilled events and - # we there is no need to update the forward extrems because we + # there is no need to update the forward extrems because we # already know this event happened in the past if it was # backfilled. should_calculate_state_and_forward_extrems=not backfilled, @@ -1823,7 +1823,7 @@ async def _run_push_actions_and_persist_event( use_negative_stream_ordering=backfilled, # Backfilled events do not affect the current local state inhibit_local_membership_updates=backfilled, - # Backfilled events have negative stream ordering and happened + # Backfilled events have negative stream_ordering and happened # in the past so we know that we don't need to update the # stream_ordering tip for the room. update_room_forward_stream_ordering=not backfilled, @@ -1854,21 +1854,30 @@ async def persist_events_and_notify( context that should be persisted. All events must belong to the same room. inhibit_push_notifications: Whether to stop the notifiers/pushers - from knowing about the event. Usually this is done for any backfilled - event. + from knowing about the event. This should be set as True + for backfilled events because there is no need to send push + notifications for events in the past. should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the - room. This should be set to false for backfilled events. + room. 
This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: The stream ID after which all events have been persisted. diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index ff5f919161d9..71101bba6c38 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -85,21 +85,30 @@ async def _serialize_payload( room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) inhibit_push_notifications: Whether to stop the notifiers/pushers - from knowing about the event. Usually this is done for any backfilled - event. + from knowing about the event. 
This should be set as True + for backfilled events because there is no need to send push + notifications for events in the past. should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the - room. This should be set to false for backfilled events. + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. 
""" event_payloads = [] for event, context in event_and_contexts: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1e1fd8e42572..f4291af6089a 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -141,15 +141,19 @@ async def _persist_events_and_state_updates( new_forward_extremities: Map from room_id to list of event IDs that are the new forward extremities of the room. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: Resolves when the events have been persisted @@ -348,12 +352,15 @@ def _persist_events_txn( events_and_contexts: events to persist backfilled: True if the events were backfilled inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. 
This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. @@ -1234,10 +1241,12 @@ def _update_room_depths_txn( txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - update_room_forward_stream_ordering (bool): Whether to update the + update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: @@ -1474,8 +1483,9 @@ def _update_metadata_tables_txn( we've already persisted, etc, that wouldn't appear in events_and_context. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. """ # Insert all the push actions into the event_push_actions table. 
@@ -1684,8 +1694,9 @@ def _store_room_members_txn( txn: The transaction to use. events: List of events to store. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. """ def non_null_str_or_none(val: Any) -> Optional[str]: diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 18b991d60748..d90c817e7d2c 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -180,17 +180,25 @@ async def add_to_queue( events_and_contexts (list[(EventBase, EventContext)]): should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the - room. This should be set to false for backfilled events. + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. 
update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: the result returned by the `_per_item_callback` passed to @@ -348,17 +356,25 @@ async def persist_events( events_and_contexts: list of tuples of (event, context) should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the - room. This should be set to false for backfilled events. + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. 
This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: List of events persisted, the current position room stream position. @@ -426,17 +442,25 @@ async def persist_event( context: should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the - room. This should be set to false for backfilled events. + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. 
Returns: The event, stream ordering of `event`, and the stream ordering of the @@ -484,17 +508,25 @@ async def _persist_event_batch( events_and_contexts: should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the - room. This should be set to false for backfilled events. + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. use_negative_stream_ordering: Whether to start stream_ordering on - the negative side and decrement. Usually this is done for any - backfilled event. + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. inhibit_local_membership_updates: Stop the local_current_membership - from being updated by these events. Usually this is done for - backfilled events. + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. update_room_forward_stream_ordering: Whether to update the stream_ordering position to mark the latest event as the front - of the room. This should only be set as false for backfilled - events. + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: A dictionary of event ID to event ID we didn't persist as we already