mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	svn+ssh://pythondev@svn.python.org/python/branches/p3yk ........ r55077 | guido.van.rossum | 2007-05-02 11:54:37 -0700 (Wed, 02 May 2007) | 2 lines Use the new print syntax, at least. ........ r55142 | fred.drake | 2007-05-04 21:27:30 -0700 (Fri, 04 May 2007) | 1 line remove old cruftiness ........ r55143 | fred.drake | 2007-05-04 21:52:16 -0700 (Fri, 04 May 2007) | 1 line make this work with the new Python ........ r55162 | neal.norwitz | 2007-05-06 22:29:18 -0700 (Sun, 06 May 2007) | 1 line Get asdl code gen working with Python 2.3. Should continue to work with 3.0 ........ r55164 | neal.norwitz | 2007-05-07 00:00:38 -0700 (Mon, 07 May 2007) | 1 line Verify checkins to p3yk (sic) branch go to 3000 list. ........ r55166 | neal.norwitz | 2007-05-07 00:12:35 -0700 (Mon, 07 May 2007) | 1 line Fix this test so it runs again by importing warnings_test properly. ........ r55167 | neal.norwitz | 2007-05-07 01:03:22 -0700 (Mon, 07 May 2007) | 8 lines So long xrange. range() now supports values that are outside -sys.maxint to sys.maxint. floats raise a TypeError. This has been sitting for a long time. It probably has some problems and needs cleanup. Objects/rangeobject.c now uses 4-space indents since it is almost completely new. ........ r55171 | guido.van.rossum | 2007-05-07 10:21:26 -0700 (Mon, 07 May 2007) | 4 lines Fix two tests that were previously depending on significant spaces at the end of a line (and before that on Python 2.x print behavior that has no exact equivalent in 3.0). ........
		
			
				
	
	
		
			181 lines
		
	
	
	
		
			7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			181 lines
		
	
	
	
		
			7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Generic thread tests.
 | 
						|
 | 
						|
Meant to be used by dummy_thread and thread.  To allow for different modules
 | 
						|
to be used, test_main() can be called with the module to use as the thread
 | 
						|
implementation as its sole argument.
 | 
						|
 | 
						|
"""
 | 
						|
import dummy_thread as _thread
 | 
						|
import time
 | 
						|
import Queue
 | 
						|
import random
 | 
						|
import unittest
 | 
						|
from test import test_support
 | 
						|
 | 
						|
DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as
 | 
						|
          # the 'thread' module.
 | 
						|
 | 
						|
class LockTests(unittest.TestCase):
 | 
						|
    """Test lock objects."""
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        # Create a lock
 | 
						|
        self.lock = _thread.allocate_lock()
 | 
						|
 | 
						|
    def test_initlock(self):
 | 
						|
        #Make sure locks start locked
 | 
						|
        self.failUnless(not self.lock.locked(),
 | 
						|
                        "Lock object is not initialized unlocked.")
 | 
						|
 | 
						|
    def test_release(self):
 | 
						|
        # Test self.lock.release()
 | 
						|
        self.lock.acquire()
 | 
						|
        self.lock.release()
 | 
						|
        self.failUnless(not self.lock.locked(),
 | 
						|
                        "Lock object did not release properly.")
 | 
						|
 | 
						|
    def test_improper_release(self):
 | 
						|
        #Make sure release of an unlocked thread raises _thread.error
 | 
						|
        self.failUnlessRaises(_thread.error, self.lock.release)
 | 
						|
 | 
						|
    def test_cond_acquire_success(self):
 | 
						|
        #Make sure the conditional acquiring of the lock works.
 | 
						|
        self.failUnless(self.lock.acquire(0),
 | 
						|
                        "Conditional acquiring of the lock failed.")
 | 
						|
 | 
						|
    def test_cond_acquire_fail(self):
 | 
						|
        #Test acquiring locked lock returns False
 | 
						|
        self.lock.acquire(0)
 | 
						|
        self.failUnless(not self.lock.acquire(0),
 | 
						|
                        "Conditional acquiring of a locked lock incorrectly "
 | 
						|
                         "succeeded.")
 | 
						|
 | 
						|
    def test_uncond_acquire_success(self):
 | 
						|
        #Make sure unconditional acquiring of a lock works.
 | 
						|
        self.lock.acquire()
 | 
						|
        self.failUnless(self.lock.locked(),
 | 
						|
                        "Uncondional locking failed.")
 | 
						|
 | 
						|
    def test_uncond_acquire_return_val(self):
 | 
						|
        #Make sure that an unconditional locking returns True.
 | 
						|
        self.failUnless(self.lock.acquire(1) is True,
 | 
						|
                        "Unconditional locking did not return True.")
 | 
						|
 | 
						|
    def test_uncond_acquire_blocking(self):
 | 
						|
        #Make sure that unconditional acquiring of a locked lock blocks.
 | 
						|
        def delay_unlock(to_unlock, delay):
 | 
						|
            """Hold on to lock for a set amount of time before unlocking."""
 | 
						|
            time.sleep(delay)
 | 
						|
            to_unlock.release()
 | 
						|
 | 
						|
        self.lock.acquire()
 | 
						|
        start_time = int(time.time())
 | 
						|
        _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
 | 
						|
        if test_support.verbose:
 | 
						|
            print()
 | 
						|
            print("*** Waiting for thread to release the lock "\
 | 
						|
            "(approx. %s sec.) ***" % DELAY)
 | 
						|
        self.lock.acquire()
 | 
						|
        end_time = int(time.time())
 | 
						|
        if test_support.verbose:
 | 
						|
            print("done")
 | 
						|
        self.failUnless((end_time - start_time) >= DELAY,
 | 
						|
                        "Blocking by unconditional acquiring failed.")
 | 
						|
 | 
						|
class MiscTests(unittest.TestCase):
 | 
						|
    """Miscellaneous tests."""
 | 
						|
 | 
						|
    def test_exit(self):
 | 
						|
        #Make sure _thread.exit() raises SystemExit
 | 
						|
        self.failUnlessRaises(SystemExit, _thread.exit)
 | 
						|
 | 
						|
    def test_ident(self):
 | 
						|
        #Test sanity of _thread.get_ident()
 | 
						|
        self.failUnless(isinstance(_thread.get_ident(), int),
 | 
						|
                        "_thread.get_ident() returned a non-integer")
 | 
						|
        self.failUnless(_thread.get_ident() != 0,
 | 
						|
                        "_thread.get_ident() returned 0")
 | 
						|
 | 
						|
    def test_LockType(self):
 | 
						|
        #Make sure _thread.LockType is the same type as _thread.allocate_locke()
 | 
						|
        self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
 | 
						|
                        "_thread.LockType is not an instance of what is "
 | 
						|
                         "returned by _thread.allocate_lock()")
 | 
						|
 | 
						|
    def test_interrupt_main(self):
 | 
						|
        #Calling start_new_thread with a function that executes interrupt_main
 | 
						|
        # should raise KeyboardInterrupt upon completion.
 | 
						|
        def call_interrupt():
 | 
						|
            _thread.interrupt_main()
 | 
						|
        self.failUnlessRaises(KeyboardInterrupt, _thread.start_new_thread,
 | 
						|
                              call_interrupt, tuple())
 | 
						|
 | 
						|
    def test_interrupt_in_main(self):
 | 
						|
        # Make sure that if interrupt_main is called in main threat that
 | 
						|
        # KeyboardInterrupt is raised instantly.
 | 
						|
        self.failUnlessRaises(KeyboardInterrupt, _thread.interrupt_main)
 | 
						|
 | 
						|
class ThreadTests(unittest.TestCase):
 | 
						|
    """Test thread creation."""
 | 
						|
 | 
						|
    def test_arg_passing(self):
 | 
						|
        #Make sure that parameter passing works.
 | 
						|
        def arg_tester(queue, arg1=False, arg2=False):
 | 
						|
            """Use to test _thread.start_new_thread() passes args properly."""
 | 
						|
            queue.put((arg1, arg2))
 | 
						|
 | 
						|
        testing_queue = Queue.Queue(1)
 | 
						|
        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
 | 
						|
        result = testing_queue.get()
 | 
						|
        self.failUnless(result[0] and result[1],
 | 
						|
                        "Argument passing for thread creation using tuple failed")
 | 
						|
        _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
 | 
						|
                                                       'arg1':True, 'arg2':True})
 | 
						|
        result = testing_queue.get()
 | 
						|
        self.failUnless(result[0] and result[1],
 | 
						|
                        "Argument passing for thread creation using kwargs failed")
 | 
						|
        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
 | 
						|
        result = testing_queue.get()
 | 
						|
        self.failUnless(result[0] and result[1],
 | 
						|
                        "Argument passing for thread creation using both tuple"
 | 
						|
                        " and kwargs failed")
 | 
						|
 | 
						|
    def test_multi_creation(self):
 | 
						|
        #Make sure multiple threads can be created.
 | 
						|
        def queue_mark(queue, delay):
 | 
						|
            """Wait for ``delay`` seconds and then put something into ``queue``"""
 | 
						|
            time.sleep(delay)
 | 
						|
            queue.put(_thread.get_ident())
 | 
						|
 | 
						|
        thread_count = 5
 | 
						|
        testing_queue = Queue.Queue(thread_count)
 | 
						|
        if test_support.verbose:
 | 
						|
            print()
 | 
						|
            print("*** Testing multiple thread creation "\
 | 
						|
            "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
 | 
						|
        for count in range(thread_count):
 | 
						|
            if DELAY:
 | 
						|
                local_delay = round(random.random(), 1)
 | 
						|
            else:
 | 
						|
                local_delay = 0
 | 
						|
            _thread.start_new_thread(queue_mark,
 | 
						|
                                     (testing_queue, local_delay))
 | 
						|
        time.sleep(DELAY)
 | 
						|
        if test_support.verbose:
 | 
						|
            print('done')
 | 
						|
        self.failUnless(testing_queue.qsize() == thread_count,
 | 
						|
                        "Not all %s threads executed properly after %s sec." %
 | 
						|
                        (thread_count, DELAY))
 | 
						|
 | 
						|
def test_main(imported_module=None):
 | 
						|
    global _thread, DELAY
 | 
						|
    if imported_module:
 | 
						|
        _thread = imported_module
 | 
						|
        DELAY = 2
 | 
						|
    if test_support.verbose:
 | 
						|
        print()
 | 
						|
        print("*** Using %s as _thread module ***" % _thread)
 | 
						|
    test_support.run_unittest(LockTests, MiscTests, ThreadTests)
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    test_main()
 |