mirror of
				https://github.com/python/cpython.git
				synced 2025-10-24 18:33:49 +00:00 
			
		
		
		
	 0e9c364f4a
			
		
	
	
		0e9c364f4a
		
			
		
	
	
	
	
		
			
			Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes. --------- Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
		
			
				
	
	
		
			231 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import sys
 | |
| import threading
 | |
| import time
 | |
| import unittest
 | |
| from concurrent import futures
 | |
| from concurrent.futures.process import BrokenProcessPool
 | |
| 
 | |
| from test import support
 | |
| from test.support import hashlib_helper
 | |
| 
 | |
| from .executor import ExecutorTest, mul
 | |
| from .util import (
 | |
|     ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
 | |
|     create_executor_tests, setup_module)
 | |
| 
 | |
| 
 | |
class EventfulGCObj:
    """Helper object that sets an event when it is finalized.

    *mgr* must provide an ``Event()`` factory.  The event it returns is
    stored as ``self.event`` and is set from ``__del__``, so waiting on
    the event tells the caller that the finalizer has run.
    """

    def __init__(self, mgr):
        """Create the event through *mgr* and keep it on the instance."""
        finalized = mgr.Event()
        self.event = finalized

    def __del__(self):
        """Signal finalization by setting the stored event."""
        self.event.set()
 | |
| 
 | |
| 
 | |
class ProcessPoolExecutorTest(ExecutorTest):
    """Tests specific to ``ProcessPoolExecutor``.

    The tests use ``self.executor``, ``self.executor_type`` and
    ``self.get_context()``.  None of them is defined in this class; they
    are expected to be supplied by the base class or by the mixins this
    class is combined with at module level.
    """

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        """Asking for 62 workers is rejected with a ValueError."""
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        """Terminating a worker process marks the whole pool as broken."""
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # NOTE: the local is deliberately not called ``futures`` so that it
        # does not shadow the imported ``concurrent.futures`` module.
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        """map() gives the same results for any valid chunksize.

        Chunk sizes smaller than, larger than and equal to the number of
        items are checked; a negative chunksize must raise ValueError.
        """
        ref = list(map(pow, range(40), range(40)))
        for chunksize in (6, 50, 40):
            with self.subTest(chunksize=chunksize):
                self.assertEqual(
                    list(self.executor.map(pow, range(40), range(40),
                                           chunksize=chunksize)),
                    ref)

        with self.assertRaises(ValueError):
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

    @classmethod
    def _test_traceback(cls):
        """Raise a RuntimeError; submitted to a worker by test_traceback().

        The source text of the ``raise`` line below (including its trailing
        comment) is asserted verbatim by test_traceback(), so it must not
        be edited.
        """
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        """The worker's traceback is attached to the re-raised exception."""
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # The remote traceback must also show up when the exception is
        # reported through sys.excepthook.
        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        """Job arguments are garbage-collected once the job has finished."""
        # Ensure that argument for a job are correctly gc-ed after the job
        # is finished
        mgr = self.get_context().Manager()
        try:
            obj = EventfulGCObj(mgr)
            future = self.executor.submit(id, obj)
            future.result()

            self.assertTrue(obj.event.wait(timeout=1))
        finally:
            # explicitly destroy the object to ensure that
            # EventfulGCObj.__del__() is called while manager is still
            # running.  Doing this in a ``finally`` clause also stops the
            # manager when the assertion above fails.
            obj = None
            support.gc_collect()

            mgr.shutdown()
            mgr.join()

    def test_saturation(self):
        """Queuing far more jobs than workers never exceeds _max_workers."""
        executor = self.executor
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(sem.acquire)
            self.assertEqual(len(executor._processes), executor._max_workers)
        finally:
            # Always unblock the submitted jobs, even when the check above
            # fails, so that no worker stays blocked on sem.acquire().
            for _ in range(job_count):
                sem.release()

    def test_idle_process_reuse_one(self):
        """Sequential jobs are all served by a single, reused worker."""
        executor = self.executor
        # Precondition check; a unittest assertion is used instead of a bare
        # ``assert`` so that it is not stripped when running with -O.
        self.assertGreaterEqual(executor._max_workers, 4)
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)

    def test_idle_process_reuse_multiple(self):
        """Six jobs, three of them awaited in turn, use at most 3 workers."""
        executor = self.executor
        # Precondition check; a unittest assertion is used instead of a bare
        # ``assert`` so that it is not stripped when running with -O.
        self.assertLessEqual(executor._max_workers, 5)
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        executor.submit(mul, 1, 2).result()
        executor.submit(mul, 0, 9)
        self.assertLessEqual(len(executor._processes), 3)
        executor.shutdown()

    def test_max_tasks_per_child(self):
        """A worker is replaced after running max_tasks_per_child tasks.

        With the "fork" start method, passing max_tasks_per_child must
        raise ValueError instead.
        """
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            with self.assertRaises(ValueError):
                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
            return
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                1, mp_context=context, max_tasks_per_child=3)
        # The with-block shuts the executor down even if an assertion fails.
        with executor:
            f1 = executor.submit(os.getpid)
            original_pid = f1.result()
            # The worker pid remains the same as the worker could be reused
            f2 = executor.submit(os.getpid)
            self.assertEqual(f2.result(), original_pid)
            self.assertEqual(len(executor._processes), 1)
            f3 = executor.submit(os.getpid)
            self.assertEqual(f3.result(), original_pid)

            # A new worker is spawned, with a statistically different pid,
            # while the previous was reaped.
            f4 = executor.submit(os.getpid)
            new_pid = f4.result()
            self.assertNotEqual(original_pid, new_pid)
            self.assertEqual(len(executor._processes), 1)

    def test_max_tasks_per_child_defaults_to_spawn_context(self):
        """Without mp_context, max_tasks_per_child selects "spawn"."""
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(1, max_tasks_per_child=3)
        # Shut down the executor created here instead of leaking it.
        with executor:
            self.assertEqual(executor._mp_context.get_start_method(), "spawn")

    def test_max_tasks_early_shutdown(self):
        """Jobs submitted before shutdown() still complete.

        Six jobs are queued on three workers that each run a single task
        (max_tasks_per_child=1), then the executor is shut down before any
        result is read.
        """
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                3, mp_context=context, max_tasks_per_child=1)
        try:
            pending = [executor.submit(mul, i, i) for i in range(6)]
        finally:
            # Shut down before reading any result (the point of the test),
            # and also when a submit() call unexpectedly fails.
            executor.shutdown()
        for i, future in enumerate(pending):
            self.assertEqual(future.result(), mul(i, i))

    def test_python_finalization_error(self):
        """A thread-creation RuntimeError surfaces as BrokenProcessPool."""
        # gh-109047: Catch RuntimeError on thread creation
        # during Python finalization.

        context = self.get_context()

        # gh-109047: Mock the threading._start_joinable_thread() function to
        # inject RuntimeError: simulate the error raised during Python
        # finalization.
        # Block the second creation: create _ExecutorManagerThread, but block
        # QueueFeederThread.
        orig_start_new_thread = threading._start_joinable_thread
        nthread = 0
        def mock_start_new_thread(func, *args, **kwargs):
            # Let the first thread start normally and fail every later one.
            # Keyword arguments are forwarded so the wrapper accepts any
            # call the real function accepts.
            nonlocal nthread
            if nthread >= 1:
                raise RuntimeError("can't create new thread at "
                                   "interpreter shutdown")
            nthread += 1
            return orig_start_new_thread(func, *args, **kwargs)

        with support.swap_attr(threading, '_start_joinable_thread',
                               mock_start_new_thread):
            executor = self.executor_type(max_workers=2, mp_context=context)
            with executor:
                with self.assertRaises(BrokenProcessPool):
                    list(executor.map(mul, [(2, 3)] * 10))
            executor.shutdown()
 | |
| 
 | |
| 
 | |
# Register the concrete test classes in this module's namespace: the call
# receives globals(), the ProcessPoolExecutorTest base and the fork,
# forkserver and spawn mixins.
# NOTE(review): presumably one TestCase is generated per mixin -- confirm
# against create_executor_tests() in .util.
create_executor_tests(globals(), ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
 | |
| 
 | |
| 
 | |
def setUpModule():
    """unittest module-level setup hook; delegates to .util.setup_module()."""
    setup_module()
 | |
| 
 | |
| 
 | |
# Run the tests through unittest's command-line entry point when this file
# is executed as the main module.
if __name__ == "__main__":
    unittest.main()
 |