mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			252 lines
		
	
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			252 lines
		
	
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import gc
 | |
| import io
 | |
| import os
 | |
| import sys
 | |
| import signal
 | |
| import weakref
 | |
| 
 | |
| import unittest
 | |
| 
 | |
| 
 | |
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
    "if threads have been used")
class TestBreak(unittest.TestCase):
    """Tests for unittest's SIGINT ("control-C") handling support.

    Exercises installHandler()/removeHandler(), registerResult()/
    removeResult() and the ``catchbreak`` switch of TestProgram.  Most tests
    deliver a real SIGINT to the current process with os.kill(), so every
    test restores the signal handler and unittest.signals state afterwards.
    """

    def setUp(self):
        # Remember whichever SIGINT handler is active on entry so tearDown
        # can put it back after a test has replaced it.
        self._default_handler = signal.getsignal(signal.SIGINT)

    def tearDown(self):
        # Undo everything a test may have changed: the process-wide SIGINT
        # handler and the module-level bookkeeping in unittest.signals.
        signal.signal(signal.SIGINT, self._default_handler)
        unittest.signals._results = weakref.WeakKeyDictionary()
        unittest.signals._interrupt_handler = None

    def testInstallHandler(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        # With the handler installed, a first SIGINT must not surface as
        # KeyboardInterrupt.
        try:
            os.kill(os.getpid(), signal.SIGINT)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(unittest.signals._interrupt_handler.called)

    def testRegisterResult(self):
        result = unittest.TestResult()
        unittest.registerResult(result)

        # The registry must contain exactly the object just registered
        # (compared by identity, not equality).
        for ref in unittest.signals._results:
            if ref is result:
                break
            self.fail("odd object in result set")
        else:
            self.fail("result not found")

    def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            os.kill(os.getpid(), signal.SIGINT)
            # Only reached if the interrupt was swallowed by the handler.
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)

    def testSecondInterrupt(self):
        # Can't use a skipIf decorator because the signal handler may have
        # been changed after this method was defined.  If SIGINT is ignored
        # there is no KeyboardInterrupt for the second signal to raise.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            # The second interrupt is expected to raise KeyboardInterrupt
            # right here, so the next line should never run.
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)

    def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        # Registering another result must not touch the installed handler.
        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        # Deliberately never registered: it must be left alone.
        result3 = unittest.TestResult()

        def test(result):
            os.kill(os.getpid(), signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)

    def testHandlerReplacedButCalled(self):
        # Can't use a skipIf decorator because the signal handler may have
        # been changed after this method was defined.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")

        # If our handler has been replaced (is no longer installed) but is
        # called by the *new* handler, then it isn't safe to delay the
        # SIGINT and we should immediately delegate to the default handler
        unittest.installHandler()

        handler = signal.getsignal(signal.SIGINT)

        # Signal handlers are invoked as handler(signum, frame).
        def new_handler(signum, frame):
            handler(signum, frame)
        signal.signal(signal.SIGINT, new_handler)

        try:
            os.kill(os.getpid(), signal.SIGINT)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("replaced but delegated handler doesn't raise interrupt")

    def testRunner(self):
        # Creating a TextTestRunner with the appropriate argument should
        # register the TextTestResult it creates
        runner = unittest.TextTestRunner(stream=io.StringIO())

        result = runner.run(unittest.TestSuite())
        self.assertIn(result, unittest.signals._results)

    def testWeakReferences(self):
        # Calling registerResult on a result should not keep it alive
        result = unittest.TestResult()
        unittest.registerResult(result)

        ref = weakref.ref(result)
        del result

        # For non-reference counting implementations
        gc.collect()
        gc.collect()
        self.assertIsNone(ref())

    def testRemoveResult(self):
        result = unittest.TestResult()
        unittest.registerResult(result)

        unittest.installHandler()
        self.assertTrue(unittest.removeResult(result))

        # Should this raise an error instead?
        self.assertFalse(unittest.removeResult(unittest.TestResult()))

        # Whether or not the signal surfaces as KeyboardInterrupt is not
        # what is being tested here; only the removed result matters.
        try:
            os.kill(os.getpid(), signal.SIGINT)
        except KeyboardInterrupt:
            pass

        self.assertFalse(result.shouldStop)

    def testMainInstallsHandler(self):
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            # Recorded on the class because runTests() creates the runner
            # instance itself and the test never sees it.
            initArgs = []
            runArgs = []

            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))

            def run(self, test):
                self.runArgs.append(test)
                return result

        class Program(unittest.TestProgram):
            # Bypass TestProgram.__init__ (argument parsing, test loading)
            # and set only the attributes runTests() reads.
            def __init__(self, catchbreak):
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.testRunner = FakeRunner
                self.test = test
                self.result = None
                # Only read by TestProgram versions that forward these
                # options to the runner; harmless otherwise.
                self.tb_locals = False
                self.durations = None

        required = {'buffer': None,
                    'verbosity': verbosity,
                    'failfast': failfast,
                    'warnings': None}
        # Forwarded only by newer TestProgram implementations.
        optional = {'tb_locals': False,
                    'durations': None}

        def check_run(program):
            # Exactly one runner was built, with no positional arguments.
            self.assertEqual(len(FakeRunner.initArgs), 1)
            args, kwargs = FakeRunner.initArgs[0]
            self.assertEqual(args, ())
            # The required keywords must always match; an optional keyword
            # is accepted only with the value set on the program.
            expected = dict(required)
            expected.update((key, value) for key, value in optional.items()
                            if key in kwargs)
            self.assertEqual(kwargs, expected)
            self.assertEqual(FakeRunner.runArgs, [test])
            self.assertEqual(program.result, result)

        p = Program(False)
        p.runTests()
        check_run(p)

        # catchbreak=False must leave the SIGINT handler untouched.
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        FakeRunner.initArgs = []
        FakeRunner.runArgs = []
        p = Program(True)
        p.runTests()
        check_run(p)

        # catchbreak=True must have installed a different handler.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandler(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()
        unittest.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # check that calling removeHandler multiple times has no ill-effect
        unittest.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandlerAsDecorator(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()

        # Inside the decorated function the original handler is active...
        @unittest.removeHandler
        def test():
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        test()
        # ...and unittest's handler is back in place once it returns.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | 
