| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | """Test the interactive interpreter.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import os | 
					
						
							| 
									
										
										
										
											2024-07-16 00:49:41 +02:00
										 |  |  | import select | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | import subprocess | 
					
						
							| 
									
										
										
										
											2024-06-11 20:04:39 +02:00
										 |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | from textwrap import dedent | 
					
						
							| 
									
										
										
										
											2023-08-31 18:33:34 +02:00
										 |  |  | from test import support | 
					
						
							| 
									
										
										
										
											2024-07-16 00:49:41 +02:00
										 |  |  | from test.support import ( | 
					
						
							|  |  |  |     cpython_only, | 
					
						
							|  |  |  |     has_subprocess_support, | 
					
						
							|  |  |  |     os_helper, | 
					
						
							|  |  |  |     SuppressCrashReport, | 
					
						
							|  |  |  |     SHORT_TIMEOUT, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from test.support.script_helper import kill_python | 
					
						
							| 
									
										
										
										
											2024-04-03 15:11:36 +02:00
										 |  |  | from test.support.import_helper import import_module | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-16 00:49:41 +02:00
										 |  |  | try: | 
					
						
							|  |  |  |     import pty | 
					
						
							|  |  |  | except ImportError: | 
					
						
							|  |  |  |     pty = None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-25 09:09:06 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | if not has_subprocess_support: | 
					
						
							|  |  |  |     raise unittest.SkipTest("test module requires subprocess") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): | 
					
						
							|  |  |  |     """Run the Python REPL with the given arguments.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     kw is extra keyword args to pass to subprocess.Popen. Returns a Popen | 
					
						
							|  |  |  |     object. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # To run the REPL without using a terminal, spawn python with the command | 
					
						
							|  |  |  |     # line option '-i' and the process name set to '<stdin>'. | 
					
						
							|  |  |  |     # The directory of argv[0] must match the directory of the Python | 
					
						
							|  |  |  |     # executable for the Popen() call to python to succeed as the directory | 
					
						
							|  |  |  |     # path may be used by Py_GetPath() to build the default module search | 
					
						
							|  |  |  |     # path. | 
					
						
							|  |  |  |     stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>") | 
					
						
							|  |  |  |     cmd_line = [stdin_fname, '-E', '-i'] | 
					
						
							|  |  |  |     cmd_line.extend(args) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Set TERM=vt100, for the rationale see the comments in spawn_python() of | 
					
						
							|  |  |  |     # test.support.script_helper. | 
					
						
							|  |  |  |     env = kw.setdefault('env', dict(os.environ)) | 
					
						
							|  |  |  |     env['TERM'] = 'vt100' | 
					
						
							| 
									
										
										
										
											2020-06-03 14:39:59 +02:00
										 |  |  |     return subprocess.Popen(cmd_line, | 
					
						
							|  |  |  |                             executable=sys.executable, | 
					
						
							|  |  |  |                             text=True, | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  |                             stdin=subprocess.PIPE, | 
					
						
							|  |  |  |                             stdout=stdout, stderr=stderr, | 
					
						
							|  |  |  |                             **kw) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-20 23:07:43 +03:00
										 |  |  | def run_on_interactive_mode(source): | 
					
						
							|  |  |  |     """Spawn a new Python interpreter, pass the given
 | 
					
						
							|  |  |  |     input source code from the stdin and return the | 
					
						
							|  |  |  |     result back. If the interpreter exits non-zero, it | 
					
						
							|  |  |  |     raises a ValueError."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     process = spawn_repl() | 
					
						
							|  |  |  |     process.stdin.write(source) | 
					
						
							|  |  |  |     output = kill_python(process) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if process.returncode != 0: | 
					
						
							|  |  |  |         raise ValueError("Process didn't exit properly.") | 
					
						
							|  |  |  |     return output | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | class TestInteractiveInterpreter(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @cpython_only | 
					
						
							| 
									
										
										
										
											2023-08-31 18:33:34 +02:00
										 |  |  |     # Python built with Py_TRACE_REFS fail with a fatal error in | 
					
						
							|  |  |  |     # _PyRefchain_Trace() on memory allocation error. | 
					
						
							|  |  |  |     @unittest.skipIf(support.Py_TRACE_REFS, 'cannot test Py_TRACE_REFS build') | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  |     def test_no_memory(self): | 
					
						
							| 
									
										
										
										
											2024-04-03 15:11:36 +02:00
										 |  |  |         import_module("_testcapi") | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  |         # Issue #30696: Fix the interactive interpreter looping endlessly when | 
					
						
							|  |  |  |         # no memory. Check also that the fix does not break the interactive | 
					
						
							|  |  |  |         # loop when an exception is raised. | 
					
						
							|  |  |  |         user_input = """
 | 
					
						
							|  |  |  |             import sys, _testcapi | 
					
						
							|  |  |  |             1/0 | 
					
						
							|  |  |  |             print('After the exception.') | 
					
						
							|  |  |  |             _testcapi.set_nomemory(0) | 
					
						
							|  |  |  |             sys.exit(0) | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         user_input = dedent(user_input) | 
					
						
							|  |  |  |         p = spawn_repl() | 
					
						
							|  |  |  |         with SuppressCrashReport(): | 
					
						
							|  |  |  |             p.stdin.write(user_input) | 
					
						
							|  |  |  |         output = kill_python(p) | 
					
						
							| 
									
										
										
										
											2020-06-03 14:39:59 +02:00
										 |  |  |         self.assertIn('After the exception.', output) | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  |         # Exit code 120: Py_FinalizeEx() failed to flush stdout and stderr. | 
					
						
							|  |  |  |         self.assertIn(p.returncode, (1, 120)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-01-06 15:59:09 +00:00
										 |  |  |     @cpython_only | 
					
						
							|  |  |  |     def test_multiline_string_parsing(self): | 
					
						
							|  |  |  |         # bpo-39209: Multiline string tokens need to be handled in the tokenizer | 
					
						
							|  |  |  |         # in two places: the interactive path and the non-interactive path. | 
					
						
							|  |  |  |         user_input = '''\
 | 
					
						
							|  |  |  |         x = """<?xml version="1.0" encoding="iso-8859-1"?>
 | 
					
						
							|  |  |  |         <test> | 
					
						
							|  |  |  |             <Users> | 
					
						
							|  |  |  |                 <fun25> | 
					
						
							|  |  |  |                     <limits> | 
					
						
							|  |  |  |                         <total>0KiB</total> | 
					
						
							|  |  |  |                         <kbps>0</kbps> | 
					
						
							|  |  |  |                         <rps>1.3</rps> | 
					
						
							|  |  |  |                         <connections>0</connections> | 
					
						
							|  |  |  |                     </limits> | 
					
						
							|  |  |  |                     <usages> | 
					
						
							|  |  |  |                         <total>16738211KiB</total> | 
					
						
							|  |  |  |                         <kbps>237.15</kbps> | 
					
						
							|  |  |  |                         <rps>1.3</rps> | 
					
						
							|  |  |  |                         <connections>0</connections> | 
					
						
							|  |  |  |                     </usages> | 
					
						
							|  |  |  |                     <time_to_refresh>never</time_to_refresh> | 
					
						
							|  |  |  |                     <limit_exceeded_URL>none</limit_exceeded_URL> | 
					
						
							|  |  |  |                 </fun25> | 
					
						
							|  |  |  |             </Users> | 
					
						
							|  |  |  |         </test>"""
 | 
					
						
							|  |  |  |         '''
 | 
					
						
							|  |  |  |         user_input = dedent(user_input) | 
					
						
							|  |  |  |         p = spawn_repl() | 
					
						
							| 
									
										
										
										
											2020-06-03 14:39:59 +02:00
										 |  |  |         p.stdin.write(user_input) | 
					
						
							| 
									
										
										
										
											2020-01-06 15:59:09 +00:00
										 |  |  |         output = kill_python(p) | 
					
						
							|  |  |  |         self.assertEqual(p.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-03 14:39:59 +02:00
										 |  |  |     def test_close_stdin(self): | 
					
						
							|  |  |  |         user_input = dedent('''
 | 
					
						
							|  |  |  |             import os | 
					
						
							|  |  |  |             print("before close") | 
					
						
							|  |  |  |             os.close(0) | 
					
						
							|  |  |  |         ''')
 | 
					
						
							| 
									
										
										
										
											2020-06-10 18:49:23 +02:00
										 |  |  |         prepare_repl = dedent('''
 | 
					
						
							|  |  |  |             from test.support import suppress_msvcrt_asserts | 
					
						
							|  |  |  |             suppress_msvcrt_asserts() | 
					
						
							|  |  |  |         ''')
 | 
					
						
							|  |  |  |         process = spawn_repl('-c', prepare_repl) | 
					
						
							| 
									
										
										
										
											2020-06-03 14:39:59 +02:00
										 |  |  |         output = process.communicate(user_input)[0] | 
					
						
							|  |  |  |         self.assertEqual(process.returncode, 0) | 
					
						
							|  |  |  |         self.assertIn('before close', output) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-10-13 11:25:37 +02:00
										 |  |  |     def test_interactive_traceback_reporting(self): | 
					
						
							|  |  |  |         user_input = "1 / 0 / 3 / 4" | 
					
						
							|  |  |  |         p = spawn_repl() | 
					
						
							|  |  |  |         p.stdin.write(user_input) | 
					
						
							|  |  |  |         output = kill_python(p) | 
					
						
							|  |  |  |         self.assertEqual(p.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         traceback_lines = output.splitlines()[-6:-1] | 
					
						
							|  |  |  |         expected_lines = [ | 
					
						
							|  |  |  |             "Traceback (most recent call last):", | 
					
						
							|  |  |  |             "  File \"<stdin>\", line 1, in <module>", | 
					
						
							|  |  |  |             "    1 / 0 / 3 / 4", | 
					
						
							|  |  |  |             "    ~~^~~", | 
					
						
							|  |  |  |             "ZeroDivisionError: division by zero", | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.assertEqual(traceback_lines, expected_lines) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_interactive_traceback_reporting_multiple_input(self): | 
					
						
							|  |  |  |         user_input1 = dedent("""
 | 
					
						
							|  |  |  |         def foo(x): | 
					
						
							|  |  |  |             1 / x | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         """)
 | 
					
						
							|  |  |  |         p = spawn_repl() | 
					
						
							|  |  |  |         p.stdin.write(user_input1) | 
					
						
							|  |  |  |         user_input2 = "foo(0)" | 
					
						
							|  |  |  |         p.stdin.write(user_input2) | 
					
						
							|  |  |  |         output = kill_python(p) | 
					
						
							|  |  |  |         self.assertEqual(p.returncode, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-01 14:18:16 -08:00
										 |  |  |         traceback_lines = output.splitlines()[-8:-1] | 
					
						
							| 
									
										
										
										
											2023-10-13 11:25:37 +02:00
										 |  |  |         expected_lines = [ | 
					
						
							|  |  |  |             '  File "<stdin>", line 1, in <module>', | 
					
						
							|  |  |  |             '    foo(0)', | 
					
						
							| 
									
										
										
										
											2023-12-01 14:18:16 -08:00
										 |  |  |             '    ~~~^^^', | 
					
						
							| 
									
										
										
										
											2023-10-13 11:25:37 +02:00
										 |  |  |             '  File "<stdin>", line 2, in foo', | 
					
						
							|  |  |  |             '    1 / x', | 
					
						
							|  |  |  |             '    ~~^~~', | 
					
						
							|  |  |  |             'ZeroDivisionError: division by zero' | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.assertEqual(traceback_lines, expected_lines) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_interactive_source_is_in_linecache(self): | 
					
						
							|  |  |  |         user_input = dedent("""
 | 
					
						
							|  |  |  |         def foo(x): | 
					
						
							|  |  |  |             return x + 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def bar(x): | 
					
						
							|  |  |  |             return foo(x) + 2 | 
					
						
							|  |  |  |         """)
 | 
					
						
							|  |  |  |         p = spawn_repl() | 
					
						
							|  |  |  |         p.stdin.write(user_input) | 
					
						
							|  |  |  |         user_input2 = dedent("""
 | 
					
						
							|  |  |  |         import linecache | 
					
						
							| 
									
										
										
										
											2023-10-26 15:17:28 +09:00
										 |  |  |         print(linecache.cache['<stdin>-1']) | 
					
						
							| 
									
										
										
										
											2023-10-13 11:25:37 +02:00
										 |  |  |         """)
 | 
					
						
							|  |  |  |         p.stdin.write(user_input2) | 
					
						
							|  |  |  |         output = kill_python(p) | 
					
						
							|  |  |  |         self.assertEqual(p.returncode, 0) | 
					
						
							|  |  |  |         expected = "(30, None, [\'def foo(x):\\n\', \'    return x + 1\\n\', \'\\n\'], \'<stdin>\')" | 
					
						
							|  |  |  |         self.assertIn(expected, output, expected) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-16 00:49:41 +02:00
										 |  |  |     def test_asyncio_repl_reaches_python_startup_script(self): | 
					
						
							|  |  |  |         with os_helper.temp_dir() as tmpdir: | 
					
						
							|  |  |  |             script = os.path.join(tmpdir, "pythonstartup.py") | 
					
						
							|  |  |  |             with open(script, "w") as f: | 
					
						
							|  |  |  |                 f.write("print('pythonstartup done!')" + os.linesep) | 
					
						
							|  |  |  |                 f.write("exit(0)" + os.linesep) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             env = os.environ.copy() | 
					
						
							|  |  |  |             env["PYTHONSTARTUP"] = script | 
					
						
							|  |  |  |             subprocess.check_call( | 
					
						
							|  |  |  |                 [sys.executable, "-m", "asyncio"], | 
					
						
							|  |  |  |                 stdout=subprocess.PIPE, | 
					
						
							|  |  |  |                 stderr=subprocess.PIPE, | 
					
						
							|  |  |  |                 env=env, | 
					
						
							|  |  |  |                 timeout=SHORT_TIMEOUT, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @unittest.skipUnless(pty, "requires pty") | 
					
						
							|  |  |  |     def test_asyncio_repl_is_ok(self): | 
					
						
							|  |  |  |         m, s = pty.openpty() | 
					
						
							|  |  |  |         cmd = [sys.executable, "-m", "asyncio"] | 
					
						
							|  |  |  |         proc = subprocess.Popen( | 
					
						
							|  |  |  |             cmd, | 
					
						
							|  |  |  |             stdin=s, | 
					
						
							|  |  |  |             stdout=s, | 
					
						
							|  |  |  |             stderr=s, | 
					
						
							|  |  |  |             text=True, | 
					
						
							|  |  |  |             close_fds=True, | 
					
						
							|  |  |  |             env=os.environ, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         os.close(s) | 
					
						
							|  |  |  |         os.write(m, b"await asyncio.sleep(0)\n") | 
					
						
							|  |  |  |         os.write(m, b"exit()\n") | 
					
						
							|  |  |  |         output = [] | 
					
						
							|  |  |  |         while select.select([m], [], [], SHORT_TIMEOUT)[0]: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 data = os.read(m, 1024).decode("utf-8") | 
					
						
							|  |  |  |                 if not data: | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |             except OSError: | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  |             output.append(data) | 
					
						
							|  |  |  |         os.close(m) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             exit_code = proc.wait(timeout=SHORT_TIMEOUT) | 
					
						
							|  |  |  |         except subprocess.TimeoutExpired: | 
					
						
							|  |  |  |             proc.kill() | 
					
						
							|  |  |  |             exit_code = proc.wait() | 
					
						
							| 
									
										
										
										
											2024-05-09 17:47:31 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-16 00:49:41 +02:00
										 |  |  |         self.assertEqual(exit_code, 0) | 
					
						
							| 
									
										
										
										
											2023-10-13 11:25:37 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-20 23:07:43 +03:00
										 |  |  | class TestInteractiveModeSyntaxErrors(unittest.TestCase): | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_interactive_syntax_error_correct_line(self): | 
					
						
							|  |  |  |         output = run_on_interactive_mode(dedent("""\
 | 
					
						
							|  |  |  |         def f(): | 
					
						
							|  |  |  |             print(0) | 
					
						
							|  |  |  |             return yield 42 | 
					
						
							|  |  |  |         """))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         traceback_lines = output.splitlines()[-4:-1] | 
					
						
							|  |  |  |         expected_lines = [ | 
					
						
							|  |  |  |             '    return yield 42', | 
					
						
							|  |  |  |             '           ^^^^^', | 
					
						
							|  |  |  |             'SyntaxError: invalid syntax' | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         self.assertEqual(traceback_lines, expected_lines) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-12 16:50:48 +01:00
										 |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     unittest.main() |