@@ -14,6 +14,7 @@ import okio.withLock
1414import org.session.libsession.database.MessageDataProvider
1515import org.session.libsession.database.userAuth
1616import org.session.libsession.messaging.messages.Message
17+ import org.session.libsession.messaging.messages.Message.Companion.senderOrSync
1718import org.session.libsession.messaging.messages.control.CallMessage
1819import org.session.libsession.messaging.messages.control.DataExtractionNotification
1920import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
@@ -29,6 +30,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.MessageN
2930import org.session.libsession.messaging.utilities.WebRtcUtils
3031import org.session.libsession.snode.SnodeAPI
3132import org.session.libsession.utilities.Address
33+ import org.session.libsession.utilities.Address.Companion.toAddress
3234import org.session.libsession.utilities.ConfigFactoryProtocol
3335import org.session.libsession.utilities.GroupUtil.doubleEncodeGroupID
3436import org.session.libsession.utilities.SSKEnvironment
@@ -76,6 +78,15 @@ class ReceivedMessageProcessor @Inject constructor(
7678) {
7779 private val threadMutexes = ConcurrentHashMap <Address .Conversable , ReentrantLock >()
7880
81+ private inline fun <T > withThreadLock (
82+ threadAddress : Address .Conversable ,
83+ block : () -> T
84+ ) {
85+ threadMutexes.getOrPut(threadAddress) { ReentrantLock () }.withLock {
86+ block()
87+ }
88+ }
89+
7990
8091 /* *
8192 * Start a message processing session, ensuring that thread updates and notifications are handled
@@ -119,7 +130,7 @@ class ReceivedMessageProcessor @Inject constructor(
119130 threadAddress : Address .Conversable ,
120131 message : Message ,
121132 proto : SignalServiceProtos .Content ,
122- ) = threadMutexes.getOrPut (threadAddress) { ReentrantLock () }.withLock {
133+ ) = withThreadLock (threadAddress) {
123134 // The logic to check if the message should be discarded due to being from a hidden contact.
124135 if (threadAddress is Address .Standard &&
125136 message.sentTimestamp != null &&
@@ -130,7 +141,7 @@ class ReceivedMessageProcessor @Inject constructor(
130141 )
131142 ) {
132143 log { " Dropping message from hidden contact ${threadAddress.debugString} " }
133- return @withLock
144+ return @withThreadLock
134145 }
135146
136147 // Get or create thread ID, if we aren't allowed to create it, and it doesn't exist, drop the message
@@ -142,7 +153,7 @@ class ReceivedMessageProcessor @Inject constructor(
142153 .also { id ->
143154 if (id == - 1L ) {
144155 log { " Dropping message for non-existing thread ${threadAddress.debugString} " }
145- return @withLock
156+ return @withThreadLock
146157 } else {
147158 context.threadIDs[threadAddress] = id
148159 }
@@ -201,23 +212,64 @@ class ReceivedMessageProcessor @Inject constructor(
201212
202213 fun processCommunityInboxMessage (
203214 context : MessageProcessingContext ,
215+ communityServerUrl : String ,
216+ communityServerPubKeyHex : String ,
204217 message : OpenGroupApi .DirectMessage
205218 ) {
206- // TODO("Waiting for the implementation from libsession_util")
219+ val (message, proto) = messageParser.parseCommunityDirectMessage(
220+ msg = message,
221+ currentUserId = context.currentUserId,
222+ currentUserEd25519PrivKey = context.currentUserEd25519KeyPair.secretKey.data,
223+ currentUserBlindedIDs = context.getCurrentUserBlindedIDsByServer(communityServerUrl),
224+ communityServerPubKeyHex = communityServerPubKeyHex,
225+ )
226+
227+ val threadAddress = message.senderOrSync.toAddress() as Address .Conversable
228+
229+ withThreadLock(threadAddress) {
230+ processSwarmMessage(
231+ context = context,
232+ threadAddress = threadAddress,
233+ message = message,
234+ proto = proto
235+ )
236+ }
207237 }
208238
209239 fun processCommunityOutboxMessage (
210240 context : MessageProcessingContext ,
211- message : OpenGroupApi .DirectMessage
241+ communityServerUrl : String ,
242+ communityServerPubKeyHex : String ,
243+ msg : OpenGroupApi .DirectMessage
212244 ) {
213- // TODO("Waiting for the implementation from libsession_util")
245+ val (message, proto) = messageParser.parseCommunityDirectMessage(
246+ msg = msg,
247+ currentUserId = context.currentUserId,
248+ currentUserEd25519PrivKey = context.currentUserEd25519KeyPair.secretKey.data,
249+ currentUserBlindedIDs = context.getCurrentUserBlindedIDsByServer(communityServerUrl),
250+ communityServerPubKeyHex = communityServerPubKeyHex,
251+ )
252+
253+ val threadAddress = Address .CommunityBlindedId (
254+ serverUrl = communityServerUrl,
255+ blindedId = Address .Blinded (AccountId (msg.recipient))
256+ )
257+
258+ withThreadLock(threadAddress) {
259+ processSwarmMessage(
260+ context = context,
261+ threadAddress = threadAddress,
262+ message = message,
263+ proto = proto
264+ )
265+ }
214266 }
215267
216268 fun processCommunityMessage (
217269 context : MessageProcessingContext ,
218270 threadAddress : Address .Community ,
219271 message : OpenGroupApi .Message ,
220- ) = threadMutexes.getOrPut (threadAddress) { ReentrantLock () }.withLock {
272+ ) = withThreadLock (threadAddress) {
221273 var messageId = messageParser.parseCommunityMessage(
222274 msg = message,
223275 currentUserId = context.currentUserId,
0 commit comments