mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			369 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			369 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""protocol        (David Scherer <dscherer@cmu.edu>)
 | 
						|
 | 
						|
     This module implements a simple RPC or "distributed object" protocol.
 | 
						|
     I am probably the 100,000th person to write this in Python, but, hey,
 | 
						|
     it was fun.
 | 
						|
 | 
						|
     Contents:
 | 
						|
 | 
						|
       connectionLost is an exception that will be thrown by functions in
 | 
						|
           the protocol module or calls to remote methods that fail because
 | 
						|
           the remote program has closed the socket or because no connection
 | 
						|
           could be established in the first place.
 | 
						|
 | 
						|
       Server( port=None, connection_hook=None ) creates a server on a
 | 
						|
           well-known port, to which clients can connect.  When a client
 | 
						|
           connects, a Connection is created for it.  If connection_hook
 | 
						|
           is defined, then connection_hook( socket.getpeername() ) is called
 | 
						|
           before a Connection is created, and if it returns false then the
 | 
						|
           connection is refused.  connection_hook must be prepared to be
 | 
						|
           called from any thread.
 | 
						|
  
 | 
						|
       Client( ip='127.0.0.1', port=None ) returns a Connection to a Server
 | 
						|
           object at a well-known address and port.
 | 
						|
  
 | 
						|
       Connection( socket ) creates an RPC connection on an arbitrary socket,
 | 
						|
           which must already be connected to another program.  You do not
 | 
						|
           need to use this directly if you are using Client() or Server().
 | 
						|
  
 | 
						|
       publish( name, connect_function ) provides an object with the
 | 
						|
           specified name to some or all Connections.  When another program
 | 
						|
           calls Connection.getobject() with the specified name, the
 | 
						|
           specified connect_function is called with the arguments
 | 
						|
 | 
						|
              connect_function( conn, addr )
 | 
						|
 | 
						|
           where conn is the Connection object to the requesting client and
 | 
						|
           addr is the address returned by socket.getpeername().  If that
 | 
						|
           function returns an object, that object becomes accessible to
 | 
						|
           the caller.  If it returns None, the caller's request fails.
 | 
						|
 | 
						|
     Connection objects:
 | 
						|
 | 
						|
       .close() refuses additional RPC messages from the peer, and notifies
 | 
						|
           the peer that the connection has been closed.  All pending remote
 | 
						|
           method calls in either program will fail with a connectionLost
 | 
						|
           exception.  Further remote method calls on this connection will
 | 
						|
           also result in errors.
 | 
						|
 | 
						|
       .getobject(name) returns a proxy for the remote object with the
 | 
						|
           specified name, if it exists and the peer permits us access.
 | 
						|
           Otherwise, it returns None.  It may throw a connectionLost
 | 
						|
           exception.  The returned proxy supports basic attribute access
 | 
						|
           and method calls, and its methods have an extra attribute,
 | 
						|
           .void, which is a function that has the same effect but always
 | 
						|
           returns None.  This last capability is provided as a performance
 | 
						|
           hack: object.method.void(params) can return without waiting for
 | 
						|
           the remote process to respond, but object.method(params) needs
 | 
						|
           to wait for a return value or exception.
 | 
						|
 | 
						|
       .rpc_loop(block=0) processes *incoming* messages for this connection.
 | 
						|
           If block=1, it continues processing until an exception or return
 | 
						|
           value is received, which is normally forever.  Otherwise it
 | 
						|
           returns when all currently pending messages have been delivered.
 | 
						|
           It may throw a connectionLost exception.
 | 
						|
 | 
						|
       .set_close_hook(f) specifies a function to be called when the remote
 | 
						|
           object closes the connection during a call to rpc_loop().  This
 | 
						|
           is a good way for servers to be notified when clients disconnect.
 | 
						|
 | 
						|
       .set_shutdown_hook(f) specifies a function called *immediately* when
 | 
						|
           the receive loop detects that the connection has been lost.  The
 | 
						|
           provided function must be prepared to run in any thread.
 | 
						|
 | 
						|
     Server objects:
 | 
						|
 | 
						|
       .rpc_loop() processes incoming messages on all connections, and
 | 
						|
           returns when all pending messages have been processed.  It will
 | 
						|
           *not* throw connectionLost exceptions; the
 | 
						|
           Connection.set_close_hook() mechanism is much better for servers.
 | 
						|
"""
 | 
						|
 | 
						|
import sys, os, string, types
 | 
						|
import socket
 | 
						|
from threading import Thread
 | 
						|
from Queue import Queue, Empty
 | 
						|
from cPickle import Pickler, Unpickler, PicklingError
 | 
						|
 | 
						|
class connectionLost(Exception):
    """Raised when the peer cannot be reached or has closed the connection.

    what is an optional human-readable description.  It is stored on the
    instance and is what both str() and repr() of the exception return.
    """

    def __init__(self, what=""):
        # Hand the text to the base class as well so that ``args`` is
        # populated; this keeps the instance picklable, which matters
        # because exceptions are shipped to the peer as pickled values.
        Exception.__init__(self, what)
        self.what = what

    def __repr__(self):
        return self.what

    def __str__(self):
        return self.what
 | 
						|
 | 
						|
def getmethods(cls):
    """Return a list of the names of the methods of a class.

    Base classes are visited first (recursively through cls.__bases__),
    followed by the entries of cls.__dict__ that are plain Python
    functions.  Each name appears only once, at the position of its first
    definition, even when a subclass overrides it or a base class is
    reachable along several inheritance paths.
    """
    methods = []
    for base in cls.__bases__:
        for name in getmethods(base):
            if name not in methods:
                methods.append(name)
    for name, value in cls.__dict__.items():
        if isinstance(value, types.FunctionType) and name not in methods:
            methods.append(name)
    return methods
 | 
						|
 | 
						|
class methodproxy:
    """Callable stand-in for one named method of a remote object.

    Calling the proxy forwards an 'm' request through the owning proxy's
    client and returns whatever client.call() returns.  void() forwards
    the same request through client.call_void() and returns None.
    """

    def __init__(self, classp, name):
        self.classp, self.name = classp, name
        # Cache the client at construction time; the object name is read
        # from classp on every invocation.
        self.client = classp.client

    def __call__(self, *args, **keywords):
        invoke = self.client.call
        return invoke('m', self.classp.name, self.name, args, keywords)

    def void(self, *args, **keywords):
        fire = self.client.call_void
        fire('m', self.classp.name, self.name, args, keywords)
 | 
						|
 | 
						|
class classproxy:
    """Local stand-in for an object that lives in the peer process.

    client is the object used to talk to the peer, name identifies the
    remote object, and methods lists the method names for which a
    methodproxy is installed on the instance.  Any other attribute read is
    forwarded to the peer as a 'g' request and any attribute assignment as
    an 's' request.
    """

    def __init__(self, client, name, methods):
        # Write straight into the instance dictionary: ordinary attribute
        # assignment is intercepted by __setattr__ and would be sent to
        # the peer instead of being stored here.
        state = self.__dict__
        state['client'] = client
        state['name'] = name
        for method_name in methods:
            state[method_name] = methodproxy(self, method_name)

    def __getattr__(self, attr):
        # Reached only for names that normal attribute lookup did not find.
        fetch = self.client.call
        return fetch('g', self.name, attr)

    def __setattr__(self, attr, value):
        store = self.client.call_void
        store('s', self.name, attr, value)
 | 
						|
 | 
						|
# Maps a published object name to the connect function that provides it.
local_connect = {}


def publish(name, connect_function):
    """Register connect_function as the provider of the object called name.

    The function is stored in the module-level local_connect table and
    replaces any earlier registration under the same name.
    """
    local_connect.update({name: connect_function})
 | 
						|
 | 
						|
class socketFile:
    """File emulator based on a socket.  Provides only blocking semantics for now.

    Implements the write(), read() and readline() methods on top of a
    connected socket.  Received data that has not been consumed yet is kept
    in self.buffer.  Any socket failure, and an orderly close by the peer,
    is reported as connectionLost.
    """

    def __init__(self, socket):
        self.socket = socket
        self.buffer = ''

    def _recv(self, bytes):
        """Receive at most `bytes` bytes; raise connectionLost on error or EOF."""
        try:
            r = self.socket.recv(bytes)
        except Exception:
            raise connectionLost()
        if not r:
            # An empty result means the peer closed its end.
            raise connectionLost()
        return r

    def write(self, string):
        """Send all of `string` to the peer; raise connectionLost on error."""
        try:
            # sendall() keeps sending until everything has been written;
            # plain send() may transmit only a prefix, which would leave a
            # truncated message on the wire.
            self.socket.sendall(string)
        except Exception:
            raise connectionLost()

    def read(self, bytes):
        """Return exactly `bytes` bytes, blocking until they have arrived."""
        missing = bytes - len(self.buffer)
        while missing > 0:
            self.buffer = self.buffer + self._recv(missing)
            missing = bytes - len(self.buffer)
        s = self.buffer[:bytes]
        self.buffer = self.buffer[bytes:]
        return s

    def readline(self):
        """Return the next line, including its trailing newline."""
        while 1:
            f = self.buffer.find('\n')
            if f >= 0:
                s = self.buffer[:f+1]
                self.buffer = self.buffer[f+1:]
                return s
            self.buffer = self.buffer + self._recv(1024)
 | 
						|
 | 
						|
 | 
						|
class Connection (Thread):
    """RPC endpoint bound to one already-connected socket.

    Constructing a Connection starts a daemon thread (see run) that
    unpickles messages from the socket and places them on self.queue.
    Queued messages are acted upon only inside rpc_loop(), which is called
    directly or through call().

    Every message is a tuple (cmd, arg, ret): cmd is a one-letter command,
    arg is its argument tuple (the return value itself for 'r'), and ret is
    true when the sender expects a reply.
    """

    debug = 0

    # Peer address; the per-instance value is assigned in __init__.
    # Declaring the attribute on the class shadows the ``name`` property of
    # newer threading.Thread versions, whose setter rejects assignment
    # before Thread.__init__ has run and converts the value to a string.
    # r_geto passes this address on unchanged, so it must stay as returned
    # by getpeername().
    name = None

    close_hook = None
    shutdown_hook = None

    def __init__(self, socket):
        Thread.__init__(self)
        self.local_objects = {}
        self.socket = socket
        self.name = socket.getpeername()
        self.socketfile = socketFile(socket)
        self.queue = Queue(-1)
        self.refuse_messages = 0
        self.cmds = { 'm': self.r_meth,
                      'g': self.r_get,
                      's': self.r_set,
                      'o': self.r_geto,
                      'e': self.r_exc,
                     #'r' handled by rpc_loop
                    }

        # Start receiving only after every attribute used by run() exists.
        self.setDaemon(1)
        self.start()

    def getobject(self, name):
        """Return a classproxy for the peer's object `name`, or None."""
        methods = self.call( 'o', name )
        if methods is None: return None
        return classproxy(self, name, methods)

    # close_hook is called from rpc_loop(), like a normal remote method
    #   invocation
    def set_close_hook(self,hook): self.close_hook = hook

    # shutdown_hook is called directly from the run() thread, and needs
    #   to be "thread safe"
    def set_shutdown_hook(self,hook): self.shutdown_hook = hook

    def close(self):
        """Close the socket and refuse any further RPC messages."""
        self._shutdown()
        self.refuse_messages = 1

    def call(self, c, *args):
        """Send command `c` and process messages until its reply arrives."""
        self.send( (c, args, 1 ) )
        return self.rpc_loop( block = 1 )

    def call_void(self, c, *args):
        """Send command `c` without requesting a reply; errors are ignored."""
        try:
            self.send( (c, args, 0 ) )
        except:
            # Deliberately best-effort: there is no reply through which a
            # failure could be reported to the caller.
            pass

    # the following methods handle individual RPC calls:

    def r_geto(self, obj):
        """Look up a published object and return its list of method names."""
        c = local_connect.get(obj)
        if not c: return None
        o = c(self, self.name)
        if not o: return None
        self.local_objects[obj] = o
        return getmethods(o.__class__)

    def r_meth(self, obj, name, args, keywords):
        """Call method `name` of local object `obj` and return its result."""
        return getattr(self.local_objects[obj], name)(*args, **keywords)

    def r_get(self, obj, name):
        """Return attribute `name` of local object `obj`."""
        return getattr(self.local_objects[obj],name)

    def r_set(self, obj, name, value):
        """Set attribute `name` of local object `obj` to `value`."""
        setattr(self.local_objects[obj],name,value)

    def r_exc(self, e, v):
        """Raise the exception described by class `e` and value `v`.

        Mirrors the two-argument raise statement: a value that already is
        an instance of `e` is raised as it is, otherwise `e` is
        instantiated from the value (no argument for None, unpacked
        arguments for a tuple).
        """
        if isinstance(v, e):
            raise v
        if v is None:
            raise e()
        if isinstance(v, tuple):
            raise e(*v)
        raise e(v)

    def rpc_exec(self, cmd, arg, ret):
        """Execute one received command, replying to the peer when `ret` is true."""
        if self.refuse_messages: return
        if self.debug:
            sys.stdout.write("%s %s %s\n" % (cmd, arg, ret))
        if ret:
            try:
                r = self.cmds.get(cmd)(*arg)
                self.send( ('r', r, 0) )
            except:
                # Whatever went wrong, including an unknown command, is
                # reported to the caller as an 'e' message.
                try:
                    self.send( ('e', sys.exc_info()[:2], 0) )
                except PicklingError:
                    self.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) )
        else:
            # we cannot report exceptions to the caller, so
            #   we report them in this process.
            self.cmds.get(cmd)(*arg)

    # the following methods implement the RPC and message loops:

    def rpc_loop(self, block=0):
        """Process queued messages.

        Returns the value of the first 'r' message encountered.  With
        block false, returns None as soon as the queue is empty.  When
        connectionLost is raised while processing, close_hook (if set) is
        called once and the exception is re-raised.
        """
        if self.refuse_messages: raise connectionLost('(already closed)')
        try:
            while 1:
                try:
                    cmd, arg, ret = self.queue.get( block )
                except Empty:
                    return None
                if cmd=='r': return arg
                self.rpc_exec(cmd,arg,ret)
        except connectionLost:
            if self.close_hook:
                self.close_hook()
                self.close_hook = None
            raise

    def run(self):
        """Thread body: move incoming messages from the socket to the queue."""
        try:
            while 1:
                data = self.recv()
                self.queue.put( data )
        except:
            # Queue the failure as an 'e' message so that it is raised in
            # whichever thread calls rpc_loop().
            self.queue.put( ('e', sys.exc_info()[:2], 0) )

    # The following send raw pickled data to the peer

    def send(self, data):
        """Pickle `data` onto the socket; shut down if the connection is lost."""
        try:
            Pickler(self.socketfile,1).dump( data )
        except connectionLost:
            self._shutdown()
            if self.shutdown_hook: self.shutdown_hook()
            raise

    def recv(self):
        """Unpickle and return the next message from the socket."""
        # NOTE(review): this unpickles data received from the network.
        # Unpickling can execute arbitrary code, so the peer has to be
        # trusted.
        try:
            return Unpickler(self.socketfile).load()
        except connectionLost:
            self._shutdown()
            if self.shutdown_hook: self.shutdown_hook()
            raise

    def _shutdown(self):
        """Shut down and close the socket, ignoring errors from either step."""
        try:
            self.socket.shutdown(1)
        except Exception:
            pass
        # Close in a separate step so that a failing shutdown(), e.g. on a
        # socket the peer has already dropped, does not skip the close.
        try:
            self.socket.close()
        except Exception:
            pass
 | 
						|
 | 
						|
 | 
						|
class Server (Thread):
    """Accepts clients on a well-known port and wraps each in a Connection.

    Constructing a Server binds and listens on the port, then starts a
    daemon thread (see run) that accepts incoming connections.  Accepted
    connections are collected in self.connections.
    """

    default_port = 0x1D1E   # "IDlE"

    def __init__(self, port=None, connection_hook=None):
        """Listen on `port`, falling back to default_port when it is false.

        connection_hook, when given, is called with the address of every
        incoming client; a false result refuses that client.  Raises
        connectionLost, carrying the text of the underlying error, when
        the listening socket cannot be set up.
        """
        self.connections = []
        self.port = port or self.default_port
        self.connection_hook = connection_hook

        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception:
            raise connectionLost(str(sys.exc_info()[1]))
        try:
            s.bind(('', self.port))
            s.listen(3)
        except Exception:
            detail = str(sys.exc_info()[1])
            # Do not leave the half-configured socket open.
            s.close()
            raise connectionLost(detail)
        self.wellknown = s

        Thread.__init__(self)
        self.setDaemon(1)
        self.start()

    def run(self):
        """Thread body: accept clients forever, applying connection_hook."""
        s = self.wellknown
        while 1:
            conn, addr = s.accept()
            if self.connection_hook and not self.connection_hook(addr):
                self._refuse(conn)
                continue
            self.connections.append( Connection(conn) )

    def _refuse(self, conn):
        """Shut down and close a socket whose client was refused."""
        try:
            conn.shutdown(1)
        except Exception:
            pass
        # Close explicitly rather than waiting for the socket object to be
        # garbage collected.
        try:
            conn.close()
        except Exception:
            pass

    def rpc_loop(self):
        """Process the pending messages of every connection.

        Connections that raise connectionLost are removed from
        self.connections; the exception is not propagated.
        """
        # Iterate over a copy: entries may be removed while looping and
        # the accept thread may append concurrently.
        for c in self.connections[:]:
            try:
                c.rpc_loop(block = 0)
            except connectionLost:
                if c in self.connections:
                    self.connections.remove(c)
 | 
						|
 | 
						|
def Client(ip='127.0.0.1', port=None):
    """Connect to a Server and return the resulting Connection.

    ip and port identify the server; a false port selects
    Server.default_port.  Raises connectionLost when the connection cannot
    be established; for socket errors the exception carries the text of
    the underlying error.
    """
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip,port or Server.default_port))
    except socket.error:
        what = sys.exc_info()[1]
        # Release the unconnected socket instead of leaking it.
        if s is not None:
            s.close()
        raise connectionLost(str(what))
    except Exception:
        if s is not None:
            s.close()
        raise connectionLost()
    return Connection(s)
 |