expand the asyncio.run_coroutine_threadsafe recipes (#127576)

Co-authored-by: Kumar Aditya <kumaraditya@python.org>
This commit is contained in:
Thomas Grainger 2024-12-29 06:22:29 +00:00 committed by GitHub
parent f9a5a3a3ef
commit c9159b7436
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1067,14 +1067,59 @@ Scheduling From Other Threads
This function is meant to be called from a different OS thread
than the one where the event loop is running. Example::
# Create a coroutine
coro = asyncio.sleep(1, result=3)
def in_thread(loop: asyncio.AbstractEventLoop) -> None:
# Run some blocking IO
pathlib.Path("example.txt").write_text("hello world", encoding="utf8")
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout=2) == 3
async def amain() -> None:
# Get the running loop
loop = asyncio.get_running_loop()
# Run something in a thread
await asyncio.to_thread(in_thread, loop)
It's also possible to run the other way around. Example::
@contextlib.contextmanager
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]:
loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]()
stop_event = asyncio.Event()
async def main() -> None:
loop_fut.set_result(asyncio.get_running_loop())
await stop_event.wait()
with concurrent.futures.ThreadPoolExecutor(1) as tpe:
complete_fut = tpe.submit(asyncio.run, main())
for fut in concurrent.futures.as_completed((loop_fut, complete_fut)):
if fut is loop_fut:
loop = loop_fut.result()
try:
yield loop
finally:
loop.call_soon_threadsafe(stop_event.set)
else:
fut.result()
# Create a loop in another thread
with loop_in_thread() as loop:
# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout=2) == 3
If an exception is raised in the coroutine, the returned Future
will be notified. It can also be used to cancel the task in