mirror of
				https://github.com/python/cpython.git
				synced 2025-10-27 03:34:32 +00:00 
			
		
		
		
	GH-100192: add more asyncio subprocess tests (#100194)
				
					
				
			This commit is contained in:
		
							parent
							
								
									08e5594cf3
								
							
						
					
					
						commit
						e97afefda5
					
				
					 1 changed files with 82 additions and 0 deletions
				
			
		|  | @ -686,6 +686,88 @@ async def execute(): | |||
| 
 | ||||
|         self.assertIsNone(self.loop.run_until_complete(execute())) | ||||
| 
 | ||||
|     async def check_stdout_output(self, coro, output): | ||||
|         proc = await coro | ||||
|         stdout, _ = await proc.communicate() | ||||
|         self.assertEqual(stdout, output) | ||||
|         self.assertEqual(proc.returncode, 0) | ||||
|         task = asyncio.create_task(proc.wait()) | ||||
|         await asyncio.sleep(0) | ||||
|         self.assertEqual(task.result(), proc.returncode) | ||||
| 
 | ||||
|     def test_create_subprocess_env_shell(self) -> None: | ||||
|         async def main() -> None: | ||||
|             cmd = f'''{sys.executable} -c "import os, sys; sys.stdout.write(os.getenv('FOO'))"''' | ||||
|             env = {"FOO": 'bar'} | ||||
|             proc = await asyncio.create_subprocess_shell( | ||||
|                 cmd, env=env, stdout=subprocess.PIPE | ||||
|             ) | ||||
|             return proc | ||||
| 
 | ||||
|         self.loop.run_until_complete(self.check_stdout_output(main(), b'bar')) | ||||
| 
 | ||||
|     def test_create_subprocess_env_exec(self) -> None: | ||||
|         async def main() -> None: | ||||
|             cmd = [sys.executable, "-c", | ||||
|                    "import os, sys; sys.stdout.write(os.getenv('FOO'))"] | ||||
|             env = {"FOO": 'baz'} | ||||
|             proc = await asyncio.create_subprocess_exec( | ||||
|                 *cmd, env=env, stdout=subprocess.PIPE | ||||
|             ) | ||||
|             return proc | ||||
| 
 | ||||
|         self.loop.run_until_complete(self.check_stdout_output(main(), b'baz')) | ||||
| 
 | ||||
| 
 | ||||
|     def test_subprocess_concurrent_wait(self) -> None: | ||||
|         async def main() -> None: | ||||
|             proc = await asyncio.create_subprocess_exec( | ||||
|                 *PROGRAM_CAT, | ||||
|                 stdin=subprocess.PIPE, | ||||
|                 stdout=subprocess.PIPE, | ||||
|             ) | ||||
|             stdout, _ = await proc.communicate(b'some data') | ||||
|             self.assertEqual(stdout, b"some data") | ||||
|             self.assertEqual(proc.returncode, 0) | ||||
|             self.assertEqual(await asyncio.gather(*[proc.wait() for _ in range(10)]), | ||||
|                              [proc.returncode] * 10) | ||||
| 
 | ||||
|         self.loop.run_until_complete(main()) | ||||
| 
 | ||||
|     def test_subprocess_consistent_callbacks(self): | ||||
|         events = [] | ||||
|         class MyProtocol(asyncio.SubprocessProtocol): | ||||
|             def __init__(self, exit_future: asyncio.Future) -> None: | ||||
|                 self.exit_future = exit_future | ||||
| 
 | ||||
|             def pipe_data_received(self, fd, data) -> None: | ||||
|                 events.append(('pipe_data_received', fd, data)) | ||||
| 
 | ||||
|             def pipe_connection_lost(self, fd, exc) -> None: | ||||
|                 events.append('pipe_connection_lost') | ||||
| 
 | ||||
|             def process_exited(self) -> None: | ||||
|                 events.append('process_exited') | ||||
|                 self.exit_future.set_result(True) | ||||
| 
 | ||||
|         async def main() -> None: | ||||
|             loop = asyncio.get_running_loop() | ||||
|             exit_future = asyncio.Future() | ||||
|             code = 'import sys; sys.stdout.write("stdout"); sys.stderr.write("stderr")' | ||||
|             transport, _ = await loop.subprocess_exec(lambda: MyProtocol(exit_future), | ||||
|                                                       sys.executable, '-c', code, stdin=None) | ||||
|             await exit_future | ||||
|             transport.close() | ||||
|             self.assertEqual(events, [ | ||||
|                 ('pipe_data_received', 1, b'stdout'), | ||||
|                 ('pipe_data_received', 2, b'stderr'), | ||||
|                 'pipe_connection_lost', | ||||
|                 'pipe_connection_lost', | ||||
|                 'process_exited', | ||||
|             ]) | ||||
| 
 | ||||
|         self.loop.run_until_complete(main()) | ||||
| 
 | ||||
|     def test_subprocess_communicate_stdout(self): | ||||
|         # See https://github.com/python/cpython/issues/100133 | ||||
|         async def get_command_stdout(cmd, *args): | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Kumar Aditya
						Kumar Aditya