mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	
		
			
	
	
		
			116 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			116 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | from importlib import _bootstrap | ||
|  | import time | ||
|  | import unittest | ||
|  | import weakref | ||
|  | 
 | ||
|  | from test import support | ||
|  | 
 | ||
|  | try: | ||
|  |     import threading | ||
|  | except ImportError: | ||
|  |     threading = None | ||
|  | else: | ||
|  |     from test import lock_tests | ||
|  | 
 | ||
|  | 
 | ||
|  | LockType = _bootstrap._ModuleLock | ||
|  | DeadlockError = _bootstrap._DeadlockError | ||
|  | 
 | ||
|  | 
 | ||
|  | if threading is not None: | ||
|  |     class ModuleLockAsRLockTests(lock_tests.RLockTests): | ||
|  |         locktype = staticmethod(lambda: LockType("some_lock")) | ||
|  | 
 | ||
|  |         # _is_owned() unsupported | ||
|  |         test__is_owned = None | ||
|  |         # acquire(blocking=False) unsupported | ||
|  |         test_try_acquire = None | ||
|  |         test_try_acquire_contended = None | ||
|  |         # `with` unsupported | ||
|  |         test_with = None | ||
|  |         # acquire(timeout=...) unsupported | ||
|  |         test_timeout = None | ||
|  |         # _release_save() unsupported | ||
|  |         test_release_save_unacquired = None | ||
|  | 
 | ||
|  | else: | ||
|  |     class ModuleLockAsRLockTests(unittest.TestCase): | ||
|  |         pass | ||
|  | 
 | ||
|  | 
 | ||
|  | @unittest.skipUnless(threading, "threads needed for this test") | ||
|  | class DeadlockAvoidanceTests(unittest.TestCase): | ||
|  | 
 | ||
|  |     def run_deadlock_avoidance_test(self, create_deadlock): | ||
|  |         NLOCKS = 10 | ||
|  |         locks = [LockType(str(i)) for i in range(NLOCKS)] | ||
|  |         pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] | ||
|  |         if create_deadlock: | ||
|  |             NTHREADS = NLOCKS | ||
|  |         else: | ||
|  |             NTHREADS = NLOCKS - 1 | ||
|  |         barrier = threading.Barrier(NTHREADS) | ||
|  |         results = [] | ||
|  |         def _acquire(lock): | ||
|  |             """Try to acquire the lock. Return True on success, False on deadlock.""" | ||
|  |             try: | ||
|  |                 lock.acquire() | ||
|  |             except DeadlockError: | ||
|  |                 return False | ||
|  |             else: | ||
|  |                 return True | ||
|  |         def f(): | ||
|  |             a, b = pairs.pop() | ||
|  |             ra = _acquire(a) | ||
|  |             barrier.wait() | ||
|  |             rb = _acquire(b) | ||
|  |             results.append((ra, rb)) | ||
|  |             if rb: | ||
|  |                 b.release() | ||
|  |             if ra: | ||
|  |                 a.release() | ||
|  |         lock_tests.Bunch(f, NTHREADS).wait_for_finished() | ||
|  |         self.assertEqual(len(results), NTHREADS) | ||
|  |         return results | ||
|  | 
 | ||
|  |     def test_deadlock(self): | ||
|  |         results = self.run_deadlock_avoidance_test(True) | ||
|  |         # One of the threads detected a potential deadlock on its second | ||
|  |         # acquire() call. | ||
|  |         self.assertEqual(results.count((True, False)), 1) | ||
|  |         self.assertEqual(results.count((True, True)), len(results) - 1) | ||
|  | 
 | ||
|  |     def test_no_deadlock(self): | ||
|  |         results = self.run_deadlock_avoidance_test(False) | ||
|  |         self.assertEqual(results.count((True, False)), 0) | ||
|  |         self.assertEqual(results.count((True, True)), len(results)) | ||
|  | 
 | ||
|  | 
 | ||
|  | class LifetimeTests(unittest.TestCase): | ||
|  | 
 | ||
|  |     def test_lock_lifetime(self): | ||
|  |         name = "xyzzy" | ||
|  |         self.assertNotIn(name, _bootstrap._module_locks) | ||
|  |         lock = _bootstrap._get_module_lock(name) | ||
|  |         self.assertIn(name, _bootstrap._module_locks) | ||
|  |         wr = weakref.ref(lock) | ||
|  |         del lock | ||
|  |         support.gc_collect() | ||
|  |         self.assertNotIn(name, _bootstrap._module_locks) | ||
|  |         self.assertIs(wr(), None) | ||
|  | 
 | ||
|  |     def test_all_locks(self): | ||
|  |         support.gc_collect() | ||
|  |         self.assertEqual(0, len(_bootstrap._module_locks)) | ||
|  | 
 | ||
|  | 
 | ||
|  | @support.reap_threads | ||
|  | def test_main(): | ||
|  |     support.run_unittest(ModuleLockAsRLockTests, | ||
|  |                          DeadlockAvoidanceTests, | ||
|  |                          LifetimeTests) | ||
|  | 
 | ||
|  | 
 | ||
|  | if __name__ == '__main__': | ||
|  |     test_main() |