| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | # | 
					
						
							|  |  |  | # Module to allow connection and socket objects to be transferred | 
					
						
							|  |  |  | # between processes | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # multiprocessing/reduction.py | 
					
						
							|  |  |  | # | 
					
						
							| 
									
										
										
										
											2010-12-14 01:38:16 +00:00
										 |  |  | # Copyright (c) 2006-2008, R Oudkerk | 
					
						
							| 
									
										
										
										
											2012-04-30 12:13:55 +01:00
										 |  |  | # Licensed to PSF under a Contributor Agreement. | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  | __all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import socket | 
					
						
							|  |  |  | import threading | 
					
						
							| 
									
										
										
										
											2011-09-24 20:04:29 +02:00
										 |  |  | import struct | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  | import signal | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | from multiprocessing import current_process | 
					
						
							|  |  |  | from multiprocessing.util import register_after_fork, debug, sub_debug | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  | from multiprocessing.util import is_exiting, sub_warning | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-09-24 20:04:29 +02:00
										 |  |  | if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and | 
					
						
							|  |  |  |                                    hasattr(socket, 'SCM_RIGHTS'))): | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  |     raise ImportError('pickling of connections not supported') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Platform specific definitions | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if sys.platform == 'win32': | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     # Windows | 
					
						
							|  |  |  |     __all__ += ['reduce_pipe_connection'] | 
					
						
							| 
									
										
										
										
											2012-04-18 20:51:15 +02:00
										 |  |  |     import _winapi | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def send_handle(conn, handle, destination_pid): | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |         dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) | 
					
						
							|  |  |  |         conn.send(dh) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def recv_handle(conn): | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |         return conn.recv().detach() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     class DupHandle(object): | 
					
						
							|  |  |  |         def __init__(self, handle, access, pid=None): | 
					
						
							|  |  |  |             # duplicate handle for process with given pid | 
					
						
							|  |  |  |             if pid is None: | 
					
						
							|  |  |  |                 pid = os.getpid() | 
					
						
							|  |  |  |             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 self._handle = _winapi.DuplicateHandle( | 
					
						
							|  |  |  |                     _winapi.GetCurrentProcess(), | 
					
						
							|  |  |  |                     handle, proc, access, False, 0) | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 _winapi.CloseHandle(proc) | 
					
						
							|  |  |  |             self._access = access | 
					
						
							|  |  |  |             self._pid = pid | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def detach(self): | 
					
						
							|  |  |  |             # retrieve handle from process which currently owns it | 
					
						
							|  |  |  |             if self._pid == os.getpid(): | 
					
						
							|  |  |  |                 return self._handle | 
					
						
							|  |  |  |             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, | 
					
						
							|  |  |  |                                        self._pid) | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 return _winapi.DuplicateHandle( | 
					
						
							|  |  |  |                     proc, self._handle, _winapi.GetCurrentProcess(), | 
					
						
							|  |  |  |                     self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 _winapi.CloseHandle(proc) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     class DupSocket(object): | 
					
						
							|  |  |  |         def __init__(self, sock): | 
					
						
							|  |  |  |             new_sock = sock.dup() | 
					
						
							|  |  |  |             def send(conn, pid): | 
					
						
							|  |  |  |                 share = new_sock.share(pid) | 
					
						
							|  |  |  |                 conn.send_bytes(share) | 
					
						
							|  |  |  |             self._id = resource_sharer.register(send, new_sock.close) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def detach(self): | 
					
						
							|  |  |  |             conn = resource_sharer.get_connection(self._id) | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 share = conn.recv_bytes() | 
					
						
							|  |  |  |                 return socket.fromshare(share) | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 conn.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def reduce_socket(s): | 
					
						
							|  |  |  |         return rebuild_socket, (DupSocket(s),) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def rebuild_socket(ds): | 
					
						
							|  |  |  |         return ds.detach() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def reduce_connection(conn): | 
					
						
							|  |  |  |         handle = conn.fileno() | 
					
						
							|  |  |  |         with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: | 
					
						
							|  |  |  |             ds = DupSocket(s) | 
					
						
							|  |  |  |             return rebuild_connection, (ds, conn.readable, conn.writable) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def rebuild_connection(ds, readable, writable): | 
					
						
							|  |  |  |         from .connection import Connection | 
					
						
							|  |  |  |         sock = ds.detach() | 
					
						
							|  |  |  |         return Connection(sock.detach(), readable, writable) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def reduce_pipe_connection(conn): | 
					
						
							|  |  |  |         access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | | 
					
						
							|  |  |  |                   (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) | 
					
						
							|  |  |  |         dh = DupHandle(conn.fileno(), access) | 
					
						
							|  |  |  |         return rebuild_pipe_connection, (dh, conn.readable, conn.writable) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def rebuild_pipe_connection(dh, readable, writable): | 
					
						
							|  |  |  |         from .connection import PipeConnection | 
					
						
							|  |  |  |         handle = dh.detach() | 
					
						
							|  |  |  |         return PipeConnection(handle, readable, writable) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | else: | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     # Unix | 
					
						
							| 
									
										
										
										
											2012-08-16 16:48:55 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # On MacOSX we should acknowledge receipt of fds -- see Issue14669 | 
					
						
							|  |  |  |     ACKNOWLEDGE = sys.platform == 'darwin' | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  |     def send_handle(conn, handle, destination_pid): | 
					
						
							| 
									
										
										
										
											2011-09-24 20:04:29 +02:00
										 |  |  |         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: | 
					
						
							|  |  |  |             s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, | 
					
						
							|  |  |  |                                 struct.pack("@i", handle))]) | 
					
						
							| 
									
										
										
										
											2012-08-16 16:48:55 +01:00
										 |  |  |         if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': | 
					
						
							|  |  |  |             raise RuntimeError('did not receive acknowledgement of fd') | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def recv_handle(conn): | 
					
						
							| 
									
										
										
										
											2011-09-24 20:04:29 +02:00
										 |  |  |         size = struct.calcsize("@i") | 
					
						
							|  |  |  |         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: | 
					
						
							|  |  |  |             msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) | 
					
						
							|  |  |  |             try: | 
					
						
							| 
									
										
										
										
											2012-08-16 16:48:55 +01:00
										 |  |  |                 if ACKNOWLEDGE: | 
					
						
							|  |  |  |                     conn.send_bytes(b'ACK') | 
					
						
							| 
									
										
										
										
											2011-09-24 20:04:29 +02:00
										 |  |  |                 cmsg_level, cmsg_type, cmsg_data = ancdata[0] | 
					
						
							|  |  |  |                 if (cmsg_level == socket.SOL_SOCKET and | 
					
						
							|  |  |  |                     cmsg_type == socket.SCM_RIGHTS): | 
					
						
							|  |  |  |                     return struct.unpack("@i", cmsg_data[:size])[0] | 
					
						
							|  |  |  |             except (ValueError, IndexError, struct.error): | 
					
						
							|  |  |  |                 pass | 
					
						
							|  |  |  |             raise RuntimeError('Invalid data received') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     class DupFd(object): | 
					
						
							|  |  |  |         def __init__(self, fd): | 
					
						
							|  |  |  |             new_fd = os.dup(fd) | 
					
						
							|  |  |  |             def send(conn, pid): | 
					
						
							|  |  |  |                 send_handle(conn, new_fd, pid) | 
					
						
							|  |  |  |             def close(): | 
					
						
							|  |  |  |                 os.close(new_fd) | 
					
						
							|  |  |  |             self._id = resource_sharer.register(send, close) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def detach(self): | 
					
						
							|  |  |  |             conn = resource_sharer.get_connection(self._id) | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 return recv_handle(conn) | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 conn.close() | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     def reduce_socket(s): | 
					
						
							|  |  |  |         df = DupFd(s.fileno()) | 
					
						
							|  |  |  |         return rebuild_socket, (df, s.family, s.type, s.proto) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     def rebuild_socket(df, family, type, proto): | 
					
						
							|  |  |  |         fd = df.detach() | 
					
						
							|  |  |  |         s = socket.fromfd(fd, family, type, proto) | 
					
						
							|  |  |  |         os.close(fd) | 
					
						
							|  |  |  |         return s | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     def reduce_connection(conn): | 
					
						
							|  |  |  |         df = DupFd(conn.fileno()) | 
					
						
							|  |  |  |         return rebuild_connection, (df, conn.readable, conn.writable) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     def rebuild_connection(df, readable, writable): | 
					
						
							|  |  |  |         from .connection import Connection | 
					
						
							|  |  |  |         fd = df.detach() | 
					
						
							|  |  |  |         return Connection(fd, readable, writable) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  | # Server which shares registered resources with clients | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  | class ResourceSharer(object): | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self._key = 0 | 
					
						
							|  |  |  |         self._cache = {} | 
					
						
							|  |  |  |         self._old_locks = [] | 
					
						
							|  |  |  |         self._lock = threading.Lock() | 
					
						
							|  |  |  |         self._listener = None | 
					
						
							|  |  |  |         self._address = None | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  |         self._thread = None | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |         register_after_fork(self, ResourceSharer._afterfork) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def register(self, send, close): | 
					
						
							|  |  |  |         with self._lock: | 
					
						
							|  |  |  |             if self._address is None: | 
					
						
							|  |  |  |                 self._start() | 
					
						
							|  |  |  |             self._key += 1 | 
					
						
							|  |  |  |             self._cache[self._key] = (send, close) | 
					
						
							|  |  |  |             return (self._address, self._key) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def get_connection(ident): | 
					
						
							|  |  |  |         from .connection import Client | 
					
						
							|  |  |  |         address, key = ident | 
					
						
							|  |  |  |         c = Client(address, authkey=current_process().authkey) | 
					
						
							|  |  |  |         c.send((key, os.getpid())) | 
					
						
							|  |  |  |         return c | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  |     def stop(self, timeout=None): | 
					
						
							|  |  |  |         from .connection import Client | 
					
						
							|  |  |  |         with self._lock: | 
					
						
							|  |  |  |             if self._address is not None: | 
					
						
							|  |  |  |                 c = Client(self._address, authkey=current_process().authkey) | 
					
						
							|  |  |  |                 c.send(None) | 
					
						
							|  |  |  |                 c.close() | 
					
						
							|  |  |  |                 self._thread.join(timeout) | 
					
						
							|  |  |  |                 if self._thread.is_alive(): | 
					
						
							|  |  |  |                     sub_warn('ResourceSharer thread did not stop when asked') | 
					
						
							|  |  |  |                 self._listener.close() | 
					
						
							|  |  |  |                 self._thread = None | 
					
						
							|  |  |  |                 self._address = None | 
					
						
							|  |  |  |                 self._listener = None | 
					
						
							|  |  |  |                 for key, (send, close) in self._cache.items(): | 
					
						
							|  |  |  |                     close() | 
					
						
							|  |  |  |                 self._cache.clear() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |     def _afterfork(self): | 
					
						
							|  |  |  |         for key, (send, close) in self._cache.items(): | 
					
						
							|  |  |  |             close() | 
					
						
							|  |  |  |         self._cache.clear() | 
					
						
							|  |  |  |         # If self._lock was locked at the time of the fork, it may be broken | 
					
						
							|  |  |  |         # -- see issue 6721.  Replace it without letting it be gc'ed. | 
					
						
							|  |  |  |         self._old_locks.append(self._lock) | 
					
						
							|  |  |  |         self._lock = threading.Lock() | 
					
						
							|  |  |  |         if self._listener is not None: | 
					
						
							|  |  |  |             self._listener.close() | 
					
						
							|  |  |  |         self._listener = None | 
					
						
							|  |  |  |         self._address = None | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  |         self._thread = None | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _start(self): | 
					
						
							|  |  |  |         from .connection import Listener | 
					
						
							|  |  |  |         assert self._listener is None | 
					
						
							|  |  |  |         debug('starting listener and thread for sending handles') | 
					
						
							|  |  |  |         self._listener = Listener(authkey=current_process().authkey) | 
					
						
							|  |  |  |         self._address = self._listener.address | 
					
						
							|  |  |  |         t = threading.Thread(target=self._serve) | 
					
						
							|  |  |  |         t.daemon = True | 
					
						
							|  |  |  |         t.start() | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  |         self._thread = t | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _serve(self): | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  |         if hasattr(signal, 'pthread_sigmask'): | 
					
						
							|  |  |  |             signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |         while 1: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 conn = self._listener.accept() | 
					
						
							| 
									
										
										
										
											2012-04-27 23:51:03 +02:00
										 |  |  |                 msg = conn.recv() | 
					
						
							|  |  |  |                 if msg is None: | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 key, destination_pid = msg | 
					
						
							| 
									
										
										
										
											2012-04-24 22:56:57 +02:00
										 |  |  |                 send, close = self._cache.pop(key) | 
					
						
							|  |  |  |                 send(conn, destination_pid) | 
					
						
							|  |  |  |                 close() | 
					
						
							|  |  |  |                 conn.close() | 
					
						
							|  |  |  |             except: | 
					
						
							|  |  |  |                 if not is_exiting(): | 
					
						
							|  |  |  |                     import traceback | 
					
						
							|  |  |  |                     sub_warning( | 
					
						
							|  |  |  |                         'thread for sharing handles raised exception :\n' + | 
					
						
							|  |  |  |                         '-'*79 + '\n' + traceback.format_exc() + '-'*79 | 
					
						
							|  |  |  |                         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | resource_sharer = ResourceSharer() |