mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	bpo-24484: Avoid race condition in multiprocessing cleanup (#2159)
* bpo-24484: Avoid race condition in multiprocessing cleanup The finalizer registry can be mutated while inspected by multiprocessing at process exit. * Use test.support.start_threads() * Add Misc/NEWS
This commit is contained in:
		
							parent
							
								
									8323189ff1
								
							
						
					
					
						commit
						1eb6c0074d
					
				
					 3 changed files with 84 additions and 11 deletions
				
			
		|  | @ -241,20 +241,28 @@ def _run_finalizers(minpriority=None): | ||||||
|         return |         return | ||||||
| 
 | 
 | ||||||
|     if minpriority is None: |     if minpriority is None: | ||||||
|         f = lambda p : p[0][0] is not None |         f = lambda p : p[0] is not None | ||||||
|     else: |     else: | ||||||
|         f = lambda p : p[0][0] is not None and p[0][0] >= minpriority |         f = lambda p : p[0] is not None and p[0] >= minpriority | ||||||
| 
 | 
 | ||||||
|     items = [x for x in list(_finalizer_registry.items()) if f(x)] |     # Careful: _finalizer_registry may be mutated while this function | ||||||
|     items.sort(reverse=True) |     # is running (either by a GC run or by another thread). | ||||||
| 
 | 
 | ||||||
|     for key, finalizer in items: |     # list(_finalizer_registry) should be atomic, while | ||||||
|         sub_debug('calling %s', finalizer) |     # list(_finalizer_registry.items()) is not. | ||||||
|         try: |     keys = [key for key in list(_finalizer_registry) if f(key)] | ||||||
|             finalizer() |     keys.sort(reverse=True) | ||||||
|         except Exception: | 
 | ||||||
|             import traceback |     for key in keys: | ||||||
|             traceback.print_exc() |         finalizer = _finalizer_registry.get(key) | ||||||
|  |         # key may have been removed from the registry | ||||||
|  |         if finalizer is not None: | ||||||
|  |             sub_debug('calling %s', finalizer) | ||||||
|  |             try: | ||||||
|  |                 finalizer() | ||||||
|  |             except Exception: | ||||||
|  |                 import traceback | ||||||
|  |                 traceback.print_exc() | ||||||
| 
 | 
 | ||||||
|     if minpriority is None: |     if minpriority is None: | ||||||
|         _finalizer_registry.clear() |         _finalizer_registry.clear() | ||||||
|  |  | ||||||
|  | @ -3110,6 +3110,14 @@ class _TestFinalize(BaseTestCase): | ||||||
| 
 | 
 | ||||||
|     ALLOWED_TYPES = ('processes',) |     ALLOWED_TYPES = ('processes',) | ||||||
| 
 | 
 | ||||||
|  |     def setUp(self): | ||||||
|  |         self.registry_backup = util._finalizer_registry.copy() | ||||||
|  |         util._finalizer_registry.clear() | ||||||
|  | 
 | ||||||
|  |     def tearDown(self): | ||||||
|  |         self.assertFalse(util._finalizer_registry) | ||||||
|  |         util._finalizer_registry.update(self.registry_backup) | ||||||
|  | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     def _test_finalize(cls, conn): |     def _test_finalize(cls, conn): | ||||||
|         class Foo(object): |         class Foo(object): | ||||||
|  | @ -3159,6 +3167,61 @@ def test_finalize(self): | ||||||
|         result = [obj for obj in iter(conn.recv, 'STOP')] |         result = [obj for obj in iter(conn.recv, 'STOP')] | ||||||
|         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) |         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) | ||||||
| 
 | 
 | ||||||
|  |     def test_thread_safety(self): | ||||||
|  |         # bpo-24484: _run_finalizers() should be thread-safe | ||||||
|  |         def cb(): | ||||||
|  |             pass | ||||||
|  | 
 | ||||||
|  |         class Foo(object): | ||||||
|  |             def __init__(self): | ||||||
|  |                 self.ref = self  # create reference cycle | ||||||
|  |                 # insert finalizer at random key | ||||||
|  |                 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) | ||||||
|  | 
 | ||||||
|  |         finish = False | ||||||
|  |         exc = None | ||||||
|  | 
 | ||||||
|  |         def run_finalizers(): | ||||||
|  |             nonlocal exc | ||||||
|  |             while not finish: | ||||||
|  |                 time.sleep(random.random() * 1e-1) | ||||||
|  |                 try: | ||||||
|  |                     # A GC run will eventually happen during this, | ||||||
|  |                     # collecting stale Foo's and mutating the registry | ||||||
|  |                     util._run_finalizers() | ||||||
|  |                 except Exception as e: | ||||||
|  |                     exc = e | ||||||
|  | 
 | ||||||
|  |         def make_finalizers(): | ||||||
|  |             nonlocal exc | ||||||
|  |             d = {} | ||||||
|  |             while not finish: | ||||||
|  |                 try: | ||||||
|  |                     # Old Foo's get gradually replaced and later | ||||||
|  |                     # collected by the GC (because of the cyclic ref) | ||||||
|  |                     d[random.getrandbits(5)] = {Foo() for i in range(10)} | ||||||
|  |                 except Exception as e: | ||||||
|  |                     exc = e | ||||||
|  |                     d.clear() | ||||||
|  | 
 | ||||||
|  |         old_interval = sys.getswitchinterval() | ||||||
|  |         old_threshold = gc.get_threshold() | ||||||
|  |         try: | ||||||
|  |             sys.setswitchinterval(1e-6) | ||||||
|  |             gc.set_threshold(5, 5, 5) | ||||||
|  |             threads = [threading.Thread(target=run_finalizers), | ||||||
|  |                        threading.Thread(target=make_finalizers)] | ||||||
|  |             with test.support.start_threads(threads): | ||||||
|  |                 time.sleep(4.0)  # Wait a bit to trigger race condition | ||||||
|  |                 finish = True | ||||||
|  |             if exc is not None: | ||||||
|  |                 raise exc | ||||||
|  |         finally: | ||||||
|  |             sys.setswitchinterval(old_interval) | ||||||
|  |             gc.set_threshold(*old_threshold) | ||||||
|  |             gc.collect()  # Collect remaining Foo's | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # | # | ||||||
| # Test that from ... import * works for each module | # Test that from ... import * works for each module | ||||||
| # | # | ||||||
|  |  | ||||||
|  | @ -362,6 +362,8 @@ Extension Modules | ||||||
| Library | Library | ||||||
| ------- | ------- | ||||||
| 
 | 
 | ||||||
|  | - bpo-24484: Avoid race condition in multiprocessing cleanup. | ||||||
|  | 
 | ||||||
| - bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite | - bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite | ||||||
|   of the signal number when the process is killed by a signal (instead |   of the signal number when the process is killed by a signal (instead | ||||||
|   of 255) when using the "forkserver" method. |   of 255) when using the "forkserver" method. | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Antoine Pitrou
						Antoine Pitrou