| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | """Support for tasks, coroutines and the scheduler.""" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  | __all__ = ( | 
					
						
							| 
									
										
										
										
											2017-12-15 07:04:38 +02:00
										 |  |  |     'Task', 'create_task', | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  |     'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', | 
					
						
							| 
									
										
										
										
											2017-12-11 10:03:48 -05:00
										 |  |  |     'wait', 'wait_for', 'as_completed', 'sleep', | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  |     'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', | 
					
						
							| 
									
										
										
										
											2017-12-16 21:58:38 +02:00
										 |  |  |     'current_task', 'all_tasks', | 
					
						
							|  |  |  |     '_register_task', '_unregister_task', '_enter_task', '_leave_task', | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  | ) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | import concurrent.futures | 
					
						
							|  |  |  | import functools | 
					
						
							|  |  |  | import inspect | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  | import types | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  | import warnings | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | import weakref | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-28 12:52:37 -04:00
										 |  |  | from . import base_tasks | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  | from . import coroutines | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | from . import events | 
					
						
							|  |  |  | from . import futures | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  | from .coroutines import coroutine | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-16 21:58:38 +02:00
										 |  |  | def current_task(loop=None): | 
					
						
							|  |  |  |     """Return a currently executed task.""" | 
					
						
							|  |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_running_loop() | 
					
						
							|  |  |  |     return _current_tasks.get(loop) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def all_tasks(loop=None): | 
					
						
							|  |  |  |     """Return a set of all tasks for the loop.""" | 
					
						
							|  |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_event_loop() | 
					
						
							|  |  |  |     return {t for t, l in _all_tasks.items() if l is loop} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed while its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a PendingDeprecationWarning and delegates to the
        module-level current_task().
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        # Unlike the module-level function, the default loop here comes from
        # get_event_loop() rather than get_running_loop().
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a PendingDeprecationWarning and delegates to the
        module-level all_tasks().
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return all_tasks(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        *loop* is forwarded to Future.__init__().  Raises TypeError when
        *coro* is not accepted by coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest recorded entry (presumably the frame of this
            # constructor call -- confirm against Future.__init__()).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        # Set by cancel() when the cancellation has to be delivered by the
        # next _step() call.
        self._must_cancel = False
        # The future this task is currently waiting on, if any (see the
        # invariant at the top of the class).
        self._fut_waiter = None
        self._coro = coro

        self._loop.call_soon(self._step)
        _register_task(self._loop, self)

    def __del__(self):
        """Report a task that is destroyed while still pending.

        The report goes through the loop's call_exception_handler() and is
        skipped when _log_destroy_pending is false.  Future.__del__() is
        always called afterwards.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With *exc* None the coroutine is resumed with send(None); otherwise
        *exc* is thrown into it.  Depending on what the coroutine does, the
        task's result or exception is set, the task is cancelled, the task
        starts waiting on the yielded future, or another _step() call is
        scheduled on the loop.
        """
        assert not self.done(), f'_step(): already done: {self!r}, {exc!r}'
        if self._must_cancel:
            # cancel() asked for a cancellation: make sure what gets thrown
            # into the coroutine is a CancelledError.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; its return value is exc.value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, then let it keep
            # propagating to the caller.
            self.set_exception(exc)
            raise
        else:
            # The coroutine yielded `result` and is still running.
            # `blocking` stays None when `result` has no
            # `_asyncio_future_blocking` attribute.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(self._step, new_exc)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(self._step, new_exc)
                    else:
                        # Wait on the future: _wakeup() resumes the task
                        # once the future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # A cancellation was requested in the meantime:
                            # forward it to the future being waited on.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # The attribute exists but is falsy.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(self._step, new_exc)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result}')
                self._loop.call_soon(self._step, new_exc)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(self._step, new_exc)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Resume the task once *future*, which it was waiting on, is done.

        Registered by _step() as a done callback of the awaited future.  If
        the future finished with an exception, that exception is passed to
        _step(); otherwise _step() is called without arguments.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-28 12:52:37 -04:00
# Keep a handle on the Task class defined in this module, because the name
# `Task` may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No _asyncio module available: `Task` stays bound to the class defined
    # in this module.
    pass
else:
    # Prefer the implementation provided by _asyncio (presumably the
    # C-accelerated one, judging by the `_CTask` name).
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-15 07:04:38 +02:00
										 |  |  | def create_task(coro): | 
					
						
							|  |  |  |     """Schedule the execution of a coroutine object in a spawn task.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Return a Task object. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     loop = events.get_running_loop() | 
					
						
							|  |  |  |     return loop.create_task(coro) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | # wait() and as_completed() similar to those in PEP 3148. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED | 
					
						
							|  |  |  | FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION | 
					
						
							|  |  |  | ALL_COMPLETED = concurrent.futures.ALL_COMPLETED | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  | async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     """Wait for the Futures and coroutines given by fs to complete.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-10 11:16:05 +02:00
										 |  |  |     The sequence futures must not be empty. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     Coroutines will be wrapped in Tasks. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Returns two sets of Future: (done, pending). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Usage: | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         done, pending = await asyncio.wait(fs) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     Note: This does not raise TimeoutError! Futures that aren't done | 
					
						
							|  |  |  |     when the timeout occurs are returned in the second set. | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2016-09-09 14:26:31 -07:00
										 |  |  |     if futures.isfuture(fs) or coroutines.iscoroutine(fs): | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  |         raise TypeError(f"expect a list of futures, not {type(fs).__name__}") | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     if not fs: | 
					
						
							|  |  |  |         raise ValueError('Set of coroutines/Futures is empty.') | 
					
						
							| 
									
										
										
										
											2014-07-16 18:50:39 +02:00
										 |  |  |     if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  |         raise ValueError(f'Invalid return_when value: {return_when}') | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_event_loop() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  |     fs = {ensure_future(f, loop=loop) for f in set(fs)} | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |     return await _wait(fs, timeout, return_when, loop) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-08-28 11:19:25 +02:00
										 |  |  | def _release_waiter(waiter, *args): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     if not waiter.done(): | 
					
						
							| 
									
										
										
										
											2014-08-28 11:19:25 +02:00
										 |  |  |         waiter.set_result(None) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  | async def wait_for(fut, timeout, *, loop=None): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     """Wait for the single Future or coroutine to complete, with timeout.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Coroutine will be wrapped in Task. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-23 17:40:59 +01:00
										 |  |  |     Returns result of the Future or coroutine.  When a timeout occurs, | 
					
						
							|  |  |  |     it cancels the task and raises TimeoutError.  To avoid the task | 
					
						
							|  |  |  |     cancellation, wrap it in shield(). | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-15 16:29:10 +01:00
										 |  |  |     If the wait is cancelled, the task is also cancelled. | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-15 16:29:10 +01:00
										 |  |  |     This function is a coroutine. | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_event_loop() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-29 14:30:38 -08:00
										 |  |  |     if timeout is None: | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         return await fut | 
					
						
							| 
									
										
										
										
											2014-01-29 14:30:38 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-05 19:04:39 +03:00
										 |  |  |     if timeout <= 0: | 
					
						
							|  |  |  |         fut = ensure_future(fut, loop=loop) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if fut.done(): | 
					
						
							|  |  |  |             return fut.result() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         fut.cancel() | 
					
						
							|  |  |  |         raise futures.TimeoutError() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-16 15:38:39 -04:00
										 |  |  |     waiter = loop.create_future() | 
					
						
							| 
									
										
										
										
											2014-08-28 11:19:25 +02:00
										 |  |  |     timeout_handle = loop.call_later(timeout, _release_waiter, waiter) | 
					
						
							|  |  |  |     cb = functools.partial(_release_waiter, waiter) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  |     fut = ensure_future(fut, loop=loop) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     fut.add_done_callback(cb) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							| 
									
										
										
										
											2014-08-28 11:19:25 +02:00
										 |  |  |         # wait until the future completes or the timeout | 
					
						
							| 
									
										
										
										
											2015-01-15 16:29:10 +01:00
										 |  |  |         try: | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |             await waiter | 
					
						
							| 
									
										
										
										
											2015-01-15 16:29:10 +01:00
										 |  |  |         except futures.CancelledError: | 
					
						
							|  |  |  |             fut.remove_done_callback(cb) | 
					
						
							|  |  |  |             fut.cancel() | 
					
						
							|  |  |  |             raise | 
					
						
							| 
									
										
										
										
											2014-08-28 11:19:25 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         if fut.done(): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             return fut.result() | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             fut.remove_done_callback(cb) | 
					
						
							| 
									
										
										
										
											2014-01-23 17:40:59 +01:00
										 |  |  |             fut.cancel() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             raise futures.TimeoutError() | 
					
						
							|  |  |  |     finally: | 
					
						
							|  |  |  |         timeout_handle.cancel() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  | async def _wait(fs, timeout, return_when, loop): | 
					
						
							| 
									
										
										
										
											2016-04-01 21:39:09 +02:00
										 |  |  |     """Internal helper for wait() and wait_for().
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     The fs argument must be a collection of Futures. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     assert fs, 'Set of Futures is empty.' | 
					
						
							| 
									
										
										
										
											2016-05-16 15:38:39 -04:00
										 |  |  |     waiter = loop.create_future() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     timeout_handle = None | 
					
						
							|  |  |  |     if timeout is not None: | 
					
						
							|  |  |  |         timeout_handle = loop.call_later(timeout, _release_waiter, waiter) | 
					
						
							|  |  |  |     counter = len(fs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _on_completion(f): | 
					
						
							|  |  |  |         nonlocal counter | 
					
						
							|  |  |  |         counter -= 1 | 
					
						
							|  |  |  |         if (counter <= 0 or | 
					
						
							|  |  |  |             return_when == FIRST_COMPLETED or | 
					
						
							|  |  |  |             return_when == FIRST_EXCEPTION and (not f.cancelled() and | 
					
						
							|  |  |  |                                                 f.exception() is not None)): | 
					
						
							|  |  |  |             if timeout_handle is not None: | 
					
						
							|  |  |  |                 timeout_handle.cancel() | 
					
						
							|  |  |  |             if not waiter.done(): | 
					
						
							| 
									
										
										
										
											2014-08-28 11:19:25 +02:00
										 |  |  |                 waiter.set_result(None) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     for f in fs: | 
					
						
							|  |  |  |         f.add_done_callback(_on_completion) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     try: | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         await waiter | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     finally: | 
					
						
							|  |  |  |         if timeout_handle is not None: | 
					
						
							|  |  |  |             timeout_handle.cancel() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     done, pending = set(), set() | 
					
						
							|  |  |  |     for f in fs: | 
					
						
							|  |  |  |         f.remove_done_callback(_on_completion) | 
					
						
							|  |  |  |         if f.done(): | 
					
						
							|  |  |  |             done.add(f) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             pending.add(f) | 
					
						
							|  |  |  |     return done, pending | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # This is *not* a @coroutine!  It is just an iterator (yielding Futures). | 
					
						
							|  |  |  | def as_completed(fs, *, loop=None, timeout=None): | 
					
						
							| 
									
										
										
										
											2014-02-12 17:58:19 -08:00
										 |  |  |     """Return an iterator whose values are coroutines.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     When waiting for the yielded coroutines you'll get the results (or | 
					
						
							|  |  |  |     exceptions!) of the original Futures (or coroutines), in the order | 
					
						
							|  |  |  |     in which and as soon as they complete. | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     This differs from PEP 3148; the proper way to use this is: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for f in as_completed(fs): | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |             result = await f  # The 'await' may raise. | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             # Use result. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |     If a timeout is specified, the 'await' will raise | 
					
						
							| 
									
										
										
										
											2014-02-12 17:58:19 -08:00
										 |  |  |     TimeoutError when the timeout occurs before all Futures are done. | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     Note: The futures 'f' are not necessarily members of fs. | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2016-09-09 14:26:31 -07:00
										 |  |  |     if futures.isfuture(fs) or coroutines.iscoroutine(fs): | 
					
						
							| 
									
										
										
										
											2017-12-10 18:36:12 -05:00
										 |  |  |         raise TypeError(f"expect a list of futures, not {type(fs).__name__}") | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     loop = loop if loop is not None else events.get_event_loop() | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  |     todo = {ensure_future(f, loop=loop) for f in set(fs)} | 
					
						
							| 
									
										
										
										
											2014-02-12 17:58:19 -08:00
										 |  |  |     from .queues import Queue  # Import here to avoid circular import problem. | 
					
						
							|  |  |  |     done = Queue(loop=loop) | 
					
						
							|  |  |  |     timeout_handle = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _on_timeout(): | 
					
						
							|  |  |  |         for f in todo: | 
					
						
							|  |  |  |             f.remove_done_callback(_on_completion) | 
					
						
							|  |  |  |             done.put_nowait(None)  # Queue a dummy value for _wait_for_one(). | 
					
						
							|  |  |  |         todo.clear()  # Can't do todo.remove(f) in the loop. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _on_completion(f): | 
					
						
							|  |  |  |         if not todo: | 
					
						
							|  |  |  |             return  # _on_timeout() was here first. | 
					
						
							|  |  |  |         todo.remove(f) | 
					
						
							|  |  |  |         done.put_nowait(f) | 
					
						
							|  |  |  |         if not todo and timeout_handle is not None: | 
					
						
							|  |  |  |             timeout_handle.cancel() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |     async def _wait_for_one(): | 
					
						
							|  |  |  |         f = await done.get() | 
					
						
							| 
									
										
										
										
											2014-02-12 17:58:19 -08:00
										 |  |  |         if f is None: | 
					
						
							|  |  |  |             # Dummy value from _on_timeout(). | 
					
						
							|  |  |  |             raise futures.TimeoutError | 
					
						
							|  |  |  |         return f.result()  # May raise f.exception(). | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-12 17:58:19 -08:00
										 |  |  |     for f in todo: | 
					
						
							|  |  |  |         f.add_done_callback(_on_completion) | 
					
						
							|  |  |  |     if todo and timeout is not None: | 
					
						
							|  |  |  |         timeout_handle = loop.call_later(timeout, _on_timeout) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     for _ in range(len(todo)): | 
					
						
							|  |  |  |         yield _wait_for_one() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is zero or negative.  It uses a bare 'yield'
    expression (which Task._step knows how to handle)
    instead of creating a Future object.
    """
    yield
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | async def sleep(delay, result=None, *, loop=None): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     """Coroutine that completes after a given time (in seconds).""" | 
					
						
							| 
									
										
										
										
											2017-12-17 16:41:30 +02:00
										 |  |  |     if delay <= 0: | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         await __sleep0() | 
					
						
							| 
									
										
										
										
											2015-11-05 14:29:04 -05:00
										 |  |  |         return result | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-16 15:38:39 -04:00
										 |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_event_loop() | 
					
						
							|  |  |  |     future = loop.create_future() | 
					
						
							| 
									
										
										
										
											2014-07-05 15:29:41 +02:00
										 |  |  |     h = future._loop.call_later(delay, | 
					
						
							| 
									
										
										
										
											2015-11-17 12:19:41 -05:00
										 |  |  |                                 futures._set_result_unless_cancelled, | 
					
						
							|  |  |  |                                 future, result) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     try: | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         return await future | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     finally: | 
					
						
							|  |  |  |         h.cancel() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  | def ensure_future(coro_or_future, *, loop=None): | 
					
						
							| 
									
										
										
										
											2015-10-02 15:00:19 -04:00
										 |  |  |     """Wrap a coroutine or an awaitable in a future.
 | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     If the argument is a Future, it is returned directly. | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2016-09-09 14:26:31 -07:00
										 |  |  |     if futures.isfuture(coro_or_future): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         if loop is not None and loop is not coro_or_future._loop: | 
					
						
							|  |  |  |             raise ValueError('loop argument must agree with Future') | 
					
						
							|  |  |  |         return coro_or_future | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |     elif coroutines.iscoroutine(coro_or_future): | 
					
						
							| 
									
										
										
										
											2014-07-08 11:29:25 +02:00
										 |  |  |         if loop is None: | 
					
						
							|  |  |  |             loop = events.get_event_loop() | 
					
						
							|  |  |  |         task = loop.create_task(coro_or_future) | 
					
						
							| 
									
										
										
										
											2014-06-27 13:52:20 +02:00
										 |  |  |         if task._source_traceback: | 
					
						
							|  |  |  |             del task._source_traceback[-1] | 
					
						
							|  |  |  |         return task | 
					
						
							| 
									
										
										
										
											2017-11-28 14:43:52 +01:00
										 |  |  |     elif inspect.isawaitable(coro_or_future): | 
					
						
							| 
									
										
										
										
											2015-10-02 15:00:19 -04:00
										 |  |  |         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     else: | 
					
						
							| 
									
										
										
										
											2017-04-21 16:49:48 -04:00
										 |  |  |         raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' | 
					
						
							|  |  |  |                         'required') | 
					
						
							| 
									
										
										
										
											2015-10-02 15:00:19 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().

    The coroutine delegates to the iterator returned by
    awaitable.__await__() and returns whatever that iterator returns.
    """
    return (yield from awaitable.__await__())
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _GatheringFuture(futures.Future): | 
					
						
							|  |  |  |     """Helper for gather().
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This overrides cancel() to cancel all the children and act more | 
					
						
							|  |  |  |     like Task.cancel(), which doesn't immediately mark itself as | 
					
						
							|  |  |  |     cancelled. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, children, *, loop=None): | 
					
						
							|  |  |  |         super().__init__(loop=loop) | 
					
						
							|  |  |  |         self._children = children | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def cancel(self): | 
					
						
							|  |  |  |         if self.done(): | 
					
						
							|  |  |  |             return False | 
					
						
							| 
									
										
										
										
											2016-10-21 17:22:17 -04:00
										 |  |  |         ret = False | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         for child in self._children: | 
					
						
							| 
									
										
										
										
											2016-10-21 17:22:17 -04:00
										 |  |  |             if child.cancel(): | 
					
						
							|  |  |  |                 ret = True | 
					
						
							|  |  |  |         return ret | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def gather(*coros_or_futures, loop=None, return_exceptions=False): | 
					
						
							| 
									
										
										
										
											2017-12-19 07:19:53 -05:00
										 |  |  |     """Return a future aggregating results from the given coroutines/futures.
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-09-30 08:17:15 -07:00
										 |  |  |     Coroutines will be wrapped in a future and scheduled in the event | 
					
						
							|  |  |  |     loop. They will not necessarily be scheduled in the same order as | 
					
						
							|  |  |  |     passed in. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     All futures must share the same event loop.  If all the tasks are | 
					
						
							|  |  |  |     done successfully, the returned future's result is the list of | 
					
						
							|  |  |  |     results (in the order of the original sequence, not necessarily | 
					
						
							| 
									
										
										
										
											2014-02-06 12:03:53 -05:00
										 |  |  |     the order of results arrival).  If *return_exceptions* is True, | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     exceptions in the tasks are treated the same as successful | 
					
						
							|  |  |  |     results, and gathered in the result list; otherwise, the first | 
					
						
							|  |  |  |     raised exception will be immediately propagated to the returned | 
					
						
							|  |  |  |     future. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Cancellation: if the outer Future is cancelled, all children (that | 
					
						
							|  |  |  |     have not completed yet) are also cancelled.  If any child is | 
					
						
							|  |  |  |     cancelled, this is treated as if it raised CancelledError -- | 
					
						
							|  |  |  |     the outer Future is *not* cancelled in this case.  (This is to | 
					
						
							|  |  |  |     prevent the cancellation of one child to cause other children to | 
					
						
							|  |  |  |     be cancelled.) | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2014-07-16 18:36:24 +02:00
										 |  |  |     if not coros_or_futures: | 
					
						
							| 
									
										
										
										
											2016-05-16 15:38:39 -04:00
										 |  |  |         if loop is None: | 
					
						
							|  |  |  |             loop = events.get_event_loop() | 
					
						
							|  |  |  |         outer = loop.create_future() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         outer.set_result([]) | 
					
						
							|  |  |  |         return outer | 
					
						
							| 
									
										
										
										
											2014-07-16 18:36:24 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-19 07:19:53 -05:00
										 |  |  |     def _done_callback(fut): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         nonlocal nfinished | 
					
						
							| 
									
										
										
										
											2017-12-19 07:19:53 -05:00
										 |  |  |         nfinished += 1 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-09 01:42:52 +01:00
										 |  |  |         if outer.done(): | 
					
						
							|  |  |  |             if not fut.cancelled(): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                 # Mark exception retrieved. | 
					
						
							|  |  |  |                 fut.exception() | 
					
						
							|  |  |  |             return | 
					
						
							| 
									
										
										
										
											2015-01-09 01:42:52 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-19 07:19:53 -05:00
										 |  |  |         if not return_exceptions: | 
					
						
							|  |  |  |             if fut.cancelled(): | 
					
						
							|  |  |  |                 # Check if 'fut' is cancelled first, as | 
					
						
							|  |  |  |                 # 'fut.exception()' will *raise* a CancelledError | 
					
						
							|  |  |  |                 # instead of returning it. | 
					
						
							|  |  |  |                 exc = futures.CancelledError() | 
					
						
							|  |  |  |                 outer.set_exception(exc) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                 return | 
					
						
							| 
									
										
										
										
											2017-12-19 07:19:53 -05:00
										 |  |  |             else: | 
					
						
							|  |  |  |                 exc = fut.exception() | 
					
						
							|  |  |  |                 if exc is not None: | 
					
						
							|  |  |  |                     outer.set_exception(exc) | 
					
						
							|  |  |  |                     return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if nfinished == nfuts: | 
					
						
							|  |  |  |             # All futures are done; create a list of results | 
					
						
							|  |  |  |             # and set it to the 'outer' future. | 
					
						
							|  |  |  |             results = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for fut in children: | 
					
						
							|  |  |  |                 if fut.cancelled(): | 
					
						
							|  |  |  |                     # Check if 'fut' is cancelled first, as | 
					
						
							|  |  |  |                     # 'fut.exception()' will *raise* a CancelledError | 
					
						
							|  |  |  |                     # instead of returning it. | 
					
						
							|  |  |  |                     res = futures.CancelledError() | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     res = fut.exception() | 
					
						
							|  |  |  |                     if res is None: | 
					
						
							|  |  |  |                         res = fut.result() | 
					
						
							|  |  |  |                 results.append(res) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             outer.set_result(results) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-19 07:19:53 -05:00
										 |  |  |     arg_to_fut = {} | 
					
						
							|  |  |  |     children = [] | 
					
						
							|  |  |  |     nfuts = 0 | 
					
						
							|  |  |  |     nfinished = 0 | 
					
						
							|  |  |  |     for arg in coros_or_futures: | 
					
						
							|  |  |  |         if arg not in arg_to_fut: | 
					
						
							|  |  |  |             fut = ensure_future(arg, loop=loop) | 
					
						
							|  |  |  |             if loop is None: | 
					
						
							|  |  |  |                 loop = fut._loop | 
					
						
							|  |  |  |             if fut is not arg: | 
					
						
							|  |  |  |                 # 'arg' was not a Future, therefore, 'fut' is a new | 
					
						
							|  |  |  |                 # Future created specifically for 'arg'.  Since the caller | 
					
						
							|  |  |  |                 # can't control it, disable the "destroy pending task" | 
					
						
							|  |  |  |                 # warning. | 
					
						
							|  |  |  |                 fut._log_destroy_pending = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             nfuts += 1 | 
					
						
							|  |  |  |             arg_to_fut[arg] = fut | 
					
						
							|  |  |  |             fut.add_done_callback(_done_callback) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             # There's a duplicate Future object in coros_or_futures. | 
					
						
							|  |  |  |             fut = arg_to_fut[arg] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         children.append(fut) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     outer = _GatheringFuture(children, loop=loop) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     return outer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def shield(arg, *, loop=None): | 
					
						
							|  |  |  |     """Wait for a future, shielding it from cancellation.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The statement | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         res = await shield(something()) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     is exactly equivalent to the statement | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |         res = await something() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     *except* that if the coroutine containing it is cancelled, the | 
					
						
							|  |  |  |     task running in something() is not cancelled.  From the POV of | 
					
						
							|  |  |  |     something(), the cancellation did not happen.  But its caller is | 
					
						
							|  |  |  |     still cancelled, so the yield-from expression still raises | 
					
						
							|  |  |  |     CancelledError.  Note: If something() is cancelled by other means | 
					
						
							|  |  |  |     this will still cancel shield(). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     If you want to completely ignore cancellation (not recommended) | 
					
						
							|  |  |  |     you can combine shield() with a try/except clause, as follows: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							| 
									
										
										
										
											2017-12-09 00:23:48 +02:00
										 |  |  |             res = await shield(something()) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         except CancelledError: | 
					
						
							|  |  |  |             res = None | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2015-05-11 14:48:38 -04:00
										 |  |  |     inner = ensure_future(arg, loop=loop) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     if inner.done(): | 
					
						
							|  |  |  |         # Shortcut. | 
					
						
							|  |  |  |         return inner | 
					
						
							|  |  |  |     loop = inner._loop | 
					
						
							| 
									
										
										
										
											2016-05-16 15:38:39 -04:00
										 |  |  |     outer = loop.create_future() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _done_callback(inner): | 
					
						
							|  |  |  |         if outer.cancelled(): | 
					
						
							| 
									
										
										
										
											2015-01-09 01:42:52 +01:00
										 |  |  |             if not inner.cancelled(): | 
					
						
							|  |  |  |                 # Mark inner's result as retrieved. | 
					
						
							|  |  |  |                 inner.exception() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             return | 
					
						
							| 
									
										
										
										
											2015-01-09 01:42:52 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         if inner.cancelled(): | 
					
						
							|  |  |  |             outer.cancel() | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             exc = inner.exception() | 
					
						
							|  |  |  |             if exc is not None: | 
					
						
							|  |  |  |                 outer.set_exception(exc) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 outer.set_result(inner.result()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     inner.add_done_callback(_done_callback) | 
					
						
							|  |  |  |     return outer | 
					
						
							| 
									
										
										
										
											2015-10-03 08:31:42 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def run_coroutine_threadsafe(coro, loop): | 
					
						
							|  |  |  |     """Submit a coroutine object to a given event loop.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Return a concurrent.futures.Future to access the result. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if not coroutines.iscoroutine(coro): | 
					
						
							|  |  |  |         raise TypeError('A coroutine object is required') | 
					
						
							|  |  |  |     future = concurrent.futures.Future() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def callback(): | 
					
						
							| 
									
										
										
										
											2015-10-05 16:20:00 -07:00
										 |  |  |         try: | 
					
						
							|  |  |  |             futures._chain_future(ensure_future(coro, loop=loop), future) | 
					
						
							|  |  |  |         except Exception as exc: | 
					
						
							|  |  |  |             if future.set_running_or_notify_cancel(): | 
					
						
							|  |  |  |                 future.set_exception(exc) | 
					
						
							|  |  |  |             raise | 
					
						
							| 
									
										
										
										
											2015-10-03 08:31:42 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     loop.call_soon_threadsafe(callback) | 
					
						
							|  |  |  |     return future | 
					
						
							| 
									
										
										
										
											2017-12-16 21:58:38 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
# Weak mapping {Task: EventLoop} of every task that is still alive.
# The task is the weakly referenced key, so its entry is removed when
# the task is garbage collected; the loop is stored as the value so
# that users of this mapping do not have to access the private
# task._loop attribute.
_all_tasks = weakref.WeakKeyDictionary()

# Mapping {EventLoop: Task} of the task that is currently active in
# each running event loop.
_current_tasks = {}
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
def _register_task(loop, task):
    """Register a new task in asyncio as executed by loop.

    Stores *loop* in the module-level _all_tasks mapping under the
    key *task*.

    Returns None.
    """
    _all_tasks[task] = loop
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _enter_task(loop, task): | 
					
						
							|  |  |  |     current_task = _current_tasks.get(loop) | 
					
						
							|  |  |  |     if current_task is not None: | 
					
						
							|  |  |  |         raise RuntimeError(f"Cannot enter into task {task!r} while another " | 
					
						
							|  |  |  |                            f"task {current_task!r} is being executed.") | 
					
						
							|  |  |  |     _current_tasks[loop] = task | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _leave_task(loop, task): | 
					
						
							|  |  |  |     current_task = _current_tasks.get(loop) | 
					
						
							|  |  |  |     if current_task is not task: | 
					
						
							|  |  |  |         raise RuntimeError(f"Leaving task {task!r} does not match " | 
					
						
							|  |  |  |                            f"the current task {current_task!r}.") | 
					
						
							|  |  |  |     del _current_tasks[loop] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
def _unregister_task(loop, task):
    """Remove *task* from the module-level _all_tasks mapping.

    A task that is not registered is silently ignored.  *loop* is
    accepted but not used.

    Returns None.
    """
    _all_tasks.pop(task, None)
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
# Keep the implementations defined in this file reachable under _py_*
# names, because the import below may rebind the plain names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


# Use the _asyncio versions of these names when that module can be
# imported; otherwise the definitions above stay in effect.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # Expose the _asyncio implementations under _c_* names as well.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task