mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	svn+ssh://pythondev@svn.python.org/python/trunk ........ r80476 | michael.foord | 2010-04-25 20:02:46 +0100 (Sun, 25 Apr 2010) | 1 line Adding unittest.removeHandler function / decorator for removing the signal.SIGINT signal handler. With tests and docs. ........
		
			
				
	
	
		
			250 lines
		
	
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import gc
 | 
						|
import io
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import signal
 | 
						|
import weakref
 | 
						|
 | 
						|
import unittest
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
 | 
						|
@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
 | 
						|
@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
 | 
						|
    "if threads have been used")
 | 
						|
class TestBreak(unittest.TestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self._default_handler = signal.getsignal(signal.SIGINT)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        signal.signal(signal.SIGINT, self._default_handler)
 | 
						|
        unittest.signals._results = weakref.WeakKeyDictionary()
 | 
						|
        unittest.signals._interrupt_handler = None
 | 
						|
 | 
						|
 | 
						|
    def testInstallHandler(self):
 | 
						|
        default_handler = signal.getsignal(signal.SIGINT)
 | 
						|
        unittest.installHandler()
 | 
						|
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
						|
 | 
						|
        try:
 | 
						|
            pid = os.getpid()
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            self.fail("KeyboardInterrupt not handled")
 | 
						|
 | 
						|
        self.assertTrue(unittest.signals._interrupt_handler.called)
 | 
						|
 | 
						|
    def testRegisterResult(self):
 | 
						|
        result = unittest.TestResult()
 | 
						|
        unittest.registerResult(result)
 | 
						|
 | 
						|
        for ref in unittest.signals._results:
 | 
						|
            if ref is result:
 | 
						|
                break
 | 
						|
            elif ref is not result:
 | 
						|
                self.fail("odd object in result set")
 | 
						|
        else:
 | 
						|
            self.fail("result not found")
 | 
						|
 | 
						|
 | 
						|
    def testInterruptCaught(self):
 | 
						|
        default_handler = signal.getsignal(signal.SIGINT)
 | 
						|
 | 
						|
        result = unittest.TestResult()
 | 
						|
        unittest.installHandler()
 | 
						|
        unittest.registerResult(result)
 | 
						|
 | 
						|
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
						|
 | 
						|
        def test(result):
 | 
						|
            pid = os.getpid()
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
            result.breakCaught = True
 | 
						|
            self.assertTrue(result.shouldStop)
 | 
						|
 | 
						|
        try:
 | 
						|
            test(result)
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            self.fail("KeyboardInterrupt not handled")
 | 
						|
        self.assertTrue(result.breakCaught)
 | 
						|
 | 
						|
 | 
						|
    def testSecondInterrupt(self):
 | 
						|
        result = unittest.TestResult()
 | 
						|
        unittest.installHandler()
 | 
						|
        unittest.registerResult(result)
 | 
						|
 | 
						|
        def test(result):
 | 
						|
            pid = os.getpid()
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
            result.breakCaught = True
 | 
						|
            self.assertTrue(result.shouldStop)
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
            self.fail("Second KeyboardInterrupt not raised")
 | 
						|
 | 
						|
        try:
 | 
						|
            test(result)
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            self.fail("Second KeyboardInterrupt not raised")
 | 
						|
        self.assertTrue(result.breakCaught)
 | 
						|
 | 
						|
 | 
						|
    def testTwoResults(self):
 | 
						|
        unittest.installHandler()
 | 
						|
 | 
						|
        result = unittest.TestResult()
 | 
						|
        unittest.registerResult(result)
 | 
						|
        new_handler = signal.getsignal(signal.SIGINT)
 | 
						|
 | 
						|
        result2 = unittest.TestResult()
 | 
						|
        unittest.registerResult(result2)
 | 
						|
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
 | 
						|
 | 
						|
        result3 = unittest.TestResult()
 | 
						|
 | 
						|
        def test(result):
 | 
						|
            pid = os.getpid()
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
 | 
						|
        try:
 | 
						|
            test(result)
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            self.fail("KeyboardInterrupt not handled")
 | 
						|
 | 
						|
        self.assertTrue(result.shouldStop)
 | 
						|
        self.assertTrue(result2.shouldStop)
 | 
						|
        self.assertFalse(result3.shouldStop)
 | 
						|
 | 
						|
 | 
						|
    def testHandlerReplacedButCalled(self):
 | 
						|
        # If our handler has been replaced (is no longer installed) but is
 | 
						|
        # called by the *new* handler, then it isn't safe to delay the
 | 
						|
        # SIGINT and we should immediately delegate to the default handler
 | 
						|
        unittest.installHandler()
 | 
						|
 | 
						|
        handler = signal.getsignal(signal.SIGINT)
 | 
						|
        def new_handler(frame, signum):
 | 
						|
            handler(frame, signum)
 | 
						|
        signal.signal(signal.SIGINT, new_handler)
 | 
						|
 | 
						|
        try:
 | 
						|
            pid = os.getpid()
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            self.fail("replaced but delegated handler doesn't raise interrupt")
 | 
						|
 | 
						|
    def testRunner(self):
 | 
						|
        # Creating a TextTestRunner with the appropriate argument should
 | 
						|
        # register the TextTestResult it creates
 | 
						|
        runner = unittest.TextTestRunner(stream=io.StringIO())
 | 
						|
 | 
						|
        result = runner.run(unittest.TestSuite())
 | 
						|
        self.assertIn(result, unittest.signals._results)
 | 
						|
 | 
						|
    def testWeakReferences(self):
 | 
						|
        # Calling registerResult on a result should not keep it alive
 | 
						|
        result = unittest.TestResult()
 | 
						|
        unittest.registerResult(result)
 | 
						|
 | 
						|
        ref = weakref.ref(result)
 | 
						|
        del result
 | 
						|
 | 
						|
        # For non-reference counting implementations
 | 
						|
        gc.collect();gc.collect()
 | 
						|
        self.assertIsNone(ref())
 | 
						|
 | 
						|
 | 
						|
    def testRemoveResult(self):
 | 
						|
        result = unittest.TestResult()
 | 
						|
        unittest.registerResult(result)
 | 
						|
 | 
						|
        unittest.installHandler()
 | 
						|
        self.assertTrue(unittest.removeResult(result))
 | 
						|
 | 
						|
        # Should this raise an error instead?
 | 
						|
        self.assertFalse(unittest.removeResult(unittest.TestResult()))
 | 
						|
 | 
						|
        try:
 | 
						|
            pid = os.getpid()
 | 
						|
            os.kill(pid, signal.SIGINT)
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            pass
 | 
						|
 | 
						|
        self.assertFalse(result.shouldStop)
 | 
						|
 | 
						|
    def testMainInstallsHandler(self):
        # TestProgram.runTests() must leave the SIGINT handler alone when
        # catchbreak is false and replace it when catchbreak is true.  In
        # both cases the runner must be built with the program's verbosity
        # and failfast (plus buffer=None), be run on the program's test, and
        # its return value must end up in program.result.
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            # Class-level logs: every instance appends to the same lists.
            initArgs = []
            runArgs = []

            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))

            def run(self, test):
                self.runArgs.append(test)
                return result

        class Program(unittest.TestProgram):
            # Does not call TestProgram.__init__; it only sets the
            # attributes this test provides.
            def __init__(self, catchbreak):
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.testRunner = FakeRunner
                self.test = test
                self.result = None

        def run_and_check(catchbreak):
            # Start from empty logs, run the program once and verify how the
            # fake runner was constructed and invoked.
            FakeRunner.initArgs = []
            FakeRunner.runArgs = []
            program = Program(catchbreak)
            program.runTests()

            expected_init = [((), {'buffer': None,
                                   'verbosity': verbosity,
                                   'failfast': failfast})]
            self.assertEqual(FakeRunner.initArgs, expected_init)
            self.assertEqual(FakeRunner.runArgs, [test])
            self.assertEqual(program.result, result)

        run_and_check(False)
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        run_and_check(True)
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
						|
 | 
						|
    def testRemoveHandler(self):
 | 
						|
        default_handler = signal.getsignal(signal.SIGINT)
 | 
						|
        unittest.installHandler()
 | 
						|
        unittest.removeHandler()
 | 
						|
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
						|
 | 
						|
        # check that calling removeHandler multiple times has no ill-effect
 | 
						|
        unittest.removeHandler()
 | 
						|
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
						|
 | 
						|
    def testRemoveHandlerAsDecorator(self):
 | 
						|
        default_handler = signal.getsignal(signal.SIGINT)
 | 
						|
        unittest.installHandler()
 | 
						|
 | 
						|
        @unittest.removeHandler
 | 
						|
        def test():
 | 
						|
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
						|
 | 
						|
        test()
 | 
						|
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 |