| 
									
										
										
										
											2000-08-15 01:13:23 +00:00
										 |  |  | """protocol        (David Scherer <dscherer@cmu.edu>)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |      This module implements a simple RPC or "distributed object" protocol. | 
					
						
							|  |  |  |      I am probably the 100,000th person to write this in Python, but, hey, | 
					
						
							|  |  |  |      it was fun. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |      Contents: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        connectionLost is an exception that will be thrown by functions in | 
					
						
							|  |  |  |            the protocol module or calls to remote methods that fail because | 
					
						
							|  |  |  |            the remote program has closed the socket or because no connection | 
					
						
							|  |  |  |            could be established in the first place. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        Server( port=None, connection_hook=None ) creates a server on a | 
					
						
							|  |  |  |            well-known port, to which clients can connect.  When a client | 
					
						
							|  |  |  |            connects, a Connection is created for it.  If connection_hook | 
					
						
							|  |  |  |            is defined, then connection_hook( socket.getpeername() ) is called | 
					
						
							|  |  |  |            before a Connection is created, and if it returns false then the | 
					
						
							|  |  |  |            connection is refused.  connection_hook must be prepared to be | 
					
						
							|  |  |  |            called from any thread. | 
					
						
							|  |  |  |    | 
					
						
							|  |  |  |        Client( ip='127.0.0.1', port=None ) returns a Connection to a Server | 
					
						
							|  |  |  |            object at a well-known address and port. | 
					
						
							|  |  |  |    | 
					
						
							|  |  |  |        Connection( socket ) creates an RPC connection on an arbitrary socket, | 
					
						
							|  |  |  |            which must already be connected to another program.  You do not | 
					
						
							|  |  |  |            need to use this directly if you are using Client() or Server(). | 
					
						
							|  |  |  |    | 
					
						
							|  |  |  |        publish( name, connect_function ) provides an object with the | 
					
						
							|  |  |  |            specified name to some or all Connections.  When another program | 
					
						
							|  |  |  |            calls Connection.getobject() with the specified name, the | 
					
						
							|  |  |  |            specified connect_function is called with the arguments | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |               connect_function( conn, addr ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |            where conn is the Connection object to the requesting client and | 
					
						
							|  |  |  |            addr is the address returned by socket.getpeername().  If that | 
					
						
							|  |  |  |            function returns an object, that object becomes accessible to | 
					
						
							|  |  |  |            the caller.  If it returns None, the caller's request fails. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |      Connection objects: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        .close() refuses additional RPC messages from the peer, and notifies | 
					
						
							|  |  |  |            the peer that the connection has been closed.  All pending remote | 
					
						
							|  |  |  |            method calls in either program will fail with a connectionLost | 
					
						
							|  |  |  |            exception.  Further remote method calls on this connection will | 
					
						
							|  |  |  |            also result in errors. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        .getobject(name) returns a proxy for the remote object with the | 
					
						
							|  |  |  |            specified name, if it exists and the peer permits us access. | 
					
						
							|  |  |  |            Otherwise, it returns None.  It may throw a connectionLost | 
					
						
							|  |  |  |            exception.  The returned proxy supports basic attribute access | 
					
						
							|  |  |  |            and method calls, and its methods have an extra attribute, | 
					
						
							|  |  |  |            .void, which is a function that has the same effect but always | 
					
						
							|  |  |  |            returns None.  This last capability is provided as a performance | 
					
						
							|  |  |  |            hack: object.method.void(params) can return without waiting for | 
					
						
							|  |  |  |            the remote process to respond, but object.method(params) needs | 
					
						
							|  |  |  |            to wait for a return value or exception. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        .rpc_loop(block=0) processes *incoming* messages for this connection. | 
					
						
							|  |  |  |            If block=1, it continues processing until an exception or return | 
					
						
							|  |  |  |            value is received, which is normally forever.  Otherwise it | 
					
						
							|  |  |  |            returns when all currently pending messages have been delivered. | 
					
						
							|  |  |  |            It may throw a connectionLost exception. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        .set_close_hook(f) specifies a function to be called when the remote | 
					
						
							|  |  |  |            object closes the connection during a call to rpc_loop().  This | 
					
						
							|  |  |  |            is a good way for servers to be notified when clients disconnect. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        .set_shutdown_hook(f) specifies a function called *immediately* when | 
					
						
							|  |  |  |            the receive loop detects that the connection has been lost.  The | 
					
						
							|  |  |  |            provided function must be prepared to run in any thread. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |      Server objects: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        .rpc_loop() processes incoming messages on all connections, and | 
					
						
							|  |  |  |            returns when all pending messages have been processed.  It will | 
					
						
							|  |  |  |            *not* throw connectionLost exceptions; the | 
					
						
							|  |  |  |            Connection.set_close_hook() mechanism is much better for servers. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import sys, os, string, types | 
					
						
							|  |  |  | import socket | 
					
						
							|  |  |  | from threading import Thread | 
					
						
							|  |  |  | from Queue import Queue, Empty | 
					
						
							|  |  |  | from cPickle import Pickler, Unpickler, PicklingError | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class connectionLost: | 
					
						
							|  |  |  |     def __init__(self, what=""): self.what = what | 
					
						
							|  |  |  |     def __repr__(self): return self.what | 
					
						
							|  |  |  |     def __str__(self): return self.what | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def getmethods(cls): | 
					
						
							|  |  |  |     "Returns a list of the names of the methods of a class." | 
					
						
							|  |  |  |     methods = [] | 
					
						
							|  |  |  |     for b in cls.__bases__: | 
					
						
							|  |  |  |         methods = methods + getmethods(b) | 
					
						
							|  |  |  |     d = cls.__dict__ | 
					
						
							|  |  |  |     for k in d.keys(): | 
					
						
							|  |  |  |         if type(d[k])==types.FunctionType: | 
					
						
							|  |  |  |             methods.append(k) | 
					
						
							|  |  |  |     return methods | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class methodproxy: | 
					
						
							|  |  |  |     "Proxy for a method of a remote object." | 
					
						
							|  |  |  |     def __init__(self, classp, name): | 
					
						
							|  |  |  |         self.classp=classp | 
					
						
							|  |  |  |         self.name=name | 
					
						
							|  |  |  |         self.client = classp.client | 
					
						
							|  |  |  |     def __call__(self, *args, **keywords): | 
					
						
							|  |  |  |         return self.client.call( 'm', self.classp.name, self.name, args, keywords ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def void(self, *args, **keywords): | 
					
						
							|  |  |  |         self.client.call_void( 'm', self.classp.name,self.name,args,keywords) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class classproxy: | 
					
						
							|  |  |  |     "Proxy for a remote object." | 
					
						
							|  |  |  |     def __init__(self, client, name, methods): | 
					
						
							|  |  |  |         self.__dict__['client'] = client | 
					
						
							|  |  |  |         self.__dict__['name'] = name | 
					
						
							|  |  |  |          | 
					
						
							|  |  |  |         for m in methods: | 
					
						
							|  |  |  |             prox = methodproxy( self, m ) | 
					
						
							|  |  |  |             self.__dict__[m] = prox | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __getattr__(self, attr): | 
					
						
							|  |  |  |         return self.client.call( 'g', self.name, attr ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __setattr__(self, attr, value): | 
					
						
							|  |  |  |         self.client.call_void( 's', self.name, attr, value ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | local_connect  = {} | 
					
						
							|  |  |  | def publish(name, connect_function): | 
					
						
							|  |  |  |     local_connect[name]=connect_function | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class socketFile: | 
					
						
							|  |  |  |     "File emulator based on a socket.  Provides only blocking semantics for now." | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, socket): | 
					
						
							|  |  |  |         self.socket = socket | 
					
						
							|  |  |  |         self.buffer = '' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _recv(self,bytes): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             r=self.socket.recv(bytes) | 
					
						
							|  |  |  |         except: | 
					
						
							|  |  |  |             raise connectionLost() | 
					
						
							|  |  |  |         if not r: | 
					
						
							|  |  |  |             raise connectionLost() | 
					
						
							|  |  |  |         return r | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def write(self, string): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             self.socket.send( string ) | 
					
						
							|  |  |  |         except: | 
					
						
							|  |  |  |             raise connectionLost() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def read(self,bytes): | 
					
						
							|  |  |  |         x = bytes-len(self.buffer) | 
					
						
							|  |  |  |         while x>0: | 
					
						
							|  |  |  |             self.buffer=self.buffer+self._recv(x) | 
					
						
							|  |  |  |             x = bytes-len(self.buffer) | 
					
						
							|  |  |  |         s = self.buffer[:bytes] | 
					
						
							|  |  |  |         self.buffer=self.buffer[bytes:] | 
					
						
							|  |  |  |         return s | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def readline(self): | 
					
						
							|  |  |  |         while 1: | 
					
						
							|  |  |  |             f = string.find(self.buffer,'\n') | 
					
						
							|  |  |  |             if f>=0: | 
					
						
							|  |  |  |                 s = self.buffer[:f+1] | 
					
						
							|  |  |  |                 self.buffer=self.buffer[f+1:] | 
					
						
							|  |  |  |                 return s | 
					
						
							|  |  |  |             self.buffer = self.buffer + self._recv(1024) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
class Connection (Thread):
    # One RPC endpoint on an already-connected socket.
    #
    # The object is its own receiver thread: run() unpickles messages from
    # the socket and places them on self.queue, while rpc_loop() -- called
    # by whoever wants incoming requests serviced -- takes them off the
    # queue and executes them.  Every message is a 3-tuple
    # (cmd, args, ret): cmd is a one-letter command, args its argument
    # tuple (for 'r' it is the return value itself), and ret is true when
    # the sender expects an 'r' or 'e' reply.

    # When true, rpc_exec() prints every message it is about to execute.
    debug = 0
    def __init__(self, socket):
        # Objects handed out through r_geto(), keyed by published name.
        self.local_objects = {}
        self.socket = socket
        # Peer address; passed to connect functions as their addr argument.
        # NOTE(review): this is assigned before Thread.__init__ runs; if
        # the Thread class in use manages its own `name` attribute the two
        # may clash -- confirm against the targeted Python version.
        self.name = socket.getpeername()
        self.socketfile = socketFile(socket)
        self.queue = Queue(-1)
        # Set by close(); rpc_exec() then ignores messages and rpc_loop()
        # raises connectionLost.
        self.refuse_messages = 0
        # Dispatch table used by rpc_exec().
        self.cmds = { 'm': self.r_meth,
                      'g': self.r_get,
                      's': self.r_set,
                      'o': self.r_geto,
                      'e': self.r_exc,
                     #'r' handled by rpc_loop
                    }

        # The receiver thread is started right away, as a daemon thread.
        Thread.__init__(self)
        self.setDaemon(1)
        self.start()

    def getobject(self, name):
        # Ask the peer for the object published as `name` ('o' request).
        # The reply is the list of its method names, or None, in which
        # case None is returned instead of a proxy.
        methods = self.call( 'o', name )
        if methods is None: return None
        return classproxy(self, name, methods)

    # close_hook is called from rpc_loop(), like a normal remote method
    #   invocation
    def set_close_hook(self,hook): self.close_hook = hook

    # shutdown_hook is called directly from the run() thread, and needs
    #   to be "thread safe"
    def set_shutdown_hook(self,hook): self.shutdown_hook = hook

    # Class-level defaults, so both hooks read as None until one is set.
    close_hook = None
    shutdown_hook = None

    def close(self):
        # Shut the socket down first, then stop accepting messages.
        self._shutdown()
        self.refuse_messages = 1

    def call(self, c, *args):
        # Send a request that expects a reply, then service incoming
        # messages until rpc_loop() returns (on an 'r' message) or raises.
        self.send( (c, args, 1 ) )
        return self.rpc_loop( block = 1 )

    def call_void(self, c, *args):
        # Send a request that expects no reply.  Any error raised while
        # sending is deliberately ignored.
        try:
            self.send( (c, args, 0 ) )
        except:
            pass
   
    # the following methods handle individual RPC calls:

    def r_geto(self, obj):
        # 'o': look up the connect function published under `obj`, call it
        # with (connection, peer address), remember the object it returns
        # and reply with that object's method names.  Replies None when
        # nothing is published or the connect function returns a false
        # value.
        c = local_connect.get(obj)
        if not c: return None
        o = c(self, self.name)
        if not o: return None
        self.local_objects[obj] = o
        return getmethods(o.__class__)

    def r_meth(self, obj, name, args, keywords):
        # 'm': call method `name` of a previously fetched local object.
        return apply( getattr(self.local_objects[obj],name), args, keywords)

    def r_get(self, obj, name):       
        # 'g': read an attribute of a local object.
        return getattr(self.local_objects[obj],name)

    def r_set(self, obj, name, value):
        # 's': assign an attribute of a local object.
        setattr(self.local_objects[obj],name,value)

    def r_exc(self, e, v):
        # 'e': re-raise an exception carried by a message, in the thread
        # that is running rpc_loop().
        raise e, v

    def rpc_exec(self, cmd, arg, ret):
        # Execute one queued message.  Messages are silently dropped once
        # the connection has been closed locally.
        if self.refuse_messages: return
        if self.debug: print cmd,arg,ret
        if ret:
            # The sender is waiting: reply with 'r' and the result, or
            # with 'e' and the (type, value) of whatever was raised.
            try:
                r=apply(self.cmds.get(cmd), arg)
                self.send( ('r', r, 0) )
            except:
                try:
                    self.send( ('e', sys.exc_info()[:2], 0) )
                except PicklingError:
                    # The exception itself could not be pickled; send a
                    # generic TypeError in its place.
                    self.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) )
        else:
            # we cannot report exceptions to the caller, so
            #   we report them in this process.
            r=apply(self.cmds.get(cmd), arg)

    # the following methods implement the RPC and message loops:

    def rpc_loop(self, block=0):
        # Process queued messages.  Returns the payload of the first 'r'
        # message seen; with block=0 it returns None as soon as the queue
        # is empty.  Raises connectionLost if the connection was already
        # closed locally.
        if self.refuse_messages: raise connectionLost('(already closed)')
        try:
            while 1:
                try:
                    cmd, arg, ret = self.queue.get( block )
                except Empty:
                    return None
                if cmd=='r': return arg
                self.rpc_exec(cmd,arg,ret)
        except connectionLost:
            # The close hook runs at most once: it is cleared after use.
            if self.close_hook:
                self.close_hook()
                self.close_hook = None
            raise

    def run(self):
        # Receiver thread body: move incoming messages onto the queue.
        # Any exception ends the thread and is queued as an 'e' message,
        # so it is re-raised later by rpc_loop() via r_exc().
        try:
            while 1:
                data = self.recv()
                self.queue.put( data )
        except:
            self.queue.put( ('e', sys.exc_info()[:2], 0) )

    # The following send raw pickled data to the peer

    def send(self, data):
        # Pickle `data` onto the socket.  On connectionLost the socket is
        # shut down and the shutdown hook called before re-raising.
        try:
            Pickler(self.socketfile,1).dump( data )
        except connectionLost:
            self._shutdown()
            if self.shutdown_hook: self.shutdown_hook()
            raise

    def recv(self):
        # Unpickle the next message from the socket.
        # NOTE(review): this unpickles whatever the peer sends; unpickling
        # data from an untrusted peer can execute arbitrary code.
        try:
            return Unpickler(self.socketfile).load()
        except connectionLost:
            self._shutdown()
            if self.shutdown_hook: self.shutdown_hook()
            raise
        except:
            raise

    def _shutdown(self):
        # Best-effort teardown: errors from shutdown()/close() are ignored.
        try:
            self.socket.shutdown(1)
            self.socket.close()
        except:
            pass
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Server (Thread): | 
					
						
							|  |  |  |     default_port = 0x1D1E   # "IDlE" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, port=None, connection_hook=None): | 
					
						
							|  |  |  |         self.connections = [] | 
					
						
							|  |  |  |         self.port = port or self.default_port | 
					
						
							|  |  |  |         self.connection_hook = connection_hook | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             self.wellknown = s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							| 
									
										
										
										
											2000-09-24 06:29:50 +00:00
										 |  |  |             s.bind(('', self.port)) | 
					
						
							| 
									
										
										
										
											2000-08-15 01:13:23 +00:00
										 |  |  |             s.listen(3) | 
					
						
							|  |  |  |         except: | 
					
						
							|  |  |  |             raise connectionLost | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Thread.__init__(self) | 
					
						
							|  |  |  |         self.setDaemon(1) | 
					
						
							|  |  |  |         self.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run(self): | 
					
						
							|  |  |  |         s = self.wellknown | 
					
						
							|  |  |  |         while 1: | 
					
						
							|  |  |  |             conn, addr = s.accept() | 
					
						
							|  |  |  |             if self.connection_hook and not self.connection_hook(addr): | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     conn.shutdown(1) | 
					
						
							|  |  |  |                 except: | 
					
						
							|  |  |  |                     pass | 
					
						
							|  |  |  |                 continue | 
					
						
							|  |  |  |             self.connections.append( Connection(conn) ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def rpc_loop(self): | 
					
						
							|  |  |  |         cns = self.connections[:] | 
					
						
							|  |  |  |         for c in cns: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 c.rpc_loop(block = 0) | 
					
						
							|  |  |  |             except connectionLost: | 
					
						
							|  |  |  |                 if c in self.connections: | 
					
						
							|  |  |  |                     self.connections.remove(c) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def Client(ip='127.0.0.1', port=None): | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
					
						
							| 
									
										
										
										
											2000-09-24 06:29:50 +00:00
										 |  |  |         s.connect((ip,port or Server.default_port)) | 
					
						
							| 
									
										
										
										
											2000-08-15 01:13:23 +00:00
										 |  |  |     except socket.error, what: | 
					
						
							|  |  |  |         raise connectionLost(str(what)) | 
					
						
							|  |  |  |     except: | 
					
						
							|  |  |  |         raise connectionLost() | 
					
						
							|  |  |  |     return Connection(s) |