| 
									
										
										
										
											2010-03-26 03:18:31 +00:00
										 |  |  | import gc | 
					
						
							|  |  |  | import os | 
					
						
							| 
									
										
										
										
											2010-04-02 23:26:06 +00:00
										 |  |  | import sys | 
					
						
							| 
									
										
										
										
											2010-03-26 03:18:31 +00:00
										 |  |  | import signal | 
					
						
							|  |  |  | import weakref | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from cStringIO import StringIO | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") | 
					
						
							| 
									
										
										
										
											2010-04-02 23:26:06 +00:00
										 |  |  | @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") | 
					
						
							| 
									
										
										
										
											2010-04-17 21:59:26 +00:00
										 |  |  | @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " | 
					
						
							|  |  |  |     "if threads have been used") | 
					
						
							| 
									
										
										
										
											2010-03-26 03:18:31 +00:00
										 |  |  | class TestBreak(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self._default_handler = signal.getsignal(signal.SIGINT) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         signal.signal(signal.SIGINT, self._default_handler) | 
					
						
							|  |  |  |         unittest.signals._results = weakref.WeakKeyDictionary() | 
					
						
							|  |  |  |         unittest.signals._interrupt_handler = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testInstallHandler(self): | 
					
						
							|  |  |  |         default_handler = signal.getsignal(signal.SIGINT) | 
					
						
							|  |  |  |         unittest.installHandler() | 
					
						
							|  |  |  |         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             pid = os.getpid() | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  |         except KeyboardInterrupt: | 
					
						
							|  |  |  |             self.fail("KeyboardInterrupt not handled") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertTrue(unittest.signals._interrupt_handler.called) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testRegisterResult(self): | 
					
						
							|  |  |  |         result = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.registerResult(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for ref in unittest.signals._results: | 
					
						
							|  |  |  |             if ref is result: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  |             elif ref is not result: | 
					
						
							|  |  |  |                 self.fail("odd object in result set") | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.fail("result not found") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testInterruptCaught(self): | 
					
						
							|  |  |  |         default_handler = signal.getsignal(signal.SIGINT) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         result = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.installHandler() | 
					
						
							|  |  |  |         unittest.registerResult(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def test(result): | 
					
						
							|  |  |  |             pid = os.getpid() | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  |             result.breakCaught = True | 
					
						
							|  |  |  |             self.assertTrue(result.shouldStop) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             test(result) | 
					
						
							|  |  |  |         except KeyboardInterrupt: | 
					
						
							|  |  |  |             self.fail("KeyboardInterrupt not handled") | 
					
						
							|  |  |  |         self.assertTrue(result.breakCaught) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testSecondInterrupt(self): | 
					
						
							|  |  |  |         result = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.installHandler() | 
					
						
							|  |  |  |         unittest.registerResult(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def test(result): | 
					
						
							|  |  |  |             pid = os.getpid() | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  |             result.breakCaught = True | 
					
						
							|  |  |  |             self.assertTrue(result.shouldStop) | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  |             self.fail("Second KeyboardInterrupt not raised") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             test(result) | 
					
						
							|  |  |  |         except KeyboardInterrupt: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.fail("Second KeyboardInterrupt not raised") | 
					
						
							|  |  |  |         self.assertTrue(result.breakCaught) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testTwoResults(self): | 
					
						
							|  |  |  |         unittest.installHandler() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         result = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.registerResult(result) | 
					
						
							|  |  |  |         new_handler = signal.getsignal(signal.SIGINT) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         result2 = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.registerResult(result2) | 
					
						
							|  |  |  |         self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         result3 = unittest.TestResult() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def test(result): | 
					
						
							|  |  |  |             pid = os.getpid() | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             test(result) | 
					
						
							|  |  |  |         except KeyboardInterrupt: | 
					
						
							|  |  |  |             self.fail("KeyboardInterrupt not handled") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertTrue(result.shouldStop) | 
					
						
							|  |  |  |         self.assertTrue(result2.shouldStop) | 
					
						
							|  |  |  |         self.assertFalse(result3.shouldStop) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testHandlerReplacedButCalled(self): | 
					
						
							|  |  |  |         # If our handler has been replaced (is no longer installed) but is | 
					
						
							|  |  |  |         # called by the *new* handler, then it isn't safe to delay the | 
					
						
							|  |  |  |         # SIGINT and we should immediately delegate to the default handler | 
					
						
							|  |  |  |         unittest.installHandler() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         handler = signal.getsignal(signal.SIGINT) | 
					
						
							|  |  |  |         def new_handler(frame, signum): | 
					
						
							|  |  |  |             handler(frame, signum) | 
					
						
							|  |  |  |         signal.signal(signal.SIGINT, new_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             pid = os.getpid() | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  |         except KeyboardInterrupt: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.fail("replaced but delegated handler doesn't raise interrupt") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testRunner(self): | 
					
						
							|  |  |  |         # Creating a TextTestRunner with the appropriate argument should | 
					
						
							|  |  |  |         # register the TextTestResult it creates | 
					
						
							|  |  |  |         runner = unittest.TextTestRunner(stream=StringIO()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         result = runner.run(unittest.TestSuite()) | 
					
						
							|  |  |  |         self.assertIn(result, unittest.signals._results) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testWeakReferences(self): | 
					
						
							|  |  |  |         # Calling registerResult on a result should not keep it alive | 
					
						
							|  |  |  |         result = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.registerResult(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ref = weakref.ref(result) | 
					
						
							|  |  |  |         del result | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # For non-reference counting implementations | 
					
						
							|  |  |  |         gc.collect();gc.collect() | 
					
						
							|  |  |  |         self.assertIsNone(ref()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testRemoveResult(self): | 
					
						
							|  |  |  |         result = unittest.TestResult() | 
					
						
							|  |  |  |         unittest.registerResult(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         unittest.installHandler() | 
					
						
							|  |  |  |         self.assertTrue(unittest.removeResult(result)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Should this raise an error instead? | 
					
						
							|  |  |  |         self.assertFalse(unittest.removeResult(unittest.TestResult())) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             pid = os.getpid() | 
					
						
							|  |  |  |             os.kill(pid, signal.SIGINT) | 
					
						
							|  |  |  |         except KeyboardInterrupt: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertFalse(result.shouldStop) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def testMainInstallsHandler(self): | 
					
						
							|  |  |  |         failfast = object() | 
					
						
							|  |  |  |         test = object() | 
					
						
							|  |  |  |         verbosity = object() | 
					
						
							|  |  |  |         result = object() | 
					
						
							|  |  |  |         default_handler = signal.getsignal(signal.SIGINT) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class FakeRunner(object): | 
					
						
							|  |  |  |             initArgs = [] | 
					
						
							|  |  |  |             runArgs = [] | 
					
						
							|  |  |  |             def __init__(self, *args, **kwargs): | 
					
						
							|  |  |  |                 self.initArgs.append((args, kwargs)) | 
					
						
							|  |  |  |             def run(self, test): | 
					
						
							|  |  |  |                 self.runArgs.append(test) | 
					
						
							|  |  |  |                 return result | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class Program(unittest.TestProgram): | 
					
						
							|  |  |  |             def __init__(self, catchbreak): | 
					
						
							|  |  |  |                 self.exit = False | 
					
						
							|  |  |  |                 self.verbosity = verbosity | 
					
						
							|  |  |  |                 self.failfast = failfast | 
					
						
							|  |  |  |                 self.catchbreak = catchbreak | 
					
						
							|  |  |  |                 self.testRunner = FakeRunner | 
					
						
							|  |  |  |                 self.test = test | 
					
						
							|  |  |  |                 self.result = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         p = Program(False) | 
					
						
							|  |  |  |         p.runTests() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-04-02 21:42:47 +00:00
										 |  |  |         self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, | 
					
						
							|  |  |  |                                                      'verbosity': verbosity, | 
					
						
							|  |  |  |                                                      'failfast': failfast})]) | 
					
						
							| 
									
										
										
										
											2010-03-26 03:18:31 +00:00
										 |  |  |         self.assertEqual(FakeRunner.runArgs, [test]) | 
					
						
							|  |  |  |         self.assertEqual(p.result, result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         FakeRunner.initArgs = [] | 
					
						
							|  |  |  |         FakeRunner.runArgs = [] | 
					
						
							|  |  |  |         p = Program(True) | 
					
						
							|  |  |  |         p.runTests() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-04-02 21:42:47 +00:00
										 |  |  |         self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, | 
					
						
							|  |  |  |                                                      'verbosity': verbosity, | 
					
						
							|  |  |  |                                                      'failfast': failfast})]) | 
					
						
							| 
									
										
										
										
											2010-03-26 03:18:31 +00:00
										 |  |  |         self.assertEqual(FakeRunner.runArgs, [test]) | 
					
						
							|  |  |  |         self.assertEqual(p.result, result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |