#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']

#
# Imports
#

import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import queue
import pickle

from multiprocessing import Process, current_process, cpu_count
from multiprocessing import util, managers, connection, forking, pool

#
# Logging
#

def get_logger():
    return _logger

_logger = logging.getLogger('distributing')
_logger.propagate = 0

_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

info = _logger.info
debug = _logger.debug

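# A minimal sketch of turning on verbose output from this module (uses the
# standard `logging` level names; the handler installed above already writes
# to stderr):
#
#     get_logger().setLevel(logging.DEBUG)
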
#
# Get number of cpus
#

try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1

#
# Manager type which spawns subprocesses
#

class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        manager = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name

#
# Process subclass representing a process on (possibly) a remote machine
#

class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''
    def __init__(self, data, main_path):
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        return self._identity

HostManager.register('_RemoteProcess', RemoteProcess)

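# A minimal sketch of the payload round-trip used above (`work` is a
# hypothetical module-level function; HostManager.Process() pickles the
# callable and its arguments, and RemoteProcess._bootstrap() unpickles them
# in the child process):
#
#     def work(x, y=1):
#         return x + y
#
#     data = pickle.dumps((work, (2,), {'y': 3}))
#     target, args, kwargs = pickle.loads(data)
#     assert target(*args, **kwargs) == 5
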
#
# A Pool class that uses a cluster
#

class DistributedPool(pool.Pool):

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        inqueue.set_contents([None] * size)

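# A minimal usage sketch (assumes `cluster` is an already started Cluster
# instance and `square` is a hypothetical module-level function):
#
#     p = cluster.Pool(processes=8)
#     results = p.map(square, range(100))
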
#
# Manager type which starts host managers on other machines
#

def LocalProcess(**kwds):
    p = Process(**kwds)
    p.set_name('localhost/' + p.name)
    return p

class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        managers.SyncManager.start(self)

        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        for host in self._hostlist:
            if host.hostname != 'localhost':
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)

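# A minimal usage sketch (hypothetical host names; `some_function` is a
# hypothetical module-level callable; assumes this file is importable as
# `distributing`, which is also the name `_start_manager()` uses on the
# remote side):
#
#     import distributing
#
#     cluster = distributing.Cluster(
#         [distributing.Host('localhost'),
#          distributing.Host('fred@node1.example.org', slots=4)],
#         modules=['__main__']
#         )
#     cluster.start()
#     p = cluster.Process(target=some_function, args=(10,))
#     p.start()
#     p.join()
#     cluster.shutdown()
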
#
# Queue subclass used by distributed pool
#

class SettableQueue(queue.Queue):
    def empty(self):
        return not self.queue
    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize
    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        self.not_empty.acquire()
        try:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
        finally:
            self.not_empty.release()

Cluster.register('_SettableQueue', SettableQueue)

#
# Class representing a notional cpu in the cluster
#

class Slot(object):
    def __init__(self, host):
        self.host = host
        self.Process = host.Process

#
# Host
#

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
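    # For example (hypothetical host names):
    #
    #     Host('localhost')                       # use the local machine
    #     Host('fred@foo.example.org', slots=4)   # log in as `fred` over ssh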
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            # send a plain bytes copy of the authkey -- AuthenticationString
            # itself deliberately refuses to be pickled
            data = dict(
                name='BootstrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=bytes(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()

#
# Copy files to remote directory, returning name of directory
#

unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin.buffer, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print(tempdir)
"'''

def copy_to_remote_temporary_directory(host, files):
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    p.stdin.close()
    # the pipe yields bytes; decode so the directory name can be used as text
    return p.stdout.read().rstrip().decode()

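# A minimal sketch of the streaming-tar technique used above (writes a gzip'ed
# tar stream to an arbitrary binary file object rather than to a named file;
# `some_file.py` is a hypothetical path):
#
#     import io, tarfile
#     buf = io.BytesIO()
#     with tarfile.open(fileobj=buf, mode='w|gz') as tf:
#         tf.add('some_file.py', arcname='some_file.py')
#     # buf.getvalue() now holds the archive, which the receiving end can
#     # unpack with tarfile.open(fileobj=..., mode='r|gz')
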
#
# Code which runs a host manager
#

def main():
    # get data from parent over stdin (pickle needs the binary buffer)
    data = pickle.load(sys.stdin.buffer)
    sys.stdin.close()

    # configure logging and prepare this process using the received settings
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()