mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 11:14:33 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			726 lines
		
	
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			726 lines
		
	
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import contextlib
 | |
| import faulthandler
 | |
| import locale
 | |
| import math
 | |
| import os.path
 | |
| import platform
 | |
| import random
 | |
| import re
 | |
| import shlex
 | |
| import subprocess
 | |
| import sys
 | |
| import sysconfig
 | |
| import tempfile
 | |
| import textwrap
 | |
| from collections.abc import Callable, Iterable
 | |
| 
 | |
| from test import support
 | |
| from test.support import os_helper
 | |
| from test.support import threading_helper
 | |
| 
 | |
| 
 | |
| # All temporary files and temporary directories created by libregrtest should
 | |
| # use TMP_PREFIX so cleanup_temp_dir() can remove them all.
 | |
| TMP_PREFIX = 'test_python_'
 | |
| WORK_DIR_PREFIX = TMP_PREFIX
 | |
| WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
 | |
| 
 | |
| # bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
 | |
| # Used to protect against threading._shutdown() hang.
 | |
| # Must be smaller than buildbot "1200 seconds without output" limit.
 | |
| EXIT_TIMEOUT = 120.0
 | |
| 
 | |
| 
 | |
| ALL_RESOURCES = ('audio', 'console', 'curses', 'largefile', 'network',
 | |
|                  'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
 | |
| 
 | |
| # Other resources excluded from --use=all:
 | |
| #
 | |
| # - extralagefile (ex: test_zipfile64): really too slow to be enabled
 | |
| #   "by default"
 | |
| # - tzdata: while needed to validate fully test_datetime, it makes
 | |
| #   test_datetime too slow (15-20 min on some buildbots) and so is disabled by
 | |
| #   default (see bpo-30822).
 | |
| RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
 | |
| 
 | |
| 
 | |
| # Types for types hints
 | |
| StrPath = str
 | |
| TestName = str
 | |
| StrJSON = str
 | |
| TestTuple = tuple[TestName, ...]
 | |
| TestList = list[TestName]
 | |
| # --match and --ignore options: list of patterns
 | |
| # ('*' joker character can be used)
 | |
| TestFilter = list[tuple[TestName, bool]]
 | |
| FilterTuple = tuple[TestName, ...]
 | |
| FilterDict = dict[TestName, FilterTuple]
 | |
| 
 | |
| 
 | |
| def format_duration(seconds: float) -> str:
 | |
|     ms = math.ceil(seconds * 1e3)
 | |
|     seconds, ms = divmod(ms, 1000)
 | |
|     minutes, seconds = divmod(seconds, 60)
 | |
|     hours, minutes = divmod(minutes, 60)
 | |
| 
 | |
|     parts = []
 | |
|     if hours:
 | |
|         parts.append('%s hour' % hours)
 | |
|     if minutes:
 | |
|         parts.append('%s min' % minutes)
 | |
|     if seconds:
 | |
|         if parts:
 | |
|             # 2 min 1 sec
 | |
|             parts.append('%s sec' % seconds)
 | |
|         else:
 | |
|             # 1.0 sec
 | |
|             parts.append('%.1f sec' % (seconds + ms / 1000))
 | |
|     if not parts:
 | |
|         return '%s ms' % ms
 | |
| 
 | |
|     parts = parts[:2]
 | |
|     return ' '.join(parts)
 | |
| 
 | |
| 
 | |
| def strip_py_suffix(names: list[str] | None) -> None:
 | |
|     if not names:
 | |
|         return
 | |
|     for idx, name in enumerate(names):
 | |
|         basename, ext = os.path.splitext(name)
 | |
|         if ext == '.py':
 | |
|             names[idx] = basename
 | |
| 
 | |
| 
 | |
| def plural(n: int, singular: str, plural: str | None = None) -> str:
 | |
|     if n == 1:
 | |
|         return singular
 | |
|     elif plural is not None:
 | |
|         return plural
 | |
|     else:
 | |
|         return singular + 's'
 | |
| 
 | |
| 
 | |
| def count(n: int, word: str) -> str:
 | |
|     if n == 1:
 | |
|         return f"{n} {word}"
 | |
|     else:
 | |
|         return f"{n} {word}s"
 | |
| 
 | |
| 
 | |
| def printlist(x, width=70, indent=4, file=None):
 | |
|     """Print the elements of iterable x to stdout.
 | |
| 
 | |
|     Optional arg width (default 70) is the maximum line length.
 | |
|     Optional arg indent (default 4) is the number of blanks with which to
 | |
|     begin each line.
 | |
|     """
 | |
| 
 | |
|     blanks = ' ' * indent
 | |
|     # Print the sorted list: 'x' may be a '--random' list or a set()
 | |
|     print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
 | |
|                         initial_indent=blanks, subsequent_indent=blanks),
 | |
|           file=file)
 | |
| 
 | |
| 
 | |
| def print_warning(msg: str) -> None:
 | |
|     support.print_warning(msg)
 | |
| 
 | |
| 
 | |
| orig_unraisablehook: Callable[..., None] | None = None
 | |
| 
 | |
| 
 | |
| def regrtest_unraisable_hook(unraisable) -> None:
 | |
|     global orig_unraisablehook
 | |
|     support.environment_altered = True
 | |
|     support.print_warning("Unraisable exception")
 | |
|     old_stderr = sys.stderr
 | |
|     try:
 | |
|         support.flush_std_streams()
 | |
|         sys.stderr = support.print_warning.orig_stderr
 | |
|         assert orig_unraisablehook is not None, "orig_unraisablehook not set"
 | |
|         orig_unraisablehook(unraisable)
 | |
|         sys.stderr.flush()
 | |
|     finally:
 | |
|         sys.stderr = old_stderr
 | |
| 
 | |
| 
 | |
| def setup_unraisable_hook() -> None:
 | |
|     global orig_unraisablehook
 | |
|     orig_unraisablehook = sys.unraisablehook
 | |
|     sys.unraisablehook = regrtest_unraisable_hook
 | |
| 
 | |
| 
 | |
| orig_threading_excepthook: Callable[..., None] | None = None
 | |
| 
 | |
| 
 | |
| def regrtest_threading_excepthook(args) -> None:
 | |
|     global orig_threading_excepthook
 | |
|     support.environment_altered = True
 | |
|     support.print_warning(f"Uncaught thread exception: {args.exc_type.__name__}")
 | |
|     old_stderr = sys.stderr
 | |
|     try:
 | |
|         support.flush_std_streams()
 | |
|         sys.stderr = support.print_warning.orig_stderr
 | |
|         assert orig_threading_excepthook is not None, "orig_threading_excepthook not set"
 | |
|         orig_threading_excepthook(args)
 | |
|         sys.stderr.flush()
 | |
|     finally:
 | |
|         sys.stderr = old_stderr
 | |
| 
 | |
| 
 | |
| def setup_threading_excepthook() -> None:
 | |
|     global orig_threading_excepthook
 | |
|     import threading
 | |
|     orig_threading_excepthook = threading.excepthook
 | |
|     threading.excepthook = regrtest_threading_excepthook
 | |
| 
 | |
| 
 | |
| def clear_caches():
 | |
|     # Clear the warnings registry, so they can be displayed again
 | |
|     for mod in sys.modules.values():
 | |
|         if hasattr(mod, '__warningregistry__'):
 | |
|             del mod.__warningregistry__
 | |
| 
 | |
|     # Flush standard output, so that buffered data is sent to the OS and
 | |
|     # associated Python objects are reclaimed.
 | |
|     for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
 | |
|         if stream is not None:
 | |
|             stream.flush()
 | |
| 
 | |
|     try:
 | |
|         re = sys.modules['re']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         re.purge()
 | |
| 
 | |
|     try:
 | |
|         _strptime = sys.modules['_strptime']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         _strptime._regex_cache.clear()
 | |
| 
 | |
|     try:
 | |
|         urllib_parse = sys.modules['urllib.parse']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         urllib_parse.clear_cache()
 | |
| 
 | |
|     try:
 | |
|         urllib_request = sys.modules['urllib.request']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         urllib_request.urlcleanup()
 | |
| 
 | |
|     try:
 | |
|         linecache = sys.modules['linecache']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         linecache.clearcache()
 | |
| 
 | |
|     try:
 | |
|         mimetypes = sys.modules['mimetypes']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         mimetypes._default_mime_types()
 | |
| 
 | |
|     try:
 | |
|         filecmp = sys.modules['filecmp']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         filecmp._cache.clear()
 | |
| 
 | |
|     try:
 | |
|         struct = sys.modules['struct']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         struct._clearcache()
 | |
| 
 | |
|     try:
 | |
|         doctest = sys.modules['doctest']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         doctest.master = None
 | |
| 
 | |
|     try:
 | |
|         ctypes = sys.modules['ctypes']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         ctypes._reset_cache()
 | |
| 
 | |
|     try:
 | |
|         typing = sys.modules['typing']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         for f in typing._cleanups:
 | |
|             f()
 | |
| 
 | |
|         import inspect
 | |
|         abs_classes = filter(inspect.isabstract, typing.__dict__.values())
 | |
|         for abc in abs_classes:
 | |
|             for obj in abc.__subclasses__() + [abc]:
 | |
|                 obj._abc_caches_clear()
 | |
| 
 | |
|     try:
 | |
|         fractions = sys.modules['fractions']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         fractions._hash_algorithm.cache_clear()
 | |
| 
 | |
|     try:
 | |
|         inspect = sys.modules['inspect']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         inspect._shadowed_dict_from_weakref_mro_tuple.cache_clear()
 | |
|         inspect._filesbymodname.clear()
 | |
|         inspect.modulesbyfile.clear()
 | |
| 
 | |
|     try:
 | |
|         importlib_metadata = sys.modules['importlib.metadata']
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         importlib_metadata.FastPath.__new__.cache_clear()
 | |
| 
 | |
| 
 | |
| def get_build_info():
 | |
|     # Get most important configure and build options as a list of strings.
 | |
|     # Example: ['debug', 'ASAN+MSAN'] or ['release', 'LTO+PGO'].
 | |
| 
 | |
|     config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
 | |
|     cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
 | |
|     cflags += ' ' + (sysconfig.get_config_var('PY_CFLAGS_NODIST') or '')
 | |
|     ldflags_nodist = sysconfig.get_config_var('PY_LDFLAGS_NODIST') or ''
 | |
| 
 | |
|     build = []
 | |
| 
 | |
|     # --disable-gil
 | |
|     if sysconfig.get_config_var('Py_GIL_DISABLED'):
 | |
|         if not sys.flags.ignore_environment:
 | |
|             PYTHON_GIL = os.environ.get('PYTHON_GIL', None)
 | |
|             if PYTHON_GIL:
 | |
|                 PYTHON_GIL = (PYTHON_GIL == '1')
 | |
|         else:
 | |
|             PYTHON_GIL = None
 | |
| 
 | |
|         free_threading = "free_threading"
 | |
|         if PYTHON_GIL is not None:
 | |
|             free_threading = f"{free_threading} GIL={int(PYTHON_GIL)}"
 | |
|         build.append(free_threading)
 | |
| 
 | |
|     if hasattr(sys, 'gettotalrefcount'):
 | |
|         # --with-pydebug
 | |
|         build.append('debug')
 | |
| 
 | |
|         if '-DNDEBUG' in cflags:
 | |
|             build.append('without_assert')
 | |
|     else:
 | |
|         build.append('release')
 | |
| 
 | |
|         if '--with-assertions' in config_args:
 | |
|             build.append('with_assert')
 | |
|         elif '-DNDEBUG' not in cflags:
 | |
|             build.append('with_assert')
 | |
| 
 | |
|     # --enable-experimental-jit
 | |
|     if sys._jit.is_available():
 | |
|         if sys._jit.is_enabled():
 | |
|             build.append("JIT")
 | |
|         else:
 | |
|             build.append("JIT (disabled)")
 | |
| 
 | |
|     # --enable-framework=name
 | |
|     framework = sysconfig.get_config_var('PYTHONFRAMEWORK')
 | |
|     if framework:
 | |
|         build.append(f'framework={framework}')
 | |
| 
 | |
|     # --enable-shared
 | |
|     shared = int(sysconfig.get_config_var('PY_ENABLE_SHARED') or '0')
 | |
|     if shared:
 | |
|         build.append('shared')
 | |
| 
 | |
|     # --with-lto
 | |
|     optimizations = []
 | |
|     if '-flto=thin' in ldflags_nodist:
 | |
|         optimizations.append('ThinLTO')
 | |
|     elif '-flto' in ldflags_nodist:
 | |
|         optimizations.append('LTO')
 | |
| 
 | |
|     if support.check_cflags_pgo():
 | |
|         # PGO (--enable-optimizations)
 | |
|         optimizations.append('PGO')
 | |
| 
 | |
|     if support.check_bolt_optimized():
 | |
|         # BOLT (--enable-bolt)
 | |
|         optimizations.append('BOLT')
 | |
| 
 | |
|     if optimizations:
 | |
|         build.append('+'.join(optimizations))
 | |
| 
 | |
|     # --with-address-sanitizer
 | |
|     sanitizers = []
 | |
|     if support.check_sanitizer(address=True):
 | |
|         sanitizers.append("ASAN")
 | |
|     # --with-memory-sanitizer
 | |
|     if support.check_sanitizer(memory=True):
 | |
|         sanitizers.append("MSAN")
 | |
|     # --with-undefined-behavior-sanitizer
 | |
|     if support.check_sanitizer(ub=True):
 | |
|         sanitizers.append("UBSAN")
 | |
|     # --with-thread-sanitizer
 | |
|     if support.check_sanitizer(thread=True):
 | |
|         sanitizers.append("TSAN")
 | |
|     if sanitizers:
 | |
|         build.append('+'.join(sanitizers))
 | |
| 
 | |
|     # --with-trace-refs
 | |
|     if hasattr(sys, 'getobjects'):
 | |
|         build.append("TraceRefs")
 | |
|     # --enable-pystats
 | |
|     if hasattr(sys, '_stats_on'):
 | |
|         build.append("pystats")
 | |
|     # --with-valgrind
 | |
|     if sysconfig.get_config_var('WITH_VALGRIND'):
 | |
|         build.append("valgrind")
 | |
|     # --with-dtrace
 | |
|     if sysconfig.get_config_var('WITH_DTRACE'):
 | |
|         build.append("dtrace")
 | |
| 
 | |
|     return build
 | |
| 
 | |
| 
 | |
| def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
 | |
|     if tmp_dir:
 | |
|         tmp_dir = os.path.expanduser(tmp_dir)
 | |
|     else:
 | |
|         # When tests are run from the Python build directory, it is best practice
 | |
|         # to keep the test files in a subfolder.  This eases the cleanup of leftover
 | |
|         # files using the "make distclean" command.
 | |
|         if sysconfig.is_python_build():
 | |
|             if not support.is_wasi:
 | |
|                 tmp_dir = sysconfig.get_config_var('abs_builddir')
 | |
|                 if tmp_dir is None:
 | |
|                     tmp_dir = sysconfig.get_config_var('abs_srcdir')
 | |
|                     if not tmp_dir:
 | |
|                         # gh-74470: On Windows, only srcdir is available. Using
 | |
|                         # abs_builddir mostly matters on UNIX when building
 | |
|                         # Python out of the source tree, especially when the
 | |
|                         # source tree is read only.
 | |
|                         tmp_dir = sysconfig.get_config_var('srcdir')
 | |
|                         if not tmp_dir:
 | |
|                             raise RuntimeError(
 | |
|                                 "Could not determine the correct value for tmp_dir"
 | |
|                             )
 | |
|                 tmp_dir = os.path.join(tmp_dir, 'build')
 | |
|             else:
 | |
|                 # WASI platform
 | |
|                 tmp_dir = sysconfig.get_config_var('projectbase')
 | |
|                 if not tmp_dir:
 | |
|                     raise RuntimeError(
 | |
|                         "sysconfig.get_config_var('projectbase') "
 | |
|                         f"unexpectedly returned {tmp_dir!r} on WASI"
 | |
|                     )
 | |
|                 tmp_dir = os.path.join(tmp_dir, 'build')
 | |
| 
 | |
|                 # When get_temp_dir() is called in a worker process,
 | |
|                 # get_temp_dir() path is different than in the parent process
 | |
|                 # which is not a WASI process. So the parent does not create
 | |
|                 # the same "tmp_dir" than the test worker process.
 | |
|                 os.makedirs(tmp_dir, exist_ok=True)
 | |
|         else:
 | |
|             tmp_dir = tempfile.gettempdir()
 | |
| 
 | |
|     return os.path.abspath(tmp_dir)
 | |
| 
 | |
| 
 | |
| def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
 | |
|     # Define a writable temp dir that will be used as cwd while running
 | |
|     # the tests. The name of the dir includes the pid to allow parallel
 | |
|     # testing (see the -j option).
 | |
|     # Emscripten and WASI have stubbed getpid(), Emscripten has only
 | |
|     # millisecond clock resolution. Use randint() instead.
 | |
|     if support.is_emscripten or support.is_wasi:
 | |
|         nounce = random.randint(0, 1_000_000)
 | |
|     else:
 | |
|         nounce = os.getpid()
 | |
| 
 | |
|     if worker:
 | |
|         work_dir = WORK_DIR_PREFIX + str(nounce)
 | |
|     else:
 | |
|         work_dir = WORKER_WORK_DIR_PREFIX + str(nounce)
 | |
|     work_dir += os_helper.FS_NONASCII
 | |
|     work_dir = os.path.join(parent_dir, work_dir)
 | |
|     return work_dir
 | |
| 
 | |
| 
 | |
| @contextlib.contextmanager
 | |
| def exit_timeout():
 | |
|     try:
 | |
|         yield
 | |
|     except SystemExit as exc:
 | |
|         # bpo-38203: Python can hang at exit in Py_Finalize(), especially
 | |
|         # on threading._shutdown() call: put a timeout
 | |
|         if threading_helper.can_start_thread:
 | |
|             faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
 | |
|         sys.exit(exc.code)
 | |
| 
 | |
| 
 | |
| def remove_testfn(test_name: TestName, verbose: int) -> None:
 | |
|     # Try to clean up os_helper.TESTFN if left behind.
 | |
|     #
 | |
|     # While tests shouldn't leave any files or directories behind, when a test
 | |
|     # fails that can be tedious for it to arrange.  The consequences can be
 | |
|     # especially nasty on Windows, since if a test leaves a file open, it
 | |
|     # cannot be deleted by name (while there's nothing we can do about that
 | |
|     # here either, we can display the name of the offending test, which is a
 | |
|     # real help).
 | |
|     name = os_helper.TESTFN
 | |
|     if not os.path.exists(name):
 | |
|         return
 | |
| 
 | |
|     nuker: Callable[[str], None]
 | |
|     if os.path.isdir(name):
 | |
|         import shutil
 | |
|         kind, nuker = "directory", shutil.rmtree
 | |
|     elif os.path.isfile(name):
 | |
|         kind, nuker = "file", os.unlink
 | |
|     else:
 | |
|         raise RuntimeError(f"os.path says {name!r} exists but is neither "
 | |
|                            f"directory nor file")
 | |
| 
 | |
|     if verbose:
 | |
|         print_warning(f"{test_name} left behind {kind} {name!r}")
 | |
|         support.environment_altered = True
 | |
| 
 | |
|     try:
 | |
|         import stat
 | |
|         # fix possible permissions problems that might prevent cleanup
 | |
|         os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
 | |
|         nuker(name)
 | |
|     except Exception as exc:
 | |
|         print_warning(f"{test_name} left behind {kind} {name!r} "
 | |
|                       f"and it couldn't be removed: {exc}")
 | |
| 
 | |
| 
 | |
| def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
 | |
|     if test_name.startswith('test.') or test_dir:
 | |
|         return test_name
 | |
|     else:
 | |
|         # Import it from the test package
 | |
|         return 'test.' + test_name
 | |
| 
 | |
| 
 | |
| # gh-90681: When rerunning tests, we might need to rerun the whole
 | |
| # class or module suite if some its life-cycle hooks fail.
 | |
| # Test level hooks are not affected.
 | |
| _TEST_LIFECYCLE_HOOKS = frozenset((
 | |
|     'setUpClass', 'tearDownClass',
 | |
|     'setUpModule', 'tearDownModule',
 | |
| ))
 | |
| 
 | |
| def normalize_test_name(test_full_name: str, *,
 | |
|                         is_error: bool = False) -> str | None:
 | |
|     short_name = test_full_name.split(" ")[0]
 | |
|     if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
 | |
|         if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
 | |
|             # if setUpModule() or tearDownModule() failed, don't filter
 | |
|             # tests with the test file name, don't use filters.
 | |
|             return None
 | |
| 
 | |
|         # This means that we have a failure in a life-cycle hook,
 | |
|         # we need to rerun the whole module or class suite.
 | |
|         # Basically the error looks like this:
 | |
|         #    ERROR: setUpClass (test.test_reg_ex.RegTest)
 | |
|         # or
 | |
|         #    ERROR: setUpModule (test.test_reg_ex)
 | |
|         # So, we need to parse the class / module name.
 | |
|         lpar = test_full_name.index('(')
 | |
|         rpar = test_full_name.index(')')
 | |
|         return test_full_name[lpar + 1: rpar].split('.')[-1]
 | |
|     return short_name
 | |
| 
 | |
| 
 | |
| def adjust_rlimit_nofile() -> None:
 | |
|     """
 | |
|     On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
 | |
|     for our test suite to succeed. Raise it to something more reasonable. 1024
 | |
|     is a common Linux default.
 | |
|     """
 | |
|     try:
 | |
|         import resource
 | |
|     except ImportError:
 | |
|         return
 | |
| 
 | |
|     fd_limit, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE)
 | |
| 
 | |
|     desired_fds = 1024
 | |
| 
 | |
|     if fd_limit < desired_fds and fd_limit < max_fds:
 | |
|         new_fd_limit = min(desired_fds, max_fds)
 | |
|         try:
 | |
|             resource.setrlimit(resource.RLIMIT_NOFILE,
 | |
|                                (new_fd_limit, max_fds))
 | |
|             print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
 | |
|         except (ValueError, OSError) as err:
 | |
|             print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
 | |
|                           f"{new_fd_limit}: {err}.")
 | |
| 
 | |
| 
 | |
| def get_host_runner() -> str:
 | |
|     if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
 | |
|         hostrunner = sysconfig.get_config_var("HOSTRUNNER")
 | |
|     return hostrunner
 | |
| 
 | |
| 
 | |
| def is_cross_compiled() -> bool:
 | |
|     return ('_PYTHON_HOST_PLATFORM' in os.environ)
 | |
| 
 | |
| 
 | |
| def format_resources(use_resources: Iterable[str]) -> str:
 | |
|     use_resources = set(use_resources)
 | |
|     all_resources = set(ALL_RESOURCES)
 | |
| 
 | |
|     # Express resources relative to "all"
 | |
|     relative_all = ['all']
 | |
|     for name in sorted(all_resources - use_resources):
 | |
|         relative_all.append(f'-{name}')
 | |
|     for name in sorted(use_resources - all_resources):
 | |
|         relative_all.append(f'{name}')
 | |
|     all_text = ','.join(relative_all)
 | |
|     all_text = f"resources: {all_text}"
 | |
| 
 | |
|     # List of enabled resources
 | |
|     text = ','.join(sorted(use_resources))
 | |
|     text = f"resources ({len(use_resources)}): {text}"
 | |
| 
 | |
|     # Pick the shortest string (prefer relative to all if lengths are equal)
 | |
|     if len(all_text) <= len(text):
 | |
|         return all_text
 | |
|     else:
 | |
|         return text
 | |
| 
 | |
| 
 | |
| def display_header(use_resources: tuple[str, ...],
 | |
|                    python_cmd: tuple[str, ...] | None) -> None:
 | |
|     # Print basic platform information
 | |
|     print("==", platform.python_implementation(), *sys.version.split())
 | |
|     print("==", platform.platform(aliased=True),
 | |
|                   "%s-endian" % sys.byteorder)
 | |
|     print("== Python build:", ' '.join(get_build_info()))
 | |
|     print("== cwd:", os.getcwd())
 | |
| 
 | |
|     cpu_count: object = os.cpu_count()
 | |
|     if cpu_count:
 | |
|         # The function is new in Python 3.13; mypy doesn't know about it yet:
 | |
|         process_cpu_count = os.process_cpu_count()  # type: ignore[attr-defined]
 | |
|         if process_cpu_count and process_cpu_count != cpu_count:
 | |
|             cpu_count = f"{process_cpu_count} (process) / {cpu_count} (system)"
 | |
|         print("== CPU count:", cpu_count)
 | |
|     print("== encodings: locale=%s FS=%s"
 | |
|           % (locale.getencoding(), sys.getfilesystemencoding()))
 | |
| 
 | |
|     if use_resources:
 | |
|         text = format_resources(use_resources)
 | |
|         print(f"== {text}")
 | |
|     else:
 | |
|         print("== resources: all test resources are disabled, "
 | |
|               "use -u option to unskip tests")
 | |
| 
 | |
|     cross_compile = is_cross_compiled()
 | |
|     if cross_compile:
 | |
|         print("== cross compiled: Yes")
 | |
|     if python_cmd:
 | |
|         cmd = shlex.join(python_cmd)
 | |
|         print(f"== host python: {cmd}")
 | |
| 
 | |
|         get_cmd = [*python_cmd, '-m', 'platform']
 | |
|         proc = subprocess.run(
 | |
|             get_cmd,
 | |
|             stdout=subprocess.PIPE,
 | |
|             text=True,
 | |
|             cwd=os_helper.SAVEDCWD)
 | |
|         stdout = proc.stdout.replace('\n', ' ').strip()
 | |
|         if stdout:
 | |
|             print(f"== host platform: {stdout}")
 | |
|         elif proc.returncode:
 | |
|             print(f"== host platform: <command failed with exit code {proc.returncode}>")
 | |
|     else:
 | |
|         hostrunner = get_host_runner()
 | |
|         if hostrunner:
 | |
|             print(f"== host runner: {hostrunner}")
 | |
| 
 | |
|     # This makes it easier to remember what to set in your local
 | |
|     # environment when trying to reproduce a sanitizer failure.
 | |
|     asan = support.check_sanitizer(address=True)
 | |
|     msan = support.check_sanitizer(memory=True)
 | |
|     ubsan = support.check_sanitizer(ub=True)
 | |
|     tsan = support.check_sanitizer(thread=True)
 | |
|     sanitizers = []
 | |
|     if asan:
 | |
|         sanitizers.append("address")
 | |
|     if msan:
 | |
|         sanitizers.append("memory")
 | |
|     if ubsan:
 | |
|         sanitizers.append("undefined behavior")
 | |
|     if tsan:
 | |
|         sanitizers.append("thread")
 | |
|     if sanitizers:
 | |
|         print(f"== sanitizers: {', '.join(sanitizers)}")
 | |
|         for sanitizer, env_var in (
 | |
|             (asan, "ASAN_OPTIONS"),
 | |
|             (msan, "MSAN_OPTIONS"),
 | |
|             (ubsan, "UBSAN_OPTIONS"),
 | |
|             (tsan, "TSAN_OPTIONS"),
 | |
|         ):
 | |
|             options= os.environ.get(env_var)
 | |
|             if sanitizer and options is not None:
 | |
|                 print(f"== {env_var}={options!r}")
 | |
| 
 | |
|     print(flush=True)
 | |
| 
 | |
| 
 | |
| def cleanup_temp_dir(tmp_dir: StrPath) -> None:
 | |
|     import glob
 | |
| 
 | |
|     path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
 | |
|     print("Cleanup %s directory" % tmp_dir)
 | |
|     for name in glob.glob(path):
 | |
|         if os.path.isdir(name):
 | |
|             print("Remove directory: %s" % name)
 | |
|             os_helper.rmtree(name)
 | |
|         else:
 | |
|             print("Remove file: %s" % name)
 | |
|             os_helper.unlink(name)
 | |
| 
 | |
| 
 | |
| ILLEGAL_XML_CHARS_RE = re.compile(
 | |
|     '['
 | |
|     # Control characters; newline (\x0A and \x0D) and TAB (\x09) are legal
 | |
|     '\x00-\x08\x0B\x0C\x0E-\x1F'
 | |
|     # Surrogate characters
 | |
|     '\uD800-\uDFFF'
 | |
|     # Special Unicode characters
 | |
|     '\uFFFE'
 | |
|     '\uFFFF'
 | |
|     # Match multiple sequential invalid characters for better efficiency
 | |
|     ']+')
 | |
| 
 | |
| def _sanitize_xml_replace(regs):
 | |
|     text = regs[0]
 | |
|     return ''.join(f'\\x{ord(ch):02x}' if ch <= '\xff' else ascii(ch)[1:-1]
 | |
|                    for ch in text)
 | |
| 
 | |
| def sanitize_xml(text: str) -> str:
 | |
|     return ILLEGAL_XML_CHARS_RE.sub(_sanitize_xml_replace, text)
 | 
