| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | # | 
					
						
							|  |  |  | # Module to allow spawning of processes on foreign host | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Depends on `multiprocessing` package -- tested with `processing-0.60` | 
					
						
							|  |  |  | # | 
					
						
							| 
									
										
											  
											
												Merged revisions 67348,67355,67359,67362,67364-67365,67367-67368,67398,67423-67424,67432,67440-67441,67444-67445,67454-67455,67457-67458 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
  r67348 | benjamin.peterson | 2008-11-22 20:09:41 -0600 (Sat, 22 Nov 2008) | 1 line
  raise a better error
........
  r67355 | georg.brandl | 2008-11-23 13:17:25 -0600 (Sun, 23 Nov 2008) | 2 lines
  #4392: fix parameter name.
........
  r67359 | georg.brandl | 2008-11-23 15:57:30 -0600 (Sun, 23 Nov 2008) | 2 lines
  #4399: fix typo.
........
  r67362 | gregory.p.smith | 2008-11-23 18:41:43 -0600 (Sun, 23 Nov 2008) | 2 lines
  Document PY_SSIZE_T_CLEAN for PyArg_ParseTuple.
........
  r67364 | benjamin.peterson | 2008-11-23 19:16:29 -0600 (Sun, 23 Nov 2008) | 2 lines
  replace reference to debugger-hooks
........
  r67365 | benjamin.peterson | 2008-11-23 22:09:03 -0600 (Sun, 23 Nov 2008) | 1 line
  #4396 make the parser module correctly validate the with syntax
........
  r67367 | georg.brandl | 2008-11-24 10:16:07 -0600 (Mon, 24 Nov 2008) | 2 lines
  Fix typo.
........
  r67368 | georg.brandl | 2008-11-24 13:56:47 -0600 (Mon, 24 Nov 2008) | 2 lines
  #4404: make clear what "path" is.
........
  r67398 | benjamin.peterson | 2008-11-26 11:39:17 -0600 (Wed, 26 Nov 2008) | 1 line
  fix typo in sqlite3 docs
........
  r67423 | jesse.noller | 2008-11-28 12:59:35 -0600 (Fri, 28 Nov 2008) | 2 lines
  issue4238: bsd support for cpu_count
........
  r67424 | christian.heimes | 2008-11-28 13:33:33 -0600 (Fri, 28 Nov 2008) | 1 line
  Retain copyright of processing examples. This was requested by a Debian maintainer during packaging of the multiprocessing package for 2.4/2.5
........
  r67432 | benjamin.peterson | 2008-11-28 17:18:46 -0600 (Fri, 28 Nov 2008) | 1 line
  SVN format 9 is the same it seems
........
  r67440 | jeremy.hylton | 2008-11-28 17:42:59 -0600 (Fri, 28 Nov 2008) | 4 lines
  Move definition int sval into branch of ifdef where it is used.
  Otherwise, you get a warning about an undefined variable.
........
  r67441 | jeremy.hylton | 2008-11-28 18:09:16 -0600 (Fri, 28 Nov 2008) | 2 lines
  Reflow long lines.
........
  r67444 | amaury.forgeotdarc | 2008-11-28 20:03:32 -0600 (Fri, 28 Nov 2008) | 2 lines
  Fix a small typo in docstring
........
  r67445 | benjamin.peterson | 2008-11-29 21:07:33 -0600 (Sat, 29 Nov 2008) | 1 line
  StringIO.close() stops you from using the buffer, too
........
  r67454 | benjamin.peterson | 2008-11-30 08:43:23 -0600 (Sun, 30 Nov 2008) | 1 line
  note the version that works
........
  r67455 | martin.v.loewis | 2008-11-30 13:28:27 -0600 (Sun, 30 Nov 2008) | 1 line
  Issue #4365: Add crtassem.h constants to the msvcrt module.
........
  r67457 | christian.heimes | 2008-11-30 15:16:28 -0600 (Sun, 30 Nov 2008) | 1 line
  w# requires Py_ssize_t
........
  r67458 | benjamin.peterson | 2008-11-30 15:46:16 -0600 (Sun, 30 Nov 2008) | 1 line
  fix pyspecific extensions that were broken by Sphinx's grand renaming
........
											
										 
											2008-11-30 22:46:23 +00:00
										 |  |  | # Copyright (c) 2006-2008, R Oudkerk | 
					
						
							|  |  |  | # All rights reserved. | 
					
						
							|  |  |  | # | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | __all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Imports | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import tarfile | 
					
						
							|  |  |  | import shutil | 
					
						
							|  |  |  | import subprocess | 
					
						
							|  |  |  | import logging | 
					
						
							|  |  |  | import itertools | 
					
						
							| 
									
										
										
										
											2008-08-19 19:17:39 +00:00
										 |  |  | import queue | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							| 
									
										
										
										
											2008-08-19 19:17:39 +00:00
										 |  |  |     import pickle as pickle | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | except ImportError: | 
					
						
							|  |  |  |     import pickle | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from multiprocessing import Process, current_process, cpu_count | 
					
						
							|  |  |  | from multiprocessing import util, managers, connection, forking, pool | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Logging | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_logger(): | 
					
						
							|  |  |  |     return _logger | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | _logger = logging.getLogger('distributing') | 
					
						
							| 
									
										
										
										
											2009-04-27 16:51:45 +00:00
										 |  |  | _logger.propagate = 0 | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | _formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT) | 
					
						
							|  |  |  | _handler = logging.StreamHandler() | 
					
						
							|  |  |  | _handler.setFormatter(_formatter) | 
					
						
							|  |  |  | _logger.addHandler(_handler) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | info = _logger.info | 
					
						
							|  |  |  | debug = _logger.debug | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Get number of cpus | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     slot_count = cpu_count() | 
					
						
							|  |  |  | except NotImplemented: | 
					
						
							|  |  |  |     slot_count = 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Manager type which spawns subprocesses | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class HostManager(managers.SyncManager): | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     Manager type used for spawning processes on a (presumably) foreign host | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     def __init__(self, address, authkey): | 
					
						
							|  |  |  |         managers.SyncManager.__init__(self, address, authkey) | 
					
						
							|  |  |  |         self._name = 'Host-unknown' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def Process(self, group=None, target=None, name=None, args=(), kwargs={}): | 
					
						
							|  |  |  |         if hasattr(sys.modules['__main__'], '__file__'): | 
					
						
							|  |  |  |             main_path = os.path.basename(sys.modules['__main__'].__file__) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             main_path = None | 
					
						
							|  |  |  |         data = pickle.dumps((target, args, kwargs)) | 
					
						
							|  |  |  |         p = self._RemoteProcess(data, main_path) | 
					
						
							|  |  |  |         if name is None: | 
					
						
							|  |  |  |             temp = self._name.split('Host-')[-1] + '/Process-%s' | 
					
						
							|  |  |  |             name = temp % ':'.join(map(str, p.get_identity())) | 
					
						
							|  |  |  |         p.set_name(name) | 
					
						
							|  |  |  |         return p | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def from_address(cls, address, authkey): | 
					
						
							|  |  |  |         manager = cls(address, authkey) | 
					
						
							|  |  |  |         managers.transact(address, authkey, 'dummy') | 
					
						
							|  |  |  |         manager._state.value = managers.State.STARTED | 
					
						
							|  |  |  |         manager._name = 'Host-%s:%s' % manager.address | 
					
						
							|  |  |  |         manager.shutdown = util.Finalize( | 
					
						
							|  |  |  |             manager, HostManager._finalize_host, | 
					
						
							|  |  |  |             args=(manager._address, manager._authkey, manager._name), | 
					
						
							|  |  |  |             exitpriority=-10 | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         return manager | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def _finalize_host(address, authkey, name): | 
					
						
							|  |  |  |         managers.transact(address, authkey, 'shutdown') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __repr__(self): | 
					
						
							|  |  |  |         return '<Host(%s)>' % self._name | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Process subclass representing a process on (possibly) a remote machine | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class RemoteProcess(Process): | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     Represents a process started on a remote host | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     def __init__(self, data, main_path): | 
					
						
							|  |  |  |         assert not main_path or os.path.basename(main_path) == main_path | 
					
						
							|  |  |  |         Process.__init__(self) | 
					
						
							|  |  |  |         self._data = data | 
					
						
							|  |  |  |         self._main_path = main_path | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _bootstrap(self): | 
					
						
							|  |  |  |         forking.prepare({'main_path': self._main_path}) | 
					
						
							|  |  |  |         self._target, self._args, self._kwargs = pickle.loads(self._data) | 
					
						
							|  |  |  |         return Process._bootstrap(self) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_identity(self): | 
					
						
							|  |  |  |         return self._identity | 
					
						
							|  |  |  | 
 | 
					
						
# Expose RemoteProcess so HostManager proxies can create one remotely.
HostManager.register('_RemoteProcess', RemoteProcess)
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # A Pool class that uses a cluster | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DistributedPool(pool.Pool): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, cluster, processes=None, initializer=None, initargs=()): | 
					
						
							|  |  |  |         self._cluster = cluster | 
					
						
							|  |  |  |         self.Process = cluster.Process | 
					
						
							|  |  |  |         pool.Pool.__init__(self, processes or len(cluster), | 
					
						
							|  |  |  |                            initializer, initargs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _setup_queues(self): | 
					
						
							|  |  |  |         self._inqueue = self._cluster._SettableQueue() | 
					
						
							|  |  |  |         self._outqueue = self._cluster._SettableQueue() | 
					
						
							|  |  |  |         self._quick_put = self._inqueue.put | 
					
						
							|  |  |  |         self._quick_get = self._outqueue.get | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def _help_stuff_finish(inqueue, task_handler, size): | 
					
						
							|  |  |  |         inqueue.set_contents([None] * size) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Manager type which starts host managers on other machines | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def LocalProcess(**kwds): | 
					
						
							|  |  |  |     p = Process(**kwds) | 
					
						
							| 
									
										
										
										
											2008-08-19 19:17:39 +00:00
										 |  |  |     p.set_name('localhost/' + p.name) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  |     return p | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Cluster(managers.SyncManager): | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     Represents collection of slots running on various hosts. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     `Cluster` is a subclass of `SyncManager` so it allows creation of | 
					
						
							|  |  |  |     various types of shared objects. | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     def __init__(self, hostlist, modules): | 
					
						
							|  |  |  |         managers.SyncManager.__init__(self, address=('localhost', 0)) | 
					
						
							|  |  |  |         self._hostlist = hostlist | 
					
						
							|  |  |  |         self._modules = modules | 
					
						
							|  |  |  |         if __name__ not in modules: | 
					
						
							|  |  |  |             modules.append(__name__) | 
					
						
							|  |  |  |         files = [sys.modules[name].__file__ for name in modules] | 
					
						
							|  |  |  |         for i, file in enumerate(files): | 
					
						
							|  |  |  |             if file.endswith('.pyc') or file.endswith('.pyo'): | 
					
						
							|  |  |  |                 files[i] = file[:-4] + '.py' | 
					
						
							|  |  |  |         self._files = [os.path.abspath(file) for file in files] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def start(self): | 
					
						
							|  |  |  |         managers.SyncManager.start(self) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         l = connection.Listener(family='AF_INET', authkey=self._authkey) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for i, host in enumerate(self._hostlist): | 
					
						
							|  |  |  |             host._start_manager(i, self._authkey, l.address, self._files) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for host in self._hostlist: | 
					
						
							|  |  |  |             if host.hostname != 'localhost': | 
					
						
							|  |  |  |                 conn = l.accept() | 
					
						
							|  |  |  |                 i, address, cpus = conn.recv() | 
					
						
							|  |  |  |                 conn.close() | 
					
						
							|  |  |  |                 other_host = self._hostlist[i] | 
					
						
							|  |  |  |                 other_host.manager = HostManager.from_address(address, | 
					
						
							|  |  |  |                                                               self._authkey) | 
					
						
							|  |  |  |                 other_host.slots = other_host.slots or cpus | 
					
						
							|  |  |  |                 other_host.Process = other_host.manager.Process | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 host.slots = host.slots or slot_count | 
					
						
							|  |  |  |                 host.Process = LocalProcess | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self._slotlist = [ | 
					
						
							|  |  |  |             Slot(host) for host in self._hostlist for i in range(host.slots) | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  |         self._slot_iterator = itertools.cycle(self._slotlist) | 
					
						
							|  |  |  |         self._base_shutdown = self.shutdown | 
					
						
							|  |  |  |         del self.shutdown | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def shutdown(self): | 
					
						
							|  |  |  |         for host in self._hostlist: | 
					
						
							|  |  |  |             if host.hostname != 'localhost': | 
					
						
							|  |  |  |                 host.manager.shutdown() | 
					
						
							|  |  |  |         self._base_shutdown() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def Process(self, group=None, target=None, name=None, args=(), kwargs={}): | 
					
						
							| 
									
										
										
										
											2008-08-19 19:17:39 +00:00
										 |  |  |         slot = next(self._slot_iterator) | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  |         return slot.Process( | 
					
						
							|  |  |  |             group=group, target=target, name=name, args=args, kwargs=kwargs | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def Pool(self, processes=None, initializer=None, initargs=()): | 
					
						
							|  |  |  |         return DistributedPool(self, processes, initializer, initargs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __getitem__(self, i): | 
					
						
							|  |  |  |         return self._slotlist[i] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __len__(self): | 
					
						
							|  |  |  |         return len(self._slotlist) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __iter__(self): | 
					
						
							|  |  |  |         return iter(self._slotlist) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Queue subclass used by distributed pool | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2008-08-19 19:17:39 +00:00
										 |  |  | class SettableQueue(queue.Queue): | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  |     def empty(self): | 
					
						
							|  |  |  |         return not self.queue | 
					
						
							|  |  |  |     def full(self): | 
					
						
							|  |  |  |         return self.maxsize > 0 and len(self.queue) == self.maxsize | 
					
						
							|  |  |  |     def set_contents(self, contents): | 
					
						
							|  |  |  |         # length of contents must be at least as large as the number of | 
					
						
							|  |  |  |         # threads which have potentially called get() | 
					
						
							|  |  |  |         self.not_empty.acquire() | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             self.queue.clear() | 
					
						
							|  |  |  |             self.queue.extend(contents) | 
					
						
							| 
									
										
										
										
											2008-08-19 19:17:39 +00:00
										 |  |  |             self.not_empty.notifyAll() | 
					
						
							| 
									
										
										
										
											2008-06-11 16:44:04 +00:00
										 |  |  |         finally: | 
					
						
							|  |  |  |             self.not_empty.release() | 
					
						
							|  |  |  | 
 | 
					
						
# Expose SettableQueue so a Cluster can create managed instances of it.
Cluster.register('_SettableQueue', SettableQueue)
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Class representing a notional cpu in the cluster | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Slot(object): | 
					
						
							|  |  |  |     def __init__(self, host): | 
					
						
							|  |  |  |         self.host = host | 
					
						
							|  |  |  |         self.Process = host.Process | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Host | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Host(object): | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     Represents a host to use as a node in a cluster. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     `hostname` gives the name of the host.  If hostname is not | 
					
						
							|  |  |  |     "localhost" then ssh is used to log in to the host.  To log in as | 
					
						
							|  |  |  |     a different user use a host name of the form | 
					
						
							|  |  |  |     "username@somewhere.org" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     `slots` is used to specify the number of slots for processes on | 
					
						
							|  |  |  |     the host.  This affects how often processes will be allocated to | 
					
						
							|  |  |  |     this host.  Normally this should be equal to the number of cpus on | 
					
						
							|  |  |  |     that host. | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  |     def __init__(self, hostname, slots=None): | 
					
						
							|  |  |  |         self.hostname = hostname | 
					
						
							|  |  |  |         self.slots = slots | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _start_manager(self, index, authkey, address, files): | 
					
						
							|  |  |  |         if self.hostname != 'localhost': | 
					
						
							|  |  |  |             tempdir = copy_to_remote_temporary_directory(self.hostname, files) | 
					
						
							|  |  |  |             debug('startup files copied to %s:%s', self.hostname, tempdir) | 
					
						
							|  |  |  |             p = subprocess.Popen( | 
					
						
							|  |  |  |                 ['ssh', self.hostname, 'python', '-c', | 
					
						
							|  |  |  |                  '"import os; os.chdir(%r); ' | 
					
						
							|  |  |  |                  'from distributing import main; main()"' % tempdir], | 
					
						
							|  |  |  |                 stdin=subprocess.PIPE | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             data = dict( | 
					
						
							|  |  |  |                 name='BoostrappingHost', index=index, | 
					
						
							|  |  |  |                 dist_log_level=_logger.getEffectiveLevel(), | 
					
						
							|  |  |  |                 dir=tempdir, authkey=str(authkey), parent_address=address | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL) | 
					
						
							|  |  |  |             p.stdin.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Copy files to remote directory, returning name of directory | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | unzip_code = '''"
 | 
					
						
							|  |  |  | import tempfile, os, sys, tarfile | 
					
						
							|  |  |  | tempdir = tempfile.mkdtemp(prefix='distrib-') | 
					
						
							|  |  |  | os.chdir(tempdir) | 
					
						
							|  |  |  | tf = tarfile.open(fileobj=sys.stdin, mode='r|gz') | 
					
						
							|  |  |  | for ti in tf: | 
					
						
							|  |  |  |     tf.extract(ti) | 
					
						
							|  |  |  | print tempdir | 
					
						
							|  |  |  | "''' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def copy_to_remote_temporary_directory(host, files): | 
					
						
							|  |  |  |     p = subprocess.Popen( | 
					
						
							|  |  |  |         ['ssh', host, 'python', '-c', unzip_code], | 
					
						
							|  |  |  |         stdout=subprocess.PIPE, stdin=subprocess.PIPE | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     tf = tarfile.open(fileobj=p.stdin, mode='w|gz') | 
					
						
							|  |  |  |     for name in files: | 
					
						
							|  |  |  |         tf.add(name, os.path.basename(name)) | 
					
						
							|  |  |  |     tf.close() | 
					
						
							|  |  |  |     p.stdin.close() | 
					
						
							|  |  |  |     return p.stdout.read().rstrip() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Code which runs a host manager | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main(): | 
					
						
							|  |  |  |     # get data from parent over stdin | 
					
						
							|  |  |  |     data = pickle.load(sys.stdin) | 
					
						
							|  |  |  |     sys.stdin.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # set some stuff | 
					
						
							|  |  |  |     _logger.setLevel(data['dist_log_level']) | 
					
						
							|  |  |  |     forking.prepare(data) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # create server for a `HostManager` object | 
					
						
							|  |  |  |     server = managers.Server(HostManager._registry, ('', 0), data['authkey']) | 
					
						
							|  |  |  |     current_process()._server = server | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # report server address and number of cpus back to parent | 
					
						
							|  |  |  |     conn = connection.Client(data['parent_address'], authkey=data['authkey']) | 
					
						
							|  |  |  |     conn.send((data['index'], server.address, slot_count)) | 
					
						
							|  |  |  |     conn.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # set name etc | 
					
						
							|  |  |  |     current_process().set_name('Host-%s:%s' % server.address) | 
					
						
							|  |  |  |     util._run_after_forkers() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # register a cleanup function | 
					
						
							|  |  |  |     def cleanup(directory): | 
					
						
							|  |  |  |         debug('removing directory %s', directory) | 
					
						
							|  |  |  |         shutil.rmtree(directory) | 
					
						
							|  |  |  |         debug('shutting down host manager') | 
					
						
							|  |  |  |     util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # start host manager | 
					
						
							|  |  |  |     debug('remote host manager starting in %s', data['dir']) | 
					
						
							|  |  |  |     server.serve_forever() |