mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	Make time.sleep calls go to 0 for common testing.
This commit is contained in:
		
							parent
							
								
									e6b7033e79
								
							
						
					
					
						commit
						13da5fa999
					
				
					 2 changed files with 19 additions and 11 deletions
				
			
		|  | @ -12,6 +12,8 @@ | ||||||
| import unittest | import unittest | ||||||
| from test import test_support | from test import test_support | ||||||
| 
 | 
 | ||||||
|  | DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as | ||||||
|  |           # the 'thread' module. | ||||||
| 
 | 
 | ||||||
| class LockTests(unittest.TestCase): | class LockTests(unittest.TestCase): | ||||||
|     """Test lock objects.""" |     """Test lock objects.""" | ||||||
|  | @ -67,18 +69,17 @@ def delay_unlock(to_unlock, delay): | ||||||
|             to_unlock.release() |             to_unlock.release() | ||||||
| 
 | 
 | ||||||
|         self.lock.acquire() |         self.lock.acquire() | ||||||
|         delay = 1  #In seconds |  | ||||||
|         start_time = int(time.time()) |         start_time = int(time.time()) | ||||||
|         _thread.start_new_thread(delay_unlock,(self.lock, delay)) |         _thread.start_new_thread(delay_unlock,(self.lock, DELAY)) | ||||||
|         if test_support.verbose: |         if test_support.verbose: | ||||||
|             print |             print | ||||||
|             print "*** Waiting for thread to release the lock "\ |             print "*** Waiting for thread to release the lock "\ | ||||||
|             "(approx. %s sec.) ***" % delay |             "(approx. %s sec.) ***" % DELAY | ||||||
|         self.lock.acquire() |         self.lock.acquire() | ||||||
|         end_time = int(time.time()) |         end_time = int(time.time()) | ||||||
|         if test_support.verbose: |         if test_support.verbose: | ||||||
|             print "done" |             print "done" | ||||||
|         self.failUnless((end_time - start_time) >= delay, |         self.failUnless((end_time - start_time) >= DELAY, | ||||||
|                         "Blocking by unconditional acquiring failed.") |                         "Blocking by unconditional acquiring failed.") | ||||||
| 
 | 
 | ||||||
| class MiscTests(unittest.TestCase): | class MiscTests(unittest.TestCase): | ||||||
|  | @ -134,26 +135,30 @@ def queue_mark(queue, delay): | ||||||
|             queue.put(_thread.get_ident()) |             queue.put(_thread.get_ident()) | ||||||
| 
 | 
 | ||||||
|         thread_count = 5 |         thread_count = 5 | ||||||
|         delay = 1.5 |  | ||||||
|         testing_queue = Queue.Queue(thread_count) |         testing_queue = Queue.Queue(thread_count) | ||||||
|         if test_support.verbose: |         if test_support.verbose: | ||||||
|             print |             print | ||||||
|             print "*** Testing multiple thread creation "\ |             print "*** Testing multiple thread creation "\ | ||||||
|             "(will take approx. %s to %s sec.) ***" % (delay, thread_count) |             "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count) | ||||||
|         for count in xrange(thread_count): |         for count in xrange(thread_count): | ||||||
|  |             if DELAY: | ||||||
|  |                 local_delay = round(random.random(), 1) | ||||||
|  |             else: | ||||||
|  |                 local_delay = 0 | ||||||
|             _thread.start_new_thread(queue_mark, |             _thread.start_new_thread(queue_mark, | ||||||
|                                      (testing_queue, round(random.random(), 1))) |                                      (testing_queue, local_delay)) | ||||||
|         time.sleep(delay) |         time.sleep(DELAY) | ||||||
|         if test_support.verbose: |         if test_support.verbose: | ||||||
|             print 'done' |             print 'done' | ||||||
|         self.failUnless(testing_queue.qsize() == thread_count, |         self.failUnless(testing_queue.qsize() == thread_count, | ||||||
|                         "Not all %s threads executed properly after %s sec." % |                         "Not all %s threads executed properly after %s sec." % | ||||||
|                         (thread_count, delay)) |                         (thread_count, DELAY)) | ||||||
| 
 | 
 | ||||||
| def test_main(imported_module=None): | def test_main(imported_module=None): | ||||||
|     global _thread |     global _thread, DELAY | ||||||
|     if imported_module: |     if imported_module: | ||||||
|         _thread = imported_module |         _thread = imported_module | ||||||
|  |         DELAY = 2 | ||||||
|     if test_support.verbose: |     if test_support.verbose: | ||||||
|         print |         print | ||||||
|         print "*** Using %s as _thread module ***" % _thread |         print "*** Using %s as _thread module ***" % _thread | ||||||
|  |  | ||||||
|  | @ -12,7 +12,10 @@ class TestThread(_threading.Thread): | ||||||
| 
 | 
 | ||||||
|     def run(self): |     def run(self): | ||||||
|         global running |         global running | ||||||
|         delay = random.random() * 2 |         # Uncomment if testing another module, such as the real 'threading' | ||||||
|  |         # module. | ||||||
|  |         #delay = random.random() * 2 | ||||||
|  |         delay = 0 | ||||||
|         if verbose: |         if verbose: | ||||||
|             print 'task', self.getName(), 'will run for', delay, 'sec' |             print 'task', self.getName(), 'will run for', delay, 'sec' | ||||||
|         sema.acquire() |         sema.acquire() | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Brett Cannon
						Brett Cannon