1+ package org.thoughtcrime.securesms.pro
2+
3+ import kotlinx.coroutines.CoroutineScope
4+ import kotlinx.coroutines.channels.BufferOverflow
5+ import kotlinx.coroutines.channels.Channel
6+ import kotlinx.coroutines.channels.SendChannel
7+ import kotlinx.coroutines.flow.SharingStarted
8+ import kotlinx.coroutines.flow.StateFlow
9+ import kotlinx.coroutines.flow.distinctUntilChanged
10+ import kotlinx.coroutines.flow.first
11+ import kotlinx.coroutines.flow.flatMapLatest
12+ import kotlinx.coroutines.flow.flow
13+ import kotlinx.coroutines.flow.mapNotNull
14+ import kotlinx.coroutines.flow.stateIn
15+ import kotlinx.coroutines.selects.select
16+ import kotlinx.coroutines.selects.onTimeout
17+ import org.session.libsession.utilities.TextSecurePreferences
18+ import org.session.libsignal.utilities.Log
19+ import org.thoughtcrime.securesms.auth.LoginStateRepository
20+ import org.thoughtcrime.securesms.dependencies.ManagerScope
21+ import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest
22+ import org.thoughtcrime.securesms.pro.api.ProApiExecutor
23+ import org.thoughtcrime.securesms.pro.api.ProDetails
24+ import org.thoughtcrime.securesms.pro.api.successOrThrow
25+ import org.thoughtcrime.securesms.pro.db.ProDatabase
26+ import org.thoughtcrime.securesms.util.NetworkConnectivity
27+ import java.time.Duration
28+ import java.time.Instant
29+ import javax.inject.Inject
30+ import javax.inject.Singleton
31+ import kotlin.coroutines.cancellation.CancellationException
32+
33+ @Singleton
34+ class ProDetailsRepository @Inject constructor(
35+ private val db : ProDatabase ,
36+ private val apiExecutor : ProApiExecutor ,
37+ private val getProDetailsRequestFactory : GetProDetailsRequest .Factory ,
38+ private val loginStateRepository : LoginStateRepository ,
39+ private val prefs : TextSecurePreferences ,
40+ networkConnectivity : NetworkConnectivity ,
41+ @ManagerScope scope : CoroutineScope ,
42+ ) {
43+ sealed interface LoadState {
44+ val lastUpdated: Pair <ProDetails , Instant >?
45+
46+ data object Init : LoadState {
47+ override val lastUpdated: Pair <ProDetails , Instant >?
48+ get() = null
49+ }
50+
51+ data class Loading (
52+ override val lastUpdated : Pair <ProDetails , Instant >? ,
53+ val waitingForNetwork : Boolean
54+ ) : LoadState
55+
56+ data class Loaded (override val lastUpdated : Pair <ProDetails , Instant >) : LoadState
57+ data class Error (override val lastUpdated : Pair <ProDetails , Instant >? ) : LoadState
58+ }
59+
60+ private val refreshRequests: SendChannel <Unit >
61+
62+ val loadState: StateFlow <LoadState >
63+
64+ init {
65+ val channel = Channel <Unit >(capacity = 10 , onBufferOverflow = BufferOverflow .DROP_OLDEST )
66+
67+ refreshRequests = channel
68+ @Suppress(" OPT_IN_USAGE" )
69+ loadState = prefs.flowPostProLaunch {
70+ loginStateRepository.loggedInState
71+ .mapNotNull { it?.seeded?.proMasterPrivateKey }
72+ }.distinctUntilChanged()
73+ .flatMapLatest { proMasterKey ->
74+ flow {
75+ var last = db.getProDetailsAndLastUpdated()
76+ var numRetried = 0
77+
78+ while (true ) {
79+ // Drain all pending requests as we are about to execute a request
80+ while (channel.tryReceive().isSuccess) { }
81+
82+ var retryingAt: Instant ? = null
83+
84+ if (last != null && last.second.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS ) >= Instant .now()) {
85+ Log .d(TAG , " Pro details is fresh enough, skipping fetch" )
86+ // Last update was recent enough, skip fetching
87+ emit(LoadState .Loaded (last))
88+ } else {
89+ if (! networkConnectivity.networkAvailable.value) {
90+ // No network...mark the state and wait for it to come back
91+ emit(LoadState .Loading (last, waitingForNetwork = true ))
92+ networkConnectivity.networkAvailable.first { it }
93+ }
94+
95+ emit(LoadState .Loading (last, waitingForNetwork = false ))
96+
97+ // Fetch new details
98+ try {
99+ Log .d(TAG , " Start fetching Pro details from backend" )
100+ last = apiExecutor.executeRequest(
101+ request = getProDetailsRequestFactory.create(proMasterKey)
102+ ).successOrThrow() to Instant .now()
103+
104+ db.updateProDetails(last.first, last.second)
105+
106+ Log .d(TAG , " Successfully fetched Pro details from backend" )
107+ emit(LoadState .Loaded (last))
108+ numRetried = 0
109+ } catch (e: Exception ) {
110+ if (e is CancellationException ) throw e
111+
112+ emit(LoadState .Error (last))
113+
114+ // Exponential backoff for retries, capped at 2 minutes
115+ val delaySeconds = minOf(10L * (1L shl numRetried), 120L )
116+ Log .e(TAG , " Error fetching Pro details from backend, retrying in ${delaySeconds} s" , e)
117+
118+ retryingAt = Instant .now().plusSeconds(delaySeconds)
119+ numRetried++
120+ }
121+ }
122+
123+
124+ // Wait until either a refresh is requested, or it's time to retry
125+ select {
126+ refreshRequests.onReceiveCatching {
127+ Log .d(TAG , " Manual refresh requested" )
128+ }
129+
130+ if (retryingAt != null ) {
131+ val delayMillis =
132+ Duration .between(Instant .now(), retryingAt).toMillis()
133+ onTimeout(delayMillis) {
134+ Log .d(TAG , " Retrying Pro details fetch after delay" )
135+ }
136+ }
137+ }
138+ }
139+ }
140+ }.stateIn(scope, SharingStarted .Eagerly , LoadState .Init )
141+ }
142+
143+ fun requestRefresh () {
144+ refreshRequests.trySend(Unit )
145+ }
146+
147+ companion object {
148+ private const val TAG = " ProDetailsRepository"
149+ private const val MIN_UPDATE_INTERVAL_SECONDS = 120L
150+ }
151+ }
0 commit comments