-
-
Notifications
You must be signed in to change notification settings - Fork 345
Expand file tree
/
Copy pathnode_context.py
More file actions
156 lines (118 loc) · 4.28 KB
/
node_context.py
File metadata and controls
156 lines (118 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, Literal
from .settings import SettingsParser
class Aborted(Exception):
pass
class Progress(ABC):
@property
@abstractmethod
def aborted(self) -> bool:
"""
Returns whether the current operation was aborted.
"""
@property
@abstractmethod
def paused(self) -> bool:
"""
Returns whether the current operation was paused.
"""
def check_aborted(self) -> None:
"""
Raises an `Aborted` exception if the current operation was aborted. Does nothing otherwise.
"""
if self.aborted:
raise Aborted()
def suspend(self) -> None:
"""
If the operation was aborted, this method will throw an `Aborted` exception.
If the operation is paused, this method will wait until the operation is resumed or aborted.
"""
while True:
self.check_aborted()
if not self.paused:
break
time.sleep(0.1)
@abstractmethod
def set_progress(self, progress: float) -> None:
"""
Sets the progress of the current node execution. `progress` must be a value between 0 and 1.
Raises an `Aborted` exception if the current operation was aborted.
"""
def sub_progress(self, offset: float, length: float) -> "Progress":
"""
Returns a new `NodeProgress` object that represents a sub-progress of the current operation.
The progress range of the sub-progress is defined by `offset` and `length`. `offset` must be a value between 0
and 1, and `length` must be a positive value such that `offset + length <= 1`.
The real progress of the sub-progress is calculated as `offset + progress * length`, where `progress` is the
progress value passed to `set_progress` of the sub-progress.
"""
return _SubProgress(self, offset, length)
@staticmethod
def noop_progress() -> "Progress":
"""
Returns a `Progress` object that does nothing. It is never paused or aborted and does not report any progress.
"""
return _NoopProgress()
class _NoopProgress(Progress):
@property
def aborted(self) -> Literal[False]:
return False
@property
def paused(self) -> Literal[False]:
return False
def check_aborted(self) -> None:
pass
def suspend(self) -> None:
pass
def set_progress(self, progress: float) -> None:
pass
def sub_progress(self, offset: float, length: float) -> "Progress":
return _NoopProgress()
class _SubProgress(Progress):
def __init__(self, parent: Progress, offset: float, length: float):
self._parent = parent
self._offset = offset
self._length = length
@property
def aborted(self) -> bool:
return self._parent.aborted
@property
def paused(self) -> bool:
return self._parent.paused
def check_aborted(self) -> None:
self._parent.check_aborted()
def suspend(self) -> None:
self._parent.suspend()
def set_progress(self, progress: float) -> None:
self._parent.set_progress(self._offset + progress * self._length)
def sub_progress(self, offset: float, length: float) -> "_SubProgress":
return _SubProgress(
self._parent,
offset=self._offset + offset * self._length,
length=length * self._length,
)
class NodeContext(Progress, ABC):
"""
The execution context of the current node.
"""
@property
@abstractmethod
def settings(self) -> SettingsParser:
"""
Returns the settings of the current node execution.
"""
@property
@abstractmethod
def storage_dir(self) -> Path:
"""
The path of a directory where nodes can store files.
This directory persists between node executions, and its contents are shared between different nodes.
"""
@abstractmethod
def add_cleanup(self, fn: Callable[[], None]) -> None:
"""
Registers a function that will be called when the chain execution is finished.
Registering the same function (object) twice will only result in the function being called once.
"""