# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing
from multiprocessing import SimpleQueue
from multiprocessing.connection import wait
import threading
import weakref

# Workers are created as daemon threads and processes. This is done to allow
# the interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects, e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
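
# For example (a sketch, not part of this module): with max_workers=1 the
# call queue holds at most 1 + EXTRA_QUEUED_CALLS == 2 call items, so a
# later-submitted future is still only in the "Work Items" dict and can
# usually still be cancelled:
#
#     import time
#     with ProcessPoolExecutor(max_workers=1) as executor:
#         futures = [executor.submit(time.sleep, 1) for _ in range(5)]
#         futures[-1].cancel()   # likely True: not yet moved to the "Call Q"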

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread, signalling a clean exit by pid.
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            result_queue.put(_ResultItem(call_item.work_id,
                                         exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))
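
# Note that everything crossing the process boundary above (fn, args, kwargs,
# results and raised exceptions) must be picklable; e.g. submitting a lambda
# is a user error that fails when the _CallItem is serialized (a sketch of
# the constraint, not of this module's API):
#
#     executor.submit(lambda: 1)   # the lambda cannot be pickled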

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                # The future was cancelled before it could be dispatched:
                # drop the work item without queueing a call.
                del pending_work_items[work_id]
                continue

def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected, in which case this function can exit.
        processes: A dict mapping process ids to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # A worker sentinel fired without a result being ready, so a
            # process exited abnormally. Mark the process pool broken so
            # that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed.
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284.
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken).
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above).
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284.
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        # Drop the strong reference so the executor can be garbage collected
        # while this thread is blocked in wait() above.
        executor = None
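
# multiprocessing.connection.wait() is what lets the loop above notice a
# crashed worker: a Process.sentinel becomes ready when its process exits.
# A standalone sketch of that mechanism (int is just a trivial target):
#
#     from multiprocessing import Process
#     from multiprocessing.connection import wait
#
#     p = Process(target=int)
#     p.start()
#     assert p.sentinel in wait([p.sentinel])   # returns once p has exited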

_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit; assume the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d available, "
                       "256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
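
# A sketch of the probe above (return values vary by platform; -1 means the
# limit is indeterminate):
#
#     >>> import os
#     >>> os.sysconf("SC_SEM_NSEMS_MAX")
#     -1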


class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue
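
    # The weakref trick above in miniature (a sketch): the callback fires
    # when the referent is collected, which is how an abandoned executor
    # still gets its management thread woken up and joined.
    #
    #     import weakref
    #     class X: pass
    #     x = X()
    #     r = weakref.ref(x, lambda ref: print("collected"))
    #     del x    # on CPython, prints "collected" immediately (refcounting)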

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
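
    # Note: _base.Executor.__exit__ calls shutdown(wait=True), so using the
    # executor as a context manager performs this cleanup automatically.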

atexit.register(_python_exit)