mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	Revert "gh-128041: Add terminate_workers and kill_workers methods to ProcessPoolExecutor (GH-128043)" (#130838)
				
					
				
			The test_concurrent_futures.test_process_pool test is failing in CI.
This reverts commit f97e4098ff.
			
			
This commit is contained in:
		
							parent
							
								
									63ffb406bb
								
							
						
					
					
						commit
						efadc5874c
					
				
					 5 changed files with 0 additions and 201 deletions
				
			
		| 
						 | 
					@ -415,30 +415,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
 | 
				
			||||||
      require the *fork* start method for :class:`ProcessPoolExecutor` you must
 | 
					      require the *fork* start method for :class:`ProcessPoolExecutor` you must
 | 
				
			||||||
      explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
 | 
					      explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
   .. method:: terminate_workers()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      Attempt to terminate all living worker processes immediately by calling
 | 
					 | 
				
			||||||
      :meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them.
 | 
					 | 
				
			||||||
      Internally, it will also call :meth:`Executor.shutdown` to ensure that all
 | 
					 | 
				
			||||||
      other resources associated with the executor are freed.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      After calling this method the caller should no longer submit tasks to the
 | 
					 | 
				
			||||||
      executor.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      .. versionadded:: next
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
   .. method:: kill_workers()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      Attempt to kill all living worker processes immediately by calling
 | 
					 | 
				
			||||||
      :meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
 | 
					 | 
				
			||||||
      Internally, it will also call :meth:`Executor.shutdown` to ensure that all
 | 
					 | 
				
			||||||
      other resources associated with the executor are freed.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      After calling this method the caller should no longer submit tasks to the
 | 
					 | 
				
			||||||
      executor.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      .. versionadded:: next
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
.. _processpoolexecutor-example:
 | 
					.. _processpoolexecutor-example:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
ProcessPoolExecutor Example
 | 
					ProcessPoolExecutor Example
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -444,11 +444,6 @@ contextvars
 | 
				
			||||||
* Support context manager protocol by :class:`contextvars.Token`.
 | 
					* Support context manager protocol by :class:`contextvars.Token`.
 | 
				
			||||||
  (Contributed by Andrew Svetlov in :gh:`129889`.)
 | 
					  (Contributed by Andrew Svetlov in :gh:`129889`.)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
 | 
					 | 
				
			||||||
  :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
 | 
					 | 
				
			||||||
  ways to terminate or kill all living worker processes in the given pool.
 | 
					 | 
				
			||||||
  (Contributed by Charles Machalow in :gh:`128043`.)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
ctypes
 | 
					ctypes
 | 
				
			||||||
------
 | 
					------
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -626,14 +626,6 @@ class BrokenProcessPool(_base.BrokenExecutor):
 | 
				
			||||||
    while a future was in the running state.
 | 
					    while a future was in the running state.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_TERMINATE = "terminate"
 | 
					 | 
				
			||||||
_KILL = "kill"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
_SHUTDOWN_CALLBACK_OPERATION = {
 | 
					 | 
				
			||||||
    _TERMINATE,
 | 
					 | 
				
			||||||
    _KILL
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ProcessPoolExecutor(_base.Executor):
 | 
					class ProcessPoolExecutor(_base.Executor):
 | 
				
			||||||
    def __init__(self, max_workers=None, mp_context=None,
 | 
					    def __init__(self, max_workers=None, mp_context=None,
 | 
				
			||||||
| 
						 | 
					@ -863,66 +855,3 @@ def shutdown(self, wait=True, *, cancel_futures=False):
 | 
				
			||||||
        self._executor_manager_thread_wakeup = None
 | 
					        self._executor_manager_thread_wakeup = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
 | 
					    shutdown.__doc__ = _base.Executor.shutdown.__doc__
 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _force_shutdown(self, operation):
 | 
					 | 
				
			||||||
        """Attempts to terminate or kill the executor's workers based off the
 | 
					 | 
				
			||||||
        given operation. Iterates through all of the current processes and
 | 
					 | 
				
			||||||
        performs the relevant task if the process is still alive.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        After terminating workers, the pool will be in a broken state
 | 
					 | 
				
			||||||
        and no longer usable (for instance, new tasks should not be
 | 
					 | 
				
			||||||
        submitted).
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        if operation not in _SHUTDOWN_CALLBACK_OPERATION:
 | 
					 | 
				
			||||||
            raise ValueError(f"Unsupported operation: {operation!r}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        processes = {}
 | 
					 | 
				
			||||||
        if self._processes:
 | 
					 | 
				
			||||||
            processes = self._processes.copy()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # shutdown will invalidate ._processes, so we copy it right before
 | 
					 | 
				
			||||||
        # calling. If we waited here, we would deadlock if a process decides not
 | 
					 | 
				
			||||||
        # to exit.
 | 
					 | 
				
			||||||
        self.shutdown(wait=False, cancel_futures=True)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if not processes:
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for proc in processes.values():
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                if not proc.is_alive():
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
            except ValueError:
 | 
					 | 
				
			||||||
                # The process is already exited/closed out.
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                if operation == _TERMINATE:
 | 
					 | 
				
			||||||
                    proc.terminate()
 | 
					 | 
				
			||||||
                elif operation == _KILL:
 | 
					 | 
				
			||||||
                    proc.kill()
 | 
					 | 
				
			||||||
            except ProcessLookupError:
 | 
					 | 
				
			||||||
                # The process just ended before our signal
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def terminate_workers(self):
 | 
					 | 
				
			||||||
        """Attempts to terminate the executor's workers.
 | 
					 | 
				
			||||||
        Iterates through all of the current worker processes and terminates
 | 
					 | 
				
			||||||
        each one that is still alive.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        After terminating workers, the pool will be in a broken state
 | 
					 | 
				
			||||||
        and no longer usable (for instance, new tasks should not be
 | 
					 | 
				
			||||||
        submitted).
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return self._force_shutdown(operation=_TERMINATE)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def kill_workers(self):
 | 
					 | 
				
			||||||
        """Attempts to kill the executor's workers.
 | 
					 | 
				
			||||||
        Iterates through all of the current worker processes and kills
 | 
					 | 
				
			||||||
        each one that is still alive.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        After killing workers, the pool will be in a broken state
 | 
					 | 
				
			||||||
        and no longer usable (for instance, new tasks should not be
 | 
					 | 
				
			||||||
        submitted).
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        return self._force_shutdown(operation=_KILL)
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,17 +1,13 @@
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
import queue
 | 
					 | 
				
			||||||
import signal
 | 
					 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
import threading
 | 
					import threading
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
import unittest
 | 
					import unittest
 | 
				
			||||||
import unittest.mock
 | 
					 | 
				
			||||||
from concurrent import futures
 | 
					from concurrent import futures
 | 
				
			||||||
from concurrent.futures.process import BrokenProcessPool
 | 
					from concurrent.futures.process import BrokenProcessPool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from test import support
 | 
					from test import support
 | 
				
			||||||
from test.support import hashlib_helper
 | 
					from test.support import hashlib_helper
 | 
				
			||||||
from test.test_importlib.metadata.fixtures import parameterize
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
from .executor import ExecutorTest, mul
 | 
					from .executor import ExecutorTest, mul
 | 
				
			||||||
from .util import (
 | 
					from .util import (
 | 
				
			||||||
| 
						 | 
					@ -26,19 +22,6 @@ def __init__(self, mgr):
 | 
				
			||||||
    def __del__(self):
 | 
					    def __del__(self):
 | 
				
			||||||
        self.event.set()
 | 
					        self.event.set()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
 | 
					 | 
				
			||||||
KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__
 | 
					 | 
				
			||||||
FORCE_SHUTDOWN_PARAMS = [
 | 
					 | 
				
			||||||
    dict(function_name=TERMINATE_WORKERS),
 | 
					 | 
				
			||||||
    dict(function_name=KILL_WORKERS),
 | 
					 | 
				
			||||||
]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _put_sleep_put(queue):
 | 
					 | 
				
			||||||
    """ Used as part of test_terminate_workers """
 | 
					 | 
				
			||||||
    queue.put('started')
 | 
					 | 
				
			||||||
    time.sleep(2)
 | 
					 | 
				
			||||||
    queue.put('finished')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ProcessPoolExecutorTest(ExecutorTest):
 | 
					class ProcessPoolExecutorTest(ExecutorTest):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -235,86 +218,6 @@ def mock_start_new_thread(func, *args, **kwargs):
 | 
				
			||||||
                    list(executor.map(mul, [(2, 3)] * 10))
 | 
					                    list(executor.map(mul, [(2, 3)] * 10))
 | 
				
			||||||
            executor.shutdown()
 | 
					            executor.shutdown()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_terminate_workers(self):
 | 
					 | 
				
			||||||
        mock_fn = unittest.mock.Mock()
 | 
					 | 
				
			||||||
        with self.executor_type(max_workers=1) as executor:
 | 
					 | 
				
			||||||
            executor._force_shutdown = mock_fn
 | 
					 | 
				
			||||||
            executor.terminate_workers()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def test_kill_workers(self):
 | 
					 | 
				
			||||||
        mock_fn = unittest.mock.Mock()
 | 
					 | 
				
			||||||
        with self.executor_type(max_workers=1) as executor:
 | 
					 | 
				
			||||||
            executor._force_shutdown = mock_fn
 | 
					 | 
				
			||||||
            executor.kill_workers()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        mock_fn.assert_called_once_with(operation=futures.process._KILL)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def test_force_shutdown_workers_invalid_op(self):
 | 
					 | 
				
			||||||
        with self.executor_type(max_workers=1) as executor:
 | 
					 | 
				
			||||||
            self.assertRaises(ValueError,
 | 
					 | 
				
			||||||
                              executor._force_shutdown,
 | 
					 | 
				
			||||||
                              operation='invalid operation'),
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @parameterize(*FORCE_SHUTDOWN_PARAMS)
 | 
					 | 
				
			||||||
    def test_force_shutdown_workers(self, function_name):
 | 
					 | 
				
			||||||
        manager = self.get_context().Manager()
 | 
					 | 
				
			||||||
        q = manager.Queue()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        with self.executor_type(max_workers=1) as executor:
 | 
					 | 
				
			||||||
            executor.submit(_put_sleep_put, q)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # We should get started, but not finished since we'll terminate the
 | 
					 | 
				
			||||||
            # workers just after
 | 
					 | 
				
			||||||
            self.assertEqual(q.get(timeout=5), 'started')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            worker_process = list(executor._processes.values())[0]
 | 
					 | 
				
			||||||
            getattr(executor, function_name)()
 | 
					 | 
				
			||||||
            worker_process.join()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if function_name == TERMINATE_WORKERS or \
 | 
					 | 
				
			||||||
                sys.platform == 'win32':
 | 
					 | 
				
			||||||
                # On windows, kill and terminate both send SIGTERM
 | 
					 | 
				
			||||||
                self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
 | 
					 | 
				
			||||||
            elif function_name == KILL_WORKERS:
 | 
					 | 
				
			||||||
                self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                self.fail(f"Unknown operation: {function_name}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            self.assertRaises(queue.Empty, q.get, timeout=1)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @parameterize(*FORCE_SHUTDOWN_PARAMS)
 | 
					 | 
				
			||||||
    def test_force_shutdown_workers_dead_workers(self, function_name):
 | 
					 | 
				
			||||||
        with self.executor_type(max_workers=1) as executor:
 | 
					 | 
				
			||||||
            future = executor.submit(os._exit, 1)
 | 
					 | 
				
			||||||
            self.assertRaises(BrokenProcessPool, future.result)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # even though the pool is broken, this shouldn't raise
 | 
					 | 
				
			||||||
            getattr(executor, function_name)()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @parameterize(*FORCE_SHUTDOWN_PARAMS)
 | 
					 | 
				
			||||||
    def test_force_shutdown_workers_not_started_yet(self, function_name):
 | 
					 | 
				
			||||||
        ctx = self.get_context()
 | 
					 | 
				
			||||||
        with unittest.mock.patch.object(ctx, 'Process') as mock_process:
 | 
					 | 
				
			||||||
            with self.executor_type(max_workers=1, mp_context=ctx) as executor:
 | 
					 | 
				
			||||||
                # The worker has not been started yet, terminate/kill_workers
 | 
					 | 
				
			||||||
                # should basically no-op
 | 
					 | 
				
			||||||
                getattr(executor, function_name)()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            mock_process.return_value.kill.assert_not_called()
 | 
					 | 
				
			||||||
            mock_process.return_value.terminate.assert_not_called()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @parameterize(*FORCE_SHUTDOWN_PARAMS)
 | 
					 | 
				
			||||||
    def test_force_shutdown_workers_stops_pool(self, function_name):
 | 
					 | 
				
			||||||
        with self.executor_type(max_workers=1) as executor:
 | 
					 | 
				
			||||||
            task = executor.submit(time.sleep, 0)
 | 
					 | 
				
			||||||
            self.assertIsNone(task.result())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            getattr(executor, function_name)()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
create_executor_tests(globals(), ProcessPoolExecutorTest,
 | 
					create_executor_tests(globals(), ProcessPoolExecutorTest,
 | 
				
			||||||
                      executor_mixins=(ProcessPoolForkMixin,
 | 
					                      executor_mixins=(ProcessPoolForkMixin,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,4 +0,0 @@
 | 
				
			||||||
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
 | 
					 | 
				
			||||||
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
 | 
					 | 
				
			||||||
ways to terminate or kill all living worker processes in the given pool.
 | 
					 | 
				
			||||||
(Contributed by Charles Machalow in :gh:`128043`.)
 | 
					 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue