mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	gh-125451: Fix deadlock in ProcessPoolExecutor shutdown (#125492)
There was a deadlock when `ProcessPoolExecutor` shuts down at the same time that a queueing thread handles an error processing a task. Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use an internal lock instead. This fixes the lock-ordering deadlock in which the `ExecutorManagerThread` holds the `_shutdown_lock` and joins the queueing thread, while the queueing thread is itself blocked trying to acquire the `_shutdown_lock` in order to use the `_ThreadWakeup`.
This commit is contained in:
		
							parent
							
								
									d83fcf8371
								
							
						
					
					
						commit
						760872efec
					
				
					 3 changed files with 23 additions and 32 deletions
				
			
		|  | @ -68,27 +68,31 @@ | ||||||
| class _ThreadWakeup: | class _ThreadWakeup: | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|         self._closed = False |         self._closed = False | ||||||
|  |         self._lock = threading.Lock() | ||||||
|         self._reader, self._writer = mp.Pipe(duplex=False) |         self._reader, self._writer = mp.Pipe(duplex=False) | ||||||
| 
 | 
 | ||||||
|     def close(self): |     def close(self): | ||||||
|         # Please note that we do not take the shutdown lock when |         # Please note that we do not take the self._lock when | ||||||
|         # calling clear() (to avoid deadlocking) so this method can |         # calling clear() (to avoid deadlocking) so this method can | ||||||
|         # only be called safely from the same thread as all calls to |         # only be called safely from the same thread as all calls to | ||||||
|         # clear() even if you hold the shutdown lock. Otherwise we |         # clear() even if you hold the lock. Otherwise we | ||||||
|         # might try to read from the closed pipe. |         # might try to read from the closed pipe. | ||||||
|         if not self._closed: |         with self._lock: | ||||||
|             self._closed = True |             if not self._closed: | ||||||
|             self._writer.close() |                 self._closed = True | ||||||
|             self._reader.close() |                 self._writer.close() | ||||||
|  |                 self._reader.close() | ||||||
| 
 | 
 | ||||||
|     def wakeup(self): |     def wakeup(self): | ||||||
|         if not self._closed: |         with self._lock: | ||||||
|             self._writer.send_bytes(b"") |             if not self._closed: | ||||||
|  |                 self._writer.send_bytes(b"") | ||||||
| 
 | 
 | ||||||
|     def clear(self): |     def clear(self): | ||||||
|         if not self._closed: |         if self._closed: | ||||||
|             while self._reader.poll(): |             raise RuntimeError('operation on closed _ThreadWakeup') | ||||||
|                 self._reader.recv_bytes() |         while self._reader.poll(): | ||||||
|  |             self._reader.recv_bytes() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _python_exit(): | def _python_exit(): | ||||||
|  | @ -167,10 +171,8 @@ def __init__(self, work_id, fn, args, kwargs): | ||||||
| 
 | 
 | ||||||
| class _SafeQueue(Queue): | class _SafeQueue(Queue): | ||||||
|     """Safe Queue set exception to the future object linked to a job""" |     """Safe Queue set exception to the future object linked to a job""" | ||||||
|     def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, |     def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): | ||||||
|                  thread_wakeup): |  | ||||||
|         self.pending_work_items = pending_work_items |         self.pending_work_items = pending_work_items | ||||||
|         self.shutdown_lock = shutdown_lock |  | ||||||
|         self.thread_wakeup = thread_wakeup |         self.thread_wakeup = thread_wakeup | ||||||
|         super().__init__(max_size, ctx=ctx) |         super().__init__(max_size, ctx=ctx) | ||||||
| 
 | 
 | ||||||
|  | @ -179,8 +181,7 @@ def _on_queue_feeder_error(self, e, obj): | ||||||
|             tb = format_exception(type(e), e, e.__traceback__) |             tb = format_exception(type(e), e, e.__traceback__) | ||||||
|             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) |             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) | ||||||
|             work_item = self.pending_work_items.pop(obj.work_id, None) |             work_item = self.pending_work_items.pop(obj.work_id, None) | ||||||
|             with self.shutdown_lock: |             self.thread_wakeup.wakeup() | ||||||
|                 self.thread_wakeup.wakeup() |  | ||||||
|             # work_item can be None if another process terminated. In this |             # work_item can be None if another process terminated. In this | ||||||
|             # case, the executor_manager_thread fails all work_items |             # case, the executor_manager_thread fails all work_items | ||||||
|             # with BrokenProcessPool |             # with BrokenProcessPool | ||||||
|  | @ -296,12 +297,10 @@ def __init__(self, executor): | ||||||
|         # if there is no pending work item. |         # if there is no pending work item. | ||||||
|         def weakref_cb(_, |         def weakref_cb(_, | ||||||
|                        thread_wakeup=self.thread_wakeup, |                        thread_wakeup=self.thread_wakeup, | ||||||
|                        shutdown_lock=self.shutdown_lock, |  | ||||||
|                        mp_util_debug=mp.util.debug): |                        mp_util_debug=mp.util.debug): | ||||||
|             mp_util_debug('Executor collected: triggering callback for' |             mp_util_debug('Executor collected: triggering callback for' | ||||||
|                           ' QueueManager wakeup') |                           ' QueueManager wakeup') | ||||||
|             with shutdown_lock: |             thread_wakeup.wakeup() | ||||||
|                 thread_wakeup.wakeup() |  | ||||||
| 
 | 
 | ||||||
|         self.executor_reference = weakref.ref(executor, weakref_cb) |         self.executor_reference = weakref.ref(executor, weakref_cb) | ||||||
| 
 | 
 | ||||||
|  | @ -429,11 +428,6 @@ def wait_result_broken_or_wakeup(self): | ||||||
|         elif wakeup_reader in ready: |         elif wakeup_reader in ready: | ||||||
|             is_broken = False |             is_broken = False | ||||||
| 
 | 
 | ||||||
|         # No need to hold the _shutdown_lock here because: |  | ||||||
|         # 1. we're the only thread to use the wakeup reader |  | ||||||
|         # 2. we're also the only thread to call thread_wakeup.close() |  | ||||||
|         # 3. we want to avoid a possible deadlock when both reader and writer |  | ||||||
|         #    would block (gh-105829) |  | ||||||
|         self.thread_wakeup.clear() |         self.thread_wakeup.clear() | ||||||
| 
 | 
 | ||||||
|         return result_item, is_broken, cause |         return result_item, is_broken, cause | ||||||
|  | @ -721,10 +715,9 @@ def __init__(self, max_workers=None, mp_context=None, | ||||||
|         # as it could result in a deadlock if a worker process dies with the |         # as it could result in a deadlock if a worker process dies with the | ||||||
|         # _result_queue write lock still acquired. |         # _result_queue write lock still acquired. | ||||||
|         # |         # | ||||||
|         # _shutdown_lock must be locked to access _ThreadWakeup.close() and |         # Care must be taken to only call clear and close from the | ||||||
|         # .wakeup(). Care must also be taken to not call clear or close from |         # executor_manager_thread, since _ThreadWakeup.clear() is not protected | ||||||
|         # more than one thread since _ThreadWakeup.clear() is not protected by |         # by a lock. | ||||||
|         # the _shutdown_lock |  | ||||||
|         self._executor_manager_thread_wakeup = _ThreadWakeup() |         self._executor_manager_thread_wakeup = _ThreadWakeup() | ||||||
| 
 | 
 | ||||||
|         # Create communication channels for the executor |         # Create communication channels for the executor | ||||||
|  | @ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None, | ||||||
|         self._call_queue = _SafeQueue( |         self._call_queue = _SafeQueue( | ||||||
|             max_size=queue_size, ctx=self._mp_context, |             max_size=queue_size, ctx=self._mp_context, | ||||||
|             pending_work_items=self._pending_work_items, |             pending_work_items=self._pending_work_items, | ||||||
|             shutdown_lock=self._shutdown_lock, |  | ||||||
|             thread_wakeup=self._executor_manager_thread_wakeup) |             thread_wakeup=self._executor_manager_thread_wakeup) | ||||||
|         # Killed worker processes can produce spurious "broken pipe" |         # Killed worker processes can produce spurious "broken pipe" | ||||||
|         # tracebacks in the queue's own worker thread. But we detect killed |         # tracebacks in the queue's own worker thread. But we detect killed | ||||||
|  |  | ||||||
|  | @ -253,9 +253,6 @@ def test_cancel_futures_wait_false(self): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class ProcessPoolShutdownTest(ExecutorShutdownTest): | class ProcessPoolShutdownTest(ExecutorShutdownTest): | ||||||
|     # gh-125451: 'lock' cannot be serialized, the test is broken |  | ||||||
|     # and hangs randomly |  | ||||||
|     @unittest.skipIf(True, "broken test") |  | ||||||
|     def test_processes_terminate(self): |     def test_processes_terminate(self): | ||||||
|         def acquire_lock(lock): |         def acquire_lock(lock): | ||||||
|             lock.acquire() |             lock.acquire() | ||||||
|  |  | ||||||
|  | @ -0,0 +1,2 @@ | ||||||
|  | Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down | ||||||
|  | concurrently with an error when feeding a job to a worker process. | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Sam Gross
						Sam Gross