| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | # Copyright 2009 Brian Quinlan. All Rights Reserved. | 
					
						
							|  |  |  | # Licensed to PSF under a Contributor Agreement. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | __author__ = 'Brian Quinlan (brian@sweetapp.com)' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import collections | 
					
						
							|  |  |  | import logging | 
					
						
							|  |  |  | import threading | 
					
						
							|  |  |  | import time | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | FIRST_COMPLETED = 'FIRST_COMPLETED' | 
					
						
							|  |  |  | FIRST_EXCEPTION = 'FIRST_EXCEPTION' | 
					
						
							|  |  |  | ALL_COMPLETED = 'ALL_COMPLETED' | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  | _AS_COMPLETED = '_AS_COMPLETED' | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | # Possible future states (for internal use by the futures package). | 
					
						
							|  |  |  | PENDING = 'PENDING' | 
					
						
							|  |  |  | RUNNING = 'RUNNING' | 
					
						
							|  |  |  | # The future was cancelled by the user... | 
					
						
							|  |  |  | CANCELLED = 'CANCELLED' | 
					
						
							|  |  |  | # ...and _Waiter.add_cancelled() was called by a worker. | 
					
						
							|  |  |  | CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' | 
					
						
							|  |  |  | FINISHED = 'FINISHED' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | _FUTURE_STATES = [ | 
					
						
							|  |  |  |     PENDING, | 
					
						
							|  |  |  |     RUNNING, | 
					
						
							|  |  |  |     CANCELLED, | 
					
						
							|  |  |  |     CANCELLED_AND_NOTIFIED, | 
					
						
							|  |  |  |     FINISHED | 
					
						
							|  |  |  | ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | _STATE_TO_DESCRIPTION_MAP = { | 
					
						
							|  |  |  |     PENDING: "pending", | 
					
						
							|  |  |  |     RUNNING: "running", | 
					
						
							|  |  |  |     CANCELLED: "cancelled", | 
					
						
							|  |  |  |     CANCELLED_AND_NOTIFIED: "cancelled", | 
					
						
							|  |  |  |     FINISHED: "finished" | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Logger for internal use by the futures package. | 
					
						
							|  |  |  | LOGGER = logging.getLogger("concurrent.futures") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Error(Exception): | 
					
						
							|  |  |  |     """Base class for all future-related exceptions.""" | 
					
						
							|  |  |  |     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CancelledError(Error): | 
					
						
							|  |  |  |     """The Future was cancelled.""" | 
					
						
							|  |  |  |     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TimeoutError(Error): | 
					
						
							|  |  |  |     """The operation exceeded the given deadline.""" | 
					
						
							|  |  |  |     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _Waiter(object): | 
					
						
							|  |  |  |     """Provides the event that wait() and as_completed() block on.""" | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self.event = threading.Event() | 
					
						
							|  |  |  |         self.finished_futures = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_result(self, future): | 
					
						
							|  |  |  |         self.finished_futures.append(future) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_exception(self, future): | 
					
						
							|  |  |  |         self.finished_futures.append(future) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_cancelled(self, future): | 
					
						
							|  |  |  |         self.finished_futures.append(future) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  | class _AsCompletedWaiter(_Waiter): | 
					
						
							|  |  |  |     """Used by as_completed().""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         super(_AsCompletedWaiter, self).__init__() | 
					
						
							|  |  |  |         self.lock = threading.Lock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_result(self, future): | 
					
						
							|  |  |  |         with self.lock: | 
					
						
							|  |  |  |             super(_AsCompletedWaiter, self).add_result(future) | 
					
						
							|  |  |  |             self.event.set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_exception(self, future): | 
					
						
							|  |  |  |         with self.lock: | 
					
						
							|  |  |  |             super(_AsCompletedWaiter, self).add_exception(future) | 
					
						
							|  |  |  |             self.event.set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_cancelled(self, future): | 
					
						
							|  |  |  |         with self.lock: | 
					
						
							|  |  |  |             super(_AsCompletedWaiter, self).add_cancelled(future) | 
					
						
							|  |  |  |             self.event.set() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | class _FirstCompletedWaiter(_Waiter): | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  |     """Used by wait(return_when=FIRST_COMPLETED).""" | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def add_result(self, future): | 
					
						
							|  |  |  |         super().add_result(future) | 
					
						
							|  |  |  |         self.event.set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_exception(self, future): | 
					
						
							|  |  |  |         super().add_exception(future) | 
					
						
							|  |  |  |         self.event.set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_cancelled(self, future): | 
					
						
							|  |  |  |         super().add_cancelled(future) | 
					
						
							|  |  |  |         self.event.set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _AllCompletedWaiter(_Waiter): | 
					
						
							|  |  |  |     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, num_pending_calls, stop_on_exception): | 
					
						
							|  |  |  |         self.num_pending_calls = num_pending_calls | 
					
						
							|  |  |  |         self.stop_on_exception = stop_on_exception | 
					
						
							| 
									
										
										
										
											2012-03-31 20:23:30 +02:00
										 |  |  |         self.lock = threading.Lock() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         super().__init__() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _decrement_pending_calls(self): | 
					
						
							| 
									
										
										
										
											2012-03-31 20:23:30 +02:00
										 |  |  |         with self.lock: | 
					
						
							|  |  |  |             self.num_pending_calls -= 1 | 
					
						
							|  |  |  |             if not self.num_pending_calls: | 
					
						
							|  |  |  |                 self.event.set() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def add_result(self, future): | 
					
						
							|  |  |  |         super().add_result(future) | 
					
						
							|  |  |  |         self._decrement_pending_calls() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_exception(self, future): | 
					
						
							|  |  |  |         super().add_exception(future) | 
					
						
							|  |  |  |         if self.stop_on_exception: | 
					
						
							|  |  |  |             self.event.set() | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self._decrement_pending_calls() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_cancelled(self, future): | 
					
						
							|  |  |  |         super().add_cancelled(future) | 
					
						
							|  |  |  |         self._decrement_pending_calls() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _AcquireFutures(object): | 
					
						
							|  |  |  |     """A context manager that does an ordered acquire of Future conditions.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, futures): | 
					
						
							|  |  |  |         self.futures = sorted(futures, key=id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __enter__(self): | 
					
						
							|  |  |  |         for future in self.futures: | 
					
						
							|  |  |  |             future._condition.acquire() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __exit__(self, *args): | 
					
						
							|  |  |  |         for future in self.futures: | 
					
						
							|  |  |  |             future._condition.release() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _create_and_install_waiters(fs, return_when): | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  |     if return_when == _AS_COMPLETED: | 
					
						
							|  |  |  |         waiter = _AsCompletedWaiter() | 
					
						
							|  |  |  |     elif return_when == FIRST_COMPLETED: | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         waiter = _FirstCompletedWaiter() | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         pending_count = sum( | 
					
						
							|  |  |  |                 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if return_when == FIRST_EXCEPTION: | 
					
						
							|  |  |  |             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) | 
					
						
							|  |  |  |         elif return_when == ALL_COMPLETED: | 
					
						
							|  |  |  |             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             raise ValueError("Invalid return condition: %r" % return_when) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for f in fs: | 
					
						
							|  |  |  |         f._waiters.append(waiter) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return waiter | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def as_completed(fs, timeout=None): | 
					
						
							|  |  |  |     """An iterator over the given futures that yields each as it completes.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         fs: The sequence of Futures (possibly created by different Executors) to | 
					
						
							|  |  |  |             iterate over. | 
					
						
							|  |  |  |         timeout: The maximum number of seconds to wait. If None, then there | 
					
						
							|  |  |  |             is no limit on the wait time. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Returns: | 
					
						
							|  |  |  |         An iterator that yields the given Futures as they complete (finished or | 
					
						
							|  |  |  |         cancelled). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Raises: | 
					
						
							|  |  |  |         TimeoutError: If the entire result iterator could not be generated | 
					
						
							|  |  |  |             before the given timeout. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if timeout is not None: | 
					
						
							|  |  |  |         end_time = timeout + time.time() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     with _AcquireFutures(fs): | 
					
						
							|  |  |  |         finished = set( | 
					
						
							|  |  |  |                 f for f in fs | 
					
						
							|  |  |  |                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) | 
					
						
							|  |  |  |         pending = set(fs) - finished | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  |         waiter = _create_and_install_waiters(fs, _AS_COMPLETED) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							| 
									
										
										
										
											2012-10-01 12:53:43 -07:00
										 |  |  |         yield from finished | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |         while pending: | 
					
						
							|  |  |  |             if timeout is None: | 
					
						
							|  |  |  |                 wait_timeout = None | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 wait_timeout = end_time - time.time() | 
					
						
							|  |  |  |                 if wait_timeout < 0: | 
					
						
							|  |  |  |                     raise TimeoutError( | 
					
						
							|  |  |  |                             '%d (of %d) futures unfinished' % ( | 
					
						
							|  |  |  |                             len(pending), len(fs))) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  |             waiter.event.wait(wait_timeout) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             with waiter.lock: | 
					
						
							|  |  |  |                 finished = waiter.finished_futures | 
					
						
							|  |  |  |                 waiter.finished_futures = [] | 
					
						
							|  |  |  |                 waiter.event.clear() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-11-17 11:06:29 +00:00
										 |  |  |             for future in finished: | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |                 yield future | 
					
						
							|  |  |  |                 pending.remove(future) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     finally: | 
					
						
							|  |  |  |         for f in fs: | 
					
						
							|  |  |  |             f._waiters.remove(waiter) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | DoneAndNotDoneFutures = collections.namedtuple( | 
					
						
							|  |  |  |         'DoneAndNotDoneFutures', 'done not_done') | 
					
						
							|  |  |  | def wait(fs, timeout=None, return_when=ALL_COMPLETED): | 
					
						
							|  |  |  |     """Wait for the futures in the given sequence to complete.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         fs: The sequence of Futures (possibly created by different Executors) to | 
					
						
							|  |  |  |             wait upon. | 
					
						
							|  |  |  |         timeout: The maximum number of seconds to wait. If None, then there | 
					
						
							|  |  |  |             is no limit on the wait time. | 
					
						
							|  |  |  |         return_when: Indicates when this function should return. The options | 
					
						
							|  |  |  |             are: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             FIRST_COMPLETED - Return when any future finishes or is | 
					
						
							|  |  |  |                               cancelled. | 
					
						
							|  |  |  |             FIRST_EXCEPTION - Return when any future finishes by raising an | 
					
						
							|  |  |  |                               exception. If no future raises an exception | 
					
						
							|  |  |  |                               then it is equivalent to ALL_COMPLETED. | 
					
						
							|  |  |  |             ALL_COMPLETED -   Return when all futures finish or are cancelled. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Returns: | 
					
						
							|  |  |  |         A named 2-tuple of sets. The first set, named 'done', contains the | 
					
						
							|  |  |  |         futures that completed (is finished or cancelled) before the wait | 
					
						
							|  |  |  |         completed. The second set, named 'not_done', contains uncompleted | 
					
						
							|  |  |  |         futures. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     with _AcquireFutures(fs): | 
					
						
							|  |  |  |         done = set(f for f in fs | 
					
						
							|  |  |  |                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) | 
					
						
							|  |  |  |         not_done = set(fs) - done | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if (return_when == FIRST_COMPLETED) and done: | 
					
						
							|  |  |  |             return DoneAndNotDoneFutures(done, not_done) | 
					
						
							|  |  |  |         elif (return_when == FIRST_EXCEPTION) and done: | 
					
						
							|  |  |  |             if any(f for f in done | 
					
						
							|  |  |  |                    if not f.cancelled() and f.exception() is not None): | 
					
						
							|  |  |  |                 return DoneAndNotDoneFutures(done, not_done) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(done) == len(fs): | 
					
						
							|  |  |  |             return DoneAndNotDoneFutures(done, not_done) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         waiter = _create_and_install_waiters(fs, return_when) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     waiter.event.wait(timeout) | 
					
						
							|  |  |  |     for f in fs: | 
					
						
							|  |  |  |         f._waiters.remove(waiter) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     done.update(waiter.finished_futures) | 
					
						
							|  |  |  |     return DoneAndNotDoneFutures(done, set(fs) - done) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Future(object): | 
					
						
							|  |  |  |     """Represents the result of an asynchronous computation.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         """Initializes the future. Should not be called by clients.""" | 
					
						
							|  |  |  |         self._condition = threading.Condition() | 
					
						
							|  |  |  |         self._state = PENDING | 
					
						
							|  |  |  |         self._result = None | 
					
						
							|  |  |  |         self._exception = None | 
					
						
							|  |  |  |         self._waiters = [] | 
					
						
							|  |  |  |         self._done_callbacks = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _invoke_callbacks(self): | 
					
						
							|  |  |  |         for callback in self._done_callbacks: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 callback(self) | 
					
						
							|  |  |  |             except Exception: | 
					
						
							|  |  |  |                 LOGGER.exception('exception calling callback for %r', self) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __repr__(self): | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             if self._state == FINISHED: | 
					
						
							|  |  |  |                 if self._exception: | 
					
						
							|  |  |  |                     return '<Future at %s state=%s raised %s>' % ( | 
					
						
							|  |  |  |                         hex(id(self)), | 
					
						
							|  |  |  |                         _STATE_TO_DESCRIPTION_MAP[self._state], | 
					
						
							|  |  |  |                         self._exception.__class__.__name__) | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     return '<Future at %s state=%s returned %s>' % ( | 
					
						
							|  |  |  |                         hex(id(self)), | 
					
						
							|  |  |  |                         _STATE_TO_DESCRIPTION_MAP[self._state], | 
					
						
							|  |  |  |                         self._result.__class__.__name__) | 
					
						
							|  |  |  |             return '<Future at %s state=%s>' % ( | 
					
						
							|  |  |  |                     hex(id(self)), | 
					
						
							|  |  |  |                    _STATE_TO_DESCRIPTION_MAP[self._state]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def cancel(self): | 
					
						
							|  |  |  |         """Cancel the future if possible.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns True if the future was cancelled, False otherwise. A future | 
					
						
							|  |  |  |         cannot be cancelled if it is running or has already completed. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             if self._state in [RUNNING, FINISHED]: | 
					
						
							|  |  |  |                 return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | 
					
						
							|  |  |  |                 return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self._state = CANCELLED | 
					
						
							|  |  |  |             self._condition.notify_all() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self._invoke_callbacks() | 
					
						
							|  |  |  |         return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def cancelled(self): | 
					
						
							|  |  |  |         """Return True if the future has cancelled.""" | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def running(self): | 
					
						
							|  |  |  |         """Return True if the future is currently executing.""" | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             return self._state == RUNNING | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def done(self): | 
					
						
							|  |  |  |         """Return True of the future was cancelled or finished executing.""" | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __get_result(self): | 
					
						
							|  |  |  |         if self._exception: | 
					
						
							|  |  |  |             raise self._exception | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             return self._result | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def add_done_callback(self, fn): | 
					
						
							|  |  |  |         """Attaches a callable that will be called when the future finishes.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Args: | 
					
						
							|  |  |  |             fn: A callable that will be called with this future as its only | 
					
						
							|  |  |  |                 argument when the future completes or is cancelled. The callable | 
					
						
							|  |  |  |                 will always be called by a thread in the same process in which | 
					
						
							|  |  |  |                 it was added. If the future has already completed or been | 
					
						
							|  |  |  |                 cancelled then the callable will be called immediately. These | 
					
						
							|  |  |  |                 callables are called in the order that they were added. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: | 
					
						
							|  |  |  |                 self._done_callbacks.append(fn) | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  |         fn(self) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def result(self, timeout=None): | 
					
						
							|  |  |  |         """Return the result of the call that the future represents.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Args: | 
					
						
							|  |  |  |             timeout: The number of seconds to wait for the result if the future | 
					
						
							|  |  |  |                 isn't done. If None, then there is no limit on the wait time. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns: | 
					
						
							|  |  |  |             The result of the call that the future represents. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Raises: | 
					
						
							|  |  |  |             CancelledError: If the future was cancelled. | 
					
						
							|  |  |  |             TimeoutError: If the future didn't finish executing before the given | 
					
						
							|  |  |  |                 timeout. | 
					
						
							|  |  |  |             Exception: If the call raised then that exception will be raised. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | 
					
						
							|  |  |  |                 raise CancelledError() | 
					
						
							|  |  |  |             elif self._state == FINISHED: | 
					
						
							|  |  |  |                 return self.__get_result() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self._condition.wait(timeout) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | 
					
						
							|  |  |  |                 raise CancelledError() | 
					
						
							|  |  |  |             elif self._state == FINISHED: | 
					
						
							|  |  |  |                 return self.__get_result() | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 raise TimeoutError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def exception(self, timeout=None): | 
					
						
							|  |  |  |         """Return the exception raised by the call that the future represents.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Args: | 
					
						
							|  |  |  |             timeout: The number of seconds to wait for the exception if the | 
					
						
							|  |  |  |                 future isn't done. If None, then there is no limit on the wait | 
					
						
							|  |  |  |                 time. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns: | 
					
						
							|  |  |  |             The exception raised by the call that the future represents or None | 
					
						
							|  |  |  |             if the call completed without raising. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Raises: | 
					
						
							|  |  |  |             CancelledError: If the future was cancelled. | 
					
						
							|  |  |  |             TimeoutError: If the future didn't finish executing before the given | 
					
						
							|  |  |  |                 timeout. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | 
					
						
							|  |  |  |                 raise CancelledError() | 
					
						
							|  |  |  |             elif self._state == FINISHED: | 
					
						
							|  |  |  |                 return self._exception | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self._condition.wait(timeout) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | 
					
						
							|  |  |  |                 raise CancelledError() | 
					
						
							|  |  |  |             elif self._state == FINISHED: | 
					
						
							|  |  |  |                 return self._exception | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 raise TimeoutError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # The following methods should only be used by Executors and in tests. | 
					
						
							|  |  |  |     def set_running_or_notify_cancel(self): | 
					
						
							|  |  |  |         """Mark the future as running or process any cancel notifications.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Should only be used by Executor implementations and unit tests. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         If the future has been cancelled (cancel() was called and returned | 
					
						
							|  |  |  |         True) then any threads waiting on the future completing (though calls | 
					
						
							|  |  |  |         to as_completed() or wait()) are notified and False is returned. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         If the future was not cancelled then it is put in the running state | 
					
						
							|  |  |  |         (future calls to running() will return True) and True is returned. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         This method should be called by Executor implementations before | 
					
						
							|  |  |  |         executing the work associated with this future. If this method returns | 
					
						
							|  |  |  |         False then the work should not be executed. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns: | 
					
						
							|  |  |  |             False if the Future was cancelled, True otherwise. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Raises: | 
					
						
							|  |  |  |             RuntimeError: if this method was already called or if set_result() | 
					
						
							|  |  |  |                 or set_exception() was called. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             if self._state == CANCELLED: | 
					
						
							|  |  |  |                 self._state = CANCELLED_AND_NOTIFIED | 
					
						
							|  |  |  |                 for waiter in self._waiters: | 
					
						
							|  |  |  |                     waiter.add_cancelled(self) | 
					
						
							|  |  |  |                 # self._condition.notify_all() is not necessary because | 
					
						
							|  |  |  |                 # self.cancel() triggers a notification. | 
					
						
							|  |  |  |                 return False | 
					
						
							|  |  |  |             elif self._state == PENDING: | 
					
						
							|  |  |  |                 self._state = RUNNING | 
					
						
							|  |  |  |                 return True | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 LOGGER.critical('Future %s in unexpected state: %s', | 
					
						
							| 
									
										
										
										
											2012-06-11 12:59:07 +10:00
										 |  |  |                                 id(self), | 
					
						
							|  |  |  |                                 self._state) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |                 raise RuntimeError('Future in unexpected state') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_result(self, result): | 
					
						
							|  |  |  |         """Sets the return value of work associated with the future.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Should only be used by Executor implementations and unit tests. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             self._result = result | 
					
						
							|  |  |  |             self._state = FINISHED | 
					
						
							|  |  |  |             for waiter in self._waiters: | 
					
						
							|  |  |  |                 waiter.add_result(self) | 
					
						
							|  |  |  |             self._condition.notify_all() | 
					
						
							|  |  |  |         self._invoke_callbacks() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_exception(self, exception): | 
					
						
							|  |  |  |         """Sets the result of the future as being the given exception.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Should only be used by Executor implementations and unit tests. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         with self._condition: | 
					
						
							|  |  |  |             self._exception = exception | 
					
						
							|  |  |  |             self._state = FINISHED | 
					
						
							|  |  |  |             for waiter in self._waiters: | 
					
						
							|  |  |  |                 waiter.add_exception(self) | 
					
						
							|  |  |  |             self._condition.notify_all() | 
					
						
							|  |  |  |         self._invoke_callbacks() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Executor(object): | 
					
						
							|  |  |  |     """This is an abstract base class for concrete asynchronous executors.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def submit(self, fn, *args, **kwargs): | 
					
						
							|  |  |  |         """Submits a callable to be executed with the given arguments.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Schedules the callable to be executed as fn(*args, **kwargs) and returns | 
					
						
							|  |  |  |         a Future instance representing the execution of the callable. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns: | 
					
						
							|  |  |  |             A Future representing the given call. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def map(self, fn, *iterables, timeout=None): | 
					
						
							|  |  |  |         """Returns a iterator equivalent to map(fn, iter).
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Args: | 
					
						
							|  |  |  |             fn: A callable that will take take as many arguments as there are | 
					
						
							|  |  |  |                 passed iterables. | 
					
						
							|  |  |  |             timeout: The maximum number of seconds to wait. If None, then there | 
					
						
							|  |  |  |                 is no limit on the wait time. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns: | 
					
						
							|  |  |  |             An iterator equivalent to: map(func, *iterables) but the calls may | 
					
						
							|  |  |  |             be evaluated out-of-order. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Raises: | 
					
						
							|  |  |  |             TimeoutError: If the entire result iterator could not be generated | 
					
						
							|  |  |  |                 before the given timeout. | 
					
						
							|  |  |  |             Exception: If fn(*args) raises for any values. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         if timeout is not None: | 
					
						
							|  |  |  |             end_time = timeout + time.time() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         fs = [self.submit(fn, *args) for args in zip(*iterables)] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-04-08 08:19:33 +10:00
										 |  |  |         # Yield must be hidden in closure so that the futures are submitted | 
					
						
							|  |  |  |         # before the first iterator value is required. | 
					
						
							|  |  |  |         def result_iterator(): | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 for future in fs: | 
					
						
							|  |  |  |                     if timeout is None: | 
					
						
							|  |  |  |                         yield future.result() | 
					
						
							|  |  |  |                     else: | 
					
						
							|  |  |  |                         yield future.result(end_time - time.time()) | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 for future in fs: | 
					
						
							|  |  |  |                     future.cancel() | 
					
						
							|  |  |  |         return result_iterator() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def shutdown(self, wait=True): | 
					
						
							|  |  |  |         """Clean-up the resources associated with the Executor.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         It is safe to call this method several times. Otherwise, no other | 
					
						
							|  |  |  |         methods can be called after this one. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Args: | 
					
						
							|  |  |  |             wait: If True then shutdown will not return until all running | 
					
						
							|  |  |  |                 futures have finished executing and the resources used by the | 
					
						
							|  |  |  |                 executor have been reclaimed. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __enter__(self): | 
					
						
							|  |  |  |         return self | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __exit__(self, exc_type, exc_val, exc_tb): | 
					
						
							|  |  |  |         self.shutdown(wait=True) | 
					
						
							|  |  |  |         return False |