gh-109162: libregrtest: remove WorkerJob class (#109204)

* Add attributes to Regrtest and RunTests:

  * gc_threshold
  * memory_limit
  * python_cmd
  * use_resources

* Remove WorkerJob class. Add as_json() and from_json() methods to
  RunTests; a worker process now uses only RunTests for all of its
  parameters (a round-trip sketch follows this list).
* Add tests for support.set_memlimit() in test_support. Create
  _parse_memlimit() and add tests for it as well.
* Remove 'ns' parameter from runtest.py.
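
As background for the as_json()/from_json() change above: the pattern used is the
standard json one, a custom JSONEncoder that tags the dataclass dict with a marker
key, plus an object_hook that rebuilds the object on load. Below is a minimal,
self-contained sketch of that pattern with a made-up Config class and marker name,
not the real RunTests:

```python
import dataclasses
import json


@dataclasses.dataclass
class Config:
    tests: list[str]
    verbose: bool = False


class _EncodeConfig(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Config):
            # Tag the dict so the decoder knows which class to rebuild.
            return {**dataclasses.asdict(o), "__config__": True}
        return super().default(o)


def _decode_config(data):
    if data.pop("__config__", False):
        return Config(**data)
    return data


cfg = Config(tests=["test_os"], verbose=True)
text = json.dumps(cfg, cls=_EncodeConfig)                    # parent process side
assert json.loads(text, object_hook=_decode_config) == cfg   # worker process side
```

The real decoder in runtest.py additionally rebuilds the nested HuntRefleak
dataclass by hand; see the corresponding hunk below.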
Victor Stinner 2023-09-10 01:41:21 +02:00 committed by GitHub
parent 24fa8f2046
commit 0c0f254230
7 changed files with 125 additions and 87 deletions

@@ -177,6 +177,8 @@ def __init__(self, **kwargs) -> None:
self.worker_json = None
self.start = None
self.timeout = None
self.memlimit = None
self.threshold = None
super().__init__(**kwargs)

@@ -100,6 +100,10 @@ def __init__(self, ns: Namespace):
self.hunt_refleak = None
self.test_dir: str | None = ns.testdir
self.junit_filename: str | None = ns.xmlpath
self.memory_limit: str | None = ns.memlimit
self.gc_threshold: int | None = ns.threshold
self.use_resources: list[str] = ns.use_resources
self.python_cmd: list[str] | None = ns.python
# tests
self.tests = []
@@ -363,7 +367,7 @@ def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
return runtests
def rerun_failed_tests(self, need_rerun, runtests: RunTests):
if self.ns.python:
if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
"Re-running failed tests is not supported with --python "
@@ -453,12 +457,12 @@ def run_test(self, test_name: str, runtests: RunTests, tracer):
if tracer is not None:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = run_single_test(test_name, runtests, self.ns)')
cmd = ('result = run_single_test(test_name, runtests)')
ns = dict(locals())
tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
result = run_single_test(test_name, runtests, self.ns)
result = run_single_test(test_name, runtests)
self.accumulate_result(result)
@@ -876,9 +880,14 @@ def action_run_tests(self):
quiet=self.quiet,
hunt_refleak=self.hunt_refleak,
test_dir=self.test_dir,
junit_filename=self.junit_filename)
junit_filename=self.junit_filename,
memory_limit=self.memory_limit,
gc_threshold=self.gc_threshold,
use_resources=self.use_resources,
python_cmd=self.python_cmd,
)
setup_tests(runtests, self.ns)
setup_tests(runtests)
tracer = self.run_tests(runtests)
self.display_result(runtests)

@@ -4,17 +4,18 @@
import gc
import importlib
import io
import json
import os
import sys
import time
import traceback
import unittest
from typing import Any
from test import support
from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
from test.libregrtest.cmdline import Namespace
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
@@ -230,6 +231,10 @@ class RunTests:
hunt_refleak: HuntRefleak | None = None
test_dir: str | None = None
junit_filename: str | None = None
memory_limit: str | None = None
gc_threshold: int | None = None
use_resources: list[str] | None = None
python_cmd: list[str] | None = None
def copy(self, **override):
state = dataclasses.asdict(self)
@@ -249,11 +254,32 @@ def iter_tests(self):
else:
yield from self.tests
def as_json(self):
return json.dumps(self, cls=_EncodeRunTests)
@staticmethod
def from_json_dict(json_dict):
if json_dict['hunt_refleak']:
json_dict['hunt_refleak'] = HuntRefleak(**json_dict['hunt_refleak'])
return RunTests(**json_dict)
def from_json(worker_json):
return json.loads(worker_json, object_hook=_decode_runtests)
class _EncodeRunTests(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, RunTests):
result = dataclasses.asdict(o)
result["__runtests__"] = True
return result
else:
return super().default(o)
def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
if "__runtests__" in data:
data.pop('__runtests__')
if data['hunt_refleak']:
data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
return RunTests(**data)
else:
return data
# Minimum duration of a test to display its duration or to mention that
@@ -320,7 +346,7 @@ def abs_module_name(test_name: str, test_dir: str | None) -> str:
return 'test.' + test_name
def setup_support(runtests: RunTests, ns: Namespace):
def setup_support(runtests: RunTests):
support.PGO = runtests.pgo
support.PGO_EXTENDED = runtests.pgo_extended
support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
@@ -332,7 +358,7 @@ def setup_support(runtests: RunTests, ns: Namespace):
support.junit_xml_list = None
def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
def _runtest(result: TestResult, runtests: RunTests) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
verbose = runtests.verbose
@@ -346,7 +372,7 @@ def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
faulthandler.dump_traceback_later(timeout, exit=True)
try:
setup_support(runtests, ns)
setup_support(runtests)
if output_on_failure:
support.verbose = True
@@ -366,7 +392,7 @@ def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
_runtest_env_changed_exc(result, runtests, ns, display_failure=False)
_runtest_env_changed_exc(result, runtests, display_failure=False)
# Ignore output if the test passed successfully
if result.state != State.PASSED:
output = stream.getvalue()
@@ -381,7 +407,7 @@ def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
else:
# Tell tests to be moderately quiet
support.verbose = verbose
_runtest_env_changed_exc(result, runtests, ns,
_runtest_env_changed_exc(result, runtests,
display_failure=not verbose)
xml_list = support.junit_xml_list
@@ -395,10 +421,9 @@ def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
support.junit_xml_list = None
def run_single_test(test_name: str, runtests: RunTests, ns: Namespace) -> TestResult:
def run_single_test(test_name: str, runtests: RunTests) -> TestResult:
"""Run a single test.
ns -- regrtest namespace of options
test_name -- the name of the test
Returns a TestResult.
@@ -410,7 +435,7 @@ def run_single_test(test_name: str, runtests: RunTests, ns: Namespace) -> TestRe
result = TestResult(test_name)
pgo = runtests.pgo
try:
_runtest(result, runtests, ns)
_runtest(result, runtests)
except:
if not pgo:
msg = traceback.format_exc()
@@ -472,7 +497,7 @@ def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
FOUND_GARBAGE = []
def _load_run_test(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
def _load_run_test(result: TestResult, runtests: RunTests) -> None:
# Load the test function, run the test function.
module_name = abs_module_name(result.test_name, runtests.test_dir)
@@ -513,7 +538,6 @@ def test_func():
def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
ns: Namespace,
display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
@@ -532,7 +556,7 @@ def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
support.gc_collect()
with save_env(test_name, runtests):
_load_run_test(result, runtests, ns)
_load_run_test(result, runtests)
except support.ResourceDenied as msg:
if not quiet and not pgo:
print(f"{test_name} skipped -- {msg}", flush=True)

@@ -47,49 +47,16 @@
@dataclasses.dataclass(slots=True)
class WorkerJob:
runtests: RunTests
namespace: Namespace
class _EncodeWorkerJob(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
match o:
case WorkerJob():
result = dataclasses.asdict(o)
result["__worker_job__"] = True
return result
case Namespace():
result = vars(o)
result["__namespace__"] = True
return result
case _:
return super().default(o)
def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
if "__worker_job__" in d:
d.pop('__worker_job__')
d['runtests'] = RunTests.from_json_dict(d['runtests'])
return WorkerJob(**d)
if "__namespace__" in d:
d.pop('__namespace__')
return Namespace(**d)
else:
return d
def _parse_worker_json(worker_json: str) -> tuple[Namespace, str]:
return json.loads(worker_json, object_hook=_decode_worker_job)
def create_worker_process(worker_job: WorkerJob,
def create_worker_process(runtests: RunTests,
output_file: TextIO,
tmp_dir: str | None = None) -> subprocess.Popen:
ns = worker_job.namespace
python = ns.python
worker_json = json.dumps(worker_job, cls=_EncodeWorkerJob)
python_cmd = runtests.python_cmd
worker_json = runtests.as_json()
if python is not None:
executable = python
if python_cmd is not None:
executable = python_cmd
else:
executable = [sys.executable]
cmd = [*executable, *support.args_from_interpreter_flags(),
@@ -121,14 +88,12 @@ def create_worker_process(worker_job: WorkerJob,
def worker_process(worker_json: str) -> NoReturn:
worker_job = _parse_worker_json(worker_json)
runtests = worker_job.runtests
ns = worker_job.namespace
runtests = RunTests.from_json(worker_json)
test_name = runtests.tests[0]
match_tests: FilterTuple | None = runtests.match_tests
setup_test_dir(runtests.test_dir)
setup_tests(runtests, ns)
setup_tests(runtests)
if runtests.rerun:
if match_tests:
@@ -137,7 +102,7 @@ def worker_process(worker_json: str) -> NoReturn:
else:
print(f"Re-running {test_name} in verbose mode", flush=True)
result = run_single_test(test_name, runtests, ns)
result = run_single_test(test_name, runtests)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
@@ -330,9 +295,6 @@ def _runtest(self, test_name: str) -> MultiprocessResult:
if match_tests:
kwargs['match_tests'] = match_tests
worker_runtests = self.runtests.copy(tests=tests, **kwargs)
worker_job = WorkerJob(
worker_runtests,
namespace=self.ns)
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
@@ -347,12 +309,12 @@ def _runtest(self, test_name: str) -> MultiprocessResult:
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(worker_job, stdout_file, tmp_dir)
retcode = self._run_process(worker_runtests, stdout_file, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
retcode = self._run_process(worker_job, stdout_file)
retcode = self._run_process(worker_runtests, stdout_file)
tmp_files = ()
stdout_file.seek(0)
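
The net effect of the changes above is that the parent now hands the worker a single
JSON string instead of a WorkerJob wrapping both a RunTests and a Namespace. A rough,
self-contained sketch of that hand-off; the worker script name and argument layout
here are placeholders, not the real command built by create_worker_process:

```python
import json
import subprocess
import sys


def spawn_worker(worker_json: str) -> int:
    # Parent side (sketch): the serialized RunTests is the only payload passed along.
    # "my_worker.py" is a placeholder script name.
    proc = subprocess.run([sys.executable, "my_worker.py", worker_json],
                          capture_output=True, text=True)
    return proc.returncode


def worker_main(argv: list[str]) -> None:
    # Worker side (sketch): everything is rebuilt from argv; no Namespace is involved.
    # The real worker calls RunTests.from_json() here.
    config = json.loads(argv[-1])
    print(f"running {len(config.get('tests') or [])} test(s)")
```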

@@ -25,7 +25,7 @@ def setup_test_dir(testdir: str | None) -> None:
sys.path.insert(0, os.path.abspath(testdir))
def setup_tests(runtests, ns):
def setup_tests(runtests):
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
@@ -71,15 +71,15 @@ def setup_tests(runtests, ns):
if runtests.hunt_refleak:
unittest.BaseTestSuite._cleanup = False
if ns.memlimit is not None:
support.set_memlimit(ns.memlimit)
if runtests.memory_limit is not None:
support.set_memlimit(runtests.memory_limit)
if ns.threshold is not None:
gc.set_threshold(ns.threshold)
if runtests.gc_threshold is not None:
gc.set_threshold(runtests.gc_threshold)
support.suppress_msvcrt_asserts(runtests.verbose and runtests.verbose >= 2)
support.use_resources = ns.use_resources
support.use_resources = runtests.use_resources
if hasattr(sys, 'addaudithook'):
# Add an auditing hook for all tests to ensure PySys_Audit is tested

@@ -878,27 +878,31 @@ def inner(*args, **kwds):
MAX_Py_ssize_t = sys.maxsize
def set_memlimit(limit):
global max_memuse
global real_max_memuse
def _parse_memlimit(limit: str) -> int:
sizes = {
'k': 1024,
'm': _1M,
'g': _1G,
't': 1024*_1G,
}
m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit,
re.IGNORECASE | re.VERBOSE)
if m is None:
raise ValueError('Invalid memory limit %r' % (limit,))
memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
real_max_memuse = memlimit
if memlimit > MAX_Py_ssize_t:
memlimit = MAX_Py_ssize_t
raise ValueError(f'Invalid memory limit: {limit!r}')
return int(float(m.group(1)) * sizes[m.group(2).lower()])
def set_memlimit(limit: str) -> None:
global max_memuse
global real_max_memuse
memlimit = _parse_memlimit(limit)
if memlimit < _2G - 1:
raise ValueError('Memory limit %r too low to be useful' % (limit,))
raise ValueError(f'Memory limit {limit!r} too low to be useful')
real_max_memuse = memlimit
memlimit = min(memlimit, MAX_Py_ssize_t)
max_memuse = memlimit
class _MemoryWatchdog:
"""An object which periodically watches the process' memory consumption
and prints it out.
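
For a quick sense of the numbers produced by the refactored helpers above (the new
tests in test_support below check the same behavior), assuming the patched module is
importable as test.support:

```python
from test import support

assert support._parse_memlimit('3k') == 3 * 1024
assert support._parse_memlimit('2.5g') == int(2.5 * 1024 ** 3)   # 2_684_354_560
# set_memlimit('4g') stores the full 4 GiB in real_max_memuse, then caps max_memuse
# at sys.maxsize, so an oversized limit still works on 32-bit builds.
support.set_memlimit('4g')
```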

@@ -760,7 +760,45 @@ def recursive_function(depth):
else:
self.fail("RecursionError was not raised")
#self.assertEqual(available, 2)
def test_parse_memlimit(self):
parse = support._parse_memlimit
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
self.assertEqual(parse('0k'), 0)
self.assertEqual(parse('3k'), 3 * KiB)
self.assertEqual(parse('2.4m'), int(2.4 * MiB))
self.assertEqual(parse('4g'), int(4 * GiB))
self.assertEqual(parse('1t'), TiB)
for limit in ('', '3', '3.5.10k', '10x'):
with self.subTest(limit=limit):
with self.assertRaises(ValueError):
parse(limit)
def test_set_memlimit(self):
_4GiB = 4 * 1024 ** 3
TiB = 1024 ** 4
old_max_memuse = support.max_memuse
old_real_max_memuse = support.real_max_memuse
try:
if sys.maxsize > 2**32:
support.set_memlimit('4g')
self.assertEqual(support.max_memuse, _4GiB)
self.assertEqual(support.real_max_memuse, _4GiB)
big = 2**100 // TiB
support.set_memlimit(f'{big}t')
self.assertEqual(support.max_memuse, sys.maxsize)
self.assertEqual(support.real_max_memuse, big * TiB)
else:
support.set_memlimit('4g')
self.assertEqual(support.max_memuse, sys.maxsize)
self.assertEqual(support.real_max_memuse, _4GiB)
finally:
support.max_memuse = old_max_memuse
support.real_max_memuse = old_real_max_memuse
# XXX -follows a list of untested API
# make_legacy_pyc
@@ -773,7 +811,6 @@ def recursive_function(depth):
# EnvironmentVarGuard
# transient_internet
# run_with_locale
# set_memlimit
# bigmemtest
# precisionbigmemtest
# bigaddrspacetest