@@ -2,21 +2,28 @@ package org.session.libsession.messaging.sending_receiving.pollers
22
33import kotlinx.coroutines.CancellationException
44import kotlinx.coroutines.CoroutineScope
5+ import kotlinx.coroutines.Deferred
6+ import kotlinx.coroutines.async
57import kotlinx.coroutines.channels.Channel
8+ import kotlinx.coroutines.channels.SendChannel
69import kotlinx.coroutines.delay
710import kotlinx.coroutines.flow.MutableStateFlow
811import kotlinx.coroutines.flow.StateFlow
912import kotlinx.coroutines.flow.combine
1013import kotlinx.coroutines.flow.filter
1114import kotlinx.coroutines.flow.first
1215import kotlinx.coroutines.launch
13- import kotlinx.coroutines.sync.Mutex
14- import kotlinx.coroutines.sync.withLock
16+ import kotlinx.coroutines.selects.selectUnbiased
1517import org.session.libsignal.utilities.Log
1618import org.thoughtcrime.securesms.util.AppVisibilityManager
1719import org.thoughtcrime.securesms.util.NetworkConnectivity
1820import kotlin.time.Clock
21+ import kotlin.time.Duration.Companion.seconds
1922import kotlin.time.Instant
23+ import kotlin.time.TimeMark
24+ import kotlin.time.TimeSource
25+
26+ private typealias PollRequestCallback <T > = SendChannel <Result <T >>
2027
2128/* *
2229 * Base class for pollers that perform periodic polling operations. These poller will:
@@ -30,11 +37,12 @@ import kotlin.time.Instant
3037 */
3138abstract class BasePoller <T >(
3239 private val networkConnectivity : NetworkConnectivity ,
33- appVisibilityManager : AppVisibilityManager ,
40+ private val appVisibilityManager : AppVisibilityManager ,
3441 private val scope : CoroutineScope ,
3542) {
3643 protected val logTag: String = this ::class .java.simpleName
37- private val pollMutex = Mutex ()
44+
45+ private val manualPollRequestSender: SendChannel <PollRequestCallback <T >>
3846
3947 private val mutablePollState = MutableStateFlow <PollState <T >>(PollState .Idle )
4048
@@ -44,43 +52,77 @@ abstract class BasePoller<T>(
4452 val pollState: StateFlow <PollState <T >> get() = mutablePollState
4553
4654 init {
55+ val manualPollRequestChannel = Channel <PollRequestCallback <T >>()
56+
57+ manualPollRequestSender = manualPollRequestChannel
58+
4759 scope.launch {
4860 var numConsecutiveFailures = 0
61+ var nextRoutinePollAt: TimeMark ? = null
4962
5063 while (true ) {
51- // Wait until the app is in the foreground and we have network connectivity
52- combine(
53- appVisibilityManager.isAppVisible.filter { visible ->
54- if (visible) {
55- true
56- } else {
57- Log .d(logTag, " Polling paused - app in background" )
58- false
59- }
60- },
61- networkConnectivity.networkAvailable.filter { hasNetwork ->
62- if (hasNetwork) {
63- true
64- } else {
65- Log .d(logTag, " Polling paused - no network connectivity" )
66- false
67- }
68- },
69- transform = { _, _ -> }
70- ).first()
71-
72- try {
73- pollOnce(" routine" )
74- numConsecutiveFailures = 0
75- } catch (e: CancellationException ) {
76- throw e
77- } catch (_: Throwable ) {
78- numConsecutiveFailures + = 1
64+ val waitForRoutinePollDeferred = waitForRoutinePoll(nextRoutinePollAt)
65+
66+ val (pollReason, callback) = selectUnbiased {
67+ manualPollRequestChannel.onReceive { callback ->
68+ " manual" to callback
69+ }
70+
71+ waitForRoutinePollDeferred.onAwait {
72+ " routine" to null
73+ }
7974 }
8075
76+ // Clean up the deferred
77+ waitForRoutinePollDeferred.cancel()
78+
79+ val result = runCatching {
80+ pollOnce(pollReason)
81+ }.onSuccess { numConsecutiveFailures = 0 }
82+ .onFailure {
83+ if (it is CancellationException ) throw it
84+ numConsecutiveFailures + = 1
85+ }
86+
87+ // Must use trySend as we shouldn't be waiting or responsible for
88+ // the manual request (potential) ill-setup.
89+ callback?.trySend(result)
90+
8191 val nextPollSeconds = nextPollDelaySeconds(numConsecutiveFailures)
82- Log .d(logTag, " Next poll in ${nextPollSeconds} s" )
83- delay(nextPollSeconds * 1000L )
92+ nextRoutinePollAt = TimeSource .Monotonic .markNow().plus(nextPollSeconds.seconds)
93+ }
94+ }
95+ }
96+
97+ private fun waitForRoutinePoll (minStartAt : TimeMark ? ): Deferred <Unit > {
98+ return scope.async {
99+ combine(
100+ appVisibilityManager.isAppVisible.filter { visible ->
101+ if (visible) {
102+ true
103+ } else {
104+ Log .d(logTag, " Polling paused - app in background" )
105+ false
106+ }
107+ },
108+ networkConnectivity.networkAvailable.filter { hasNetwork ->
109+ if (hasNetwork) {
110+ true
111+ } else {
112+ Log .d(logTag, " Polling paused - no network connectivity" )
113+ false
114+ }
115+ },
116+ { _, _ -> }
117+ ).first()
118+
119+ // At this point, the criteria for routine poll are all satisfied.
120+
121+ // If we are told we can only start executing from a time, wait until that.
122+ val delayMillis = minStartAt?.elapsedNow()?.let { - it.inWholeMilliseconds }
123+ if (delayMillis != null && delayMillis > 0 ) {
124+ Log .d(logTag, " Delay next poll for ${delayMillis} ms" )
125+ delay(delayMillis)
84126 }
85127 }
86128 }
@@ -104,34 +146,32 @@ abstract class BasePoller<T>(
104146 /* *
105147 * Performs a single polling operation. A failed poll should throw an exception.
106148 *
107- * @param isFirstPollSinceApoStarted True if this is the first poll since the app started.
149+ * @param isFirstPollSinceAppStarted True if this is the first poll since the app started.
108150 * @return The result of the polling operation.
109151 */
110- protected abstract suspend fun doPollOnce (isFirstPollSinceApoStarted : Boolean ): T
152+ protected abstract suspend fun doPollOnce (isFirstPollSinceAppStarted : Boolean ): T
111153
112154 private suspend fun pollOnce (reason : String ): T {
113- pollMutex.withLock {
114- val lastState = mutablePollState.value
115- mutablePollState.value =
116- PollState .Polling (reason, lastPolledResult = lastState.lastPolledResult)
117- Log .d(logTag, " Start $reason polling" )
118- val result = runCatching {
119- doPollOnce(isFirstPollSinceApoStarted = lastState is PollState .Idle )
120- }
155+ val lastState = mutablePollState.value
156+ mutablePollState.value =
157+ PollState .Polling (reason, lastPolledResult = lastState.lastPolledResult)
158+ Log .d(logTag, " Start $reason polling" )
159+ val result = runCatching {
160+ doPollOnce(isFirstPollSinceAppStarted = lastState is PollState .Idle )
161+ }
121162
122- if (result.isSuccess) {
123- Log .d(logTag, " $reason polling succeeded" )
124- } else if (result.exceptionOrNull() !is CancellationException ) {
125- Log .e(logTag, " $reason polling failed" , result.exceptionOrNull())
126- }
163+ if (result.isSuccess) {
164+ Log .d(logTag, " $reason polling succeeded" )
165+ } else if (result.exceptionOrNull() !is CancellationException ) {
166+ Log .e(logTag, " $reason polling failed" , result.exceptionOrNull())
167+ }
127168
128- mutablePollState.value = PollState .Polled (
129- at = Clock .System .now(),
130- result = result,
131- )
169+ mutablePollState.value = PollState .Polled (
170+ at = Clock .System .now(),
171+ result = result,
172+ )
132173
133- return result.getOrThrow()
134- }
174+ return result.getOrThrow()
135175 }
136176
137177 /* *
@@ -143,15 +183,9 @@ abstract class BasePoller<T>(
143183 * * This method will throw if the polling operation fails.
144184 */
145185 suspend fun manualPollOnce (): T {
146- val resultChannel = Channel <Result <T >>()
147-
148- scope.launch {
149- resultChannel.trySend(runCatching {
150- pollOnce(" manual" )
151- })
152- }
153-
154- return resultChannel.receive().getOrThrow()
186+ val callback = Channel <Result <T >>(capacity = 1 )
187+ manualPollRequestSender.send(callback)
188+ return callback.receive().getOrThrow()
155189 }
156190
157191
0 commit comments