forked from pybricks/pybricks-code
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.ts
More file actions
110 lines (99 loc) · 3.19 KB
/
index.ts
File metadata and controls
110 lines (99 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// SPDX-License-Identifier: MIT
// Copyright (c) 2020 The Pybricks Authors
import { END, MulticastChannel, Saga, Task, runSaga, stdChannel } from 'redux-saga';
import { Action } from '../src/actions';
import { RootState } from '../src/reducers';
export class AsyncSaga {
private channel: MulticastChannel<Action>;
private dispatches: (Action | END)[];
private takers: { put: (action: Action | END) => void }[];
private state: Partial<RootState>;
private task: Task;
public constructor(saga: Saga, context?: Record<string, unknown>) {
this.channel = stdChannel();
this.dispatches = [];
this.takers = [];
this.state = {};
this.task = runSaga(
{
channel: this.channel,
dispatch: this.dispatch.bind(this),
getState: () => this.state,
onError: (e, _i): void => {
throw e;
},
context,
},
saga,
);
}
public numPending(): number {
return this.dispatches.length;
}
public put(action: Action): void {
this.channel.put(action);
}
public take(): Promise<Action> {
const next = this.dispatches.shift();
if (next === undefined) {
// if there are no dispatches queued, then queue the taker to be
// completed later
return new Promise((resolve, reject) => {
this.takers.push({
put: (a: Action | END): void => {
if (a.type === END.type) {
reject();
} else {
resolve(a);
}
},
});
});
}
// otherwise complete immediately
if (next.type === END.type) {
return Promise.reject();
}
return Promise.resolve(next);
}
public setState(state: Partial<RootState>): void {
this.state = state;
}
public async end(): Promise<void> {
this.task.cancel();
await this.task.toPromise();
if (this.dispatches.some((x) => x.type !== END.type)) {
throw Error(
`unhandled dispatches remain: ${JSON.stringify(this.dispatches)}`,
);
}
}
private dispatch(action: Action | END): Action | END {
const taker = this.takers.shift();
if (taker === undefined) {
// if there are no takers waiting, the queue the action
this.dispatches.push(action);
} else {
// otherwise complete the promise
taker.put(action);
}
return action;
}
}
export function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Looks up a nested property in an object.
* @param obj The object
* @param id The property path
*/
export function lookup(obj: unknown, id: string): string | undefined {
const value = id
.split('.')
.reduce((pv, cv) => pv && (pv as Record<string, unknown>)[cv], obj);
if (typeof value === 'string') {
return value;
}
return undefined;
}