mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 15:41:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			689 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			689 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections import namedtuple
 | 
						|
import contextlib
 | 
						|
import json
 | 
						|
import io
 | 
						|
import os
 | 
						|
import os.path
 | 
						|
import pickle
 | 
						|
import queue
 | 
						|
#import select
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
from textwrap import dedent, indent
 | 
						|
import threading
 | 
						|
import types
 | 
						|
import unittest
 | 
						|
import warnings
 | 
						|
 | 
						|
from test import support
 | 
						|
 | 
						|
# We would use test.support.import_helper.import_module(),
 | 
						|
# but the indirect import of test.support.os_helper causes refleaks.
 | 
						|
try:
 | 
						|
    import _interpreters
 | 
						|
except ImportError as exc:
 | 
						|
    raise unittest.SkipTest(str(exc))
 | 
						|
from test.support import interpreters
 | 
						|
 | 
						|
 | 
						|
try:
 | 
						|
    import _testinternalcapi
 | 
						|
    import _testcapi
 | 
						|
except ImportError:
 | 
						|
    _testinternalcapi = None
 | 
						|
    _testcapi = None
 | 
						|
 | 
						|
def requires_test_modules(func):
 | 
						|
    return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func)
 | 
						|
 | 
						|
 | 
						|
def _dump_script(text):
 | 
						|
    lines = text.splitlines()
 | 
						|
    print()
 | 
						|
    print('-' * 20)
 | 
						|
    for i, line in enumerate(lines, 1):
 | 
						|
        print(f' {i:>{len(str(len(lines)))}}  {line}')
 | 
						|
    print('-' * 20)
 | 
						|
 | 
						|
 | 
						|
def _close_file(file):
 | 
						|
    try:
 | 
						|
        if hasattr(file, 'close'):
 | 
						|
            file.close()
 | 
						|
        else:
 | 
						|
            os.close(file)
 | 
						|
    except OSError as exc:
 | 
						|
        if exc.errno != 9:
 | 
						|
            raise  # re-raise
 | 
						|
        # It was closed already.
 | 
						|
 | 
						|
 | 
						|
def pack_exception(exc=None):
 | 
						|
    captured = _interpreters.capture_exception(exc)
 | 
						|
    data = dict(captured.__dict__)
 | 
						|
    data['type'] = dict(captured.type.__dict__)
 | 
						|
    return json.dumps(data)
 | 
						|
 | 
						|
 | 
						|
def unpack_exception(packed):
 | 
						|
    try:
 | 
						|
        data = json.loads(packed)
 | 
						|
    except json.decoder.JSONDecodeError:
 | 
						|
        warnings.warn('incomplete exception data', RuntimeWarning)
 | 
						|
        print(packed if isinstance(packed, str) else packed.decode('utf-8'))
 | 
						|
        return None
 | 
						|
    exc = types.SimpleNamespace(**data)
 | 
						|
    exc.type = types.SimpleNamespace(**exc.type)
 | 
						|
    return exc;
 | 
						|
 | 
						|
 | 
						|
class CapturingResults:
 | 
						|
 | 
						|
    STDIO = dedent("""\
 | 
						|
        with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}:
 | 
						|
            _captured_std{stream} = io.StringIO()
 | 
						|
            with contextlib.redirect_std{stream}(_captured_std{stream}):
 | 
						|
                #########################
 | 
						|
                # begin wrapped script
 | 
						|
 | 
						|
                {indented}
 | 
						|
 | 
						|
                # end wrapped script
 | 
						|
                #########################
 | 
						|
            text = _captured_std{stream}.getvalue()
 | 
						|
            _spipe_{stream}.write(text.encode('utf-8'))
 | 
						|
        """)[:-1]
 | 
						|
    EXC = dedent("""\
 | 
						|
        with open({w_pipe}, 'wb', buffering=0) as _spipe_exc:
 | 
						|
            try:
 | 
						|
                #########################
 | 
						|
                # begin wrapped script
 | 
						|
 | 
						|
                {indented}
 | 
						|
 | 
						|
                # end wrapped script
 | 
						|
                #########################
 | 
						|
            except Exception as exc:
 | 
						|
                text = _interp_utils.pack_exception(exc)
 | 
						|
                _spipe_exc.write(text.encode('utf-8'))
 | 
						|
        """)[:-1]
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False):
 | 
						|
        script = dedent(script).strip(os.linesep)
 | 
						|
        imports = [
 | 
						|
            f'import {__name__} as _interp_utils',
 | 
						|
        ]
 | 
						|
        wrapped = script
 | 
						|
 | 
						|
        # Handle exc.
 | 
						|
        if exc:
 | 
						|
            exc = os.pipe()
 | 
						|
            r_exc, w_exc = exc
 | 
						|
            indented = wrapped.replace('\n', '\n        ')
 | 
						|
            wrapped = cls.EXC.format(
 | 
						|
                w_pipe=w_exc,
 | 
						|
                indented=indented,
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            exc = None
 | 
						|
 | 
						|
        # Handle stdout.
 | 
						|
        if stdout:
 | 
						|
            imports.extend([
 | 
						|
                'import contextlib, io',
 | 
						|
            ])
 | 
						|
            stdout = os.pipe()
 | 
						|
            r_out, w_out = stdout
 | 
						|
            indented = wrapped.replace('\n', '\n        ')
 | 
						|
            wrapped = cls.STDIO.format(
 | 
						|
                w_pipe=w_out,
 | 
						|
                indented=indented,
 | 
						|
                stream='out',
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            stdout = None
 | 
						|
 | 
						|
        # Handle stderr.
 | 
						|
        if stderr == 'stdout':
 | 
						|
            stderr = None
 | 
						|
        elif stderr:
 | 
						|
            if not stdout:
 | 
						|
                imports.extend([
 | 
						|
                    'import contextlib, io',
 | 
						|
                ])
 | 
						|
            stderr = os.pipe()
 | 
						|
            r_err, w_err = stderr
 | 
						|
            indented = wrapped.replace('\n', '\n        ')
 | 
						|
            wrapped = cls.STDIO.format(
 | 
						|
                w_pipe=w_err,
 | 
						|
                indented=indented,
 | 
						|
                stream='err',
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            stderr = None
 | 
						|
 | 
						|
        if wrapped == script:
 | 
						|
            raise NotImplementedError
 | 
						|
        else:
 | 
						|
            for line in imports:
 | 
						|
                wrapped = f'{line}{os.linesep}{wrapped}'
 | 
						|
 | 
						|
        results = cls(stdout, stderr, exc)
 | 
						|
        return wrapped, results
 | 
						|
 | 
						|
    def __init__(self, out, err, exc):
 | 
						|
        self._rf_out = None
 | 
						|
        self._rf_err = None
 | 
						|
        self._rf_exc = None
 | 
						|
        self._w_out = None
 | 
						|
        self._w_err = None
 | 
						|
        self._w_exc = None
 | 
						|
 | 
						|
        if out is not None:
 | 
						|
            r_out, w_out = out
 | 
						|
            self._rf_out = open(r_out, 'rb', buffering=0)
 | 
						|
            self._w_out = w_out
 | 
						|
 | 
						|
        if err is not None:
 | 
						|
            r_err, w_err = err
 | 
						|
            self._rf_err = open(r_err, 'rb', buffering=0)
 | 
						|
            self._w_err = w_err
 | 
						|
 | 
						|
        if exc is not None:
 | 
						|
            r_exc, w_exc = exc
 | 
						|
            self._rf_exc = open(r_exc, 'rb', buffering=0)
 | 
						|
            self._w_exc = w_exc
 | 
						|
 | 
						|
        self._buf_out = b''
 | 
						|
        self._buf_err = b''
 | 
						|
        self._buf_exc = b''
 | 
						|
        self._exc = None
 | 
						|
 | 
						|
        self._closed = False
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self, *args):
 | 
						|
        self.close()
 | 
						|
 | 
						|
    @property
 | 
						|
    def closed(self):
 | 
						|
        return self._closed
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        if self._closed:
 | 
						|
            return
 | 
						|
        self._closed = True
 | 
						|
 | 
						|
        if self._w_out is not None:
 | 
						|
            _close_file(self._w_out)
 | 
						|
            self._w_out = None
 | 
						|
        if self._w_err is not None:
 | 
						|
            _close_file(self._w_err)
 | 
						|
            self._w_err = None
 | 
						|
        if self._w_exc is not None:
 | 
						|
            _close_file(self._w_exc)
 | 
						|
            self._w_exc = None
 | 
						|
 | 
						|
        self._capture()
 | 
						|
 | 
						|
        if self._rf_out is not None:
 | 
						|
            _close_file(self._rf_out)
 | 
						|
            self._rf_out = None
 | 
						|
        if self._rf_err is not None:
 | 
						|
            _close_file(self._rf_err)
 | 
						|
            self._rf_err = None
 | 
						|
        if self._rf_exc is not None:
 | 
						|
            _close_file(self._rf_exc)
 | 
						|
            self._rf_exc = None
 | 
						|
 | 
						|
    def _capture(self):
 | 
						|
        # Ideally this is called only after the script finishes
 | 
						|
        # (and thus has closed the write end of the pipe.
 | 
						|
        if self._rf_out is not None:
 | 
						|
            chunk = self._rf_out.read(100)
 | 
						|
            while chunk:
 | 
						|
                self._buf_out += chunk
 | 
						|
                chunk = self._rf_out.read(100)
 | 
						|
        if self._rf_err is not None:
 | 
						|
            chunk = self._rf_err.read(100)
 | 
						|
            while chunk:
 | 
						|
                self._buf_err += chunk
 | 
						|
                chunk = self._rf_err.read(100)
 | 
						|
        if self._rf_exc is not None:
 | 
						|
            chunk = self._rf_exc.read(100)
 | 
						|
            while chunk:
 | 
						|
                self._buf_exc += chunk
 | 
						|
                chunk = self._rf_exc.read(100)
 | 
						|
 | 
						|
    def _unpack_stdout(self):
 | 
						|
        return self._buf_out.decode('utf-8')
 | 
						|
 | 
						|
    def _unpack_stderr(self):
 | 
						|
        return self._buf_err.decode('utf-8')
 | 
						|
 | 
						|
    def _unpack_exc(self):
 | 
						|
        if self._exc is not None:
 | 
						|
            return self._exc
 | 
						|
        if not self._buf_exc:
 | 
						|
            return None
 | 
						|
        self._exc = unpack_exception(self._buf_exc)
 | 
						|
        return self._exc
 | 
						|
 | 
						|
    def stdout(self):
 | 
						|
        if self.closed:
 | 
						|
            return self.final().stdout
 | 
						|
        self._capture()
 | 
						|
        return self._unpack_stdout()
 | 
						|
 | 
						|
    def stderr(self):
 | 
						|
        if self.closed:
 | 
						|
            return self.final().stderr
 | 
						|
        self._capture()
 | 
						|
        return self._unpack_stderr()
 | 
						|
 | 
						|
    def exc(self):
 | 
						|
        if self.closed:
 | 
						|
            return self.final().exc
 | 
						|
        self._capture()
 | 
						|
        return self._unpack_exc()
 | 
						|
 | 
						|
    def final(self, *, force=False):
 | 
						|
        try:
 | 
						|
            return self._final
 | 
						|
        except AttributeError:
 | 
						|
            if not self._closed:
 | 
						|
                if not force:
 | 
						|
                    raise Exception('no final results available yet')
 | 
						|
                else:
 | 
						|
                    return CapturedResults.Proxy(self)
 | 
						|
            self._final = CapturedResults(
 | 
						|
                self._unpack_stdout(),
 | 
						|
                self._unpack_stderr(),
 | 
						|
                self._unpack_exc(),
 | 
						|
            )
 | 
						|
            return self._final
 | 
						|
 | 
						|
 | 
						|
class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')):
 | 
						|
 | 
						|
    class Proxy:
 | 
						|
        def __init__(self, capturing):
 | 
						|
            self._capturing = capturing
 | 
						|
        def _finish(self):
 | 
						|
            if self._capturing is None:
 | 
						|
                return
 | 
						|
            self._final = self._capturing.final()
 | 
						|
            self._capturing = None
 | 
						|
        def __iter__(self):
 | 
						|
            self._finish()
 | 
						|
            yield from self._final
 | 
						|
        def __len__(self):
 | 
						|
            self._finish()
 | 
						|
            return len(self._final)
 | 
						|
        def __getattr__(self, name):
 | 
						|
            self._finish()
 | 
						|
            if name.startswith('_'):
 | 
						|
                raise AttributeError(name)
 | 
						|
            return getattr(self._final, name)
 | 
						|
 | 
						|
    def raise_if_failed(self):
 | 
						|
        if self.exc is not None:
 | 
						|
            raise interpreters.ExecutionFailed(self.exc)
 | 
						|
 | 
						|
 | 
						|
def _captured_script(script, *, stdout=True, stderr=False, exc=False):
 | 
						|
    return CapturingResults.wrap_script(
 | 
						|
        script,
 | 
						|
        stdout=stdout,
 | 
						|
        stderr=stderr,
 | 
						|
        exc=exc,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def clean_up_interpreters():
 | 
						|
    for interp in interpreters.list_all():
 | 
						|
        if interp.id == 0:  # main
 | 
						|
            continue
 | 
						|
        try:
 | 
						|
            interp.close()
 | 
						|
        except _interpreters.InterpreterError:
 | 
						|
            pass  # already destroyed
 | 
						|
 | 
						|
 | 
						|
def _run_output(interp, request, init=None):
 | 
						|
    script, results = _captured_script(request)
 | 
						|
    with results:
 | 
						|
        if init:
 | 
						|
            interp.prepare_main(init)
 | 
						|
        interp.exec(script)
 | 
						|
    return results.stdout()
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
 | 
						|
def _running(interp):
 | 
						|
    r, w = os.pipe()
 | 
						|
    def run():
 | 
						|
        interp.exec(dedent(f"""
 | 
						|
            # wait for "signal"
 | 
						|
            with open({r}) as rpipe:
 | 
						|
                rpipe.read()
 | 
						|
            """))
 | 
						|
 | 
						|
    t = threading.Thread(target=run)
 | 
						|
    t.start()
 | 
						|
 | 
						|
    yield
 | 
						|
 | 
						|
    with open(w, 'w') as spipe:
 | 
						|
        spipe.write('done')
 | 
						|
    t.join()
 | 
						|
 | 
						|
 | 
						|
class TestBase(unittest.TestCase):
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        clean_up_interpreters()
 | 
						|
 | 
						|
    def pipe(self):
 | 
						|
        def ensure_closed(fd):
 | 
						|
            try:
 | 
						|
                os.close(fd)
 | 
						|
            except OSError:
 | 
						|
                pass
 | 
						|
        r, w = os.pipe()
 | 
						|
        self.addCleanup(lambda: ensure_closed(r))
 | 
						|
        self.addCleanup(lambda: ensure_closed(w))
 | 
						|
        return r, w
 | 
						|
 | 
						|
    def temp_dir(self):
 | 
						|
        tempdir = tempfile.mkdtemp()
 | 
						|
        tempdir = os.path.realpath(tempdir)
 | 
						|
        from test.support import os_helper
 | 
						|
        self.addCleanup(lambda: os_helper.rmtree(tempdir))
 | 
						|
        return tempdir
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def captured_thread_exception(self):
 | 
						|
        ctx = types.SimpleNamespace(caught=None)
 | 
						|
        def excepthook(args):
 | 
						|
            ctx.caught = args
 | 
						|
        orig_excepthook = threading.excepthook
 | 
						|
        threading.excepthook = excepthook
 | 
						|
        try:
 | 
						|
            yield ctx
 | 
						|
        finally:
 | 
						|
            threading.excepthook = orig_excepthook
 | 
						|
 | 
						|
    def make_script(self, filename, dirname=None, text=None):
 | 
						|
        if text:
 | 
						|
            text = dedent(text)
 | 
						|
        if dirname is None:
 | 
						|
            dirname = self.temp_dir()
 | 
						|
        filename = os.path.join(dirname, filename)
 | 
						|
 | 
						|
        os.makedirs(os.path.dirname(filename), exist_ok=True)
 | 
						|
        with open(filename, 'w', encoding='utf-8') as outfile:
 | 
						|
            outfile.write(text or '')
 | 
						|
        return filename
 | 
						|
 | 
						|
    def make_module(self, name, pathentry=None, text=None):
 | 
						|
        if text:
 | 
						|
            text = dedent(text)
 | 
						|
        if pathentry is None:
 | 
						|
            pathentry = self.temp_dir()
 | 
						|
        else:
 | 
						|
            os.makedirs(pathentry, exist_ok=True)
 | 
						|
        *subnames, basename = name.split('.')
 | 
						|
 | 
						|
        dirname = pathentry
 | 
						|
        for subname in subnames:
 | 
						|
            dirname = os.path.join(dirname, subname)
 | 
						|
            if os.path.isdir(dirname):
 | 
						|
                pass
 | 
						|
            elif os.path.exists(dirname):
 | 
						|
                raise Exception(dirname)
 | 
						|
            else:
 | 
						|
                os.mkdir(dirname)
 | 
						|
            initfile = os.path.join(dirname, '__init__.py')
 | 
						|
            if not os.path.exists(initfile):
 | 
						|
                with open(initfile, 'w'):
 | 
						|
                    pass
 | 
						|
        filename = os.path.join(dirname, basename + '.py')
 | 
						|
 | 
						|
        with open(filename, 'w', encoding='utf-8') as outfile:
 | 
						|
            outfile.write(text or '')
 | 
						|
        return filename
 | 
						|
 | 
						|
    @support.requires_subprocess()
 | 
						|
    def run_python(self, *argv):
 | 
						|
        proc = subprocess.run(
 | 
						|
            [sys.executable, *argv],
 | 
						|
            capture_output=True,
 | 
						|
            text=True,
 | 
						|
        )
 | 
						|
        return proc.returncode, proc.stdout, proc.stderr
 | 
						|
 | 
						|
    def assert_python_ok(self, *argv):
 | 
						|
        exitcode, stdout, stderr = self.run_python(*argv)
 | 
						|
        self.assertNotEqual(exitcode, 1)
 | 
						|
        return stdout, stderr
 | 
						|
 | 
						|
    def assert_python_failure(self, *argv):
 | 
						|
        exitcode, stdout, stderr = self.run_python(*argv)
 | 
						|
        self.assertNotEqual(exitcode, 0)
 | 
						|
        return stdout, stderr
 | 
						|
 | 
						|
    def assert_ns_equal(self, ns1, ns2, msg=None):
 | 
						|
        # This is mostly copied from TestCase.assertDictEqual.
 | 
						|
        self.assertEqual(type(ns1), type(ns2))
 | 
						|
        if ns1 == ns2:
 | 
						|
            return
 | 
						|
 | 
						|
        import difflib
 | 
						|
        import pprint
 | 
						|
        from unittest.util import _common_shorten_repr
 | 
						|
        standardMsg = '%s != %s' % _common_shorten_repr(ns1, ns2)
 | 
						|
        diff = ('\n' + '\n'.join(difflib.ndiff(
 | 
						|
                       pprint.pformat(vars(ns1)).splitlines(),
 | 
						|
                       pprint.pformat(vars(ns2)).splitlines())))
 | 
						|
        diff = f'namespace({diff})'
 | 
						|
        standardMsg = self._truncateMessage(standardMsg, diff)
 | 
						|
        self.fail(self._formatMessage(msg, standardMsg))
 | 
						|
 | 
						|
    def _run_string(self, interp, script):
 | 
						|
        wrapped, results = _captured_script(script, exc=False)
 | 
						|
        #_dump_script(wrapped)
 | 
						|
        with results:
 | 
						|
            if isinstance(interp, interpreters.Interpreter):
 | 
						|
                interp.exec(script)
 | 
						|
            else:
 | 
						|
                err = _interpreters.run_string(interp, wrapped)
 | 
						|
                if err is not None:
 | 
						|
                    return None, err
 | 
						|
        return results.stdout(), None
 | 
						|
 | 
						|
    def run_and_capture(self, interp, script):
 | 
						|
        text, err = self._run_string(interp, script)
 | 
						|
        if err is not None:
 | 
						|
            raise interpreters.ExecutionFailed(err)
 | 
						|
        else:
 | 
						|
            return text
 | 
						|
 | 
						|
    def interp_exists(self, interpid):
 | 
						|
        try:
 | 
						|
            _interpreters.whence(interpid)
 | 
						|
        except _interpreters.InterpreterNotFoundError:
 | 
						|
            return False
 | 
						|
        else:
 | 
						|
            return True
 | 
						|
 | 
						|
    @requires_test_modules
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def interpreter_from_capi(self, config=None, whence=None):
 | 
						|
        if config is False:
 | 
						|
            if whence is None:
 | 
						|
                whence = _interpreters.WHENCE_LEGACY_CAPI
 | 
						|
            else:
 | 
						|
                assert whence in (_interpreters.WHENCE_LEGACY_CAPI,
 | 
						|
                                  _interpreters.WHENCE_UNKNOWN), repr(whence)
 | 
						|
            config = None
 | 
						|
        elif config is True:
 | 
						|
            config = _interpreters.new_config('default')
 | 
						|
        elif config is None:
 | 
						|
            if whence not in (
 | 
						|
                _interpreters.WHENCE_LEGACY_CAPI,
 | 
						|
                _interpreters.WHENCE_UNKNOWN,
 | 
						|
            ):
 | 
						|
                config = _interpreters.new_config('legacy')
 | 
						|
        elif isinstance(config, str):
 | 
						|
            config = _interpreters.new_config(config)
 | 
						|
 | 
						|
        if whence is None:
 | 
						|
            whence = _interpreters.WHENCE_XI
 | 
						|
 | 
						|
        interpid = _testinternalcapi.create_interpreter(config, whence=whence)
 | 
						|
        try:
 | 
						|
            yield interpid
 | 
						|
        finally:
 | 
						|
            try:
 | 
						|
                _testinternalcapi.destroy_interpreter(interpid)
 | 
						|
            except _interpreters.InterpreterNotFoundError:
 | 
						|
                pass
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def interpreter_obj_from_capi(self, config='legacy'):
 | 
						|
        with self.interpreter_from_capi(config) as interpid:
 | 
						|
            interp = interpreters.Interpreter(
 | 
						|
                interpid,
 | 
						|
                _whence=_interpreters.WHENCE_CAPI,
 | 
						|
                _ownsref=False,
 | 
						|
            )
 | 
						|
            yield interp, interpid
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def capturing(self, script):
 | 
						|
        wrapped, capturing = _captured_script(script, stdout=True, exc=True)
 | 
						|
        #_dump_script(wrapped)
 | 
						|
        with capturing:
 | 
						|
            yield wrapped, capturing.final(force=True)
 | 
						|
 | 
						|
    @requires_test_modules
 | 
						|
    def run_from_capi(self, interpid, script, *, main=False):
 | 
						|
        with self.capturing(script) as (wrapped, results):
 | 
						|
            rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main)
 | 
						|
            assert rc == 0, rc
 | 
						|
        results.raise_if_failed()
 | 
						|
        return results.stdout
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def _running(self, run_interp, exec_interp):
 | 
						|
        token = b'\0'
 | 
						|
        r_in, w_in = self.pipe()
 | 
						|
        r_out, w_out = self.pipe()
 | 
						|
 | 
						|
        def close():
 | 
						|
            _close_file(r_in)
 | 
						|
            _close_file(w_in)
 | 
						|
            _close_file(r_out)
 | 
						|
            _close_file(w_out)
 | 
						|
 | 
						|
        # Start running (and wait).
 | 
						|
        script = dedent(f"""
 | 
						|
            import os
 | 
						|
            try:
 | 
						|
                # handshake
 | 
						|
                token = os.read({r_in}, 1)
 | 
						|
                os.write({w_out}, token)
 | 
						|
                # Wait for the "done" message.
 | 
						|
                os.read({r_in}, 1)
 | 
						|
            except BrokenPipeError:
 | 
						|
                pass
 | 
						|
            except OSError as exc:
 | 
						|
                if exc.errno != 9:
 | 
						|
                    raise  # re-raise
 | 
						|
                # It was closed already.
 | 
						|
            """)
 | 
						|
        failed = None
 | 
						|
        def run():
 | 
						|
            nonlocal failed
 | 
						|
            try:
 | 
						|
                run_interp(script)
 | 
						|
            except Exception as exc:
 | 
						|
                failed = exc
 | 
						|
                close()
 | 
						|
        t = threading.Thread(target=run)
 | 
						|
        t.start()
 | 
						|
 | 
						|
        # handshake
 | 
						|
        try:
 | 
						|
            os.write(w_in, token)
 | 
						|
            token2 = os.read(r_out, 1)
 | 
						|
            assert token2 == token, (token2, token)
 | 
						|
        except OSError:
 | 
						|
            t.join()
 | 
						|
            if failed is not None:
 | 
						|
                raise failed
 | 
						|
 | 
						|
        # CM __exit__()
 | 
						|
        try:
 | 
						|
            try:
 | 
						|
                yield
 | 
						|
            finally:
 | 
						|
                # Send "done".
 | 
						|
                os.write(w_in, b'\0')
 | 
						|
        finally:
 | 
						|
            close()
 | 
						|
            t.join()
 | 
						|
            if failed is not None:
 | 
						|
                raise failed
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def running(self, interp):
 | 
						|
        if isinstance(interp, int):
 | 
						|
            interpid = interp
 | 
						|
            def exec_interp(script):
 | 
						|
                exc = _interpreters.exec(interpid, script)
 | 
						|
                assert exc is None, exc
 | 
						|
            run_interp = exec_interp
 | 
						|
        else:
 | 
						|
            def run_interp(script):
 | 
						|
                text = self.run_and_capture(interp, script)
 | 
						|
                assert text == '', repr(text)
 | 
						|
            def exec_interp(script):
 | 
						|
                interp.exec(script)
 | 
						|
        with self._running(run_interp, exec_interp):
 | 
						|
            yield
 | 
						|
 | 
						|
    @requires_test_modules
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def running_from_capi(self, interpid, *, main=False):
 | 
						|
        def run_interp(script):
 | 
						|
            text = self.run_from_capi(interpid, script, main=main)
 | 
						|
            assert text == '', repr(text)
 | 
						|
        def exec_interp(script):
 | 
						|
            rc = _testinternalcapi.exec_interpreter(interpid, script)
 | 
						|
            assert rc == 0, rc
 | 
						|
        with self._running(run_interp, exec_interp):
 | 
						|
            yield
 | 
						|
 | 
						|
    @requires_test_modules
 | 
						|
    def run_temp_from_capi(self, script, config='legacy'):
 | 
						|
        if config is False:
 | 
						|
            # Force using Py_NewInterpreter().
 | 
						|
            run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s))
 | 
						|
            config = None
 | 
						|
        else:
 | 
						|
            run_in_interp = _testinternalcapi.run_in_subinterp_with_config
 | 
						|
            if config is True:
 | 
						|
                config = 'default'
 | 
						|
            if isinstance(config, str):
 | 
						|
                config = _interpreters.new_config(config)
 | 
						|
        with self.capturing(script) as (wrapped, results):
 | 
						|
            rc = run_in_interp(wrapped, config)
 | 
						|
            assert rc == 0, rc
 | 
						|
        results.raise_if_failed()
 | 
						|
        return results.stdout
 |