mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	 3f9535b7f8
			
		
	
	
		3f9535b7f8
		
	
	
	
	
		
			
			Before, regrtest failed to remove the temporary test directory because the process was still running in this directory.
		
			
				
	
	
		
			171 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			171 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Tests for windows_utils"""
 | |
| 
 | |
| import socket
 | |
| import sys
 | |
| import test.support
 | |
| import unittest
 | |
| from test.support import IPV6_ENABLED
 | |
| from unittest import mock
 | |
| 
 | |
| if sys.platform != 'win32':
 | |
|     raise unittest.SkipTest('Windows only')
 | |
| 
 | |
| import _winapi
 | |
| 
 | |
| from asyncio import windows_utils
 | |
| from asyncio import _overlapped
 | |
| 
 | |
| 
 | |
class WinsocketpairTests(unittest.TestCase):
    """Exercise windows_utils.socketpair() with real and mocked sockets."""

    def check_winsocketpair(self, ssock, csock):
        """Check that bytes sent on *csock* are received on *ssock*.

        Both sockets are closed before returning, including when the
        send/recv check fails, so a failing test does not leak them.
        """
        try:
            csock.send(b'xxx')
            self.assertEqual(b'xxx', ssock.recv(1024))
        finally:
            # Release both ends even if the exchange above failed.
            csock.close()
            ssock.close()

    def test_winsocketpair(self):
        ssock, csock = windows_utils.socketpair()
        self.check_winsocketpair(ssock, csock)

    @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
    def test_winsocketpair_ipv6(self):
        ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
        self.check_winsocketpair(ssock, csock)

    @mock.patch('asyncio.windows_utils.socket')
    def test_winsocketpair_exc(self, m_socket):
        # The socket module seen by windows_utils is mocked; restore the
        # real constants on the mock before calling socketpair().
        m_socket.AF_INET = socket.AF_INET
        m_socket.SOCK_STREAM = socket.SOCK_STREAM
        sock = m_socket.socket.return_value
        sock.getsockname.return_value = ('', 12345)
        sock.accept.return_value = object(), object()
        # A failing connect() must surface as OSError from socketpair().
        sock.connect.side_effect = OSError()

        self.assertRaises(OSError, windows_utils.socketpair)

    def test_winsocketpair_invalid_args(self):
        # Each of these keyword arguments is expected to be rejected.
        self.assertRaises(ValueError,
                          windows_utils.socketpair, family=socket.AF_UNSPEC)
        self.assertRaises(ValueError,
                          windows_utils.socketpair, type=socket.SOCK_DGRAM)
        self.assertRaises(ValueError,
                          windows_utils.socketpair, proto=1)

    @mock.patch('asyncio.windows_utils.socket')
    def test_winsocketpair_close(self, m_socket):
        m_socket.AF_INET = socket.AF_INET
        m_socket.SOCK_STREAM = socket.SOCK_STREAM
        sock = mock.Mock()
        m_socket.socket.return_value = sock
        # When bind() fails, socketpair() must raise and close the socket.
        sock.bind.side_effect = OSError
        self.assertRaises(OSError, windows_utils.socketpair)
        self.assertTrue(sock.close.called)
 | |
| 
 | |
| 
 | |
class PipeTests(unittest.TestCase):
    """Tests for windows_utils.pipe() and windows_utils.PipeHandle."""

    def test_pipe_overlapped(self):
        ERROR_IO_INCOMPLETE = 996

        h1, h2 = windows_utils.pipe(overlapped=(True, True))
        try:
            ov1 = _overlapped.Overlapped()
            self.assertFalse(ov1.pending)
            self.assertEqual(ov1.error, 0)

            # Start a read on h1 before anything has been written to h2:
            # the operation is reported as pending.
            ov1.ReadFile(h1, 100)
            self.assertTrue(ov1.pending)
            self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING)
            # Asking for the result now must raise; use assertRaises so a
            # missing exception is reported as a test failure, not an error.
            with self.assertRaises(OSError) as cm:
                ov1.getresult()
            self.assertEqual(cm.exception.winerror, ERROR_IO_INCOMPLETE)

            ov2 = _overlapped.Overlapped()
            self.assertFalse(ov2.pending)
            self.assertEqual(ov2.error, 0)

            # The write may complete immediately (0) or be left pending.
            ov2.WriteFile(h2, b"hello")
            self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})

            # Wait (up to 100 ms) for the write's event to be signalled.
            res = _winapi.WaitForMultipleObjects([ov2.event], False, 100)
            self.assertEqual(res, _winapi.WAIT_OBJECT_0)

            self.assertFalse(ov1.pending)
            self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE)
            self.assertFalse(ov2.pending)
            self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
            self.assertEqual(ov1.getresult(), b"hello")
        finally:
            _winapi.CloseHandle(h1)
            _winapi.CloseHandle(h2)

    def test_pipe_handle(self):
        ERROR_INVALID_HANDLE = 6

        h, other = windows_utils.pipe(overlapped=(True, True))
        # Only one end of the pipe is needed; close the other right away.
        _winapi.CloseHandle(other)
        p = windows_utils.PipeHandle(h)
        self.assertEqual(p.fileno(), h)
        self.assertEqual(p.handle, h)

        # check garbage collection of p closes handle
        del p
        test.support.gc_collect()
        # Closing the raw handle again must now fail with an invalid-handle
        # error; a missing exception is reported as a test failure.
        with self.assertRaises(OSError) as cm:
            _winapi.CloseHandle(h)
        self.assertEqual(cm.exception.winerror, ERROR_INVALID_HANDLE)
 | |
| 
 | |
| 
 | |
class PopenTests(unittest.TestCase):
    """Tests for windows_utils.Popen using overlapped pipe I/O."""

    def test_popen(self):
        # The child reads one line from stdin, writes it upper-cased to
        # stdout and writes 'stderr' to stderr.
        command = r"""if 1:
            import sys
            s = sys.stdin.readline()
            sys.stdout.write(s.upper())
            sys.stderr.write('stderr')
            """
        msg = b"blah\n"

        p = windows_utils.Popen([sys.executable, '-c', command],
                                stdin=windows_utils.PIPE,
                                stdout=windows_utils.PIPE,
                                stderr=windows_utils.PIPE)

        for f in [p.stdin, p.stdout, p.stderr]:
            self.assertIsInstance(f, windows_utils.PipeHandle)

        # Release the pipe handles and reap the child even if an assertion
        # below fails.  Cleanups run in LIFO order, so the pipes are closed
        # first (closing stdin lets a child still blocked in readline() see
        # EOF and exit) and the process is waited for last.
        self.addCleanup(p.wait)
        for f in [p.stdin, p.stdout, p.stderr]:
            self.addCleanup(f.close)

        ovin = _overlapped.Overlapped()
        ovout = _overlapped.Overlapped()
        overr = _overlapped.Overlapped()

        ovin.WriteFile(p.stdin.handle, msg)
        ovout.ReadFile(p.stdout.handle, 100)
        overr.ReadFile(p.stderr.handle, 100)

        events = [ovin.event, ovout.event, overr.event]
        # Super-long timeout for slow buildbots.
        res = _winapi.WaitForMultipleObjects(events, True, 10000)
        self.assertEqual(res, _winapi.WAIT_OBJECT_0)
        self.assertFalse(ovout.pending)
        self.assertFalse(overr.pending)
        self.assertFalse(ovin.pending)

        self.assertEqual(ovin.getresult(), len(msg))
        out = ovout.getresult().rstrip()
        err = overr.getresult().rstrip()

        self.assertGreater(len(out), 0)
        self.assertGreater(len(err), 0)
        # allow for partial reads...
        self.assertTrue(msg.upper().rstrip().startswith(out))
        self.assertTrue(b"stderr".startswith(err))

        p.wait()
 | |
| 
 | |
| 
 | |
| # Run the tests in this module when it is executed directly as a script.
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 |