| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | """Tests for window_utils""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							| 
									
										
										
										
											2015-01-15 14:24:55 +01:00
										 |  |  | import warnings | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | if sys.platform != 'win32': | 
					
						
							|  |  |  |     raise unittest.SkipTest('Windows only') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-28 15:19:56 +01:00
										 |  |  | import _overlapped | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | import _winapi | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-01 20:34:09 -07:00
										 |  |  | import asyncio | 
					
						
							| 
									
										
										
										
											2014-12-26 21:16:42 +01:00
										 |  |  | from asyncio import windows_utils | 
					
						
							| 
									
										
										
										
											2017-12-11 10:04:40 -05:00
										 |  |  | from test import support | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-06-01 20:34:09 -07:00
										 |  |  | def tearDownModule(): | 
					
						
							|  |  |  |     asyncio.set_event_loop_policy(None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | class PipeTests(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pipe_overlapped(self): | 
					
						
							|  |  |  |         h1, h2 = windows_utils.pipe(overlapped=(True, True)) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             ov1 = _overlapped.Overlapped() | 
					
						
							|  |  |  |             self.assertFalse(ov1.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov1.error, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             ov1.ReadFile(h1, 100) | 
					
						
							|  |  |  |             self.assertTrue(ov1.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING) | 
					
						
							|  |  |  |             ERROR_IO_INCOMPLETE = 996 | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 ov1.getresult() | 
					
						
							|  |  |  |             except OSError as e: | 
					
						
							|  |  |  |                 self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 raise RuntimeError('expected ERROR_IO_INCOMPLETE') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             ov2 = _overlapped.Overlapped() | 
					
						
							|  |  |  |             self.assertFalse(ov2.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov2.error, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             ov2.WriteFile(h2, b"hello") | 
					
						
							|  |  |  |             self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             res = _winapi.WaitForMultipleObjects([ov2.event], False, 100) | 
					
						
							|  |  |  |             self.assertEqual(res, _winapi.WAIT_OBJECT_0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertFalse(ov1.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE) | 
					
						
							|  |  |  |             self.assertFalse(ov2.pending) | 
					
						
							|  |  |  |             self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) | 
					
						
							|  |  |  |             self.assertEqual(ov1.getresult(), b"hello") | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             _winapi.CloseHandle(h1) | 
					
						
							|  |  |  |             _winapi.CloseHandle(h2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pipe_handle(self): | 
					
						
							|  |  |  |         h, _ = windows_utils.pipe(overlapped=(True, True)) | 
					
						
							|  |  |  |         _winapi.CloseHandle(_) | 
					
						
							|  |  |  |         p = windows_utils.PipeHandle(h) | 
					
						
							|  |  |  |         self.assertEqual(p.fileno(), h) | 
					
						
							|  |  |  |         self.assertEqual(p.handle, h) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # check garbage collection of p closes handle | 
					
						
							| 
									
										
										
										
											2015-01-15 14:24:55 +01:00
										 |  |  |         with warnings.catch_warnings(): | 
					
						
							|  |  |  |             warnings.filterwarnings("ignore", "",  ResourceWarning) | 
					
						
							|  |  |  |             del p | 
					
						
							|  |  |  |             support.gc_collect() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         try: | 
					
						
							|  |  |  |             _winapi.CloseHandle(h) | 
					
						
							|  |  |  |         except OSError as e: | 
					
						
							|  |  |  |             self.assertEqual(e.winerror, 6)     # ERROR_INVALID_HANDLE | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             raise RuntimeError('expected ERROR_INVALID_HANDLE') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class PopenTests(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_popen(self): | 
					
						
							|  |  |  |         command = r"""if 1:
 | 
					
						
							|  |  |  |             import sys | 
					
						
							|  |  |  |             s = sys.stdin.readline() | 
					
						
							|  |  |  |             sys.stdout.write(s.upper()) | 
					
						
							|  |  |  |             sys.stderr.write('stderr') | 
					
						
							|  |  |  |             """
 | 
					
						
							|  |  |  |         msg = b"blah\n" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         p = windows_utils.Popen([sys.executable, '-c', command], | 
					
						
							|  |  |  |                                 stdin=windows_utils.PIPE, | 
					
						
							|  |  |  |                                 stdout=windows_utils.PIPE, | 
					
						
							|  |  |  |                                 stderr=windows_utils.PIPE) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for f in [p.stdin, p.stdout, p.stderr]: | 
					
						
							|  |  |  |             self.assertIsInstance(f, windows_utils.PipeHandle) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ovin = _overlapped.Overlapped() | 
					
						
							|  |  |  |         ovout = _overlapped.Overlapped() | 
					
						
							|  |  |  |         overr = _overlapped.Overlapped() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ovin.WriteFile(p.stdin.handle, msg) | 
					
						
							|  |  |  |         ovout.ReadFile(p.stdout.handle, 100) | 
					
						
							|  |  |  |         overr.ReadFile(p.stderr.handle, 100) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events = [ovin.event, ovout.event, overr.event] | 
					
						
							| 
									
										
										
										
											2013-11-15 07:41:10 -08:00
										 |  |  |         # Super-long timeout for slow buildbots. | 
					
						
							| 
									
										
										
										
											2021-06-21 17:29:13 -07:00
										 |  |  |         res = _winapi.WaitForMultipleObjects(events, True, | 
					
						
							|  |  |  |                                              int(support.SHORT_TIMEOUT * 1000)) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         self.assertEqual(res, _winapi.WAIT_OBJECT_0) | 
					
						
							|  |  |  |         self.assertFalse(ovout.pending) | 
					
						
							|  |  |  |         self.assertFalse(overr.pending) | 
					
						
							|  |  |  |         self.assertFalse(ovin.pending) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(ovin.getresult(), len(msg)) | 
					
						
							|  |  |  |         out = ovout.getresult().rstrip() | 
					
						
							|  |  |  |         err = overr.getresult().rstrip() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertGreater(len(out), 0) | 
					
						
							|  |  |  |         self.assertGreater(len(err), 0) | 
					
						
							|  |  |  |         # allow for partial reads... | 
					
						
							|  |  |  |         self.assertTrue(msg.upper().rstrip().startswith(out)) | 
					
						
							|  |  |  |         self.assertTrue(b"stderr".startswith(err)) | 
					
						
							| 
									
										
										
										
											2013-10-20 21:02:53 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-15 14:24:55 +01:00
										 |  |  |         # The context manager calls wait() and closes resources | 
					
						
							|  |  |  |         with p: | 
					
						
							|  |  |  |             pass | 
					
						
							| 
									
										
										
										
											2014-06-04 00:42:04 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-20 21:02:53 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     unittest.main() |