-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel-phases.ts
More file actions
84 lines (76 loc) · 2.8 KB
/
parallel-phases.ts
File metadata and controls
84 lines (76 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* parallel-phases — run several phases concurrently as one composite phase.
*
* The framework treats pipelines as an ordered array, which covers linear
* flow, conditional skip (`intentGate`), and self-iteration
* (`synthesizeWithFollowup`). The one DAG shape it doesn't natively express
* is "run two independent branches at the same time, then continue when
* both finish." That's what this pattern is for.
*
* Semantics:
* - Sub-phases share the parent `ctx`. If two branches both write to the
* same field, last-write-wins. Keep branches' ctx writes disjoint.
* - Events from all branches interleave into the composite phase's
* output stream in arrival order.
* - If a sub-phase throws, the composite re-throws (after letting other
* branches finish what they're doing — no in-flight cancellation).
* - If a sub-phase sets `ctx.stop`, sibling branches still run to
* completion. The orchestrator's stop check fires AFTER the composite
* phase returns, halting subsequent top-level phases.
*
* For data-dependent fan-in, write each branch's output to a distinct ctx
* field; a downstream phase reads them all via `requireCtx`. That's a
* complete DAG-edge expression without a graph framework.
*/
import type { BasePipelineContext, Phase, PipelineEvent } from '../phase.js';
export function parallelPhases<
TCtx extends BasePipelineContext,
TEvent = PipelineEvent,
>(
phaseName: string,
phases: ReadonlyArray<Phase<TCtx, TEvent>>,
): Phase<TCtx, TEvent> {
return {
name: phaseName,
async *run(ctx) {
if (phases.length === 0) return;
// Producer/consumer: each sub-phase pushes events into a shared queue;
// the composite generator drains the queue and yields downstream.
const queue: TEvent[] = [];
let resolveWaiter: (() => void) | null = null;
const wake = () => {
const fn = resolveWaiter;
resolveWaiter = null;
fn?.();
};
const wait = () => new Promise<void>((r) => (resolveWaiter = r));
let errored: unknown = null;
let running = phases.length;
const drain = async (phase: Phase<TCtx, TEvent>): Promise<void> => {
try {
for await (const ev of phase.run(ctx)) {
queue.push(ev);
wake();
if (errored) return;
}
} catch (err) {
if (!errored) errored = err;
} finally {
running--;
wake();
}
};
const allDone = Promise.all(phases.map(drain));
while (running > 0 || queue.length > 0) {
if (errored && queue.length === 0) break;
if (queue.length === 0) {
await Promise.race([wait(), allDone]);
continue;
}
yield queue.shift()!;
}
await allDone;
if (errored) throw errored;
},
};
}