Skip to content

Commit e5ad357

Browse files
authored
Concurrent - sleep & waitFor (#1449)
1 parent 2b5222b commit e5ad357

File tree

2 files changed

+136
-0
lines changed

2 files changed

+136
-0
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package arrow.effects.internal
2+
3+
import arrow.Kind
4+
import arrow.core.Either
5+
import arrow.core.Left
6+
import arrow.core.Right
7+
import arrow.effects.typeclasses.Concurrent
8+
import arrow.effects.typeclasses.Duration
9+
import java.util.concurrent.Executors
10+
import java.util.concurrent.ScheduledExecutorService
11+
import kotlin.coroutines.Continuation
12+
import kotlin.coroutines.CoroutineContext
13+
import kotlin.coroutines.startCoroutine
14+
15+
@Suppress("FunctionName")
16+
internal fun <F> Concurrent<F>.ConcurrentSleep(duration: Duration): Kind<F, Unit> = cancelable { cb ->
17+
val cancelRef = scheduler.schedule(ShiftTick(dispatchers().default(), cb), duration.amount, duration.timeUnit)
18+
delay { cancelRef.cancel(false); Unit }
19+
}
20+
21+
/**
22+
* [scheduler] is **only** for internal use for the [Concurrent.sleep] implementation.
23+
* This way we can guarantee nothing besides sleeping ever occurs here.
24+
*/
25+
internal val scheduler: ScheduledExecutorService by lazy {
26+
Executors.newScheduledThreadPool(2) { r ->
27+
Thread(r).apply {
28+
name = "arrow-effect-scheduler-$id"
29+
isDaemon = true
30+
}
31+
}
32+
}
33+
34+
/**
35+
* [ShiftTick] is a small utility to [Concurrent.shift] work away from the [ScheduledExecutorService].
36+
* As mentioned in [scheduler] no work should ever happen there.
37+
* So after sleeping we need to shift away to not keep that thread occupied.
38+
*/
39+
internal class ShiftTick(
40+
private val ctx: CoroutineContext,
41+
private val cb: (Either<Throwable, Unit>) -> Unit
42+
) : Runnable {
43+
override fun run() {
44+
suspend { Unit }.startCoroutine(Continuation(ctx) {
45+
it.fold({ unit -> cb(Right(unit)) }, { e -> cb(Left(e)) })
46+
})
47+
}
48+
}
49+
50+
class TimeoutException(override val message: String) : Exception()

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/typeclasses/Concurrent.kt

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import arrow.effects.CancelToken
1313
import arrow.effects.KindConnection
1414
import arrow.effects.MVar
1515
import arrow.effects.data.internal.BindingCancellationException
16+
import arrow.effects.internal.ConcurrentSleep
17+
import arrow.effects.internal.TimeoutException
1618
import arrow.typeclasses.MonadContinuation
1719
import java.util.concurrent.atomic.AtomicReference
1820
import kotlin.coroutines.CoroutineContext
@@ -722,6 +724,90 @@ interface Concurrent<F> : Async<F> {
722724

723725
override fun <B> binding(c: suspend MonadContinuation<F, *>.() -> B): Kind<F, B> =
724726
bindingCancellable { c() }.a
727+
728+
/**
729+
* Sleeps for a given [duration] without blocking a thread.
730+
* Used to derive [waitFor] and can be used to created timed events like backing-off retries.
731+
*
732+
* ```kotlin:ank:playground
733+
* import arrow.*
734+
* import arrow.effects.*
735+
* import arrow.effects.typeclasses.*
736+
* import arrow.effects.extensions.io.concurrent.concurrent
737+
*
738+
* fun main(args: Array<String>) {
739+
* //sampleStart
740+
* fun <F> Concurrent<F>.delayHelloWorld(): Kind<F, Unit> =
741+
* sleep(3.seconds).flatMap {
742+
* delay { println("Hello World!") }
743+
* }
744+
* //sampleEnd
745+
* IO.concurrent().delayHelloWorld()
746+
* .fix().unsafeRunSync()
747+
* }
748+
* ```
749+
* @see waitFor
750+
**/
751+
fun sleep(duration: Duration): Kind<F, Unit> = ConcurrentSleep(duration)
752+
753+
/**
754+
* Returns the result of [this] within the specified [duration] or the [default] value.
755+
*
756+
* ```kotlin:ank:playground
757+
* import arrow.*
758+
* import arrow.effects.*
759+
* import arrow.effects.typeclasses.*
760+
* import arrow.effects.extensions.io.concurrent.concurrent
761+
*
762+
* fun main(args: Array<String>) {
763+
* //sampleStart
764+
* fun <F> Concurrent<F>.timedOutWorld(): Kind<F, Unit> {
765+
* val world = sleep(3.seconds).flatMap { delay { println("Hello World!") } }
766+
* val fallbackWorld = delay { println("Hello from the backup") }
767+
* return world.waitFor(1.seconds, fallbackWorld)
768+
* }
769+
* //sampleEnd
770+
* IO.concurrent().timedOutWorld()
771+
* .fix().unsafeRunSync()
772+
* }
773+
* ```
774+
**/
775+
fun <A> Kind<F, A>.waitFor(duration: Duration, default: Kind<F, A>): Kind<F, A> =
776+
dispatchers().default().raceN(this, sleep(duration)).flatMap {
777+
it.fold(
778+
{ a -> just(a) },
779+
{ default }
780+
)
781+
}
782+
783+
/**
784+
* Returns the result of [this] within the specified [duration] or the raises a [TimeoutException] exception.
785+
*
786+
* ```kotlin:ank:playground
787+
* import arrow.*
788+
* import arrow.effects.*
789+
* import arrow.effects.typeclasses.*
790+
* import arrow.effects.extensions.io.concurrent.concurrent
791+
*
792+
* fun main(args: Array<String>) {
793+
* //sampleStart
794+
* fun <F> Concurrent<F>.timedOutWorld(): Kind<F, Unit> {
795+
* val world = sleep(1.seconds).flatMap { delay { println("Hello World!") } }
796+
* return world.waitFor(3.seconds)
797+
* }
798+
* //sampleEnd
799+
* IO.concurrent().timedOutWorld()
800+
* .fix().unsafeRunSync()
801+
* }
802+
* ```
803+
**/
804+
fun <A> Kind<F, A>.waitFor(duration: Duration): Kind<F, A> =
805+
dispatchers().default().raceN(this, sleep(duration)).flatMap {
806+
it.fold(
807+
{ a -> just(a) },
808+
{ raiseError(TimeoutException(duration.toString())) }
809+
)
810+
}
725811
}
726812

727813
/** Alias for `Either` structure to provide consistent signature for race methods. */

0 commit comments

Comments
 (0)