mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 18:54:53 +00:00 
			
		
		
		
	 aefa7ebf0f
			
		
	
	
		aefa7ebf0f
		
	
	
	
	
		
			
			* bpo-6532: Make the thread id an unsigned integer. From C API side the type of results of PyThread_start_new_thread() and PyThread_get_thread_ident(), the id parameter of PyThreadState_SetAsyncExc(), and the thread_id field of PyThreadState changed from "long" to "unsigned long". * Restore a check in thread_get_ident().
		
			
				
	
	
		
			255 lines
		
	
	
	
		
			9.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			255 lines
		
	
	
	
		
			9.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import _dummy_thread as _thread
 | |
| import time
 | |
| import queue
 | |
| import random
 | |
| import unittest
 | |
| from test import support
 | |
| from unittest import mock
 | |
| 
 | |
| DELAY = 0
 | |
| 
 | |
| 
 | |
| class LockTests(unittest.TestCase):
 | |
|     """Test lock objects."""
 | |
| 
 | |
|     def setUp(self):
 | |
|         # Create a lock
 | |
|         self.lock = _thread.allocate_lock()
 | |
| 
 | |
|     def test_initlock(self):
 | |
|         #Make sure locks start locked
 | |
|         self.assertFalse(self.lock.locked(),
 | |
|                         "Lock object is not initialized unlocked.")
 | |
| 
 | |
|     def test_release(self):
 | |
|         # Test self.lock.release()
 | |
|         self.lock.acquire()
 | |
|         self.lock.release()
 | |
|         self.assertFalse(self.lock.locked(),
 | |
|                         "Lock object did not release properly.")
 | |
| 
 | |
|     def test_LockType_context_manager(self):
 | |
|         with _thread.LockType():
 | |
|             pass
 | |
|         self.assertFalse(self.lock.locked(),
 | |
|                          "Acquired Lock was not released")
 | |
| 
 | |
|     def test_improper_release(self):
 | |
|         #Make sure release of an unlocked thread raises RuntimeError
 | |
|         self.assertRaises(RuntimeError, self.lock.release)
 | |
| 
 | |
|     def test_cond_acquire_success(self):
 | |
|         #Make sure the conditional acquiring of the lock works.
 | |
|         self.assertTrue(self.lock.acquire(0),
 | |
|                         "Conditional acquiring of the lock failed.")
 | |
| 
 | |
|     def test_cond_acquire_fail(self):
 | |
|         #Test acquiring locked lock returns False
 | |
|         self.lock.acquire(0)
 | |
|         self.assertFalse(self.lock.acquire(0),
 | |
|                         "Conditional acquiring of a locked lock incorrectly "
 | |
|                          "succeeded.")
 | |
| 
 | |
|     def test_uncond_acquire_success(self):
 | |
|         #Make sure unconditional acquiring of a lock works.
 | |
|         self.lock.acquire()
 | |
|         self.assertTrue(self.lock.locked(),
 | |
|                         "Uncondional locking failed.")
 | |
| 
 | |
|     def test_uncond_acquire_return_val(self):
 | |
|         #Make sure that an unconditional locking returns True.
 | |
|         self.assertIs(self.lock.acquire(1), True,
 | |
|                         "Unconditional locking did not return True.")
 | |
|         self.assertIs(self.lock.acquire(), True)
 | |
| 
 | |
|     def test_uncond_acquire_blocking(self):
 | |
|         #Make sure that unconditional acquiring of a locked lock blocks.
 | |
|         def delay_unlock(to_unlock, delay):
 | |
|             """Hold on to lock for a set amount of time before unlocking."""
 | |
|             time.sleep(delay)
 | |
|             to_unlock.release()
 | |
| 
 | |
|         self.lock.acquire()
 | |
|         start_time = int(time.time())
 | |
|         _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
 | |
|         if support.verbose:
 | |
|             print()
 | |
|             print("*** Waiting for thread to release the lock "\
 | |
|             "(approx. %s sec.) ***" % DELAY)
 | |
|         self.lock.acquire()
 | |
|         end_time = int(time.time())
 | |
|         if support.verbose:
 | |
|             print("done")
 | |
|         self.assertGreaterEqual(end_time - start_time, DELAY,
 | |
|                         "Blocking by unconditional acquiring failed.")
 | |
| 
 | |
|     @mock.patch('time.sleep')
 | |
|     def test_acquire_timeout(self, mock_sleep):
 | |
|         """Test invoking acquire() with a positive timeout when the lock is
 | |
|         already acquired. Ensure that time.sleep() is invoked with the given
 | |
|         timeout and that False is returned."""
 | |
| 
 | |
|         self.lock.acquire()
 | |
|         retval = self.lock.acquire(waitflag=0, timeout=1)
 | |
|         self.assertTrue(mock_sleep.called)
 | |
|         mock_sleep.assert_called_once_with(1)
 | |
|         self.assertEqual(retval, False)
 | |
| 
 | |
|     def test_lock_representation(self):
 | |
|         self.lock.acquire()
 | |
|         self.assertIn("locked", repr(self.lock))
 | |
|         self.lock.release()
 | |
|         self.assertIn("unlocked", repr(self.lock))
 | |
| 
 | |
| 
 | |
| class MiscTests(unittest.TestCase):
 | |
|     """Miscellaneous tests."""
 | |
| 
 | |
|     def test_exit(self):
 | |
|         self.assertRaises(SystemExit, _thread.exit)
 | |
| 
 | |
|     def test_ident(self):
 | |
|         self.assertIsInstance(_thread.get_ident(), int,
 | |
|                               "_thread.get_ident() returned a non-integer")
 | |
|         self.assertGreater(_thread.get_ident(), 0)
 | |
| 
 | |
|     def test_LockType(self):
 | |
|         self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
 | |
|                               "_thread.LockType is not an instance of what "
 | |
|                               "is returned by _thread.allocate_lock()")
 | |
| 
 | |
|     def test_set_sentinel(self):
 | |
|         self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
 | |
|                               "_thread._set_sentinel() did not return a "
 | |
|                               "LockType instance.")
 | |
| 
 | |
|     def test_interrupt_main(self):
 | |
|         #Calling start_new_thread with a function that executes interrupt_main
 | |
|         # should raise KeyboardInterrupt upon completion.
 | |
|         def call_interrupt():
 | |
|             _thread.interrupt_main()
 | |
| 
 | |
|         self.assertRaises(KeyboardInterrupt,
 | |
|                           _thread.start_new_thread,
 | |
|                           call_interrupt,
 | |
|                           tuple())
 | |
| 
 | |
|     def test_interrupt_in_main(self):
 | |
|         self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
 | |
| 
 | |
|     def test_stack_size_None(self):
 | |
|         retval = _thread.stack_size(None)
 | |
|         self.assertEqual(retval, 0)
 | |
| 
 | |
|     def test_stack_size_not_None(self):
 | |
|         with self.assertRaises(_thread.error) as cm:
 | |
|             _thread.stack_size("")
 | |
|         self.assertEqual(cm.exception.args[0],
 | |
|                          "setting thread stack size not supported")
 | |
| 
 | |
| 
 | |
| class ThreadTests(unittest.TestCase):
 | |
|     """Test thread creation."""
 | |
| 
 | |
|     def test_arg_passing(self):
 | |
|         #Make sure that parameter passing works.
 | |
|         def arg_tester(queue, arg1=False, arg2=False):
 | |
|             """Use to test _thread.start_new_thread() passes args properly."""
 | |
|             queue.put((arg1, arg2))
 | |
| 
 | |
|         testing_queue = queue.Queue(1)
 | |
|         _thread.start_new_thread(arg_tester, (testing_queue, True, True))
 | |
|         result = testing_queue.get()
 | |
|         self.assertTrue(result[0] and result[1],
 | |
|                         "Argument passing for thread creation "
 | |
|                         "using tuple failed")
 | |
| 
 | |
|         _thread.start_new_thread(
 | |
|                 arg_tester,
 | |
|                 tuple(),
 | |
|                 {'queue':testing_queue, 'arg1':True, 'arg2':True})
 | |
| 
 | |
|         result = testing_queue.get()
 | |
|         self.assertTrue(result[0] and result[1],
 | |
|                         "Argument passing for thread creation "
 | |
|                         "using kwargs failed")
 | |
| 
 | |
|         _thread.start_new_thread(
 | |
|                 arg_tester,
 | |
|                 (testing_queue, True),
 | |
|                 {'arg2':True})
 | |
| 
 | |
|         result = testing_queue.get()
 | |
|         self.assertTrue(result[0] and result[1],
 | |
|                         "Argument passing for thread creation using both tuple"
 | |
|                         " and kwargs failed")
 | |
| 
 | |
|     def test_multi_thread_creation(self):
 | |
|         def queue_mark(queue, delay):
 | |
|             time.sleep(delay)
 | |
|             queue.put(_thread.get_ident())
 | |
| 
 | |
|         thread_count = 5
 | |
|         testing_queue = queue.Queue(thread_count)
 | |
| 
 | |
|         if support.verbose:
 | |
|             print()
 | |
|             print("*** Testing multiple thread creation "
 | |
|                   "(will take approx. %s to %s sec.) ***" % (
 | |
|                     DELAY, thread_count))
 | |
| 
 | |
|         for count in range(thread_count):
 | |
|             if DELAY:
 | |
|                 local_delay = round(random.random(), 1)
 | |
|             else:
 | |
|                 local_delay = 0
 | |
|             _thread.start_new_thread(queue_mark,
 | |
|                                      (testing_queue, local_delay))
 | |
|         time.sleep(DELAY)
 | |
|         if support.verbose:
 | |
|             print('done')
 | |
|         self.assertEqual(testing_queue.qsize(), thread_count,
 | |
|                          "Not all %s threads executed properly "
 | |
|                          "after %s sec." % (thread_count, DELAY))
 | |
| 
 | |
|     def test_args_not_tuple(self):
 | |
|         """
 | |
|         Test invoking start_new_thread() with a non-tuple value for "args".
 | |
|         Expect TypeError with a meaningful error message to be raised.
 | |
|         """
 | |
|         with self.assertRaises(TypeError) as cm:
 | |
|             _thread.start_new_thread(mock.Mock(), [])
 | |
|         self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
 | |
| 
 | |
|     def test_kwargs_not_dict(self):
 | |
|         """
 | |
|         Test invoking start_new_thread() with a non-dict value for "kwargs".
 | |
|         Expect TypeError with a meaningful error message to be raised.
 | |
|         """
 | |
|         with self.assertRaises(TypeError) as cm:
 | |
|             _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
 | |
|         self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
 | |
| 
 | |
|     def test_SystemExit(self):
 | |
|         """
 | |
|         Test invoking start_new_thread() with a function that raises
 | |
|         SystemExit.
 | |
|         The exception should be discarded.
 | |
|         """
 | |
|         func = mock.Mock(side_effect=SystemExit())
 | |
|         try:
 | |
|             _thread.start_new_thread(func, tuple())
 | |
|         except SystemExit:
 | |
|             self.fail("start_new_thread raised SystemExit.")
 | |
| 
 | |
|     @mock.patch('traceback.print_exc')
 | |
|     def test_RaiseException(self, mock_print_exc):
 | |
|         """
 | |
|         Test invoking start_new_thread() with a function that raises exception.
 | |
| 
 | |
|         The exception should be discarded and the traceback should be printed
 | |
|         via traceback.print_exc()
 | |
|         """
 | |
|         func = mock.Mock(side_effect=Exception)
 | |
|         _thread.start_new_thread(func, tuple())
 | |
|         self.assertTrue(mock_print_exc.called)
 |