mirror of
https://github.com/python/cpython.git
synced 2025-12-31 04:23:37 +00:00
gh-95166: cancel map waited on future on timeout (GH-95169)
Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
This commit is contained in:
parent
b8b2990fb3
commit
e16d4ed590
3 changed files with 42 additions and 2 deletions
|
|
@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
|
||||||
done.update(waiter.finished_futures)
|
done.update(waiter.finished_futures)
|
||||||
return DoneAndNotDoneFutures(done, fs - done)
|
return DoneAndNotDoneFutures(done, fs - done)
|
||||||
|
|
||||||
|
|
||||||
|
def _result_or_cancel(fut, timeout=None):
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
return fut.result(timeout)
|
||||||
|
finally:
|
||||||
|
fut.cancel()
|
||||||
|
finally:
|
||||||
|
# Break a reference cycle with the exception in self._exception
|
||||||
|
del fut
|
||||||
|
|
||||||
|
|
||||||
class Future(object):
|
class Future(object):
|
||||||
"""Represents the result of an asynchronous computation."""
|
"""Represents the result of an asynchronous computation."""
|
||||||
|
|
||||||
|
|
@ -604,9 +616,9 @@ def result_iterator():
|
||||||
while fs:
|
while fs:
|
||||||
# Careful not to keep a reference to the popped future
|
# Careful not to keep a reference to the popped future
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
yield fs.pop().result()
|
yield _result_or_cancel(fs.pop())
|
||||||
else:
|
else:
|
||||||
yield fs.pop().result(end_time - time.monotonic())
|
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
|
||||||
finally:
|
finally:
|
||||||
for future in fs:
|
for future in fs:
|
||||||
future.cancel()
|
future.cancel()
|
||||||
|
|
|
||||||
|
|
@ -932,6 +932,33 @@ def submit(pool):
|
||||||
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
|
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
|
||||||
workers.submit(tuple)
|
workers.submit(tuple)
|
||||||
|
|
||||||
|
def test_executor_map_current_future_cancel(self):
|
||||||
|
stop_event = threading.Event()
|
||||||
|
log = []
|
||||||
|
|
||||||
|
def log_n_wait(ident):
|
||||||
|
log.append(f"{ident=} started")
|
||||||
|
try:
|
||||||
|
stop_event.wait()
|
||||||
|
finally:
|
||||||
|
log.append(f"{ident=} stopped")
|
||||||
|
|
||||||
|
with self.executor_type(max_workers=1) as pool:
|
||||||
|
# submit work to saturate the pool
|
||||||
|
fut = pool.submit(log_n_wait, ident="first")
|
||||||
|
try:
|
||||||
|
with contextlib.closing(
|
||||||
|
pool.map(log_n_wait, ["second", "third"], timeout=0)
|
||||||
|
) as gen:
|
||||||
|
with self.assertRaises(TimeoutError):
|
||||||
|
next(gen)
|
||||||
|
finally:
|
||||||
|
stop_event.set()
|
||||||
|
fut.result()
|
||||||
|
# ident='second' is cancelled as a result of raising a TimeoutError
|
||||||
|
# ident='third' is cancelled because it remained in the collection of futures
|
||||||
|
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
|
||||||
|
|
||||||
|
|
||||||
class ProcessPoolExecutorTest(ExecutorTest):
|
class ProcessPoolExecutorTest(ExecutorTest):
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.
|
||||||
Loading…
Add table
Add a link
Reference in a new issue