| 
									
										
										
										
											2017-11-28 08:11:51 +10:00
										 |  |  | # Run the tests in Programs/_testembed.c (tests for the CPython embedding APIs) | 
					
						
							|  |  |  | from test import support | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from collections import namedtuple | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import re | 
					
						
							|  |  |  | import subprocess | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class EmbeddingTests(unittest.TestCase): | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         here = os.path.abspath(__file__) | 
					
						
							|  |  |  |         basepath = os.path.dirname(os.path.dirname(os.path.dirname(here))) | 
					
						
							|  |  |  |         exename = "_testembed" | 
					
						
							|  |  |  |         if sys.platform.startswith("win"): | 
					
						
							|  |  |  |             ext = ("_d" if "_d" in sys.executable else "") + ".exe" | 
					
						
							|  |  |  |             exename += ext | 
					
						
							|  |  |  |             exepath = os.path.dirname(sys.executable) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             exepath = os.path.join(basepath, "Programs") | 
					
						
							|  |  |  |         self.test_exe = exe = os.path.join(exepath, exename) | 
					
						
							|  |  |  |         if not os.path.exists(exe): | 
					
						
							|  |  |  |             self.skipTest("%r doesn't exist" % exe) | 
					
						
							|  |  |  |         # This is needed otherwise we get a fatal error: | 
					
						
							|  |  |  |         # "Py_Initialize: Unable to get the locale encoding | 
					
						
							|  |  |  |         # LookupError: no codec search functions registered: can't find encoding" | 
					
						
							|  |  |  |         self.oldcwd = os.getcwd() | 
					
						
							|  |  |  |         os.chdir(basepath) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         os.chdir(self.oldcwd) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_embedded_interpreter(self, *args, env=None): | 
					
						
							|  |  |  |         """Runs a test in the embedded interpreter""" | 
					
						
							|  |  |  |         cmd = [self.test_exe] | 
					
						
							|  |  |  |         cmd.extend(args) | 
					
						
							|  |  |  |         if env is not None and sys.platform == 'win32': | 
					
						
							|  |  |  |             # Windows requires at least the SYSTEMROOT environment variable to | 
					
						
							|  |  |  |             # start Python. | 
					
						
							|  |  |  |             env = env.copy() | 
					
						
							|  |  |  |             env['SYSTEMROOT'] = os.environ['SYSTEMROOT'] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         p = subprocess.Popen(cmd, | 
					
						
							|  |  |  |                              stdout=subprocess.PIPE, | 
					
						
							|  |  |  |                              stderr=subprocess.PIPE, | 
					
						
							|  |  |  |                              universal_newlines=True, | 
					
						
							|  |  |  |                              env=env) | 
					
						
							|  |  |  |         (out, err) = p.communicate() | 
					
						
							|  |  |  |         if p.returncode != 0 and support.verbose: | 
					
						
							|  |  |  |             print(f"--- {cmd} failed ---") | 
					
						
							|  |  |  |             print(f"stdout:\n{out}") | 
					
						
							|  |  |  |             print(f"stderr:\n{out}") | 
					
						
							|  |  |  |             print(f"------") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(p.returncode, 0, | 
					
						
							|  |  |  |                          "bad returncode %d, stderr is %r" % | 
					
						
							|  |  |  |                          (p.returncode, err)) | 
					
						
							|  |  |  |         return out, err | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_repeated_init_and_subinterpreters(self): | 
					
						
							|  |  |  |         out, err = self.run_embedded_interpreter("repeated_init_and_subinterpreters") | 
					
						
							|  |  |  |         self.assertEqual(err, "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # The output from _testembed looks like this: | 
					
						
							|  |  |  |         # --- Pass 0 --- | 
					
						
							|  |  |  |         # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728 | 
					
						
							|  |  |  |         # interp 1 <0x1d4f690>, thread state <0x1d35350>: id(modules) = 139650431165784 | 
					
						
							|  |  |  |         # interp 2 <0x1d5a690>, thread state <0x1d99ed0>: id(modules) = 139650413140368 | 
					
						
							|  |  |  |         # interp 3 <0x1d4f690>, thread state <0x1dc3340>: id(modules) = 139650412862200 | 
					
						
							|  |  |  |         # interp 0 <0x1cf9330>, thread state <0x1cf9700>: id(modules) = 139650431942728 | 
					
						
							|  |  |  |         # --- Pass 1 --- | 
					
						
							|  |  |  |         # ... | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         interp_pat = (r"^interp (\d+) <(0x[\dA-F]+)>, " | 
					
						
							|  |  |  |                       r"thread state <(0x[\dA-F]+)>: " | 
					
						
							|  |  |  |                       r"id\(modules\) = ([\d]+)$") | 
					
						
							|  |  |  |         Interp = namedtuple("Interp", "id interp tstate modules") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         numloops = 0 | 
					
						
							|  |  |  |         current_run = [] | 
					
						
							|  |  |  |         for line in out.splitlines(): | 
					
						
							|  |  |  |             if line == "--- Pass {} ---".format(numloops): | 
					
						
							|  |  |  |                 self.assertEqual(len(current_run), 0) | 
					
						
							|  |  |  |                 if support.verbose: | 
					
						
							|  |  |  |                     print(line) | 
					
						
							|  |  |  |                 numloops += 1 | 
					
						
							|  |  |  |                 continue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertLess(len(current_run), 5) | 
					
						
							|  |  |  |             match = re.match(interp_pat, line) | 
					
						
							|  |  |  |             if match is None: | 
					
						
							|  |  |  |                 self.assertRegex(line, interp_pat) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Parse the line from the loop.  The first line is the main | 
					
						
							|  |  |  |             # interpreter and the 3 afterward are subinterpreters. | 
					
						
							|  |  |  |             interp = Interp(*match.groups()) | 
					
						
							|  |  |  |             if support.verbose: | 
					
						
							|  |  |  |                 print(interp) | 
					
						
							|  |  |  |             self.assertTrue(interp.interp) | 
					
						
							|  |  |  |             self.assertTrue(interp.tstate) | 
					
						
							|  |  |  |             self.assertTrue(interp.modules) | 
					
						
							|  |  |  |             current_run.append(interp) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # The last line in the loop should be the same as the first. | 
					
						
							|  |  |  |             if len(current_run) == 5: | 
					
						
							|  |  |  |                 main = current_run[0] | 
					
						
							|  |  |  |                 self.assertEqual(interp, main) | 
					
						
							|  |  |  |                 yield current_run | 
					
						
							|  |  |  |                 current_run = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_subinterps_main(self): | 
					
						
							|  |  |  |         for run in self.run_repeated_init_and_subinterpreters(): | 
					
						
							|  |  |  |             main = run[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.assertEqual(main.id, '0') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_subinterps_different_ids(self): | 
					
						
							|  |  |  |         for run in self.run_repeated_init_and_subinterpreters(): | 
					
						
							|  |  |  |             main, *subs, _ = run | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             mainid = int(main.id) | 
					
						
							|  |  |  |             for i, sub in enumerate(subs): | 
					
						
							|  |  |  |                 self.assertEqual(sub.id, str(mainid + i + 1)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_subinterps_distinct_state(self): | 
					
						
							|  |  |  |         for run in self.run_repeated_init_and_subinterpreters(): | 
					
						
							|  |  |  |             main, *subs, _ = run | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if '0x0' in main: | 
					
						
							|  |  |  |                 # XXX Fix on Windows (and other platforms): something | 
					
						
							|  |  |  |                 # is going on with the pointers in Programs/_testembed.c. | 
					
						
							|  |  |  |                 # interp.interp is 0x0 and interp.modules is the same | 
					
						
							|  |  |  |                 # between interpreters. | 
					
						
							|  |  |  |                 raise unittest.SkipTest('platform prints pointers as 0x0') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for sub in subs: | 
					
						
							|  |  |  |                 # A new subinterpreter may have the same | 
					
						
							|  |  |  |                 # PyInterpreterState pointer as a previous one if | 
					
						
							|  |  |  |                 # the earlier one has already been destroyed.  So | 
					
						
							|  |  |  |                 # we compare with the main interpreter.  The same | 
					
						
							|  |  |  |                 # applies to tstate. | 
					
						
							|  |  |  |                 self.assertNotEqual(sub.interp, main.interp) | 
					
						
							|  |  |  |                 self.assertNotEqual(sub.tstate, main.tstate) | 
					
						
							|  |  |  |                 self.assertNotEqual(sub.modules, main.modules) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_forced_io_encoding(self): | 
					
						
							|  |  |  |         # Checks forced configuration of embedded interpreter IO streams | 
					
						
							|  |  |  |         env = dict(os.environ, PYTHONIOENCODING="utf-8:surrogateescape") | 
					
						
							|  |  |  |         out, err = self.run_embedded_interpreter("forced_io_encoding", env=env) | 
					
						
							|  |  |  |         if support.verbose > 1: | 
					
						
							|  |  |  |             print() | 
					
						
							|  |  |  |             print(out) | 
					
						
							|  |  |  |             print(err) | 
					
						
							|  |  |  |         expected_stream_encoding = "utf-8" | 
					
						
							|  |  |  |         expected_errors = "surrogateescape" | 
					
						
							|  |  |  |         expected_output = '\n'.join([ | 
					
						
							|  |  |  |         "--- Use defaults ---", | 
					
						
							|  |  |  |         "Expected encoding: default", | 
					
						
							|  |  |  |         "Expected errors: default", | 
					
						
							|  |  |  |         "stdin: {in_encoding}:{errors}", | 
					
						
							|  |  |  |         "stdout: {out_encoding}:{errors}", | 
					
						
							|  |  |  |         "stderr: {out_encoding}:backslashreplace", | 
					
						
							|  |  |  |         "--- Set errors only ---", | 
					
						
							|  |  |  |         "Expected encoding: default", | 
					
						
							|  |  |  |         "Expected errors: ignore", | 
					
						
							|  |  |  |         "stdin: {in_encoding}:ignore", | 
					
						
							|  |  |  |         "stdout: {out_encoding}:ignore", | 
					
						
							|  |  |  |         "stderr: {out_encoding}:backslashreplace", | 
					
						
							|  |  |  |         "--- Set encoding only ---", | 
					
						
							|  |  |  |         "Expected encoding: latin-1", | 
					
						
							|  |  |  |         "Expected errors: default", | 
					
						
							|  |  |  |         "stdin: latin-1:{errors}", | 
					
						
							|  |  |  |         "stdout: latin-1:{errors}", | 
					
						
							|  |  |  |         "stderr: latin-1:backslashreplace", | 
					
						
							|  |  |  |         "--- Set encoding and errors ---", | 
					
						
							|  |  |  |         "Expected encoding: latin-1", | 
					
						
							|  |  |  |         "Expected errors: replace", | 
					
						
							|  |  |  |         "stdin: latin-1:replace", | 
					
						
							|  |  |  |         "stdout: latin-1:replace", | 
					
						
							|  |  |  |         "stderr: latin-1:backslashreplace"]) | 
					
						
							|  |  |  |         expected_output = expected_output.format( | 
					
						
							|  |  |  |                                 in_encoding=expected_stream_encoding, | 
					
						
							|  |  |  |                                 out_encoding=expected_stream_encoding, | 
					
						
							|  |  |  |                                 errors=expected_errors) | 
					
						
							|  |  |  |         # This is useful if we ever trip over odd platform behaviour | 
					
						
							|  |  |  |         self.maxDiff = None | 
					
						
							|  |  |  |         self.assertEqual(out.strip(), expected_output) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_pre_initialization_api(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Checks the few parts of the C-API that work before the runtine | 
					
						
							|  |  |  |         is initialized (via Py_Initialize()). | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         env = dict(os.environ, PYTHONPATH=os.pathsep.join(sys.path)) | 
					
						
							|  |  |  |         out, err = self.run_embedded_interpreter("pre_initialization_api", env=env) | 
					
						
							|  |  |  |         self.assertEqual(out, '') | 
					
						
							|  |  |  |         self.assertEqual(err, '') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-30 22:05:00 +01:00
										 |  |  |     def test_bpo20891(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         bpo-20891: Calling PyGILState_Ensure in a non-Python thread before | 
					
						
							|  |  |  |         calling PyEval_InitThreads() must not crash. PyGILState_Ensure() must | 
					
						
							|  |  |  |         call PyEval_InitThreads() for us in this case. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         out, err = self.run_embedded_interpreter("bpo20891") | 
					
						
							|  |  |  |         self.assertEqual(out, '') | 
					
						
							|  |  |  |         self.assertEqual(err, '') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-28 08:11:51 +10:00
										 |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     unittest.main() |