Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package arrow.effects.internal

import arrow.Kind
import arrow.core.Either
import arrow.core.Left
import arrow.core.Right
import arrow.effects.typeclasses.Concurrent
import arrow.effects.typeclasses.Duration
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

@Suppress("FunctionName")
internal fun <F> Concurrent<F>.ConcurrentSleep(duration: Duration): Kind<F, Unit> = cancelable { cb ->
val cancelRef = scheduler.schedule(ShiftTick(dispatchers().default(), cb), duration.amount, duration.timeUnit)
delay { cancelRef.cancel(false); Unit }
}

/**
* [scheduler] is **only** for internal use for the [Concurrent.sleep] implementation.
* This way we can guarantee nothing besides sleeping ever occurs here.
*/
internal val scheduler: ScheduledExecutorService by lazy {
Executors.newScheduledThreadPool(2) { r ->
Thread(r).apply {
name = "arrow-effect-scheduler-$id"
isDaemon = true
}
}
}

/**
* [ShiftTick] is a small utility to [Concurrent.shift] work away from the [ScheduledExecutorService].
* As mentioned in [scheduler] no work should ever happen there.
* So after sleeping we need to shift away to not keep that thread occupied.
*/
internal class ShiftTick(
private val ctx: CoroutineContext,
private val cb: (Either<Throwable, Unit>) -> Unit
) : Runnable {
override fun run() {
suspend { Unit }.startCoroutine(Continuation(ctx) {
it.fold({ unit -> cb(Right(unit)) }, { e -> cb(Left(e)) })
})
}
}

class TimeoutException(override val message: String) : Exception()
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import arrow.effects.CancelToken
import arrow.effects.KindConnection
import arrow.effects.MVar
import arrow.effects.data.internal.BindingCancellationException
import arrow.effects.internal.ConcurrentSleep
import arrow.effects.internal.TimeoutException
import arrow.typeclasses.MonadContinuation
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
Expand Down Expand Up @@ -722,6 +724,90 @@ interface Concurrent<F> : Async<F> {

override fun <B> binding(c: suspend MonadContinuation<F, *>.() -> B): Kind<F, B> =
bindingCancellable { c() }.a

/**
* Sleeps for a given [duration] without blocking a thread.
* Used to derive [waitFor] and can be used to created timed events like backing-off retries.
*
* ```kotlin:ank:playground
* import arrow.*
* import arrow.effects.*
* import arrow.effects.typeclasses.*
* import arrow.effects.extensions.io.concurrent.concurrent
*
* fun main(args: Array<String>) {
* //sampleStart
* fun <F> Concurrent<F>.delayHelloWorld(): Kind<F, Unit> =
* sleep(3.seconds).flatMap {
* delay { println("Hello World!") }
* }
* //sampleEnd
* IO.concurrent().delayHelloWorld()
* .fix().unsafeRunSync()
* }
* ```
* @see waitFor
**/
fun sleep(duration: Duration): Kind<F, Unit> = ConcurrentSleep(duration)

/**
* Returns the result of [this] within the specified [duration] or the [default] value.
*
* ```kotlin:ank:playground
* import arrow.*
* import arrow.effects.*
* import arrow.effects.typeclasses.*
* import arrow.effects.extensions.io.concurrent.concurrent
*
* fun main(args: Array<String>) {
* //sampleStart
* fun <F> Concurrent<F>.timedOutWorld(): Kind<F, Unit> {
* val world = sleep(3.seconds).flatMap { delay { println("Hello World!") } }
* val fallbackWorld = delay { println("Hello from the backup") }
* return world.waitFor(1.seconds, fallbackWorld)
* }
* //sampleEnd
* IO.concurrent().timedOutWorld()
* .fix().unsafeRunSync()
* }
* ```
**/
fun <A> Kind<F, A>.waitFor(duration: Duration, default: Kind<F, A>): Kind<F, A> =
dispatchers().default().raceN(this, sleep(duration)).flatMap {
it.fold(
{ a -> just(a) },
{ default }
)
}

/**
* Returns the result of [this] within the specified [duration] or the raises a [TimeoutException] exception.
*
* ```kotlin:ank:playground
* import arrow.*
* import arrow.effects.*
* import arrow.effects.typeclasses.*
* import arrow.effects.extensions.io.concurrent.concurrent
*
* fun main(args: Array<String>) {
* //sampleStart
* fun <F> Concurrent<F>.timedOutWorld(): Kind<F, Unit> {
* val world = sleep(1.seconds).flatMap { delay { println("Hello World!") } }
* return world.waitFor(3.seconds)
* }
* //sampleEnd
* IO.concurrent().timedOutWorld()
* .fix().unsafeRunSync()
* }
* ```
**/
fun <A> Kind<F, A>.waitFor(duration: Duration): Kind<F, A> =
dispatchers().default().raceN(this, sleep(duration)).flatMap {
it.fold(
{ a -> just(a) },
{ raiseError(TimeoutException(duration.toString())) }
)
}
}

/** Alias for `Either` structure to provide consistent signature for race methods. */
Expand Down