mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			213 lines
		
	
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			213 lines
		
	
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Run the tests in Programs/_testembed.c (tests for the CPython embedding APIs)
 | 
						|
from test import support
 | 
						|
import unittest
 | 
						|
 | 
						|
from collections import namedtuple
 | 
						|
import os
 | 
						|
import re
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
 | 
						|
 | 
						|
class EmbeddingTests(unittest.TestCase):
    """Drive the ``_testembed`` helper executable and check what it prints.

    Each test runs the helper as a subprocess with a scenario name as its
    first argument and then inspects the captured stdout/stderr.
    """

    def setUp(self):
        """Locate the ``_testembed`` executable and chdir to the base path.

        The test is skipped when the executable does not exist.  The
        previous working directory is saved so tearDown() can restore it.
        """
        here = os.path.abspath(__file__)
        # Three directory levels above this file.
        basepath = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        exename = "_testembed"
        if sys.platform.startswith("win"):
            # On Windows the helper is looked up next to the running
            # interpreter; it carries a "_d" suffix when sys.executable
            # contains "_d".
            ext = ("_d" if "_d" in sys.executable else "") + ".exe"
            exename += ext
            exepath = os.path.dirname(sys.executable)
        else:
            exepath = os.path.join(basepath, "Programs")
        self.test_exe = exe = os.path.join(exepath, exename)
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # Changing the working directory is needed, otherwise we get a
        # fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        """Restore the working directory saved by setUp()."""
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args, env=None):
        """Runs a test in the embedded interpreter

        *args* are appended to the path of the test executable to form the
        command line.  *env*, when given, is used as the environment of
        the child process; on Windows a copy is made and SYSTEMROOT is
        added to it.

        Returns the ``(stdout, stderr)`` pair of the child as text.  Fails
        the test when the child exits with a non-zero return code.
        """
        cmd = [self.test_exe]
        cmd.extend(args)
        if env is not None and sys.platform == 'win32':
            # Windows requires at least the SYSTEMROOT environment variable to
            # start Python.
            env = env.copy()
            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']

        # The context manager guarantees that the pipes are closed and the
        # child is waited for, even if communicate() is interrupted.
        with subprocess.Popen(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              universal_newlines=True,
                              env=env) as p:
            (out, err) = p.communicate()
        if p.returncode != 0 and support.verbose:
            # Dump both streams to help diagnose the failure.
            print(f"--- {cmd} failed ---")
            print(f"stdout:\n{out}")
            print(f"stderr:\n{err}")
            print("------")

        self.assertEqual(p.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (p.returncode, err))
        return out, err

    def run_repeated_init_and_subinterpreters(self):
        """Run the "repeated_init_and_subinterpreters" scenario and parse it.

        This is a generator: for every pass reported by the executable it
        yields a list of five ``Interp`` named tuples (fields ``id``,
        ``interp``, ``tstate`` and ``modules``, all strings).  The fifth
        entry is asserted to be equal to the first one.
        """
        out, err = self.run_embedded_interpreter("repeated_init_and_subinterpreters")
        self.assertEqual(err, "")

        # The output from _testembed looks like this:
        # --- Pass 0 ---
        # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728
        # interp 1 <0x1d4f690>, thread state <0x1d35350>: id(modules) = 139650431165784
        # interp 2 <0x1d5a690>, thread state <0x1d99ed0>: id(modules) = 139650413140368
        # interp 3 <0x1d4f690>, thread state <0x1dc3340>: id(modules) = 139650412862200
        # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728
        # --- Pass 1 ---
        # ...
        #
        # NOTE(review): the sample above shows lowercase hex digits while
        # interp_pat below only accepts 0-9 and A-F -- confirm which form
        # _testembed actually prints.

        interp_pat = (r"^interp (\d+) <(0x[\dA-F]+)>, "
                      r"thread state <(0x[\dA-F]+)>: "
                      r"id\(modules\) = ([\d]+)$")
        Interp = namedtuple("Interp", "id interp tstate modules")

        numloops = 0
        current_run = []
        for line in out.splitlines():
            if line == "--- Pass {} ---".format(numloops):
                # A new pass may only start once the previous one has been
                # yielded in full.
                self.assertEqual(len(current_run), 0)
                if support.verbose:
                    print(line)
                numloops += 1
                continue

            self.assertLess(len(current_run), 5)
            match = re.match(interp_pat, line)
            if match is None:
                # Fails here, with a message showing the line and pattern.
                self.assertRegex(line, interp_pat)

            # Parse the line from the loop.  The first line is the main
            # interpreter and the 3 afterward are subinterpreters.
            interp = Interp(*match.groups())
            if support.verbose:
                print(interp)
            self.assertTrue(interp.interp)
            self.assertTrue(interp.tstate)
            self.assertTrue(interp.modules)
            current_run.append(interp)

            # The last line in the loop should be the same as the first.
            if len(current_run) == 5:
                main = current_run[0]
                self.assertEqual(interp, main)
                yield current_run
                current_run = []

    def test_subinterps_main(self):
        # The first interpreter reported in every pass must have id 0.
        for run in self.run_repeated_init_and_subinterpreters():
            main = run[0]

            self.assertEqual(main.id, '0')

    def test_subinterps_different_ids(self):
        # Subinterpreter ids must follow the main id consecutively.
        for run in self.run_repeated_init_and_subinterpreters():
            main, *subs, _ = run

            mainid = int(main.id)
            for i, sub in enumerate(subs):
                self.assertEqual(sub.id, str(mainid + i + 1))

    def test_subinterps_distinct_state(self):
        # Every subinterpreter must report state distinct from the main one.
        for run in self.run_repeated_init_and_subinterpreters():
            main, *subs, _ = run

            # True when any field of the main entry is exactly '0x0'.
            if '0x0' in main:
                # XXX Fix on Windows (and other platforms): something
                # is going on with the pointers in Programs/_testembed.c.
                # interp.interp is 0x0 and interp.modules is the same
                # between interpreters.
                raise unittest.SkipTest('platform prints pointers as 0x0')

            for sub in subs:
                # A new subinterpreter may have the same
                # PyInterpreterState pointer as a previous one if
                # the earlier one has already been destroyed.  So
                # we compare with the main interpreter.  The same
                # applies to tstate.
                self.assertNotEqual(sub.interp, main.interp)
                self.assertNotEqual(sub.tstate, main.tstate)
                self.assertNotEqual(sub.modules, main.modules)

    def test_forced_io_encoding(self):
        # Checks forced configuration of embedded interpreter IO streams
        env = dict(os.environ, PYTHONIOENCODING="utf-8:surrogateescape")
        out, err = self.run_embedded_interpreter("forced_io_encoding", env=env)
        if support.verbose > 1:
            print()
            print(out)
            print(err)
        expected_stream_encoding = "utf-8"
        expected_errors = "surrogateescape"
        # Template of the expected report; the placeholders are filled in
        # below from the values passed through PYTHONIOENCODING.
        expected_output = '\n'.join([
            "--- Use defaults ---",
            "Expected encoding: default",
            "Expected errors: default",
            "stdin: {in_encoding}:{errors}",
            "stdout: {out_encoding}:{errors}",
            "stderr: {out_encoding}:backslashreplace",
            "--- Set errors only ---",
            "Expected encoding: default",
            "Expected errors: ignore",
            "stdin: {in_encoding}:ignore",
            "stdout: {out_encoding}:ignore",
            "stderr: {out_encoding}:backslashreplace",
            "--- Set encoding only ---",
            "Expected encoding: latin-1",
            "Expected errors: default",
            "stdin: latin-1:{errors}",
            "stdout: latin-1:{errors}",
            "stderr: latin-1:backslashreplace",
            "--- Set encoding and errors ---",
            "Expected encoding: latin-1",
            "Expected errors: replace",
            "stdin: latin-1:replace",
            "stdout: latin-1:replace",
            "stderr: latin-1:backslashreplace"])
        expected_output = expected_output.format(
                                in_encoding=expected_stream_encoding,
                                out_encoding=expected_stream_encoding,
                                errors=expected_errors)
        # This is useful if we ever trip over odd platform behaviour
        self.maxDiff = None
        self.assertEqual(out.strip(), expected_output)

    def test_pre_initialization_api(self):
        """
        Checks the few parts of the C-API that work before the runtime
        is initialized (via Py_Initialize()).
        """
        env = dict(os.environ, PYTHONPATH=os.pathsep.join(sys.path))
        out, err = self.run_embedded_interpreter("pre_initialization_api", env=env)
        self.assertEqual(out, '')
        self.assertEqual(err, '')

    def test_bpo20891(self):
        """
        bpo-20891: Calling PyGILState_Ensure in a non-Python thread before
        calling PyEval_InitThreads() must not crash. PyGILState_Ensure() must
        call PyEval_InitThreads() for us in this case.
        """
        out, err = self.run_embedded_interpreter("bpo20891")
        self.assertEqual(out, '')
        self.assertEqual(err, '')
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |