55
66from synapse .api .constants import EventTypes
77from synapse .events import EventBase
8- from synapse .federation .sender import PerDestinationQueue , TransactionManager
8+ from synapse .federation .sender import (
9+ FederationSender ,
10+ PerDestinationQueue ,
11+ TransactionManager ,
12+ )
913from synapse .federation .units import Edu , Transaction
1014from synapse .rest import admin
1115from synapse .rest .client import login , room
@@ -33,8 +37,9 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
3337 ]
3438
3539 def make_homeserver (self , reactor : MemoryReactor , clock : Clock ) -> HomeServer :
40+ self .federation_transport_client = Mock (spec = ["send_transaction" ])
3641 return self .setup_test_homeserver (
37- federation_transport_client = Mock ( spec = [ "send_transaction" ]) ,
42+ federation_transport_client = self . federation_transport_client ,
3843 )
3944
4045 def prepare (self , reactor : MemoryReactor , clock : Clock , hs : HomeServer ) -> None :
@@ -52,10 +57,14 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
5257 self .pdus : List [JsonDict ] = []
5358 self .failed_pdus : List [JsonDict ] = []
5459 self .is_online = True
55- self .hs . get_federation_transport_client () .send_transaction .side_effect = (
60+ self .federation_transport_client .send_transaction .side_effect = (
5661 self .record_transaction
5762 )
5863
64+ federation_sender = hs .get_federation_sender ()
65+ assert isinstance (federation_sender , FederationSender )
66+ self .federation_sender = federation_sender
67+
5968 def default_config (self ) -> JsonDict :
6069 config = super ().default_config ()
6170 config ["federation_sender_instances" ] = None
@@ -229,11 +238,11 @@ def test_catch_up_from_blank_state(self) -> None:
229238 # let's delete the federation transmission queue
230239 # (this pretends we are starting up fresh.)
231240 self .assertFalse (
232- self .hs . get_federation_sender ()
233- . _per_destination_queues [ "host2" ]
234- .transmission_loop_running
241+ self .federation_sender . _per_destination_queues [
242+ "host2"
243+ ] .transmission_loop_running
235244 )
236- del self .hs . get_federation_sender () ._per_destination_queues ["host2" ]
245+ del self .federation_sender ._per_destination_queues ["host2" ]
237246
238247 # let's also clear any backoffs
239248 self .get_success (
@@ -322,6 +331,7 @@ def test_catch_up_loop(self) -> None:
322331 # also fetch event 5 so we know its last_successful_stream_ordering later
323332 event_5 = self .get_success (self .hs .get_datastores ().main .get_event (event_id_5 ))
324333
334+ assert event_2 .internal_metadata .stream_ordering is not None
325335 self .get_success (
326336 self .hs .get_datastores ().main .set_destination_last_successful_stream_ordering (
327337 "host2" , event_2 .internal_metadata .stream_ordering
@@ -425,15 +435,16 @@ def test_catch_up_on_synapse_startup(self) -> None:
425435 def wake_destination_track (destination : str ) -> None :
426436 woken .append (destination )
427437
428- self .hs . get_federation_sender (). wake_destination = wake_destination_track
438+ self .federation_sender . wake_destination = wake_destination_track # type: ignore[assignment]
429439
430440 # cancel the pre-existing timer for _wake_destinations_needing_catchup
431441 # this is because we are calling it manually rather than waiting for it
432442 # to be called automatically
433- self .hs .get_federation_sender ()._catchup_after_startup_timer .cancel ()
443+ assert self .federation_sender ._catchup_after_startup_timer is not None
444+ self .federation_sender ._catchup_after_startup_timer .cancel ()
434445
435446 self .get_success (
436- self .hs . get_federation_sender () ._wake_destinations_needing_catchup (), by = 5.0
447+ self .federation_sender ._wake_destinations_needing_catchup (), by = 5.0
437448 )
438449
439450 # ASSERT (_wake_destinations_needing_catchup):
@@ -475,6 +486,7 @@ def test_not_latest_event(self) -> None:
475486 )
476487 )
477488
489+ assert event_1 .internal_metadata .stream_ordering is not None
478490 self .get_success (
479491 self .hs .get_datastores ().main .set_destination_last_successful_stream_ordering (
480492 "host2" , event_1 .internal_metadata .stream_ordering
0 commit comments