mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1131 lines
		
	
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1131 lines
		
	
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import contextlib
 | |
| import json
 | |
| import os
 | |
| import os.path
 | |
| import sys
 | |
| import threading
 | |
| from textwrap import dedent
 | |
| import unittest
 | |
| import time
 | |
| 
 | |
| from test import support
 | |
| from test.support import import_helper
 | |
| from test.support import threading_helper
 | |
| from test.support import os_helper
 | |
| _interpreters = import_helper.import_module('_xxsubinterpreters')
 | |
| _channels = import_helper.import_module('_xxinterpchannels')
 | |
| from test.support import interpreters
 | |
| 
 | |
| 
 | |
| def _captured_script(script):
 | |
|     r, w = os.pipe()
 | |
|     indented = script.replace('\n', '\n                ')
 | |
|     wrapped = dedent(f"""
 | |
|         import contextlib
 | |
|         with open({w}, 'w', encoding='utf-8') as spipe:
 | |
|             with contextlib.redirect_stdout(spipe):
 | |
|                 {indented}
 | |
|         """)
 | |
|     return wrapped, open(r, encoding='utf-8')
 | |
| 
 | |
| 
 | |
| def clean_up_interpreters():
 | |
|     for interp in interpreters.list_all():
 | |
|         if interp.id == 0:  # main
 | |
|             continue
 | |
|         try:
 | |
|             interp.close()
 | |
|         except RuntimeError:
 | |
|             pass  # already destroyed
 | |
| 
 | |
| 
 | |
| def _run_output(interp, request, channels=None):
 | |
|     script, rpipe = _captured_script(request)
 | |
|     with rpipe:
 | |
|         interp.run(script, channels=channels)
 | |
|         return rpipe.read()
 | |
| 
 | |
| 
 | |
| @contextlib.contextmanager
 | |
| def _running(interp):
 | |
|     r, w = os.pipe()
 | |
|     def run():
 | |
|         interp.run(dedent(f"""
 | |
|             # wait for "signal"
 | |
|             with open({r}) as rpipe:
 | |
|                 rpipe.read()
 | |
|             """))
 | |
| 
 | |
|     t = threading.Thread(target=run)
 | |
|     t.start()
 | |
| 
 | |
|     yield
 | |
| 
 | |
|     with open(w, 'w') as spipe:
 | |
|         spipe.write('done')
 | |
|     t.join()
 | |
| 
 | |
| 
 | |
| class TestBase(unittest.TestCase):
 | |
| 
 | |
|     def pipe(self):
 | |
|         def ensure_closed(fd):
 | |
|             try:
 | |
|                 os.close(fd)
 | |
|             except OSError:
 | |
|                 pass
 | |
|         r, w = os.pipe()
 | |
|         self.addCleanup(lambda: ensure_closed(r))
 | |
|         self.addCleanup(lambda: ensure_closed(w))
 | |
|         return r, w
 | |
| 
 | |
|     def tearDown(self):
 | |
|         clean_up_interpreters()
 | |
| 
 | |
| 
 | |
| class CreateTests(TestBase):
 | |
| 
 | |
|     def test_in_main(self):
 | |
|         interp = interpreters.create()
 | |
|         self.assertIsInstance(interp, interpreters.Interpreter)
 | |
|         self.assertIn(interp, interpreters.list_all())
 | |
| 
 | |
|     def test_in_thread(self):
 | |
|         lock = threading.Lock()
 | |
|         interp = None
 | |
|         def f():
 | |
|             nonlocal interp
 | |
|             interp = interpreters.create()
 | |
|             lock.acquire()
 | |
|             lock.release()
 | |
|         t = threading.Thread(target=f)
 | |
|         with lock:
 | |
|             t.start()
 | |
|         t.join()
 | |
|         self.assertIn(interp, interpreters.list_all())
 | |
| 
 | |
|     def test_in_subinterpreter(self):
 | |
|         main, = interpreters.list_all()
 | |
|         interp = interpreters.create()
 | |
|         out = _run_output(interp, dedent("""
 | |
|             from test.support import interpreters
 | |
|             interp = interpreters.create()
 | |
|             print(interp.id)
 | |
|             """))
 | |
|         interp2 = interpreters.Interpreter(int(out))
 | |
|         self.assertEqual(interpreters.list_all(), [main, interp, interp2])
 | |
| 
 | |
|     def test_after_destroy_all(self):
 | |
|         before = set(interpreters.list_all())
 | |
|         # Create 3 subinterpreters.
 | |
|         interp_lst = []
 | |
|         for _ in range(3):
 | |
|             interps = interpreters.create()
 | |
|             interp_lst.append(interps)
 | |
|         # Now destroy them.
 | |
|         for interp in interp_lst:
 | |
|             interp.close()
 | |
|         # Finally, create another.
 | |
|         interp = interpreters.create()
 | |
|         self.assertEqual(set(interpreters.list_all()), before | {interp})
 | |
| 
 | |
|     def test_after_destroy_some(self):
 | |
|         before = set(interpreters.list_all())
 | |
|         # Create 3 subinterpreters.
 | |
|         interp1 = interpreters.create()
 | |
|         interp2 = interpreters.create()
 | |
|         interp3 = interpreters.create()
 | |
|         # Now destroy 2 of them.
 | |
|         interp1.close()
 | |
|         interp2.close()
 | |
|         # Finally, create another.
 | |
|         interp = interpreters.create()
 | |
|         self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})
 | |
| 
 | |
| 
 | |
| class GetCurrentTests(TestBase):
 | |
| 
 | |
|     def test_main(self):
 | |
|         main = interpreters.get_main()
 | |
|         current = interpreters.get_current()
 | |
|         self.assertEqual(current, main)
 | |
| 
 | |
|     def test_subinterpreter(self):
 | |
|         main = _interpreters.get_main()
 | |
|         interp = interpreters.create()
 | |
|         out = _run_output(interp, dedent("""
 | |
|             from test.support import interpreters
 | |
|             cur = interpreters.get_current()
 | |
|             print(cur.id)
 | |
|             """))
 | |
|         current = interpreters.Interpreter(int(out))
 | |
|         self.assertNotEqual(current, main)
 | |
| 
 | |
| 
 | |
| class ListAllTests(TestBase):
 | |
| 
 | |
|     def test_initial(self):
 | |
|         interps = interpreters.list_all()
 | |
|         self.assertEqual(1, len(interps))
 | |
| 
 | |
|     def test_after_creating(self):
 | |
|         main = interpreters.get_current()
 | |
|         first = interpreters.create()
 | |
|         second = interpreters.create()
 | |
| 
 | |
|         ids = []
 | |
|         for interp in interpreters.list_all():
 | |
|             ids.append(interp.id)
 | |
| 
 | |
|         self.assertEqual(ids, [main.id, first.id, second.id])
 | |
| 
 | |
|     def test_after_destroying(self):
 | |
|         main = interpreters.get_current()
 | |
|         first = interpreters.create()
 | |
|         second = interpreters.create()
 | |
|         first.close()
 | |
| 
 | |
|         ids = []
 | |
|         for interp in interpreters.list_all():
 | |
|             ids.append(interp.id)
 | |
| 
 | |
|         self.assertEqual(ids, [main.id, second.id])
 | |
| 
 | |
| 
 | |
| class TestInterpreterAttrs(TestBase):
 | |
| 
 | |
|     def test_id_type(self):
 | |
|         main = interpreters.get_main()
 | |
|         current = interpreters.get_current()
 | |
|         interp = interpreters.create()
 | |
|         self.assertIsInstance(main.id, _interpreters.InterpreterID)
 | |
|         self.assertIsInstance(current.id, _interpreters.InterpreterID)
 | |
|         self.assertIsInstance(interp.id, _interpreters.InterpreterID)
 | |
| 
 | |
|     def test_main_id(self):
 | |
|         main = interpreters.get_main()
 | |
|         self.assertEqual(main.id, 0)
 | |
| 
 | |
|     def test_custom_id(self):
 | |
|         interp = interpreters.Interpreter(1)
 | |
|         self.assertEqual(interp.id, 1)
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             interpreters.Interpreter('1')
 | |
| 
 | |
|     def test_id_readonly(self):
 | |
|         interp = interpreters.Interpreter(1)
 | |
|         with self.assertRaises(AttributeError):
 | |
|             interp.id = 2
 | |
| 
 | |
|     @unittest.skip('not ready yet (see bpo-32604)')
 | |
|     def test_main_isolated(self):
 | |
|         main = interpreters.get_main()
 | |
|         self.assertFalse(main.isolated)
 | |
| 
 | |
|     @unittest.skip('not ready yet (see bpo-32604)')
 | |
|     def test_subinterpreter_isolated_default(self):
 | |
|         interp = interpreters.create()
 | |
|         self.assertFalse(interp.isolated)
 | |
| 
 | |
|     def test_subinterpreter_isolated_explicit(self):
 | |
|         interp1 = interpreters.create(isolated=True)
 | |
|         interp2 = interpreters.create(isolated=False)
 | |
|         self.assertTrue(interp1.isolated)
 | |
|         self.assertFalse(interp2.isolated)
 | |
| 
 | |
|     @unittest.skip('not ready yet (see bpo-32604)')
 | |
|     def test_custom_isolated_default(self):
 | |
|         interp = interpreters.Interpreter(1)
 | |
|         self.assertFalse(interp.isolated)
 | |
| 
 | |
|     def test_custom_isolated_explicit(self):
 | |
|         interp1 = interpreters.Interpreter(1, isolated=True)
 | |
|         interp2 = interpreters.Interpreter(1, isolated=False)
 | |
|         self.assertTrue(interp1.isolated)
 | |
|         self.assertFalse(interp2.isolated)
 | |
| 
 | |
|     def test_isolated_readonly(self):
 | |
|         interp = interpreters.Interpreter(1)
 | |
|         with self.assertRaises(AttributeError):
 | |
|             interp.isolated = True
 | |
| 
 | |
|     def test_equality(self):
 | |
|         interp1 = interpreters.create()
 | |
|         interp2 = interpreters.create()
 | |
|         self.assertEqual(interp1, interp1)
 | |
|         self.assertNotEqual(interp1, interp2)
 | |
| 
 | |
| 
 | |
| class TestInterpreterIsRunning(TestBase):
 | |
| 
 | |
|     def test_main(self):
 | |
|         main = interpreters.get_main()
 | |
|         self.assertTrue(main.is_running())
 | |
| 
 | |
|     @unittest.skip('Fails on FreeBSD')
 | |
|     def test_subinterpreter(self):
 | |
|         interp = interpreters.create()
 | |
|         self.assertFalse(interp.is_running())
 | |
| 
 | |
|         with _running(interp):
 | |
|             self.assertTrue(interp.is_running())
 | |
|         self.assertFalse(interp.is_running())
 | |
| 
 | |
|     def test_finished(self):
 | |
|         r, w = self.pipe()
 | |
|         interp = interpreters.create()
 | |
|         interp.run(f"""if True:
 | |
|             import os
 | |
|             os.write({w}, b'x')
 | |
|             """)
 | |
|         self.assertFalse(interp.is_running())
 | |
|         self.assertEqual(os.read(r, 1), b'x')
 | |
| 
 | |
|     def test_from_subinterpreter(self):
 | |
|         interp = interpreters.create()
 | |
|         out = _run_output(interp, dedent(f"""
 | |
|             import _xxsubinterpreters as _interpreters
 | |
|             if _interpreters.is_running({interp.id}):
 | |
|                 print(True)
 | |
|             else:
 | |
|                 print(False)
 | |
|             """))
 | |
|         self.assertEqual(out.strip(), 'True')
 | |
| 
 | |
|     def test_already_destroyed(self):
 | |
|         interp = interpreters.create()
 | |
|         interp.close()
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             interp.is_running()
 | |
| 
 | |
|     def test_does_not_exist(self):
 | |
|         interp = interpreters.Interpreter(1_000_000)
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             interp.is_running()
 | |
| 
 | |
|     def test_bad_id(self):
 | |
|         interp = interpreters.Interpreter(-1)
 | |
|         with self.assertRaises(ValueError):
 | |
|             interp.is_running()
 | |
| 
 | |
|     def test_with_only_background_threads(self):
 | |
|         r_interp, w_interp = self.pipe()
 | |
|         r_thread, w_thread = self.pipe()
 | |
| 
 | |
|         DONE = b'D'
 | |
|         FINISHED = b'F'
 | |
| 
 | |
|         interp = interpreters.create()
 | |
|         interp.run(f"""if True:
 | |
|             import os
 | |
|             import threading
 | |
| 
 | |
|             def task():
 | |
|                 v = os.read({r_thread}, 1)
 | |
|                 assert v == {DONE!r}
 | |
|                 os.write({w_interp}, {FINISHED!r})
 | |
|             t = threading.Thread(target=task)
 | |
|             t.start()
 | |
|             """)
 | |
|         self.assertFalse(interp.is_running())
 | |
| 
 | |
|         os.write(w_thread, DONE)
 | |
|         interp.run('t.join()')
 | |
|         self.assertEqual(os.read(r_interp, 1), FINISHED)
 | |
| 
 | |
| 
 | |
| class TestInterpreterClose(TestBase):
 | |
| 
 | |
|     def test_basic(self):
 | |
|         main = interpreters.get_main()
 | |
|         interp1 = interpreters.create()
 | |
|         interp2 = interpreters.create()
 | |
|         interp3 = interpreters.create()
 | |
|         self.assertEqual(set(interpreters.list_all()),
 | |
|                          {main, interp1, interp2, interp3})
 | |
|         interp2.close()
 | |
|         self.assertEqual(set(interpreters.list_all()),
 | |
|                          {main, interp1, interp3})
 | |
| 
 | |
|     def test_all(self):
 | |
|         before = set(interpreters.list_all())
 | |
|         interps = set()
 | |
|         for _ in range(3):
 | |
|             interp = interpreters.create()
 | |
|             interps.add(interp)
 | |
|         self.assertEqual(set(interpreters.list_all()), before | interps)
 | |
|         for interp in interps:
 | |
|             interp.close()
 | |
|         self.assertEqual(set(interpreters.list_all()), before)
 | |
| 
 | |
|     def test_main(self):
 | |
|         main, = interpreters.list_all()
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             main.close()
 | |
| 
 | |
|         def f():
 | |
|             with self.assertRaises(RuntimeError):
 | |
|                 main.close()
 | |
| 
 | |
|         t = threading.Thread(target=f)
 | |
|         t.start()
 | |
|         t.join()
 | |
| 
 | |
|     def test_already_destroyed(self):
 | |
|         interp = interpreters.create()
 | |
|         interp.close()
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             interp.close()
 | |
| 
 | |
|     def test_does_not_exist(self):
 | |
|         interp = interpreters.Interpreter(1_000_000)
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             interp.close()
 | |
| 
 | |
|     def test_bad_id(self):
 | |
|         interp = interpreters.Interpreter(-1)
 | |
|         with self.assertRaises(ValueError):
 | |
|             interp.close()
 | |
| 
 | |
|     def test_from_current(self):
 | |
|         main, = interpreters.list_all()
 | |
|         interp = interpreters.create()
 | |
|         out = _run_output(interp, dedent(f"""
 | |
|             from test.support import interpreters
 | |
|             interp = interpreters.Interpreter({int(interp.id)})
 | |
|             try:
 | |
|                 interp.close()
 | |
|             except RuntimeError:
 | |
|                 print('failed')
 | |
|             """))
 | |
|         self.assertEqual(out.strip(), 'failed')
 | |
|         self.assertEqual(set(interpreters.list_all()), {main, interp})
 | |
| 
 | |
|     def test_from_sibling(self):
 | |
|         main, = interpreters.list_all()
 | |
|         interp1 = interpreters.create()
 | |
|         interp2 = interpreters.create()
 | |
|         self.assertEqual(set(interpreters.list_all()),
 | |
|                          {main, interp1, interp2})
 | |
|         interp1.run(dedent(f"""
 | |
|             from test.support import interpreters
 | |
|             interp2 = interpreters.Interpreter(int({interp2.id}))
 | |
|             interp2.close()
 | |
|             interp3 = interpreters.create()
 | |
|             interp3.close()
 | |
|             """))
 | |
|         self.assertEqual(set(interpreters.list_all()), {main, interp1})
 | |
| 
 | |
|     def test_from_other_thread(self):
 | |
|         interp = interpreters.create()
 | |
|         def f():
 | |
|             interp.close()
 | |
| 
 | |
|         t = threading.Thread(target=f)
 | |
|         t.start()
 | |
|         t.join()
 | |
| 
 | |
|     @unittest.skip('Fails on FreeBSD')
 | |
|     def test_still_running(self):
 | |
|         main, = interpreters.list_all()
 | |
|         interp = interpreters.create()
 | |
|         with _running(interp):
 | |
|             with self.assertRaises(RuntimeError):
 | |
|                 interp.close()
 | |
|             self.assertTrue(interp.is_running())
 | |
| 
 | |
|     def test_subthreads_still_running(self):
 | |
|         r_interp, w_interp = self.pipe()
 | |
|         r_thread, w_thread = self.pipe()
 | |
| 
 | |
|         FINISHED = b'F'
 | |
| 
 | |
|         interp = interpreters.create()
 | |
|         interp.run(f"""if True:
 | |
|             import os
 | |
|             import threading
 | |
|             import time
 | |
| 
 | |
|             done = False
 | |
| 
 | |
|             def notify_fini():
 | |
|                 global done
 | |
|                 done = True
 | |
|                 t.join()
 | |
|             threading._register_atexit(notify_fini)
 | |
| 
 | |
|             def task():
 | |
|                 while not done:
 | |
|                     time.sleep(0.1)
 | |
|                 os.write({w_interp}, {FINISHED!r})
 | |
|             t = threading.Thread(target=task)
 | |
|             t.start()
 | |
|             """)
 | |
|         interp.close()
 | |
| 
 | |
|         self.assertEqual(os.read(r_interp, 1), FINISHED)
 | |
| 
 | |
| 
 | |
| class TestInterpreterRun(TestBase):
 | |
| 
 | |
|     def test_success(self):
 | |
|         interp = interpreters.create()
 | |
|         script, file = _captured_script('print("it worked!", end="")')
 | |
|         with file:
 | |
|             interp.run(script)
 | |
|             out = file.read()
 | |
| 
 | |
|         self.assertEqual(out, 'it worked!')
 | |
| 
 | |
|     def test_in_thread(self):
 | |
|         interp = interpreters.create()
 | |
|         script, file = _captured_script('print("it worked!", end="")')
 | |
|         with file:
 | |
|             def f():
 | |
|                 interp.run(script)
 | |
| 
 | |
|             t = threading.Thread(target=f)
 | |
|             t.start()
 | |
|             t.join()
 | |
|             out = file.read()
 | |
| 
 | |
|         self.assertEqual(out, 'it worked!')
 | |
| 
 | |
|     @support.requires_fork()
 | |
|     def test_fork(self):
 | |
|         interp = interpreters.create()
 | |
|         import tempfile
 | |
|         with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
 | |
|             file.write('')
 | |
|             file.flush()
 | |
| 
 | |
|             expected = 'spam spam spam spam spam'
 | |
|             script = dedent(f"""
 | |
|                 import os
 | |
|                 try:
 | |
|                     os.fork()
 | |
|                 except RuntimeError:
 | |
|                     with open('{file.name}', 'w', encoding='utf-8') as out:
 | |
|                         out.write('{expected}')
 | |
|                 """)
 | |
|             interp.run(script)
 | |
| 
 | |
|             file.seek(0)
 | |
|             content = file.read()
 | |
|             self.assertEqual(content, expected)
 | |
| 
 | |
|     @unittest.skip('Fails on FreeBSD')
 | |
|     def test_already_running(self):
 | |
|         interp = interpreters.create()
 | |
|         with _running(interp):
 | |
|             with self.assertRaises(RuntimeError):
 | |
|                 interp.run('print("spam")')
 | |
| 
 | |
|     def test_does_not_exist(self):
 | |
|         interp = interpreters.Interpreter(1_000_000)
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             interp.run('print("spam")')
 | |
| 
 | |
|     def test_bad_id(self):
 | |
|         interp = interpreters.Interpreter(-1)
 | |
|         with self.assertRaises(ValueError):
 | |
|             interp.run('print("spam")')
 | |
| 
 | |
|     def test_bad_script(self):
 | |
|         interp = interpreters.create()
 | |
|         with self.assertRaises(TypeError):
 | |
|             interp.run(10)
 | |
| 
 | |
|     def test_bytes_for_script(self):
 | |
|         interp = interpreters.create()
 | |
|         with self.assertRaises(TypeError):
 | |
|             interp.run(b'print("spam")')
 | |
| 
 | |
|     def test_with_background_threads_still_running(self):
 | |
|         r_interp, w_interp = self.pipe()
 | |
|         r_thread, w_thread = self.pipe()
 | |
| 
 | |
|         RAN = b'R'
 | |
|         DONE = b'D'
 | |
|         FINISHED = b'F'
 | |
| 
 | |
|         interp = interpreters.create()
 | |
|         interp.run(f"""if True:
 | |
|             import os
 | |
|             import threading
 | |
| 
 | |
|             def task():
 | |
|                 v = os.read({r_thread}, 1)
 | |
|                 assert v == {DONE!r}
 | |
|                 os.write({w_interp}, {FINISHED!r})
 | |
|             t = threading.Thread(target=task)
 | |
|             t.start()
 | |
|             os.write({w_interp}, {RAN!r})
 | |
|             """)
 | |
|         interp.run(f"""if True:
 | |
|             os.write({w_interp}, {RAN!r})
 | |
|             """)
 | |
| 
 | |
|         os.write(w_thread, DONE)
 | |
|         interp.run('t.join()')
 | |
|         self.assertEqual(os.read(r_interp, 1), RAN)
 | |
|         self.assertEqual(os.read(r_interp, 1), RAN)
 | |
|         self.assertEqual(os.read(r_interp, 1), FINISHED)
 | |
| 
 | |
|     # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
 | |
| 
 | |
| 
 | |
| class StressTests(TestBase):
 | |
| 
 | |
|     # In these tests we generally want a lot of interpreters,
 | |
|     # but not so many that any test takes too long.
 | |
| 
 | |
|     @support.requires_resource('cpu')
 | |
|     def test_create_many_sequential(self):
 | |
|         alive = []
 | |
|         for _ in range(100):
 | |
|             interp = interpreters.create()
 | |
|             alive.append(interp)
 | |
| 
 | |
|     @support.requires_resource('cpu')
 | |
|     def test_create_many_threaded(self):
 | |
|         alive = []
 | |
|         def task():
 | |
|             interp = interpreters.create()
 | |
|             alive.append(interp)
 | |
|         threads = (threading.Thread(target=task) for _ in range(200))
 | |
|         with threading_helper.start_threads(threads):
 | |
|             pass
 | |
| 
 | |
| 
 | |
| class StartupTests(TestBase):
 | |
| 
 | |
|     # We want to ensure the initial state of subinterpreters
 | |
|     # matches expectations.
 | |
| 
 | |
|     _subtest_count = 0
 | |
| 
 | |
|     @contextlib.contextmanager
 | |
|     def subTest(self, *args):
 | |
|         with super().subTest(*args) as ctx:
 | |
|             self._subtest_count += 1
 | |
|             try:
 | |
|                 yield ctx
 | |
|             finally:
 | |
|                 if self._debugged_in_subtest:
 | |
|                     if self._subtest_count == 1:
 | |
|                         # The first subtest adds a leading newline, so we
 | |
|                         # compensate here by not printing a trailing newline.
 | |
|                         print('### end subtest debug ###', end='')
 | |
|                     else:
 | |
|                         print('### end subtest debug ###')
 | |
|                 self._debugged_in_subtest = False
 | |
| 
 | |
|     def debug(self, msg, *, header=None):
 | |
|         if header:
 | |
|             self._debug(f'--- {header} ---')
 | |
|             if msg:
 | |
|                 if msg.endswith(os.linesep):
 | |
|                     self._debug(msg[:-len(os.linesep)])
 | |
|                 else:
 | |
|                     self._debug(msg)
 | |
|                     self._debug('<no newline>')
 | |
|             self._debug('------')
 | |
|         else:
 | |
|             self._debug(msg)
 | |
| 
 | |
|     _debugged = False
 | |
|     _debugged_in_subtest = False
 | |
|     def _debug(self, msg):
 | |
|         if not self._debugged:
 | |
|             print()
 | |
|             self._debugged = True
 | |
|         if self._subtest is not None:
 | |
|             if True:
 | |
|                 if not self._debugged_in_subtest:
 | |
|                     self._debugged_in_subtest = True
 | |
|                     print('### start subtest debug ###')
 | |
|                 print(msg)
 | |
|         else:
 | |
|             print(msg)
 | |
| 
 | |
|     def create_temp_dir(self):
 | |
|         import tempfile
 | |
|         tmp = tempfile.mkdtemp(prefix='test_interpreters_')
 | |
|         tmp = os.path.realpath(tmp)
 | |
|         self.addCleanup(os_helper.rmtree, tmp)
 | |
|         return tmp
 | |
| 
 | |
|     def write_script(self, *path, text):
 | |
|         filename = os.path.join(*path)
 | |
|         dirname = os.path.dirname(filename)
 | |
|         if dirname:
 | |
|             os.makedirs(dirname, exist_ok=True)
 | |
|         with open(filename, 'w', encoding='utf-8') as outfile:
 | |
|             outfile.write(dedent(text))
 | |
|         return filename
 | |
| 
 | |
|     @support.requires_subprocess()
 | |
|     def run_python(self, argv, *, cwd=None):
 | |
|         # This method is inspired by
 | |
|         # EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py.
 | |
|         import shlex
 | |
|         import subprocess
 | |
|         if isinstance(argv, str):
 | |
|             argv = shlex.split(argv)
 | |
|         argv = [sys.executable, *argv]
 | |
|         try:
 | |
|             proc = subprocess.run(
 | |
|                 argv,
 | |
|                 cwd=cwd,
 | |
|                 capture_output=True,
 | |
|                 text=True,
 | |
|             )
 | |
|         except Exception as exc:
 | |
|             self.debug(f'# cmd: {shlex.join(argv)}')
 | |
|             if isinstance(exc, FileNotFoundError) and not exc.filename:
 | |
|                 if os.path.exists(argv[0]):
 | |
|                     exists = 'exists'
 | |
|                 else:
 | |
|                     exists = 'does not exist'
 | |
|                 self.debug(f'{argv[0]} {exists}')
 | |
|             raise  # re-raise
 | |
|         assert proc.stderr == '' or proc.returncode != 0, proc.stderr
 | |
|         if proc.returncode != 0 and support.verbose:
 | |
|             self.debug(f'# python3 {shlex.join(argv[1:])} failed:')
 | |
|             self.debug(proc.stdout, header='stdout')
 | |
|             self.debug(proc.stderr, header='stderr')
 | |
|         self.assertEqual(proc.returncode, 0)
 | |
|         self.assertEqual(proc.stderr, '')
 | |
|         return proc.stdout
 | |
| 
 | |
|     def test_sys_path_0(self):
 | |
|         # The main interpreter's sys.path[0] should be used by subinterpreters.
 | |
|         script = '''
 | |
|             import sys
 | |
|             from test.support import interpreters
 | |
| 
 | |
|             orig = sys.path[0]
 | |
| 
 | |
|             interp = interpreters.create()
 | |
|             interp.run(f"""if True:
 | |
|                 import json
 | |
|                 import sys
 | |
|                 print(json.dumps({{
 | |
|                     'main': {orig!r},
 | |
|                     'sub': sys.path[0],
 | |
|                 }}, indent=4), flush=True)
 | |
|                 """)
 | |
|             '''
 | |
|         # <tmp>/
 | |
|         #   pkg/
 | |
|         #     __init__.py
 | |
|         #     __main__.py
 | |
|         #     script.py
 | |
|         #   script.py
 | |
|         cwd = self.create_temp_dir()
 | |
|         self.write_script(cwd, 'pkg', '__init__.py', text='')
 | |
|         self.write_script(cwd, 'pkg', '__main__.py', text=script)
 | |
|         self.write_script(cwd, 'pkg', 'script.py', text=script)
 | |
|         self.write_script(cwd, 'script.py', text=script)
 | |
| 
 | |
|         cases = [
 | |
|             ('script.py', cwd),
 | |
|             ('-m script', cwd),
 | |
|             ('-m pkg', cwd),
 | |
|             ('-m pkg.script', cwd),
 | |
|             ('-c "import script"', ''),
 | |
|         ]
 | |
|         for argv, expected in cases:
 | |
|             with self.subTest(f'python3 {argv}'):
 | |
|                 out = self.run_python(argv, cwd=cwd)
 | |
|                 data = json.loads(out)
 | |
|                 sp0_main, sp0_sub = data['main'], data['sub']
 | |
|                 self.assertEqual(sp0_sub, sp0_main)
 | |
|                 self.assertEqual(sp0_sub, expected)
 | |
|         # XXX Also check them all with the -P cmdline flag?
 | |
| 
 | |
| 
 | |
| class FinalizationTests(TestBase):
 | |
| 
 | |
|     def test_gh_109793(self):
 | |
|         import subprocess
 | |
|         argv = [sys.executable, '-c', '''if True:
 | |
|             import _xxsubinterpreters as _interpreters
 | |
|             interpid = _interpreters.create()
 | |
|             raise Exception
 | |
|             ''']
 | |
|         proc = subprocess.run(argv, capture_output=True, text=True)
 | |
|         self.assertIn('Traceback', proc.stderr)
 | |
|         if proc.returncode == 0 and support.verbose:
 | |
|             print()
 | |
|             print("--- cmd unexpected succeeded ---")
 | |
|             print(f"stdout:\n{proc.stdout}")
 | |
|             print(f"stderr:\n{proc.stderr}")
 | |
|             print("------")
 | |
|         self.assertEqual(proc.returncode, 1)
 | |
| 
 | |
| 
 | |
| class TestIsShareable(TestBase):
 | |
| 
 | |
|     def test_default_shareables(self):
 | |
|         shareables = [
 | |
|                 # singletons
 | |
|                 None,
 | |
|                 # builtin objects
 | |
|                 b'spam',
 | |
|                 'spam',
 | |
|                 10,
 | |
|                 -10,
 | |
|                 True,
 | |
|                 False,
 | |
|                 100.0,
 | |
|                 (),
 | |
|                 (1, ('spam', 'eggs'), True),
 | |
|                 ]
 | |
|         for obj in shareables:
 | |
|             with self.subTest(obj):
 | |
|                 shareable = interpreters.is_shareable(obj)
 | |
|                 self.assertTrue(shareable)
 | |
| 
 | |
|     def test_not_shareable(self):
 | |
|         class Cheese:
 | |
|             def __init__(self, name):
 | |
|                 self.name = name
 | |
|             def __str__(self):
 | |
|                 return self.name
 | |
| 
 | |
|         class SubBytes(bytes):
 | |
|             """A subclass of a shareable type."""
 | |
| 
 | |
|         not_shareables = [
 | |
|                 # singletons
 | |
|                 NotImplemented,
 | |
|                 ...,
 | |
|                 # builtin types and objects
 | |
|                 type,
 | |
|                 object,
 | |
|                 object(),
 | |
|                 Exception(),
 | |
|                 # user-defined types and objects
 | |
|                 Cheese,
 | |
|                 Cheese('Wensleydale'),
 | |
|                 SubBytes(b'spam'),
 | |
|                 ]
 | |
|         for obj in not_shareables:
 | |
|             with self.subTest(repr(obj)):
 | |
|                 self.assertFalse(
 | |
|                     interpreters.is_shareable(obj))
 | |
| 
 | |
| 
 | |
| class TestChannels(TestBase):
 | |
| 
 | |
|     def test_create(self):
 | |
|         r, s = interpreters.create_channel()
 | |
|         self.assertIsInstance(r, interpreters.RecvChannel)
 | |
|         self.assertIsInstance(s, interpreters.SendChannel)
 | |
| 
 | |
|     def test_list_all(self):
 | |
|         self.assertEqual(interpreters.list_all_channels(), [])
 | |
|         created = set()
 | |
|         for _ in range(3):
 | |
|             ch = interpreters.create_channel()
 | |
|             created.add(ch)
 | |
|         after = set(interpreters.list_all_channels())
 | |
|         self.assertEqual(after, created)
 | |
| 
 | |
|     def test_shareable(self):
 | |
|         rch, sch = interpreters.create_channel()
 | |
| 
 | |
|         self.assertTrue(
 | |
|             interpreters.is_shareable(rch))
 | |
|         self.assertTrue(
 | |
|             interpreters.is_shareable(sch))
 | |
| 
 | |
|         sch.send_nowait(rch)
 | |
|         sch.send_nowait(sch)
 | |
|         rch2 = rch.recv()
 | |
|         sch2 = rch.recv()
 | |
| 
 | |
|         self.assertEqual(rch2, rch)
 | |
|         self.assertEqual(sch2, sch)
 | |
| 
 | |
|     def test_is_closed(self):
 | |
|         rch, sch = interpreters.create_channel()
 | |
|         rbefore = rch.is_closed
 | |
|         sbefore = sch.is_closed
 | |
|         rch.close()
 | |
|         rafter = rch.is_closed
 | |
|         safter = sch.is_closed
 | |
| 
 | |
|         self.assertFalse(rbefore)
 | |
|         self.assertFalse(sbefore)
 | |
|         self.assertTrue(rafter)
 | |
|         self.assertTrue(safter)
 | |
| 
 | |
| 
 | |
| class TestRecvChannelAttrs(TestBase):
 | |
| 
 | |
|     def test_id_type(self):
 | |
|         rch, _ = interpreters.create_channel()
 | |
|         self.assertIsInstance(rch.id, _channels.ChannelID)
 | |
| 
 | |
|     def test_custom_id(self):
 | |
|         rch = interpreters.RecvChannel(1)
 | |
|         self.assertEqual(rch.id, 1)
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             interpreters.RecvChannel('1')
 | |
| 
 | |
|     def test_id_readonly(self):
 | |
|         rch = interpreters.RecvChannel(1)
 | |
|         with self.assertRaises(AttributeError):
 | |
|             rch.id = 2
 | |
| 
 | |
|     def test_equality(self):
 | |
|         ch1, _ = interpreters.create_channel()
 | |
|         ch2, _ = interpreters.create_channel()
 | |
|         self.assertEqual(ch1, ch1)
 | |
|         self.assertNotEqual(ch1, ch2)
 | |
| 
 | |
| 
 | |
| class TestSendChannelAttrs(TestBase):
 | |
| 
 | |
|     def test_id_type(self):
 | |
|         _, sch = interpreters.create_channel()
 | |
|         self.assertIsInstance(sch.id, _channels.ChannelID)
 | |
| 
 | |
|     def test_custom_id(self):
 | |
|         sch = interpreters.SendChannel(1)
 | |
|         self.assertEqual(sch.id, 1)
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             interpreters.SendChannel('1')
 | |
| 
 | |
|     def test_id_readonly(self):
 | |
|         sch = interpreters.SendChannel(1)
 | |
|         with self.assertRaises(AttributeError):
 | |
|             sch.id = 2
 | |
| 
 | |
|     def test_equality(self):
 | |
|         _, ch1 = interpreters.create_channel()
 | |
|         _, ch2 = interpreters.create_channel()
 | |
|         self.assertEqual(ch1, ch1)
 | |
|         self.assertNotEqual(ch1, ch2)
 | |
| 
 | |
| 
 | |
| class TestSendRecv(TestBase):
 | |
| 
 | |
|     def test_send_recv_main(self):
 | |
|         r, s = interpreters.create_channel()
 | |
|         orig = b'spam'
 | |
|         s.send_nowait(orig)
 | |
|         obj = r.recv()
 | |
| 
 | |
|         self.assertEqual(obj, orig)
 | |
|         self.assertIsNot(obj, orig)
 | |
| 
 | |
|     def test_send_recv_same_interpreter(self):
 | |
|         interp = interpreters.create()
 | |
|         interp.run(dedent("""
 | |
|             from test.support import interpreters
 | |
|             r, s = interpreters.create_channel()
 | |
|             orig = b'spam'
 | |
|             s.send_nowait(orig)
 | |
|             obj = r.recv()
 | |
|             assert obj == orig, 'expected: obj == orig'
 | |
|             assert obj is not orig, 'expected: obj is not orig'
 | |
|             """))
 | |
| 
 | |
|     @unittest.skip('broken (see BPO-...)')
 | |
|     def test_send_recv_different_interpreters(self):
 | |
|         r1, s1 = interpreters.create_channel()
 | |
|         r2, s2 = interpreters.create_channel()
 | |
|         orig1 = b'spam'
 | |
|         s1.send_nowait(orig1)
 | |
|         out = _run_output(
 | |
|             interpreters.create(),
 | |
|             dedent(f"""
 | |
|                 obj1 = r.recv()
 | |
|                 assert obj1 == b'spam', 'expected: obj1 == orig1'
 | |
|                 # When going to another interpreter we get a copy.
 | |
|                 assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
 | |
|                 orig2 = b'eggs'
 | |
|                 print(id(orig2))
 | |
|                 s.send_nowait(orig2)
 | |
|                 """),
 | |
|             channels=dict(r=r1, s=s2),
 | |
|             )
 | |
|         obj2 = r2.recv()
 | |
| 
 | |
|         self.assertEqual(obj2, b'eggs')
 | |
|         self.assertNotEqual(id(obj2), int(out))
 | |
| 
 | |
|     def test_send_recv_different_threads(self):
 | |
|         r, s = interpreters.create_channel()
 | |
| 
 | |
|         def f():
 | |
|             while True:
 | |
|                 try:
 | |
|                     obj = r.recv()
 | |
|                     break
 | |
|                 except interpreters.ChannelEmptyError:
 | |
|                     time.sleep(0.1)
 | |
|             s.send(obj)
 | |
|         t = threading.Thread(target=f)
 | |
|         t.start()
 | |
| 
 | |
|         orig = b'spam'
 | |
|         s.send(orig)
 | |
|         obj = r.recv()
 | |
|         t.join()
 | |
| 
 | |
|         self.assertEqual(obj, orig)
 | |
|         self.assertIsNot(obj, orig)
 | |
| 
 | |
|     def test_send_recv_nowait_main(self):
 | |
|         r, s = interpreters.create_channel()
 | |
|         orig = b'spam'
 | |
|         s.send_nowait(orig)
 | |
|         obj = r.recv_nowait()
 | |
| 
 | |
|         self.assertEqual(obj, orig)
 | |
|         self.assertIsNot(obj, orig)
 | |
| 
 | |
|     def test_send_recv_nowait_main_with_default(self):
 | |
|         r, _ = interpreters.create_channel()
 | |
|         obj = r.recv_nowait(None)
 | |
| 
 | |
|         self.assertIsNone(obj)
 | |
| 
 | |
|     def test_send_recv_nowait_same_interpreter(self):
 | |
|         interp = interpreters.create()
 | |
|         interp.run(dedent("""
 | |
|             from test.support import interpreters
 | |
|             r, s = interpreters.create_channel()
 | |
|             orig = b'spam'
 | |
|             s.send_nowait(orig)
 | |
|             obj = r.recv_nowait()
 | |
|             assert obj == orig, 'expected: obj == orig'
 | |
|             # When going back to the same interpreter we get the same object.
 | |
|             assert obj is not orig, 'expected: obj is not orig'
 | |
|             """))
 | |
| 
 | |
|     @unittest.skip('broken (see BPO-...)')
 | |
|     def test_send_recv_nowait_different_interpreters(self):
 | |
|         r1, s1 = interpreters.create_channel()
 | |
|         r2, s2 = interpreters.create_channel()
 | |
|         orig1 = b'spam'
 | |
|         s1.send_nowait(orig1)
 | |
|         out = _run_output(
 | |
|             interpreters.create(),
 | |
|             dedent(f"""
 | |
|                 obj1 = r.recv_nowait()
 | |
|                 assert obj1 == b'spam', 'expected: obj1 == orig1'
 | |
|                 # When going to another interpreter we get a copy.
 | |
|                 assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
 | |
|                 orig2 = b'eggs'
 | |
|                 print(id(orig2))
 | |
|                 s.send_nowait(orig2)
 | |
|                 """),
 | |
|             channels=dict(r=r1, s=s2),
 | |
|             )
 | |
|         obj2 = r2.recv_nowait()
 | |
| 
 | |
|         self.assertEqual(obj2, b'eggs')
 | |
|         self.assertNotEqual(id(obj2), int(out))
 | |
| 
 | |
|     def test_recv_timeout(self):
 | |
|         r, _ = interpreters.create_channel()
 | |
|         with self.assertRaises(TimeoutError):
 | |
|             r.recv(timeout=1)
 | |
| 
 | |
|     def test_recv_channel_does_not_exist(self):
 | |
|         ch = interpreters.RecvChannel(1_000_000)
 | |
|         with self.assertRaises(interpreters.ChannelNotFoundError):
 | |
|             ch.recv()
 | |
| 
 | |
|     def test_send_channel_does_not_exist(self):
 | |
|         ch = interpreters.SendChannel(1_000_000)
 | |
|         with self.assertRaises(interpreters.ChannelNotFoundError):
 | |
|             ch.send(b'spam')
 | |
| 
 | |
|     def test_recv_nowait_channel_does_not_exist(self):
 | |
|         ch = interpreters.RecvChannel(1_000_000)
 | |
|         with self.assertRaises(interpreters.ChannelNotFoundError):
 | |
|             ch.recv_nowait()
 | |
| 
 | |
|     def test_send_nowait_channel_does_not_exist(self):
 | |
|         ch = interpreters.SendChannel(1_000_000)
 | |
|         with self.assertRaises(interpreters.ChannelNotFoundError):
 | |
|             ch.send_nowait(b'spam')
 | |
| 
 | |
|     def test_recv_nowait_empty(self):
 | |
|         ch, _ = interpreters.create_channel()
 | |
|         with self.assertRaises(interpreters.ChannelEmptyError):
 | |
|             ch.recv_nowait()
 | |
| 
 | |
|     def test_recv_nowait_default(self):
 | |
|         default = object()
 | |
|         rch, sch = interpreters.create_channel()
 | |
|         obj1 = rch.recv_nowait(default)
 | |
|         sch.send_nowait(None)
 | |
|         sch.send_nowait(1)
 | |
|         sch.send_nowait(b'spam')
 | |
|         sch.send_nowait(b'eggs')
 | |
|         obj2 = rch.recv_nowait(default)
 | |
|         obj3 = rch.recv_nowait(default)
 | |
|         obj4 = rch.recv_nowait()
 | |
|         obj5 = rch.recv_nowait(default)
 | |
|         obj6 = rch.recv_nowait(default)
 | |
| 
 | |
|         self.assertIs(obj1, default)
 | |
|         self.assertIs(obj2, None)
 | |
|         self.assertEqual(obj3, 1)
 | |
|         self.assertEqual(obj4, b'spam')
 | |
|         self.assertEqual(obj5, b'eggs')
 | |
|         self.assertIs(obj6, default)
 | |
| 
 | |
|     def test_send_buffer(self):
 | |
|         buf = bytearray(b'spamspamspam')
 | |
|         obj = None
 | |
|         rch, sch = interpreters.create_channel()
 | |
| 
 | |
|         def f():
 | |
|             nonlocal obj
 | |
|             while True:
 | |
|                 try:
 | |
|                     obj = rch.recv()
 | |
|                     break
 | |
|                 except interpreters.ChannelEmptyError:
 | |
|                     time.sleep(0.1)
 | |
|         t = threading.Thread(target=f)
 | |
|         t.start()
 | |
| 
 | |
|         sch.send_buffer(buf)
 | |
|         t.join()
 | |
| 
 | |
|         self.assertIsNot(obj, buf)
 | |
|         self.assertIsInstance(obj, memoryview)
 | |
|         self.assertEqual(obj, buf)
 | |
| 
 | |
|         buf[4:8] = b'eggs'
 | |
|         self.assertEqual(obj, buf)
 | |
|         obj[4:8] = b'ham.'
 | |
|         self.assertEqual(obj, buf)
 | |
| 
 | |
|     def test_send_buffer_nowait(self):
 | |
|         buf = bytearray(b'spamspamspam')
 | |
|         rch, sch = interpreters.create_channel()
 | |
|         sch.send_buffer_nowait(buf)
 | |
|         obj = rch.recv()
 | |
| 
 | |
|         self.assertIsNot(obj, buf)
 | |
|         self.assertIsInstance(obj, memoryview)
 | |
|         self.assertEqual(obj, buf)
 | |
| 
 | |
|         buf[4:8] = b'eggs'
 | |
|         self.assertEqual(obj, buf)
 | |
|         obj[4:8] = b'ham.'
 | |
|         self.assertEqual(obj, buf)
 | 
