| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  | """Generic thread tests.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Meant to be used by dummy_thread and thread.  To allow for different modules | 
					
						
							|  |  |  | to be used, test_main() can be called with the module to use as the thread | 
					
						
							|  |  |  | implementation as its sole argument. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | import dummy_thread as _thread | 
					
						
							|  |  |  | import time | 
					
						
							|  |  |  | import Queue | 
					
						
							|  |  |  | import random | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | from test import test_support | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  | DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as | 
					
						
							|  |  |  |           # the 'thread' module. | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | class LockTests(unittest.TestCase): | 
					
						
							|  |  |  |     """Test lock objects.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         # Create a lock | 
					
						
							|  |  |  |         self.lock = _thread.allocate_lock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_initlock(self): | 
					
						
							|  |  |  |         #Make sure locks start locked | 
					
						
							|  |  |  |         self.failUnless(not self.lock.locked(), | 
					
						
							|  |  |  |                         "Lock object is not initialized unlocked.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_release(self): | 
					
						
							|  |  |  |         # Test self.lock.release() | 
					
						
							|  |  |  |         self.lock.acquire() | 
					
						
							|  |  |  |         self.lock.release() | 
					
						
							|  |  |  |         self.failUnless(not self.lock.locked(), | 
					
						
							|  |  |  |                         "Lock object did not release properly.") | 
					
						
							| 
									
										
										
										
											2003-02-19 02:35:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |     def test_improper_release(self): | 
					
						
							|  |  |  |         #Make sure release of an unlocked thread raises _thread.error | 
					
						
							|  |  |  |         self.failUnlessRaises(_thread.error, self.lock.release) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_cond_acquire_success(self): | 
					
						
							|  |  |  |         #Make sure the conditional acquiring of the lock works. | 
					
						
							|  |  |  |         self.failUnless(self.lock.acquire(0), | 
					
						
							|  |  |  |                         "Conditional acquiring of the lock failed.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_cond_acquire_fail(self): | 
					
						
							|  |  |  |         #Test acquiring locked lock returns False | 
					
						
							|  |  |  |         self.lock.acquire(0) | 
					
						
							|  |  |  |         self.failUnless(not self.lock.acquire(0), | 
					
						
							|  |  |  |                         "Conditional acquiring of a locked lock incorrectly " | 
					
						
							|  |  |  |                          "succeeded.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_uncond_acquire_success(self): | 
					
						
							|  |  |  |         #Make sure unconditional acquiring of a lock works. | 
					
						
							|  |  |  |         self.lock.acquire() | 
					
						
							|  |  |  |         self.failUnless(self.lock.locked(), | 
					
						
							|  |  |  |                         "Uncondional locking failed.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_uncond_acquire_return_val(self): | 
					
						
							|  |  |  |         #Make sure that an unconditional locking returns True. | 
					
						
							|  |  |  |         self.failUnless(self.lock.acquire(1) is True, | 
					
						
							|  |  |  |                         "Unconditional locking did not return True.") | 
					
						
							| 
									
										
										
										
											2003-02-19 02:35:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |     def test_uncond_acquire_blocking(self): | 
					
						
							|  |  |  |         #Make sure that unconditional acquiring of a locked lock blocks. | 
					
						
							|  |  |  |         def delay_unlock(to_unlock, delay): | 
					
						
							|  |  |  |             """Hold on to lock for a set amount of time before unlocking.""" | 
					
						
							|  |  |  |             time.sleep(delay) | 
					
						
							|  |  |  |             to_unlock.release() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.lock.acquire() | 
					
						
							|  |  |  |         start_time = int(time.time()) | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |         _thread.start_new_thread(delay_unlock,(self.lock, DELAY)) | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |         if test_support.verbose: | 
					
						
							|  |  |  |             print | 
					
						
							|  |  |  |             print "*** Waiting for thread to release the lock "\ | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |             "(approx. %s sec.) ***" % DELAY | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |         self.lock.acquire() | 
					
						
							|  |  |  |         end_time = int(time.time()) | 
					
						
							|  |  |  |         if test_support.verbose: | 
					
						
							|  |  |  |             print "done" | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |         self.failUnless((end_time - start_time) >= DELAY, | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |                         "Blocking by unconditional acquiring failed.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class MiscTests(unittest.TestCase): | 
					
						
							|  |  |  |     """Miscellaneous tests.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_exit(self): | 
					
						
							|  |  |  |         #Make sure _thread.exit() raises SystemExit | 
					
						
							|  |  |  |         self.failUnlessRaises(SystemExit, _thread.exit) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_ident(self): | 
					
						
							|  |  |  |         #Test sanity of _thread.get_ident() | 
					
						
							|  |  |  |         self.failUnless(isinstance(_thread.get_ident(), int), | 
					
						
							|  |  |  |                         "_thread.get_ident() returned a non-integer") | 
					
						
							|  |  |  |         self.failUnless(_thread.get_ident() != 0, | 
					
						
							|  |  |  |                         "_thread.get_ident() returned 0") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_LockType(self): | 
					
						
							|  |  |  |         #Make sure _thread.LockType is the same type as _thread.allocate_locke() | 
					
						
							|  |  |  |         self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType), | 
					
						
							|  |  |  |                         "_thread.LockType is not an instance of what is " | 
					
						
							|  |  |  |                          "returned by _thread.allocate_lock()") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ThreadTests(unittest.TestCase): | 
					
						
							|  |  |  |     """Test thread creation.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_arg_passing(self): | 
					
						
							|  |  |  |         #Make sure that parameter passing works. | 
					
						
							|  |  |  |         def arg_tester(queue, arg1=False, arg2=False): | 
					
						
							|  |  |  |             """Use to test _thread.start_new_thread() passes args properly.""" | 
					
						
							|  |  |  |             queue.put((arg1, arg2)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         testing_queue = Queue.Queue(1) | 
					
						
							|  |  |  |         _thread.start_new_thread(arg_tester, (testing_queue, True, True)) | 
					
						
							|  |  |  |         result = testing_queue.get() | 
					
						
							|  |  |  |         self.failUnless(result[0] and result[1], | 
					
						
							|  |  |  |                         "Argument passing for thread creation using tuple failed") | 
					
						
							|  |  |  |         _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue, | 
					
						
							|  |  |  |                                                        'arg1':True, 'arg2':True}) | 
					
						
							|  |  |  |         result = testing_queue.get() | 
					
						
							|  |  |  |         self.failUnless(result[0] and result[1], | 
					
						
							|  |  |  |                         "Argument passing for thread creation using kwargs failed") | 
					
						
							|  |  |  |         _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True}) | 
					
						
							|  |  |  |         result = testing_queue.get() | 
					
						
							|  |  |  |         self.failUnless(result[0] and result[1], | 
					
						
							|  |  |  |                         "Argument passing for thread creation using both tuple" | 
					
						
							|  |  |  |                         " and kwargs failed") | 
					
						
							| 
									
										
										
										
											2003-02-19 02:35:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |     def test_multi_creation(self): | 
					
						
							|  |  |  |         #Make sure multiple threads can be created. | 
					
						
							|  |  |  |         def queue_mark(queue, delay): | 
					
						
							|  |  |  |             """Wait for ``delay`` seconds and then put something into ``queue``""" | 
					
						
							|  |  |  |             time.sleep(delay) | 
					
						
							|  |  |  |             queue.put(_thread.get_ident()) | 
					
						
							| 
									
										
										
										
											2003-02-19 02:35:07 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |         thread_count = 5 | 
					
						
							|  |  |  |         testing_queue = Queue.Queue(thread_count) | 
					
						
							|  |  |  |         if test_support.verbose: | 
					
						
							|  |  |  |             print | 
					
						
							|  |  |  |             print "*** Testing multiple thread creation "\ | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |             "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count) | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |         for count in xrange(thread_count): | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |             if DELAY: | 
					
						
							|  |  |  |                 local_delay = round(random.random(), 1) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 local_delay = 0 | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |             _thread.start_new_thread(queue_mark, | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |                                      (testing_queue, local_delay)) | 
					
						
							|  |  |  |         time.sleep(DELAY) | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |         if test_support.verbose: | 
					
						
							|  |  |  |             print 'done' | 
					
						
							|  |  |  |         self.failUnless(testing_queue.qsize() == thread_count, | 
					
						
							| 
									
										
										
										
											2003-02-19 02:35:07 +00:00
										 |  |  |                         "Not all %s threads executed properly after %s sec." % | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |                         (thread_count, DELAY)) | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | def test_main(imported_module=None): | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |     global _thread, DELAY | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |     if imported_module: | 
					
						
							|  |  |  |         _thread = imported_module | 
					
						
							| 
									
										
										
										
											2003-04-30 03:03:37 +00:00
										 |  |  |         DELAY = 2 | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  |     if test_support.verbose: | 
					
						
							|  |  |  |         print | 
					
						
							|  |  |  |         print "*** Using %s as _thread module ***" % _thread | 
					
						
							| 
									
										
										
										
											2003-05-01 17:45:56 +00:00
										 |  |  |     test_support.run_unittest(LockTests, MiscTests, ThreadTests) | 
					
						
							| 
									
										
										
										
											2002-12-30 22:30:22 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     test_main() |