mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			457 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			457 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #
 | |
| # A higher level module for using sockets (or Windows named pipes)
 | |
| #
 | |
| # multiprocessing/connection.py
 | |
| #
 | |
| # Copyright (c) 2006-2008, R Oudkerk
 | |
| # All rights reserved.
 | |
| #
 | |
| # Redistribution and use in source and binary forms, with or without
 | |
| # modification, are permitted provided that the following conditions
 | |
| # are met:
 | |
| #
 | |
| # 1. Redistributions of source code must retain the above copyright
 | |
| #    notice, this list of conditions and the following disclaimer.
 | |
| # 2. Redistributions in binary form must reproduce the above copyright
 | |
| #    notice, this list of conditions and the following disclaimer in the
 | |
| #    documentation and/or other materials provided with the distribution.
 | |
| # 3. Neither the name of author nor the names of any contributors may be
 | |
| #    used to endorse or promote products derived from this software
 | |
| #    without specific prior written permission.
 | |
| #
 | |
| # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
 | |
| # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 | |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 | |
| # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 | |
| # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 | |
| # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 | |
| # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 | |
| # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 | |
| # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 | |
| # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 | |
| # SUCH DAMAGE.
 | |
| #
 | |
| 
 | |
| __all__ = [ 'Client', 'Listener', 'Pipe' ]
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import socket
 | |
| import errno
 | |
| import time
 | |
| import tempfile
 | |
| import itertools
 | |
| 
 | |
| import _multiprocessing
 | |
| from multiprocessing import current_process, AuthenticationError
 | |
| from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
 | |
| from multiprocessing.forking import duplicate, close
 | |
| 
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| BUFSIZE = 8192
 | |
| # A very generous timeout when it comes to local connections...
 | |
| CONNECTION_TIMEOUT = 20.
 | |
| 
 | |
| _mmap_counter = itertools.count()
 | |
| 
 | |
| default_family = 'AF_INET'
 | |
| families = ['AF_INET']
 | |
| 
 | |
| if hasattr(socket, 'AF_UNIX'):
 | |
|     default_family = 'AF_UNIX'
 | |
|     families += ['AF_UNIX']
 | |
| 
 | |
| if sys.platform == 'win32':
 | |
|     default_family = 'AF_PIPE'
 | |
|     families += ['AF_PIPE']
 | |
| 
 | |
| 
 | |
| def _init_timeout(timeout=CONNECTION_TIMEOUT):
 | |
|     return time.time() + timeout
 | |
| 
 | |
| def _check_timeout(t):
 | |
|     return time.time() > t
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| def arbitrary_address(family):
 | |
|     '''
 | |
|     Return an arbitrary free address for the given family
 | |
|     '''
 | |
|     if family == 'AF_INET':
 | |
|         return ('localhost', 0)
 | |
|     elif family == 'AF_UNIX':
 | |
|         return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
 | |
|     elif family == 'AF_PIPE':
 | |
|         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
 | |
|                                (os.getpid(), next(_mmap_counter)))
 | |
|     else:
 | |
|         raise ValueError('unrecognized family')
 | |
| 
 | |
| 
 | |
| def address_type(address):
 | |
|     '''
 | |
|     Return the types of the address
 | |
| 
 | |
|     This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
 | |
|     '''
 | |
|     if type(address) == tuple:
 | |
|         return 'AF_INET'
 | |
|     elif type(address) is str and address.startswith('\\\\'):
 | |
|         return 'AF_PIPE'
 | |
|     elif type(address) is str:
 | |
|         return 'AF_UNIX'
 | |
|     else:
 | |
|         raise ValueError('address type of %r unrecognized' % address)
 | |
| 
 | |
| #
 | |
| # Public functions
 | |
| #
 | |
| 
 | |
| class Listener(object):
 | |
|     '''
 | |
|     Returns a listener object.
 | |
| 
 | |
|     This is a wrapper for a bound socket which is 'listening' for
 | |
|     connections, or for a Windows named pipe.
 | |
|     '''
 | |
|     def __init__(self, address=None, family=None, backlog=1, authkey=None):
 | |
|         family = family or (address and address_type(address)) \
 | |
|                  or default_family
 | |
|         address = address or arbitrary_address(family)
 | |
| 
 | |
|         if family == 'AF_PIPE':
 | |
|             self._listener = PipeListener(address, backlog)
 | |
|         else:
 | |
|             self._listener = SocketListener(address, family, backlog)
 | |
| 
 | |
|         if authkey is not None and not isinstance(authkey, bytes):
 | |
|             raise TypeError('authkey should be a byte string')
 | |
| 
 | |
|         self._authkey = authkey
 | |
| 
 | |
|     def accept(self):
 | |
|         '''
 | |
|         Accept a connection on the bound socket or named pipe of `self`.
 | |
| 
 | |
|         Returns a `Connection` object.
 | |
|         '''
 | |
|         c = self._listener.accept()
 | |
|         if self._authkey:
 | |
|             deliver_challenge(c, self._authkey)
 | |
|             answer_challenge(c, self._authkey)
 | |
|         return c
 | |
| 
 | |
|     def close(self):
 | |
|         '''
 | |
|         Close the bound socket or named pipe of `self`.
 | |
|         '''
 | |
|         return self._listener.close()
 | |
| 
 | |
|     address = property(lambda self: self._listener._address)
 | |
|     last_accepted = property(lambda self: self._listener._last_accepted)
 | |
| 
 | |
| 
 | |
| def Client(address, family=None, authkey=None):
 | |
|     '''
 | |
|     Returns a connection to the address of a `Listener`
 | |
|     '''
 | |
|     family = family or address_type(address)
 | |
|     if family == 'AF_PIPE':
 | |
|         c = PipeClient(address)
 | |
|     else:
 | |
|         c = SocketClient(address)
 | |
| 
 | |
|     if authkey is not None and not isinstance(authkey, bytes):
 | |
|         raise TypeError('authkey should be a byte string')
 | |
| 
 | |
|     if authkey is not None:
 | |
|         answer_challenge(c, authkey)
 | |
|         deliver_challenge(c, authkey)
 | |
| 
 | |
|     return c
 | |
| 
 | |
| 
 | |
| if sys.platform != 'win32':
 | |
| 
 | |
|     def Pipe(duplex=True):
 | |
|         '''
 | |
|         Returns pair of connection objects at either end of a pipe
 | |
|         '''
 | |
|         if duplex:
 | |
|             s1, s2 = socket.socketpair()
 | |
|             c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
 | |
|             c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
 | |
|             s1.close()
 | |
|             s2.close()
 | |
|         else:
 | |
|             fd1, fd2 = os.pipe()
 | |
|             c1 = _multiprocessing.Connection(fd1, writable=False)
 | |
|             c2 = _multiprocessing.Connection(fd2, readable=False)
 | |
| 
 | |
|         return c1, c2
 | |
| 
 | |
| else:
 | |
| 
 | |
|     from _multiprocessing import win32
 | |
| 
 | |
|     def Pipe(duplex=True):
 | |
|         '''
 | |
|         Returns pair of connection objects at either end of a pipe
 | |
|         '''
 | |
|         address = arbitrary_address('AF_PIPE')
 | |
|         if duplex:
 | |
|             openmode = win32.PIPE_ACCESS_DUPLEX
 | |
|             access = win32.GENERIC_READ | win32.GENERIC_WRITE
 | |
|             obsize, ibsize = BUFSIZE, BUFSIZE
 | |
|         else:
 | |
|             openmode = win32.PIPE_ACCESS_INBOUND
 | |
|             access = win32.GENERIC_WRITE
 | |
|             obsize, ibsize = 0, BUFSIZE
 | |
| 
 | |
|         h1 = win32.CreateNamedPipe(
 | |
|             address, openmode,
 | |
|             win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
 | |
|             win32.PIPE_WAIT,
 | |
|             1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
 | |
|             )
 | |
|         h2 = win32.CreateFile(
 | |
|             address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
 | |
|             )
 | |
|         win32.SetNamedPipeHandleState(
 | |
|             h2, win32.PIPE_READMODE_MESSAGE, None, None
 | |
|             )
 | |
| 
 | |
|         try:
 | |
|             win32.ConnectNamedPipe(h1, win32.NULL)
 | |
|         except WindowsError as e:
 | |
|             if e.args[0] != win32.ERROR_PIPE_CONNECTED:
 | |
|                 raise
 | |
| 
 | |
|         c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
 | |
|         c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
 | |
| 
 | |
|         return c1, c2
 | |
| 
 | |
| #
 | |
| # Definitions for connections based on sockets
 | |
| #
 | |
| 
 | |
| class SocketListener(object):
 | |
|     '''
 | |
|     Representation of a socket which is bound to an address and listening
 | |
|     '''
 | |
|     def __init__(self, address, family, backlog=1):
 | |
|         self._socket = socket.socket(getattr(socket, family))
 | |
|         try:
 | |
|             self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | |
|             self._socket.bind(address)
 | |
|             self._socket.listen(backlog)
 | |
|             self._address = self._socket.getsockname()
 | |
|         except socket.error:
 | |
|             self._socket.close()
 | |
|             raise
 | |
|         self._family = family
 | |
|         self._last_accepted = None
 | |
| 
 | |
|         if family == 'AF_UNIX':
 | |
|             self._unlink = Finalize(
 | |
|                 self, os.unlink, args=(address,), exitpriority=0
 | |
|                 )
 | |
|         else:
 | |
|             self._unlink = None
 | |
| 
 | |
|     def accept(self):
 | |
|         s, self._last_accepted = self._socket.accept()
 | |
|         fd = duplicate(s.fileno())
 | |
|         conn = _multiprocessing.Connection(fd)
 | |
|         s.close()
 | |
|         return conn
 | |
| 
 | |
|     def close(self):
 | |
|         self._socket.close()
 | |
|         if self._unlink is not None:
 | |
|             self._unlink()
 | |
| 
 | |
| 
 | |
| def SocketClient(address):
 | |
|     '''
 | |
|     Return a connection object connected to the socket given by `address`
 | |
|     '''
 | |
|     family = address_type(address)
 | |
|     with socket.socket( getattr(socket, family) ) as s:
 | |
|         t = _init_timeout()
 | |
| 
 | |
|         while 1:
 | |
|             try:
 | |
|                 s.connect(address)
 | |
|             except socket.error as e:
 | |
|                 if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
 | |
|                     debug('failed to connect to address %s', address)
 | |
|                     raise
 | |
|                 time.sleep(0.01)
 | |
|             else:
 | |
|                 break
 | |
|         else:
 | |
|             raise
 | |
| 
 | |
|         fd = duplicate(s.fileno())
 | |
|     conn = _multiprocessing.Connection(fd)
 | |
|     return conn
 | |
| 
 | |
| #
 | |
| # Definitions for connections based on named pipes
 | |
| #
 | |
| 
 | |
| if sys.platform == 'win32':
 | |
| 
 | |
|     class PipeListener(object):
 | |
|         '''
 | |
|         Representation of a named pipe
 | |
|         '''
 | |
|         def __init__(self, address, backlog=None):
 | |
|             self._address = address
 | |
|             handle = win32.CreateNamedPipe(
 | |
|                 address, win32.PIPE_ACCESS_DUPLEX,
 | |
|                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
 | |
|                 win32.PIPE_WAIT,
 | |
|                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
 | |
|                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
 | |
|                 )
 | |
|             self._handle_queue = [handle]
 | |
|             self._last_accepted = None
 | |
| 
 | |
|             sub_debug('listener created with address=%r', self._address)
 | |
| 
 | |
|             self.close = Finalize(
 | |
|                 self, PipeListener._finalize_pipe_listener,
 | |
|                 args=(self._handle_queue, self._address), exitpriority=0
 | |
|                 )
 | |
| 
 | |
|         def accept(self):
 | |
|             newhandle = win32.CreateNamedPipe(
 | |
|                 self._address, win32.PIPE_ACCESS_DUPLEX,
 | |
|                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
 | |
|                 win32.PIPE_WAIT,
 | |
|                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
 | |
|                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
 | |
|                 )
 | |
|             self._handle_queue.append(newhandle)
 | |
|             handle = self._handle_queue.pop(0)
 | |
|             try:
 | |
|                 win32.ConnectNamedPipe(handle, win32.NULL)
 | |
|             except WindowsError as e:
 | |
|                 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
 | |
|                     raise
 | |
|             return _multiprocessing.PipeConnection(handle)
 | |
| 
 | |
|         @staticmethod
 | |
|         def _finalize_pipe_listener(queue, address):
 | |
|             sub_debug('closing listener with address=%r', address)
 | |
|             for handle in queue:
 | |
|                 close(handle)
 | |
| 
 | |
|     def PipeClient(address):
 | |
|         '''
 | |
|         Return a connection object connected to the pipe given by `address`
 | |
|         '''
 | |
|         t = _init_timeout()
 | |
|         while 1:
 | |
|             try:
 | |
|                 win32.WaitNamedPipe(address, 1000)
 | |
|                 h = win32.CreateFile(
 | |
|                     address, win32.GENERIC_READ | win32.GENERIC_WRITE,
 | |
|                     0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
 | |
|                     )
 | |
|             except WindowsError as e:
 | |
|                 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
 | |
|                                      win32.ERROR_PIPE_BUSY) or _check_timeout(t):
 | |
|                     raise
 | |
|             else:
 | |
|                 break
 | |
|         else:
 | |
|             raise
 | |
| 
 | |
|         win32.SetNamedPipeHandleState(
 | |
|             h, win32.PIPE_READMODE_MESSAGE, None, None
 | |
|             )
 | |
|         return _multiprocessing.PipeConnection(h)
 | |
| 
 | |
| #
 | |
| # Authentication stuff
 | |
| #
 | |
| 
 | |
| MESSAGE_LENGTH = 20
 | |
| 
 | |
| CHALLENGE = b'#CHALLENGE#'
 | |
| WELCOME = b'#WELCOME#'
 | |
| FAILURE = b'#FAILURE#'
 | |
| 
 | |
| def deliver_challenge(connection, authkey):
 | |
|     import hmac
 | |
|     assert isinstance(authkey, bytes)
 | |
|     message = os.urandom(MESSAGE_LENGTH)
 | |
|     connection.send_bytes(CHALLENGE + message)
 | |
|     digest = hmac.new(authkey, message).digest()
 | |
|     response = connection.recv_bytes(256)        # reject large message
 | |
|     if response == digest:
 | |
|         connection.send_bytes(WELCOME)
 | |
|     else:
 | |
|         connection.send_bytes(FAILURE)
 | |
|         raise AuthenticationError('digest received was wrong')
 | |
| 
 | |
| def answer_challenge(connection, authkey):
 | |
|     import hmac
 | |
|     assert isinstance(authkey, bytes)
 | |
|     message = connection.recv_bytes(256)         # reject large message
 | |
|     assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
 | |
|     message = message[len(CHALLENGE):]
 | |
|     digest = hmac.new(authkey, message).digest()
 | |
|     connection.send_bytes(digest)
 | |
|     response = connection.recv_bytes(256)        # reject large message
 | |
|     if response != WELCOME:
 | |
|         raise AuthenticationError('digest sent was rejected')
 | |
| 
 | |
| #
 | |
| # Support for using xmlrpclib for serialization
 | |
| #
 | |
| 
 | |
| class ConnectionWrapper(object):
 | |
|     def __init__(self, conn, dumps, loads):
 | |
|         self._conn = conn
 | |
|         self._dumps = dumps
 | |
|         self._loads = loads
 | |
|         for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
 | |
|             obj = getattr(conn, attr)
 | |
|             setattr(self, attr, obj)
 | |
|     def send(self, obj):
 | |
|         s = self._dumps(obj)
 | |
|         self._conn.send_bytes(s)
 | |
|     def recv(self):
 | |
|         s = self._conn.recv_bytes()
 | |
|         return self._loads(s)
 | |
| 
 | |
| def _xml_dumps(obj):
 | |
|     return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
 | |
| 
 | |
| def _xml_loads(s):
 | |
|     (obj,), method = xmlrpclib.loads(s.decode('utf8'))
 | |
|     return obj
 | |
| 
 | |
| class XmlListener(Listener):
 | |
|     def accept(self):
 | |
|         global xmlrpclib
 | |
|         import xmlrpc.client as xmlrpclib
 | |
|         obj = Listener.accept(self)
 | |
|         return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
 | |
| 
 | |
| def XmlClient(*args, **kwds):
 | |
|     global xmlrpclib
 | |
|     import xmlrpc.client as xmlrpclib
 | |
|     return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
 | 
