mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 03:04:41 +00:00 
			
		
		
		
	 94459fd7dc
			
		
	
	
		94459fd7dc
		
	
	
	
	
		
			
			Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors. This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
		
			
				
	
	
		
			659 lines
		
	
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			659 lines
		
	
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright 2009 Brian Quinlan. All Rights Reserved.
 | |
| # Licensed to PSF under a Contributor Agreement.
 | |
| 
 | |
| """Implements ProcessPoolExecutor.
 | |
| 
 | |
| The following diagram and text describe the data flow through the system:
 | |
| 
 | |
| |======================= In-process =====================|== Out-of-process ==|
 | |
| 
 | |
| +----------+     +----------+       +--------+     +-----------+    +---------+
 | |
| |          |  => | Work Ids |       |        |     | Call Q    |    | Process |
 | |
| |          |     +----------+       |        |     +-----------+    |  Pool   |
 | |
| |          |     | ...      |       |        |     | ...       |    +---------+
 | |
| |          |     | 6        |    => |        |  => | 5, call() | => |         |
 | |
| |          |     | 7        |       |        |     | ...       |    |         |
 | |
| | Process  |     | ...      |       | Local  |     +-----------+    | Process |
 | |
| |  Pool    |     +----------+       | Worker |                      |  #1..n  |
 | |
| | Executor |                        | Thread |                      |         |
 | |
| |          |     +------------+     |        |     +-----------+    |         |
 | |
| |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
 | |
| |          |     +------------+     |        |     +-----------+    |         |
 | |
| |          |     | 6: call()  |     |        |     | ...       |    |         |
 | |
| |          |     |    future  |     |        |     | 4, result |    |         |
 | |
| |          |     | ...        |     |        |     | 3, except |    |         |
 | |
| +----------+     +------------+     +--------+     +-----------+    +---------+
 | |
| 
 | |
| Executor.submit() called:
 | |
| - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
 | |
| - adds the id of the _WorkItem to the "Work Ids" queue
 | |
| 
 | |
| Local worker thread:
 | |
| - reads work ids from the "Work Ids" queue and looks up the corresponding
 | |
|   WorkItem from the "Work Items" dict: if the work item has been cancelled then
 | |
|   it is simply removed from the dict, otherwise it is repackaged as a
 | |
|   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
 | |
|   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
 | |
|   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
 | |
| - reads _ResultItems from "Result Q", updates the future stored in the
 | |
|   "Work Items" dict and deletes the dict entry
 | |
| 
 | |
| Process #1..n:
 | |
| - reads _CallItems from "Call Q", executes the calls, and puts the resulting
 | |
|   _ResultItems in "Result Q"
 | |
| """
 | |
| 
 | |
# Original author of this module.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
 | |
| 
 | |
| import atexit
 | |
| import os
 | |
| from concurrent.futures import _base
 | |
| import queue
 | |
| from queue import Full
 | |
| import multiprocessing as mp
 | |
| from multiprocessing.connection import wait
 | |
| from multiprocessing.queues import Queue
 | |
| import threading
 | |
| import weakref
 | |
| from functools import partial
 | |
| import itertools
 | |
| import traceback
 | |
| 
 | |
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

# Maps each queue management thread to the _ThreadWakeup used to interrupt its
# wait. The exit handler wakes every registered thread and then joins it. Weak
# keys mean this registry does not by itself keep thread objects alive.
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by the exit handler; management threads read it to learn that
# the interpreter is shutting down.
_global_shutdown = False
 | |
| 
 | |
| 
 | |
| class _ThreadWakeup:
 | |
|     __slot__ = ["_state"]
 | |
| 
 | |
|     def __init__(self):
 | |
|         self._reader, self._writer = mp.Pipe(duplex=False)
 | |
| 
 | |
|     def wakeup(self):
 | |
|         self._writer.send_bytes(b"")
 | |
| 
 | |
|     def clear(self):
 | |
|         while self._reader.poll():
 | |
|             self._reader.recv_bytes()
 | |
| 
 | |
| 
 | |
def _python_exit():
    """atexit hook: flag global shutdown, wake every management thread, join.

    The registry is snapshotted first. All threads are woken before any is
    joined, so each one can observe ``_global_shutdown`` and wind down
    concurrently.
    """
    global _global_shutdown
    _global_shutdown = True
    snapshot = list(_threads_wakeups.items())
    threads = [thread for thread, _ in snapshot]
    wakeups = [wakeup for _, wakeup in snapshot]
    for wakeup in wakeups:
        wakeup.wakeup()
    for thread in threads:
        thread.join()
 | |
| 
 | |
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor sizes its call queue as max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1
 | |
| 
 | |
| 
 | |
| # Hack to embed stringification of remote traceback in local traceback
 | |
| 
 | |
| class _RemoteTraceback(Exception):
 | |
|     def __init__(self, tb):
 | |
|         self.tb = tb
 | |
|     def __str__(self):
 | |
|         return self.tb
 | |
| 
 | |
| class _ExceptionWithTraceback:
 | |
|     def __init__(self, exc, tb):
 | |
|         tb = traceback.format_exception(type(exc), exc, tb)
 | |
|         tb = ''.join(tb)
 | |
|         self.exc = exc
 | |
|         self.tb = '\n"""\n%s"""' % tb
 | |
|     def __reduce__(self):
 | |
|         return _rebuild_exc, (self.exc, self.tb)
 | |
| 
 | |
| def _rebuild_exc(exc, tb):
 | |
|     exc.__cause__ = _RemoteTraceback(tb)
 | |
|     return exc
 | |
| 
 | |
| class _WorkItem(object):
 | |
|     def __init__(self, future, fn, args, kwargs):
 | |
|         self.future = future
 | |
|         self.fn = fn
 | |
|         self.args = args
 | |
|         self.kwargs = kwargs
 | |
| 
 | |
| class _ResultItem(object):
 | |
|     def __init__(self, work_id, exception=None, result=None):
 | |
|         self.work_id = work_id
 | |
|         self.exception = exception
 | |
|         self.result = result
 | |
| 
 | |
| class _CallItem(object):
 | |
|     def __init__(self, work_id, fn, args, kwargs):
 | |
|         self.work_id = work_id
 | |
|         self.fn = fn
 | |
|         self.args = args
 | |
|         self.kwargs = kwargs
 | |
| 
 | |
| 
 | |
| class _SafeQueue(Queue):
 | |
|     """Safe Queue set exception to the future object linked to a job"""
 | |
|     def __init__(self, max_size=0, *, ctx, pending_work_items):
 | |
|         self.pending_work_items = pending_work_items
 | |
|         super().__init__(max_size, ctx=ctx)
 | |
| 
 | |
|     def _on_queue_feeder_error(self, e, obj):
 | |
|         if isinstance(obj, _CallItem):
 | |
|             tb = traceback.format_exception(type(e), e, e.__traceback__)
 | |
|             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
 | |
|             work_item = self.pending_work_items.pop(obj.work_id, None)
 | |
|             # work_item can be None if another process terminated. In this case,
 | |
|             # the queue_manager_thread fails all work_items with BrokenProcessPool
 | |
|             if work_item is not None:
 | |
|                 work_item.future.set_exception(e)
 | |
|         else:
 | |
|             super()._on_queue_feeder_error(e, obj)
 | |
| 
 | |
| 
 | |
| def _get_chunks(*iterables, chunksize):
 | |
|     """ Iterates over zip()ed iterables in chunks. """
 | |
|     it = zip(*iterables)
 | |
|     while True:
 | |
|         chunk = tuple(itertools.islice(it, chunksize))
 | |
|         if not chunk:
 | |
|             return
 | |
|         yield chunk
 | |
| 
 | |
| def _process_chunk(fn, chunk):
 | |
|     """ Processes a chunk of an iterable passed to map.
 | |
| 
 | |
|     Runs the function passed to map() on a chunk of the
 | |
|     iterable passed to map.
 | |
| 
 | |
|     This function is run in a separate process.
 | |
| 
 | |
|     """
 | |
|     return [fn(*args) for args in chunk]
 | |
| 
 | |
| 
 | |
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Report the outcome of *work_id* on *result_queue*.

    If putting the item fails (e.g. the result cannot be pickled), that
    failure is sent back as the item's exception instead, so the parent's
    future still completes.
    """
    try:
        result_queue.put(
            _ResultItem(work_id, result=result, exception=exception))
    except BaseException as send_error:
        wrapped = _ExceptionWithTraceback(send_error, send_error.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped))
 | |
| 
 | |
| 
 | |
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Main loop of a worker process: evaluate _CallItems, report results.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker. A ``None`` item tells the worker to stop.
        result_queue: A ctx.SimpleQueue that the worker writes _ResultItems
            to. On a clean stop it also receives the worker's PID.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer

    If the initializer raises, the error is logged and the worker returns
    immediately. The parent notices the dead process and marks the pool
    broken.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            return

    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Sentinel: report our PID so the queue management thread wakes
            # up and can join this process.
            result_queue.put(os.getpid())
            return
        try:
            value = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            wrapped = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id,
                             exception=wrapped)
        else:
            _sendback_result(result_queue, call_item.work_id, result=value)
        # Drop the call (and its arguments) before blocking on the next get(),
        # so open files or shared memory it references can be released.
        del call_item
 | |
| 
 | |
| 
 | |
| def _add_call_item_to_queue(pending_work_items,
 | |
|                             work_ids,
 | |
|                             call_queue):
 | |
|     """Fills call_queue with _WorkItems from pending_work_items.
 | |
| 
 | |
|     This function never blocks.
 | |
| 
 | |
|     Args:
 | |
|         pending_work_items: A dict mapping work ids to _WorkItems e.g.
 | |
|             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
 | |
|         work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
 | |
|             are consumed and the corresponding _WorkItems from
 | |
|             pending_work_items are transformed into _CallItems and put in
 | |
|             call_queue.
 | |
|         call_queue: A multiprocessing.Queue that will be filled with _CallItems
 | |
|             derived from _WorkItems.
 | |
|     """
 | |
|     while True:
 | |
|         if call_queue.full():
 | |
|             return
 | |
|         try:
 | |
|             work_id = work_ids.get(block=False)
 | |
|         except queue.Empty:
 | |
|             return
 | |
|         else:
 | |
|             work_item = pending_work_items[work_id]
 | |
| 
 | |
|             if work_item.future.set_running_or_notify_cancel():
 | |
|                 call_queue.put(_CallItem(work_id,
 | |
|                                          work_item.fn,
 | |
|                                          work_item.args,
 | |
|                                          work_item.kwargs),
 | |
|                                block=True)
 | |
|             else:
 | |
|                 del pending_work_items[work_id]
 | |
|                 continue
 | |
| 
 | |
| 
 | |
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread. Each iteration first tops up the
    call queue. It then waits until a result arrives, a wakeup is signalled,
    or a worker process exits. An unexpected worker exit, or a failure to
    receive a result, marks the pool broken. Every pending future then fails
    with BrokenProcessPool and the remaining workers are terminated.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping PIDs to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # Send one None sentinel per live worker, then close the call queue
        # and join every worker process.
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal send. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                # Receiving or unpickling the result failed: keep the
                # formatted error to chain onto the BrokenProcessPool below.
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        # Otherwise only worker sentinels are ready: a worker process exited,
        # so the pool is treated as broken.
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed. Iterate over a
            # snapshot: the call queue's feeder thread may pop entries
            # concurrently (_SafeQueue._on_queue_feeder_error). A dict that
            # changes size mid-iteration raises RuntimeError, which would kill
            # this thread before the workers are terminated.
            for work_item in list(pending_work_items.values()):
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            # NOTE: `executor` is always None at this point (it is reset at
            # the end of every iteration), so this assertion cannot fail.
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        # Since no new work items can be added, it is safe to shutdown this
        # thread if there are no pending work items. (shutdown_worker()
        # handles a full call queue itself, so no Full can escape here.)
        if shutting_down() and not pending_work_items:
            shutdown_worker()
            return
        executor = None
 | |
| 
 | |
| 
 | |
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the OS provides too few POSIX semaphores.

    The outcome is cached in ``_system_limits_checked`` and
    ``_system_limited``, so ``os.sysconf`` is consulted only once. Later calls
    either re-raise the cached error or return immediately.

    Raises:
        NotImplementedError: if SC_SEM_NSEMS_MAX is known and below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found sufficient: do not query sysconf again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # Indeterminate limit: assume the limit is determined
        # by available memory only.
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
 | |
| 
 | |
| 
 | |
| def _chain_from_iterable_of_lists(iterable):
 | |
|     """
 | |
|     Specialized implementation of itertools.chain.from_iterable.
 | |
|     Each item in *iterable* should be a list.  This function is
 | |
|     careful not to keep references to yielded objects.
 | |
|     """
 | |
|     for element in iterable:
 | |
|         element.reverse()
 | |
|         while element:
 | |
|             yield element.pop()
 | |
| 
 | |
| 
 | |
class BrokenProcessPool(_base.BrokenExecutor):
    """Raised when a ProcessPoolExecutor can no longer be used.

    This happens when a worker process terminated abruptly while a future was
    in the running state, or when a result could not be received from a
    worker.
    """
 | |
| 
 | |
| 
 | |
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    The worker processes and the queue management thread that feeds them are
    started lazily by the first call to submit().
    """

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            NotImplementedError: If the system provides too few semaphores.
            ValueError: If max_workers is not greater than 0.
            TypeError: If initializer is given but is not callable.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown state. _shutdown_thread is set by shutdown() (under
        # _shutdown_lock) or when the pool breaks. Once it is set, submit()
        # refuses new work and the management thread exits after the pending
        # work drains.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # False, or the error message used once the pool is broken.
        self._broken = False
        # Next work id to hand out.
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        """Start the worker processes and management thread if not running."""
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so the atexit handler can wake and join it.
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        # Start worker processes until there are _max_workers of them; each
        # one is recorded in self._processes under its PID.
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        # The docstring is copied from _base.Executor.submit below.
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            ValueError: If chunksize is less than 1.
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True):
        # The docstring is copied from _base.Executor.shutdown below.
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
 | |
| 
 | |
# At interpreter exit, _python_exit wakes every registered queue management
# thread and joins it.
atexit.register(_python_exit)
 |