mirror of
https://github.com/python/cpython.git
synced 2025-10-24 02:13:49 +00:00
bpo-40692: Run more test_concurrent_futures tests (GH-20239)
In the case of multiprocessing.synchronize() being missing, the test_concurrent_futures test suite now skips only the tests that require multiprocessing.synchronize(). Validate that multiprocessing.synchronize exists as part of _check_system_limits(), allowing ProcessPoolExecutor to raise NotImplementedError during __init__, rather than crashing with ImportError during __init__ when creating a lock imported from multiprocessing.synchronize. Use _check_system_limits() to disable tests of ProcessPoolExecutor on systems without multiprocessing.synchronize. Running the test suite without multiprocessing.synchronize reveals that Lib/compileall.py crashes when it uses a ProcessPoolExecutor. Therefore, change Lib/compileall.py to call _check_system_limits() before creating the ProcessPoolExecutor. Note that both Lib/compileall.py and Lib/test/test_compileall.py were attempting to sanity-check ProcessPoolExecutor by expecting ImportError. In multiprocessing.resource_tracker, sem_unlink() is also absent on platforms where POSIX semaphores aren't available. Avoid using sem_unlink() if it, too, does not exist. Co-authored-by: Pablo Galindo <Pablogsal@gmail.com>
This commit is contained in:
parent
30a8b28396
commit
bf2e7e55d7
6 changed files with 49 additions and 10 deletions
|
@ -84,12 +84,14 @@ def compile_dir(dir, maxlevels=None, ddir=None, force=False,
|
||||||
if workers < 0:
|
if workers < 0:
|
||||||
raise ValueError('workers must be greater or equal to 0')
|
raise ValueError('workers must be greater or equal to 0')
|
||||||
if workers != 1:
|
if workers != 1:
|
||||||
|
# Check if this is a system where ProcessPoolExecutor can function.
|
||||||
|
from concurrent.futures.process import _check_system_limits
|
||||||
try:
|
try:
|
||||||
# Only import when needed, as low resource platforms may
|
_check_system_limits()
|
||||||
# fail to import it
|
except NotImplementedError:
|
||||||
from concurrent.futures import ProcessPoolExecutor
|
|
||||||
except ImportError:
|
|
||||||
workers = 1
|
workers = 1
|
||||||
|
else:
|
||||||
|
from concurrent.futures import ProcessPoolExecutor
|
||||||
if maxlevels is None:
|
if maxlevels is None:
|
||||||
maxlevels = sys.getrecursionlimit()
|
maxlevels = sys.getrecursionlimit()
|
||||||
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)
|
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)
|
||||||
|
|
|
@ -532,6 +532,14 @@ def _check_system_limits():
|
||||||
if _system_limited:
|
if _system_limited:
|
||||||
raise NotImplementedError(_system_limited)
|
raise NotImplementedError(_system_limited)
|
||||||
_system_limits_checked = True
|
_system_limits_checked = True
|
||||||
|
try:
|
||||||
|
import multiprocessing.synchronize
|
||||||
|
except ImportError:
|
||||||
|
_system_limited = (
|
||||||
|
"This Python build lacks multiprocessing.synchronize, usually due "
|
||||||
|
"to named semaphores being unavailable on this platform."
|
||||||
|
)
|
||||||
|
raise NotImplementedError(_system_limited)
|
||||||
try:
|
try:
|
||||||
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
|
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
|
||||||
except (AttributeError, ValueError):
|
except (AttributeError, ValueError):
|
||||||
|
|
|
@ -37,8 +37,16 @@
|
||||||
import _multiprocessing
|
import _multiprocessing
|
||||||
import _posixshmem
|
import _posixshmem
|
||||||
|
|
||||||
|
# Use sem_unlink() to clean up named semaphores.
|
||||||
|
#
|
||||||
|
# sem_unlink() may be missing if the Python build process detected the
|
||||||
|
# absence of POSIX named semaphores. In that case, no named semaphores were
|
||||||
|
# ever opened, so no cleanup would be necessary.
|
||||||
|
if hasattr(_multiprocessing, 'sem_unlink'):
|
||||||
_CLEANUP_FUNCS.update({
|
_CLEANUP_FUNCS.update({
|
||||||
'semaphore': _multiprocessing.sem_unlink,
|
'semaphore': _multiprocessing.sem_unlink,
|
||||||
|
})
|
||||||
|
_CLEANUP_FUNCS.update({
|
||||||
'shared_memory': _posixshmem.shm_unlink,
|
'shared_memory': _posixshmem.shm_unlink,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,14 @@
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from unittest import mock, skipUnless
|
from unittest import mock, skipUnless
|
||||||
try:
|
|
||||||
from concurrent.futures import ProcessPoolExecutor
|
from concurrent.futures import ProcessPoolExecutor
|
||||||
|
try:
|
||||||
|
# compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
|
||||||
|
# and it can function.
|
||||||
|
from concurrent.futures.process import _check_system_limits
|
||||||
|
_check_system_limits()
|
||||||
_have_multiprocessing = True
|
_have_multiprocessing = True
|
||||||
except ImportError:
|
except NotImplementedError:
|
||||||
_have_multiprocessing = False
|
_have_multiprocessing = False
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
|
@ -188,6 +192,7 @@ def test_compile_dir_pathlike(self):
|
||||||
self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
|
self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
|
||||||
self.assertTrue(os.path.isfile(self.bc_path))
|
self.assertTrue(os.path.isfile(self.bc_path))
|
||||||
|
|
||||||
|
@skipUnless(_have_multiprocessing, "requires multiprocessing")
|
||||||
@mock.patch('concurrent.futures.ProcessPoolExecutor')
|
@mock.patch('concurrent.futures.ProcessPoolExecutor')
|
||||||
def test_compile_pool_called(self, pool_mock):
|
def test_compile_pool_called(self, pool_mock):
|
||||||
compileall.compile_dir(self.directory, quiet=True, workers=5)
|
compileall.compile_dir(self.directory, quiet=True, workers=5)
|
||||||
|
@ -198,11 +203,13 @@ def test_compile_workers_non_positive(self):
|
||||||
"workers must be greater or equal to 0"):
|
"workers must be greater or equal to 0"):
|
||||||
compileall.compile_dir(self.directory, workers=-1)
|
compileall.compile_dir(self.directory, workers=-1)
|
||||||
|
|
||||||
|
@skipUnless(_have_multiprocessing, "requires multiprocessing")
|
||||||
@mock.patch('concurrent.futures.ProcessPoolExecutor')
|
@mock.patch('concurrent.futures.ProcessPoolExecutor')
|
||||||
def test_compile_workers_cpu_count(self, pool_mock):
|
def test_compile_workers_cpu_count(self, pool_mock):
|
||||||
compileall.compile_dir(self.directory, quiet=True, workers=0)
|
compileall.compile_dir(self.directory, quiet=True, workers=0)
|
||||||
self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
|
self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
|
||||||
|
|
||||||
|
@skipUnless(_have_multiprocessing, "requires multiprocessing")
|
||||||
@mock.patch('concurrent.futures.ProcessPoolExecutor')
|
@mock.patch('concurrent.futures.ProcessPoolExecutor')
|
||||||
@mock.patch('compileall.compile_file')
|
@mock.patch('compileall.compile_file')
|
||||||
def test_compile_one_worker(self, compile_file_mock, pool_mock):
|
def test_compile_one_worker(self, compile_file_mock, pool_mock):
|
||||||
|
|
|
@ -4,8 +4,6 @@
|
||||||
|
|
||||||
# Skip tests if _multiprocessing wasn't built.
|
# Skip tests if _multiprocessing wasn't built.
|
||||||
import_helper.import_module('_multiprocessing')
|
import_helper.import_module('_multiprocessing')
|
||||||
# Skip tests if sem_open implementation is broken.
|
|
||||||
support.skip_if_broken_multiprocessing_synchronize()
|
|
||||||
|
|
||||||
from test.support import hashlib_helper
|
from test.support import hashlib_helper
|
||||||
from test.support.script_helper import assert_python_ok
|
from test.support.script_helper import assert_python_ok
|
||||||
|
@ -27,7 +25,7 @@
|
||||||
from concurrent.futures._base import (
|
from concurrent.futures._base import (
|
||||||
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
|
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
|
||||||
BrokenExecutor)
|
BrokenExecutor)
|
||||||
from concurrent.futures.process import BrokenProcessPool
|
from concurrent.futures.process import BrokenProcessPool, _check_system_limits
|
||||||
from multiprocessing import get_context
|
from multiprocessing import get_context
|
||||||
|
|
||||||
import multiprocessing.process
|
import multiprocessing.process
|
||||||
|
@ -161,6 +159,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
|
||||||
ctx = "fork"
|
ctx = "fork"
|
||||||
|
|
||||||
def get_context(self):
|
def get_context(self):
|
||||||
|
try:
|
||||||
|
_check_system_limits()
|
||||||
|
except NotImplementedError:
|
||||||
|
self.skipTest("ProcessPoolExecutor unavailable on this system")
|
||||||
if sys.platform == "win32":
|
if sys.platform == "win32":
|
||||||
self.skipTest("require unix system")
|
self.skipTest("require unix system")
|
||||||
return super().get_context()
|
return super().get_context()
|
||||||
|
@ -170,12 +172,23 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
|
||||||
executor_type = futures.ProcessPoolExecutor
|
executor_type = futures.ProcessPoolExecutor
|
||||||
ctx = "spawn"
|
ctx = "spawn"
|
||||||
|
|
||||||
|
def get_context(self):
|
||||||
|
try:
|
||||||
|
_check_system_limits()
|
||||||
|
except NotImplementedError:
|
||||||
|
self.skipTest("ProcessPoolExecutor unavailable on this system")
|
||||||
|
return super().get_context()
|
||||||
|
|
||||||
|
|
||||||
class ProcessPoolForkserverMixin(ExecutorMixin):
|
class ProcessPoolForkserverMixin(ExecutorMixin):
|
||||||
executor_type = futures.ProcessPoolExecutor
|
executor_type = futures.ProcessPoolExecutor
|
||||||
ctx = "forkserver"
|
ctx = "forkserver"
|
||||||
|
|
||||||
def get_context(self):
|
def get_context(self):
|
||||||
|
try:
|
||||||
|
_check_system_limits()
|
||||||
|
except NotImplementedError:
|
||||||
|
self.skipTest("ProcessPoolExecutor unavailable on this system")
|
||||||
if sys.platform == "win32":
|
if sys.platform == "win32":
|
||||||
self.skipTest("require unix system")
|
self.skipTest("require unix system")
|
||||||
return super().get_context()
|
return super().get_context()
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
In the :class:`concurrent.futures.ProcessPoolExecutor`, validate that :func:`multiprocess.synchronize` is available on a given platform and rely on that check in the :mod:`concurrent.futures` test suite so we can run tests that are unrelated to :class:`ProcessPoolExecutor` on those platforms.
|
Loading…
Add table
Add a link
Reference in a new issue