mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			149 lines
		
	
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			149 lines
		
	
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Support for running coroutines in parallel with staggered start times."""
 | |
| 
 | |
| __all__ = 'staggered_race',
 | |
| 
 | |
| import contextlib
 | |
| import typing
 | |
| 
 | |
| from . import events
 | |
| from . import exceptions as exceptions_mod
 | |
| from . import locks
 | |
| from . import tasks
 | |
| 
 | |
| 
 | |
async def staggered_race(
        coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]],
        delay: typing.Optional[float],
        *,
        loop: typing.Optional[events.AbstractEventLoop] = None,
) -> typing.Tuple[
    typing.Any,
    typing.Optional[int],
    typing.List[typing.Optional[BaseException]]
]:
    """Run coroutines with staggered start times and take the first to finish.

    This method takes an iterable of coroutine functions. The first one is
    started immediately. From then on, whenever the immediately preceding one
    fails (raises an exception), or when *delay* seconds have passed, the next
    coroutine is started. This continues until one of the coroutines completes
    successfully, in which case all others are cancelled, or until all
    coroutines fail.

    The coroutines provided should be well-behaved in the following way:

    * They should only ``return`` if completed successfully.

    * They should always raise an exception if they did not complete
      successfully. In particular, if they handle cancellation, they should
      probably reraise, like this::

        try:
            # do work
        except asyncio.CancelledError:
            # undo partially completed work
            raise

    Args:
        coro_fns: an iterable of coroutine functions, i.e. callables that
            return a coroutine object when called. Use ``functools.partial`` or
            lambdas to pass arguments. The iterable is consumed lazily: a
            callable is only fetched when its turn to start comes.

        delay: amount of time, in seconds, between starting coroutines. If
            ``None``, the coroutines will run sequentially.

        loop: the event loop to use. If ``None``, the running event loop is
            used.

    Returns:
        tuple *(winner_result, winner_index, exceptions)* where

        - *winner_result*: the result of the winning coroutine, or ``None``
          if no coroutines won.

        - *winner_index*: the index of the winning coroutine in
          ``coro_fns``, or ``None`` if no coroutines won. If the winning
          coroutine may return None on success, *winner_index* can be used
          to definitively determine whether any coroutine won.

        - *exceptions*: list of exceptions raised by the coroutines.
          ``len(exceptions)`` is equal to the number of coroutines actually
          started, and the order is the same as in ``coro_fns``. The winning
          coroutine's entry is ``None``. A started coroutine that was
          cancelled because another one won has the ``CancelledError`` it
          raised as its entry.

    ``SystemExit`` and ``KeyboardInterrupt`` raised by a coroutine are
    re-raised instead of being recorded in *exceptions*.
    """
    # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
    if loop is None:
        loop = events.get_running_loop()
    # A single shared iterator: each runner task pulls exactly one
    # (index, coroutine function) pair from it when it is its turn.
    enum_coro_fns = enumerate(coro_fns)
    winner_result = None
    winner_index = None
    exceptions = []
    # running_tasks[i] is the runner task responsible for coro_fns[i]; the
    # last entry may be a runner that is still waiting for its turn (or that
    # found the iterable exhausted).
    running_tasks = []

    async def run_one_coro(
            previous_failed: typing.Optional[locks.Event]) -> None:
        """Run the next coroutine function once its turn has come.

        *previous_failed* is the event set by the preceding runner when its
        coroutine fails; ``None`` for the very first runner, which starts
        immediately.
        """
        # Wait for the previous task to finish, or for delay seconds
        if previous_failed is not None:
            with contextlib.suppress(exceptions_mod.TimeoutError):
                # Use asyncio.wait_for() instead of asyncio.wait() here, so
                # that if we get cancelled at this point, Event.wait() is also
                # cancelled, otherwise there will be a "Task destroyed but it is
                # pending" later.
                await tasks.wait_for(previous_failed.wait(), delay)
        # Get the next coroutine to run
        try:
            this_index, coro_fn = next(enum_coro_fns)
        except StopIteration:
            # Nothing left to start; this runner simply finishes.
            return
        # Start task that will run the next coroutine. It is created before
        # this coroutine is awaited so that the delay timer for the next one
        # starts now.
        this_failed = locks.Event()
        next_task = loop.create_task(run_one_coro(this_failed))
        running_tasks.append(next_task)
        assert len(running_tasks) == this_index + 2
        # Prepare place to put this coroutine's exceptions if not won
        exceptions.append(None)
        assert len(exceptions) == this_index + 1

        try:
            result = await coro_fn()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as e:
            # BaseException (not Exception) so that a CancelledError raised
            # by a cancelled loser is recorded as well.
            exceptions[this_index] = e
            this_failed.set()  # Kickstart the next coroutine
        else:
            # Store winner's results
            nonlocal winner_index, winner_result
            assert winner_index is None
            winner_index = this_index
            winner_result = result
            # Cancel all other tasks. We take care to not cancel the current
            # task as well. If we do so, then since there is no `await` after
            # here and CancelledError are usually thrown at one, we will
            # encounter a curious corner case where the current task will end
            # up as done() == True, cancelled() == False, exception() ==
            # asyncio.CancelledError. This behavior is specified in
            # https://bugs.python.org/issue30048
            for i, t in enumerate(running_tasks):
                if i != this_index:
                    t.cancel()

    first_task = loop.create_task(run_one_coro(None))
    running_tasks.append(first_task)
    try:
        # Wait for a growing list of tasks to all finish: poor man's version of
        # curio's TaskGroup or trio's nursery
        done_count = 0
        # running_tasks may have grown while we were waiting, so keep waiting
        # until a full pass finds every known task done.
        while done_count != len(running_tasks):
            done, _ = await tasks.wait(running_tasks)
            done_count = len(done)
            # If run_one_coro raises an unhandled exception, it's probably a
            # programming error, and I want to see it.
            if __debug__:
                for d in done:
                    if d.done() and not d.cancelled() and d.exception():
                        raise d.exception()
        return winner_result, winner_index, exceptions
    finally:
        # Make sure no tasks are left running if we leave this function
        for t in running_tasks:
            t.cancel()
