Skip to content

Commit 959be0d

Browse files
authored
IO forking (#1447)
1 parent 80d74ce commit 959be0d

File tree

14 files changed

+121
-186
lines changed

14 files changed

+121
-186
lines changed

modules/benchmarks/arrow-benchmarks-effects/src/jmh/kotlin/arrow/benchmarks/ForkFiber.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package arrow.benchmarks
33
import arrow.effects.IO
44
import arrow.effects.IODispatchers
55
import arrow.effects.fix
6-
import arrow.effects.startFiber
76
import org.openjdk.jmh.annotations.Benchmark
87
import org.openjdk.jmh.annotations.CompilerControl
98
import org.openjdk.jmh.annotations.Fork

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/IO.kt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ import arrow.effects.OnCancel.Companion.CancellationException
1313
import arrow.effects.OnCancel.Silent
1414
import arrow.effects.OnCancel.ThrowCancellationException
1515
import arrow.effects.internal.IOBracket
16+
import arrow.effects.internal.IOFiber
17+
import arrow.effects.internal.IOForkedStart
1618
import arrow.effects.internal.Platform.maxStackDepthSize
1719
import arrow.effects.internal.Platform.onceOnly
1820
import arrow.effects.internal.Platform.unsafeResync
21+
import arrow.effects.internal.UnsafePromise
1922
import arrow.effects.typeclasses.Disposable
2023
import arrow.effects.typeclasses.Duration
2124
import arrow.effects.typeclasses.ExitCase
@@ -123,6 +126,39 @@ sealed class IO<out A> : IOOf<A> {
123126
open fun continueOn(ctx: CoroutineContext): IO<A> =
124127
ContinueOn(this, ctx)
125128

129+
/**
130+
* Create a new [IO] that upon execution starts the receiver [IO] within a [Fiber] on [ctx].
131+
*
132+
* ```kotlin:ank:playground
133+
* import arrow.effects.*
134+
* import arrow.effects.extensions.io.async.async
135+
* import arrow.effects.extensions.io.monad.binding
136+
* import kotlinx.coroutines.Dispatchers
137+
*
138+
* fun main(args: Array<String>) {
139+
* //sampleStart
140+
* binding {
141+
* val promise = Promise.uncancelable<ForIO, Int>(IO.async()).bind()
142+
* val fiber = promise.get().fix().startFiber(Dispatchers.Default).bind()
143+
* promise.complete(1).bind()
144+
* fiber.join().bind()
145+
* }.unsafeRunSync() == 1
146+
* //sampleEnd
147+
* }
148+
* ```
149+
*
150+
* @receiver [IO] to execute on [ctx] within a new suspended [IO].
151+
* @param ctx [CoroutineContext] to execute the source [IO] on.
152+
* @return [IO] with suspended execution of source [IO] on context [ctx].
153+
*/
154+
fun startFiber(ctx: CoroutineContext): IO<Fiber<ForIO, A>> = IO {
155+
val promise = UnsafePromise<A>()
156+
// A new IOConnection, because its cancellation is now decoupled from our current one.
157+
val conn = IOConnection()
158+
IORunLoop.startCancelable(IOForkedStart(this, ctx), conn, promise::complete)
159+
IOFiber(promise, conn)
160+
}
161+
126162
fun <B> followedBy(fb: IOOf<B>) = flatMap { fb }
127163

128164
fun attempt(): IO<Either<Throwable, A>> =

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/IORacePair.kt

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,12 @@ import arrow.core.Left
55
import arrow.core.Right
66
import arrow.core.Tuple2
77
import arrow.effects.internal.IOFiber
8+
import arrow.effects.internal.IOForkedStart
89
import arrow.effects.internal.Platform
910
import arrow.effects.internal.UnsafePromise
10-
import arrow.effects.internal.asyncContinuation
1111
import arrow.effects.typeclasses.Fiber
1212
import java.util.concurrent.atomic.AtomicBoolean
13-
import kotlin.coroutines.Continuation
1413
import kotlin.coroutines.CoroutineContext
15-
import kotlin.coroutines.startCoroutine
16-
import kotlin.coroutines.suspendCoroutine
1714

1815
/**
1916
* Race two tasks concurrently within a new [IO].
@@ -51,10 +48,10 @@ import kotlin.coroutines.suspendCoroutine
5148
* @see [arrow.effects.typeclasses.Concurrent.raceN] for a simpler version that cancels loser.
5249
*/
5350
fun <A, B> IO.Companion.racePair(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<B>): IO<Either<Tuple2<A, Fiber<ForIO, B>>, Tuple2<Fiber<ForIO, A>, B>>> =
54-
IO.async { conn, cb ->
51+
async { conn, cb ->
5552
val active = AtomicBoolean(true)
5653

57-
val upstreamCancelToken = IO.defer { if (conn.isCanceled()) IO.unit else conn.cancel() }
54+
val upstreamCancelToken = defer { if (conn.isCanceled()) unit else conn.cancel() }
5855

5956
// Cancelable connection for the left value
6057
val connA = IOConnection()
@@ -68,31 +65,7 @@ fun <A, B> IO.Companion.racePair(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<
6865

6966
conn.pushPair(connA, connB)
7067

71-
val a: suspend () -> A = {
72-
suspendCoroutine { ca: Continuation<A> ->
73-
IORunLoop.startCancelable(ioA, connA) { either: Either<Throwable, A> ->
74-
either.fold({ error ->
75-
ca.resumeWith(Result.failure(error))
76-
}, { a ->
77-
ca.resumeWith(Result.success(a))
78-
})
79-
}
80-
}
81-
}
82-
83-
val b: suspend () -> B = {
84-
suspendCoroutine { ca: Continuation<B> ->
85-
IORunLoop.startCancelable(ioB, connB) { either: Either<Throwable, B> ->
86-
either.fold({ error ->
87-
ca.resumeWith(Result.failure(error))
88-
}, { b ->
89-
ca.resumeWith(Result.success(b))
90-
})
91-
}
92-
}
93-
}
94-
95-
a.startCoroutine(asyncContinuation(ctx) { either ->
68+
IORunLoop.startCancelable(IOForkedStart(ioA, ctx), connA) { either: Either<Throwable, A> ->
9669
either.fold({ error ->
9770
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
9871
connB.cancel().fix().unsafeRunAsync { r2 ->
@@ -110,9 +83,9 @@ fun <A, B> IO.Companion.racePair(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<
11083
promiseA.complete(Right(a))
11184
}
11285
})
113-
})
86+
}
11487

115-
b.startCoroutine(asyncContinuation(ctx) { either ->
88+
IORunLoop.startCancelable(IOForkedStart(ioB, ctx), connB) { either: Either<Throwable, B> ->
11689
either.fold({ error ->
11790
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
11891
connA.cancel().fix().unsafeRunAsync { r2 ->
@@ -130,5 +103,5 @@ fun <A, B> IO.Companion.racePair(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<
130103
promiseB.complete(Right(b))
131104
}
132105
})
133-
})
106+
}
134107
}

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/IORaceTriple.kt

Lines changed: 9 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ import arrow.core.Left
55
import arrow.core.Right
66
import arrow.core.Tuple3
77
import arrow.effects.internal.IOFiber
8+
import arrow.effects.internal.IOForkedStart
89
import arrow.effects.internal.Platform
910
import arrow.effects.internal.UnsafePromise
10-
import arrow.effects.internal.asyncContinuation
1111
import arrow.effects.typeclasses.Fiber
1212
import arrow.effects.typeclasses.RaceTriple
1313
import java.util.concurrent.atomic.AtomicBoolean
14-
import kotlin.coroutines.Continuation
1514
import kotlin.coroutines.CoroutineContext
16-
import kotlin.coroutines.startCoroutine
17-
import kotlin.coroutines.suspendCoroutine
1815

1916
/**
2017
* Race three tasks concurrently within a new [IO].
@@ -52,10 +49,10 @@ import kotlin.coroutines.suspendCoroutine
5249
* @see [arrow.effects.typeclasses.Concurrent.raceN] for a simpler version that cancels losers.
5350
*/
5451
fun <A, B, C> IO.Companion.raceTriple(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>): IO<RaceTriple<ForIO, A, B, C>> =
55-
IO.async { conn, cb ->
52+
async { conn, cb ->
5653
val active = AtomicBoolean(true)
5754

58-
val upstreamCancelToken = IO.defer { if (conn.isCanceled()) IO.unit else conn.cancel() }
55+
val upstreamCancelToken = defer { if (conn.isCanceled()) unit else conn.cancel() }
5956

6057
val connA = IOConnection()
6158
connA.push(upstreamCancelToken)
@@ -71,43 +68,7 @@ fun <A, B, C> IO.Companion.raceTriple(ctx: CoroutineContext, ioA: IOOf<A>, ioB:
7168

7269
conn.push(connA.cancel(), connB.cancel(), connC.cancel())
7370

74-
val a: suspend () -> A = {
75-
suspendCoroutine { ca: Continuation<A> ->
76-
IORunLoop.startCancelable(ioA, connA) { either: Either<Throwable, A> ->
77-
either.fold({ error ->
78-
ca.resumeWith(Result.failure(error))
79-
}, { a ->
80-
ca.resumeWith(Result.success(a))
81-
})
82-
}
83-
}
84-
}
85-
86-
val b: suspend () -> B = {
87-
suspendCoroutine { ca: Continuation<B> ->
88-
IORunLoop.startCancelable(ioB, connB) { either: Either<Throwable, B> ->
89-
either.fold({ error ->
90-
ca.resumeWith(Result.failure(error))
91-
}, { b ->
92-
ca.resumeWith(Result.success(b))
93-
})
94-
}
95-
}
96-
}
97-
98-
val c: suspend () -> C = {
99-
suspendCoroutine { ca: Continuation<C> ->
100-
IORunLoop.startCancelable(ioC, connC) { either: Either<Throwable, C> ->
101-
either.fold({ error ->
102-
ca.resumeWith(Result.failure(error))
103-
}, { c ->
104-
ca.resumeWith(Result.success(c))
105-
})
106-
}
107-
}
108-
}
109-
110-
a.startCoroutine(asyncContinuation(ctx) { either ->
71+
IORunLoop.startCancelable(IOForkedStart(ioA, ctx), connA) { either: Either<Throwable, A> ->
11172
either.fold({ error ->
11273
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
11374
connB.cancel().fix().unsafeRunAsync { r2 ->
@@ -132,9 +93,9 @@ fun <A, B, C> IO.Companion.raceTriple(ctx: CoroutineContext, ioA: IOOf<A>, ioB:
13293
promiseA.complete(Right(a))
13394
}
13495
})
135-
})
96+
}
13697

137-
b.startCoroutine(asyncContinuation(ctx) { either ->
98+
IORunLoop.startCancelable(IOForkedStart(ioB, ctx), connB) { either: Either<Throwable, B> ->
13899
either.fold({ error ->
139100
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
140101
connA.cancel().fix().unsafeRunAsync { r2 ->
@@ -159,9 +120,9 @@ fun <A, B, C> IO.Companion.raceTriple(ctx: CoroutineContext, ioA: IOOf<A>, ioB:
159120
promiseB.complete(Right(b))
160121
}
161122
})
162-
})
123+
}
163124

164-
c.startCoroutine(asyncContinuation(ctx) { either ->
125+
IORunLoop.startCancelable(IOForkedStart(ioC, ctx), connC) { either: Either<Throwable, C> ->
165126
either.fold({ error ->
166127
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
167128
connA.cancel().fix().unsafeRunAsync { r2 ->
@@ -186,5 +147,5 @@ fun <A, B, C> IO.Companion.raceTriple(ctx: CoroutineContext, ioA: IOOf<A>, ioB:
186147
promiseC.complete(Right(c))
187148
}
188149
})
189-
})
150+
}
190151
}

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/IOStart.kt

Lines changed: 0 additions & 67 deletions
This file was deleted.

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/internal/Utils.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import arrow.core.Some
77
import arrow.core.left
88
import arrow.core.right
99
import arrow.effects.IO
10+
import arrow.effects.IOOf
1011
import arrow.effects.KindConnection
12+
import arrow.effects.fix
1113
import arrow.effects.typeclasses.Duration
1214
import java.util.concurrent.Executor
1315
import java.util.concurrent.atomic.AtomicBoolean
@@ -284,3 +286,16 @@ internal fun <A> asyncContinuation(ctx: CoroutineContext, cc: (Either<Throwable,
284286
cc(exception.left())
285287
}
286288
}
289+
290+
/**
291+
* Utility to makes sure that the original [fa] is gets forked on [ctx].
292+
* @see IO.startFiber
293+
* @see arrow.effects.racePair
294+
* @see arrow.effects.raceTriple
295+
*
296+
* This moves the forking inside the [IO] operation,
297+
* so it'll share it's [kotlin.coroutines.Continuation] with other potential jumps or [IO.async].
298+
* @see [arrow.effects.IORunLoop.RestartCallback]
299+
*/
300+
internal fun <A> IOForkedStart(fa: IOOf<A>, ctx: CoroutineContext): IO<A> =
301+
IO.Bind(IO.ContinueOn(IO.unit, ctx)) { fa.fix() }

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/typeclasses/Concurrent.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ interface Concurrent<F> : Async<F> {
153153
* //sampleStart
154154
* binding {
155155
* val promise = Promise.uncancelable<ForIO, Int>(IO.async()).bind()
156-
* val fiber = promise.get().startFiber(Dispatchers.Default).bind()
156+
* val fiber = promise.get().fix().startFiber(Dispatchers.Default).bind()
157157
* promise.complete(1).bind()
158158
* fiber.join().bind()
159159
* }.unsafeRunSync() == 1

modules/effects/arrow-effects-data/src/main/kotlin/arrow/effects/typeclasses/Fiber.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import arrow.effects.CancelToken
1010
* You can think of fibers as being lightweight threads, a Fiber being a
1111
* concurrency primitive for doing cooperative multi-tasking.
1212
*/
13-
interface Fiber<F, A> {
13+
interface Fiber<F, out A> {
1414

1515
/**
1616
* Returns a new task that will await for the completion of the

modules/effects/arrow-effects-data/src/test/kotlin/arrow/effects/EffectsSuspendDSLTests.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ class EffectsSuspendDSLTests : UnitSpec() {
7272
val program = fx {
7373
// note how the receiving value is typed in the environment and not inside IO despite being effectful and
7474
// non-blocking parallel computations
75-
val result: List<String> = !NonBlocking.parMapN(
75+
val result: List<String> = !newCountingThreadFactory("test", 2).asCoroutineContext().parMapN(
7676
effect { getThreadName() },
7777
effect { getThreadName() }
7878
) { a, b -> listOf(a, b) }
79-
effect { println(result) }
79+
!effect { println(result) }
8080
result
8181
}
8282
unsafe { runBlocking { program } }.distinct().size shouldBe 2

0 commit comments

Comments
 (0)