mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	 89664d4d48
			
		
	
	
		89664d4d48
		
			
		
	
	
	
	
		
			
			gh-126223: Propagate unicode errors in `_interpreters.create()` (GH-126224)
(cherry picked from commit 01415213d7)
Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
Co-authored-by: sobolevn <mail@sobolevn.me>
Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
		
	
			
		
			
				
	
	
		
			1668 lines
		
	
	
	
		
			57 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1668 lines
		
	
	
	
		
			57 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import pickle
 | |
| import sys
 | |
| from textwrap import dedent, indent
 | |
| import threading
 | |
| import types
 | |
| import unittest
 | |
| 
 | |
| from test import support
 | |
| from test.support import import_helper
 | |
| # Raise SkipTest if subinterpreters not supported.
 | |
| _interpreters = import_helper.import_module('_interpreters')
 | |
| from test.support import Py_GIL_DISABLED
 | |
| from test.support import interpreters
 | |
| from test.support import force_not_colorized
 | |
| from test.support.interpreters import (
 | |
|     InterpreterError, InterpreterNotFoundError, ExecutionFailed,
 | |
| )
 | |
| from .utils import (
 | |
|     _captured_script, _run_output, _running, TestBase,
 | |
|     requires_test_modules, _testinternalcapi,
 | |
| )
 | |
| 
 | |
| 
 | |
| WHENCE_STR_UNKNOWN = 'unknown'
 | |
| WHENCE_STR_RUNTIME = 'runtime init'
 | |
| WHENCE_STR_LEGACY_CAPI = 'legacy C-API'
 | |
| WHENCE_STR_CAPI = 'C-API'
 | |
| WHENCE_STR_XI = 'cross-interpreter C-API'
 | |
| WHENCE_STR_STDLIB = '_interpreters module'
 | |
| 
 | |
| 
 | |
class ModuleTests(TestBase):

    def test_queue_aliases(self):
        # Look up the queue-related names on the module twice, in the same
        # order each time, and require both passes to yield equal objects.
        names = ('create_queue', 'Queue', 'QueueEmpty', 'QueueFull')
        first = [getattr(interpreters, name) for name in names]
        second = [getattr(interpreters, name) for name in names]
        self.assertEqual(second, first)
 | |
| 
 | |
| 
 | |
class CreateTests(TestBase):
    # Exercises interpreters.create() from the main interpreter, from a
    # thread, from inside a subinterpreter, and after earlier interpreters
    # have been closed.

    def test_in_main(self):
        created = interpreters.create()
        self.assertIsInstance(created, interpreters.Interpreter)
        self.assertIn(created, interpreters.list_all())

        # gh-126223: Passing an invalid Unicode character (here a lone
        # surrogate) used to cause a SystemError.
        with self.assertRaises(UnicodeEncodeError):
            _interpreters.create('\udc80')

    def test_in_thread(self):
        gate = threading.Lock()
        interp = None

        def worker():
            nonlocal interp
            interp = interpreters.create()
            # Wait here until the starting thread lets go of the lock.
            with gate:
                pass

        thread = threading.Thread(target=worker)
        with gate:
            thread.start()
        thread.join()
        self.assertIn(interp, interpreters.list_all())

    def test_in_subinterpreter(self):
        [main] = interpreters.list_all()
        outer = interpreters.create()
        script = dedent("""
            from test.support import interpreters
            interp = interpreters.create()
            print(interp.id)
            """)
        printed = _run_output(outer, script)
        # The script prints the ID of the interpreter it created.
        inner = interpreters.Interpreter(int(printed))
        self.assertEqual(interpreters.list_all(), [main, outer, inner])

    def test_after_destroy_all(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        created = [interpreters.create() for _ in range(3)]
        # Now destroy them.
        for doomed in created:
            doomed.close()
        # Finally, create another.
        survivor = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {survivor})

    def test_after_destroy_some(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        first = interpreters.create()
        second = interpreters.create()
        kept = interpreters.create()
        # Now destroy 2 of them.
        for doomed in (first, second):
            doomed.close()
        # Finally, create another.
        latest = interpreters.create()
        remaining = set(interpreters.list_all())
        self.assertEqual(remaining, before | {kept, latest})
 | |
| 
 | |
| 
 | |
class GetMainTests(TestBase):
    # Checks on the object returned by interpreters.get_main().

    def test_id(self):
        # The main interpreter is expected to have ID 0.
        self.assertEqual(interpreters.get_main().id, 0)

    def test_current(self):
        # When called from the main interpreter, get_current() must hand
        # back the very same object as get_main().
        self.assertIs(interpreters.get_main(), interpreters.get_current())

    def test_idempotent(self):
        # Two calls must return one and the same object.
        first_call = interpreters.get_main()
        second_call = interpreters.get_main()
        self.assertIs(first_call, second_call)
 | |
| 
 | |
| 
 | |
class GetCurrentTests(TestBase):
    # Checks on interpreters.get_current() from the main interpreter, from
    # a subinterpreter, and from an interpreter created via the C-API.

    def test_main(self):
        expected = interpreters.get_main()
        self.assertEqual(interpreters.get_current(), expected)

    def test_subinterpreter(self):
        main = interpreters.get_main()
        sub = interpreters.create()
        script = dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """)
        printed = _run_output(sub, script)
        # Rebuild an Interpreter object from the ID the script reported.
        reported = interpreters.Interpreter(int(printed))
        self.assertEqual(reported, sub)
        self.assertNotEqual(reported, main)

    def test_idempotent(self):
        with self.subTest('main'):
            first_call = interpreters.get_current()
            second_call = interpreters.get_current()
            self.assertIs(first_call, second_call)

        with self.subTest('subinterpreter'):
            sub = interpreters.create()
            printed = _run_output(sub, dedent("""
                from test.support import interpreters
                cur = interpreters.get_current()
                print(id(cur))
                cur = interpreters.get_current()
                print(id(cur))
                """))
            # The script prints id() of the object from each of two calls.
            first_objid, second_objid = map(int, printed.splitlines())
            self.assertEqual(first_objid, second_objid)

        with self.subTest('per-interpreter'):
            sub = interpreters.create()
            printed = _run_output(sub, dedent("""
                from test.support import interpreters
                cur = interpreters.get_current()
                print(id(cur))
                """))
            # The object seen inside the subinterpreter must not share an
            # id() with the Interpreter object held here.
            inner_objid = int(printed)
            outer_objid = id(sub)
            self.assertNotEqual(inner_objid, outer_objid)

    @requires_test_modules
    def test_created_with_capi(self):
        expected_id = _testinternalcapi.next_interpreter_id()
        printed = self.run_temp_from_capi(f"""
            import {interpreters.__name__} as interpreters
            interp = interpreters.get_current()
            print((interp.id, interp.whence))
            """)
        # The script prints a tuple literal; evaluate it to recover the
        # (id, whence) pair.
        actual_id, actual_whence = eval(printed)
        self.assertEqual(actual_id, expected_id)
        self.assertEqual(actual_whence, WHENCE_STR_CAPI)
 | |
| 
 | |
| 
 | |
class ListAllTests(TestBase):
    # Checks on the contents and ordering of interpreters.list_all().

    def test_initial(self):
        # Only one interpreter is expected before the test creates any.
        self.assertEqual(1, len(interpreters.list_all()))

    def test_after_creating(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()

        listed_ids = [each.id for each in interpreters.list_all()]

        self.assertEqual(listed_ids, [main.id, first.id, second.id])

    def test_after_destroying(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()
        first.close()

        listed_ids = [each.id for each in interpreters.list_all()]

        # The closed interpreter must no longer be listed.
        self.assertEqual(listed_ids, [main.id, second.id])

    def test_idempotent(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()
        wanted = [main, first, second]

        listed = interpreters.list_all()

        self.assertEqual(listed, wanted)
        # Equality is not enough: each entry must be the identical object.
        for got, want in zip(listed, wanted):
            self.assertIs(got, want)

    def test_created_with_capi(self):
        mainid, *_ = _interpreters.get_main()
        stdlib_ids = [_interpreters.create() for _ in range(3)]
        # The test predicts the next two IDs: one for the temporary
        # interpreter made through the C-API and one for the interpreter
        # that the script creates inside it.
        capi_id = stdlib_ids[-1] + 1
        nested_id = capi_id + 1
        expected_inside = [
            (mainid, WHENCE_STR_RUNTIME),
            *((interpid, WHENCE_STR_STDLIB) for interpid in stdlib_ids),
            (capi_id, WHENCE_STR_CAPI),
            (nested_id, WHENCE_STR_STDLIB),
        ]
        # After the script has run, the last two are expected to be gone.
        expected_after = expected_inside[:-2]
        printed = self.run_temp_from_capi(f"""
            import {interpreters.__name__} as interpreters
            interp = interpreters.create()
            print(
                [(i.id, i.whence) for i in interpreters.list_all()])
            """)
        seen_inside = eval(printed)
        seen_after = [(each.id, each.whence)
                      for each in interpreters.list_all()]
        self.assertEqual(seen_inside, expected_inside)
        self.assertEqual(seen_after, expected_after)
 | |
| 
 | |
| 
 | |
class InterpreterObjectTests(TestBase):
    # Checks on constructing Interpreter objects and on their basic
    # attributes (id, whence), hashing, equality and pickling.

    def test_init_int(self):
        current_id = interpreters.get_current().id
        wrapped = interpreters.Interpreter(current_id)
        self.assertEqual(wrapped.id, current_id)

    def test_init_interpreter_id(self):
        # Same as test_init_int, but going through the private _id attribute.
        current_id = interpreters.get_current()._id
        wrapped = interpreters.Interpreter(current_id)
        self.assertEqual(wrapped._id, current_id)

    def test_init_unsupported(self):
        actualid = interpreters.get_current().id
        # None of these is an acceptable ID argument.
        rejected = (
            str(actualid),
            float(actualid),
            object(),
            None,
            '',
        )
        for candidate in rejected:
            with self.subTest(repr(candidate)):
                with self.assertRaises(TypeError):
                    interpreters.Interpreter(candidate)

    def test_idempotent(self):
        main = interpreters.get_main()
        # Constructing from an existing ID must return the existing object.
        again = interpreters.Interpreter(main.id)
        self.assertIs(again, main)

    def test_init_does_not_exist(self):
        with self.assertRaises(InterpreterNotFoundError):
            interpreters.Interpreter(1_000_000)

    def test_init_bad_id(self):
        with self.assertRaises(ValueError):
            interpreters.Interpreter(-1)

    def test_id_type(self):
        main = interpreters.get_main()
        current = interpreters.get_current()
        created = interpreters.create()
        for each in (main, current, created):
            self.assertIsInstance(each.id, int)

    def test_id_readonly(self):
        created = interpreters.create()
        with self.assertRaises(AttributeError):
            created.id = 1_000_000

    def test_whence(self):
        main = interpreters.get_main()
        created = interpreters.create()

        with self.subTest('main'):
            self.assertEqual(main.whence, WHENCE_STR_RUNTIME)

        with self.subTest('from _interpreters'):
            self.assertEqual(created.whence, WHENCE_STR_STDLIB)

        with self.subTest('from C-API'):
            printed = self.run_temp_from_capi(f"""
                import {interpreters.__name__} as interpreters
                interp = interpreters.get_current()
                print(repr(interp.whence))
                """)
            # The script prints a repr(); evaluate it to get the string back.
            reported = eval(printed)
            self.assertEqual(reported, WHENCE_STR_CAPI)

        with self.subTest('readonly'):
            attempts = (
                None,
                WHENCE_STR_UNKNOWN,
                WHENCE_STR_RUNTIME,
                WHENCE_STR_STDLIB,
                WHENCE_STR_CAPI,
            )
            for value in attempts:
                with self.assertRaises(AttributeError):
                    created.whence = value
                with self.assertRaises(AttributeError):
                    main.whence = value

    def test_hashable(self):
        created = interpreters.create()
        # The object is expected to hash the same as its ID.
        id_hash = hash(created.id)
        obj_hash = hash(created)
        self.assertEqual(obj_hash, id_hash)

    def test_equality(self):
        left = interpreters.create()
        right = interpreters.create()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)

    def test_pickle(self):
        created = interpreters.create()
        # Round-trip through pickle and compare with the original.
        restored = pickle.loads(pickle.dumps(created))
        self.assertEqual(restored, created)
 | |
| 
 | |
| 
 | |
class TestInterpreterIsRunning(TestBase):
    # Tests for Interpreter.is_running() as observed from the main
    # interpreter, from inside a subinterpreter, and for interpreters
    # created through the C-API helpers on TestBase.

    def test_main(self):
        # The main interpreter reports itself as running.
        main = interpreters.get_main()
        self.assertTrue(main.is_running())

    # XXX Is this still true?
    @unittest.skip('Fails on FreeBSD')
    def test_subinterpreter(self):
        interp = interpreters.create()
        self.assertFalse(interp.is_running())

        # The test expects is_running() to be true only while inside the
        # _running() context.
        with _running(interp):
            self.assertTrue(interp.is_running())
        self.assertFalse(interp.is_running())

    def test_finished(self):
        r, w = self.pipe()
        interp = interpreters.create()
        interp.exec(f"""if True:
            import os
            os.write({w}, b'x')
            """)
        # Once exec() has returned, the interpreter must not be reported as
        # running; the byte read back from the pipe shows the script ran.
        self.assertFalse(interp.is_running())
        self.assertEqual(os.read(r, 1), b'x')

    def test_from_subinterpreter(self):
        interp = interpreters.create()
        # The script asks the low-level module about its own interpreter ID
        # while it is executing, so the expected answer is True.
        out = _run_output(interp, dedent(f"""
            import _interpreters
            if _interpreters.is_running({interp.id}):
                print(True)
            else:
                print(False)
            """))
        self.assertEqual(out.strip(), 'True')

    def test_already_destroyed(self):
        # Querying a closed interpreter must raise InterpreterNotFoundError.
        interp = interpreters.create()
        interp.close()
        with self.assertRaises(InterpreterNotFoundError):
            interp.is_running()

    def test_with_only_background_threads(self):
        # One pipe lets this test release the background thread; the other
        # lets the thread report back that it finished.
        r_interp, w_interp = self.pipe()
        r_thread, w_thread = self.pipe()

        DONE = b'D'
        FINISHED = b'F'

        interp = interpreters.create()
        # The script starts a thread that blocks reading from r_thread and
        # then returns, leaving only that thread alive in the interpreter.
        interp.exec(f"""if True:
            import os
            import threading

            def task():
                v = os.read({r_thread}, 1)
                assert v == {DONE!r}
                os.write({w_interp}, {FINISHED!r})
            t = threading.Thread(target=task)
            t.start()
            """)
        # A leftover background thread alone must not count as "running".
        self.assertFalse(interp.is_running())

        # Unblock the thread, join it inside the interpreter, and confirm
        # that it wrote its completion marker.
        os.write(w_thread, DONE)
        interp.exec('t.join()')
        self.assertEqual(os.read(r_interp, 1), FINISHED)

    def test_created_with_capi(self):
        # The script prints whether its own interpreter reports as running.
        script = dedent(f"""
            import {interpreters.__name__} as interpreters
            interp = interpreters.get_current()
            print(interp.is_running())
            """)
        def parse_results(text):
            # Evaluate the printed output; if that fails, raise with the
            # repr of the raw text so the failure shows what was captured.
            self.assertNotEqual(text, "")
            try:
                return eval(text)
            except Exception:
                raise Exception(repr(text))

        with self.subTest('running __main__ (from self)'):
            with self.interpreter_from_capi() as interpid:
                text = self.run_from_capi(interpid, script, main=True)
            running = parse_results(text)
            self.assertTrue(running)

        with self.subTest('running, but not __main__ (from self)'):
            text = self.run_temp_from_capi(script)
            running = parse_results(text)
            self.assertFalse(running)

        with self.subTest('running __main__ (from other)'):
            # Sample is_running() before, during and after the
            # running_from_capi() context; only "during" should be true.
            with self.interpreter_obj_from_capi() as (interp, interpid):
                before = interp.is_running()
                with self.running_from_capi(interpid, main=True):
                    during = interp.is_running()
                after = interp.is_running()
            self.assertFalse(before)
            self.assertTrue(during)
            self.assertFalse(after)

        with self.subTest('running, but not __main__ (from other)'):
            # With main=False the interpreter is never expected to report
            # as running, not even inside the context.
            with self.interpreter_obj_from_capi() as (interp, interpid):
                before = interp.is_running()
                with self.running_from_capi(interpid, main=False):
                    during = interp.is_running()
                after = interp.is_running()
            self.assertFalse(before)
            self.assertFalse(during)
            self.assertFalse(after)

        with self.subTest('not running (from other)'):
            with self.interpreter_obj_from_capi() as (interp, _):
                running = interp.is_running()
            self.assertFalse(running)
 | |
| 
 | |
| 
 | |
class TestInterpreterClose(TestBase):
    # Tests for Interpreter.close(): which interpreters may be closed, from
    # where, and what list_all() reports afterwards.

    def test_basic(self):
        main = interpreters.get_main()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        interp3 = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp2, interp3})
        # Closing one interpreter removes only that one from the listing.
        interp2.close()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp3})

    def test_all(self):
        before = set(interpreters.list_all())
        interps = set()
        for _ in range(3):
            interp = interpreters.create()
            interps.add(interp)
        self.assertEqual(set(interpreters.list_all()), before | interps)
        for interp in interps:
            interp.close()
        # After closing everything created here, the listing is back to
        # what it was at the start.
        self.assertEqual(set(interpreters.list_all()), before)

    def test_main(self):
        # Unpacking into a one-element target also checks that the main
        # interpreter is the only one listed.
        main, = interpreters.list_all()
        with self.assertRaises(InterpreterError):
            main.close()

        # The same attempt made from another thread must fail as well.
        def f():
            with self.assertRaises(InterpreterError):
                main.close()

        t = threading.Thread(target=f)
        t.start()
        t.join()

    def test_already_destroyed(self):
        # A second close() on the same interpreter raises
        # InterpreterNotFoundError.
        interp = interpreters.create()
        interp.close()
        with self.assertRaises(InterpreterNotFoundError):
            interp.close()

    def test_from_current(self):
        main, = interpreters.list_all()
        interp = interpreters.create()
        # The script tries to close the interpreter it is running in and
        # prints 'failed' when that raises InterpreterError.
        out = _run_output(interp, dedent(f"""
            from test.support import interpreters
            interp = interpreters.Interpreter({interp.id})
            try:
                interp.close()
            except interpreters.InterpreterError:
                print('failed')
            """))
        self.assertEqual(out.strip(), 'failed')
        # The interpreter must still be listed afterwards.
        self.assertEqual(set(interpreters.list_all()), {main, interp})

    def test_from_sibling(self):
        main, = interpreters.list_all()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp2})
        # From inside interp1: close interp2, then create and close a third
        # interpreter.
        interp1.exec(dedent(f"""
            from test.support import interpreters
            interp2 = interpreters.Interpreter({interp2.id})
            interp2.close()
            interp3 = interpreters.create()
            interp3.close()
            """))
        self.assertEqual(set(interpreters.list_all()), {main, interp1})

    def test_from_other_thread(self):
        # Close the interpreter from a thread other than the one that
        # created it.  No explicit assertion is made here.
        interp = interpreters.create()
        def f():
            interp.close()

        t = threading.Thread(target=f)
        t.start()
        t.join()

    # XXX Is this still true?
    @unittest.skip('Fails on FreeBSD')
    def test_still_running(self):
        main, = interpreters.list_all()
        interp = interpreters.create()
        # While inside the _running() context, close() must be refused and
        # the interpreter must still report as running.
        with _running(interp):
            with self.assertRaises(InterpreterError):
                interp.close()
            self.assertTrue(interp.is_running())

    def test_subthreads_still_running(self):
        r_interp, w_interp = self.pipe()
        r_thread, w_thread = self.pipe()

        FINISHED = b'F'

        interp = interpreters.create()
        # The script starts a thread that polls the `done` flag and writes
        # FINISHED once it is set.  The flag is set by notify_fini(), which
        # the script registers through threading._register_atexit() and
        # which also joins the thread.
        interp.exec(f"""if True:
            import os
            import threading
            import time

            done = False

            def notify_fini():
                global done
                done = True
                t.join()
            threading._register_atexit(notify_fini)

            def task():
                while not done:
                    time.sleep(0.1)
                os.write({w_interp}, {FINISHED!r})
            t = threading.Thread(target=task)
            t.start()
            """)
        interp.close()

        # Reading the marker shows that the thread was allowed to finish.
        self.assertEqual(os.read(r_interp, 1), FINISHED)

    def test_created_with_capi(self):
        # The script tries to close the interpreter it is running in.
        script = dedent(f"""
            import {interpreters.__name__} as interpreters
            interp = interpreters.get_current()
            interp.close()
            """)

        with self.subTest('running __main__ (from self)'):
            with self.interpreter_from_capi() as interpid:
                with self.assertRaisesRegex(ExecutionFailed,
                                            'InterpreterError.*unrecognized'):
                    self.run_from_capi(interpid, script, main=True)

        with self.subTest('running, but not __main__ (from self)'):
            with self.assertRaisesRegex(ExecutionFailed,
                                        'InterpreterError.*unrecognized'):
                self.run_temp_from_capi(script)

        with self.subTest('running __main__ (from other)'):
            with self.interpreter_obj_from_capi() as (interp, interpid):
                with self.running_from_capi(interpid, main=True):
                    with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
                        interp.close()
                    # Make sure it wasn't closed.
                    self.assertTrue(
                        self.interp_exists(interpid))

        # The rest would be skipped until we deal with running threads when
        # interp.close() is called.  However, the "whence" restrictions
        # trigger first.

        with self.subTest('running, but not __main__ (from other)'):
            with self.interpreter_obj_from_capi() as (interp, interpid):
                with self.running_from_capi(interpid, main=False):
                    with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
                        interp.close()
                    # Make sure it wasn't closed.
                    self.assertTrue(
                        self.interp_exists(interpid))

        with self.subTest('not running (from other)'):
            with self.interpreter_obj_from_capi() as (interp, interpid):
                with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
                    interp.close()
                # The interpreter must still exist after the refused close().
                self.assertTrue(
                    self.interp_exists(interpid))
 | |
| 
 | |
| 
 | |
class TestInterpreterPrepareMain(TestBase):
    # Covers Interpreter.prepare_main(): the accepted argument forms, the
    # rejection of bad input, and the refusal to act on an interpreter that
    # is running or that was created through the C-API.

    def test_empty(self):
        # Calling with no names at all is an error.
        target = interpreters.create()
        self.assertRaises(ValueError, target.prepare_main)

    def test_dict(self):
        namespace = dict(spam=42, eggs='ham')
        target = interpreters.create()
        target.prepare_main(namespace)
        script = dedent("""
            print(spam, eggs)
            """)
        printed = _run_output(target, script)
        self.assertEqual(printed.strip(), '42 ham')

    def test_tuple(self):
        # Same names as test_dict, passed as a tuple of (name, value) pairs.
        pairs = (('spam', 42), ('eggs', 'ham'))
        target = interpreters.create()
        target.prepare_main(pairs)
        script = dedent("""
            print(spam, eggs)
            """)
        printed = _run_output(target, script)
        self.assertEqual(printed.strip(), '42 ham')

    def test_kwargs(self):
        target = interpreters.create()
        target.prepare_main(spam=42, eggs='ham')
        script = dedent("""
            print(spam, eggs)
            """)
        printed = _run_output(target, script)
        self.assertEqual(printed.strip(), '42 ham')

    def test_dict_and_kwargs(self):
        # A positional mapping and keyword arguments may be combined.
        namespace = dict(spam=42, eggs='ham')
        target = interpreters.create()
        target.prepare_main(namespace, foo='bar')
        script = dedent("""
            print(spam, eggs, foo)
            """)
        printed = _run_output(target, script)
        self.assertEqual(printed.strip(), '42 ham bar')

    def test_not_shareable(self):
        target = interpreters.create()
        # XXX TypeError?
        with self.assertRaises(ValueError):
            target.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})

        # Make sure neither was actually bound.
        for name in ('foo', 'spam'):
            with self.assertRaises(ExecutionFailed):
                target.exec(f'print({name})')

    def test_running(self):
        target = interpreters.create()
        target.prepare_main({'spam': True})
        # A second prepare_main() while the interpreter is running must be
        # refused ...
        with self.running(target):
            with self.assertRaisesRegex(InterpreterError, 'running'):
                target.prepare_main({'spam': False})
        # ... and must leave the earlier binding untouched.
        target.exec('assert spam is True')

    @requires_test_modules
    def test_created_with_capi(self):
        with self.interpreter_obj_from_capi() as (target, interpid):
            with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
                target.prepare_main({'spam': True})
            # Nothing was bound, so the name lookup in the script fails.
            with self.assertRaisesRegex(ExecutionFailed, 'NameError'):
                self.run_from_capi(interpid, 'assert spam is True')
 | |
| 
 | |
| 
 | |
| class TestInterpreterExec(TestBase):
 | |
| 
 | |
|     def test_success(self):
 | |
|         interp = interpreters.create()
 | |
|         script, results = _captured_script('print("it worked!", end="")')
 | |
|         with results:
 | |
|             interp.exec(script)
 | |
|         results = results.final()
 | |
|         results.raise_if_failed()
 | |
|         out = results.stdout
 | |
| 
 | |
|         self.assertEqual(out, 'it worked!')
 | |
| 
 | |
|     def test_failure(self):
 | |
|         interp = interpreters.create()
 | |
|         with self.assertRaises(ExecutionFailed):
 | |
|             interp.exec('raise Exception')
 | |
| 
 | |
    @force_not_colorized
    def test_display_preserved_exception(self):
        # Run a script in a separate Python process that fails inside a
        # subinterpreter, then compare the complete stderr text with the
        # expected two-part traceback.
        tempdir = self.temp_dir()
        # Module whose eggs() -> ham() call chain raises RuntimeError.
        modfile = self.make_module('spam', tempdir, text="""
            def ham():
                raise RuntimeError('uh-oh!')

            def eggs():
                ham()
            """)
        # Script that hands a function to a new interpreter's exec(); that
        # function imports the module above and triggers the error.
        scriptfile = self.make_script('script.py', tempdir, text="""
            from test.support import interpreters

            def script():
                import spam
                spam.eggs()

            interp = interpreters.create()
            interp.exec(script)
            """)

        stdout, stderr = self.assert_python_failure(scriptfile)
        # Show the full diff if the comparison below fails.
        self.maxDiff = None
        # Take the traceback line containing ' exec' from the actual output
        # rather than hard-coding it; the one-element unpacking also checks
        # that exactly one such line exists.  It looks like:
        #      File "{interpreters.__file__}", line 179, in exec
        interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l)
        self.assertEqual(stderr, dedent(f"""\
            Traceback (most recent call last):
              File "{scriptfile}", line 9, in <module>
                interp.exec(script)
                ~~~~~~~~~~~^^^^^^^^
              {interpmod_line.strip()}
                raise ExecutionFailed(excinfo)
            test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!

            Uncaught in the interpreter:

            Traceback (most recent call last):
              File "{scriptfile}", line 6, in script
                spam.eggs()
                ~~~~~~~~~^^
              File "{modfile}", line 6, in eggs
                ham()
                ~~~^^
              File "{modfile}", line 3, in ham
                raise RuntimeError('uh-oh!')
            RuntimeError: uh-oh!
            """))
        # Nothing may be written to stdout.
        self.assertEqual(stdout, '')
 | |
| 
 | |
|     def test_in_thread(self):
 | |
|         interp = interpreters.create()
 | |
|         script, results = _captured_script('print("it worked!", end="")')
 | |
|         with results:
 | |
|             def f():
 | |
|                 interp.exec(script)
 | |
| 
 | |
|             t = threading.Thread(target=f)
 | |
|             t.start()
 | |
|             t.join()
 | |
|         results = results.final()
 | |
|         results.raise_if_failed()
 | |
|         out = results.stdout
 | |
| 
 | |
|         self.assertEqual(out, 'it worked!')
 | |
| 
 | |
|     @support.requires_fork()
 | |
|     def test_fork(self):
 | |
|         interp = interpreters.create()
 | |
|         import tempfile
 | |
|         with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
 | |
|             file.write('')
 | |
|             file.flush()
 | |
| 
 | |
|             expected = 'spam spam spam spam spam'
 | |
|             script = dedent(f"""
 | |
|                 import os
 | |
|                 try:
 | |
|                     os.fork()
 | |
|                 except RuntimeError:
 | |
|                     with open('{file.name}', 'w', encoding='utf-8') as out:
 | |
|                         out.write('{expected}')
 | |
|                 """)
 | |
|             interp.exec(script)
 | |
| 
 | |
|             file.seek(0)
 | |
|             content = file.read()
 | |
|             self.assertEqual(content, expected)
 | |
| 
 | |
|     # XXX Is this still true?
 | |
|     @unittest.skip('Fails on FreeBSD')
 | |
|     def test_already_running(self):
 | |
|         interp = interpreters.create()
 | |
|         with _running(interp):
 | |
|             with self.assertRaises(RuntimeError):
 | |
|                 interp.exec('print("spam")')
 | |
| 
 | |
|     def test_bad_script(self):
 | |
|         interp = interpreters.create()
 | |
|         with self.assertRaises(TypeError):
 | |
|             interp.exec(10)
 | |
| 
 | |
|     def test_bytes_for_script(self):
 | |
|         interp = interpreters.create()
 | |
|         with self.assertRaises(TypeError):
 | |
|             interp.exec(b'print("spam")')
 | |
| 
 | |
|     def test_with_background_threads_still_running(self):
 | |
|         r_interp, w_interp = self.pipe()
 | |
|         r_thread, w_thread = self.pipe()
 | |
| 
 | |
|         RAN = b'R'
 | |
|         DONE = b'D'
 | |
|         FINISHED = b'F'
 | |
| 
 | |
|         interp = interpreters.create()
 | |
|         interp.exec(f"""if True:
 | |
|             import os
 | |
|             import threading
 | |
| 
 | |
|             def task():
 | |
|                 v = os.read({r_thread}, 1)
 | |
|                 assert v == {DONE!r}
 | |
|                 os.write({w_interp}, {FINISHED!r})
 | |
|             t = threading.Thread(target=task)
 | |
|             t.start()
 | |
|             os.write({w_interp}, {RAN!r})
 | |
|             """)
 | |
|         interp.exec(f"""if True:
 | |
|             os.write({w_interp}, {RAN!r})
 | |
|             """)
 | |
| 
 | |
|         os.write(w_thread, DONE)
 | |
|         interp.exec('t.join()')
 | |
|         self.assertEqual(os.read(r_interp, 1), RAN)
 | |
|         self.assertEqual(os.read(r_interp, 1), RAN)
 | |
|         self.assertEqual(os.read(r_interp, 1), FINISHED)
 | |
| 
 | |
|     def test_created_with_capi(self):
 | |
|         with self.interpreter_obj_from_capi() as (interp, _):
 | |
|             with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
 | |
|                 interp.exec('raise Exception("it worked!")')
 | |
| 
 | |
|     # test__interpreters covers the remaining
 | |
|     # Interpreter.exec() behavior.
 | |
| 
 | |
| 
 | |
def call_func_noop():
    """Do nothing and return None."""
    return None
 | |
| 
 | |
| 
 | |
def call_func_return_shareable():
    """Return the tuple ``(1, None)``."""
    return 1, None
 | |
| 
 | |
| 
 | |
def call_func_return_not_shareable():
    """Return a new list ``[1, 2, 3]`` on every call."""
    return [1, 2, 3]
 | |
| 
 | |
| 
 | |
def call_func_failure():
    """Always raise ``Exception('spam!')``."""
    raise Exception('spam!')
 | |
| 
 | |
| 
 | |
def call_func_ident(value):
    """Return *value* unchanged (the very same object)."""
    return value
 | |
| 
 | |
| 
 | |
def get_call_func_closure(value):
    """Return a zero-argument function that returns *value*.

    The returned function closes over *value*.
    """
    def call_func_closure():
        # "value" is a free variable captured from the enclosing call.
        return value

    return call_func_closure
 | |
| 
 | |
| 
 | |
class Spam:
    """Value holder that offers several kinds of callables.

    It provides a static method, a class method, an instance method and
    ``__call__``.  Instances compare equal when their values are equal.
    """

    def __init__(self, value):
        self.value = value

    @classmethod
    def from_values(cls, *values):
        """Create an instance whose value is the tuple of *values*."""
        return cls(values)

    @staticmethod
    def noop():
        """Do nothing and return None."""
        return None

    def __eq__(self, other):
        # Only instances of Spam (or subclasses) are comparable.
        if isinstance(other, Spam):
            return self.value == other.value
        return NotImplemented

    def __call__(self, *args, **kwargs):
        """Return ``(self.value, args, kwargs)``."""
        return (self.value, args, kwargs)

    def run(self, *args, **kwargs):
        """Return ``(self.value, args, kwargs)``."""
        return (self.value, args, kwargs)
 | |
| 
 | |
| 
 | |
def call_func_complex(op, /, value=None, *args, exc=None, **kwargs):
    """Perform the operation named by *op*.

    If *exc* is given it is raised before anything else happens.
    Otherwise *op* selects the behavior:

      ''              raise ValueError
      'ident'         return value
      'full-ident'    return (value, args, kwargs)
      'globals'       return this module's __name__
      'interpid'      return the ID of the current interpreter
      'closure'       return get_call_func_closure(value)
      'custom'        return Spam(value)
      'custom-inner'  return an instance of a local Spam subclass

    Operations that do not use the extra arguments raise Exception when
    any are passed.  Any other *op* raises TypeError if it is not a
    str and NotImplementedError if it is.
    """
    if exc is not None:
        raise exc

    if op == '':
        raise ValueError('missing op')

    if op == 'ident':
        if args or kwargs:
            raise Exception((args, kwargs))
        return value

    if op == 'full-ident':
        return (value, args, kwargs)

    if op == 'globals':
        if value is not None or args or kwargs:
            raise Exception((value, args, kwargs))
        return __name__

    if op == 'interpid':
        if value is not None or args or kwargs:
            raise Exception((value, args, kwargs))
        return interpreters.get_current().id

    if op == 'closure':
        if args or kwargs:
            raise Exception((args, kwargs))
        return get_call_func_closure(value)

    if op == 'custom':
        if args or kwargs:
            raise Exception((args, kwargs))
        return Spam(value)

    if op == 'custom-inner':
        if args or kwargs:
            raise Exception((args, kwargs))

        class Eggs(Spam):
            pass

        return Eggs(value)

    # Unknown operation: separate a wrong type from an unsupported name.
    if not isinstance(op, str):
        raise TypeError(op)
    raise NotImplementedError(op)
 | |
| 
 | |
| 
 | |
class TestInterpreterCall(TestBase):
    # Tests for Interpreter.call() and Interpreter.call_in_thread().
    #
    # Every case is a (func, args, kwargs) triple.  Both methods are
    # invoked with the function only, so the "invalid" tables contain
    # the cases that need arguments as well as the callables that are
    # rejected outright.

    # signature
    #  - blank
    #  - args
    #  - kwargs
    #  - args, kwargs
    # return
    #  - nothing (None)
    #  - simple
    #  - closure
    #  - custom
    # ops:
    #  - do nothing
    #  - fail
    #  - echo
    #  - do complex, relative to interpreter
    # scope
    #  - global func
    #  - local closure
    #  - returned closure
    #  - callable type instance
    #  - type
    #  - classmethod
    #  - staticmethod
    #  - instance method
    # exception
    #  - builtin
    #  - custom
    #  - preserves info (e.g. SyntaxError)
    #  - matching error display

    def test_call(self):
        interp = interpreters.create()

        # Whatever the function returns, call() is expected to
        # return None.
        for i, (func, args, kwargs) in enumerate([
            (call_func_noop, (), {}),
            (call_func_return_shareable, (), {}),
            (call_func_return_not_shareable, (), {}),
            (Spam.noop, (), {}),
        ]):
            with self.subTest(f'success case #{i+1}'):
                res = interp.call(func)
                self.assertIsNone(res)

        for i, (func, args, kwargs) in enumerate([
            (call_func_ident, ('spamspamspam',), {}),
            (get_call_func_closure, (42,), {}),
            (get_call_func_closure(42), (), {}),
            (Spam.from_values, (), {}),
            (Spam.from_values, (1, 2, 3), {}),
            (Spam, ('???',), {}),
            (Spam(101), (), {}),
            (Spam(10101).run, (), {}),
            (call_func_complex, ('ident', 'spam'), {}),
            (call_func_complex, ('full-ident', 'spam'), {}),
            (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
            (call_func_complex, ('globals',), {}),
            (call_func_complex, ('interpid',), {}),
            (call_func_complex, ('closure',), {'value': '~~~'}),
            (call_func_complex, ('custom', 'spam!'), {}),
            (call_func_complex, ('custom-inner', 'eggs!'), {}),
            (call_func_complex, ('???',), {'exc': ValueError('spam')}),
        ]):
            with self.subTest(f'invalid case #{i+1}'):
                with self.assertRaises(Exception):
                    # Cases that need arguments cannot be exercised
                    # here, so they are counted as failing without
                    # being called.
                    if args or kwargs:
                        raise Exception((args, kwargs))
                    interp.call(func)

        with self.assertRaises(ExecutionFailed):
            interp.call(call_func_failure)

    def test_call_in_thread(self):
        interp = interpreters.create()

        for i, (func, args, kwargs) in enumerate([
            (call_func_noop, (), {}),
            (call_func_return_shareable, (), {}),
            (call_func_return_not_shareable, (), {}),
            (Spam.noop, (), {}),
        ]):
            with self.subTest(f'success case #{i+1}'):
                with self.captured_thread_exception() as ctx:
                    t = interp.call_in_thread(func)
                    t.join()
                self.assertIsNone(ctx.caught)

        for i, (func, args, kwargs) in enumerate([
            (call_func_ident, ('spamspamspam',), {}),
            (get_call_func_closure, (42,), {}),
            (get_call_func_closure(42), (), {}),
            (Spam.from_values, (), {}),
            (Spam.from_values, (1, 2, 3), {}),
            (Spam, ('???',), {}),
            (Spam(101), (), {}),
            (Spam(10101).run, (), {}),
            (call_func_complex, ('ident', 'spam'), {}),
            (call_func_complex, ('full-ident', 'spam'), {}),
            (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
            (call_func_complex, ('globals',), {}),
            (call_func_complex, ('interpid',), {}),
            (call_func_complex, ('closure',), {'value': '~~~'}),
            (call_func_complex, ('custom', 'spam!'), {}),
            (call_func_complex, ('custom-inner', 'eggs!'), {}),
            (call_func_complex, ('???',), {'exc': ValueError('spam')}),
        ]):
            with self.subTest(f'invalid case #{i+1}'):
                # Cases that need arguments are skipped.
                if args or kwargs:
                    continue
                with self.captured_thread_exception() as ctx:
                    t = interp.call_in_thread(func)
                    t.join()
                self.assertIsNotNone(ctx.caught)

        with self.captured_thread_exception() as ctx:
            t = interp.call_in_thread(call_func_failure)
            t.join()
        self.assertIsNotNone(ctx.caught)
 | |
| 
 | |
| 
 | |
class TestIsShareable(TestBase):
    # Tests for interpreters.is_shareable().

    def test_default_shareables(self):
        shareables = [
                # singletons
                None,
                # builtin objects
                b'spam',
                'spam',
                10,
                -10,
                True,
                False,
                100.0,
                (),
                (1, ('spam', 'eggs'), True),
                ]
        for obj in shareables:
            # Label the subtest with repr(obj), as test_not_shareable
            # does, so the label is always a plain str (no str(bytes)
            # conversion when a failure gets reported).
            with self.subTest(repr(obj)):
                shareable = interpreters.is_shareable(obj)
                self.assertTrue(shareable)

    def test_not_shareable(self):
        class Cheese:
            def __init__(self, name):
                self.name = name
            def __str__(self):
                return self.name

        class SubBytes(bytes):
            """A subclass of a shareable type."""

        not_shareables = [
                # singletons
                NotImplemented,
                ...,
                # builtin types and objects
                type,
                object,
                object(),
                Exception(),
                # user-defined types and objects
                Cheese,
                Cheese('Wensleydale'),
                SubBytes(b'spam'),
                ]
        for obj in not_shareables:
            with self.subTest(repr(obj)):
                self.assertFalse(
                    interpreters.is_shareable(obj))
 | |
| 
 | |
| 
 | |
| class LowLevelTests(TestBase):
 | |
| 
 | |
|     # The behaviors in the low-level module are important in as much
 | |
|     # as they are exercised by the high-level module.  Therefore the
 | |
|     # most important testing happens in the high-level tests.
 | |
|     # These low-level tests cover corner cases that are not
 | |
|     # encountered by the high-level module, thus they
 | |
|     # mostly shouldn't matter as much.
 | |
| 
 | |
|     def test_new_config(self):
 | |
|         # This test overlaps with
 | |
|         # test.test_capi.test_misc.InterpreterConfigTests.
 | |
| 
 | |
|         default = _interpreters.new_config('isolated')
 | |
|         with self.subTest('no arg'):
 | |
|             config = _interpreters.new_config()
 | |
|             self.assert_ns_equal(config, default)
 | |
|             self.assertIsNot(config, default)
 | |
| 
 | |
|         with self.subTest('default'):
 | |
|             config1 = _interpreters.new_config('default')
 | |
|             self.assert_ns_equal(config1, default)
 | |
|             self.assertIsNot(config1, default)
 | |
| 
 | |
|             config2 = _interpreters.new_config('default')
 | |
|             self.assert_ns_equal(config2, config1)
 | |
|             self.assertIsNot(config2, config1)
 | |
| 
 | |
|         for arg in ['', 'default']:
 | |
|             with self.subTest(f'default ({arg!r})'):
 | |
|                 config = _interpreters.new_config(arg)
 | |
|                 self.assert_ns_equal(config, default)
 | |
|                 self.assertIsNot(config, default)
 | |
| 
 | |
|         supported = {
 | |
|             'isolated': types.SimpleNamespace(
 | |
|                 use_main_obmalloc=False,
 | |
|                 allow_fork=False,
 | |
|                 allow_exec=False,
 | |
|                 allow_threads=True,
 | |
|                 allow_daemon_threads=False,
 | |
|                 check_multi_interp_extensions=True,
 | |
|                 gil='own',
 | |
|             ),
 | |
|             'legacy': types.SimpleNamespace(
 | |
|                 use_main_obmalloc=True,
 | |
|                 allow_fork=True,
 | |
|                 allow_exec=True,
 | |
|                 allow_threads=True,
 | |
|                 allow_daemon_threads=True,
 | |
|                 check_multi_interp_extensions=bool(Py_GIL_DISABLED),
 | |
|                 gil='shared',
 | |
|             ),
 | |
|             'empty': types.SimpleNamespace(
 | |
|                 use_main_obmalloc=False,
 | |
|                 allow_fork=False,
 | |
|                 allow_exec=False,
 | |
|                 allow_threads=False,
 | |
|                 allow_daemon_threads=False,
 | |
|                 check_multi_interp_extensions=False,
 | |
|                 gil='default',
 | |
|             ),
 | |
|         }
 | |
|         gil_supported = ['default', 'shared', 'own']
 | |
| 
 | |
|         for name, vanilla in supported.items():
 | |
|             with self.subTest(f'supported ({name})'):
 | |
|                 expected = vanilla
 | |
|                 config1 = _interpreters.new_config(name)
 | |
|                 self.assert_ns_equal(config1, expected)
 | |
|                 self.assertIsNot(config1, expected)
 | |
| 
 | |
|                 config2 = _interpreters.new_config(name)
 | |
|                 self.assert_ns_equal(config2, config1)
 | |
|                 self.assertIsNot(config2, config1)
 | |
| 
 | |
|             with self.subTest(f'noop override ({name})'):
 | |
|                 expected = vanilla
 | |
|                 overrides = vars(vanilla)
 | |
|                 config = _interpreters.new_config(name, **overrides)
 | |
|                 self.assert_ns_equal(config, expected)
 | |
| 
 | |
|             with self.subTest(f'override all ({name})'):
 | |
|                 overrides = {k: not v for k, v in vars(vanilla).items()}
 | |
|                 for gil in gil_supported:
 | |
|                     if vanilla.gil == gil:
 | |
|                         continue
 | |
|                     overrides['gil'] = gil
 | |
|                     expected = types.SimpleNamespace(**overrides)
 | |
|                     config = _interpreters.new_config(name, **overrides)
 | |
|                     self.assert_ns_equal(config, expected)
 | |
| 
 | |
|             # Override individual fields.
 | |
|             for field, old in vars(vanilla).items():
 | |
|                 if field == 'gil':
 | |
|                     values = [v for v in gil_supported if v != old]
 | |
|                 else:
 | |
|                     values = [not old]
 | |
|                 for val in values:
 | |
|                     with self.subTest(f'{name}.{field} ({old!r} -> {val!r})'):
 | |
|                         overrides = {field: val}
 | |
|                         expected = types.SimpleNamespace(
 | |
|                             **dict(vars(vanilla), **overrides),
 | |
|                         )
 | |
|                         config = _interpreters.new_config(name, **overrides)
 | |
|                         self.assert_ns_equal(config, expected)
 | |
| 
 | |
|         with self.subTest('extra override'):
 | |
|             with self.assertRaises(ValueError):
 | |
|                 _interpreters.new_config(spam=True)
 | |
| 
 | |
|         # Bad values for bool fields.
 | |
|         for field, value in vars(supported['empty']).items():
 | |
|             if field == 'gil':
 | |
|                 continue
 | |
|             assert isinstance(value, bool)
 | |
|             for value in [1, '', 'spam', 1.0, None, object()]:
 | |
|                 with self.subTest(f'bad override ({field}={value!r})'):
 | |
|                     with self.assertRaises(TypeError):
 | |
|                         _interpreters.new_config(**{field: value})
 | |
| 
 | |
|         # Bad values for .gil.
 | |
|         for value in [True, 1, 1.0, None, object()]:
 | |
|             with self.subTest(f'bad override (gil={value!r})'):
 | |
|                 with self.assertRaises(TypeError):
 | |
|                     _interpreters.new_config(gil=value)
 | |
|         for value in ['', 'spam']:
 | |
|             with self.subTest(f'bad override (gil={value!r})'):
 | |
|                 with self.assertRaises(ValueError):
 | |
|                     _interpreters.new_config(gil=value)
 | |
| 
 | |
|     def test_get_main(self):
 | |
|         interpid, whence = _interpreters.get_main()
 | |
|         self.assertEqual(interpid, 0)
 | |
|         self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
 | |
|         self.assertEqual(
 | |
|             _interpreters.whence(interpid),
 | |
|             _interpreters.WHENCE_RUNTIME)
 | |
| 
 | |
|     def test_get_current(self):
 | |
|         with self.subTest('main'):
 | |
|             main, *_ = _interpreters.get_main()
 | |
|             interpid, whence = _interpreters.get_current()
 | |
|             self.assertEqual(interpid, main)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
 | |
| 
 | |
|         script = f"""
 | |
|             import _interpreters
 | |
|             interpid, whence = _interpreters.get_current()
 | |
|             print((interpid, whence))
 | |
|             """
 | |
|         def parse_stdout(text):
 | |
|             interpid, whence = eval(text)
 | |
|             return interpid, whence
 | |
| 
 | |
|         with self.subTest('from _interpreters'):
 | |
|             orig = _interpreters.create()
 | |
|             text = self.run_and_capture(orig, script)
 | |
|             interpid, whence = parse_stdout(text)
 | |
|             self.assertEqual(interpid, orig)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_STDLIB)
 | |
| 
 | |
|         with self.subTest('from C-API'):
 | |
|             last = 0
 | |
|             for id, *_ in _interpreters.list_all():
 | |
|                 last = max(last, id)
 | |
|             expected = last + 1
 | |
|             text = self.run_temp_from_capi(script)
 | |
|             interpid, whence = parse_stdout(text)
 | |
|             self.assertEqual(interpid, expected)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_CAPI)
 | |
| 
 | |
|     def test_list_all(self):
 | |
|         mainid, *_ = _interpreters.get_main()
 | |
|         interpid1 = _interpreters.create()
 | |
|         interpid2 = _interpreters.create()
 | |
|         interpid3 = _interpreters.create()
 | |
|         expected = [
 | |
|             (mainid, _interpreters.WHENCE_RUNTIME),
 | |
|             (interpid1, _interpreters.WHENCE_STDLIB),
 | |
|             (interpid2, _interpreters.WHENCE_STDLIB),
 | |
|             (interpid3, _interpreters.WHENCE_STDLIB),
 | |
|         ]
 | |
| 
 | |
|         with self.subTest('main'):
 | |
|             res = _interpreters.list_all()
 | |
|             self.assertEqual(res, expected)
 | |
| 
 | |
|         with self.subTest('via interp from _interpreters'):
 | |
|             text = self.run_and_capture(interpid2, f"""
 | |
|                 import _interpreters
 | |
|                 print(
 | |
|                     _interpreters.list_all())
 | |
|                 """)
 | |
| 
 | |
|             res = eval(text)
 | |
|             self.assertEqual(res, expected)
 | |
| 
 | |
|         with self.subTest('via interp from C-API'):
 | |
|             interpid4 = interpid3 + 1
 | |
|             interpid5 = interpid4 + 1
 | |
|             expected2 = expected + [
 | |
|                 (interpid4, _interpreters.WHENCE_CAPI),
 | |
|                 (interpid5, _interpreters.WHENCE_STDLIB),
 | |
|             ]
 | |
|             expected3 = expected + [
 | |
|                 (interpid5, _interpreters.WHENCE_STDLIB),
 | |
|             ]
 | |
|             text = self.run_temp_from_capi(f"""
 | |
|                 import _interpreters
 | |
|                 _interpreters.create()
 | |
|                 print(
 | |
|                     _interpreters.list_all())
 | |
|                 """)
 | |
|             res2 = eval(text)
 | |
|             res3 = _interpreters.list_all()
 | |
|             self.assertEqual(res2, expected2)
 | |
|             self.assertEqual(res3, expected3)
 | |
| 
 | |
|     def test_create(self):
 | |
|         isolated = _interpreters.new_config('isolated')
 | |
|         legacy = _interpreters.new_config('legacy')
 | |
|         default = isolated
 | |
| 
 | |
|         with self.subTest('no args'):
 | |
|             interpid = _interpreters.create()
 | |
|             config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, default)
 | |
| 
 | |
|         with self.subTest('config: None'):
 | |
|             interpid = _interpreters.create(None)
 | |
|             config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, default)
 | |
| 
 | |
|         with self.subTest('config: \'empty\''):
 | |
|             with self.assertRaises(InterpreterError):
 | |
|                 # The "empty" config isn't viable on its own.
 | |
|                 _interpreters.create('empty')
 | |
| 
 | |
|         for arg, expected in {
 | |
|             '': default,
 | |
|             'default': default,
 | |
|             'isolated': isolated,
 | |
|             'legacy': legacy,
 | |
|         }.items():
 | |
|             with self.subTest(f'str arg: {arg!r}'):
 | |
|                 interpid = _interpreters.create(arg)
 | |
|                 config = _interpreters.get_config(interpid)
 | |
|                 self.assert_ns_equal(config, expected)
 | |
| 
 | |
|         with self.subTest('custom'):
 | |
|             orig = _interpreters.new_config('empty')
 | |
|             orig.use_main_obmalloc = True
 | |
|             orig.check_multi_interp_extensions = bool(Py_GIL_DISABLED)
 | |
|             orig.gil = 'shared'
 | |
|             interpid = _interpreters.create(orig)
 | |
|             config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, orig)
 | |
| 
 | |
|         with self.subTest('missing fields'):
 | |
|             orig = _interpreters.new_config()
 | |
|             del orig.gil
 | |
|             with self.assertRaises(ValueError):
 | |
|                 _interpreters.create(orig)
 | |
| 
 | |
|         with self.subTest('extra fields'):
 | |
|             orig = _interpreters.new_config()
 | |
|             orig.spam = True
 | |
|             with self.assertRaises(ValueError):
 | |
|                 _interpreters.create(orig)
 | |
| 
 | |
|         with self.subTest('whence'):
 | |
|             interpid = _interpreters.create()
 | |
|             self.assertEqual(
 | |
|                 _interpreters.whence(interpid),
 | |
|                 _interpreters.WHENCE_STDLIB)
 | |
| 
 | |
|     @requires_test_modules
 | |
|     def test_destroy(self):
 | |
|         with self.subTest('from _interpreters'):
 | |
|             interpid = _interpreters.create()
 | |
|             before = [id for id, *_ in _interpreters.list_all()]
 | |
|             _interpreters.destroy(interpid)
 | |
|             after = [id for id, *_ in _interpreters.list_all()]
 | |
| 
 | |
|             self.assertIn(interpid, before)
 | |
|             self.assertNotIn(interpid, after)
 | |
|             self.assertFalse(
 | |
|                 self.interp_exists(interpid))
 | |
| 
 | |
|         with self.subTest('main'):
 | |
|             interpid, *_ = _interpreters.get_main()
 | |
|             with self.assertRaises(InterpreterError):
 | |
|                 # It is the current interpreter.
 | |
|                 _interpreters.destroy(interpid)
 | |
| 
 | |
|         with self.subTest('from C-API'):
 | |
|             interpid = _testinternalcapi.create_interpreter()
 | |
|             with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
 | |
|                 _interpreters.destroy(interpid, restrict=True)
 | |
|             self.assertTrue(
 | |
|                 self.interp_exists(interpid))
 | |
|             _interpreters.destroy(interpid)
 | |
|             self.assertFalse(
 | |
|                 self.interp_exists(interpid))
 | |
| 
 | |
|     def test_get_config(self):
 | |
|         # This test overlaps with
 | |
|         # test.test_capi.test_misc.InterpreterConfigTests.
 | |
| 
 | |
|         with self.subTest('main'):
 | |
|             expected = _interpreters.new_config('legacy')
 | |
|             expected.gil = 'own'
 | |
|             if Py_GIL_DISABLED:
 | |
|                 expected.check_multi_interp_extensions = False
 | |
|             interpid, *_ = _interpreters.get_main()
 | |
|             config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, expected)
 | |
| 
 | |
|         with self.subTest('isolated'):
 | |
|             expected = _interpreters.new_config('isolated')
 | |
|             interpid = _interpreters.create('isolated')
 | |
|             config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, expected)
 | |
| 
 | |
|         with self.subTest('legacy'):
 | |
|             expected = _interpreters.new_config('legacy')
 | |
|             interpid = _interpreters.create('legacy')
 | |
|             config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, expected)
 | |
| 
 | |
|         with self.subTest('from C-API'):
 | |
|             orig = _interpreters.new_config('isolated')
 | |
|             with self.interpreter_from_capi(orig) as interpid:
 | |
|                 with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
 | |
|                     _interpreters.get_config(interpid, restrict=True)
 | |
|                 config = _interpreters.get_config(interpid)
 | |
|             self.assert_ns_equal(config, orig)
 | |
| 
 | |
|     @requires_test_modules
 | |
|     def test_whence(self):
 | |
|         with self.subTest('main'):
 | |
|             interpid, *_ = _interpreters.get_main()
 | |
|             whence = _interpreters.whence(interpid)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
 | |
| 
 | |
|         with self.subTest('stdlib'):
 | |
|             interpid = _interpreters.create()
 | |
|             whence = _interpreters.whence(interpid)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_STDLIB)
 | |
| 
 | |
|         for orig, name in {
 | |
|             _interpreters.WHENCE_UNKNOWN: 'not ready',
 | |
|             _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
 | |
|             _interpreters.WHENCE_CAPI: 'C-API',
 | |
|             _interpreters.WHENCE_XI: 'cross-interpreter C-API',
 | |
|         }.items():
 | |
|             with self.subTest(f'from C-API ({orig}: {name})'):
 | |
|                 with self.interpreter_from_capi(whence=orig) as interpid:
 | |
|                     whence = _interpreters.whence(interpid)
 | |
|                 self.assertEqual(whence, orig)
 | |
| 
 | |
|         with self.subTest('from C-API, running'):
 | |
|             text = self.run_temp_from_capi(dedent(f"""
 | |
|                 import _interpreters
 | |
|                 interpid, *_ = _interpreters.get_current()
 | |
|                 print(_interpreters.whence(interpid))
 | |
|                 """),
 | |
|                 config=True)
 | |
|             whence = eval(text)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_CAPI)
 | |
| 
 | |
|         with self.subTest('from legacy C-API, running'):
 | |
|             ...
 | |
|             text = self.run_temp_from_capi(dedent(f"""
 | |
|                 import _interpreters
 | |
|                 interpid, *_ = _interpreters.get_current()
 | |
|                 print(_interpreters.whence(interpid))
 | |
|                 """),
 | |
|                 config=False)
 | |
|             whence = eval(text)
 | |
|             self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI)
 | |
| 
 | |
|     def test_is_running(self):
 | |
|         def check(interpid, expected):
 | |
|             with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
 | |
|                 _interpreters.is_running(interpid, restrict=True)
 | |
|             running = _interpreters.is_running(interpid)
 | |
|             self.assertIs(running, expected)
 | |
| 
 | |
|         with self.subTest('from _interpreters (running)'):
 | |
|             interpid = _interpreters.create()
 | |
|             with self.running(interpid):
 | |
|                 running = _interpreters.is_running(interpid)
 | |
|                 self.assertTrue(running)
 | |
| 
 | |
|         with self.subTest('from _interpreters (not running)'):
 | |
|             interpid = _interpreters.create()
 | |
|             running = _interpreters.is_running(interpid)
 | |
|             self.assertFalse(running)
 | |
| 
 | |
|         with self.subTest('main'):
 | |
|             interpid, *_ = _interpreters.get_main()
 | |
|             check(interpid, True)
 | |
| 
 | |
|         with self.subTest('from C-API (running __main__)'):
 | |
|             with self.interpreter_from_capi() as interpid:
 | |
|                 with self.running_from_capi(interpid, main=True):
 | |
|                     check(interpid, True)
 | |
| 
 | |
|         with self.subTest('from C-API (running, but not __main__)'):
 | |
|             with self.interpreter_from_capi() as interpid:
 | |
|                 with self.running_from_capi(interpid, main=False):
 | |
|                     check(interpid, False)
 | |
| 
 | |
|         with self.subTest('from C-API (not running)'):
 | |
|             with self.interpreter_from_capi() as interpid:
 | |
|                 check(interpid, False)
 | |
| 
 | |
|     def test_exec(self):
 | |
|         with self.subTest('run script'):
 | |
|             interpid = _interpreters.create()
 | |
|             script, results = _captured_script('print("it worked!", end="")')
 | |
|             with results:
 | |
|                 exc = _interpreters.exec(interpid, script)
 | |
|             results = results.final()
 | |
|             results.raise_if_failed()
 | |
|             out = results.stdout
 | |
|             self.assertEqual(out, 'it worked!')
 | |
| 
 | |
|         with self.subTest('uncaught exception'):
 | |
|             interpid = _interpreters.create()
 | |
|             script, results = _captured_script("""
 | |
|                 raise Exception('uh-oh!')
 | |
|                 print("it worked!", end="")
 | |
|                 """)
 | |
|             with results:
 | |
|                 exc = _interpreters.exec(interpid, script)
 | |
|                 out = results.stdout()
 | |
|             self.assertEqual(out, '')
 | |
|             self.assert_ns_equal(exc, types.SimpleNamespace(
 | |
|                 type=types.SimpleNamespace(
 | |
|                     __name__='Exception',
 | |
|                     __qualname__='Exception',
 | |
|                     __module__='builtins',
 | |
|                 ),
 | |
|                 msg='uh-oh!',
 | |
|                 # We check these in other tests.
 | |
|                 formatted=exc.formatted,
 | |
|                 errdisplay=exc.errdisplay,
 | |
|             ))
 | |
| 
 | |
|         with self.subTest('from C-API'):
 | |
|             with self.interpreter_from_capi() as interpid:
 | |
|                 with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
 | |
|                     _interpreters.exec(interpid, 'raise Exception("it worked!")',
 | |
|                                        restrict=True)
 | |
|                 exc = _interpreters.exec(interpid, 'raise Exception("it worked!")')
 | |
|             self.assertIsNot(exc, None)
 | |
|             self.assertEqual(exc.msg, 'it worked!')
 | |
| 
 | |
|     def test_call(self):
 | |
|         with self.subTest('no args'):
 | |
|             interpid = _interpreters.create()
 | |
|             exc = _interpreters.call(interpid, call_func_return_shareable)
 | |
|             self.assertIs(exc, None)
 | |
| 
 | |
|         with self.subTest('uncaught exception'):
 | |
|             interpid = _interpreters.create()
 | |
|             exc = _interpreters.call(interpid, call_func_failure)
 | |
|             self.assertEqual(exc, types.SimpleNamespace(
 | |
|                 type=types.SimpleNamespace(
 | |
|                     __name__='Exception',
 | |
|                     __qualname__='Exception',
 | |
|                     __module__='builtins',
 | |
|                 ),
 | |
|                 msg='spam!',
 | |
|                 # We check these in other tests.
 | |
|                 formatted=exc.formatted,
 | |
|                 errdisplay=exc.errdisplay,
 | |
|             ))
 | |
| 
 | |
|     @requires_test_modules
 | |
|     def test_set___main___attrs(self):
 | |
|         with self.subTest('from _interpreters'):
 | |
|             interpid = _interpreters.create()
 | |
|             before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
 | |
|             before2 = _interpreters.exec(interpid, 'assert ham == 42')
 | |
|             self.assertEqual(before1.type.__name__, 'NameError')
 | |
|             self.assertEqual(before2.type.__name__, 'NameError')
 | |
| 
 | |
|             _interpreters.set___main___attrs(interpid, dict(
 | |
|                 spam='eggs',
 | |
|                 ham=42,
 | |
|             ))
 | |
|             after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
 | |
|             after2 = _interpreters.exec(interpid, 'assert ham == 42')
 | |
|             after3 = _interpreters.exec(interpid, 'assert spam == 42')
 | |
|             self.assertIs(after1, None)
 | |
|             self.assertIs(after2, None)
 | |
|             self.assertEqual(after3.type.__name__, 'AssertionError')
 | |
| 
 | |
|         with self.subTest('from C-API'):
 | |
|             with self.interpreter_from_capi() as interpid:
 | |
|                 with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
 | |
|                     _interpreters.set___main___attrs(interpid, {'spam': True},
 | |
|                                                      restrict=True)
 | |
|                 _interpreters.set___main___attrs(interpid, {'spam': True})
 | |
|                 rc = _testinternalcapi.exec_interpreter(
 | |
|                     interpid,
 | |
|                     'assert spam is True',
 | |
|                 )
 | |
|             self.assertEqual(rc, 0)
 | |
| 
 | |
| 
 | |
# Script entry point.
if __name__ == "__main__":
    # Test needs to be a package, so we can do relative imports.
    unittest.main()
 |