"""Support for running coroutines in parallel with staggered start times."""

# Public API of this module (a one-element tuple).
__all__ = ('staggered_race',)

import contextlib
import typing

from . import events
from . import exceptions as exceptions_mod
from . import locks
from . import tasks


							|  |  |  | async def staggered_race( | 
					
						
							|  |  |  |         coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]], | 
					
						
							|  |  |  |         delay: typing.Optional[float], | 
					
						
							|  |  |  |         *, | 
					
						
							|  |  |  |         loop: events.AbstractEventLoop = None, | 
					
						
							|  |  |  | ) -> typing.Tuple[ | 
					
						
							|  |  |  |     typing.Any, | 
					
						
							|  |  |  |     typing.Optional[int], | 
					
						
							|  |  |  |     typing.List[typing.Optional[Exception]] | 
					
						
							|  |  |  | ]: | 
					
						
							|  |  |  |     """Run coroutines with staggered start times and take the first to finish.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This method takes an iterable of coroutine functions. The first one is | 
					
						
							|  |  |  |     started immediately. From then on, whenever the immediately preceding one | 
					
						
							|  |  |  |     fails (raises an exception), or when *delay* seconds has passed, the next | 
					
						
							|  |  |  |     coroutine is started. This continues until one of the coroutines complete | 
					
						
							|  |  |  |     successfully, in which case all others are cancelled, or until all | 
					
						
							|  |  |  |     coroutines fail. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The coroutines provided should be well-behaved in the following way: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     * They should only ``return`` if completed successfully. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     * They should always raise an exception if they did not complete | 
					
						
							|  |  |  |       successfully. In particular, if they handle cancellation, they should | 
					
						
							|  |  |  |       probably reraise, like this:: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             # do work | 
					
						
							|  |  |  |         except asyncio.CancelledError: | 
					
						
							|  |  |  |             # undo partially completed work | 
					
						
							|  |  |  |             raise | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         coro_fns: an iterable of coroutine functions, i.e. callables that | 
					
						
							|  |  |  |             return a coroutine object when called. Use ``functools.partial`` or | 
					
						
							|  |  |  |             lambdas to pass arguments. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         delay: amount of time, in seconds, between starting coroutines. If | 
					
						
							|  |  |  |             ``None``, the coroutines will run sequentially. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         loop: the event loop to use. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Returns: | 
					
						
							|  |  |  |         tuple *(winner_result, winner_index, exceptions)* where | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         - *winner_result*: the result of the winning coroutine, or ``None`` | 
					
						
							|  |  |  |           if no coroutines won. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         - *winner_index*: the index of the winning coroutine in | 
					
						
							|  |  |  |           ``coro_fns``, or ``None`` if no coroutines won. If the winning | 
					
						
							|  |  |  |           coroutine may return None on success, *winner_index* can be used | 
					
						
							|  |  |  |           to definitively determine whether any coroutine won. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         - *exceptions*: list of exceptions returned by the coroutines. | 
					
						
							|  |  |  |           ``len(exceptions)`` is equal to the number of coroutines actually | 
					
						
							|  |  |  |           started, and the order is the same as in ``coro_fns``. The winning | 
					
						
							|  |  |  |           coroutine's entry is ``None``. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. | 
					
						
							|  |  |  |     loop = loop or events.get_running_loop() | 
					
						
							|  |  |  |     enum_coro_fns = enumerate(coro_fns) | 
					
						
							|  |  |  |     winner_result = None | 
					
						
							|  |  |  |     winner_index = None | 
					
						
							|  |  |  |     exceptions = [] | 
					
						
							|  |  |  |     running_tasks = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def run_one_coro( | 
					
						
							|  |  |  |             previous_failed: typing.Optional[locks.Event]) -> None: | 
					
						
							|  |  |  |         # Wait for the previous task to finish, or for delay seconds | 
					
						
							|  |  |  |         if previous_failed is not None: | 
					
						
							| 
									
										
										
										
											2019-12-24 12:46:42 +02:00
										 |  |  |             with contextlib.suppress(exceptions_mod.TimeoutError): | 
					
						
							| 
									
										
										
										
											2019-05-05 19:14:35 +08:00
										 |  |  |                 # Use asyncio.wait_for() instead of asyncio.wait() here, so | 
					
						
							|  |  |  |                 # that if we get cancelled at this point, Event.wait() is also | 
					
						
							|  |  |  |                 # cancelled, otherwise there will be a "Task destroyed but it is | 
					
						
							|  |  |  |                 # pending" later. | 
					
						
							|  |  |  |                 await tasks.wait_for(previous_failed.wait(), delay) | 
					
						
							|  |  |  |         # Get the next coroutine to run | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             this_index, coro_fn = next(enum_coro_fns) | 
					
						
							|  |  |  |         except StopIteration: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         # Start task that will run the next coroutine | 
					
						
							|  |  |  |         this_failed = locks.Event() | 
					
						
							|  |  |  |         next_task = loop.create_task(run_one_coro(this_failed)) | 
					
						
							|  |  |  |         running_tasks.append(next_task) | 
					
						
							|  |  |  |         assert len(running_tasks) == this_index + 2 | 
					
						
							|  |  |  |         # Prepare place to put this coroutine's exceptions if not won | 
					
						
							|  |  |  |         exceptions.append(None) | 
					
						
							|  |  |  |         assert len(exceptions) == this_index + 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             result = await coro_fn() | 
					
						
							| 
									
										
										
										
											2019-05-27 14:45:12 +02:00
										 |  |  |         except (SystemExit, KeyboardInterrupt): | 
					
						
							|  |  |  |             raise | 
					
						
							|  |  |  |         except BaseException as e: | 
					
						
							| 
									
										
										
										
											2019-05-05 19:14:35 +08:00
										 |  |  |             exceptions[this_index] = e | 
					
						
							|  |  |  |             this_failed.set()  # Kickstart the next coroutine | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             # Store winner's results | 
					
						
							|  |  |  |             nonlocal winner_index, winner_result | 
					
						
							|  |  |  |             assert winner_index is None | 
					
						
							|  |  |  |             winner_index = this_index | 
					
						
							|  |  |  |             winner_result = result | 
					
						
							|  |  |  |             # Cancel all other tasks. We take care to not cancel the current | 
					
						
							|  |  |  |             # task as well. If we do so, then since there is no `await` after | 
					
						
							|  |  |  |             # here and CancelledError are usually thrown at one, we will | 
					
						
							|  |  |  |             # encounter a curious corner case where the current task will end | 
					
						
							|  |  |  |             # up as done() == True, cancelled() == False, exception() == | 
					
						
							|  |  |  |             # asyncio.CancelledError. This behavior is specified in | 
					
						
							|  |  |  |             # https://bugs.python.org/issue30048 | 
					
						
							|  |  |  |             for i, t in enumerate(running_tasks): | 
					
						
							|  |  |  |                 if i != this_index: | 
					
						
							|  |  |  |                     t.cancel() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     first_task = loop.create_task(run_one_coro(None)) | 
					
						
							|  |  |  |     running_tasks.append(first_task) | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         # Wait for a growing list of tasks to all finish: poor man's version of | 
					
						
							|  |  |  |         # curio's TaskGroup or trio's nursery | 
					
						
							|  |  |  |         done_count = 0 | 
					
						
							|  |  |  |         while done_count != len(running_tasks): | 
					
						
							|  |  |  |             done, _ = await tasks.wait(running_tasks) | 
					
						
							|  |  |  |             done_count = len(done) | 
					
						
							|  |  |  |             # If run_one_coro raises an unhandled exception, it's probably a | 
					
						
							|  |  |  |             # programming error, and I want to see it. | 
					
						
							|  |  |  |             if __debug__: | 
					
						
							|  |  |  |                 for d in done: | 
					
						
							|  |  |  |                     if d.done() and not d.cancelled() and d.exception(): | 
					
						
							|  |  |  |                         raise d.exception() | 
					
						
							|  |  |  |         return winner_result, winner_index, exceptions | 
					
						
							|  |  |  |     finally: | 
					
						
							|  |  |  |         # Make sure no tasks are left running if we leave this function | 
					
						
							|  |  |  |         for t in running_tasks: | 
					
						
							|  |  |  |             t.cancel() |