mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	 1170d5a292
			
		
	
	
		1170d5a292
		
			
		
	
	
	
	
		
			
			When the --fail-rerun option is used and a test fails and then passes,
regrtest now uses exit code 5 ("rerun") instead of 2 ("bad test").
		
	
			
		
			
				
	
	
		
			891 lines
		
	
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			891 lines
		
	
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import faulthandler
 | |
| import locale
 | |
| import os
 | |
| import platform
 | |
| import random
 | |
| import re
 | |
| import sys
 | |
| import sysconfig
 | |
| import tempfile
 | |
| import time
 | |
| import unittest
 | |
| from test.libregrtest.cmdline import _parse_args
 | |
| from test.libregrtest.runtest import (
 | |
|     findtests, split_test_packages, runtest, abs_module_name,
 | |
|     PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
 | |
| from test.libregrtest.setup import setup_tests
 | |
| from test.libregrtest.pgo import setup_pgo_tests
 | |
| from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
 | |
|                                     printlist, get_build_info)
 | |
| from test import support
 | |
| from test.support import TestStats
 | |
| from test.support import os_helper
 | |
| from test.support import threading_helper
 | |
| 
 | |
| 
 | |
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0

# Process exit codes.  The code that chooses between them is outside this
# excerpt, so the meanings below are read off the names -- confirm against
# the end of Regrtest.main().
EXITCODE_BAD_TEST = 2  # presumably: at least one test failed
EXITCODE_ENV_CHANGED = 3  # presumably: a test altered the environment
EXITCODE_NO_TESTS_RAN = 4  # presumably: no test was executed
EXITCODE_RERUN_FAIL = 5  # presumably: a test failed, then passed on re-run
EXITCODE_INTERRUPTED = 130  # presumably: the run was interrupted
 | |
| 
 | |
| 
 | |
| class Regrtest:
 | |
|     """Execute a test suite.
 | |
| 
 | |
|     This also parses command-line options and modifies its behavior
 | |
|     accordingly.
 | |
| 
 | |
|     tests -- a list of strings containing test names (optional)
 | |
|     testdir -- the directory in which to look for tests (optional)
 | |
| 
 | |
|     Users other than the Python test suite will certainly want to
 | |
|     specify testdir; if it's omitted, the directory containing the
 | |
|     Python test suite is searched for.
 | |
| 
 | |
|     If the tests argument is omitted, the tests listed on the
 | |
|     command-line will be used.  If that's empty, too, then all *.py
 | |
|     files beginning with test_ will be used.
 | |
| 
 | |
|     The other default arguments (verbose, quiet, exclude,
 | |
|     single, randomize, use_resources, trace, coverdir,
 | |
|     print_slow, and random_seed) allow programmers calling main()
 | |
|     directly to set the values that would normally be set by flags
 | |
|     on the command line.
 | |
|     """
 | |
    def __init__(self):
        """Create a runner with empty result lists and default state.

        Nothing is parsed or executed here: ``self.ns`` stays ``None`` until
        parse_args() stores the parsed command-line namespace.
        """
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []
        # one RunTests object per execution pass; appended by run_tests()
        # and by _rerun_failed_tests()
        self.all_runtests: list[RunTests] = []

        # test results: lists of test names filled by accumulate_result()
        self.good: list[str] = []
        self.bad: list[str] = []
        # names moved out of self.bad by _rerun_failed_tests() before a re-run
        self.rerun_bad: list[str] = []
        self.skipped: list[str] = []
        self.resource_denied: list[str] = []
        self.environment_changed: list[str] = []
        self.run_no_tests: list[str] = []
        self.rerun: list[str] = []

        # failed results kept so that the failures can be run again
        # NOTE(review): TestResult is not among the imports visible at the
        # top of this file; annotations on attribute targets inside a
        # function are not evaluated at run time, so this only affects
        # static type checkers -- confirm.
        self.need_rerun: list[TestResult] = []
        # state string captured by rerun_failed_tests() before the re-run
        self.first_state: str | None = None
        self.interrupted = False
        self.total_stats = TestStats()

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.perf_counter()
        self.test_count_text = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
 | |
| 
 | |
|     def get_executed(self):
 | |
|         return (set(self.good) | set(self.bad) | set(self.skipped)
 | |
|                 | set(self.resource_denied) | set(self.environment_changed)
 | |
|                 | set(self.run_no_tests))
 | |
| 
 | |
|     def accumulate_result(self, result, rerun=False):
 | |
|         fail_env_changed = self.ns.fail_env_changed
 | |
|         test_name = result.test_name
 | |
| 
 | |
|         match result.state:
 | |
|             case State.PASSED:
 | |
|                 self.good.append(test_name)
 | |
|             case State.ENV_CHANGED:
 | |
|                 self.environment_changed.append(test_name)
 | |
|             case State.SKIPPED:
 | |
|                 self.skipped.append(test_name)
 | |
|             case State.RESOURCE_DENIED:
 | |
|                 self.resource_denied.append(test_name)
 | |
|             case State.INTERRUPTED:
 | |
|                 self.interrupted = True
 | |
|             case State.DID_NOT_RUN:
 | |
|                 self.run_no_tests.append(test_name)
 | |
|             case _:
 | |
|                 if result.is_failed(fail_env_changed):
 | |
|                     self.bad.append(test_name)
 | |
|                     self.need_rerun.append(result)
 | |
|                 else:
 | |
|                     raise ValueError(f"invalid test state: {result.state!r}")
 | |
| 
 | |
|         if result.has_meaningful_duration() and not rerun:
 | |
|             self.test_times.append((result.duration, test_name))
 | |
|         if result.stats is not None:
 | |
|             self.total_stats.accumulate(result.stats)
 | |
|         if rerun:
 | |
|             self.rerun.append(test_name)
 | |
| 
 | |
|         xml_data = result.xml_data
 | |
|         if xml_data:
 | |
|             import xml.etree.ElementTree as ET
 | |
|             for e in xml_data:
 | |
|                 try:
 | |
|                     self.testsuite_xml.append(ET.fromstring(e))
 | |
|                 except ET.ParseError:
 | |
|                     print(xml_data, file=sys.__stderr__)
 | |
|                     raise
 | |
| 
 | |
|     def log(self, line=''):
 | |
|         empty = not line
 | |
| 
 | |
|         # add the system load prefix: "load avg: 1.80 "
 | |
|         load_avg = self.getloadavg()
 | |
|         if load_avg is not None:
 | |
|             line = f"load avg: {load_avg:.2f} {line}"
 | |
| 
 | |
|         # add the timestamp prefix:  "0:01:05 "
 | |
|         test_time = time.perf_counter() - self.start_time
 | |
| 
 | |
|         mins, secs = divmod(int(test_time), 60)
 | |
|         hours, mins = divmod(mins, 60)
 | |
|         test_time = "%d:%02d:%02d" % (hours, mins, secs)
 | |
| 
 | |
|         line = f"{test_time} {line}"
 | |
|         if empty:
 | |
|             line = line[:-1]
 | |
| 
 | |
|         print(line, flush=True)
 | |
| 
 | |
|     def display_progress(self, test_index, text):
 | |
|         quiet = self.ns.quiet
 | |
|         pgo = self.ns.pgo
 | |
|         if quiet:
 | |
|             return
 | |
| 
 | |
|         # "[ 51/405/1] test_tcl passed"
 | |
|         line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
 | |
|         fails = len(self.bad) + len(self.environment_changed)
 | |
|         if fails and not pgo:
 | |
|             line = f"{line}/{fails}"
 | |
|         self.log(f"[{line}] {text}")
 | |
| 
 | |
|     def parse_args(self, kwargs):
 | |
|         ns = _parse_args(sys.argv[1:], **kwargs)
 | |
| 
 | |
|         if ns.xmlpath:
 | |
|             support.junit_xml_list = self.testsuite_xml = []
 | |
| 
 | |
|         strip_py_suffix(ns.args)
 | |
| 
 | |
|         if ns.huntrleaks:
 | |
|             warmup, repetitions, _ = ns.huntrleaks
 | |
|             if warmup < 1 or repetitions < 1:
 | |
|                 msg = ("Invalid values for the --huntrleaks/-R parameters. The "
 | |
|                        "number of warmups and repetitions must be at least 1 "
 | |
|                        "each (1:1).")
 | |
|                 print(msg, file=sys.stderr, flush=True)
 | |
|                 sys.exit(2)
 | |
| 
 | |
|         if ns.tempdir:
 | |
|             ns.tempdir = os.path.expanduser(ns.tempdir)
 | |
| 
 | |
|         self.ns = ns
 | |
| 
 | |
    def find_tests(self, tests):
        """Compute self.selected, the ordered list of tests to run.

        tests -- initial list of test names; stored in self.tests and
                 possibly replaced by the --single or --fromfile handling

        The steps below run in order and each one may override the result of
        the previous one.  Exits with status 1 when the --start test is not
        in the selection.
        """
        ns = self.ns
        single = ns.single
        fromfile = ns.fromfile
        pgo = ns.pgo
        exclude = ns.exclude
        test_dir = ns.testdir
        starting_test = ns.start
        randomize = ns.randomize

        self.tests = tests

        if single:
            # --single: the name of the test to run is read from a state
            # file in the temporary directory; an unreadable or missing file
            # leaves self.tests untouched.
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                pass

        if fromfile:
            # --fromfile: collect the first "test_xxx" word of every line,
            # ignoring anything after a '#'.
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
                for line in fp:
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        strip_py_suffix(self.tests)

        if pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(ns)

        # With --exclude the positional arguments name the tests to leave
        # out, so they are moved from ns.args into the exclusion set.
        exclude_tests = set()
        if exclude:
            for arg in ns.args:
                exclude_tests.add(arg)
            ns.args = []

        alltests = findtests(testdir=test_dir, exclude=exclude_tests)

        if not fromfile:
            # Explicit names win over the command line arguments; with
            # neither, every discovered test is selected.
            self.selected = self.tests or ns.args
            if self.selected:
                self.selected = split_test_packages(self.selected)
            else:
                self.selected = alltests
        else:
            self.selected = self.tests

        if single:
            # Run only the first selected test and remember its successor
            # in alltests; finalize() writes it to the state file.
            self.selected = self.selected[:1]
            try:
                # NOTE(review): list.index() raises ValueError, which is not
                # caught here, when the selected test is not in alltests
                # (for example a stale name read from the state file) --
                # confirm whether that is intended.
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except IndexError:
                # empty selection, or the selected test is the last one
                pass

        # Remove all the selected tests that precede start if it's set.
        if starting_test:
            try:
                del self.selected[:self.selected.index(starting_test)]
            except ValueError:
                print(f"Cannot find starting test: {starting_test}")
                sys.exit(1)

        if randomize:
            # Pick a seed when none was given so that it can be reported and
            # reused; the shuffle is done in place.
            if ns.random_seed is None:
                ns.random_seed = random.randrange(10000000)
            random.seed(ns.random_seed)
            random.shuffle(self.selected)
 | |
| 
 | |
|     def list_tests(self):
 | |
|         for name in self.selected:
 | |
|             print(name)
 | |
| 
 | |
|     def _list_cases(self, suite):
 | |
|         for test in suite:
 | |
|             if isinstance(test, unittest.loader._FailedTest):
 | |
|                 continue
 | |
|             if isinstance(test, unittest.TestSuite):
 | |
|                 self._list_cases(test)
 | |
|             elif isinstance(test, unittest.TestCase):
 | |
|                 if support.match_test(test):
 | |
|                     print(test.id())
 | |
| 
 | |
|     def list_cases(self):
 | |
|         ns = self.ns
 | |
|         test_dir = ns.testdir
 | |
|         support.verbose = False
 | |
|         support.set_match_tests(ns.match_tests, ns.ignore_tests)
 | |
| 
 | |
|         skipped = []
 | |
|         for test_name in self.selected:
 | |
|             module_name = abs_module_name(test_name, test_dir)
 | |
|             try:
 | |
|                 suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
 | |
|                 self._list_cases(suite)
 | |
|             except unittest.SkipTest:
 | |
|                 skipped.append(test_name)
 | |
| 
 | |
|         if skipped:
 | |
|             sys.stdout.flush()
 | |
|             stderr = sys.stderr
 | |
|             print(file=stderr)
 | |
|             print(count(len(skipped), "test"), "skipped:", file=stderr)
 | |
|             printlist(skipped, file=stderr)
 | |
| 
 | |
|     def get_rerun_match(self, rerun_list) -> MatchTestsDict:
 | |
|         rerun_match_tests = {}
 | |
|         for result in rerun_list:
 | |
|             match_tests = result.get_rerun_match_tests()
 | |
|             # ignore empty match list
 | |
|             if match_tests:
 | |
|                 rerun_match_tests[result.test_name] = match_tests
 | |
|         return rerun_match_tests
 | |
| 
 | |
|     def _rerun_failed_tests(self, need_rerun):
 | |
|         # Configure the runner to re-run tests
 | |
|         ns = self.ns
 | |
|         ns.verbose = True
 | |
|         ns.failfast = False
 | |
|         ns.verbose3 = False
 | |
|         ns.forever = False
 | |
|         if ns.use_mp is None:
 | |
|             ns.use_mp = 1
 | |
| 
 | |
|         # Get tests to re-run
 | |
|         tests = [result.test_name for result in need_rerun]
 | |
|         match_tests = self.get_rerun_match(need_rerun)
 | |
|         self.set_tests(tests)
 | |
| 
 | |
|         # Clear previously failed tests
 | |
|         self.rerun_bad.extend(self.bad)
 | |
|         self.bad.clear()
 | |
|         self.need_rerun.clear()
 | |
| 
 | |
|         # Re-run failed tests
 | |
|         self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
 | |
|         runtests = RunTests(tests, match_tests=match_tests, rerun=True)
 | |
|         self.all_runtests.append(runtests)
 | |
|         self._run_tests_mp(runtests)
 | |
| 
 | |
|     def rerun_failed_tests(self, need_rerun):
 | |
|         if self.ns.python:
 | |
|             # Temp patch for https://github.com/python/cpython/issues/94052
 | |
|             self.log(
 | |
|                 "Re-running failed tests is not supported with --python "
 | |
|                 "host runner option."
 | |
|             )
 | |
|             return
 | |
| 
 | |
|         self.first_state = self.get_tests_state()
 | |
| 
 | |
|         print()
 | |
|         self._rerun_failed_tests(need_rerun)
 | |
| 
 | |
|         if self.bad:
 | |
|             print(count(len(self.bad), 'test'), "failed again:")
 | |
|             printlist(self.bad)
 | |
| 
 | |
|         self.display_result()
 | |
| 
 | |
|     def display_result(self):
 | |
|         pgo = self.ns.pgo
 | |
|         quiet = self.ns.quiet
 | |
|         print_slow = self.ns.print_slow
 | |
| 
 | |
|         # If running the test suite for PGO then no one cares about results.
 | |
|         if pgo:
 | |
|             return
 | |
| 
 | |
|         print()
 | |
|         print("== Tests result: %s ==" % self.get_tests_state())
 | |
| 
 | |
|         if self.interrupted:
 | |
|             print("Test suite interrupted by signal SIGINT.")
 | |
| 
 | |
|         omitted = set(self.selected) - self.get_executed()
 | |
|         if omitted:
 | |
|             print()
 | |
|             print(count(len(omitted), "test"), "omitted:")
 | |
|             printlist(omitted)
 | |
| 
 | |
|         if self.good and not quiet:
 | |
|             print()
 | |
|             if (not self.bad
 | |
|                 and not self.skipped
 | |
|                 and not self.interrupted
 | |
|                 and len(self.good) > 1):
 | |
|                 print("All", end=' ')
 | |
|             print(count(len(self.good), "test"), "OK.")
 | |
| 
 | |
|         if print_slow:
 | |
|             self.test_times.sort(reverse=True)
 | |
|             print()
 | |
|             print("10 slowest tests:")
 | |
|             for test_time, test in self.test_times[:10]:
 | |
|                 print("- %s: %s" % (test, format_duration(test_time)))
 | |
| 
 | |
|         if self.bad:
 | |
|             print()
 | |
|             print(count(len(self.bad), "test"), "failed:")
 | |
|             printlist(self.bad)
 | |
| 
 | |
|         if self.environment_changed:
 | |
|             print()
 | |
|             print("{} altered the execution environment:".format(
 | |
|                      count(len(self.environment_changed), "test")))
 | |
|             printlist(self.environment_changed)
 | |
| 
 | |
|         if self.skipped and not quiet:
 | |
|             print()
 | |
|             print(count(len(self.skipped), "test"), "skipped:")
 | |
|             printlist(self.skipped)
 | |
| 
 | |
|         if self.resource_denied and not quiet:
 | |
|             print()
 | |
|             print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
 | |
|             printlist(self.resource_denied)
 | |
| 
 | |
|         if self.rerun:
 | |
|             print()
 | |
|             print("%s:" % count(len(self.rerun), "re-run test"))
 | |
|             printlist(self.rerun)
 | |
| 
 | |
|         if self.run_no_tests:
 | |
|             print()
 | |
|             print(count(len(self.run_no_tests), "test"), "run no tests:")
 | |
|             printlist(self.run_no_tests)
 | |
| 
 | |
|     def run_test(self, test_index, test_name, previous_test, save_modules):
 | |
|         text = test_name
 | |
|         if previous_test:
 | |
|             text = '%s -- %s' % (text, previous_test)
 | |
|         self.display_progress(test_index, text)
 | |
| 
 | |
|         if self.tracer:
 | |
|             # If we're tracing code coverage, then we don't exit with status
 | |
|             # if on a false return value from main.
 | |
|             cmd = ('result = runtest(self.ns, test_name); '
 | |
|                    'self.accumulate_result(result)')
 | |
|             ns = dict(locals())
 | |
|             self.tracer.runctx(cmd, globals=globals(), locals=ns)
 | |
|             result = ns['result']
 | |
|         else:
 | |
|             result = runtest(self.ns, test_name)
 | |
|             self.accumulate_result(result)
 | |
| 
 | |
|         # Unload the newly imported modules (best effort finalization)
 | |
|         for module in sys.modules.keys():
 | |
|             if module not in save_modules and module.startswith("test."):
 | |
|                 support.unload(module)
 | |
| 
 | |
|         return result
 | |
| 
 | |
|     def run_tests_sequentially(self, runtests):
 | |
|         ns = self.ns
 | |
|         coverage = ns.trace
 | |
|         fail_fast = ns.failfast
 | |
|         fail_env_changed = ns.fail_env_changed
 | |
|         timeout = ns.timeout
 | |
| 
 | |
|         if coverage:
 | |
|             import trace
 | |
|             self.tracer = trace.Trace(trace=False, count=True)
 | |
| 
 | |
|         save_modules = sys.modules.keys()
 | |
| 
 | |
|         msg = "Run tests sequentially"
 | |
|         if timeout:
 | |
|             msg += " (timeout: %s)" % format_duration(timeout)
 | |
|         self.log(msg)
 | |
| 
 | |
|         previous_test = None
 | |
|         tests_iter = runtests.iter_tests()
 | |
|         for test_index, test_name in enumerate(tests_iter, 1):
 | |
|             start_time = time.perf_counter()
 | |
| 
 | |
|             result = self.run_test(test_index, test_name,
 | |
|                                    previous_test, save_modules)
 | |
| 
 | |
|             if result.must_stop(fail_fast, fail_env_changed):
 | |
|                 break
 | |
| 
 | |
|             previous_test = str(result)
 | |
|             test_time = time.perf_counter() - start_time
 | |
|             if test_time >= PROGRESS_MIN_TIME:
 | |
|                 previous_test = "%s in %s" % (previous_test, format_duration(test_time))
 | |
|             elif result.state == State.PASSED:
 | |
|                 # be quiet: say nothing if the test passed shortly
 | |
|                 previous_test = None
 | |
| 
 | |
|         if previous_test:
 | |
|             print(previous_test)
 | |
| 
 | |
|     def display_header(self):
 | |
|         # Print basic platform information
 | |
|         print("==", platform.python_implementation(), *sys.version.split())
 | |
|         print("==", platform.platform(aliased=True),
 | |
|                       "%s-endian" % sys.byteorder)
 | |
|         print("== Python build:", ' '.join(get_build_info()))
 | |
|         print("== cwd:", os.getcwd())
 | |
|         cpu_count = os.cpu_count()
 | |
|         if cpu_count:
 | |
|             print("== CPU count:", cpu_count)
 | |
|         print("== encodings: locale=%s, FS=%s"
 | |
|               % (locale.getencoding(), sys.getfilesystemencoding()))
 | |
|         self.display_sanitizers()
 | |
| 
 | |
|     def display_sanitizers(self):
 | |
|         # This makes it easier to remember what to set in your local
 | |
|         # environment when trying to reproduce a sanitizer failure.
 | |
|         asan = support.check_sanitizer(address=True)
 | |
|         msan = support.check_sanitizer(memory=True)
 | |
|         ubsan = support.check_sanitizer(ub=True)
 | |
|         sanitizers = []
 | |
|         if asan:
 | |
|             sanitizers.append("address")
 | |
|         if msan:
 | |
|             sanitizers.append("memory")
 | |
|         if ubsan:
 | |
|             sanitizers.append("undefined behavior")
 | |
|         if not sanitizers:
 | |
|             return
 | |
| 
 | |
|         print(f"== sanitizers: {', '.join(sanitizers)}")
 | |
|         for sanitizer, env_var in (
 | |
|             (asan, "ASAN_OPTIONS"),
 | |
|             (msan, "MSAN_OPTIONS"),
 | |
|             (ubsan, "UBSAN_OPTIONS"),
 | |
|         ):
 | |
|             options= os.environ.get(env_var)
 | |
|             if sanitizer and options is not None:
 | |
|                 print(f"== {env_var}={options!r}")
 | |
| 
 | |
|     def no_tests_run(self):
 | |
|         return not any((self.good, self.bad, self.skipped, self.interrupted,
 | |
|                         self.environment_changed))
 | |
| 
 | |
|     def get_tests_state(self):
 | |
|         fail_env_changed = self.ns.fail_env_changed
 | |
| 
 | |
|         result = []
 | |
|         if self.bad:
 | |
|             result.append("FAILURE")
 | |
|         elif fail_env_changed and self.environment_changed:
 | |
|             result.append("ENV CHANGED")
 | |
|         elif self.no_tests_run():
 | |
|             result.append("NO TESTS RAN")
 | |
| 
 | |
|         if self.interrupted:
 | |
|             result.append("INTERRUPTED")
 | |
| 
 | |
|         if not result:
 | |
|             result.append("SUCCESS")
 | |
| 
 | |
|         result = ', '.join(result)
 | |
|         if self.first_state:
 | |
|             result = '%s then %s' % (self.first_state, result)
 | |
|         return result
 | |
| 
 | |
|     def _run_tests_mp(self, runtests: RunTests) -> None:
 | |
|         from test.libregrtest.runtest_mp import run_tests_multiprocess
 | |
|         # If we're on windows and this is the parent runner (not a worker),
 | |
|         # track the load average.
 | |
|         if sys.platform == 'win32':
 | |
|             from test.libregrtest.win_utils import WindowsLoadTracker
 | |
| 
 | |
|             try:
 | |
|                 self.win_load_tracker = WindowsLoadTracker()
 | |
|             except PermissionError as error:
 | |
|                 # Standard accounts may not have access to the performance
 | |
|                 # counters.
 | |
|                 print(f'Failed to create WindowsLoadTracker: {error}')
 | |
| 
 | |
|         try:
 | |
|             run_tests_multiprocess(self, runtests)
 | |
|         finally:
 | |
|             if self.win_load_tracker is not None:
 | |
|                 self.win_load_tracker.close()
 | |
|                 self.win_load_tracker = None
 | |
| 
 | |
|     def set_tests(self, tests):
 | |
|         self.tests = tests
 | |
|         if self.ns.forever:
 | |
|             self.test_count_text = ''
 | |
|             self.test_count_width = 3
 | |
|         else:
 | |
|             self.test_count_text = '/{}'.format(len(self.tests))
 | |
|             self.test_count_width = len(self.test_count_text) - 1
 | |
| 
 | |
|     def run_tests(self):
 | |
|         # For a partial run, we do not need to clutter the output.
 | |
|         if (self.ns.header
 | |
|             or not(self.ns.pgo or self.ns.quiet or self.ns.single
 | |
|                    or self.tests or self.ns.args)):
 | |
|             self.display_header()
 | |
| 
 | |
|         if self.ns.huntrleaks:
 | |
|             warmup, repetitions, _ = self.ns.huntrleaks
 | |
|             if warmup < 3:
 | |
|                 msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
 | |
|                         "3 warmup repetitions can give false positives!")
 | |
|                 print(msg, file=sys.stdout, flush=True)
 | |
| 
 | |
|         if self.ns.randomize:
 | |
|             print("Using random seed", self.ns.random_seed)
 | |
| 
 | |
|         tests = self.selected
 | |
|         self.set_tests(tests)
 | |
|         runtests = RunTests(tests, forever=self.ns.forever)
 | |
|         self.all_runtests.append(runtests)
 | |
|         if self.ns.use_mp:
 | |
|             self._run_tests_mp(runtests)
 | |
|         else:
 | |
|             self.run_tests_sequentially(runtests)
 | |
| 
 | |
|     def finalize(self):
 | |
|         if self.next_single_filename:
 | |
|             if self.next_single_test:
 | |
|                 with open(self.next_single_filename, 'w') as fp:
 | |
|                     fp.write(self.next_single_test + '\n')
 | |
|             else:
 | |
|                 os.unlink(self.next_single_filename)
 | |
| 
 | |
|         if self.tracer:
 | |
|             r = self.tracer.results()
 | |
|             r.write_results(show_missing=True, summary=True,
 | |
|                             coverdir=self.ns.coverdir)
 | |
| 
 | |
|         if self.ns.runleaks:
 | |
|             os.system("leaks %d" % os.getpid())
 | |
| 
 | |
|         self.save_xml_result()
 | |
| 
 | |
|     def display_summary(self):
 | |
|         duration = time.perf_counter() - self.start_time
 | |
|         first_runtests = self.all_runtests[0]
 | |
|         # the second runtests (re-run failed tests) disables forever,
 | |
|         # use the first runtests
 | |
|         forever = first_runtests.forever
 | |
|         filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)
 | |
| 
 | |
|         # Total duration
 | |
|         print()
 | |
|         print("Total duration: %s" % format_duration(duration))
 | |
| 
 | |
|         # Total tests
 | |
|         total = self.total_stats
 | |
|         text = f'run={total.tests_run:,}'
 | |
|         if filtered:
 | |
|             text = f"{text} (filtered)"
 | |
|         stats = [text]
 | |
|         if total.failures:
 | |
|             stats.append(f'failures={total.failures:,}')
 | |
|         if total.skipped:
 | |
|             stats.append(f'skipped={total.skipped:,}')
 | |
|         print(f"Total tests: {' '.join(stats)}")
 | |
| 
 | |
|         # Total test files
 | |
|         all_tests = [self.good, self.bad, self.rerun,
 | |
|                      self.skipped,
 | |
|                      self.environment_changed, self.run_no_tests]
 | |
|         run = sum(map(len, all_tests))
 | |
|         text = f'run={run}'
 | |
|         if not forever:
 | |
|             ntest = len(first_runtests.tests)
 | |
|             text = f"{text}/{ntest}"
 | |
|         if filtered:
 | |
|             text = f"{text} (filtered)"
 | |
|         report = [text]
 | |
|         for name, tests in (
 | |
|             ('failed', self.bad),
 | |
|             ('env_changed', self.environment_changed),
 | |
|             ('skipped', self.skipped),
 | |
|             ('resource_denied', self.resource_denied),
 | |
|             ('rerun', self.rerun),
 | |
|             ('run_no_tests', self.run_no_tests),
 | |
|         ):
 | |
|             if tests:
 | |
|                 report.append(f'{name}={len(tests)}')
 | |
|         print(f"Total test files: {' '.join(report)}")
 | |
| 
 | |
|         # Result
 | |
|         result = self.get_tests_state()
 | |
|         print(f"Result: {result}")
 | |
| 
 | |
|     def save_xml_result(self):
 | |
|         if not self.ns.xmlpath and not self.testsuite_xml:
 | |
|             return
 | |
| 
 | |
|         import xml.etree.ElementTree as ET
 | |
|         root = ET.Element("testsuites")
 | |
| 
 | |
|         # Manually count the totals for the overall summary
 | |
|         totals = {'tests': 0, 'errors': 0, 'failures': 0}
 | |
|         for suite in self.testsuite_xml:
 | |
|             root.append(suite)
 | |
|             for k in totals:
 | |
|                 try:
 | |
|                     totals[k] += int(suite.get(k, 0))
 | |
|                 except ValueError:
 | |
|                     pass
 | |
| 
 | |
|         for k, v in totals.items():
 | |
|             root.set(k, str(v))
 | |
| 
 | |
|         xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
 | |
|         with open(xmlpath, 'wb') as f:
 | |
|             for s in ET.tostringlist(root):
 | |
|                 f.write(s)
 | |
| 
 | |
|     def fix_umask(self):
 | |
|         if support.is_emscripten:
 | |
|             # Emscripten has default umask 0o777, which breaks some tests.
 | |
|             # see https://github.com/emscripten-core/emscripten/issues/17269
 | |
|             old_mask = os.umask(0)
 | |
|             if old_mask == 0o777:
 | |
|                 os.umask(0o027)
 | |
|             else:
 | |
|                 os.umask(old_mask)
 | |
| 
 | |
|     def set_temp_dir(self):
 | |
|         if self.ns.tempdir:
 | |
|             self.tmp_dir = self.ns.tempdir
 | |
| 
 | |
|         if not self.tmp_dir:
 | |
|             # When tests are run from the Python build directory, it is best practice
 | |
|             # to keep the test files in a subfolder.  This eases the cleanup of leftover
 | |
|             # files using the "make distclean" command.
 | |
|             if sysconfig.is_python_build():
 | |
|                 self.tmp_dir = sysconfig.get_config_var('abs_builddir')
 | |
|                 if self.tmp_dir is None:
 | |
|                     # bpo-30284: On Windows, only srcdir is available. Using
 | |
|                     # abs_builddir mostly matters on UNIX when building Python
 | |
|                     # out of the source tree, especially when the source tree
 | |
|                     # is read only.
 | |
|                     self.tmp_dir = sysconfig.get_config_var('srcdir')
 | |
|                 self.tmp_dir = os.path.join(self.tmp_dir, 'build')
 | |
|             else:
 | |
|                 self.tmp_dir = tempfile.gettempdir()
 | |
| 
 | |
|         self.tmp_dir = os.path.abspath(self.tmp_dir)
 | |
| 
 | |
|     def is_worker(self):
 | |
|         return (self.ns.worker_args is not None)
 | |
| 
 | |
|     def create_temp_dir(self):
 | |
|         os.makedirs(self.tmp_dir, exist_ok=True)
 | |
| 
 | |
|         # Define a writable temp dir that will be used as cwd while running
 | |
|         # the tests. The name of the dir includes the pid to allow parallel
 | |
|         # testing (see the -j option).
 | |
|         # Emscripten and WASI have stubbed getpid(), Emscripten has only
 | |
|         # milisecond clock resolution. Use randint() instead.
 | |
|         if sys.platform in {"emscripten", "wasi"}:
 | |
|             nounce = random.randint(0, 1_000_000)
 | |
|         else:
 | |
|             nounce = os.getpid()
 | |
| 
 | |
|         if self.is_worker():
 | |
|             test_cwd = 'test_python_worker_{}'.format(nounce)
 | |
|         else:
 | |
|             test_cwd = 'test_python_{}'.format(nounce)
 | |
|         test_cwd += os_helper.FS_NONASCII
 | |
|         test_cwd = os.path.join(self.tmp_dir, test_cwd)
 | |
|         return test_cwd
 | |
| 
 | |
|     def cleanup(self):
 | |
|         import glob
 | |
| 
 | |
|         path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
 | |
|         print("Cleanup %s directory" % self.tmp_dir)
 | |
|         for name in glob.glob(path):
 | |
|             if os.path.isdir(name):
 | |
|                 print("Remove directory: %s" % name)
 | |
|                 os_helper.rmtree(name)
 | |
|             else:
 | |
|                 print("Remove file: %s" % name)
 | |
|                 os_helper.unlink(name)
 | |
| 
 | |
|     def main(self, tests=None, **kwargs):
 | |
|         self.parse_args(kwargs)
 | |
| 
 | |
|         self.set_temp_dir()
 | |
| 
 | |
|         self.fix_umask()
 | |
| 
 | |
|         if self.ns.cleanup:
 | |
|             self.cleanup()
 | |
|             sys.exit(0)
 | |
| 
 | |
|         test_cwd = self.create_temp_dir()
 | |
| 
 | |
|         try:
 | |
|             # Run the tests in a context manager that temporarily changes the CWD
 | |
|             # to a temporary and writable directory. If it's not possible to
 | |
|             # create or change the CWD, the original CWD will be used.
 | |
|             # The original CWD is available from os_helper.SAVEDCWD.
 | |
|             with os_helper.temp_cwd(test_cwd, quiet=True):
 | |
|                 # When using multiprocessing, worker processes will use test_cwd
 | |
|                 # as their parent temporary directory. So when the main process
 | |
|                 # exit, it removes also subdirectories of worker processes.
 | |
|                 self.ns.tempdir = test_cwd
 | |
| 
 | |
|                 self._main(tests, kwargs)
 | |
|         except SystemExit as exc:
 | |
|             # bpo-38203: Python can hang at exit in Py_Finalize(), especially
 | |
|             # on threading._shutdown() call: put a timeout
 | |
|             if threading_helper.can_start_thread:
 | |
|                 faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
 | |
| 
 | |
|             sys.exit(exc.code)
 | |
| 
 | |
|     def getloadavg(self):
 | |
|         if self.win_load_tracker is not None:
 | |
|             return self.win_load_tracker.getloadavg()
 | |
| 
 | |
|         if hasattr(os, 'getloadavg'):
 | |
|             return os.getloadavg()[0]
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     def get_exitcode(self):
 | |
|         exitcode = 0
 | |
|         if self.bad:
 | |
|             exitcode = EXITCODE_BAD_TEST
 | |
|         elif self.interrupted:
 | |
|             exitcode = EXITCODE_INTERRUPTED
 | |
|         elif self.ns.fail_env_changed and self.environment_changed:
 | |
|             exitcode = EXITCODE_ENV_CHANGED
 | |
|         elif self.no_tests_run():
 | |
|             exitcode = EXITCODE_NO_TESTS_RAN
 | |
|         elif self.rerun and self.ns.fail_rerun:
 | |
|             exitcode = EXITCODE_RERUN_FAIL
 | |
|         return exitcode
 | |
| 
 | |
|     def action_run_tests(self):
 | |
|         self.run_tests()
 | |
|         self.display_result()
 | |
| 
 | |
|         need_rerun = self.need_rerun
 | |
|         if self.ns.rerun and need_rerun:
 | |
|             self.rerun_failed_tests(need_rerun)
 | |
| 
 | |
|         self.display_summary()
 | |
|         self.finalize()
 | |
| 
 | |
|     def _main(self, tests, kwargs):
 | |
|         if self.is_worker():
 | |
|             from test.libregrtest.runtest_mp import run_tests_worker
 | |
|             run_tests_worker(self.ns.worker_args)
 | |
|             return
 | |
| 
 | |
|         if self.ns.wait:
 | |
|             input("Press any key to continue...")
 | |
| 
 | |
|         setup_tests(self.ns)
 | |
|         self.find_tests(tests)
 | |
| 
 | |
|         exitcode = 0
 | |
|         if self.ns.list_tests:
 | |
|             self.list_tests()
 | |
|         elif self.ns.list_cases:
 | |
|             self.list_cases()
 | |
|         else:
 | |
|             self.action_run_tests()
 | |
|             exitcode = self.get_exitcode()
 | |
| 
 | |
|         sys.exit(exitcode)
 | |
| 
 | |
| 
 | |
def main(tests=None, **kwargs):
    """Run the Python test suite with a fresh Regrtest instance.

    *tests* and any extra keyword arguments are forwarded unchanged to
    Regrtest.main().
    """
    runner = Regrtest()
    runner.main(tests=tests, **kwargs)
 |