mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	gh-90622: Do not spawn ProcessPool workers on demand via fork method. (#91598)
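Do not spawn ProcessPoolExecutor workers on demand when the multiprocessing start method is "fork"; in that case all workers are started up front, before the executor manager thread starts. This avoids potential deadlocks in the child processes caused by forking from a multithreaded process.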
This commit is contained in:
		
							parent
							
								
									a84a56d80f
								
							
						
					
					
						commit
						ebb37fc3fd
					
				
					 3 changed files with 49 additions and 11 deletions
				
			
		| 
						 | 
				
			
			@ -652,6 +652,10 @@ def __init__(self, max_workers=None, mp_context=None,
 | 
			
		|||
                mp_context = mp.get_context()
 | 
			
		||||
        self._mp_context = mp_context
 | 
			
		||||
 | 
			
		||||
        # https://github.com/python/cpython/issues/90622
 | 
			
		||||
        self._safe_to_dynamically_spawn_children = (
 | 
			
		||||
                self._mp_context.get_start_method(allow_none=False) != "fork")
 | 
			
		||||
 | 
			
		||||
        if initializer is not None and not callable(initializer):
 | 
			
		||||
            raise TypeError("initializer must be a callable")
 | 
			
		||||
        self._initializer = initializer
 | 
			
		||||
| 
						 | 
				
			
			@ -714,6 +718,8 @@ def __init__(self, max_workers=None, mp_context=None,
 | 
			
		|||
    def _start_executor_manager_thread(self):
 | 
			
		||||
        if self._executor_manager_thread is None:
 | 
			
		||||
            # Start the processes so that their sentinels are known.
 | 
			
		||||
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
 | 
			
		||||
                self._launch_processes()
 | 
			
		||||
            self._executor_manager_thread = _ExecutorManagerThread(self)
 | 
			
		||||
            self._executor_manager_thread.start()
 | 
			
		||||
            _threads_wakeups[self._executor_manager_thread] = \
 | 
			
		||||
| 
						 | 
				
			
			@ -726,6 +732,23 @@ def _adjust_process_count(self):
 | 
			
		|||
 | 
			
		||||
        process_count = len(self._processes)
 | 
			
		||||
        if process_count < self._max_workers:
 | 
			
		||||
            # Assertion disabled as this codepath is also used to replace a
 | 
			
		||||
            # worker that unexpectedly dies, even when using the 'fork' start
 | 
			
		||||
            # method. That means there is still a potential deadlock bug. If a
 | 
			
		||||
            # 'fork' mp_context worker dies, we'll be forking a new one when
 | 
			
		||||
            # we know a thread is running (self._executor_manager_thread).
 | 
			
		||||
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
 | 
			
		||||
            self._spawn_process()
 | 
			
		||||
 | 
			
		||||
    def _launch_processes(self):
 | 
			
		||||
        # https://github.com/python/cpython/issues/90622
 | 
			
		||||
        assert not self._executor_manager_thread, (
 | 
			
		||||
                'Processes cannot be fork()ed after the thread has started, '
 | 
			
		||||
                'deadlock in the child processes could result.')
 | 
			
		||||
        for _ in range(len(self._processes), self._max_workers):
 | 
			
		||||
            self._spawn_process()
 | 
			
		||||
 | 
			
		||||
    def _spawn_process(self):
 | 
			
		||||
        p = self._mp_context.Process(
 | 
			
		||||
            target=_process_worker,
 | 
			
		||||
            args=(self._call_queue,
 | 
			
		||||
| 
						 | 
				
			
			@ -755,6 +778,7 @@ def submit(self, fn, /, *args, **kwargs):
 | 
			
		|||
            # Wake up queue management thread
 | 
			
		||||
            self._executor_manager_thread_wakeup.wakeup()
 | 
			
		||||
 | 
			
		||||
            if self._safe_to_dynamically_spawn_children:
 | 
			
		||||
                self._adjust_process_count()
 | 
			
		||||
            self._start_executor_manager_thread()
 | 
			
		||||
            return f
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -497,10 +497,16 @@ def acquire_lock(lock):
 | 
			
		|||
            lock.acquire()
 | 
			
		||||
 | 
			
		||||
        mp_context = self.get_context()
 | 
			
		||||
        if mp_context.get_start_method(allow_none=False) == "fork":
 | 
			
		||||
            # fork pre-spawns, not on demand.
 | 
			
		||||
            expected_num_processes = self.worker_count
 | 
			
		||||
        else:
 | 
			
		||||
            expected_num_processes = 3
 | 
			
		||||
 | 
			
		||||
        sem = mp_context.Semaphore(0)
 | 
			
		||||
        for _ in range(3):
 | 
			
		||||
            self.executor.submit(acquire_lock, sem)
 | 
			
		||||
        self.assertEqual(len(self.executor._processes), 3)
 | 
			
		||||
        self.assertEqual(len(self.executor._processes), expected_num_processes)
 | 
			
		||||
        for _ in range(3):
 | 
			
		||||
            sem.release()
 | 
			
		||||
        processes = self.executor._processes
 | 
			
		||||
| 
						 | 
				
			
			@ -1021,6 +1027,8 @@ def test_saturation(self):
 | 
			
		|||
    def test_idle_process_reuse_one(self):
 | 
			
		||||
        executor = self.executor
 | 
			
		||||
        assert executor._max_workers >= 4
 | 
			
		||||
        if self.get_context().get_start_method(allow_none=False) == "fork":
 | 
			
		||||
            raise unittest.SkipTest("Incompatible with the fork start method.")
 | 
			
		||||
        executor.submit(mul, 21, 2).result()
 | 
			
		||||
        executor.submit(mul, 6, 7).result()
 | 
			
		||||
        executor.submit(mul, 3, 14).result()
 | 
			
		||||
| 
						 | 
				
			
			@ -1029,6 +1037,8 @@ def test_idle_process_reuse_one(self):
 | 
			
		|||
    def test_idle_process_reuse_multiple(self):
 | 
			
		||||
        executor = self.executor
 | 
			
		||||
        assert executor._max_workers <= 5
 | 
			
		||||
        if self.get_context().get_start_method(allow_none=False) == "fork":
 | 
			
		||||
            raise unittest.SkipTest("Incompatible with the fork start method.")
 | 
			
		||||
        executor.submit(mul, 12, 7).result()
 | 
			
		||||
        executor.submit(mul, 33, 25)
 | 
			
		||||
        executor.submit(mul, 25, 26).result()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,4 @@
 | 
			
		|||
Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no
 | 
			
		||||
longer spawned on demand (a feature added in 3.9) when the multiprocessing
 | 
			
		||||
context start method is ``"fork"``, as that can lead to deadlocks in the
 | 
			
		||||
child processes due to a fork happening while threads are running.
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue