diff --git a/changelog.d/11396.misc b/changelog.d/11396.misc new file mode 100644 index 000000000000..de9a45fc9993 --- /dev/null +++ b/changelog.d/11396.misc @@ -0,0 +1 @@ +Refactor `backfilled` into specific behavior function arguments (`persist_events_and_notify` and downstream calls). diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9917613298c6..fc1570f5acfe 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1809,7 +1809,24 @@ async def _run_push_actions_and_persist_event( try: await self.persist_events_and_notify( - event.room_id, [(event, context)], backfilled=backfilled + event.room_id, + [(event, context)], + # We should not send notifications about backfilled events. + inhibit_push_notifications=backfilled, + # We don't need to calculate the state for backfilled events and + # there is no need to update the forward extrems because we + # already know this event happened in the past if it was + # backfilled. + should_calculate_state_and_forward_extrems=not backfilled, + # Backfilled events get a negative stream ordering so they don't + # come down incremental `/sync` + use_negative_stream_ordering=backfilled, + # Backfilled events do not affect the current local state + inhibit_local_membership_updates=backfilled, + # Backfilled events have negative stream_ordering and happened + # in the past so we know that we don't need to update the + # stream_ordering tip for the room. 
+ update_room_forward_stream_ordering=not backfilled, ) except Exception: run_in_background( @@ -1821,7 +1838,12 @@ async def persist_events_and_notify( self, room_id: str, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], - backfilled: bool = False, + *, + inhibit_push_notifications: bool = False, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> int: """Persists events and tells the notifier/pushers about them, if necessary. @@ -1831,8 +1853,31 @@ async def persist_events_and_notify( event_and_contexts: Sequence of events with their associated context that should be persisted. All events must belong to the same room. - backfilled: Whether these events are a result of - backfilling or not + inhibit_push_notifications: Whether to stop the notifiers/pushers + from knowing about the event. This should be set as True + for backfilled events because there is no need to send push + notifications for events in the past. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. 
+ update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: The stream ID after which all events have been persisted. @@ -1850,7 +1895,11 @@ async def persist_events_and_notify( store=self._store, room_id=room_id, event_and_contexts=batch, - backfilled=backfilled, + inhibit_push_notifications=inhibit_push_notifications, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) return result["max_stream_id"] else: @@ -1859,7 +1908,11 @@ async def persist_events_and_notify( # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. events, max_stream_token = await self._storage.persistence.persist_events( - event_and_contexts, backfilled=backfilled + event_and_contexts, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) if self._ephemeral_messages_enabled: @@ -1867,7 +1920,7 @@ async def persist_events_and_notify( # If there's an expiry timestamp on the event, schedule its expiry. 
self._message_handler.maybe_schedule_expiry(event) - if not backfilled: # Never notify for backfilled events + if not inhibit_push_notifications: # Never notify for backfilled events for event in events: await self._notify_persisted_event(event, max_stream_token) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22dd4cf5fd3f..cd3c64d2d6d6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1565,12 +1565,6 @@ async def persist_and_notify_client_event( errcode=Codes.INVALID_PARAM, ) - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True - # Note that this returns the event that was persisted, which may not be # the same as we passed in if it was deduplicated due transaction IDs. ( @@ -1578,7 +1572,17 @@ async def persist_and_notify_client_event( event_pos, max_stream_token, ) = await self.storage.persistence.persist_event( - event, context=context, backfilled=backfilled + event, + context=context, + # Make any historical messages behave like backfilled events + should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(), + # We use a negative `stream_ordering`` for historical messages so + # they don't come down an incremental `/sync` and have the proper + # decrementing `stream_ordering` as we import so they sort + # as expected between two depths. 
+ use_negative_stream_ordering=event.internal_metadata.is_historical(), + inhibit_local_membership_updates=event.internal_metadata.is_historical(), + update_room_forward_stream_ordering=not event.internal_metadata.is_historical(), ) if self._ephemeral_events_enabled: diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 5ed535c90dea..71101bba6c38 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, }], - "backfilled": false + "inhibit_push_notifications": false } 200 OK @@ -69,14 +69,46 @@ def __init__(self, hs: "HomeServer"): self.federation_event_handler = hs.get_federation_event_handler() @staticmethod - async def _serialize_payload(store, room_id, event_and_contexts, backfilled): + async def _serialize_payload( + store, + room_id, + event_and_contexts, + inhibit_push_notifications, + should_calculate_state_and_forward_extrems, + use_negative_stream_ordering, + inhibit_local_membership_updates, + update_room_forward_stream_ordering, + ): """ Args: store room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) - backfilled (bool): Whether or not the events are the result of - backfilling + inhibit_push_notifications: Whether to stop the notifiers/pushers + from knowing about the event. This should be set as True + for backfilled events because there is no need to send push + notifications for events in the past. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. 
This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. 
""" event_payloads = [] for event, context in event_and_contexts: @@ -96,7 +128,11 @@ async def _serialize_payload(store, room_id, event_and_contexts, backfilled): payload = { "events": event_payloads, - "backfilled": backfilled, + "inhibit_push_notifications": inhibit_push_notifications, + "should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems, + "use_negative_stream_ordering": use_negative_stream_ordering, + "inhibit_local_membership_updates": inhibit_local_membership_updates, + "update_room_forward_stream_ordering": update_room_forward_stream_ordering, "room_id": room_id, } @@ -107,7 +143,23 @@ async def _handle_request(self, request): content = parse_json_object_from_request(request) room_id = content["room_id"] - backfilled = content["backfilled"] + inhibit_push_notifications = content["inhibit_push_notifications"] + should_calculate_state_and_forward_extrems = content[ + "should_calculate_state_and_forward_extrems" + ] + use_negative_stream_ordering = content["use_negative_stream_ordering"] + inhibit_local_membership_updates = content[ + "inhibit_local_membership_updates" + ] + update_room_forward_stream_ordering = content[ + "update_room_forward_stream_ordering" + ] + + assert inhibit_push_notifications is not None + assert should_calculate_state_and_forward_extrems is not None + assert use_negative_stream_ordering is not None + assert inhibit_local_membership_updates is not None + assert update_room_forward_stream_ordering is not None event_payloads = content["events"] @@ -132,7 +184,13 @@ async def _handle_request(self, request): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_event_handler.persist_events_and_notify( - room_id, event_and_contexts, backfilled + room_id, + event_and_contexts, + inhibit_push_notifications=inhibit_push_notifications, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + 
use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 120e4807d161..f4291af6089a 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -121,10 +121,13 @@ def __init__( async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], - backfilled: bool = False, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -137,7 +140,20 @@ async def _persist_events_and_state_updates( room state new_forward_extremities: Map from room_id to list of event IDs that are the new forward extremities of the room. - backfilled + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. 
This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: Resolves when the events have been persisted @@ -159,7 +175,7 @@ async def _persist_events_and_state_updates( # # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. - if backfilled: + if use_negative_stream_ordering: stream_ordering_manager = self._backfill_id_gen.get_next_mult( len(events_and_contexts) ) @@ -176,13 +192,15 @@ async def _persist_events_and_state_updates( "persist_events", self._persist_events_txn, events_and_contexts=events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc(len(events_and_contexts)) - if not backfilled: + # `use_negative_stream_ordering` is only set for backfill-style persists. + if not use_negative_stream_ordering: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. synapse.metrics.event_persisted_position.set( @@ -316,8 +334,10 @@ def _get_prevs_before_rejected_txn(txn, batch): def _persist_events_txn( self, txn: LoggingTransaction, + *, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -331,11 +351,21 @@ def _persist_events_txn( txn events_and_contexts: events to persist backfilled: True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. 
This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. state_delta_for_room: The current-state delta for each room. - new_forward_extremetie: The new forward extremities for each room. + new_forward_extremeties: The new forward extremities for each room. For each room, a list of the event ids which are the forward extremities. @@ -364,7 +394,9 @@ def _persist_events_txn( ) self._update_room_depths_txn( - txn, events_and_contexts=events_and_contexts, backfilled=backfilled + txn, + events_and_contexts=events_and_contexts, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) # _update_outliers_txn filters out any events which have already been @@ -398,7 +430,7 @@ def _persist_events_txn( txn, events_and_contexts=events_and_contexts, all_events_and_contexts=all_events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # We call this last as it assumes we've inserted the events into @@ -1200,7 +1232,8 @@ def _update_room_depths_txn( self, txn, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + *, + update_room_forward_stream_ordering: bool = True, ): """Update min_depth for each room @@ -1208,13 +1241,18 @@ def _update_room_depths_txn( txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - backfilled (bool): True if the 
events were backfilled + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) - if not backfilled: + if update_room_forward_stream_ordering: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, @@ -1427,7 +1465,12 @@ def _store_rejected_events_txn(self, txn, events_and_contexts): return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _update_metadata_tables_txn( - self, txn, events_and_contexts, all_events_and_contexts, backfilled + self, + txn, + *, + events_and_contexts, + all_events_and_contexts, + inhibit_local_membership_updates: bool = False, ): """Update all the miscellaneous tables for new events @@ -1439,7 +1482,10 @@ def _update_metadata_tables_txn( events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. - backfilled (bool): True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. """ # Insert all the push actions into the event_push_actions table. @@ -1513,7 +1559,7 @@ def _update_metadata_tables_txn( for event, _ in events_and_contexts if event.type == EventTypes.Member ], - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # Insert event_reference_hashes table. 
@@ -1638,8 +1684,20 @@ def _store_event_reference_hashes_txn(self, txn, events): txn, table="event_reference_hashes", values=vals ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database.""" + def _store_room_members_txn( + self, txn, events, *, inhibit_local_membership_updates: bool = False + ): + """ + Store a room member in the database. + + Args: + txn: The transaction to use. + events: List of events to store. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + """ def non_null_str_or_none(val: Any) -> Optional[str]: return val if isinstance(val, str) and "\u0000" not in val else None @@ -1682,7 +1740,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]: # band membership", like a remote invite or a rejection of a remote invite. if ( self.is_mine_id(event.state_key) - and not backfilled + and not inhibit_local_membership_updates and event.internal_metadata.is_outlier() and event.internal_metadata.is_out_of_band_membership() ): diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 402f134d894b..d90c817e7d2c 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -35,6 +35,7 @@ ) import attr +from mypy_extensions import DefaultNamedArg from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -112,7 +113,10 @@ @attr.s(auto_attribs=True, slots=True) class _EventPersistQueueItem: events_and_contexts: List[Tuple[EventBase, EventContext]] - backfilled: bool + should_calculate_state_and_forward_extrems: bool + use_negative_stream_ordering: bool + inhibit_local_membership_updates: bool + update_room_forward_stream_ordering: bool deferred: ObservableDeferred parent_opentracing_span_contexts: List = attr.ib(factory=list) @@ -133,7 +137,17 
@@ class _EventPeristenceQueue(Generic[_PersistResult]): def __init__( self, per_item_callback: Callable[ - [List[Tuple[EventBase, EventContext]], bool], + [ + List[Tuple[EventBase, EventContext]], + DefaultNamedArg( + bool, "should_calculate_state_and_forward_extrems" # noqa: F821 + ), + DefaultNamedArg(bool, "use_negative_stream_ordering"), # noqa: F821 + DefaultNamedArg(bool, "inhibit_local_membership_updates"), # noqa: F821 + DefaultNamedArg( + bool, "update_room_forward_stream_ordering" # noqa: F821 + ), + ], Awaitable[_PersistResult], ], ): @@ -150,7 +164,11 @@ async def add_to_queue( self, room_id: str, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], - backfilled: bool, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> _PersistResult: """Add events to the queue, with the given persist_event options. @@ -160,7 +178,27 @@ async def add_to_queue( Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): - backfilled (bool): + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. 
This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. Returns: the result returned by the `_per_item_callback` passed to @@ -170,7 +208,10 @@ async def add_to_queue( # if the last item in the queue has the same `backfilled` setting, # we can just add these new events to that item. - if queue and queue[-1].backfilled == backfilled: + if ( + queue + and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering + ): end_item = queue[-1] else: # need to make a new queue item @@ -180,7 +221,10 @@ async def add_to_queue( end_item = _EventPersistQueueItem( events_and_contexts=[], - backfilled=backfilled, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, deferred=deferred, ) queue.append(end_item) @@ -241,7 +285,11 @@ async def handle_queue_loop(): item.opentracing_span_context = scope.span.context ret = await self._per_item_callback( - item.events_and_contexts, item.backfilled + item.events_and_contexts, + should_calculate_state_and_forward_extrems=item.should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=item.use_negative_stream_ordering, + inhibit_local_membership_updates=item.inhibit_local_membership_updates, + update_room_forward_stream_ordering=item.update_room_forward_stream_ordering, ) except Exception: with PreserveLoggingContext(): @@ -296,15 +344,37 @@ def 
__init__(self, hs: "HomeServer", stores: Databases): async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], - backfilled: bool = False, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Tuple[List[EventBase], RoomStreamToken]: """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) - backfilled: Whether the results are retrieved from federation - via backfill or not. Used to determine if they're "new" events - which might update the current state etc. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. 
Returns: List of events persisted, the current position room stream position. @@ -320,7 +390,12 @@ async def persist_events( async def enqueue(item): room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( - room_id, evs_ctxs, backfilled=backfilled + room_id, + evs_ctxs, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) @@ -350,9 +425,43 @@ async def enqueue(item): @opentracing.trace async def persist_event( - self, event: EventBase, context: EventContext, backfilled: bool = False + self, + event: EventBase, + context: EventContext, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: """ + Write a single event to the database. + + Args: + event: + context: + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. 
This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. + Returns: The event, stream ordering of `event`, and the stream ordering of the latest persisted event. The returned event may not match the given @@ -363,7 +472,12 @@ async def persist_event( # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events.) replaced_events = await self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], backfilled=backfilled + event.room_id, + [(event, context)], + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) replaced_event = replaced_events.get(event.event_id) if replaced_event: @@ -379,13 +493,41 @@ async def persist_event( async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool = False, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Dict[str, str]: """Callback for the _event_persist_queue Calculates the change to current state and forward extremities, and persists the given events and with those updates. 
+ Args: + events_and_contexts: + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events because + we don't need to calculate the state for backfilled events and + there is no need to update the forward extrems because we + already know this event happened in the past if it was + backfilled. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should be set as False for backfilled + events because backfilled events have negative stream_ordering + and happened in the past so we know that we don't need to + update the stream_ordering tip for the room. + Returns: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. @@ -448,7 +590,7 @@ async def _persist_event_batch( # device lists as stale. potentially_left_users: Set[str] = set() - if not backfilled: + if should_calculate_state_and_forward_extrems: with Measure(self._clock, "_calculate_state_and_extrem"): # Work out the new "current state" for each room. 
# We do this by working out what the new extremities are and then @@ -583,7 +725,9 @@ async def _persist_event_batch( current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, - backfilled=backfilled, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) await self._handle_potentially_left_users(potentially_left_users) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index b25a06b4271a..dd54d8908103 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -323,7 +323,12 @@ def persist(self, backfill=False, **kwargs): if backfill: self.get_success( self.storage.persistence.persist_events( - [(event, context)], backfilled=True + [(event, context)], + # Backfilled event + should_calculate_state_and_forward_extrems=False, + use_negative_stream_ordering=True, + inhibit_local_membership_updates=True, + update_room_forward_stream_ordering=False, ) ) else: