[3.7] bpo-36719: sync regrtest with master branch (GH-12967)

* Clean up code which checked presence of os.{stat,lstat,chmod} (GH-11643)

(cherry picked from commit 8377cd4fcd)

* bpo-36725: regrtest: add TestResult type (GH-12960)

* Add TestResult and MultiprocessResult types to ensure that results
  always have the same fields.
* runtest() now handles KeyboardInterrupt
* accumulate_result() and format_test_result() now takes a TestResult
* cleanup_test_droppings() is now called by runtest() and mark the
  test as ENV_CHANGED if the test leaks support.TESTFN file.
* runtest() now includes code "around" the test in the test timing
* Add print_warning() in test.libregrtest.utils to standardize how
  libregrtest logs warnings to ease parsing the test output.
* support.unload() is now called with abstest rather than test_name
* Rename 'test' variable/parameter to 'test_name'
* dash_R(): remove unused the_module parameter
* Remove unused imports

(cherry picked from commit 4d29983185)

* bpo-36725: Refactor regrtest multiprocessing code (GH-12961)

Rewrite run_tests_multiprocess() function as a new MultiprocessRunner
class with multiple methods to better report errors and stop
immediately when needed.

Changes:

* Worker processes are now killed immediately if tests are
  interrupted or if a test does crash (CHILD_ERROR): worker
  processes are killed.
* Rewrite how errors in a worker thread are reported to
  the main thread. No longer ignore BaseException or parsing errors
  silently.
* Remove 'finished' variable: use worker.is_alive() instead
* Always compute omitted tests. Add Regrtest.get_executed() method.

(cherry picked from commit 3cde440f20)

* bpo-36719: regrtest always detect uncollectable objects (GH-12951)

regrtest now always detects uncollectable objects. Previously, the
check was only enabled by --findleaks. The check now also works with
-jN/--multiprocess N.

--findleaks becomes a deprecated alias to --fail-env-changed.

(cherry picked from commit 75120d2205)

* bpo-34060: Report system load when running test suite for Windows (GH-8357)

While Windows exposes the system processor queue length, the raw value
used for load calculations on Unix systems, it does not provide an API
to access the averaged value. Hence to calculate the load we must track
and average it ourselves. We can't use multiprocessing or a thread to
read it in the background while the tests run since using those would
conflict with test_multiprocessing and test_xxsubprocess.

Thus, we use Window's asynchronous IO API to run the tracker in the
background with it sampling at the correct rate. When we wish to access
the load we check to see if there's new data on the stream, if there is,
we update our load values.


(cherry picked from commit e16467af0b)

* bpo-36719: Fix regrtest re-run (GH-12964)

Properly handle a test which fail but then pass.

Add test_rerun_success() unit test.

(cherry picked from commit 837acc1957)

* bpo-36719: regrtest closes explicitly WindowsLoadTracker (GH-12965)

Regrtest.finalize() now closes explicitly the WindowsLoadTracker
instance.

(cherry picked from commit 00db7c73af)
This commit is contained in:
Victor Stinner 2019-04-26 12:16:30 +02:00 committed by GitHub
parent 3076a3e0d1
commit 1069d38fa1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 681 additions and 397 deletions

View file

@ -226,8 +226,9 @@ def _create_parser():
'(instead of the Python stdlib test suite)')
group = parser.add_argument_group('Special runs')
group.add_argument('-l', '--findleaks', action='store_true',
help='if GC is available detect tests that leak memory')
group.add_argument('-l', '--findleaks', action='store_const', const=2,
default=1,
help='deprecated alias to --fail-env-changed')
group.add_argument('-L', '--runleaks', action='store_true',
help='run the leaks(1) command just before exit.' +
more_details)
@ -309,7 +310,7 @@ def _parse_args(args, **kwargs):
# Defaults
ns = argparse.Namespace(testdir=None, verbose=0, quiet=False,
exclude=False, single=False, randomize=False, fromfile=None,
findleaks=False, use_resources=None, trace=False, coverdir='coverage',
findleaks=1, use_resources=None, trace=False, coverdir='coverage',
runleaks=False, huntrleaks=False, verbose2=False, print_slow=False,
random_seed=None, use_mp=None, verbose3=False, forever=False,
header=False, failfast=False, match_tests=None, pgo=False)
@ -330,12 +331,13 @@ def _parse_args(args, **kwargs):
parser.error("unrecognized arguments: %s" % arg)
sys.exit(1)
if ns.findleaks > 1:
# --findleaks implies --fail-env-changed
ns.fail_env_changed = True
if ns.single and ns.fromfile:
parser.error("-s and -f don't go together!")
if ns.use_mp is not None and ns.trace:
parser.error("-T and -j don't go together!")
if ns.use_mp is not None and ns.findleaks:
parser.error("-l and -j don't go together!")
if ns.failfast and not (ns.verbose or ns.verbose3):
parser.error("-G/--failfast needs either -v or -W")
if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):

View file

@ -20,10 +20,6 @@
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support
try:
import gc
except ImportError:
gc = None
# When tests are run from the Python build directory, it is best practice
@ -79,8 +75,8 @@ def __init__(self):
self.skipped = []
self.resource_denieds = []
self.environment_changed = []
self.rerun = []
self.run_no_tests = []
self.rerun = []
self.first_result = None
self.interrupted = False
@ -90,9 +86,6 @@ def __init__(self):
# used by --coverage, trace.Trace instance
self.tracer = None
# used by --findleaks, store for gc.garbage
self.found_garbage = []
# used to display the progress bar "[ 3/100]"
self.start_time = time.monotonic()
self.test_count = ''
@ -105,26 +98,43 @@ def __init__(self):
# used by --junit-xml
self.testsuite_xml = None
def accumulate_result(self, test, result):
ok, test_time, xml_data = result
if ok not in (CHILD_ERROR, INTERRUPTED):
self.test_times.append((test_time, test))
self.win_load_tracker = None
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
| set(self.resource_denieds) | set(self.environment_changed)
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
test_name = result.test_name
ok = result.result
if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
self.test_times.append((result.test_time, test_name))
if ok == PASSED:
self.good.append(test)
self.good.append(test_name)
elif ok in (FAILED, CHILD_ERROR):
self.bad.append(test)
if not rerun:
self.bad.append(test_name)
elif ok == ENV_CHANGED:
self.environment_changed.append(test)
self.environment_changed.append(test_name)
elif ok == SKIPPED:
self.skipped.append(test)
self.skipped.append(test_name)
elif ok == RESOURCE_DENIED:
self.skipped.append(test)
self.resource_denieds.append(test)
self.skipped.append(test_name)
self.resource_denieds.append(test_name)
elif ok == TEST_DID_NOT_RUN:
self.run_no_tests.append(test)
elif ok != INTERRUPTED:
self.run_no_tests.append(test_name)
elif ok == INTERRUPTED:
self.interrupted = True
else:
raise ValueError("invalid test result: %r" % ok)
if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
self.bad.remove(test_name)
xml_data = result.xml_data
if xml_data:
import xml.etree.ElementTree as ET
for e in xml_data:
@ -134,7 +144,7 @@ def accumulate_result(self, test, result):
print(xml_data, file=sys.__stderr__)
raise
def display_progress(self, test_index, test):
def display_progress(self, test_index, text):
if self.ns.quiet:
return
@ -143,12 +153,12 @@ def display_progress(self, test_index, test):
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
line = f"{line}/{fails}"
line = f"[{line}] {test}"
line = f"[{line}] {text}"
# add the system load prefix: "load avg: 1.80 "
if hasattr(os, 'getloadavg'):
load_avg_1min = os.getloadavg()[0]
line = f"load avg: {load_avg_1min:.2f} {line}"
load_avg = self.getloadavg()
if load_avg is not None:
line = f"load avg: {load_avg:.2f} {line}"
# add the timestamp prefix: "0:01:05 "
test_time = time.monotonic() - self.start_time
@ -164,22 +174,6 @@ def parse_args(self, kwargs):
"faulthandler.dump_traceback_later", file=sys.stderr)
ns.timeout = None
if ns.threshold is not None and gc is None:
print('No GC available, ignore --threshold.', file=sys.stderr)
ns.threshold = None
if ns.findleaks:
if gc is not None:
# Uncomment the line below to report garbage that is not
# freeable by reference counting alone. By default only
# garbage that is not collectable by the GC is reported.
pass
#gc.set_debug(gc.DEBUG_SAVEALL)
else:
print('No GC available, disabling --findleaks',
file=sys.stderr)
ns.findleaks = False
if ns.xmlpath:
support.junit_xml_list = self.testsuite_xml = []
@ -275,13 +269,13 @@ def list_cases(self):
support.verbose = False
support.set_match_tests(self.ns.match_tests)
for test in self.selected:
abstest = get_abs_module(self.ns, test)
for test_name in self.selected:
abstest = get_abs_module(self.ns, test_name)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
self._list_cases(suite)
except unittest.SkipTest:
self.skipped.append(test)
self.skipped.append(test_name)
if self.skipped:
print(file=sys.stderr)
@ -298,23 +292,19 @@ def rerun_failed_tests(self):
print()
print("Re-running failed tests in verbose mode")
self.rerun = self.bad[:]
for test in self.rerun:
print("Re-running test %r in verbose mode" % test, flush=True)
try:
self.ns.verbose = True
ok = runtest(self.ns, test)
except KeyboardInterrupt:
self.interrupted = True
# print a newline separate from the ^C
print()
for test_name in self.rerun:
print(f"Re-running {test_name} in verbose mode", flush=True)
self.ns.verbose = True
result = runtest(self.ns, test_name)
self.accumulate_result(result, rerun=True)
if result.result == INTERRUPTED:
break
else:
if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
self.bad.remove(test)
else:
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
printlist(self.bad)
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
printlist(self.bad)
self.display_result()
@ -327,11 +317,11 @@ def display_result(self):
print("== Tests result: %s ==" % self.get_tests_result())
if self.interrupted:
print()
# print a newline after ^C
print("Test suite interrupted by signal SIGINT.")
executed = set(self.good) | set(self.bad) | set(self.skipped)
omitted = set(self.selected) - executed
omitted = set(self.selected) - self.get_executed()
if omitted:
print()
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
@ -348,8 +338,8 @@ def display_result(self):
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
for time, test in self.test_times[:10]:
print("- %s: %s" % (test, format_duration(time)))
for test_time, test in self.test_times[:10]:
print("- %s: %s" % (test, format_duration(test_time)))
if self.bad:
print()
@ -387,10 +377,10 @@ def run_tests_sequential(self):
print("Run tests sequentially")
previous_test = None
for test_index, test in enumerate(self.tests, 1):
for test_index, test_name in enumerate(self.tests, 1):
start_time = time.monotonic()
text = test
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)
@ -398,22 +388,19 @@ def run_tests_sequential(self):
if self.tracer:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = runtest(self.ns, test); '
'self.accumulate_result(test, result)')
cmd = ('result = runtest(self.ns, test_name); '
'self.accumulate_result(result)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
try:
result = runtest(self.ns, test)
except KeyboardInterrupt:
self.interrupted = True
self.accumulate_result(test, (INTERRUPTED, None, None))
break
else:
self.accumulate_result(test, result)
result = runtest(self.ns, test_name)
self.accumulate_result(result)
previous_test = format_test_result(test, result[0])
if result.result == INTERRUPTED:
break
previous_test = format_test_result(result)
test_time = time.monotonic() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
@ -421,16 +408,6 @@ def run_tests_sequential(self):
# be quiet: say nothing if the test passed shortly
previous_test = None
if self.ns.findleaks:
gc.collect()
if gc.garbage:
print("Warning: test created", len(gc.garbage), end=' ')
print("uncollectable object(s).")
# move the uncollectable objects somewhere so we don't see
# them again
self.found_garbage.extend(gc.garbage)
del gc.garbage[:]
# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
@ -441,8 +418,8 @@ def run_tests_sequential(self):
def _test_forever(self, tests):
while True:
for test in tests:
yield test
for test_name in tests:
yield test_name
if self.bad:
return
if self.ns.fail_env_changed and self.environment_changed:
@ -515,6 +492,10 @@ def run_tests(self):
self.run_tests_sequential()
def finalize(self):
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
@ -585,6 +566,15 @@ def main(self, tests=None, **kwargs):
with support.temp_cwd(test_cwd, quiet=True):
self._main(tests, kwargs)
def getloadavg(self):
if self.win_load_tracker is not None:
return self.win_load_tracker.getloadavg()
if hasattr(os, 'getloadavg'):
return os.getloadavg()[0]
return None
def _main(self, tests, kwargs):
if self.ns.huntrleaks:
warmup, repetitions, _ = self.ns.huntrleaks
@ -616,6 +606,18 @@ def _main(self, tests, kwargs):
self.list_cases()
sys.exit(0)
# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32' and (self.ns.worker_args is None):
from test.libregrtest.win_utils import WindowsLoadTracker
try:
self.win_load_tracker = WindowsLoadTracker()
except FileNotFoundError as error:
# Windows IoT Core and Windows Nano Server do not provide
# typeperf.exe for x64, x86 or ARM
print(f'Failed to create WindowsLoadTracker: {error}')
self.run_tests()
self.display_result()

View file

@ -1,4 +1,3 @@
import errno
import os
import re
import sys
@ -18,7 +17,7 @@ def _get_dump(cls):
cls._abc_negative_cache, cls._abc_negative_cache_version)
def dash_R(ns, the_module, test_name, test_func):
def dash_R(ns, test_name, test_func):
"""Run a test multiple times, looking for reference leaks.
Returns:

View file

@ -1,4 +1,7 @@
import collections
import faulthandler
import functools
import gc
import importlib
import io
import os
@ -6,9 +9,11 @@
import time
import traceback
import unittest
from test import support
from test.libregrtest.refleak import dash_R, clear_caches
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import print_warning
# Test result constants.
@ -55,9 +60,17 @@
NOTTESTS = set()
def format_test_result(test_name, result):
fmt = _FORMAT_TEST_RESULT.get(result, "%s")
return fmt % test_name
# used by --findleaks, store for gc.garbage
FOUND_GARBAGE = []
def format_test_result(result):
fmt = _FORMAT_TEST_RESULT.get(result.result, "%s")
return fmt % result.test_name
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
@ -73,24 +86,84 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
return stdtests + sorted(tests)
def get_abs_module(ns, test):
if test.startswith('test.') or ns.testdir:
return test
def get_abs_module(ns, test_name):
if test_name.startswith('test.') or ns.testdir:
return test_name
else:
# Always import it from the test package
return 'test.' + test
# Import it from the test package
return 'test.' + test_name
def runtest(ns, test):
TestResult = collections.namedtuple('TestResult',
'test_name result test_time xml_data')
def _runtest(ns, test_name):
# Handle faulthandler timeout, capture stdout+stderr, XML serialization
# and measure time.
output_on_failure = ns.verbose3
use_timeout = (ns.timeout is not None)
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests)
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
if ns.failfast:
support.failfast = True
if output_on_failure:
support.verbose = True
stream = io.StringIO()
orig_stdout = sys.stdout
orig_stderr = sys.stderr
try:
sys.stdout = stream
sys.stderr = stream
result = _runtest_inner(ns, test_name,
display_failure=False)
if result != PASSED:
output = stream.getvalue()
orig_stderr.write(output)
orig_stderr.flush()
finally:
sys.stdout = orig_stdout
sys.stderr = orig_stderr
else:
# Tell tests to be moderately quiet
support.verbose = ns.verbose
result = _runtest_inner(ns, test_name,
display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
else:
xml_data = None
test_time = time.perf_counter() - start_time
return TestResult(test_name, result, test_time, xml_data)
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
support.junit_xml_list = None
def runtest(ns, test_name):
"""Run a single test.
ns -- regrtest namespace of options
test -- the name of the test
test_name -- the name of the test
Returns the tuple (result, test_time, xml_data), where result is one
of the constants:
INTERRUPTED KeyboardInterrupt when run under -j
INTERRUPTED KeyboardInterrupt
RESOURCE_DENIED test skipped because resource denied
SKIPPED test skipped for some other reason
ENV_CHANGED test failed because it changed the execution environment
@ -101,130 +174,123 @@ def runtest(ns, test):
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
output_on_failure = ns.verbose3
use_timeout = (ns.timeout is not None)
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
try:
support.set_match_tests(ns.match_tests)
# reset the environment_altered flag to detect if a test altered
# the environment
support.environment_altered = False
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
if ns.failfast:
support.failfast = True
if output_on_failure:
support.verbose = True
return _runtest(ns, test_name)
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return TestResult(test_name, FAILED, 0.0, None)
stream = io.StringIO()
orig_stdout = sys.stdout
orig_stderr = sys.stderr
try:
sys.stdout = stream
sys.stderr = stream
result = runtest_inner(ns, test, display_failure=False)
if result[0] != PASSED:
output = stream.getvalue()
orig_stderr.write(output)
orig_stderr.flush()
finally:
sys.stdout = orig_stdout
sys.stderr = orig_stderr
else:
support.verbose = ns.verbose # Tell tests to be moderately quiet
result = runtest_inner(ns, test, display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
def _test_module(the_module):
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(the_module)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
support.run_unittest(tests)
def _runtest_inner2(ns, test_name):
# Load the test function, run the test function, handle huntrleaks
# and findleaks to detect leaks
abstest = get_abs_module(ns, test_name)
# remove the module from sys.module to reload it if it was already imported
support.unload(abstest)
the_module = importlib.import_module(abstest)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner = getattr(the_module, "test_main", None)
if test_runner is None:
test_runner = functools.partial(_test_module, the_module)
try:
if ns.huntrleaks:
# Return True if the test leaked references
refleak = dash_R(ns, test_name, test_runner)
else:
xml_data = None
return result + (xml_data,)
test_runner()
refleak = False
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
cleanup_test_droppings(test, ns.verbose)
support.junit_xml_list = None
cleanup_test_droppings(test_name, ns.verbose)
support.gc_collect()
if gc.garbage:
support.environment_altered = True
print_warning(f"{test_name} created {len(gc.garbage)} "
f"uncollectable object(s).")
# move the uncollectable objects somewhere,
# so we don't see them again
FOUND_GARBAGE.extend(gc.garbage)
gc.garbage.clear()
def post_test_cleanup():
support.reap_children()
return refleak
def runtest_inner(ns, test, display_failure=True):
support.unload(test)
test_time = 0.0
refleak = False # True if the test leaked references.
def _runtest_inner(ns, test_name, display_failure=True):
# Detect environment changes, handle exceptions.
# Reset the environment_altered flag to detect if a test altered
# the environment
support.environment_altered = False
if ns.pgo:
display_failure = False
try:
abstest = get_abs_module(ns, test)
clear_caches()
with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
start_time = time.perf_counter()
the_module = importlib.import_module(abstest)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner = getattr(the_module, "test_main", None)
if test_runner is None:
def test_runner():
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(the_module)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
support.run_unittest(tests)
if ns.huntrleaks:
refleak = dash_R(ns, the_module, test, test_runner)
else:
test_runner()
test_time = time.perf_counter() - start_time
post_test_cleanup()
with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
refleak = _runtest_inner2(ns, test_name)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
print(test, "skipped --", msg, flush=True)
return RESOURCE_DENIED, test_time
print(f"{test_name} skipped -- {msg}", flush=True)
return RESOURCE_DENIED
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
print(test, "skipped --", msg, flush=True)
return SKIPPED, test_time
except KeyboardInterrupt:
raise
except support.TestFailed as msg:
if not ns.pgo:
if display_failure:
print("test", test, "failed --", msg, file=sys.stderr,
flush=True)
else:
print("test", test, "failed", file=sys.stderr, flush=True)
return FAILED, test_time
print(f"{test_name} skipped -- {msg}", flush=True)
return SKIPPED
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
return FAILED
except support.TestDidNotRun:
return TEST_DID_NOT_RUN, test_time
return TEST_DID_NOT_RUN
except KeyboardInterrupt:
print()
return INTERRUPTED
except:
msg = traceback.format_exc()
if not ns.pgo:
print("test", test, "crashed --", msg, file=sys.stderr,
flush=True)
return FAILED, test_time
else:
if refleak:
return FAILED, test_time
if environment.changed:
return ENV_CHANGED, test_time
return PASSED, test_time
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return FAILED
if refleak:
return FAILED
if environment.changed:
return ENV_CHANGED
return PASSED
def cleanup_test_droppings(testname, verbose):
import shutil
import stat
import gc
def cleanup_test_droppings(test_name, verbose):
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
gc.collect()
support.gc_collect()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
@ -239,25 +305,23 @@ def cleanup_test_droppings(testname, verbose):
continue
if os.path.isdir(name):
import shutil
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
raise SystemError("os.path says %r exists but is neither "
"directory nor file" % name)
raise RuntimeError(f"os.path says {name!r} exists but is neither "
f"directory nor file")
if verbose:
print("%r left behind %s %r" % (testname, kind, name))
print_warning("%r left behind %s %r" % (test_name, kind, name))
support.environment_altered = True
try:
# if we have chmod, fix possible permissions problems
# that might prevent cleanup
if (hasattr(os, 'chmod')):
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
import stat
# fix possible permissions problems that might prevent cleanup
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
except Exception as msg:
print(("%r left behind %s %r and it couldn't be "
"removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
except Exception as exc:
print_warning(f"{test_name} left behind {kind} {name!r} "
f"and it couldn't be removed: {exc}")

View file

@ -1,7 +1,9 @@
import collections
import faulthandler
import json
import os
import queue
import subprocess
import sys
import threading
import time
@ -11,7 +13,7 @@
from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
format_test_result)
format_test_result, TestResult)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration
@ -19,20 +21,12 @@
# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds
# If interrupted, display the wait progress every N seconds
WAIT_PROGRESS = 2.0 # seconds
def must_stop(result):
return result.result in (INTERRUPTED, CHILD_ERROR)
def run_test_in_subprocess(testname, ns):
"""Run the given test in a subprocess with --worker-args.
ns is the option Namespace parsed from command-line arguments. regrtest
is invoked in a subprocess with the --worker-args argument; when the
subprocess exits, its return code, stdout and stderr are returned as a
3-tuple.
"""
from subprocess import Popen, PIPE
ns_dict = vars(ns)
worker_args = (ns_dict, testname)
worker_args = json.dumps(worker_args)
@ -47,15 +41,12 @@ def run_test_in_subprocess(testname, ns):
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
popen = Popen(cmd,
stdout=PIPE, stderr=PIPE,
universal_newlines=True,
close_fds=(os.name != 'nt'),
cwd=support.SAVEDCWD)
with popen:
stdout, stderr = popen.communicate()
retcode = popen.wait()
return retcode, stdout, stderr
return subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
close_fds=(os.name != 'nt'),
cwd=support.SAVEDCWD)
def run_tests_worker(worker_args):
@ -64,14 +55,7 @@ def run_tests_worker(worker_args):
setup_tests(ns)
try:
result = runtest(ns, testname)
except KeyboardInterrupt:
result = INTERRUPTED, '', None
except BaseException as e:
traceback.print_exc()
result = CHILD_ERROR, str(e)
result = runtest(ns, testname)
print() # Force a newline (just in case)
print(json.dumps(result), flush=True)
sys.exit(0)
@ -83,7 +67,6 @@ class MultiprocessIterator:
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests):
self.interrupted = False
self.lock = threading.Lock()
self.tests = tests
@ -92,152 +75,213 @@ def __iter__(self):
def __next__(self):
with self.lock:
if self.interrupted:
raise StopIteration('tests interrupted')
return next(self.tests)
MultiprocessResult = collections.namedtuple('MultiprocessResult',
'result stdout stderr error_msg')
class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns):
super().__init__()
self.pending = pending
self.output = output
self.ns = ns
self.current_test = None
self.current_test_name = None
self.start_time = None
self._popen = None
def _runtest(self):
try:
test = next(self.pending)
except StopIteration:
self.output.put((None, None, None, None))
return True
def kill(self):
if not self.is_alive():
return
if self._popen is not None:
self._popen.kill()
def _runtest(self, test_name):
try:
self.start_time = time.monotonic()
self.current_test = test
self.current_test_name = test_name
retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
popen = run_test_in_subprocess(test_name, self.ns)
self._popen = popen
with popen:
try:
stdout, stderr = popen.communicate()
except:
popen.kill()
popen.wait()
raise
retcode = popen.wait()
finally:
self.current_test = None
self.current_test_name = None
self._popen = None
stdout = stdout.strip()
stderr = stderr.rstrip()
err_msg = None
if retcode != 0:
result = (CHILD_ERROR, "Exit code %s" % retcode, None)
self.output.put((test, stdout.rstrip(), stderr.rstrip(),
result))
return False
err_msg = "Exit code %s" % retcode
else:
stdout, _, result = stdout.rpartition("\n")
stdout = stdout.rstrip()
if not result:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
result = json.loads(result)
result = TestResult(*result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
stdout, _, result = stdout.strip().rpartition("\n")
if not result:
self.output.put((None, None, None, None))
return True
if err_msg is not None:
test_time = time.monotonic() - self.start_time
result = TestResult(test_name, CHILD_ERROR, test_time, None)
result = json.loads(result)
assert len(result) == 3, f"Invalid result tuple: {result!r}"
self.output.put((test, stdout.rstrip(), stderr.rstrip(),
result))
return False
return MultiprocessResult(result, stdout, stderr, err_msg)
def run(self):
while True:
try:
try:
test_name = next(self.pending)
except StopIteration:
break
mp_result = self._runtest(test_name)
self.output.put((False, mp_result))
if must_stop(mp_result.result):
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
def get_running(workers):
running = []
for worker in workers:
current_test_name = worker.current_test_name
if not current_test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
class MultiprocessRunner:
def __init__(self, regrtest):
self.regrtest = regrtest
self.ns = regrtest.ns
self.output = queue.Queue()
self.pending = MultiprocessIterator(self.regrtest.tests)
if self.ns.timeout is not None:
self.test_timeout = self.ns.timeout * 1.5
else:
self.test_timeout = None
self.workers = None
def start_workers(self):
self.workers = [MultiprocessThread(self.pending, self.output, self.ns)
for _ in range(self.ns.use_mp)]
print("Run tests in parallel using %s child processes"
% len(self.workers))
for worker in self.workers:
worker.start()
def wait_workers(self):
for worker in self.workers:
worker.kill()
for worker in self.workers:
worker.join()
def _get_result(self):
if not any(worker.is_alive() for worker in self.workers):
# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None
while True:
if self.test_timeout is not None:
faulthandler.dump_traceback_later(self.test_timeout, exit=True)
# wait for a thread
timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
try:
return self.output.get(timeout=timeout)
except queue.Empty:
pass
# display progress
running = get_running(self.workers)
if running and not self.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
def display_result(self, mp_result):
result = mp_result.result
text = format_test_result(result)
if mp_result.error_msg is not None:
# CHILD_ERROR
text += ' (%s)' % mp_result.error_msg
elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
text += ' (%s)' % format_duration(result.test_time)
running = get_running(self.workers)
if running and not self.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
self.regrtest.display_progress(self.test_index, text)
def _process_result(self, item):
if item[0]:
# Thread got an exception
format_exc = item[1]
print(f"regrtest worker thread failed: {format_exc}",
file=sys.stderr, flush=True)
return True
self.test_index += 1
mp_result = item[1]
self.regrtest.accumulate_result(mp_result.result)
self.display_result(mp_result)
if mp_result.stdout:
print(mp_result.stdout, flush=True)
if mp_result.stderr and not self.ns.pgo:
print(mp_result.stderr, file=sys.stderr, flush=True)
if must_stop(mp_result.result):
return True
return False
def run_tests(self):
self.start_workers()
self.test_index = 0
try:
stop = False
while not stop:
stop = self._runtest()
except BaseException:
self.output.put((None, None, None, None))
raise
while True:
item = self._get_result()
if item is None:
break
stop = self._process_result(item)
if stop:
break
except KeyboardInterrupt:
print()
self.regrtest.interrupted = True
finally:
if self.test_timeout is not None:
faulthandler.cancel_dump_traceback_later()
self.wait_workers()
def run_tests_multiprocess(regrtest):
output = queue.Queue()
pending = MultiprocessIterator(regrtest.tests)
test_timeout = regrtest.ns.timeout
use_timeout = (test_timeout is not None)
workers = [MultiprocessThread(pending, output, regrtest.ns)
for i in range(regrtest.ns.use_mp)]
print("Run tests in parallel using %s child processes"
% len(workers))
for worker in workers:
worker.start()
def get_running(workers):
running = []
for worker in workers:
current_test = worker.current_test
if not current_test:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test, format_duration(dt))
running.append(text)
return running
finished = 0
test_index = 1
get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
try:
while finished < regrtest.ns.use_mp:
if use_timeout:
faulthandler.dump_traceback_later(test_timeout, exit=True)
try:
item = output.get(timeout=get_timeout)
except queue.Empty:
running = get_running(workers)
if running and not regrtest.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
continue
test, stdout, stderr, result = item
if test is None:
finished += 1
continue
regrtest.accumulate_result(test, result)
# Display progress
ok, test_time, xml_data = result
text = format_test_result(test, ok)
if (ok not in (CHILD_ERROR, INTERRUPTED)
and test_time >= PROGRESS_MIN_TIME
and not regrtest.ns.pgo):
text += ' (%s)' % format_duration(test_time)
elif ok == CHILD_ERROR:
text = '%s (%s)' % (text, test_time)
running = get_running(workers)
if running and not regrtest.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
regrtest.display_progress(test_index, text)
# Copy stdout and stderr from the child process
if stdout:
print(stdout, flush=True)
if stderr and not regrtest.ns.pgo:
print(stderr, file=sys.stderr, flush=True)
if result[0] == INTERRUPTED:
raise KeyboardInterrupt
test_index += 1
except KeyboardInterrupt:
regrtest.interrupted = True
pending.interrupted = True
print()
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
# If tests are interrupted, wait until tests complete
wait_start = time.monotonic()
while True:
running = [worker.current_test for worker in workers]
running = list(filter(bool, running))
if not running:
break
dt = time.monotonic() - wait_start
line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
if dt >= WAIT_PROGRESS:
line = "%s since %.0f sec" % (line, dt)
print(line, flush=True)
for worker in workers:
worker.join(WAIT_PROGRESS)
MultiprocessRunner(regrtest).run_tests()

View file

@ -8,6 +8,7 @@
import threading
import warnings
from test import support
from test.libregrtest.utils import print_warning
try:
import _multiprocessing, multiprocessing.process
except ImportError:
@ -276,8 +277,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.changed = True
restore(original)
if not self.quiet and not self.pgo:
print(f"Warning -- {name} was modified by {self.testname}",
file=sys.stderr, flush=True)
print_warning(f"{name} was modified by {self.testname}")
print(f" Before: {original}\n After: {current} ",
file=sys.stderr, flush=True)
return False

View file

@ -1,5 +1,6 @@
import os.path
import math
import os.path
import sys
import textwrap
@ -54,3 +55,7 @@ def printlist(x, width=70, indent=4, file=None):
print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks),
file=file)
def print_warning(msg):
print(f"Warning -- {msg}", file=sys.stderr, flush=True)

View file

@ -0,0 +1,105 @@
import _winapi
import msvcrt
import os
import subprocess
import uuid
from test import support
# Max size of asynchronous reads
BUFSIZE = 8192
# Exponential damping factor (see below)
LOAD_FACTOR_1 = 0.9200444146293232478931553241
# Seconds per measurement
SAMPLING_INTERVAL = 5
COUNTER_NAME = r'\System\Processor Queue Length'
class WindowsLoadTracker():
"""
This class asynchronously interacts with the `typeperf` command to read
the system load on Windows. Mulitprocessing and threads can't be used
here because they interfere with the test suite's cases for those
modules.
"""
def __init__(self):
self.load = 0.0
self.start()
def start(self):
# Create a named pipe which allows for asynchronous IO in Windows
pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())
open_mode = _winapi.PIPE_ACCESS_INBOUND
open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
open_mode |= _winapi.FILE_FLAG_OVERLAPPED
# This is the read end of the pipe, where we will be grabbing output
self.pipe = _winapi.CreateNamedPipe(
pipe_name, open_mode, _winapi.PIPE_WAIT,
1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
# The write end of the pipe which is passed to the created process
pipe_write_end = _winapi.CreateFile(
pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
_winapi.OPEN_EXISTING, 0, _winapi.NULL
)
# Open up the handle as a python file object so we can pass it to
# subprocess
command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
# Connect to the read end of the pipe in overlap/async mode
overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
overlap.GetOverlappedResult(True)
# Spawn off the load monitor
command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)]
self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD)
# Close our copy of the write end of the pipe
os.close(command_stdout)
def close(self):
if self.p is None:
return
self.p.kill()
self.p.wait()
self.p = None
def __del__(self):
self.close()
def read_output(self):
import _winapi
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
bytes_read, res = overlapped.GetOverlappedResult(False)
if res != 0:
return
return overlapped.getbuffer().decode()
def getloadavg(self):
typeperf_output = self.read_output()
# Nothing to update, just return the current load
if not typeperf_output:
return self.load
# Process the backlog of load values
for line in typeperf_output.splitlines():
# typeperf outputs in a CSV format like this:
# "07/19/2018 01:32:26.605","3.000000"
toks = line.split(',')
# Ignore blank lines and the initial header
if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2:
continue
load = float(toks[1].replace('"', ''))
# We use an exponentially weighted moving average, imitating the
# load calculation on Unix systems.
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
self.load = new_load
return self.load

View file

@ -21,7 +21,7 @@
from test.libregrtest import utils
Py_DEBUG = hasattr(sys, 'getobjects')
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
@ -109,7 +109,7 @@ def test_quiet(self):
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
def test_slow(self):
def test_slowest(self):
for opt in '-o', '--slowest':
with self.subTest(opt=opt):
ns = libregrtest._parse_args([opt])
@ -255,9 +255,7 @@ def test_multiprocess(self):
self.checkError([opt], 'expected one argument')
self.checkError([opt, 'foo'], 'invalid int value')
self.checkError([opt, '2', '-T'], "don't go together")
self.checkError([opt, '2', '-l'], "don't go together")
self.checkError([opt, '0', '-T'], "don't go together")
self.checkError([opt, '0', '-l'], "don't go together")
def test_coverage(self):
for opt in '-T', '--coverage':
@ -454,8 +452,8 @@ def list_regex(line_format, tests):
regex = list_regex('%s re-run test%s', rerun)
self.check_line(output, regex)
self.check_line(output, "Re-running failed tests in verbose mode")
for name in rerun:
regex = "Re-running test %r in verbose mode" % name
for test_name in rerun:
regex = f"Re-running {test_name} in verbose mode"
self.check_line(output, regex)
if no_test_ran:
@ -487,7 +485,7 @@ def list_regex(line_format, tests):
result.append('SUCCESS')
result = ', '.join(result)
if rerun:
self.check_line(output, 'Tests result: %s' % result)
self.check_line(output, 'Tests result: FAILURE')
result = 'FAILURE then %s' % result
self.check_line(output, 'Tests result: %s' % result)
@ -781,22 +779,23 @@ def test_slowest(self):
% (self.TESTNAME_REGEX, len(tests)))
self.check_line(output, regex)
def test_slow_interrupted(self):
def test_slowest_interrupted(self):
# Issue #25373: test --slowest with an interrupted test
code = TEST_INTERRUPTED
test = self.create_test("sigint", code=code)
for multiprocessing in (False, True):
if multiprocessing:
args = ("--slowest", "-j2", test)
else:
args = ("--slowest", test)
output = self.run_tests(*args, exitcode=130)
self.check_executed_tests(output, test,
omitted=test, interrupted=True)
with self.subTest(multiprocessing=multiprocessing):
if multiprocessing:
args = ("--slowest", "-j2", test)
else:
args = ("--slowest", test)
output = self.run_tests(*args, exitcode=130)
self.check_executed_tests(output, test,
omitted=test, interrupted=True)
regex = ('10 slowest tests:\n')
self.check_line(output, regex)
regex = ('10 slowest tests:\n')
self.check_line(output, regex)
def test_coverage(self):
# test --coverage
@ -915,13 +914,13 @@ def test_method2(self):
testname)
self.assertEqual(output.splitlines(), all_methods)
@support.cpython_only
def test_crashed(self):
# Any code which causes a crash
code = 'import faulthandler; faulthandler._sigsegv()'
crash_test = self.create_test(name="crash", code=code)
ok_test = self.create_test(name="ok")
tests = [crash_test, ok_test]
tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=2)
self.check_executed_tests(output, tests, failed=crash_test,
randomize=True)
@ -991,6 +990,7 @@ def test_env_changed(self):
fail_env_changed=True)
def test_rerun_fail(self):
# FAILURE then FAILURE
code = textwrap.dedent("""
import unittest
@ -1005,6 +1005,26 @@ def test_bug(self):
self.check_executed_tests(output, [testname],
failed=testname, rerun=testname)
def test_rerun_success(self):
# FAILURE then SUCCESS
code = textwrap.dedent("""
import builtins
import unittest
class Tests(unittest.TestCase):
failed = False
def test_fail_once(self):
if not hasattr(builtins, '_test_failed'):
builtins._test_failed = True
self.fail("bug")
""")
testname = self.create_test(code=code)
output = self.run_tests("-w", testname, exitcode=0)
self.check_executed_tests(output, [testname],
rerun=testname)
def test_no_tests_ran(self):
code = textwrap.dedent("""
import unittest
@ -1069,6 +1089,38 @@ def test_other_bug(self):
self.check_executed_tests(output, [testname, testname2],
no_test_ran=[testname])
@support.cpython_only
def test_findleaks(self):
code = textwrap.dedent(r"""
import _testcapi
import gc
import unittest
@_testcapi.with_tp_del
class Garbage:
def __tp_del__(self):
pass
class Tests(unittest.TestCase):
def test_garbage(self):
# create an uncollectable object
obj = Garbage()
obj.ref_cycle = obj
obj = None
""")
testname = self.create_test(code=code)
output = self.run_tests("--fail-env-changed", testname, exitcode=3)
self.check_executed_tests(output, [testname],
env_changed=[testname],
fail_env_changed=True)
# --findleaks is now basically an alias to --fail-env-changed
output = self.run_tests("--findleaks", testname, exitcode=3)
self.check_executed_tests(output, [testname],
env_changed=[testname],
fail_env_changed=True)
class TestUtils(unittest.TestCase):
def test_format_duration(self):

View file

@ -0,0 +1,2 @@
Clean up code which checked presence of ``os.stat`` / ``os.lstat`` /
``os.chmod`` which are always present. Patch by Anthony Sottile.

View file

@ -0,0 +1,3 @@
When using mulitprocessing mode (-jN), regrtest now better reports errors if
a worker process fails, and it exits immediately on a worker thread failure
or when interrupted.

View file

@ -0,0 +1,4 @@
regrtest now always detects uncollectable objects. Previously, the check was
only enabled by ``--findleaks``. The check now also works with
``-jN/--multiprocess N``. ``--findleaks`` becomes a deprecated alias to
``--fail-env-changed``.

View file

@ -0,0 +1,2 @@
Report system load when running test suite on Windows. Patch by Ammar Askar.
Based on prior work by Jeremy Kloth.