mirror of
https://github.com/python/cpython.git
synced 2025-10-21 00:44:12 +00:00

This addresses a common mistake many asyncio users make: an "except Exception" clause breaking Task cancellation. In addition to this change, we stop inheriting asyncio.TimeoutError and asyncio.InvalidStateError from their concurrent.futures.* counterparts. There's no reason for these exceptions to share an inheritance chain. In 3.9 we'll focus on implementing supervisors and cancel scopes, which should allow better handling of all exceptions, including SystemExit and KeyboardInterrupt.
149 lines
5.8 KiB
Python
149 lines
5.8 KiB
Python
"""Support for running coroutines in parallel with staggered start times."""
|
|
|
|
__all__ = 'staggered_race',
|
|
|
|
import contextlib
import typing

from . import events
from . import exceptions as exceptions_mod
from . import futures
from . import locks
from . import tasks
|
|
|
|
|
|
async def staggered_race(
        coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]],
        delay: typing.Optional[float],
        *,
        loop: typing.Optional[events.AbstractEventLoop] = None,
) -> typing.Tuple[
    typing.Any,
    typing.Optional[int],
    typing.List[typing.Optional[BaseException]]
]:
    """Run coroutines with staggered start times and take the first to finish.

    This method takes an iterable of coroutine functions. The first one is
    started immediately. From then on, whenever the immediately preceding one
    fails (raises an exception), or when *delay* seconds have passed, the next
    coroutine is started. This continues until one of the coroutines completes
    successfully, in which case all others are cancelled, or until all
    coroutines fail.

    The coroutines provided should be well-behaved in the following way:

    * They should only ``return`` if completed successfully.

    * They should always raise an exception if they did not complete
      successfully. In particular, if they handle cancellation, they should
      probably reraise, like this::

        try:
            # do work
        except asyncio.CancelledError:
            # undo partially completed work
            raise

    Args:
        coro_fns: an iterable of coroutine functions, i.e. callables that
            return a coroutine object when called. Use ``functools.partial`` or
            lambdas to pass arguments.

        delay: amount of time, in seconds, between starting coroutines. If
            ``None``, the coroutines will run sequentially.

        loop: the event loop to use. Defaults to the running event loop.

    Returns:
        tuple *(winner_result, winner_index, exceptions)* where

        - *winner_result*: the result of the winning coroutine, or ``None``
          if no coroutines won.

        - *winner_index*: the index of the winning coroutine in
          ``coro_fns``, or ``None`` if no coroutines won. If the winning
          coroutine may return None on success, *winner_index* can be used
          to definitively determine whether any coroutine won.

        - *exceptions*: list of exceptions raised by the coroutines.
          ``len(exceptions)`` is equal to the number of coroutines actually
          started, and the order is the same as in ``coro_fns``. The winning
          coroutine's entry is ``None``. Everything a coroutine raises other
          than ``SystemExit`` and ``KeyboardInterrupt`` is recorded here,
          including the ``asyncio.CancelledError`` raised in coroutines that
          were cancelled.

    """
    # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
    loop = loop or events.get_running_loop()
    enum_coro_fns = enumerate(coro_fns)
    winner_result = None
    winner_index = None
    # NOTE: this local list shadows the name of the asyncio ``exceptions``
    # module, which is why that module is referred to as ``exceptions_mod``.
    exceptions = []
    running_tasks = []

    async def run_one_coro(
            previous_failed: typing.Optional[locks.Event]) -> None:
        # Wait for the previous task to finish, or for delay seconds
        if previous_failed is not None:
            # wait_for() reports an elapsed delay by raising asyncio's own
            # TimeoutError; an elapsed delay simply means "start the next
            # coroutine now", so it is swallowed here.
            with contextlib.suppress(exceptions_mod.TimeoutError):
                # Use asyncio.wait_for() instead of asyncio.wait() here, so
                # that if we get cancelled at this point, Event.wait() is also
                # cancelled, otherwise there will be a "Task destroyed but it
                # is pending" later.
                await tasks.wait_for(previous_failed.wait(), delay)
        # Get the next coroutine to run
        try:
            this_index, coro_fn = next(enum_coro_fns)
        except StopIteration:
            return
        # Start task that will run the next coroutine
        this_failed = locks.Event()
        next_task = loop.create_task(run_one_coro(this_failed))
        running_tasks.append(next_task)
        # running_tasks[i] runs coroutine i, plus the follow-up task just added
        assert len(running_tasks) == this_index + 2
        # Prepare place to put this coroutine's exceptions if not won
        exceptions.append(None)
        assert len(exceptions) == this_index + 1

        try:
            result = await coro_fn()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as e:
            exceptions[this_index] = e
            this_failed.set()  # Kickstart the next coroutine
        else:
            # Store winner's results
            nonlocal winner_index, winner_result
            assert winner_index is None
            winner_index = this_index
            winner_result = result
            # Cancel all other tasks. We take care to not cancel the current
            # task as well. If we do so, then since there is no `await` after
            # here and CancelledError are usually thrown at one, we will
            # encounter a curious corner case where the current task will end
            # up as done() == True, cancelled() == False, exception() ==
            # asyncio.CancelledError. This behavior is specified in
            # https://bugs.python.org/issue30048
            for i, t in enumerate(running_tasks):
                if i != this_index:
                    t.cancel()

    first_task = loop.create_task(run_one_coro(None))
    running_tasks.append(first_task)
    try:
        # Wait for a growing list of tasks to all finish: poor man's version of
        # curio's TaskGroup or trio's nursery
        done_count = 0
        while done_count != len(running_tasks):
            done, _ = await tasks.wait(running_tasks)
            done_count = len(done)
            # If run_one_coro raises an unhandled exception, it's probably a
            # programming error, and I want to see it.
            if __debug__:
                for d in done:
                    if d.done() and not d.cancelled() and d.exception():
                        raise d.exception()
        return winner_result, winner_index, exceptions
    finally:
        # Make sure no tasks are left running if we leave this function
        for t in running_tasks:
            t.cancel()
|