mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	gh-93839: Move Lib/unttest/test/ to Lib/test/test_unittest/ (#94043)
* Move Lib/unittest/test/ to Lib/test/test_unittest/ * Remove Lib/test/test_unittest.py * Replace unittest.test with test.test_unittest * Remove unittest.load_tests() * Rewrite unittest __init__.py and __main__.py * Update build system, CODEOWNERS, and wasm_assets.py
This commit is contained in:
		
							parent
							
								
									d82e0bfe8b
								
							
						
					
					
						commit
						c735d54534
					
				
					 38 changed files with 77 additions and 137 deletions
				
			
		|  | @ -1,439 +0,0 @@ | |||
| import asyncio | ||||
| import contextvars | ||||
| import unittest | ||||
| from test import support | ||||
| 
 | ||||
| support.requires_working_socket(module=True) | ||||
| 
 | ||||
| 
 | ||||
| class MyException(Exception): | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| def tearDownModule(): | ||||
|     asyncio.set_event_loop_policy(None) | ||||
| 
 | ||||
| 
 | ||||
| class TestCM: | ||||
|     def __init__(self, ordering, enter_result=None): | ||||
|         self.ordering = ordering | ||||
|         self.enter_result = enter_result | ||||
| 
 | ||||
|     async def __aenter__(self): | ||||
|         self.ordering.append('enter') | ||||
|         return self.enter_result | ||||
| 
 | ||||
|     async def __aexit__(self, *exc_info): | ||||
|         self.ordering.append('exit') | ||||
| 
 | ||||
| 
 | ||||
| class LacksEnterAndExit: | ||||
|     pass | ||||
| class LacksEnter: | ||||
|     async def __aexit__(self, *exc_info): | ||||
|         pass | ||||
| class LacksExit: | ||||
|     async def __aenter__(self): | ||||
|         pass | ||||
| 
 | ||||
| 
 | ||||
| VAR = contextvars.ContextVar('VAR', default=()) | ||||
| 
 | ||||
| 
 | ||||
| class TestAsyncCase(unittest.TestCase): | ||||
|     maxDiff = None | ||||
| 
 | ||||
|     def tearDown(self): | ||||
|         # Ensure that IsolatedAsyncioTestCase instances are destroyed before | ||||
|         # starting a new event loop | ||||
|         support.gc_collect() | ||||
| 
 | ||||
|     def test_full_cycle(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             def setUp(self): | ||||
|                 self.assertEqual(events, []) | ||||
|                 events.append('setUp') | ||||
|                 VAR.set(VAR.get() + ('setUp',)) | ||||
| 
 | ||||
|             async def asyncSetUp(self): | ||||
|                 self.assertEqual(events, ['setUp']) | ||||
|                 events.append('asyncSetUp') | ||||
|                 VAR.set(VAR.get() + ('asyncSetUp',)) | ||||
|                 self.addAsyncCleanup(self.on_cleanup1) | ||||
| 
 | ||||
|             async def test_func(self): | ||||
|                 self.assertEqual(events, ['setUp', | ||||
|                                           'asyncSetUp']) | ||||
|                 events.append('test') | ||||
|                 VAR.set(VAR.get() + ('test',)) | ||||
|                 self.addAsyncCleanup(self.on_cleanup2) | ||||
| 
 | ||||
|             async def asyncTearDown(self): | ||||
|                 self.assertEqual(events, ['setUp', | ||||
|                                           'asyncSetUp', | ||||
|                                           'test']) | ||||
|                 VAR.set(VAR.get() + ('asyncTearDown',)) | ||||
|                 events.append('asyncTearDown') | ||||
| 
 | ||||
|             def tearDown(self): | ||||
|                 self.assertEqual(events, ['setUp', | ||||
|                                           'asyncSetUp', | ||||
|                                           'test', | ||||
|                                           'asyncTearDown']) | ||||
|                 events.append('tearDown') | ||||
|                 VAR.set(VAR.get() + ('tearDown',)) | ||||
| 
 | ||||
|             async def on_cleanup1(self): | ||||
|                 self.assertEqual(events, ['setUp', | ||||
|                                           'asyncSetUp', | ||||
|                                           'test', | ||||
|                                           'asyncTearDown', | ||||
|                                           'tearDown', | ||||
|                                           'cleanup2']) | ||||
|                 events.append('cleanup1') | ||||
|                 VAR.set(VAR.get() + ('cleanup1',)) | ||||
|                 nonlocal cvar | ||||
|                 cvar = VAR.get() | ||||
| 
 | ||||
|             async def on_cleanup2(self): | ||||
|                 self.assertEqual(events, ['setUp', | ||||
|                                           'asyncSetUp', | ||||
|                                           'test', | ||||
|                                           'asyncTearDown', | ||||
|                                           'tearDown']) | ||||
|                 events.append('cleanup2') | ||||
|                 VAR.set(VAR.get() + ('cleanup2',)) | ||||
| 
 | ||||
|         events = [] | ||||
|         cvar = () | ||||
|         test = Test("test_func") | ||||
|         result = test.run() | ||||
|         self.assertEqual(result.errors, []) | ||||
|         self.assertEqual(result.failures, []) | ||||
|         expected = ['setUp', 'asyncSetUp', 'test', | ||||
|                     'asyncTearDown', 'tearDown', 'cleanup2', 'cleanup1'] | ||||
|         self.assertEqual(events, expected) | ||||
|         self.assertEqual(cvar, tuple(expected)) | ||||
| 
 | ||||
|         events = [] | ||||
|         cvar = () | ||||
|         test = Test("test_func") | ||||
|         test.debug() | ||||
|         self.assertEqual(events, expected) | ||||
|         self.assertEqual(cvar, tuple(expected)) | ||||
|         test.doCleanups() | ||||
|         self.assertEqual(events, expected) | ||||
|         self.assertEqual(cvar, tuple(expected)) | ||||
| 
 | ||||
|     def test_exception_in_setup(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def asyncSetUp(self): | ||||
|                 events.append('asyncSetUp') | ||||
|                 self.addAsyncCleanup(self.on_cleanup) | ||||
|                 raise MyException() | ||||
| 
 | ||||
|             async def test_func(self): | ||||
|                 events.append('test') | ||||
| 
 | ||||
|             async def asyncTearDown(self): | ||||
|                 events.append('asyncTearDown') | ||||
| 
 | ||||
|             async def on_cleanup(self): | ||||
|                 events.append('cleanup') | ||||
| 
 | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         result = test.run() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'cleanup']) | ||||
|         self.assertIs(result.errors[0][0], test) | ||||
|         self.assertIn('MyException', result.errors[0][1]) | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         try: | ||||
|             test.debug() | ||||
|         except MyException: | ||||
|             pass | ||||
|         else: | ||||
|             self.fail('Expected a MyException exception') | ||||
|         self.assertEqual(events, ['asyncSetUp']) | ||||
|         test.doCleanups() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'cleanup']) | ||||
| 
 | ||||
|     def test_exception_in_test(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def asyncSetUp(self): | ||||
|                 events.append('asyncSetUp') | ||||
| 
 | ||||
|             async def test_func(self): | ||||
|                 events.append('test') | ||||
|                 self.addAsyncCleanup(self.on_cleanup) | ||||
|                 raise MyException() | ||||
| 
 | ||||
|             async def asyncTearDown(self): | ||||
|                 events.append('asyncTearDown') | ||||
| 
 | ||||
|             async def on_cleanup(self): | ||||
|                 events.append('cleanup') | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         result = test.run() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) | ||||
|         self.assertIs(result.errors[0][0], test) | ||||
|         self.assertIn('MyException', result.errors[0][1]) | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         try: | ||||
|             test.debug() | ||||
|         except MyException: | ||||
|             pass | ||||
|         else: | ||||
|             self.fail('Expected a MyException exception') | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test']) | ||||
|         test.doCleanups() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup']) | ||||
| 
 | ||||
|     def test_exception_in_tear_down(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def asyncSetUp(self): | ||||
|                 events.append('asyncSetUp') | ||||
| 
 | ||||
|             async def test_func(self): | ||||
|                 events.append('test') | ||||
|                 self.addAsyncCleanup(self.on_cleanup) | ||||
| 
 | ||||
|             async def asyncTearDown(self): | ||||
|                 events.append('asyncTearDown') | ||||
|                 raise MyException() | ||||
| 
 | ||||
|             async def on_cleanup(self): | ||||
|                 events.append('cleanup') | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         result = test.run() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) | ||||
|         self.assertIs(result.errors[0][0], test) | ||||
|         self.assertIn('MyException', result.errors[0][1]) | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         try: | ||||
|             test.debug() | ||||
|         except MyException: | ||||
|             pass | ||||
|         else: | ||||
|             self.fail('Expected a MyException exception') | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown']) | ||||
|         test.doCleanups() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) | ||||
| 
 | ||||
|     def test_exception_in_tear_clean_up(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def asyncSetUp(self): | ||||
|                 events.append('asyncSetUp') | ||||
| 
 | ||||
|             async def test_func(self): | ||||
|                 events.append('test') | ||||
|                 self.addAsyncCleanup(self.on_cleanup1) | ||||
|                 self.addAsyncCleanup(self.on_cleanup2) | ||||
| 
 | ||||
|             async def asyncTearDown(self): | ||||
|                 events.append('asyncTearDown') | ||||
| 
 | ||||
|             async def on_cleanup1(self): | ||||
|                 events.append('cleanup1') | ||||
|                 raise MyException('some error') | ||||
| 
 | ||||
|             async def on_cleanup2(self): | ||||
|                 events.append('cleanup2') | ||||
|                 raise MyException('other error') | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         result = test.run() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1']) | ||||
|         self.assertIs(result.errors[0][0], test) | ||||
|         self.assertIn('MyException: other error', result.errors[0][1]) | ||||
|         self.assertIn('MyException: some error', result.errors[1][1]) | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         try: | ||||
|             test.debug() | ||||
|         except MyException: | ||||
|             pass | ||||
|         else: | ||||
|             self.fail('Expected a MyException exception') | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2']) | ||||
|         test.doCleanups() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1']) | ||||
| 
 | ||||
|     def test_deprecation_of_return_val_from_test(self): | ||||
|         # Issue 41322 - deprecate return of value!=None from a test | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def test1(self): | ||||
|                 return 1 | ||||
|             async def test2(self): | ||||
|                 yield 1 | ||||
| 
 | ||||
|         with self.assertWarns(DeprecationWarning) as w: | ||||
|             Test('test1').run() | ||||
|         self.assertIn('It is deprecated to return a value!=None', str(w.warning)) | ||||
|         self.assertIn('test1', str(w.warning)) | ||||
|         self.assertEqual(w.filename, __file__) | ||||
| 
 | ||||
|         with self.assertWarns(DeprecationWarning) as w: | ||||
|             Test('test2').run() | ||||
|         self.assertIn('It is deprecated to return a value!=None', str(w.warning)) | ||||
|         self.assertIn('test2', str(w.warning)) | ||||
|         self.assertEqual(w.filename, __file__) | ||||
| 
 | ||||
|     def test_cleanups_interleave_order(self): | ||||
|         events = [] | ||||
| 
 | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def test_func(self): | ||||
|                 self.addAsyncCleanup(self.on_sync_cleanup, 1) | ||||
|                 self.addAsyncCleanup(self.on_async_cleanup, 2) | ||||
|                 self.addAsyncCleanup(self.on_sync_cleanup, 3) | ||||
|                 self.addAsyncCleanup(self.on_async_cleanup, 4) | ||||
| 
 | ||||
|             async def on_sync_cleanup(self, val): | ||||
|                 events.append(f'sync_cleanup {val}') | ||||
| 
 | ||||
|             async def on_async_cleanup(self, val): | ||||
|                 events.append(f'async_cleanup {val}') | ||||
| 
 | ||||
|         test = Test("test_func") | ||||
|         test.run() | ||||
|         self.assertEqual(events, ['async_cleanup 4', | ||||
|                                   'sync_cleanup 3', | ||||
|                                   'async_cleanup 2', | ||||
|                                   'sync_cleanup 1']) | ||||
| 
 | ||||
|     def test_base_exception_from_async_method(self): | ||||
|         events = [] | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def test_base(self): | ||||
|                 events.append("test_base") | ||||
|                 raise BaseException() | ||||
|                 events.append("not it") | ||||
| 
 | ||||
|             async def test_no_err(self): | ||||
|                 events.append("test_no_err") | ||||
| 
 | ||||
|             async def test_cancel(self): | ||||
|                 raise asyncio.CancelledError() | ||||
| 
 | ||||
|         test = Test("test_base") | ||||
|         output = test.run() | ||||
|         self.assertFalse(output.wasSuccessful()) | ||||
| 
 | ||||
|         test = Test("test_no_err") | ||||
|         test.run() | ||||
|         self.assertEqual(events, ['test_base', 'test_no_err']) | ||||
| 
 | ||||
|         test = Test("test_cancel") | ||||
|         output = test.run() | ||||
|         self.assertFalse(output.wasSuccessful()) | ||||
| 
 | ||||
|     def test_cancellation_hanging_tasks(self): | ||||
|         cancelled = False | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def test_leaking_task(self): | ||||
|                 async def coro(): | ||||
|                     nonlocal cancelled | ||||
|                     try: | ||||
|                         await asyncio.sleep(1) | ||||
|                     except asyncio.CancelledError: | ||||
|                         cancelled = True | ||||
|                         raise | ||||
| 
 | ||||
|                 # Leave this running in the background | ||||
|                 asyncio.create_task(coro()) | ||||
| 
 | ||||
|         test = Test("test_leaking_task") | ||||
|         output = test.run() | ||||
|         self.assertTrue(cancelled) | ||||
| 
 | ||||
|     def test_enterAsyncContext(self): | ||||
|         events = [] | ||||
| 
 | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def test_func(slf): | ||||
|                 slf.addAsyncCleanup(events.append, 'cleanup1') | ||||
|                 cm = TestCM(events, 42) | ||||
|                 self.assertEqual(await slf.enterAsyncContext(cm), 42) | ||||
|                 slf.addAsyncCleanup(events.append, 'cleanup2') | ||||
|                 events.append('test') | ||||
| 
 | ||||
|         test = Test('test_func') | ||||
|         output = test.run() | ||||
|         self.assertTrue(output.wasSuccessful(), output) | ||||
|         self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1']) | ||||
| 
 | ||||
|     def test_enterAsyncContext_arg_errors(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def test_func(slf): | ||||
|                 with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): | ||||
|                     await slf.enterAsyncContext(LacksEnterAndExit()) | ||||
|                 with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): | ||||
|                     await slf.enterAsyncContext(LacksEnter()) | ||||
|                 with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): | ||||
|                     await slf.enterAsyncContext(LacksExit()) | ||||
| 
 | ||||
|         test = Test('test_func') | ||||
|         output = test.run() | ||||
|         self.assertTrue(output.wasSuccessful()) | ||||
| 
 | ||||
|     def test_debug_cleanup_same_loop(self): | ||||
|         class Test(unittest.IsolatedAsyncioTestCase): | ||||
|             async def asyncSetUp(self): | ||||
|                 async def coro(): | ||||
|                     await asyncio.sleep(0) | ||||
|                 fut = asyncio.ensure_future(coro()) | ||||
|                 self.addAsyncCleanup(self.cleanup, fut) | ||||
|                 events.append('asyncSetUp') | ||||
| 
 | ||||
|             async def test_func(self): | ||||
|                 events.append('test') | ||||
|                 raise MyException() | ||||
| 
 | ||||
|             async def asyncTearDown(self): | ||||
|                 events.append('asyncTearDown') | ||||
| 
 | ||||
|             async def cleanup(self, fut): | ||||
|                 try: | ||||
|                     # Raises an exception if in different loop | ||||
|                     await asyncio.wait([fut]) | ||||
|                     events.append('cleanup') | ||||
|                 except: | ||||
|                     import traceback | ||||
|                     traceback.print_exc() | ||||
|                     raise | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         result = test.run() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup']) | ||||
|         self.assertIn('MyException', result.errors[0][1]) | ||||
| 
 | ||||
|         events = [] | ||||
|         test = Test("test_func") | ||||
|         try: | ||||
|             test.debug() | ||||
|         except MyException: | ||||
|             pass | ||||
|         else: | ||||
|             self.fail('Expected a MyException exception') | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test']) | ||||
|         test.doCleanups() | ||||
|         self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup']) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     unittest.main() | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Victor Stinner
						Victor Stinner