mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 18:54:53 +00:00 
			
		
		
		
	 a376a72bd9
			
		
	
	
		a376a72bd9
		
			
		
	
	
	
	
		
			
			The test had an instability issue due to the ordering of the dummy queue operation and the real wakeup pipe operations. Both primitives are thread safe but not done atomically as a single update and may interleave arbitrarily. With the old order of operations this can lead to an incorrect state where the dummy queue is full but the wakeup pipe is empty. By swapping the order in clear() I think this can no longer happen in any possible operation interleaving (famous last words).
		
			
				
	
	
		
			332 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			332 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import contextlib
 | |
| import queue
 | |
| import signal
 | |
| import sys
 | |
| import time
 | |
| import unittest
 | |
| import unittest.mock
 | |
| from pickle import PicklingError
 | |
| from concurrent import futures
 | |
| from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
 | |
| 
 | |
| from test import support
 | |
| 
 | |
| from .util import (
 | |
|     create_executor_tests, setup_module,
 | |
|     ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
 | |
| 
 | |
| 
 | |
| def _crash(delay=None):
 | |
|     """Induces a segfault."""
 | |
|     if delay:
 | |
|         time.sleep(delay)
 | |
|     import faulthandler
 | |
|     faulthandler.disable()
 | |
|     faulthandler._sigsegv()
 | |
| 
 | |
| 
 | |
| def _crash_with_data(data):
 | |
|     """Induces a segfault with dummy data in input."""
 | |
|     _crash()
 | |
| 
 | |
| 
 | |
| def _exit():
 | |
|     """Induces a sys exit with exitcode 1."""
 | |
|     sys.exit(1)
 | |
| 
 | |
| 
 | |
| def _raise_error(Err):
 | |
|     """Function that raises an Exception in process."""
 | |
|     raise Err()
 | |
| 
 | |
| 
 | |
| def _raise_error_ignore_stderr(Err):
 | |
|     """Function that raises an Exception in process and ignores stderr."""
 | |
|     import io
 | |
|     sys.stderr = io.StringIO()
 | |
|     raise Err()
 | |
| 
 | |
| 
 | |
| def _return_instance(cls):
 | |
|     """Function that returns a instance of cls."""
 | |
|     return cls()
 | |
| 
 | |
| 
 | |
| class CrashAtPickle(object):
 | |
|     """Bad object that triggers a segfault at pickling time."""
 | |
|     def __reduce__(self):
 | |
|         _crash()
 | |
| 
 | |
| 
 | |
| class CrashAtUnpickle(object):
 | |
|     """Bad object that triggers a segfault at unpickling time."""
 | |
|     def __reduce__(self):
 | |
|         return _crash, ()
 | |
| 
 | |
| 
 | |
| class ExitAtPickle(object):
 | |
|     """Bad object that triggers a process exit at pickling time."""
 | |
|     def __reduce__(self):
 | |
|         _exit()
 | |
| 
 | |
| 
 | |
| class ExitAtUnpickle(object):
 | |
|     """Bad object that triggers a process exit at unpickling time."""
 | |
|     def __reduce__(self):
 | |
|         return _exit, ()
 | |
| 
 | |
| 
 | |
| class ErrorAtPickle(object):
 | |
|     """Bad object that triggers an error at pickling time."""
 | |
|     def __reduce__(self):
 | |
|         from pickle import PicklingError
 | |
|         raise PicklingError("Error in pickle")
 | |
| 
 | |
| 
 | |
| class ErrorAtUnpickle(object):
 | |
|     """Bad object that triggers an error at unpickling time."""
 | |
|     def __reduce__(self):
 | |
|         from pickle import UnpicklingError
 | |
|         return _raise_error_ignore_stderr, (UnpicklingError, )
 | |
| 
 | |
| 
 | |
| class ExecutorDeadlockTest:
 | |
|     TIMEOUT = support.LONG_TIMEOUT
 | |
| 
 | |
|     def _fail_on_deadlock(self, executor):
 | |
|         # If we did not recover before TIMEOUT seconds, consider that the
 | |
|         # executor is in a deadlock state and forcefully clean all its
 | |
|         # composants.
 | |
|         import faulthandler
 | |
|         from tempfile import TemporaryFile
 | |
|         with TemporaryFile(mode="w+") as f:
 | |
|             faulthandler.dump_traceback(file=f)
 | |
|             f.seek(0)
 | |
|             tb = f.read()
 | |
|         for p in executor._processes.values():
 | |
|             p.terminate()
 | |
|         # This should be safe to call executor.shutdown here as all possible
 | |
|         # deadlocks should have been broken.
 | |
|         executor.shutdown(wait=True)
 | |
|         print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
 | |
|         self.fail(f"Executor deadlock:\n\n{tb}")
 | |
| 
 | |
| 
 | |
|     def _check_crash(self, error, func, *args, ignore_stderr=False):
 | |
|         # test for deadlock caused by crashes in a pool
 | |
|         self.executor.shutdown(wait=True)
 | |
| 
 | |
|         executor = self.executor_type(
 | |
|             max_workers=2, mp_context=self.get_context())
 | |
|         res = executor.submit(func, *args)
 | |
| 
 | |
|         if ignore_stderr:
 | |
|             cm = support.captured_stderr()
 | |
|         else:
 | |
|             cm = contextlib.nullcontext()
 | |
| 
 | |
|         try:
 | |
|             with self.assertRaises(error):
 | |
|                 with cm:
 | |
|                     res.result(timeout=self.TIMEOUT)
 | |
|         except futures.TimeoutError:
 | |
|             # If we did not recover before TIMEOUT seconds,
 | |
|             # consider that the executor is in a deadlock state
 | |
|             self._fail_on_deadlock(executor)
 | |
|         executor.shutdown(wait=True)
 | |
| 
 | |
|     def test_error_at_task_pickle(self):
 | |
|         # Check problem occurring while pickling a task in
 | |
|         # the task_handler thread
 | |
|         self._check_crash(PicklingError, id, ErrorAtPickle())
 | |
| 
 | |
|     def test_exit_at_task_unpickle(self):
 | |
|         # Check problem occurring while unpickling a task on workers
 | |
|         self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
 | |
| 
 | |
|     def test_error_at_task_unpickle(self):
 | |
|         # gh-109832: Restore stderr overriden by _raise_error_ignore_stderr()
 | |
|         self.addCleanup(setattr, sys, 'stderr', sys.stderr)
 | |
| 
 | |
|         # Check problem occurring while unpickling a task on workers
 | |
|         self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
 | |
| 
 | |
|     def test_crash_at_task_unpickle(self):
 | |
|         # Check problem occurring while unpickling a task on workers
 | |
|         self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
 | |
| 
 | |
|     def test_crash_during_func_exec_on_worker(self):
 | |
|         # Check problem occurring during func execution on workers
 | |
|         self._check_crash(BrokenProcessPool, _crash)
 | |
| 
 | |
|     def test_exit_during_func_exec_on_worker(self):
 | |
|         # Check problem occurring during func execution on workers
 | |
|         self._check_crash(SystemExit, _exit)
 | |
| 
 | |
|     def test_error_during_func_exec_on_worker(self):
 | |
|         # Check problem occurring during func execution on workers
 | |
|         self._check_crash(RuntimeError, _raise_error, RuntimeError)
 | |
| 
 | |
|     def test_crash_during_result_pickle_on_worker(self):
 | |
|         # Check problem occurring while pickling a task result
 | |
|         # on workers
 | |
|         self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
 | |
| 
 | |
|     def test_exit_during_result_pickle_on_worker(self):
 | |
|         # Check problem occurring while pickling a task result
 | |
|         # on workers
 | |
|         self._check_crash(SystemExit, _return_instance, ExitAtPickle)
 | |
| 
 | |
|     def test_error_during_result_pickle_on_worker(self):
 | |
|         # Check problem occurring while pickling a task result
 | |
|         # on workers
 | |
|         self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
 | |
| 
 | |
|     def test_error_during_result_unpickle_in_result_handler(self):
 | |
|         # gh-109832: Restore stderr overriden by _raise_error_ignore_stderr()
 | |
|         self.addCleanup(setattr, sys, 'stderr', sys.stderr)
 | |
| 
 | |
|         # Check problem occurring while unpickling a task in
 | |
|         # the result_handler thread
 | |
|         self._check_crash(BrokenProcessPool,
 | |
|                           _return_instance, ErrorAtUnpickle,
 | |
|                           ignore_stderr=True)
 | |
| 
 | |
|     def test_exit_during_result_unpickle_in_result_handler(self):
 | |
|         # Check problem occurring while unpickling a task in
 | |
|         # the result_handler thread
 | |
|         self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
 | |
| 
 | |
|     def test_shutdown_deadlock(self):
 | |
|         # Test that the pool calling shutdown do not cause deadlock
 | |
|         # if a worker fails after the shutdown call.
 | |
|         self.executor.shutdown(wait=True)
 | |
|         with self.executor_type(max_workers=2,
 | |
|                                 mp_context=self.get_context()) as executor:
 | |
|             self.executor = executor  # Allow clean up in fail_on_deadlock
 | |
|             f = executor.submit(_crash, delay=.1)
 | |
|             executor.shutdown(wait=True)
 | |
|             with self.assertRaises(BrokenProcessPool):
 | |
|                 f.result()
 | |
| 
 | |
|     def test_shutdown_deadlock_pickle(self):
 | |
|         # Test that the pool calling shutdown with wait=False does not cause
 | |
|         # a deadlock if a task fails at pickle after the shutdown call.
 | |
|         # Reported in bpo-39104.
 | |
|         self.executor.shutdown(wait=True)
 | |
|         with self.executor_type(max_workers=2,
 | |
|                                 mp_context=self.get_context()) as executor:
 | |
|             self.executor = executor  # Allow clean up in fail_on_deadlock
 | |
| 
 | |
|             # Start the executor and get the executor_manager_thread to collect
 | |
|             # the threads and avoid dangling thread that should be cleaned up
 | |
|             # asynchronously.
 | |
|             executor.submit(id, 42).result()
 | |
|             executor_manager = executor._executor_manager_thread
 | |
| 
 | |
|             # Submit a task that fails at pickle and shutdown the executor
 | |
|             # without waiting
 | |
|             f = executor.submit(id, ErrorAtPickle())
 | |
|             executor.shutdown(wait=False)
 | |
|             with self.assertRaises(PicklingError):
 | |
|                 f.result()
 | |
| 
 | |
|         # Make sure the executor is eventually shutdown and do not leave
 | |
|         # dangling threads
 | |
|         executor_manager.join()
 | |
| 
 | |
|     def test_crash_big_data(self):
 | |
|         # Test that there is a clean exception instad of a deadlock when a
 | |
|         # child process crashes while some data is being written into the
 | |
|         # queue.
 | |
|         # https://github.com/python/cpython/issues/94777
 | |
|         self.executor.shutdown(wait=True)
 | |
|         data = "a" * support.PIPE_MAX_SIZE
 | |
|         with self.executor_type(max_workers=2,
 | |
|                                 mp_context=self.get_context()) as executor:
 | |
|             self.executor = executor  # Allow clean up in fail_on_deadlock
 | |
|             with self.assertRaises(BrokenProcessPool):
 | |
|                 list(executor.map(_crash_with_data, [data] * 10))
 | |
| 
 | |
|         executor.shutdown(wait=True)
 | |
| 
 | |
|     def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
 | |
|         # Issue #105829: The _ExecutorManagerThread wakeup pipe could
 | |
|         # fill up and block. See: https://github.com/python/cpython/issues/105829
 | |
| 
 | |
|         # Lots of cargo culting while writing this test, apologies if
 | |
|         # something is really stupid...
 | |
| 
 | |
|         self.executor.shutdown(wait=True)
 | |
| 
 | |
|         if not hasattr(signal, 'alarm'):
 | |
|             raise unittest.SkipTest(
 | |
|                 "Tested platform does not support the alarm signal")
 | |
| 
 | |
|         def timeout(_signum, _frame):
 | |
|             import faulthandler
 | |
|             faulthandler.dump_traceback()
 | |
| 
 | |
|             raise RuntimeError("timed out while submitting jobs?")
 | |
| 
 | |
|         thread_run = futures.process._ExecutorManagerThread.run
 | |
|         def mock_run(self):
 | |
|             # Delay thread startup so the wakeup pipe can fill up and block
 | |
|             time.sleep(3)
 | |
|             thread_run(self)
 | |
| 
 | |
|         class MockWakeup(_ThreadWakeup):
 | |
|             """Mock wakeup object to force the wakeup to block"""
 | |
|             def __init__(self):
 | |
|                 super().__init__()
 | |
|                 self._dummy_queue = queue.Queue(maxsize=1)
 | |
| 
 | |
|             def wakeup(self):
 | |
|                 self._dummy_queue.put(None, block=True)
 | |
|                 super().wakeup()
 | |
| 
 | |
|             def clear(self):
 | |
|                 super().clear()
 | |
|                 try:
 | |
|                     while True:
 | |
|                         self._dummy_queue.get_nowait()
 | |
|                 except queue.Empty:
 | |
|                     pass
 | |
| 
 | |
|         with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
 | |
|                                          'run', mock_run),
 | |
|               unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
 | |
|                                   MockWakeup)):
 | |
|             with self.executor_type(max_workers=2,
 | |
|                                     mp_context=self.get_context()) as executor:
 | |
|                 self.executor = executor  # Allow clean up in fail_on_deadlock
 | |
| 
 | |
|                 job_num = 100
 | |
|                 job_data = range(job_num)
 | |
| 
 | |
|                 # Need to use sigalarm for timeout detection because
 | |
|                 # Executor.submit is not guarded by any timeout (both
 | |
|                 # self._work_ids.put(self._queue_count) and
 | |
|                 # self._executor_manager_thread_wakeup.wakeup() might
 | |
|                 # timeout, maybe more?). In this specific case it was
 | |
|                 # the wakeup call that deadlocked on a blocking pipe.
 | |
|                 old_handler = signal.signal(signal.SIGALRM, timeout)
 | |
|                 try:
 | |
|                     signal.alarm(int(self.TIMEOUT))
 | |
|                     self.assertEqual(job_num, len(list(executor.map(int, job_data))))
 | |
|                 finally:
 | |
|                     signal.alarm(0)
 | |
|                     signal.signal(signal.SIGALRM, old_handler)
 | |
| 
 | |
| 
 | |
| create_executor_tests(globals(), ExecutorDeadlockTest,
 | |
|                       executor_mixins=(ProcessPoolForkMixin,
 | |
|                                        ProcessPoolForkserverMixin,
 | |
|                                        ProcessPoolSpawnMixin))
 | |
| 
 | |
| def setUpModule():
 | |
|     setup_module()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 |