mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	Merged revisions 87710 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k ........ r87710 | gregory.p.smith | 2011-01-03 13:06:12 -0800 (Mon, 03 Jan 2011) | 4 lines issue6643 - Two locks held within the threading module on each thread instance needed to be reinitialized after fork(). Adds tests to confirm that they are and that a potential deadlock and crasher bug are fixed (platform dependant). ........
This commit is contained in:
		
							parent
							
								
									c3a4787ccb
								
							
						
					
					
						commit
						4b129d23f6
					
				
					 3 changed files with 155 additions and 4 deletions
				
			
		|  | @ -2,6 +2,7 @@ | ||||||
| 
 | 
 | ||||||
| import test.support | import test.support | ||||||
| from test.support import verbose | from test.support import verbose | ||||||
|  | import os | ||||||
| import random | import random | ||||||
| import re | import re | ||||||
| import sys | import sys | ||||||
|  | @ -10,6 +11,7 @@ | ||||||
| import time | import time | ||||||
| import unittest | import unittest | ||||||
| import weakref | import weakref | ||||||
|  | import subprocess | ||||||
| 
 | 
 | ||||||
| from test import lock_tests | from test import lock_tests | ||||||
| 
 | 
 | ||||||
|  | @ -247,7 +249,6 @@ def test_finalize_runnning_thread(self): | ||||||
|                 print("test_finalize_with_runnning_thread can't import ctypes") |                 print("test_finalize_with_runnning_thread can't import ctypes") | ||||||
|             return  # can't do anything |             return  # can't do anything | ||||||
| 
 | 
 | ||||||
|         import subprocess |  | ||||||
|         rc = subprocess.call([sys.executable, "-c", """if 1: |         rc = subprocess.call([sys.executable, "-c", """if 1: | ||||||
|             import ctypes, sys, time, _thread |             import ctypes, sys, time, _thread | ||||||
| 
 | 
 | ||||||
|  | @ -278,7 +279,6 @@ def waitingThread(): | ||||||
|     def test_finalize_with_trace(self): |     def test_finalize_with_trace(self): | ||||||
|         # Issue1733757 |         # Issue1733757 | ||||||
|         # Avoid a deadlock when sys.settrace steps into threading._shutdown |         # Avoid a deadlock when sys.settrace steps into threading._shutdown | ||||||
|         import subprocess |  | ||||||
|         p = subprocess.Popen([sys.executable, "-c", """if 1: |         p = subprocess.Popen([sys.executable, "-c", """if 1: | ||||||
|             import sys, threading |             import sys, threading | ||||||
| 
 | 
 | ||||||
|  | @ -311,7 +311,6 @@ def func(frame, event, arg): | ||||||
|     def test_join_nondaemon_on_shutdown(self): |     def test_join_nondaemon_on_shutdown(self): | ||||||
|         # Issue 1722344 |         # Issue 1722344 | ||||||
|         # Raising SystemExit skipped threading._shutdown |         # Raising SystemExit skipped threading._shutdown | ||||||
|         import subprocess |  | ||||||
|         p = subprocess.Popen([sys.executable, "-c", """if 1: |         p = subprocess.Popen([sys.executable, "-c", """if 1: | ||||||
|                 import threading |                 import threading | ||||||
|                 from time import sleep |                 from time import sleep | ||||||
|  | @ -412,7 +411,6 @@ def joiningfunc(mainthread): | ||||||
|                 sys.stdout.flush() |                 sys.stdout.flush() | ||||||
|         \n""" + script |         \n""" + script | ||||||
| 
 | 
 | ||||||
|         import subprocess |  | ||||||
|         p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) |         p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) | ||||||
|         rc = p.wait() |         rc = p.wait() | ||||||
|         data = p.stdout.read().decode().replace('\r', '') |         data = p.stdout.read().decode().replace('\r', '') | ||||||
|  | @ -484,6 +482,152 @@ def worker(): | ||||||
|             """ |             """ | ||||||
|         self._run_and_join(script) |         self._run_and_join(script) | ||||||
| 
 | 
 | ||||||
|  |     def assertScriptHasOutput(self, script, expected_output): | ||||||
|  |         p = subprocess.Popen([sys.executable, "-c", script], | ||||||
|  |                              stdout=subprocess.PIPE) | ||||||
|  |         rc = p.wait() | ||||||
|  |         data = p.stdout.read().decode().replace('\r', '') | ||||||
|  |         self.assertEqual(rc, 0, "Unexpected error") | ||||||
|  |         self.assertEqual(data, expected_output) | ||||||
|  | 
 | ||||||
|  |     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") | ||||||
|  |     def test_4_joining_across_fork_in_worker_thread(self): | ||||||
|  |         # There used to be a possible deadlock when forking from a child | ||||||
|  |         # thread.  See http://bugs.python.org/issue6643. | ||||||
|  | 
 | ||||||
|  |         # Skip platforms with known problems forking from a worker thread. | ||||||
|  |         # See http://bugs.python.org/issue3863. | ||||||
|  |         if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): | ||||||
|  |             raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) | ||||||
|  | 
 | ||||||
|  |         # The script takes the following steps: | ||||||
|  |         # - The main thread in the parent process starts a new thread and then | ||||||
|  |         #   tries to join it. | ||||||
|  |         # - The join operation acquires the Lock inside the thread's _block | ||||||
|  |         #   Condition.  (See threading.py:Thread.join().) | ||||||
|  |         # - We stub out the acquire method on the condition to force it to wait | ||||||
|  |         #   until the child thread forks.  (See LOCK ACQUIRED HERE) | ||||||
|  |         # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS | ||||||
|  |         #   HERE) | ||||||
|  |         # - The main thread of the parent process enters Condition.wait(), | ||||||
|  |         #   which releases the lock on the child thread. | ||||||
|  |         # - The child process returns.  Without the necessary fix, when the | ||||||
|  |         #   main thread of the child process (which used to be the child thread | ||||||
|  |         #   in the parent process) attempts to exit, it will try to acquire the | ||||||
|  |         #   lock in the Thread._block Condition object and hang, because the | ||||||
|  |         #   lock was held across the fork. | ||||||
|  | 
 | ||||||
|  |         script = """if 1: | ||||||
|  |             import os, time, threading | ||||||
|  | 
 | ||||||
|  |             finish_join = False | ||||||
|  |             start_fork = False | ||||||
|  | 
 | ||||||
|  |             def worker(): | ||||||
|  |                 # Wait until this thread's lock is acquired before forking to | ||||||
|  |                 # create the deadlock. | ||||||
|  |                 global finish_join | ||||||
|  |                 while not start_fork: | ||||||
|  |                     time.sleep(0.01) | ||||||
|  |                 # LOCK HELD: Main thread holds lock across this call. | ||||||
|  |                 childpid = os.fork() | ||||||
|  |                 finish_join = True | ||||||
|  |                 if childpid != 0: | ||||||
|  |                     # Parent process just waits for child. | ||||||
|  |                     os.waitpid(childpid, 0) | ||||||
|  |                 # Child process should just return. | ||||||
|  | 
 | ||||||
|  |             w = threading.Thread(target=worker) | ||||||
|  | 
 | ||||||
|  |             # Stub out the private condition variable's lock acquire method. | ||||||
|  |             # This acquires the lock and then waits until the child has forked | ||||||
|  |             # before returning, which will release the lock soon after.  If | ||||||
|  |             # someone else tries to fix this test case by acquiring this lock | ||||||
|  |             # before forking instead of reseting it, the test case will | ||||||
|  |             # deadlock when it shouldn't. | ||||||
|  |             condition = w._block | ||||||
|  |             orig_acquire = condition.acquire | ||||||
|  |             call_count_lock = threading.Lock() | ||||||
|  |             call_count = 0 | ||||||
|  |             def my_acquire(): | ||||||
|  |                 global call_count | ||||||
|  |                 global start_fork | ||||||
|  |                 orig_acquire()  # LOCK ACQUIRED HERE | ||||||
|  |                 start_fork = True | ||||||
|  |                 if call_count == 0: | ||||||
|  |                     while not finish_join: | ||||||
|  |                         time.sleep(0.01)  # WORKER THREAD FORKS HERE | ||||||
|  |                 with call_count_lock: | ||||||
|  |                     call_count += 1 | ||||||
|  |             condition.acquire = my_acquire | ||||||
|  | 
 | ||||||
|  |             w.start() | ||||||
|  |             w.join() | ||||||
|  |             print('end of main') | ||||||
|  |             """ | ||||||
|  |         self.assertScriptHasOutput(script, "end of main\n") | ||||||
|  | 
 | ||||||
|  |     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") | ||||||
|  |     def test_5_clear_waiter_locks_to_avoid_crash(self): | ||||||
|  |         # Check that a spawned thread that forks doesn't segfault on certain | ||||||
|  |         # platforms, namely OS X.  This used to happen if there was a waiter | ||||||
|  |         # lock in the thread's condition variable's waiters list.  Even though | ||||||
|  |         # we know the lock will be held across the fork, it is not safe to | ||||||
|  |         # release locks held across forks on all platforms, so releasing the | ||||||
|  |         # waiter lock caused a segfault on OS X.  Furthermore, since locks on | ||||||
|  |         # OS X are (as of this writing) implemented with a mutex + condition | ||||||
|  |         # variable instead of a semaphore, while we know that the Python-level | ||||||
|  |         # lock will be acquired, we can't know if the internal mutex will be | ||||||
|  |         # acquired at the time of the fork. | ||||||
|  | 
 | ||||||
|  |         # Skip platforms with known problems forking from a worker thread. | ||||||
|  |         # See http://bugs.python.org/issue3863. | ||||||
|  |         if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): | ||||||
|  |             raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) | ||||||
|  |         script = """if True: | ||||||
|  |             import os, time, threading | ||||||
|  | 
 | ||||||
|  |             start_fork = False | ||||||
|  | 
 | ||||||
|  |             def worker(): | ||||||
|  |                 # Wait until the main thread has attempted to join this thread | ||||||
|  |                 # before continuing. | ||||||
|  |                 while not start_fork: | ||||||
|  |                     time.sleep(0.01) | ||||||
|  |                 childpid = os.fork() | ||||||
|  |                 if childpid != 0: | ||||||
|  |                     # Parent process just waits for child. | ||||||
|  |                     (cpid, rc) = os.waitpid(childpid, 0) | ||||||
|  |                     assert cpid == childpid | ||||||
|  |                     assert rc == 0 | ||||||
|  |                     print('end of worker thread') | ||||||
|  |                 else: | ||||||
|  |                     # Child process should just return. | ||||||
|  |                     pass | ||||||
|  | 
 | ||||||
|  |             w = threading.Thread(target=worker) | ||||||
|  | 
 | ||||||
|  |             # Stub out the private condition variable's _release_save method. | ||||||
|  |             # This releases the condition's lock and flips the global that | ||||||
|  |             # causes the worker to fork.  At this point, the problematic waiter | ||||||
|  |             # lock has been acquired once by the waiter and has been put onto | ||||||
|  |             # the waiters list. | ||||||
|  |             condition = w._block | ||||||
|  |             orig_release_save = condition._release_save | ||||||
|  |             def my_release_save(): | ||||||
|  |                 global start_fork | ||||||
|  |                 orig_release_save() | ||||||
|  |                 # Waiter lock held here, condition lock released. | ||||||
|  |                 start_fork = True | ||||||
|  |             condition._release_save = my_release_save | ||||||
|  | 
 | ||||||
|  |             w.start() | ||||||
|  |             w.join() | ||||||
|  |             print('end of main thread') | ||||||
|  |             """ | ||||||
|  |         output = "end of worker thread\nend of main thread\n" | ||||||
|  |         self.assertScriptHasOutput(script, output) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class ThreadingExceptionTests(unittest.TestCase): | class ThreadingExceptionTests(unittest.TestCase): | ||||||
|     # A RuntimeError should be raised if Thread.start() is called |     # A RuntimeError should be raised if Thread.start() is called | ||||||
|  |  | ||||||
|  | @ -856,6 +856,10 @@ def _after_fork(): | ||||||
|                 # its new value since it can have changed. |                 # its new value since it can have changed. | ||||||
|                 ident = _get_ident() |                 ident = _get_ident() | ||||||
|                 thread._ident = ident |                 thread._ident = ident | ||||||
|  |                 # Any condition variables hanging off of the active thread may | ||||||
|  |                 # be in an invalid state, so we reinitialize them. | ||||||
|  |                 thread._block.__init__() | ||||||
|  |                 thread._started._cond.__init__() | ||||||
|                 new_active[ident] = thread |                 new_active[ident] = thread | ||||||
|             else: |             else: | ||||||
|                 # All the others are already stopped. |                 # All the others are already stopped. | ||||||
|  |  | ||||||
|  | @ -27,6 +27,9 @@ Core and Builtins | ||||||
| Library | Library | ||||||
| ------- | ------- | ||||||
| 
 | 
 | ||||||
|  | - Issue #6643: Reinitialize locks held within the threading module after fork | ||||||
|  |   to avoid a potential rare deadlock or crash on some platforms. | ||||||
|  | 
 | ||||||
| - Issue #10806, issue #9905: Fix subprocess pipes when some of the standard | - Issue #10806, issue #9905: Fix subprocess pipes when some of the standard | ||||||
|   file descriptors (0, 1, 2) are closed in the parent process.  Initial |   file descriptors (0, 1, 2) are closed in the parent process.  Initial | ||||||
|   patch by Ross Lagerwall. |   patch by Ross Lagerwall. | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Gregory P. Smith
						Gregory P. Smith