mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 18:54:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			199 lines
		
	
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import asyncio
import signal
import sys
import unittest
from asyncio import subprocess
from asyncio import test_utils
from test import support

if sys.platform != 'win32':
    from asyncio import unix_events
 | |
| 
 | |
# Command line of a child that blocks: it sleeps for an hour, so in
# practice it only ends when the test kills or signals it.
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']

# Command line of a child that copies its input to its output: it reads
# stdin until EOF and then writes the same bytes to stdout.
PROGRAM_CAT = [
    sys.executable, '-c',
    ';'.join(('import sys',
              'data = sys.stdin.buffer.read()',
              'sys.stdout.buffer.write(data)'))]
 | |
| 
 | |
class SubprocessMixin:
    """Subprocess test cases shared by the platform-specific test classes.

    The mixin never creates an event loop itself: every test reads
    ``self.loop`` and passes it explicitly to asyncio, so the class that
    uses the mixin has to set ``self.loop`` before the tests run.
    """

    def test_stdin_stdout(self):
        # Drive the stdin/stdout streams of PROGRAM_CAT by hand and check
        # that the bytes written come back unchanged with exit code 0.
        args = PROGRAM_CAT

        @asyncio.coroutine
        def run(data):
            proc = yield from asyncio.create_subprocess_exec(
                                          *args,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE,
                                          loop=self.loop)

            # feed data; closing stdin afterwards gives the child the EOF
            # its read() call is waiting for
            proc.stdin.write(data)
            yield from proc.stdin.drain()
            proc.stdin.close()

            # get output and exitcode
            data = yield from proc.stdout.read()
            exitcode = yield from proc.wait()
            return (exitcode, data)

        task = run(b'some data')
        # bound the whole exchange to 10 seconds
        task = asyncio.wait_for(task, 10.0, loop=self.loop)
        exitcode, stdout = self.loop.run_until_complete(task)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_communicate(self):
        # Same round trip as test_stdin_stdout, but through communicate().
        args = PROGRAM_CAT

        @asyncio.coroutine
        def run(data):
            proc = yield from asyncio.create_subprocess_exec(
                                          *args,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE,
                                          loop=self.loop)
            # stderr is not redirected, only stdout is checked
            stdout, stderr = yield from proc.communicate(data)
            return proc.returncode, stdout

        task = run(b'some data')
        # bound the whole exchange to 10 seconds
        task = asyncio.wait_for(task, 10.0, loop=self.loop)
        exitcode, stdout = self.loop.run_until_complete(task)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_shell(self):
        # The exit status of a shell command is reported by wait().
        create = asyncio.create_subprocess_shell('exit 7',
                                                 loop=self.loop)
        proc = self.loop.run_until_complete(create)
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 7)

    def test_start_new_session(self):
        # start the new process in a new session
        create = asyncio.create_subprocess_shell('exit 8',
                                                 start_new_session=True,
                                                 loop=self.loop)
        proc = self.loop.run_until_complete(create)
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 8)

    def test_kill(self):
        # kill() must end the blocked child; outside Windows the return
        # code is expected to be minus the SIGKILL signal number.
        args = PROGRAM_BLOCKED
        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
        proc = self.loop.run_until_complete(create)
        proc.kill()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_terminate(self):
        # terminate() must end the blocked child; outside Windows the
        # return code is expected to be minus the SIGTERM signal number.
        args = PROGRAM_BLOCKED
        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
        proc = self.loop.run_until_complete(create)
        proc.terminate()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_send_signal(self):
        # send_signal() delivers SIGHUP; the return code is expected to
        # be minus the signal number.
        args = PROGRAM_BLOCKED
        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
        proc = self.loop.run_until_complete(create)
        proc.send_signal(signal.SIGHUP)
        returncode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(-signal.SIGHUP, returncode)

    def prepare_broken_pipe_test(self):
        """Start a child that never reads its stdin pipe.

        Return a ``(proc, large_data)`` tuple: the started process and a
        bytes object of ``support.PIPE_MAX_SIZE`` bytes to write to it.
        """
        # buffer large enough to feed the whole pipe buffer
        large_data = b'x' * support.PIPE_MAX_SIZE

        # the program ends before the stdin can be fed
        create = asyncio.create_subprocess_exec(
                             sys.executable, '-c', 'pass',
                             stdin=subprocess.PIPE,
                             loop=self.loop)
        proc = self.loop.run_until_complete(create)
        return (proc, large_data)

    def test_stdin_broken_pipe(self):
        proc, large_data = self.prepare_broken_pipe_test()

        # drain() must raise BrokenPipeError or ConnectionResetError
        proc.stdin.write(large_data)
        self.assertRaises((BrokenPipeError, ConnectionResetError),
                          self.loop.run_until_complete, proc.stdin.drain())
        self.loop.run_until_complete(proc.wait())

    def test_communicate_ignore_broken_pipe(self):
        proc, large_data = self.prepare_broken_pipe_test()

        # communicate() must ignore BrokenPipeError when feeding stdin
        self.loop.run_until_complete(proc.communicate(large_data))
        self.loop.run_until_complete(proc.wait())
 | |
| 
 | |
| 
 | |
if sys.platform != 'win32':
    # Unix
    class SubprocessWatcherMixin(SubprocessMixin):
        """Run the shared subprocess tests with a given child watcher.

        Subclasses set ``Watcher`` to the child watcher class that
        ``setUp()`` instantiates.
        """

        # Child watcher class to instantiate; None in this base class.
        Watcher = None

        def setUp(self):
            policy = asyncio.get_event_loop_policy()
            self.loop = policy.new_event_loop()

            # ensure that the event loop is passed explicitly in asyncio
            policy.set_event_loop(None)

            # attach a fresh watcher to the test loop and install it in
            # the policy
            watcher = self.Watcher()
            watcher.attach_loop(self.loop)
            policy.set_child_watcher(watcher)

        def tearDown(self):
            # remove the watcher from the policy, then close the loop
            policy = asyncio.get_event_loop_policy()
            policy.set_child_watcher(None)
            self.loop.close()
            super().tearDown()

    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
        """Shared subprocess tests run with SafeChildWatcher."""

        Watcher = unix_events.SafeChildWatcher

    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
        """Shared subprocess tests run with FastChildWatcher."""

        Watcher = unix_events.FastChildWatcher

else:
    # Windows
    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
        """Shared subprocess tests run on a ProactorEventLoop."""

        def setUp(self):
            policy = asyncio.get_event_loop_policy()
            self.loop = asyncio.ProactorEventLoop()

            # ensure that the event loop is passed explicitly in asyncio
            policy.set_event_loop(None)

        def tearDown(self):
            policy = asyncio.get_event_loop_policy()
            self.loop.close()
            policy.set_event_loop(None)
            super().tearDown()
 | |
| 
 | |
| 
 | |
if __name__ == '__main__':
    # Run the tests of this module when the file is executed as a script.
    unittest.main()
 | 
