bpo-36373: Deprecate explicit loop in task and subprocess API (GH-16033)

This commit is contained in:
Andrew Svetlov 2019-09-12 15:40:40 +03:00 committed by GitHub
parent 3ab61473ba
commit a488879cba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 178 additions and 101 deletions

View file

@ -71,6 +71,10 @@ Creating Subprocesses
See the documentation of :meth:`loop.subprocess_exec` for other See the documentation of :meth:`loop.subprocess_exec` for other
parameters. parameters.
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
.. coroutinefunction:: create_subprocess_shell(cmd, stdin=None, \ .. coroutinefunction:: create_subprocess_shell(cmd, stdin=None, \
stdout=None, stderr=None, loop=None, \ stdout=None, stderr=None, loop=None, \
limit=None, \*\*kwds) limit=None, \*\*kwds)
@ -95,6 +99,10 @@ Creating Subprocesses
escape whitespace and special shell characters in strings that are going escape whitespace and special shell characters in strings that are going
to be used to construct shell commands. to be used to construct shell commands.
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
.. note:: .. note::
The default asyncio event loop implementation on **Windows** does not The default asyncio event loop implementation on **Windows** does not

View file

@ -334,6 +334,9 @@ Running Tasks Concurrently
cancellation of one submitted Task/Future to cause other cancellation of one submitted Task/Future to cause other
Tasks/Futures to be cancelled. Tasks/Futures to be cancelled.
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
.. _asyncio_example_gather: .. _asyncio_example_gather:
Example:: Example::
@ -411,6 +414,9 @@ Shielding From Cancellation
except CancelledError: except CancelledError:
res = None res = None
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
Timeouts Timeouts
======== ========
@ -478,22 +484,12 @@ Waiting Primitives
set concurrently and block until the condition specified set concurrently and block until the condition specified
by *return_when*. by *return_when*.
.. deprecated:: 3.8
If any awaitable in *aws* is a coroutine, it is automatically
scheduled as a Task. Passing coroutines objects to
``wait()`` directly is deprecated as it leads to
:ref:`confusing behavior <asyncio_example_wait_coroutine>`.
Returns two sets of Tasks/Futures: ``(done, pending)``. Returns two sets of Tasks/Futures: ``(done, pending)``.
Usage:: Usage::
done, pending = await asyncio.wait(aws) done, pending = await asyncio.wait(aws)
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
*timeout* (a float or int), if specified, can be used to control *timeout* (a float or int), if specified, can be used to control
the maximum number of seconds to wait before returning. the maximum number of seconds to wait before returning.
@ -525,6 +521,17 @@ Waiting Primitives
Unlike :func:`~asyncio.wait_for`, ``wait()`` does not cancel the Unlike :func:`~asyncio.wait_for`, ``wait()`` does not cancel the
futures when a timeout occurs. futures when a timeout occurs.
.. deprecated:: 3.8
If any awaitable in *aws* is a coroutine, it is automatically
scheduled as a Task. Passing coroutines objects to
``wait()`` directly is deprecated as it leads to
:ref:`confusing behavior <asyncio_example_wait_coroutine>`.
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
.. _asyncio_example_wait_coroutine: .. _asyncio_example_wait_coroutine:
.. note:: .. note::
@ -568,6 +575,9 @@ Waiting Primitives
Raises :exc:`asyncio.TimeoutError` if the timeout occurs before Raises :exc:`asyncio.TimeoutError` if the timeout occurs before
all Futures are done. all Futures are done.
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
Example:: Example::
for f in as_completed(aws): for f in as_completed(aws):
@ -694,6 +704,9 @@ Task Object
.. versionchanged:: 3.8 .. versionchanged:: 3.8
Added the ``name`` parameter. Added the ``name`` parameter.
.. deprecated-removed:: 3.8 3.10
The *loop* parameter.
.. method:: cancel() .. method:: cancel()
Request the Task to be cancelled. Request the Task to be cancelled.

View file

@ -224,6 +224,13 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
**kwds): **kwds):
if loop is None: if loop is None:
loop = events.get_event_loop() loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8 "
"and scheduled for removal in Python 3.10.",
DeprecationWarning,
stacklevel=2
)
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop, loop=loop,
_asyncio_internal=True) _asyncio_internal=True)
@ -239,6 +246,12 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
limit=streams._DEFAULT_LIMIT, **kwds): limit=streams._DEFAULT_LIMIT, **kwds):
if loop is None: if loop is None:
loop = events.get_event_loop() loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8 "
"and scheduled for removal in Python 3.10.",
DeprecationWarning,
stacklevel=2
)
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop, loop=loop,
_asyncio_internal=True) _asyncio_internal=True)

View file

@ -573,10 +573,17 @@ def as_completed(fs, *, loop=None, timeout=None):
""" """
if futures.isfuture(fs) or coroutines.iscoroutine(fs): if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}") raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
from .queues import Queue # Import here to avoid circular import problem. from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop) done = Queue(loop=loop)
if loop is None:
loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None timeout_handle = None
def _on_timeout(): def _on_timeout():
@ -733,6 +740,10 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
if not coros_or_futures: if not coros_or_futures:
if loop is None: if loop is None:
loop = events.get_event_loop() loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
outer = loop.create_future() outer = loop.create_future()
outer.set_result([]) outer.set_result([])
return outer return outer
@ -842,6 +853,10 @@ def shield(arg, *, loop=None):
except CancelledError: except CancelledError:
res = None res = None
""" """
if loop is not None:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
inner = ensure_future(arg, loop=loop) inner = ensure_future(arg, loop=loop)
if inner.done(): if inner.done():
# Shortcut. # Shortcut.

View file

@ -855,9 +855,10 @@ def test_read_all_from_pipe_reader(self):
watcher.attach_loop(self.loop) watcher.attach_loop(self.loop)
try: try:
asyncio.set_child_watcher(watcher) asyncio.set_child_watcher(watcher)
create = asyncio.create_subprocess_exec(*args, create = asyncio.create_subprocess_exec(
*args,
pass_fds={wfd}, pass_fds={wfd},
loop=self.loop) )
proc = self.loop.run_until_complete(create) proc = self.loop.run_until_complete(create)
self.loop.run_until_complete(proc.wait()) self.loop.run_until_complete(proc.wait())
finally: finally:

View file

@ -112,7 +112,7 @@ async def run(data):
*args, *args,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
loop=self.loop) )
# feed data # feed data
proc.stdin.write(data) proc.stdin.write(data)
@ -138,7 +138,7 @@ async def run(data):
*args, *args,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
loop=self.loop) )
stdout, stderr = await proc.communicate(data) stdout, stderr = await proc.communicate(data)
return proc.returncode, stdout return proc.returncode, stdout
@ -149,25 +149,28 @@ async def run(data):
self.assertEqual(stdout, b'some data') self.assertEqual(stdout, b'some data')
def test_shell(self): def test_shell(self):
create = asyncio.create_subprocess_shell('exit 7', proc = self.loop.run_until_complete(
loop=self.loop) asyncio.create_subprocess_shell('exit 7')
proc = self.loop.run_until_complete(create) )
exitcode = self.loop.run_until_complete(proc.wait()) exitcode = self.loop.run_until_complete(proc.wait())
self.assertEqual(exitcode, 7) self.assertEqual(exitcode, 7)
def test_start_new_session(self): def test_start_new_session(self):
# start the new process in a new session # start the new process in a new session
create = asyncio.create_subprocess_shell('exit 8', proc = self.loop.run_until_complete(
asyncio.create_subprocess_shell(
'exit 8',
start_new_session=True, start_new_session=True,
loop=self.loop) )
proc = self.loop.run_until_complete(create) )
exitcode = self.loop.run_until_complete(proc.wait()) exitcode = self.loop.run_until_complete(proc.wait())
self.assertEqual(exitcode, 8) self.assertEqual(exitcode, 8)
def test_kill(self): def test_kill(self):
args = PROGRAM_BLOCKED args = PROGRAM_BLOCKED
create = asyncio.create_subprocess_exec(*args, loop=self.loop) proc = self.loop.run_until_complete(
proc = self.loop.run_until_complete(create) asyncio.create_subprocess_exec(*args)
)
proc.kill() proc.kill()
returncode = self.loop.run_until_complete(proc.wait()) returncode = self.loop.run_until_complete(proc.wait())
if sys.platform == 'win32': if sys.platform == 'win32':
@ -178,8 +181,9 @@ def test_kill(self):
def test_terminate(self): def test_terminate(self):
args = PROGRAM_BLOCKED args = PROGRAM_BLOCKED
create = asyncio.create_subprocess_exec(*args, loop=self.loop) proc = self.loop.run_until_complete(
proc = self.loop.run_until_complete(create) asyncio.create_subprocess_exec(*args)
)
proc.terminate() proc.terminate()
returncode = self.loop.run_until_complete(proc.wait()) returncode = self.loop.run_until_complete(proc.wait())
if sys.platform == 'win32': if sys.platform == 'win32':
@ -197,10 +201,12 @@ def test_send_signal(self):
try: try:
code = 'import time; print("sleeping", flush=True); time.sleep(3600)' code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
args = [sys.executable, '-c', code] args = [sys.executable, '-c', code]
create = asyncio.create_subprocess_exec(*args, proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
*args,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
loop=self.loop) )
proc = self.loop.run_until_complete(create) )
async def send_signal(proc): async def send_signal(proc):
# basic synchronization to wait until the program is sleeping # basic synchronization to wait until the program is sleeping
@ -221,11 +227,13 @@ def prepare_broken_pipe_test(self):
large_data = b'x' * support.PIPE_MAX_SIZE large_data = b'x' * support.PIPE_MAX_SIZE
# the program ends before the stdin can be feeded # the program ends before the stdin can be feeded
create = asyncio.create_subprocess_exec( proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass', sys.executable, '-c', 'pass',
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
loop=self.loop) )
proc = self.loop.run_until_complete(create) )
return (proc, large_data) return (proc, large_data)
def test_stdin_broken_pipe(self): def test_stdin_broken_pipe(self):
@ -277,7 +285,7 @@ async def connect_read_pipe_mock(*args, **kw):
stdin=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
limit=limit, limit=limit,
loop=self.loop) )
stdout_transport = proc._transport.get_pipe_transport(1) stdout_transport = proc._transport.get_pipe_transport(1)
stdout, stderr = await proc.communicate() stdout, stderr = await proc.communicate()
@ -306,7 +314,7 @@ async def len_message(message):
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
close_fds=False, close_fds=False,
loop=self.loop) )
stdout, stderr = await proc.communicate(message) stdout, stderr = await proc.communicate(message)
exitcode = await proc.wait() exitcode = await proc.wait()
return (stdout, exitcode) return (stdout, exitcode)
@ -325,7 +333,7 @@ async def empty_input():
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
close_fds=False, close_fds=False,
loop=self.loop) )
stdout, stderr = await proc.communicate(b'') stdout, stderr = await proc.communicate(b'')
exitcode = await proc.wait() exitcode = await proc.wait()
return (stdout, exitcode) return (stdout, exitcode)
@ -344,7 +352,7 @@ async def empty_input():
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
close_fds=False, close_fds=False,
loop=self.loop) )
stdout, stderr = await proc.communicate() stdout, stderr = await proc.communicate()
exitcode = await proc.wait() exitcode = await proc.wait()
return (stdout, exitcode) return (stdout, exitcode)
@ -363,7 +371,7 @@ async def empty_output():
stdout=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
close_fds=False, close_fds=False,
loop=self.loop) )
stdout, stderr = await proc.communicate(b"abc") stdout, stderr = await proc.communicate(b"abc")
exitcode = await proc.wait() exitcode = await proc.wait()
return (stdout, exitcode) return (stdout, exitcode)
@ -382,7 +390,7 @@ async def empty_error():
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL,
close_fds=False, close_fds=False,
loop=self.loop) )
stdout, stderr = await proc.communicate(b"abc") stdout, stderr = await proc.communicate(b"abc")
exitcode = await proc.wait() exitcode = await proc.wait()
return (stderr, exitcode) return (stderr, exitcode)
@ -395,9 +403,7 @@ def test_cancel_process_wait(self):
# Issue #23140: cancel Process.wait() # Issue #23140: cancel Process.wait()
async def cancel_wait(): async def cancel_wait():
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
*PROGRAM_BLOCKED,
loop=self.loop)
# Create an internal future waiting on the process exit # Create an internal future waiting on the process exit
task = self.loop.create_task(proc.wait()) task = self.loop.create_task(proc.wait())
@ -419,8 +425,7 @@ async def cancel_wait():
def test_cancel_make_subprocess_transport_exec(self): def test_cancel_make_subprocess_transport_exec(self):
async def cancel_make_transport(): async def cancel_make_transport():
coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
loop=self.loop)
task = self.loop.create_task(coro) task = self.loop.create_task(coro)
self.loop.call_soon(task.cancel) self.loop.call_soon(task.cancel)
@ -524,7 +529,7 @@ async def kill_running():
isinstance(self, SubprocessFastWatcherTests)): isinstance(self, SubprocessFastWatcherTests)):
asyncio.get_child_watcher()._callbacks.clear() asyncio.get_child_watcher()._callbacks.clear()
def _test_popen_error(self, stdin): async def _test_popen_error(self, stdin):
if sys.platform == 'win32': if sys.platform == 'win32':
target = 'asyncio.windows_utils.Popen' target = 'asyncio.windows_utils.Popen'
else: else:
@ -533,23 +538,26 @@ def _test_popen_error(self, stdin):
exc = ZeroDivisionError exc = ZeroDivisionError
popen.side_effect = exc popen.side_effect = exc
create = asyncio.create_subprocess_exec(sys.executable, '-c',
'pass', stdin=stdin,
loop=self.loop)
with warnings.catch_warnings(record=True) as warns: with warnings.catch_warnings(record=True) as warns:
with self.assertRaises(exc): with self.assertRaises(exc):
self.loop.run_until_complete(create) await asyncio.create_subprocess_exec(
sys.executable,
'-c',
'pass',
stdin=stdin
)
self.assertEqual(warns, []) self.assertEqual(warns, [])
def test_popen_error(self): def test_popen_error(self):
# Issue #24763: check that the subprocess transport is closed # Issue #24763: check that the subprocess transport is closed
# when BaseSubprocessTransport fails # when BaseSubprocessTransport fails
self._test_popen_error(stdin=None) self.loop.run_until_complete(self._test_popen_error(stdin=None))
def test_popen_error_with_stdin_pipe(self): def test_popen_error_with_stdin_pipe(self):
# Issue #35721: check that newly created socket pair is closed when # Issue #35721: check that newly created socket pair is closed when
# Popen fails # Popen fails
self._test_popen_error(stdin=subprocess.PIPE) self.loop.run_until_complete(
self._test_popen_error(stdin=subprocess.PIPE))
def test_read_stdout_after_process_exit(self): def test_read_stdout_after_process_exit(self):
@ -560,12 +568,11 @@ async def execute():
'sys.stdout.flush()', 'sys.stdout.flush()',
'sys.exit(1)']) 'sys.exit(1)'])
fut = asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
sys.executable, '-c', code, sys.executable, '-c', code,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
loop=self.loop) )
process = await fut
while True: while True:
data = await process.stdout.read(65536) data = await process.stdout.read(65536)
if data: if data:
@ -620,7 +627,6 @@ async def execute():
self.loop.run_until_complete(execute()) self.loop.run_until_complete(execute())
def test_create_subprocess_exec_with_path(self): def test_create_subprocess_exec_with_path(self):
async def execute(): async def execute():
p = await subprocess.create_subprocess_exec( p = await subprocess.create_subprocess_exec(
@ -632,6 +638,26 @@ async def execute():
self.assertIsNone(self.loop.run_until_complete(execute())) self.assertIsNone(self.loop.run_until_complete(execute()))
def test_exec_loop_deprecated(self):
async def go():
with self.assertWarns(DeprecationWarning):
proc = await asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
loop=self.loop,
)
await proc.wait()
self.loop.run_until_complete(go())
def test_shell_loop_deprecated(self):
async def go():
with self.assertWarns(DeprecationWarning):
proc = await asyncio.create_subprocess_shell(
"exit 0",
loop=self.loop,
)
await proc.wait()
self.loop.run_until_complete(go())
if sys.platform != 'win32': if sys.platform != 'win32':
# Unix # Unix

View file

@ -1810,7 +1810,7 @@ async def inner():
async def outer(): async def outer():
nonlocal proof nonlocal proof
await asyncio.shield(inner(), loop=self.loop) await asyncio.shield(inner())
proof += 100 proof += 100
f = asyncio.ensure_future(outer(), loop=self.loop) f = asyncio.ensure_future(outer(), loop=self.loop)
@ -1825,8 +1825,8 @@ async def outer():
def test_shield_gather(self): def test_shield_gather(self):
child1 = self.new_future(self.loop) child1 = self.new_future(self.loop)
child2 = self.new_future(self.loop) child2 = self.new_future(self.loop)
parent = asyncio.gather(child1, child2, loop=self.loop) parent = asyncio.gather(child1, child2)
outer = asyncio.shield(parent, loop=self.loop) outer = asyncio.shield(parent)
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
outer.cancel() outer.cancel()
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
@ -1839,9 +1839,9 @@ def test_shield_gather(self):
def test_gather_shield(self): def test_gather_shield(self):
child1 = self.new_future(self.loop) child1 = self.new_future(self.loop)
child2 = self.new_future(self.loop) child2 = self.new_future(self.loop)
inner1 = asyncio.shield(child1, loop=self.loop) inner1 = asyncio.shield(child1)
inner2 = asyncio.shield(child2, loop=self.loop) inner2 = asyncio.shield(child2)
parent = asyncio.gather(inner1, inner2, loop=self.loop) parent = asyncio.gather(inner1, inner2)
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
parent.cancel() parent.cancel()
# This should cancel inner1 and inner2 but bot child1 and child2. # This should cancel inner1 and inner2 but bot child1 and child2.
@ -2981,6 +2981,7 @@ def _check_empty_sequence(self, seq_or_iter):
self._run_loop(self.one_loop) self._run_loop(self.one_loop)
self.assertTrue(fut.done()) self.assertTrue(fut.done())
self.assertEqual(fut.result(), []) self.assertEqual(fut.result(), [])
with self.assertWarns(DeprecationWarning):
fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
self.assertIs(fut._loop, self.other_loop) self.assertIs(fut._loop, self.other_loop)