#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import threading
import Queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return map(*args)
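
# `mapstar` lets a whole chunk of arguments be processed by one task:
# for example, mapstar((f, (1, 2, 3))) == [f(1), f(2), f(3)].  The
# map-style methods below rely on this to batch `chunksize` items per task.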
					
						
#
# Code run by worker processes
#

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
        completed += 1
    debug('worker exiting after %d tasks' % completed)
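
# Each task read from `inqueue` is a 5-tuple and each result written to
# `outqueue` is a 3-tuple, e.g. (illustrative values only):
#
#     task   = (job, i, func, args, kwds)     # sent by Pool._handle_tasks
#     result = (job, i, (True, value))        # (False, exc) on failure
#
# `job` identifies the ApplyResult/MapResult waiting in Pool._cache and
# `i` is the chunk index within that job.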
					
						
#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
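
    # The constructor above wires up the pipeline used by every method
    # below:
    #
    #   _taskqueue -> _task_handler -> _inqueue -> workers
    #              -> _outqueue -> _result_handler -> results in _cache
    #
    # All three handler threads are daemonic, so an abandoned pool cannot
    # keep the process alive on its own.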
					
						
    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime.  Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()
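
    # A minimal usage sketch of the two synchronous calls above (values
    # illustrative, not part of this module):
    #
    #     from multiprocessing import Pool
    #     p = Pool(4)
    #     p.apply(divmod, (7, 3))        # == (2, 1), runs in one worker
    #     p.map(abs, [-1, -2, 3])        # == [1, 2, 3]
    #     p.close(); p.join()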
					
						
    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
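
    # With chunksize > 1 both imap variants submit `mapstar` batches and
    # then flatten them again, so callers always see one item at a time:
    # a chunked result like [f(x0), f(x1)] yields f(x0), then f(x1).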
					
						
    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result
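
    # The default chunksize above aims at roughly 4 chunks per worker.
    # For example, with len(iterable) == 100 and 4 processes:
    # divmod(100, 4 * 4) == (6, 4), and the nonzero remainder bumps the
    # chunksize to 7, giving ceil(100 / 7) == 15 tasks.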
					
						
    @staticmethod
    def _handle_workers(pool):
        while pool._worker_handler._state == RUN and pool._state == RUN:
            pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')
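
    # Note the two for/else clauses above: the inner `else` runs only when a
    # task sequence was fully enqueued (so set_length() gets the true count),
    # and the outer `else` runs only when taskqueue.get() returned the None
    # sentinel rather than the loop being broken out of early.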
					
						
    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                p.join()
            for w in pool:
                if w.exitcode is None:
                    # worker has not yet exited
                    debug('cleaning up worker %s' % w.pid)
                    w.join()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]
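
# Sketch of how an ApplyResult behaves from the caller's side (values
# illustrative):
#
#     res = pool.apply_async(pow, (2, 10))
#     res.ready()        # False until a worker posts the result
#     res.get(timeout=5) # 1024, or re-raises the worker's exception
#     res.successful()   # only valid once ready() is True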
					
						
#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()
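
# MapResult counts outstanding chunks, not items: for length == 10 and
# chunksize == 4, _number_left starts at 10//4 + bool(10 % 4) == 3, and
# each successful chunk splices its results into the right slice of
# self._value.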
					
						
#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()
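
# IMapIterator delivers results in submission order: a result arriving for
# index i is buffered in _unsorted until every index below i has been
# appended, e.g. results arriving as 1, 0 are handed to the consumer as
# 0, 1.  IMapUnorderedIterator below skips that buffering entirely.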
					
						
#
# Class of pools which use threads rather than processes
#

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
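
#
# Editorial usage sketch -- not part of the original module.  It exercises
# the Pool defined above; run it with `python -m multiprocessing.pool` so
# the relative imports in this file resolve (assumption: Python 2.x).
#

if __name__ == '__main__':
    pool = Pool(processes=2)
    try:
        # map() blocks until every chunk has been processed
        print pool.map(abs, [-2, -1, 0, 1])                       # -> [2, 1, 0, 1]
        # apply_async() returns an ApplyResult; get() waits for the value
        print pool.apply_async(divmod, (7, 3)).get(timeout=10)   # -> (2, 1)
    finally:
        pool.close()
        pool.join()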