mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			318 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			318 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import faulthandler
 | |
| import gc
 | |
| import importlib
 | |
| import io
 | |
| import sys
 | |
| import time
 | |
| import traceback
 | |
| import unittest
 | |
| 
 | |
| from test import support
 | |
| from test.support import threading_helper
 | |
| 
 | |
| from .filter import match_test
 | |
| from .result import State, TestResult, TestStats
 | |
| from .runtests import RunTests
 | |
| from .save_env import saved_test_environment
 | |
| from .setup import setup_tests
 | |
| from .testresult import get_test_runner
 | |
| from .utils import (
 | |
|     TestName,
 | |
|     clear_caches, remove_testfn, abs_module_name, print_warning)
 | |
| 
 | |
| 
 | |
# Minimum duration (in seconds) of a test to display its duration or to
# mention that the test is running in background.
PROGRESS_MIN_TIME = 30.0   # seconds
 | |
| 
 | |
| 
 | |
def run_unittest(test_mod):
    """Load the tests of *test_mod*, filter them and run them.

    Every error recorded by the loader is printed to stderr and then a
    generic Exception is raised.  Otherwise the loaded suite is filtered
    with match_test and the value returned by _run_suite() is returned.
    """
    test_loader = unittest.TestLoader()
    suite = test_loader.loadTestsFromModule(test_mod)

    if test_loader.errors:
        for load_error in test_loader.errors:
            print(load_error, file=sys.stderr)
        raise Exception("errors while loading tests")

    _filter_suite(suite, match_test)
    return _run_suite(suite)
 | |
| 
 | |
def _filter_suite(suite, pred):
    """Filter *suite* in place, keeping only the test cases accepted by *pred*.

    Nested unittest.TestSuite instances are filtered recursively and are
    always kept, even when they end up empty.
    """
    kept = []
    for item in suite._tests:
        if not isinstance(item, unittest.TestSuite):
            # Plain test case: keep it only if the predicate accepts it.
            if pred(item):
                kept.append(item)
            continue
        # Sub-suite: filter its content, then keep the suite itself.
        _filter_suite(item, pred)
        kept.append(item)
    suite._tests = kept
 | |
| 
 | |
def _run_suite(suite):
    """Run *suite* with the regrtest runner and return the unittest result.

    Raises support.TestDidNotRun when nothing was run, skipped or errored,
    and support.TestFailedWithDetails when the run was not successful.
    """
    capture = support.junit_xml_list is not None
    runner = get_test_runner(sys.stdout,
                             verbosity=support.verbose,
                             capture_output=capture)
    result = runner.run(suite)

    # Re-read the list after the run: the XML element is only appended if
    # JUnit output is (still) enabled.
    if support.junit_xml_list is not None:
        support.junit_xml_list.append(result.get_xml_element())

    if not (result.testsRun or result.skipped or result.errors):
        raise support.TestDidNotRun
    if result.wasSuccessful():
        return result

    stats = TestStats.from_unittest(result)

    # Use the traceback text as the message when there is exactly one
    # problem; otherwise fall back to a generic message.
    single_error = len(result.errors) == 1 and not result.failures
    single_failure = len(result.failures) == 1 and not result.errors
    if single_error:
        err = result.errors[0][1]
    elif single_failure:
        err = result.failures[0][1]
    else:
        err = "multiple errors occurred"
        if not support.verbose:
            err += "; run in verbose mode for details"

    errors = [(str(case), text) for case, text in result.errors]
    failures = [(str(case), text) for case, text in result.failures]
    raise support.TestFailedWithDetails(err, errors, failures, stats=stats)
 | |
| 
 | |
| 
 | |
def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
    # Call test_func() (through runtest_refleak() when hunting reference
    # leaks), flag the result as REFLEAK if a leak was reported, and store
    # the statistics derived from the returned object in result.stats.
    refleak = False
    if runtests.hunt_refleak:
        from .refleak import runtest_refleak
        refleak, outcome = runtest_refleak(result.test_name, test_func,
                                           runtests.hunt_refleak,
                                           runtests.quiet)
    else:
        outcome = test_func()

    if refleak:
        result.state = State.REFLEAK

    # Stays None when the returned object cannot be converted.
    stats: TestStats | None = None

    if isinstance(outcome, TestStats):
        stats = outcome
    elif isinstance(outcome, unittest.TestResult):
        stats = TestStats.from_unittest(outcome)
    elif outcome is None:
        print_warning(f"{result.test_name} test runner returned None: {test_func}")
    else:
        # doctest is imported lazily: only a few tests return a
        # doctest.TestResults instance.
        import doctest
        if isinstance(outcome, doctest.TestResults):
            stats = TestStats.from_doctest(outcome)
        else:
            print_warning(f"Unknown test result type: {type(outcome)}")

    result.stats = stats
 | |
| 
 | |
| 
 | |
# Storage of uncollectable GC objects (gc.garbage): objects found in
# gc.garbage after a test are moved here so they are not reported again.
GC_GARBAGE = []
 | |
| 
 | |
| 
 | |
def _load_run_test(result: TestResult, runtests: RunTests) -> None:
    # Import the test module named by the result, run its tests through
    # regrtest_runner(), then clean up and report uncollectable objects.
    test_name = result.test_name
    test_mod = importlib.import_module(
        abs_module_name(test_name, runtests.test_dir))

    if hasattr(test_mod, "test_main"):
        # https://github.com/python/cpython/issues/89392
        raise Exception(f"Module {test_name} defines test_main() which "
                        f"is no longer supported by regrtest")

    def test_func():
        return run_unittest(test_mod)

    try:
        regrtest_runner(result, test_func, runtests)
    finally:
        # Collect first so that dangling references (open files, ...) are
        # dropped now; ResourceWarnings are then emitted here rather than
        # during the next test, where they could cause failures.
        support.gc_collect()

        remove_testfn(test_name, runtests.verbose)

    uncollectable = gc.garbage
    if uncollectable:
        support.environment_altered = True
        print_warning(f"{test_name} created {len(uncollectable)} "
                      f"uncollectable object(s)")

        # Stash the objects elsewhere and empty gc.garbage so they are not
        # seen again.
        GC_GARBAGE.extend(uncollectable)
        del uncollectable[:]

    support.reap_children()
 | |
| 
 | |
| 
 | |
def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
                             display_failure: bool = True) -> None:
    # Run the test inside saved_test_environment(), turn any exception that
    # escapes into a result state, and detect environment changes.

    # Clear the flag first so that an alteration seen afterwards is known
    # to have happened during this run.
    support.environment_altered = False

    pgo = runtests.pgo
    quiet = runtests.quiet
    test_name = result.test_name
    if pgo:
        display_failure = False
    show_skip = not (quiet or pgo)

    def report_failure(exc):
        # Print the failure line on stderr, with the exception text only
        # when failures are to be displayed.
        text = f"test {test_name} failed"
        if display_failure:
            text = f"{text} -- {exc}"
        print(text, file=sys.stderr, flush=True)

    try:
        clear_caches()
        support.gc_collect()

        with saved_test_environment(test_name,
                                    runtests.verbose, quiet, pgo=pgo):
            _load_run_test(result, runtests)
    except support.ResourceDenied as exc:
        if show_skip:
            print(f"{test_name} skipped -- {exc}", flush=True)
        result.state = State.RESOURCE_DENIED
    except unittest.SkipTest as exc:
        if show_skip:
            print(f"{test_name} skipped -- {exc}", flush=True)
        result.state = State.SKIPPED
    except support.TestFailedWithDetails as exc:
        report_failure(exc)
        result.state = State.FAILED
        result.errors = exc.errors
        result.failures = exc.failures
        result.stats = exc.stats
    except support.TestFailed as exc:
        report_failure(exc)
        result.state = State.FAILED
        result.stats = exc.stats
    except support.TestDidNotRun:
        result.state = State.DID_NOT_RUN
    except KeyboardInterrupt:
        print()
        result.state = State.INTERRUPTED
    except:
        # Catch-all on purpose: anything else is reported as a crash.
        if not pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        result.state = State.UNCAUGHT_EXC
    else:
        # Only reached when no exception escaped the test run.
        if support.environment_altered:
            result.set_env_changed()
        # Keep a state that was already set (REFLEAK or ENV_CHANGED).
        if result.state is None:
            result.state = State.PASSED
 | |
| 
 | |
| 
 | |
def _runtest(result: TestResult, runtests: RunTests) -> None:
    # Arm the faulthandler timeout, run the test (capturing stdout/stderr
    # when output is only wanted on failure) and store the JUnit XML data.
    verbose = runtests.verbose
    output_on_failure = runtests.output_on_failure
    timeout = runtests.timeout

    use_timeout = bool(timeout is not None
                       and threading_helper.can_start_thread)
    if use_timeout:
        faulthandler.dump_traceback_later(timeout, exit=True)

    try:
        setup_tests(runtests)

        if not output_on_failure:
            # Tell tests to be moderately quiet
            support.verbose = verbose
            _runtest_env_changed_exc(result, runtests,
                                     display_failure=not verbose)
        else:
            support.verbose = True

            buffer = io.StringIO()
            saved_stdout, saved_stderr = sys.stdout, sys.stderr
            warn_func = support.print_warning
            saved_warn_stderr = warn_func.orig_stderr

            captured = None
            try:
                sys.stdout = sys.stderr = buffer
                # Route print_warning() into the same buffer so messages
                # keep their relative order; the buffer is replayed on
                # sys.stderr below unless the test passed.
                warn_func.orig_stderr = buffer

                _runtest_env_changed_exc(result, runtests,
                                         display_failure=False)
                # The output of a test which passed is dropped.
                if result.state != State.PASSED:
                    captured = buffer.getvalue()
            finally:
                sys.stdout, sys.stderr = saved_stdout, saved_stderr
                warn_func.orig_stderr = saved_warn_stderr

            if captured is not None:
                sys.stderr.write(captured)
                sys.stderr.flush()

        xml_list = support.junit_xml_list
        if xml_list:
            import xml.etree.ElementTree as ET
            result.xml_data = [ET.tostring(element).decode('us-ascii')
                               for element in xml_list]
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()
        support.junit_xml_list = None
 | |
| 
 | |
| 
 | |
def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
    """Run the test called *test_name* and return its TestResult.

    An exception escaping _runtest() is reported on stderr (unless
    runtests.pgo is true) and the result state becomes
    State.UNCAUGHT_EXC.  stdout and stderr are flushed and the elapsed
    time is stored in result.duration before returning.

    If runtests.use_junit, xml_data is a list containing each generated
    testsuite element.
    """
    started = time.perf_counter()
    result = TestResult(test_name)
    pgo = runtests.pgo

    try:
        _runtest(result, runtests)
    except:
        # Catch-all on purpose: whatever escapes is recorded as a crash.
        if not pgo:
            crash_text = traceback.format_exc()
            print(f"test {test_name} crashed -- {crash_text}",
                  file=sys.stderr, flush=True)
        result.state = State.UNCAUGHT_EXC

    for stream in (sys.stdout, sys.stderr):
        stream.flush()

    result.duration = time.perf_counter() - started
    return result
 | 
