mirror of
https://github.com/python/cpython.git
synced 2026-02-13 19:04:37 +00:00
bpo-19675: Terminate processes if construction of a pool is failing. (GH-5614)
This commit is contained in:
parent
b4db249c95
commit
5d236cafd7
3 changed files with 54 additions and 2 deletions
|
|
@ -174,7 +174,15 @@ def __init__(self, processes=None, initializer=None, initargs=(),
|
|||
|
||||
self._processes = processes
|
||||
self._pool = []
|
||||
self._repopulate_pool()
|
||||
try:
|
||||
self._repopulate_pool()
|
||||
except Exception:
|
||||
for p in self._pool:
|
||||
if p.exitcode is None:
|
||||
p.terminate()
|
||||
for p in self._pool:
|
||||
p.join()
|
||||
raise
|
||||
|
||||
self._worker_handler = threading.Thread(
|
||||
target=Pool._handle_workers,
|
||||
|
|
@ -251,10 +259,10 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
|
|||
initargs, maxtasksperchild,
|
||||
wrap_exception)
|
||||
)
|
||||
pool.append(w)
|
||||
w.name = w.name.replace('Process', 'PoolWorker')
|
||||
w.daemon = True
|
||||
w.start()
|
||||
pool.append(w)
|
||||
util.debug('added worker')
|
||||
|
||||
@staticmethod
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
#
|
||||
|
||||
import unittest
|
||||
import unittest.mock
|
||||
import queue as pyqueue
|
||||
import contextlib
|
||||
import time
|
||||
|
|
@ -4635,6 +4636,48 @@ def test_empty(self):
|
|||
proc.join()
|
||||
|
||||
|
||||
class TestPoolNotLeakOnFailure(unittest.TestCase):
|
||||
|
||||
def test_release_unused_processes(self):
|
||||
# Issue #19675: During pool creation, if we can't create a process,
|
||||
# don't leak already created ones.
|
||||
will_fail_in = 3
|
||||
forked_processes = []
|
||||
|
||||
class FailingForkProcess:
|
||||
def __init__(self, **kwargs):
|
||||
self.name = 'Fake Process'
|
||||
self.exitcode = None
|
||||
self.state = None
|
||||
forked_processes.append(self)
|
||||
|
||||
def start(self):
|
||||
nonlocal will_fail_in
|
||||
if will_fail_in <= 0:
|
||||
raise OSError("Manually induced OSError")
|
||||
will_fail_in -= 1
|
||||
self.state = 'started'
|
||||
|
||||
def terminate(self):
|
||||
self.state = 'stopping'
|
||||
|
||||
def join(self):
|
||||
if self.state == 'stopping':
|
||||
self.state = 'stopped'
|
||||
|
||||
def is_alive(self):
|
||||
return self.state == 'started' or self.state == 'stopping'
|
||||
|
||||
with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
|
||||
p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
|
||||
Process=FailingForkProcess))
|
||||
p.close()
|
||||
p.join()
|
||||
self.assertFalse(
|
||||
any(process.is_alive() for process in forked_processes))
|
||||
|
||||
|
||||
|
||||
class MiscTestCase(unittest.TestCase):
|
||||
def test__all__(self):
|
||||
# Just make sure names in blacklist are excluded
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
``multiprocessing.Pool`` no longer leaks processes if its initialization fails.
|
||||
Loading…
Add table
Add a link
Reference in a new issue