Skip to content

Commit 897a8fd

Browse files
authored
Merge pull request #778 from powerapi-ng/refactor/actor-picklable-args
feat!: Modernize actor architecture and rework dependent components
2 parents 3abdccc + c057670 commit 897a8fd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2568
-2476
lines changed

src/powerapi/actor/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2929

3030
from powerapi.actor.socket_interface import SocketInterface, NotConnectedException
31-
from powerapi.actor.supervisor import Supervisor, ActorInitError, ActorAlreadySupervisedException
32-
from powerapi.actor.supervisor import CrashConfigureError, FailConfigureError
31+
from powerapi.actor.message import Message, StartMessage, OKMessage, ErrorMessage, PoisonPillMessage
3332
from powerapi.actor.state import State
34-
from powerapi.actor.actor import Actor, InitializationException
33+
from powerapi.actor.actor import Actor, ActorProxy, InitializationException
34+
from powerapi.actor.supervisor import Supervisor, ActorAlreadySupervisedException, ActorInitializationError

src/powerapi/actor/actor.py

Lines changed: 140 additions & 156 deletions
Large diffs are not rendered by default.
Lines changed: 106 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2018, INRIA
1+
# Copyright (c) 2018, Inria
22
# Copyright (c) 2018, University of Lille
33
# All rights reserved.
44
#
@@ -27,246 +27,175 @@
2727
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2828
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2929

30-
import ctypes
31-
import logging
32-
import multiprocessing
30+
from hashlib import blake2b
31+
from pathlib import Path
32+
from typing import Any
3333

3434
import zmq
3535

3636
from powerapi.exception import PowerAPIException
3737

38-
LOCAL_ADDR = 'tcp://127.0.0.1'
39-
4038

4139
class NotConnectedException(PowerAPIException):
4240
"""
43-
Exception raised when attempting to send/receinve a message on a socket
44-
that is not conected
41+
Exception raised when attempting to do an operation on a disconnected socket.
4542
"""
4643

4744

4845
class SocketInterface:
4946
"""
50-
Interface to handle comunication to/from the actor
51-
52-
general methods :
53-
54-
- :meth:`send_control <powerapi.actor.socket_interface.SocketInterface.send_control>`
55-
56-
client interface methods :
57-
58-
- :meth:`connect_data <powerapi.actor.socket_interface.SocketInterface.connect_data>`
59-
- :meth:`connect_control <powerapi.actor.socket_interface.SocketInterface.connect_control>`
60-
- :meth:`send_data <powerapi.actor.socket_interface.SocketInterface.send_data>`
61-
- :meth:`close <powerapi.actor.socket_interface.SocketInterface.close>`
62-
63-
server interface methods :
64-
65-
- :meth:`setup <powerapi.actor.socket_interface.SocketInterface.setup>`
66-
- :meth:`receive <powerapi.actor.socket_interface.SocketInterface.receive>`
67-
- :meth:`close <powerapi.actor.socket_interface.SocketInterface.close>`
47+
Interface to handle communication between actors.
6848
"""
6949

70-
def __init__(self, name, timeout):
50+
def __init__(self, actor_name: str, timeout: int | None):
7151
"""
72-
:param str name: name of the actor using this interface
73-
:param int timeout: time in millisecond to wait for a message
52+
:param str actor_name: Name of the actor whose endpoint is being accessed
53+
:param int timeout: Maximum time, in milliseconds, to wait for an operation
7454
"""
75-
self.logger = logging.getLogger(name)
76-
77-
#: (int): Time in millisecond to wait for a message before execute
78-
#: timeout_handler
55+
self.actor_name = actor_name
7956
self.timeout = timeout
8057

81-
#: (str): Address of the pull socket
82-
self.pull_socket_address = None
83-
84-
#: (str): Address of the control socket
85-
self.control_socket_address = None
86-
87-
#: (zmq.Poller): ZMQ Poller for read many socket at same time
88-
self.poller = zmq.Poller()
89-
90-
#: (zmq.Socket): ZMQ Pull socket for receiving data message
91-
self.pull_socket = None
92-
93-
#: (zmq.Socket): ZMQ Pair socket for receiving control message
94-
self.control_socket = None
58+
self.data_socket_filepath = self._generate_socket_path(actor_name, 'data')
59+
self.control_socket_filepath = self._generate_socket_path(actor_name, 'control')
9560

96-
# This socket is used to connect to the pull socket of this actor. It
97-
# won't be created on the actor's process but on the process that want
98-
# to connect to the pull socket of this actor
99-
#: (zmq.Socket): ZMQ Push socket for sending message to this actor
100-
self.push_socket = None
61+
self._control_socket: zmq.Socket | None = None
62+
self._data_socket: zmq.Socket | None = None
63+
self._sockets_poller: zmq.Poller | None = None
64+
self._is_endpoint: bool = False
10165

102-
# Shared memory used to communicate the port used to bind sockets
103-
self._pull_port = multiprocessing.Value(ctypes.c_int)
104-
self._ctrl_port = multiprocessing.Value(ctypes.c_int)
105-
self._values_available = multiprocessing.Event()
106-
107-
self._pull_port.value = -1
108-
self._ctrl_port.value = -1
109-
110-
def setup(self):
66+
@staticmethod
67+
def _generate_socket_path(actor_name: str, socket_purpose: str, basedir: str = '/tmp') -> Path:
11168
"""
112-
Initialize sockets and send the selected port number to the father
113-
process with a Pipe
69+
Generate a deterministic filesystem path for an IPC socket.
70+
:param actor_name: Name of the actor whose endpoint is being accessed
71+
:param socket_purpose: Purpose of the socket (control, data, etc.)
72+
:param basedir: Base directory used to store the socket
11473
"""
115-
# create the pull socket (to communicate with this actor, others
116-
# process have to connect a push socket to this socket)
117-
self.pull_socket, pull_port = self._create_socket(zmq.PULL, -1)
118-
119-
# create the control socket (to control this actor, a process have to
120-
# connect a pair socket to this socket with the `control` method)
121-
self.control_socket, ctrl_port = self._create_socket(zmq.PAIR, 0)
122-
123-
self.pull_socket_address = LOCAL_ADDR + ':' + str(pull_port)
124-
self.control_socket_address = LOCAL_ADDR + ':' + str(ctrl_port)
74+
key = f'{actor_name}:{socket_purpose}'
75+
digest = blake2b(key.encode('utf-8'), digest_size=16).hexdigest()
76+
return Path(basedir) / f'powerapi-ipc-{digest}'
12577

126-
self._pull_port.value = pull_port
127-
self._ctrl_port.value = ctrl_port
128-
self._values_available.set()
129-
130-
def _create_socket(self, socket_type, linger_value):
131-
"""
132-
Create a socket of the given type, bind it to a random port and
133-
register it to the poller
134-
135-
:param int socket_type: type of the socket to open
136-
:param int linger_value: -1 mean wait for receive all msg and block
137-
closing 0 mean hardkill the socket even if msg
138-
are still here.
139-
:return (zmq.Socket, int): the initialized socket and the port where the
140-
socket is bound
78+
def setup(self):
14179
"""
142-
socket = zmq.Context.instance().socket(socket_type)
143-
socket.setsockopt(zmq.LINGER, linger_value)
144-
port_number = socket.bind_to_random_port(LOCAL_ADDR)
145-
self.poller.register(socket, zmq.POLLIN)
146-
self.logger.debug('Bind socket to %s:%d', LOCAL_ADDR, port_number)
147-
return (socket, port_number)
148-
149-
def receive(self):
80+
Initializes the socket interface.
81+
This method should only be called by the actor acting as endpoint.
15082
"""
151-
Block until a message was received (or until timeout) an return the
152-
received messages
83+
self._is_endpoint = True
15384

154-
:return: the list of received messages or None if timeout
155-
:rtype: a list of Object or None
156-
"""
157-
events = dict(self.poller.poll(self.timeout))
85+
self._control_socket = zmq.Context.instance().socket(zmq.DEALER)
86+
self._control_socket.setsockopt(zmq.LINGER, 0)
87+
self._control_socket.bind(f'ipc://{self.control_socket_filepath}')
15888

159-
if len(events) == 0:
160-
return None
89+
self._data_socket = zmq.Context.instance().socket(zmq.PULL)
90+
self._data_socket.bind(f'ipc://{self.data_socket_filepath}')
16191

162-
return self._recv_serialized(next(iter(events)))
92+
self._sockets_poller = zmq.Poller()
93+
self._sockets_poller.register(self._control_socket, zmq.POLLIN)
94+
self._sockets_poller.register(self._data_socket, zmq.POLLIN)
16395

164-
def receive_control(self, timeout):
96+
def close(self) -> None:
16597
"""
166-
Block until a message was received on the control canal (client side)
167-
(or until timeout) an return the received messages
168-
169-
:return: the received message or an None if timeout
170-
:rtype: a list of Object
171-
98+
Closes the socket interface.
17299
"""
173-
if self.control_socket is None:
174-
raise NotConnectedException
175-
176-
event = self.control_socket.poll(timeout)
177-
178-
if event == 0:
179-
return None
100+
self._sockets_poller = None
180101

181-
return self._recv_serialized(self.control_socket)
182-
183-
def close(self):
184-
"""
185-
Close all socket handle by this interface
186-
"""
187-
if self.push_socket is not None:
188-
self.push_socket.close()
102+
if self._control_socket is not None:
103+
self._control_socket.close()
104+
self._control_socket = None
189105

190-
if self.pull_socket is not None:
191-
self.pull_socket.close()
106+
if self._data_socket is not None:
107+
self._data_socket.close()
108+
self._data_socket = None
192109

193-
if self.control_socket is not None:
194-
self.control_socket.close()
110+
if self._is_endpoint:
111+
# Calling `close()` on the bound sockets doesn't remove the file when using the `ipc` transport protocol.
112+
# Manually unlinking files after calling `close()` is the most reliable way to fix it.
113+
self.control_socket_filepath.unlink(missing_ok=True)
114+
self.data_socket_filepath.unlink(missing_ok=True)
195115

196116
@staticmethod
197-
def _send_serialized(socket: zmq.Socket, msg):
117+
def _send_serialized(socket: zmq.Socket, msg: Any) -> None:
198118
"""
199-
Send a message to the given socket.
119+
Sends a serialized message to the given socket.
200120
:param socket: Socket to use
201-
:param msg: Message to send
121+
:param msg: Message to serialize and send
202122
"""
203123
socket.send_pyobj(msg)
204124

205125
@staticmethod
206-
def _recv_serialized(socket: zmq.Socket):
126+
def _recv_serialized(socket: zmq.Socket) -> Any:
207127
"""
208-
Receive and returns a message from the given socket.
128+
Receive, deserialize and returns a message from the given socket.
209129
:param socket: Socket to use
210130
:return: Message received
211131
"""
212132
return socket.recv_pyobj()
213133

214-
def connect_data(self):
134+
def connect_control(self) -> None:
215135
"""
216-
Connect to the pull socket of this actor
217-
218-
Open a push socket on the process that want to communicate with this
219-
actor
136+
Connect to the control socket of the actor.
137+
This method should only be called by actors that wants to communicate with another actor's endpoint.
138+
"""
139+
self._control_socket = zmq.Context.instance().socket(zmq.DEALER)
140+
self._control_socket.setsockopt(zmq.LINGER, 0)
141+
self._control_socket.connect(f'ipc://{self.control_socket_filepath}')
142+
self._control_socket.poll(zmq.POLLIN | zmq.POLLOUT) # Very important, prevents synchronization problems.
220143

221-
this method shouldn't be called if socket interface was not initialized
222-
with the setup method
144+
def receive_control(self, timeout: int | None = None) -> Any:
223145
"""
146+
Receive a message from the control socket of the actor.
147+
:param timeout: Timeout of the operation in milliseconds, if None block indefinitely
148+
:return: Message received
149+
"""
150+
if self._control_socket is None:
151+
raise NotConnectedException()
224152

225-
if self.pull_socket_address is None:
226-
self._values_available.wait()
227-
self.pull_socket_address = LOCAL_ADDR + ':' + str(self._pull_port.value)
228-
self.control_socket_address = LOCAL_ADDR + ':' + str(self._ctrl_port.value)
153+
if self._control_socket.poll(timeout):
154+
return self._recv_serialized(self._control_socket)
229155

230-
self.push_socket = zmq.Context.instance().socket(zmq.PUSH)
231-
self.push_socket.setsockopt(zmq.LINGER, -1)
232-
self.push_socket.connect(self.pull_socket_address)
233-
self.logger.debug('Connected data socket to %s', self.pull_socket_address)
156+
return None
234157

235-
def connect_control(self):
158+
def send_control(self, msg: Any) -> None:
236159
"""
237-
Connect to the control socket of this actor
238-
239-
Open a pair socket on the process that want to control this actor
240-
this method shouldn't be called if socket interface was not initialized
241-
with the setup method
160+
Send a message to the control socket of the actor.
161+
:param msg: Message to send
242162
"""
243-
if self.pull_socket_address is None:
244-
self._values_available.wait()
245-
self.pull_socket_address = LOCAL_ADDR + ':' + str(self._pull_port.value)
246-
self.control_socket_address = LOCAL_ADDR + ':' + str(self._ctrl_port.value)
163+
if self._control_socket is None:
164+
raise NotConnectedException()
247165

248-
self.control_socket = zmq.Context.instance().socket(zmq.PAIR)
249-
self.control_socket.setsockopt(zmq.LINGER, 0)
250-
self.control_socket.connect(self.control_socket_address)
251-
self.logger.debug('Connected control socket to %s', self.control_socket_address)
166+
self._send_serialized(self._control_socket, msg)
252167

253-
def send_control(self, msg):
168+
def connect_data(self) -> None:
254169
"""
255-
Send a message on the control canal
256-
257-
:param Object msg: message to send
170+
Connect to the data socket of the actor.
171+
This method should only be called by actors that wants to communicate with another actor's endpoint.
258172
"""
173+
self._data_socket = zmq.Context.instance().socket(zmq.PUSH)
174+
self._data_socket.setsockopt(zmq.LINGER, -1)
175+
self._data_socket.connect(f'ipc://{self.data_socket_filepath}')
176+
self._data_socket.poll(zmq.POLLOUT) # Very important, prevents synchronization problems.
259177

260-
if self.control_socket is None:
178+
def send_data(self, msg: Any) -> None:
179+
"""
180+
Send a message to the data socket of the actor.
181+
:param msg: Message to send
182+
"""
183+
if self._data_socket is None:
261184
raise NotConnectedException()
262-
self._send_serialized(self.control_socket, msg)
263185

264-
def send_data(self, msg):
265-
"""
266-
Send a message on data canal
186+
self._send_serialized(self._data_socket, msg)
267187

268-
:param Object msg: message to send
188+
def receive(self, timeout: int | None = None) -> Any:
189+
"""
190+
Receive a message from either the control or the data sockets.
191+
This method should only be called by an actor acting as endpoint.
192+
:param timeout: Timeout of the operation in milliseconds, if None block indefinitely
193+
:return: The received message or None if the timeout is reached
269194
"""
270-
if self.push_socket is None:
195+
if self._sockets_poller is None:
271196
raise NotConnectedException()
272-
self._send_serialized(self.push_socket, msg)
197+
198+
for socket, _ in self._sockets_poller.poll(timeout):
199+
return self._recv_serialized(socket)
200+
201+
return None

src/powerapi/actor/state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def get_corresponding_handler(self, msg: Message) -> Handler:
6666
"""
6767
try:
6868
return self.handlers[msg.__class__.__name__]
69-
except ValueError as e:
69+
except KeyError as e:
7070
raise UnknownMessageTypeException() from e
7171

7272
def add_handler(self, message_type: type[Message], handler: Handler, include_subclasses: bool = True):

0 commit comments

Comments
 (0)