mirror of
				https://github.com/python/cpython.git
				synced 2025-10-24 18:33:49 +00:00 
			
		
		
		
	 c735d54534
			
		
	
	
		c735d54534
		
			
		
	
	
	
	
		
			
			* Move Lib/unittest/test/ to Lib/test/test_unittest/ * Remove Lib/test/test_unittest.py * Replace unittest.test with test.test_unittest * Remove unittest.load_tests() * Rewrite unittest __init__.py and __main__.py * Update build system, CODEOWNERS, and wasm_assets.py
		
			
				
	
	
		
			306 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			306 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import gc
 | |
| import io
 | |
| import os
 | |
| import sys
 | |
| import signal
 | |
| import weakref
 | |
| import unittest
 | |
| 
 | |
| from test import support
 | |
| 
 | |
| 
 | |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
 | |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
 | |
| class TestBreak(unittest.TestCase):
 | |
|     int_handler = None
 | |
|     # This number was smart-guessed, previously tests were failing
 | |
|     # after 7th run. So, we take `x * 2 + 1` to be sure.
 | |
|     default_repeats = 15
 | |
| 
 | |
|     def setUp(self):
 | |
|         self._default_handler = signal.getsignal(signal.SIGINT)
 | |
|         if self.int_handler is not None:
 | |
|             signal.signal(signal.SIGINT, self.int_handler)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         signal.signal(signal.SIGINT, self._default_handler)
 | |
|         unittest.signals._results = weakref.WeakKeyDictionary()
 | |
|         unittest.signals._interrupt_handler = None
 | |
| 
 | |
| 
 | |
|     def withRepeats(self, test_function, repeats=None):
 | |
|         if not support.check_impl_detail(cpython=True):
 | |
|             # Override repeats count on non-cpython to execute only once.
 | |
|             # Because this test only makes sense to be repeated on CPython.
 | |
|             repeats = 1
 | |
|         elif repeats is None:
 | |
|             repeats = self.default_repeats
 | |
| 
 | |
|         for repeat in range(repeats):
 | |
|             with self.subTest(repeat=repeat):
 | |
|                 # We don't run `setUp` for the very first repeat
 | |
|                 # and we don't run `tearDown` for the very last one,
 | |
|                 # because they are handled by the test class itself.
 | |
|                 if repeat != 0:
 | |
|                     self.setUp()
 | |
|                 try:
 | |
|                     test_function()
 | |
|                 finally:
 | |
|                     if repeat != repeats - 1:
 | |
|                         self.tearDown()
 | |
| 
 | |
|     def testInstallHandler(self):
 | |
|         default_handler = signal.getsignal(signal.SIGINT)
 | |
|         unittest.installHandler()
 | |
|         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
|         try:
 | |
|             pid = os.getpid()
 | |
|             os.kill(pid, signal.SIGINT)
 | |
|         except KeyboardInterrupt:
 | |
|             self.fail("KeyboardInterrupt not handled")
 | |
| 
 | |
|         self.assertTrue(unittest.signals._interrupt_handler.called)
 | |
| 
 | |
|     def testRegisterResult(self):
 | |
|         result = unittest.TestResult()
 | |
|         self.assertNotIn(result, unittest.signals._results)
 | |
| 
 | |
|         unittest.registerResult(result)
 | |
|         try:
 | |
|             self.assertIn(result, unittest.signals._results)
 | |
|         finally:
 | |
|             unittest.removeResult(result)
 | |
| 
 | |
|     def testInterruptCaught(self):
 | |
|         def test(result):
 | |
|             pid = os.getpid()
 | |
|             os.kill(pid, signal.SIGINT)
 | |
|             result.breakCaught = True
 | |
|             self.assertTrue(result.shouldStop)
 | |
| 
 | |
|         def test_function():
 | |
|             result = unittest.TestResult()
 | |
|             unittest.installHandler()
 | |
|             unittest.registerResult(result)
 | |
| 
 | |
|             self.assertNotEqual(
 | |
|                 signal.getsignal(signal.SIGINT),
 | |
|                 self._default_handler,
 | |
|             )
 | |
| 
 | |
|             try:
 | |
|                 test(result)
 | |
|             except KeyboardInterrupt:
 | |
|                 self.fail("KeyboardInterrupt not handled")
 | |
|             self.assertTrue(result.breakCaught)
 | |
|         self.withRepeats(test_function)
 | |
| 
 | |
|     def testSecondInterrupt(self):
 | |
|         # Can't use skipIf decorator because the signal handler may have
 | |
|         # been changed after defining this method.
 | |
|         if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
 | |
|             self.skipTest("test requires SIGINT to not be ignored")
 | |
| 
 | |
|         def test(result):
 | |
|             pid = os.getpid()
 | |
|             os.kill(pid, signal.SIGINT)
 | |
|             result.breakCaught = True
 | |
|             self.assertTrue(result.shouldStop)
 | |
|             os.kill(pid, signal.SIGINT)
 | |
|             self.fail("Second KeyboardInterrupt not raised")
 | |
| 
 | |
|         def test_function():
 | |
|             result = unittest.TestResult()
 | |
|             unittest.installHandler()
 | |
|             unittest.registerResult(result)
 | |
| 
 | |
|             with self.assertRaises(KeyboardInterrupt):
 | |
|                 test(result)
 | |
|             self.assertTrue(result.breakCaught)
 | |
|         self.withRepeats(test_function)
 | |
| 
 | |
| 
 | |
|     def testTwoResults(self):
 | |
|         def test_function():
 | |
|             unittest.installHandler()
 | |
| 
 | |
|             result = unittest.TestResult()
 | |
|             unittest.registerResult(result)
 | |
|             new_handler = signal.getsignal(signal.SIGINT)
 | |
| 
 | |
|             result2 = unittest.TestResult()
 | |
|             unittest.registerResult(result2)
 | |
|             self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
 | |
| 
 | |
|             result3 = unittest.TestResult()
 | |
| 
 | |
|             try:
 | |
|                 os.kill(os.getpid(), signal.SIGINT)
 | |
|             except KeyboardInterrupt:
 | |
|                 self.fail("KeyboardInterrupt not handled")
 | |
| 
 | |
|             self.assertTrue(result.shouldStop)
 | |
|             self.assertTrue(result2.shouldStop)
 | |
|             self.assertFalse(result3.shouldStop)
 | |
|         self.withRepeats(test_function)
 | |
| 
 | |
| 
 | |
|     def testHandlerReplacedButCalled(self):
 | |
|         # Can't use skipIf decorator because the signal handler may have
 | |
|         # been changed after defining this method.
 | |
|         if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
 | |
|             self.skipTest("test requires SIGINT to not be ignored")
 | |
| 
 | |
|         def test_function():
 | |
|             # If our handler has been replaced (is no longer installed) but is
 | |
|             # called by the *new* handler, then it isn't safe to delay the
 | |
|             # SIGINT and we should immediately delegate to the default handler
 | |
|             unittest.installHandler()
 | |
| 
 | |
|             handler = signal.getsignal(signal.SIGINT)
 | |
|             def new_handler(frame, signum):
 | |
|                 handler(frame, signum)
 | |
|             signal.signal(signal.SIGINT, new_handler)
 | |
| 
 | |
|             try:
 | |
|                 os.kill(os.getpid(), signal.SIGINT)
 | |
|             except KeyboardInterrupt:
 | |
|                 pass
 | |
|             else:
 | |
|                 self.fail("replaced but delegated handler doesn't raise interrupt")
 | |
|         self.withRepeats(test_function)
 | |
| 
 | |
|     def testRunner(self):
 | |
|         # Creating a TextTestRunner with the appropriate argument should
 | |
|         # register the TextTestResult it creates
 | |
|         runner = unittest.TextTestRunner(stream=io.StringIO())
 | |
| 
 | |
|         result = runner.run(unittest.TestSuite())
 | |
|         self.assertIn(result, unittest.signals._results)
 | |
| 
 | |
|     def testWeakReferences(self):
 | |
|         # Calling registerResult on a result should not keep it alive
 | |
|         result = unittest.TestResult()
 | |
|         unittest.registerResult(result)
 | |
| 
 | |
|         ref = weakref.ref(result)
 | |
|         del result
 | |
| 
 | |
|         # For non-reference counting implementations
 | |
|         gc.collect();gc.collect()
 | |
|         self.assertIsNone(ref())
 | |
| 
 | |
| 
 | |
|     def testRemoveResult(self):
 | |
|         result = unittest.TestResult()
 | |
|         unittest.registerResult(result)
 | |
| 
 | |
|         unittest.installHandler()
 | |
|         self.assertTrue(unittest.removeResult(result))
 | |
| 
 | |
|         # Should this raise an error instead?
 | |
|         self.assertFalse(unittest.removeResult(unittest.TestResult()))
 | |
| 
 | |
|         try:
 | |
|             pid = os.getpid()
 | |
|             os.kill(pid, signal.SIGINT)
 | |
|         except KeyboardInterrupt:
 | |
|             pass
 | |
| 
 | |
|         self.assertFalse(result.shouldStop)
 | |
| 
 | |
|     def testMainInstallsHandler(self):
 | |
|         failfast = object()
 | |
|         test = object()
 | |
|         verbosity = object()
 | |
|         result = object()
 | |
|         default_handler = signal.getsignal(signal.SIGINT)
 | |
| 
 | |
|         class FakeRunner(object):
 | |
|             initArgs = []
 | |
|             runArgs = []
 | |
|             def __init__(self, *args, **kwargs):
 | |
|                 self.initArgs.append((args, kwargs))
 | |
|             def run(self, test):
 | |
|                 self.runArgs.append(test)
 | |
|                 return result
 | |
| 
 | |
|         class Program(unittest.TestProgram):
 | |
|             def __init__(self, catchbreak):
 | |
|                 self.exit = False
 | |
|                 self.verbosity = verbosity
 | |
|                 self.failfast = failfast
 | |
|                 self.catchbreak = catchbreak
 | |
|                 self.tb_locals = False
 | |
|                 self.testRunner = FakeRunner
 | |
|                 self.test = test
 | |
|                 self.result = None
 | |
| 
 | |
|         p = Program(False)
 | |
|         p.runTests()
 | |
| 
 | |
|         self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
 | |
|                                                      'verbosity': verbosity,
 | |
|                                                      'failfast': failfast,
 | |
|                                                      'tb_locals': False,
 | |
|                                                      'warnings': None})])
 | |
|         self.assertEqual(FakeRunner.runArgs, [test])
 | |
|         self.assertEqual(p.result, result)
 | |
| 
 | |
|         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
|         FakeRunner.initArgs = []
 | |
|         FakeRunner.runArgs = []
 | |
|         p = Program(True)
 | |
|         p.runTests()
 | |
| 
 | |
|         self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
 | |
|                                                      'verbosity': verbosity,
 | |
|                                                      'failfast': failfast,
 | |
|                                                      'tb_locals': False,
 | |
|                                                      'warnings': None})])
 | |
|         self.assertEqual(FakeRunner.runArgs, [test])
 | |
|         self.assertEqual(p.result, result)
 | |
| 
 | |
|         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
|     def testRemoveHandler(self):
 | |
|         default_handler = signal.getsignal(signal.SIGINT)
 | |
|         unittest.installHandler()
 | |
|         unittest.removeHandler()
 | |
|         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
|         # check that calling removeHandler multiple times has no ill-effect
 | |
|         unittest.removeHandler()
 | |
|         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
|     def testRemoveHandlerAsDecorator(self):
 | |
|         default_handler = signal.getsignal(signal.SIGINT)
 | |
|         unittest.installHandler()
 | |
| 
 | |
|         @unittest.removeHandler
 | |
|         def test():
 | |
|             self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
|         test()
 | |
|         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
 | |
| 
 | |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
 | |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
 | |
| class TestBreakDefaultIntHandler(TestBreak):
 | |
|     int_handler = signal.default_int_handler
 | |
| 
 | |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
 | |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
 | |
| class TestBreakSignalIgnored(TestBreak):
 | |
|     int_handler = signal.SIG_IGN
 | |
| 
 | |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
 | |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
 | |
| class TestBreakSignalDefault(TestBreak):
 | |
|     int_handler = signal.SIG_DFL
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 |