mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 11:14:33 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			158 lines
		
	
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			158 lines
		
	
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #
 | |
| # We use a background thread for sharing fds on Unix, and for sharing sockets on
 | |
| # Windows.
 | |
| #
 | |
| # A client which wants to pickle a resource registers it with the resource
 | |
| # sharer and gets an identifier in return.  The unpickling process will connect
 | |
| # to the resource sharer, sends the identifier and its pid, and then receives
 | |
| # the resource.
 | |
| #
 | |
| 
 | |
| import os
 | |
| import signal
 | |
| import socket
 | |
| import sys
 | |
| import threading
 | |
| 
 | |
| from . import process
 | |
| from . import reduction
 | |
| from . import util
 | |
| 
 | |
| __all__ = ['stop']
 | |
| 
 | |
| 
 | |
| if sys.platform == 'win32':
 | |
|     __all__ += ['DupSocket']
 | |
| 
 | |
|     class DupSocket(object):
 | |
|         '''Picklable wrapper for a socket.'''
 | |
|         def __init__(self, sock):
 | |
|             new_sock = sock.dup()
 | |
|             def send(conn, pid):
 | |
|                 share = new_sock.share(pid)
 | |
|                 conn.send_bytes(share)
 | |
|             self._id = _resource_sharer.register(send, new_sock.close)
 | |
| 
 | |
|         def detach(self):
 | |
|             '''Get the socket.  This should only be called once.'''
 | |
|             with _resource_sharer.get_connection(self._id) as conn:
 | |
|                 share = conn.recv_bytes()
 | |
|                 return socket.fromshare(share)
 | |
| 
 | |
| else:
 | |
|     __all__ += ['DupFd']
 | |
| 
 | |
|     class DupFd(object):
 | |
|         '''Wrapper for fd which can be used at any time.'''
 | |
|         def __init__(self, fd):
 | |
|             new_fd = os.dup(fd)
 | |
|             def send(conn, pid):
 | |
|                 reduction.send_handle(conn, new_fd, pid)
 | |
|             def close():
 | |
|                 os.close(new_fd)
 | |
|             self._id = _resource_sharer.register(send, close)
 | |
| 
 | |
|         def detach(self):
 | |
|             '''Get the fd.  This should only be called once.'''
 | |
|             with _resource_sharer.get_connection(self._id) as conn:
 | |
|                 return reduction.recv_handle(conn)
 | |
| 
 | |
| 
 | |
| class _ResourceSharer(object):
 | |
|     '''Manager for resouces using background thread.'''
 | |
|     def __init__(self):
 | |
|         self._key = 0
 | |
|         self._cache = {}
 | |
|         self._old_locks = []
 | |
|         self._lock = threading.Lock()
 | |
|         self._listener = None
 | |
|         self._address = None
 | |
|         self._thread = None
 | |
|         util.register_after_fork(self, _ResourceSharer._afterfork)
 | |
| 
 | |
|     def register(self, send, close):
 | |
|         '''Register resource, returning an identifier.'''
 | |
|         with self._lock:
 | |
|             if self._address is None:
 | |
|                 self._start()
 | |
|             self._key += 1
 | |
|             self._cache[self._key] = (send, close)
 | |
|             return (self._address, self._key)
 | |
| 
 | |
|     @staticmethod
 | |
|     def get_connection(ident):
 | |
|         '''Return connection from which to receive identified resource.'''
 | |
|         from .connection import Client
 | |
|         address, key = ident
 | |
|         c = Client(address, authkey=process.current_process().authkey)
 | |
|         c.send((key, os.getpid()))
 | |
|         return c
 | |
| 
 | |
|     def stop(self, timeout=None):
 | |
|         '''Stop the background thread and clear registered resources.'''
 | |
|         from .connection import Client
 | |
|         with self._lock:
 | |
|             if self._address is not None:
 | |
|                 c = Client(self._address,
 | |
|                            authkey=process.current_process().authkey)
 | |
|                 c.send(None)
 | |
|                 c.close()
 | |
|                 self._thread.join(timeout)
 | |
|                 if self._thread.is_alive():
 | |
|                     util.sub_warning('_ResourceSharer thread did '
 | |
|                                      'not stop when asked')
 | |
|                 self._listener.close()
 | |
|                 self._thread = None
 | |
|                 self._address = None
 | |
|                 self._listener = None
 | |
|                 for key, (send, close) in self._cache.items():
 | |
|                     close()
 | |
|                 self._cache.clear()
 | |
| 
 | |
|     def _afterfork(self):
 | |
|         for key, (send, close) in self._cache.items():
 | |
|             close()
 | |
|         self._cache.clear()
 | |
|         # If self._lock was locked at the time of the fork, it may be broken
 | |
|         # -- see issue 6721.  Replace it without letting it be gc'ed.
 | |
|         self._old_locks.append(self._lock)
 | |
|         self._lock = threading.Lock()
 | |
|         if self._listener is not None:
 | |
|             self._listener.close()
 | |
|         self._listener = None
 | |
|         self._address = None
 | |
|         self._thread = None
 | |
| 
 | |
|     def _start(self):
 | |
|         from .connection import Listener
 | |
|         assert self._listener is None
 | |
|         util.debug('starting listener and thread for sending handles')
 | |
|         self._listener = Listener(authkey=process.current_process().authkey)
 | |
|         self._address = self._listener.address
 | |
|         t = threading.Thread(target=self._serve)
 | |
|         t.daemon = True
 | |
|         t.start()
 | |
|         self._thread = t
 | |
| 
 | |
|     def _serve(self):
 | |
|         if hasattr(signal, 'pthread_sigmask'):
 | |
|             signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
 | |
|         while 1:
 | |
|             try:
 | |
|                 with self._listener.accept() as conn:
 | |
|                     msg = conn.recv()
 | |
|                     if msg is None:
 | |
|                         break
 | |
|                     key, destination_pid = msg
 | |
|                     send, close = self._cache.pop(key)
 | |
|                     try:
 | |
|                         send(conn, destination_pid)
 | |
|                     finally:
 | |
|                         close()
 | |
|             except:
 | |
|                 if not util.is_exiting():
 | |
|                     sys.excepthook(*sys.exc_info())
 | |
| 
 | |
| 
 | |
| _resource_sharer = _ResourceSharer()
 | |
| stop = _resource_sharer.stop
 | 
