mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 11:14:33 +00:00 
			
		
		
		
	
		
			
	
	
		
			148 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			148 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | """Support for running coroutines in parallel with staggered start times.""" | ||
|  | 
 | ||
|  | __all__ = 'staggered_race', | ||
|  | 
 | ||
|  | import contextlib | ||
|  | import typing | ||
|  | 
 | ||
|  | from . import events | ||
|  | from . import futures | ||
|  | from . import locks | ||
|  | from . import tasks | ||
|  | 
 | ||
|  | 
 | ||
|  | async def staggered_race( | ||
|  |         coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]], | ||
|  |         delay: typing.Optional[float], | ||
|  |         *, | ||
|  |         loop: events.AbstractEventLoop = None, | ||
|  | ) -> typing.Tuple[ | ||
|  |     typing.Any, | ||
|  |     typing.Optional[int], | ||
|  |     typing.List[typing.Optional[Exception]] | ||
|  | ]: | ||
|  |     """Run coroutines with staggered start times and take the first to finish.
 | ||
|  | 
 | ||
|  |     This method takes an iterable of coroutine functions. The first one is | ||
|  |     started immediately. From then on, whenever the immediately preceding one | ||
|  |     fails (raises an exception), or when *delay* seconds has passed, the next | ||
|  |     coroutine is started. This continues until one of the coroutines complete | ||
|  |     successfully, in which case all others are cancelled, or until all | ||
|  |     coroutines fail. | ||
|  | 
 | ||
|  |     The coroutines provided should be well-behaved in the following way: | ||
|  | 
 | ||
|  |     * They should only ``return`` if completed successfully. | ||
|  | 
 | ||
|  |     * They should always raise an exception if they did not complete | ||
|  |       successfully. In particular, if they handle cancellation, they should | ||
|  |       probably reraise, like this:: | ||
|  | 
 | ||
|  |         try: | ||
|  |             # do work | ||
|  |         except asyncio.CancelledError: | ||
|  |             # undo partially completed work | ||
|  |             raise | ||
|  | 
 | ||
|  |     Args: | ||
|  |         coro_fns: an iterable of coroutine functions, i.e. callables that | ||
|  |             return a coroutine object when called. Use ``functools.partial`` or | ||
|  |             lambdas to pass arguments. | ||
|  | 
 | ||
|  |         delay: amount of time, in seconds, between starting coroutines. If | ||
|  |             ``None``, the coroutines will run sequentially. | ||
|  | 
 | ||
|  |         loop: the event loop to use. | ||
|  | 
 | ||
|  |     Returns: | ||
|  |         tuple *(winner_result, winner_index, exceptions)* where | ||
|  | 
 | ||
|  |         - *winner_result*: the result of the winning coroutine, or ``None`` | ||
|  |           if no coroutines won. | ||
|  | 
 | ||
|  |         - *winner_index*: the index of the winning coroutine in | ||
|  |           ``coro_fns``, or ``None`` if no coroutines won. If the winning | ||
|  |           coroutine may return None on success, *winner_index* can be used | ||
|  |           to definitively determine whether any coroutine won. | ||
|  | 
 | ||
|  |         - *exceptions*: list of exceptions returned by the coroutines. | ||
|  |           ``len(exceptions)`` is equal to the number of coroutines actually | ||
|  |           started, and the order is the same as in ``coro_fns``. The winning | ||
|  |           coroutine's entry is ``None``. | ||
|  | 
 | ||
|  |     """
 | ||
|  |     # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. | ||
|  |     loop = loop or events.get_running_loop() | ||
|  |     enum_coro_fns = enumerate(coro_fns) | ||
|  |     winner_result = None | ||
|  |     winner_index = None | ||
|  |     exceptions = [] | ||
|  |     running_tasks = [] | ||
|  | 
 | ||
|  |     async def run_one_coro( | ||
|  |             previous_failed: typing.Optional[locks.Event]) -> None: | ||
|  |         # Wait for the previous task to finish, or for delay seconds | ||
|  |         if previous_failed is not None: | ||
|  |             with contextlib.suppress(futures.TimeoutError): | ||
|  |                 # Use asyncio.wait_for() instead of asyncio.wait() here, so | ||
|  |                 # that if we get cancelled at this point, Event.wait() is also | ||
|  |                 # cancelled, otherwise there will be a "Task destroyed but it is | ||
|  |                 # pending" later. | ||
|  |                 await tasks.wait_for(previous_failed.wait(), delay) | ||
|  |         # Get the next coroutine to run | ||
|  |         try: | ||
|  |             this_index, coro_fn = next(enum_coro_fns) | ||
|  |         except StopIteration: | ||
|  |             return | ||
|  |         # Start task that will run the next coroutine | ||
|  |         this_failed = locks.Event() | ||
|  |         next_task = loop.create_task(run_one_coro(this_failed)) | ||
|  |         running_tasks.append(next_task) | ||
|  |         assert len(running_tasks) == this_index + 2 | ||
|  |         # Prepare place to put this coroutine's exceptions if not won | ||
|  |         exceptions.append(None) | ||
|  |         assert len(exceptions) == this_index + 1 | ||
|  | 
 | ||
|  |         try: | ||
|  |             result = await coro_fn() | ||
|  |         except Exception as e: | ||
|  |             exceptions[this_index] = e | ||
|  |             this_failed.set()  # Kickstart the next coroutine | ||
|  |         else: | ||
|  |             # Store winner's results | ||
|  |             nonlocal winner_index, winner_result | ||
|  |             assert winner_index is None | ||
|  |             winner_index = this_index | ||
|  |             winner_result = result | ||
|  |             # Cancel all other tasks. We take care to not cancel the current | ||
|  |             # task as well. If we do so, then since there is no `await` after | ||
|  |             # here and CancelledError are usually thrown at one, we will | ||
|  |             # encounter a curious corner case where the current task will end | ||
|  |             # up as done() == True, cancelled() == False, exception() == | ||
|  |             # asyncio.CancelledError. This behavior is specified in | ||
|  |             # https://bugs.python.org/issue30048 | ||
|  |             for i, t in enumerate(running_tasks): | ||
|  |                 if i != this_index: | ||
|  |                     t.cancel() | ||
|  | 
 | ||
|  |     first_task = loop.create_task(run_one_coro(None)) | ||
|  |     running_tasks.append(first_task) | ||
|  |     try: | ||
|  |         # Wait for a growing list of tasks to all finish: poor man's version of | ||
|  |         # curio's TaskGroup or trio's nursery | ||
|  |         done_count = 0 | ||
|  |         while done_count != len(running_tasks): | ||
|  |             done, _ = await tasks.wait(running_tasks) | ||
|  |             done_count = len(done) | ||
|  |             # If run_one_coro raises an unhandled exception, it's probably a | ||
|  |             # programming error, and I want to see it. | ||
|  |             if __debug__: | ||
|  |                 for d in done: | ||
|  |                     if d.done() and not d.cancelled() and d.exception(): | ||
|  |                         raise d.exception() | ||
|  |         return winner_result, winner_index, exceptions | ||
|  |     finally: | ||
|  |         # Make sure no tasks are left running if we leave this function | ||
|  |         for t in running_tasks: | ||
|  |             t.cancel() |