mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			230 lines
		
	
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			230 lines
		
	
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright 2009 Brian Quinlan. All Rights Reserved.
 | 
						|
# Licensed to PSF under a Contributor Agreement.
 | 
						|
 | 
						|
"""Implements ThreadPoolExecutor."""
 | 
						|
 | 
						|
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
 | 
						|
 | 
						|
from concurrent.futures import _base
 | 
						|
import itertools
 | 
						|
import queue
 | 
						|
import threading
 | 
						|
import types
 | 
						|
import weakref
 | 
						|
import os
 | 
						|
 | 
						|
 | 
						|
# Registry of worker threads: maps each worker Thread to the work queue it
# consumes from, so that _python_exit() can wake and join every worker.
# Keys are held weakly (WeakKeyDictionary).
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once _python_exit() has run; submit() and the worker loop
# read it to refuse new work and to exit.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()
 | 
						|
 | 
						|
def _python_exit():
    """Stop every registered worker thread at interpreter shutdown.

    Marks the module as shutting down (under the global shutdown lock),
    posts a ``None`` wake-up on the queue of each registered worker, and
    then joins all of those threads.
    """
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Snapshot the registry so both passes see the same (thread, queue)
    # pairs.
    workers = list(_threads_queues.items())
    # First wake every worker, then wait for them, so that they can all
    # wind down at the same time.
    for _, pending in workers:
        pending.put(None)
    for thread, _ in workers:
        thread.join()
 | 
						|
 | 
						|
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
# NOTE(review): `_register_atexit` is underscore-prefixed, so presumably a
# private helper of the threading module -- confirm before reusing it.
threading._register_atexit(_python_exit)
 | 
						|
 | 
						|
 | 
						|
class _WorkItem:
    """A queued call together with the future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        """Remember the future and the call ``fn(*args, **kwargs)``."""
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Execute the call unless the future was cancelled beforehand.

        The return value of the call is stored with ``set_result``; anything
        it raises (any ``BaseException``) is stored with ``set_exception``
        instead of propagating.
        """
        if self.future.set_running_or_notify_cancel():
            try:
                outcome = self.fn(*self.args, **self.kwargs)
            except BaseException as error:
                self.future.set_exception(error)
                # Drop our own reference so that the stored exception does
                # not form a reference cycle through this frame.
                self = None
            else:
                self.future.set_result(outcome)

    # Allow subscription such as _WorkItem[int] (yields a GenericAlias).
    __class_getitem__ = classmethod(types.GenericAlias)
 | 
						|
 | 
						|
 | 
						|
def _worker(executor_reference, work_queue, initializer, initargs):
    """Body of a pool thread: initialize, then serve items from the queue.

    Args:
        executor_reference: Callable returning the owning executor, or None
            once that executor no longer exists.
        work_queue: Queue yielding work items; ``None`` entries are wake-ups.
        initializer: Optional callable invoked once before any work is taken.
        initargs: Positional arguments for the initializer.

    Nothing is raised to the caller: a failing initializer is logged and
    reported through ``executor._initializer_failed()``, and any other
    failure in the loop is logged before the function returns.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            owner = executor_reference()
            if owner is not None:
                owner._initializer_failed()
            return
    try:
        while True:
            task = work_queue.get(block=True)
            if task is None:
                # A wake-up. Leave the loop when any of these holds:
                #   - the interpreter is shutting down,
                #   - the owning executor has been collected,
                #   - the owning executor has been shut down.
                owner = executor_reference()
                if _shutdown or owner is None or owner._shutdown:
                    # Mark a still-alive executor as shut down as soon as
                    # possible.
                    if owner is not None:
                        owner._shutdown = True
                    # Pass the wake-up on to the other workers.
                    work_queue.put(None)
                    return
                del owner
            else:
                task.run()
                # Drop the reference to the finished item. See issue16284
                del task

                # Report this worker as idle again, if the executor is
                # still around, without holding on to it afterwards.
                owner = executor_reference()
                if owner is not None:
                    owner._idle_semaphore.release()
                del owner
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
 | 
						|
 | 
						|
 | 
						|
class BrokenThreadPool(_base.BrokenExecutor):
    """Error for a ThreadPoolExecutor whose worker failed to initialize.

    Set on pending futures, and raised by later submissions, once a thread
    initializer has failed.
    """
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of worker threads.

    Worker threads are started on demand: a submission that finds no idle
    worker starts a new thread, up to ``max_workers`` threads in total.
    """

    # Source of unique numbers for the default thread name prefix, used when
    # thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. When None, defaults to
                ``min(32, (os.cpu_count() or 1) + 4)``.
            thread_name_prefix: An optional name prefix to give our threads.
                When empty, a unique ``ThreadPoolExecutor-N`` prefix is used.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is not greater than 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # Typical workloads are CPU-bound tasks that release the GIL and
            # I/O-bound tasks (which release it as well). cpu_count + 4
            # serves both kinds; the cap of 32 keeps many-core machines from
            # consuming a surprisingly large amount of resources.
            cpus = os.cpu_count() or 1
            max_workers = min(32, cpus + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if not (initializer is None or callable(initializer)):
            raise TypeError("initializer must be a callable")

        # Only draw a number from the counter when no prefix was given.
        if not thread_name_prefix:
            thread_name_prefix = "ThreadPoolExecutor-%d" % self._counter()

        self._thread_name_prefix = thread_name_prefix
        self._max_workers = max_workers
        self._initializer = initializer
        self._initargs = initargs
        self._threads = set()
        self._work_queue = queue.SimpleQueue()
        self._idle_semaphore = threading.Semaphore(0)
        self._shutdown_lock = threading.Lock()
        self._shutdown = False
        self._broken = False

    def submit(self, fn, /, *args, **kwargs):
        # Both locks are held (executor lock first) while the state is
        # checked and the item is queued.
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            future = _base.Future()
            self._work_queue.put(_WorkItem(future, fn, args, kwargs))
            self._adjust_thread_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one is available."""
        # A successful non-blocking acquire means an idle worker can take
        # the new item, so no thread needs to be started.
        if self._idle_semaphore.acquire(timeout=0):
            return

        num_threads = len(self._threads)
        if not num_threads < self._max_workers:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        # The worker only gets a weak reference to this executor.
        worker_args = (weakref.ref(self, weakref_cb),
                       self._work_queue,
                       self._initializer,
                       self._initargs)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=worker_args)
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool as broken and fail every queued work item."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Empty the work queue; each real item (wake-ups are None) gets
            # its future failed with BrokenThreadPool.
            while True:
                try:
                    pending = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                else:
                    if pending is not None:
                        pending.future.set_exception(
                            BrokenThreadPool(self._broken))

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Empty the work queue and cancel the future of every item
                # that was still waiting (wake-ups are None and skipped).
                while True:
                    try:
                        pending = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    else:
                        if pending is not None:
                            pending.future.cancel()

            # Post a wake-up so that threads blocked in
            # _work_queue.get(block=True) do not wait forever.
            self._work_queue.put(None)
        if wait:
            for worker in self._threads:
                worker.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
 |