Skip to content

Commit 6da0a2b

Browse files
committed
ENH customizable serialization process for mp.Queue
1 parent 05565ed commit 6da0a2b

2 files changed

Lines changed: 66 additions & 8 deletions

File tree

Lib/multiprocessing/queues.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,22 @@
2727

2828
from .util import debug, info, Finalize, register_after_fork, is_exiting
2929

30+
#
31+
# Sendable Object, with a serialization protocol
32+
#
33+
34+
35+
class _SendableObject(object):
36+
def __init__(self, obj, serialization=None):
37+
self.obj = obj
38+
self.serialization = serialization
39+
40+
def serialize(self):
41+
if self.serialization:
42+
return self.serialization(self.obj)
43+
return self.obj
44+
45+
3046
#
3147
# Queue type using a pipe, buffer and thread
3248
#
@@ -78,17 +94,26 @@ def _after_fork(self):
7894
self._poll = self._reader.poll
7995

8096
def put(self, obj, block=True, timeout=None):
97+
self._put_bytes(obj, block=block, timeout=timeout,
98+
serialization=_ForkingPickler.dumps)
99+
100+
def _put_bytes(self, obj, block=True, timeout=None, serialization=None):
81101
assert not self._closed, "Queue {0!r} has been closed".format(self)
82102
if not self._sem.acquire(block, timeout):
83103
raise Full
84104

85105
with self._notempty:
86106
if self._thread is None:
87107
self._start_thread()
88-
self._buffer.append(obj)
108+
self._buffer.append(_SendableObject(
109+
obj, serialization=serialization))
89110
self._notempty.notify()
90111

91112
def get(self, block=True, timeout=None):
113+
return self._get_bytes(block=block, timeout=timeout,
114+
deserialization=_ForkingPickler.loads)
115+
116+
def _get_bytes(self, block=True, timeout=None, deserialization=None):
92117
if block and timeout is None:
93118
with self._rlock:
94119
res = self._recv_bytes()
@@ -109,8 +134,10 @@ def get(self, block=True, timeout=None):
109134
self._sem.release()
110135
finally:
111136
self._rlock.release()
112-
# unserialize the data after having released the lock
113-
return _ForkingPickler.loads(res)
137+
# un-serialize the data after having released the lock
138+
if deserialization:
139+
return deserialization(res)
140+
return res
114141

115142
def qsize(self):
116143
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -233,7 +260,7 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
233260
return
234261

235262
# serialize the data before acquiring the lock
236-
obj = _ForkingPickler.dumps(obj)
263+
obj = obj.serialize()
237264
if wacquire is None:
238265
send_bytes(obj)
239266
else:
@@ -255,7 +282,7 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
255282
info('error in queue thread: %s', e)
256283
return
257284
else:
258-
onerror(e, obj)
285+
onerror(e, obj.obj)
259286

260287
@staticmethod
261288
def _on_queue_feeder_error(e, obj):
@@ -299,7 +326,8 @@ def put(self, obj, block=True, timeout=None):
299326
with self._notempty, self._cond:
300327
if self._thread is None:
301328
self._start_thread()
302-
self._buffer.append(obj)
329+
self._buffer.append(_SendableObject(
330+
obj, serialization=_ForkingPickler.dumps))
303331
self._unfinished_tasks.release()
304332
self._notempty.notify()
305333

@@ -342,14 +370,25 @@ def __setstate__(self, state):
342370
self._poll = self._reader.poll
343371

344372
def get(self):
373+
# Get the object and deserialize it with the _ForkingPickler
374+
return self._get_bytes(deserialization=_ForkingPickler.loads)
375+
376+
def _get_bytes(self, deserialization=None):
345377
with self._rlock:
346378
res = self._reader.recv_bytes()
347379
# unserialize the data after having released the lock
348-
return _ForkingPickler.loads(res)
380+
if deserialization:
381+
return deserialization(res)
382+
return res
349383

350384
def put(self, obj):
385+
# Get the object and deserialize it with the _ForkingPickler
386+
self._put_bytes(obj, serialization=_ForkingPickler.dumps)
387+
388+
def _put_bytes(self, obj, serialization=None):
351389
# serialize the data before acquiring the lock
352-
obj = _ForkingPickler.dumps(obj)
390+
if serialization:
391+
obj = serialization(obj)
353392
if self._wlock is None:
354393
# writes to a message oriented win32 pipe are atomic
355394
self._writer.send_bytes(obj)

Lib/test/_test_multiprocessing.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import random
1919
import logging
2020
import struct
21+
import pickle
2122
import operator
2223
import weakref
2324
import test.support
@@ -1066,6 +1067,24 @@ def _on_queue_feeder_error(e, obj):
10661067
# Assert that the serialization and the hook have been called correctly
10671068
self.assertTrue(not_serializable_obj.reduce_was_called)
10681069
self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1070+
1071+
def test_queue_serialization(self):
1072+
# bpo-30006: verify feeder handles exceptions using the
1073+
# _on_queue_feeder_error hook.
1074+
if self.TYPE != 'processes':
1075+
self.skipTest('test not appropriate for {}'.format(self.TYPE))
1076+
1077+
q = self.Queue()
1078+
1079+
# Custom serialization
1080+
def serialization(x):
1081+
return pickle.dumps(x * 2)
1082+
q._put_bytes(21, serialization=serialization)
1083+
self.assertEqual(q.get(), 42)
1084+
1085+
# Custom bytes channels
1086+
q._put_bytes(bytes(42), serialization=None)
1087+
self.assertEqual(q._get_bytes(), bytes(42))
10691088
#
10701089
#
10711090
#

0 commit comments

Comments
 (0)