2626 cast ,
2727)
2828
29+ from synapse .api .constants import EventContentFields
2930from synapse .logging import issue9533_logger
30- from synapse .logging .opentracing import log_kv , set_tag , trace
31+ from synapse .logging .opentracing import (
32+ SynapseTags ,
33+ log_kv ,
34+ set_tag ,
35+ start_active_span ,
36+ trace ,
37+ )
3138from synapse .replication .tcp .streams import ToDeviceStream
3239from synapse .storage ._base import SQLBaseStore , db_to_json
3340from synapse .storage .database import (
@@ -397,6 +404,17 @@ def get_device_messages_txn(
397404 (recipient_user_id , recipient_device_id ), []
398405 ).append (message_dict )
399406
407+ # start a new span for each message, so that we can tag each separately
408+ with start_active_span ("get_to_device_message" ):
409+ set_tag (SynapseTags .TO_DEVICE_TYPE , message_dict ["type" ])
410+ set_tag (SynapseTags .TO_DEVICE_SENDER , message_dict ["sender" ])
411+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT , recipient_user_id )
412+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT_DEVICE , recipient_device_id )
413+ set_tag (
414+ SynapseTags .TO_DEVICE_MSGID ,
415+ message_dict ["content" ].get (EventContentFields .TO_DEVICE_MSGID ),
416+ )
417+
400418 if limit is not None and rowcount == limit :
401419 # We ended up bumping up against the message limit. There may be more messages
402420 # to retrieve. Return what we have, as well as the last stream position that
@@ -678,12 +696,35 @@ def add_messages_txn(
678696 ],
679697 )
680698
681- if remote_messages_by_destination :
682- issue9533_logger .debug (
683- "Queued outgoing to-device messages with stream_id %i for %s" ,
684- stream_id ,
685- list (remote_messages_by_destination .keys ()),
686- )
699+ for destination , edu in remote_messages_by_destination .items ():
700+ if issue9533_logger .isEnabledFor (logging .DEBUG ):
701+ issue9533_logger .debug (
702+ "Queued outgoing to-device messages with "
703+ "stream_id %i, EDU message_id %s, type %s for %s: %s" ,
704+ stream_id ,
705+ edu ["message_id" ],
706+ edu ["type" ],
707+ destination ,
708+ [
709+ f"{ user_id } /{ device_id } (msgid "
710+ f"{ msg .get (EventContentFields .TO_DEVICE_MSGID )} )"
711+ for (user_id , messages_by_device ) in edu ["messages" ].items ()
712+ for (device_id , msg ) in messages_by_device .items ()
713+ ],
714+ )
715+
716+ for (user_id , messages_by_device ) in edu ["messages" ].items ():
717+ for (device_id , msg ) in messages_by_device .items ():
718+ with start_active_span ("store_outgoing_to_device_message" ):
719+ set_tag (SynapseTags .TO_DEVICE_EDU_ID , edu ["sender" ])
720+ set_tag (SynapseTags .TO_DEVICE_EDU_ID , edu ["message_id" ])
721+ set_tag (SynapseTags .TO_DEVICE_TYPE , edu ["type" ])
722+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT , user_id )
723+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT_DEVICE , device_id )
724+ set_tag (
725+ SynapseTags .TO_DEVICE_MSGID ,
726+ msg .get (EventContentFields .TO_DEVICE_MSGID ),
727+ )
687728
688729 async with self ._device_inbox_id_gen .get_next () as stream_id :
689730 now_ms = self ._clock .time_msec ()
@@ -801,7 +842,19 @@ def _add_messages_to_local_device_inbox_txn(
801842 # Only insert into the local inbox if the device exists on
802843 # this server
803844 device_id = row ["device_id" ]
804- message_json = json_encoder .encode (messages_by_device [device_id ])
845+
846+ with start_active_span ("serialise_to_device_message" ):
847+ msg = messages_by_device [device_id ]
848+ set_tag (SynapseTags .TO_DEVICE_TYPE , msg ["type" ])
849+ set_tag (SynapseTags .TO_DEVICE_SENDER , msg ["sender" ])
850+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT , user_id )
851+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT_DEVICE , device_id )
852+ set_tag (
853+ SynapseTags .TO_DEVICE_MSGID ,
854+ msg ["content" ].get (EventContentFields .TO_DEVICE_MSGID ),
855+ )
856+ message_json = json_encoder .encode (msg )
857+
805858 messages_json_for_user [device_id ] = message_json
806859
807860 if messages_json_for_user :
@@ -821,15 +874,20 @@ def _add_messages_to_local_device_inbox_txn(
821874 ],
822875 )
823876
824- issue9533_logger .debug (
825- "Stored to-device messages with stream_id %i for %s" ,
826- stream_id ,
827- [
828- (user_id , device_id )
829- for (user_id , messages_by_device ) in local_by_user_then_device .items ()
830- for device_id in messages_by_device .keys ()
831- ],
832- )
877+ if issue9533_logger .isEnabledFor (logging .DEBUG ):
878+ issue9533_logger .debug (
879+ "Stored to-device messages with stream_id %i: %s" ,
880+ stream_id ,
881+ [
882+ f"{ user_id } /{ device_id } (msgid "
883+ f"{ msg ['content' ].get (EventContentFields .TO_DEVICE_MSGID )} )"
884+ for (
885+ user_id ,
886+ messages_by_device ,
887+ ) in messages_by_user_then_device .items ()
888+ for (device_id , msg ) in messages_by_device .items ()
889+ ],
890+ )
833891
834892
835893class DeviceInboxBackgroundUpdateStore (SQLBaseStore ):
0 commit comments