mirror of
https://github.com/python/cpython.git
synced 2026-06-29 04:10:54 +00:00
[3.15] gh-101267: ProcessPoolExecutor no longer shares 1 BrokenProcessPool exception among all failed futures (GH-101268) (#151430)
gh-101267: ProcessPoolExecutor no longer shares 1 BrokenProcessPool exception among all failed futures (GH-101268)
(cherry picked from commit 3c00ebc2bb)
Co-authored-by: Daniel Shields <daniel.shields@twosigma.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Gregory P. Smith <greg@krypto.org>
This commit is contained in:
parent
8b291463e3
commit
5d7cf16f3f
4 changed files with 44 additions and 7 deletions
|
|
@ -469,11 +469,9 @@ def _terminate_broken(self, cause):
|
|||
executor._shutdown_thread = True
|
||||
executor = None
|
||||
|
||||
# All pending tasks are to be marked failed with the following
|
||||
# BrokenProcessPool error
|
||||
bpe = BrokenProcessPool("A process in the process pool was "
|
||||
"terminated abruptly while the future was "
|
||||
"running or pending.")
|
||||
# All pending tasks are to be marked failed with a
|
||||
# BrokenProcessPool error, as separate instances to avoid sharing
|
||||
# a traceback (gh-101267).
|
||||
cause_str = None
|
||||
if cause is not None:
|
||||
cause_str = ''.join(cause)
|
||||
|
|
@ -489,11 +487,15 @@ def _terminate_broken(self, cause):
|
|||
f"with exit code {p.exitcode}")
|
||||
if errors:
|
||||
cause_str = "\n".join(errors)
|
||||
if cause_str:
|
||||
bpe.__cause__ = _RemoteTraceback(f"\n'''\n{cause_str}'''")
|
||||
cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None
|
||||
|
||||
# Mark pending tasks as failed.
|
||||
for work_id, work_item in self.pending_work_items.items():
|
||||
bpe = BrokenProcessPool("A process in the process pool was "
|
||||
"terminated abruptly while the future was "
|
||||
"running or pending.")
|
||||
if cause_tb is not None:
|
||||
bpe.__cause__ = _RemoteTraceback(cause_tb)
|
||||
try:
|
||||
work_item.future.set_exception(bpe)
|
||||
except _base.InvalidStateError:
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from concurrent import futures
|
||||
|
|
@ -62,6 +63,32 @@ def test_killed_child(self):
|
|||
# Submitting other jobs fails as well.
|
||||
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
|
||||
|
||||
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
|
||||
def test_broken_process_pool_traceback(self):
|
||||
# When a child process is abruptly terminated, the whole pool gets
|
||||
# "broken", and a BrokenProcessPool exception should be created
|
||||
# for each future instead of sharing one exception among all futures.
|
||||
event = self.create_event()
|
||||
futures = [self.executor.submit(event.wait) for _ in range(3)]
|
||||
p = next(iter(self.executor._processes.values()))
|
||||
p.terminate()
|
||||
for fut in futures:
|
||||
# Don't use assertRaises(): it clears the traceback off exc.
|
||||
try:
|
||||
fut.result()
|
||||
except BrokenProcessPool as exc:
|
||||
tb = exc.__traceback__
|
||||
else:
|
||||
self.fail("BrokenProcessPool not raised")
|
||||
count = sum(
|
||||
1
|
||||
for frame_summary in traceback.extract_tb(tb)
|
||||
if frame_summary.filename == __file__
|
||||
)
|
||||
# This code file should appear exactly once in the traceback.
|
||||
# A shared exception would accumulate a frame per result() call.
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
|
||||
def test_map_chunksize(self):
|
||||
def bad_map():
|
||||
|
|
|
|||
|
|
@ -1762,6 +1762,7 @@ Charlie Shepherd
|
|||
Bruce Sherwood
|
||||
Gregory Shevchenko
|
||||
Hai Shi
|
||||
Daniel Shields
|
||||
Alexander Shigin
|
||||
Pete Shinners
|
||||
Michael Shiplett
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
When a worker process terminates unexpectedly,
|
||||
:class:`concurrent.futures.ProcessPoolExecutor` now sets a separate
|
||||
:exc:`~concurrent.futures.process.BrokenProcessPool` exception on each pending
|
||||
future instead of sharing a single instance among them all. Sharing one
|
||||
exception produced malformed tracebacks: each
|
||||
:meth:`Future.result() <concurrent.futures.Future.result>` call re-raised the
|
||||
same object, appending another copy of the traceback to it.
|
||||
Loading…
Add table
Add a link
Reference in a new issue