| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | """Tests for window_utils""" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  | import socket | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							| 
									
										
										
										
											2015-01-15 14:24:55 +01:00
										 |  |  | import warnings | 
					
						
							| 
									
										
										
										
											2014-02-26 10:25:02 +01:00
										 |  |  | from unittest import mock | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | if sys.platform != 'win32': | 
					
						
							|  |  |  |     raise unittest.SkipTest('Windows only') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import _winapi | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-30 14:52:03 -07:00
										 |  |  | from asyncio import _overlapped | 
					
						
							| 
									
										
										
										
											2014-12-26 21:16:42 +01:00
										 |  |  | from asyncio import windows_utils | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     from test import support | 
					
						
							|  |  |  | except ImportError: | 
					
						
							|  |  |  |     from asyncio import test_support as support | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class WinsocketpairTests(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  |     def check_winsocketpair(self, ssock, csock): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         csock.send(b'xxx') | 
					
						
							|  |  |  |         self.assertEqual(b'xxx', ssock.recv(1024)) | 
					
						
							|  |  |  |         csock.close() | 
					
						
							|  |  |  |         ssock.close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  |     def test_winsocketpair(self): | 
					
						
							|  |  |  |         ssock, csock = windows_utils.socketpair() | 
					
						
							|  |  |  |         self.check_winsocketpair(ssock, csock) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-12-18 12:29:53 +01:00
										 |  |  |     @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  |     def test_winsocketpair_ipv6(self): | 
					
						
							|  |  |  |         ssock, csock = windows_utils.socketpair(family=socket.AF_INET6) | 
					
						
							|  |  |  |         self.check_winsocketpair(ssock, csock) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-10-14 22:56:25 +02:00
										 |  |  |     @unittest.skipIf(hasattr(socket, 'socketpair'), | 
					
						
							|  |  |  |                      'socket.socketpair is available') | 
					
						
							| 
									
										
										
										
											2014-02-26 10:25:02 +01:00
										 |  |  |     @mock.patch('asyncio.windows_utils.socket') | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     def test_winsocketpair_exc(self, m_socket): | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  |         m_socket.AF_INET = socket.AF_INET | 
					
						
							|  |  |  |         m_socket.SOCK_STREAM = socket.SOCK_STREAM | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         m_socket.socket.return_value.getsockname.return_value = ('', 12345) | 
					
						
							|  |  |  |         m_socket.socket.return_value.accept.return_value = object(), object() | 
					
						
							|  |  |  |         m_socket.socket.return_value.connect.side_effect = OSError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertRaises(OSError, windows_utils.socketpair) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  |     def test_winsocketpair_invalid_args(self): | 
					
						
							|  |  |  |         self.assertRaises(ValueError, | 
					
						
							|  |  |  |                           windows_utils.socketpair, family=socket.AF_UNSPEC) | 
					
						
							|  |  |  |         self.assertRaises(ValueError, | 
					
						
							|  |  |  |                           windows_utils.socketpair, type=socket.SOCK_DGRAM) | 
					
						
							|  |  |  |         self.assertRaises(ValueError, | 
					
						
							|  |  |  |                           windows_utils.socketpair, proto=1) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-10-14 22:56:25 +02:00
										 |  |  |     @unittest.skipIf(hasattr(socket, 'socketpair'), | 
					
						
							|  |  |  |                      'socket.socketpair is available') | 
					
						
							| 
									
										
										
										
											2014-06-04 00:12:28 +02:00
										 |  |  |     @mock.patch('asyncio.windows_utils.socket') | 
					
						
							|  |  |  |     def test_winsocketpair_close(self, m_socket): | 
					
						
							|  |  |  |         m_socket.AF_INET = socket.AF_INET | 
					
						
							|  |  |  |         m_socket.SOCK_STREAM = socket.SOCK_STREAM | 
					
						
							|  |  |  |         sock = mock.Mock() | 
					
						
							|  |  |  |         m_socket.socket.return_value = sock | 
					
						
							|  |  |  |         sock.bind.side_effect = OSError | 
					
						
							|  |  |  |         self.assertRaises(OSError, windows_utils.socketpair) | 
					
						
							|  |  |  |         self.assertTrue(sock.close.called) | 
					
						
							| 
									
										
										
										
											2014-03-06 00:52:53 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | class PipeTests(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pipe_overlapped(self): | 
					
						
							|  |  |  |         h1, h2 = windows_utils.pipe(overlapped=(True, True)) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             ov1 = _overlapped.Overlapped() | 
					
						
							|  |  |  |             self.assertFalse(ov1.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov1.error, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             ov1.ReadFile(h1, 100) | 
					
						
							|  |  |  |             self.assertTrue(ov1.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING) | 
					
						
							|  |  |  |             ERROR_IO_INCOMPLETE = 996 | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 ov1.getresult() | 
					
						
							|  |  |  |             except OSError as e: | 
					
						
							|  |  |  |                 self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 raise RuntimeError('expected ERROR_IO_INCOMPLETE') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             ov2 = _overlapped.Overlapped() | 
					
						
							|  |  |  |             self.assertFalse(ov2.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov2.error, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             ov2.WriteFile(h2, b"hello") | 
					
						
							|  |  |  |             self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             res = _winapi.WaitForMultipleObjects([ov2.event], False, 100) | 
					
						
							|  |  |  |             self.assertEqual(res, _winapi.WAIT_OBJECT_0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertFalse(ov1.pending) | 
					
						
							|  |  |  |             self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE) | 
					
						
							|  |  |  |             self.assertFalse(ov2.pending) | 
					
						
							|  |  |  |             self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) | 
					
						
							|  |  |  |             self.assertEqual(ov1.getresult(), b"hello") | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             _winapi.CloseHandle(h1) | 
					
						
							|  |  |  |             _winapi.CloseHandle(h2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pipe_handle(self): | 
					
						
							|  |  |  |         h, _ = windows_utils.pipe(overlapped=(True, True)) | 
					
						
							|  |  |  |         _winapi.CloseHandle(_) | 
					
						
							|  |  |  |         p = windows_utils.PipeHandle(h) | 
					
						
							|  |  |  |         self.assertEqual(p.fileno(), h) | 
					
						
							|  |  |  |         self.assertEqual(p.handle, h) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # check garbage collection of p closes handle | 
					
						
							| 
									
										
										
										
											2015-01-15 14:24:55 +01:00
										 |  |  |         with warnings.catch_warnings(): | 
					
						
							|  |  |  |             warnings.filterwarnings("ignore", "",  ResourceWarning) | 
					
						
							|  |  |  |             del p | 
					
						
							|  |  |  |             support.gc_collect() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         try: | 
					
						
							|  |  |  |             _winapi.CloseHandle(h) | 
					
						
							|  |  |  |         except OSError as e: | 
					
						
							|  |  |  |             self.assertEqual(e.winerror, 6)     # ERROR_INVALID_HANDLE | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             raise RuntimeError('expected ERROR_INVALID_HANDLE') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class PopenTests(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_popen(self): | 
					
						
							|  |  |  |         command = r"""if 1:
 | 
					
						
							|  |  |  |             import sys | 
					
						
							|  |  |  |             s = sys.stdin.readline() | 
					
						
							|  |  |  |             sys.stdout.write(s.upper()) | 
					
						
							|  |  |  |             sys.stderr.write('stderr') | 
					
						
							|  |  |  |             """
 | 
					
						
							|  |  |  |         msg = b"blah\n" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         p = windows_utils.Popen([sys.executable, '-c', command], | 
					
						
							|  |  |  |                                 stdin=windows_utils.PIPE, | 
					
						
							|  |  |  |                                 stdout=windows_utils.PIPE, | 
					
						
							|  |  |  |                                 stderr=windows_utils.PIPE) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for f in [p.stdin, p.stdout, p.stderr]: | 
					
						
							|  |  |  |             self.assertIsInstance(f, windows_utils.PipeHandle) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ovin = _overlapped.Overlapped() | 
					
						
							|  |  |  |         ovout = _overlapped.Overlapped() | 
					
						
							|  |  |  |         overr = _overlapped.Overlapped() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ovin.WriteFile(p.stdin.handle, msg) | 
					
						
							|  |  |  |         ovout.ReadFile(p.stdout.handle, 100) | 
					
						
							|  |  |  |         overr.ReadFile(p.stderr.handle, 100) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events = [ovin.event, ovout.event, overr.event] | 
					
						
							| 
									
										
										
										
											2013-11-15 07:41:10 -08:00
										 |  |  |         # Super-long timeout for slow buildbots. | 
					
						
							|  |  |  |         res = _winapi.WaitForMultipleObjects(events, True, 10000) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         self.assertEqual(res, _winapi.WAIT_OBJECT_0) | 
					
						
							|  |  |  |         self.assertFalse(ovout.pending) | 
					
						
							|  |  |  |         self.assertFalse(overr.pending) | 
					
						
							|  |  |  |         self.assertFalse(ovin.pending) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(ovin.getresult(), len(msg)) | 
					
						
							|  |  |  |         out = ovout.getresult().rstrip() | 
					
						
							|  |  |  |         err = overr.getresult().rstrip() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertGreater(len(out), 0) | 
					
						
							|  |  |  |         self.assertGreater(len(err), 0) | 
					
						
							|  |  |  |         # allow for partial reads... | 
					
						
							|  |  |  |         self.assertTrue(msg.upper().rstrip().startswith(out)) | 
					
						
							|  |  |  |         self.assertTrue(b"stderr".startswith(err)) | 
					
						
							| 
									
										
										
										
											2013-10-20 21:02:53 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-15 14:24:55 +01:00
										 |  |  |         # The context manager calls wait() and closes resources | 
					
						
							|  |  |  |         with p: | 
					
						
							|  |  |  |             pass | 
					
						
							| 
									
										
										
										
											2014-06-04 00:42:04 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-20 21:02:53 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     unittest.main() |