5353import multiprocessing as mp
5454from multiprocessing .connection import wait
5555from multiprocessing .queues import Queue
56+ from multiprocessing import context
57+
5658import threading
5759import weakref
5860from functools import partial
@@ -100,12 +102,20 @@ def _python_exit():
100102 for t , _ in items :
101103 t .join ()
102104
105+
103106# Controls how many more calls than processes will be queued in the call queue.
104107# A smaller number will mean that processes spend more time idle waiting for
105108# work while a larger number will make Future.cancel() succeed less frequently
106109# (Futures in the call queue cannot be cancelled).
107110EXTRA_QUEUED_CALLS = 1
108111
112+ #####
113+ _ForkingPickler = context .reduction .ForkingPickler
114+ PICKLE_NONE = _ForkingPickler .dumps (None )
115+ WORK_ID_SIZE = 8
116+ WORK_ID_ENC = "little"
117+ SENTINEL_MSG = b'\x00 '
118+
109119
110120# Hack to embed stringification of remote traceback in local traceback
111121
@@ -149,25 +159,6 @@ def __init__(self, work_id, fn, args, kwargs):
149159 self .kwargs = kwargs
150160
151161
152- class _SafeQueue (Queue ):
153- """Safe Queue set exception to the future object linked to a job"""
154- def __init__ (self , max_size = 0 , * , ctx , pending_work_items ):
155- self .pending_work_items = pending_work_items
156- super ().__init__ (max_size , ctx = ctx )
157-
158- def _on_queue_feeder_error (self , e , obj ):
159- if isinstance (obj , _CallItem ):
160- tb = traceback .format_exception (type (e ), e , e .__traceback__ )
161- e .__cause__ = _RemoteTraceback ('\n """\n {}"""' .format ('' .join (tb )))
162- work_item = self .pending_work_items .pop (obj .work_id , None )
163- # work_item can be None if another process terminated. In this case,
164- # the queue_manager_thread fails all work_items with BrokenProcessPool
165- if work_item is not None :
166- work_item .future .set_exception (e )
167- else :
168- super ()._on_queue_feeder_error (e , obj )
169-
170-
171162def _get_chunks (* iterables , chunksize ):
172163 """ Iterates over zip()ed iterables in chunks. """
173164 it = zip (* iterables )
@@ -192,11 +183,14 @@ def _process_chunk(fn, chunk):
192183def _sendback_result (result_queue , work_id , result = None , exception = None ):
193184 """Safely send back the given result or exception"""
194185 try :
195- result_queue . put ( _ResultItem ( work_id , result = result ,
196- exception = exception ))
186+ serialize_res = _ForkingPickler . dumps (
187+ _ResultItem ( work_id , result = result , exception = exception ))
197188 except BaseException as e :
198- exc = _ExceptionWithTraceback (e , e .__traceback__ )
199- result_queue .put (_ResultItem (work_id , exception = exc ))
189+ serialize_res = _ForkingPickler .dumps (_ResultItem (
190+ work_id , exception = _ExceptionWithTraceback (e , e .__traceback__ )
191+ ))
192+ result_queue ._put_bytes (work_id .to_bytes (WORK_ID_SIZE , WORK_ID_ENC ) +
193+ serialize_res )
200194
201195
202196def _process_worker (call_queue , result_queue , initializer , initargs ):
@@ -221,18 +215,21 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
221215 # mark the pool broken
222216 return
223217 while True :
224- call_item = call_queue .get (block = True )
225- if call_item is None :
218+ serialized_item = call_queue ._get_bytes (block = True )
219+ if serialized_item == SENTINEL_MSG :
226220 # Wake up queue management thread
227- result_queue .put (os .getpid ())
221+ result_queue ._put_bytes (
222+ os .getpid ().to_bytes (WORK_ID_SIZE , WORK_ID_ENC ))
228223 return
224+ work_id = int .from_bytes (serialized_item [:WORK_ID_SIZE ], WORK_ID_ENC )
225+ call_item = None
229226 try :
227+ call_item = _ForkingPickler .loads (serialized_item [WORK_ID_SIZE :])
230228 r = call_item .fn (* call_item .args , ** call_item .kwargs )
229+ _sendback_result (result_queue , work_id , result = r )
231230 except BaseException as e :
232231 exc = _ExceptionWithTraceback (e , e .__traceback__ )
233- _sendback_result (result_queue , call_item .work_id , exception = exc )
234- else :
235- _sendback_result (result_queue , call_item .work_id , result = r )
232+ _sendback_result (result_queue , work_id , exception = exc )
236233
237234 # Liberate the resource as soon as possible, to avoid holding onto
238235 # open files or shared memory that is not needed anymore
@@ -267,14 +264,27 @@ def _add_call_item_to_queue(pending_work_items,
267264 work_item = pending_work_items [work_id ]
268265
269266 if work_item .future .set_running_or_notify_cancel ():
270- call_queue .put (_CallItem (work_id ,
271- work_item .fn ,
272- work_item .args ,
273- work_item .kwargs ),
274- block = True )
275- else :
276- del pending_work_items [work_id ]
277- continue
267+ call_item = _CallItem (work_id , work_item .fn , work_item .args ,
268+ work_item .kwargs )
269+ try :
270+ msg = _ForkingPickler .dumps (call_item )
271+ except BaseException as e :
272+ tb = traceback .format_exception (
273+ type (e ), e , e .__traceback__ )
274+ e .__cause__ = _RemoteTraceback (
275+ '\n """\n {}"""' .format ('' .join (tb )))
276+ # work_item can be None if a process terminated and the
277+ # executor is broken
278+ if work_item is not None :
279+ work_item .future .set_exception (e )
280+ del work_item
281+
282+ del pending_work_items [work_id ]
283+ continue
284+ call_queue ._put_bytes (
285+ work_id .to_bytes (WORK_ID_SIZE , WORK_ID_ENC ) + msg ,
286+ block = True )
287+
278288
279289
280290def _queue_management_worker (executor_reference ,
@@ -321,7 +331,7 @@ def shutdown_worker():
321331 while n_sentinels_sent < n_children_to_stop and n_children_alive > 0 :
322332 for i in range (n_children_to_stop - n_sentinels_sent ):
323333 try :
324- call_queue .put_nowait ( None )
334+ call_queue ._put_bytes ( SENTINEL_MSG , block = False )
325335 n_sentinels_sent += 1
326336 except Full :
327337 break
@@ -352,19 +362,22 @@ def shutdown_worker():
352362 ready = wait (readers + worker_sentinels )
353363
354364 cause = None
355- is_broken = True
365+ thread_wakeup . clear ()
356366 if result_reader in ready :
357367 try :
358- result_item = result_reader .recv ()
359- is_broken = False
368+ serialize_res = result_reader .recv_bytes ()
369+ work_id = int .from_bytes (serialize_res [:WORK_ID_SIZE ],
370+ WORK_ID_ENC )
371+ result_item = work_id
372+ if len (serialize_res ) > WORK_ID_SIZE :
373+ result_item = _ForkingPickler .loads (
374+ serialize_res [WORK_ID_SIZE :])
360375 except BaseException as e :
361- cause = traceback .format_exception (type (e ), e , e .__traceback__ )
362-
376+ result_item = _ResultItem (work_id , exception = e )
363377 elif wakeup_reader in ready :
364378 is_broken = False
365379 result_item = None
366- thread_wakeup .clear ()
367- if is_broken :
380+ else :
368381 # Mark the process pool broken so that submits fail right now.
369382 executor = executor_reference ()
370383 if executor is not None :
@@ -531,9 +544,7 @@ def __init__(self, max_workers=None, mp_context=None,
531544 # prevent the worker processes from idling. But don't make it too big
532545 # because futures in the call queue cannot be cancelled.
533546 queue_size = self ._max_workers + EXTRA_QUEUED_CALLS
534- self ._call_queue = _SafeQueue (
535- max_size = queue_size , ctx = self ._mp_context ,
536- pending_work_items = self ._pending_work_items )
547+ self ._call_queue = Queue (queue_size , ctx = self ._mp_context )
537548 # Killed worker processes can produce spurious "broken pipe"
538549 # tracebacks in the queue's own worker thread. But we detect killed
539550 # processes anyway, so silence the tracebacks.
0 commit comments