mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	[3.13] gh-121790: Fix interactive console initialization (GH-121793) (GH-121822)
(cherry picked from commit e5c7216f37)
Co-authored-by: Milan Oberkirch <milan.oberkirch@geops.com>
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
			
			
This commit is contained in:
		
							parent
							
								
									0794220a69
								
							
						
					
					
						commit
						5b718e7fc7
					
				
					 6 changed files with 85 additions and 42 deletions
				
			
		|  | @ -23,7 +23,7 @@ | |||
| 
 | ||||
| def interactive_console(mainmodule=None, quiet=False, pythonstartup=False): | ||||
|     if not CAN_USE_PYREPL: | ||||
|         if not os.environ.get('PYTHON_BASIC_REPL', None) and FAIL_REASON: | ||||
|         if not os.getenv('PYTHON_BASIC_REPL') and FAIL_REASON: | ||||
|             from .trace import trace | ||||
|             trace(FAIL_REASON) | ||||
|             print(FAIL_REASON, file=sys.stderr) | ||||
|  | @ -51,5 +51,7 @@ def interactive_console(mainmodule=None, quiet=False, pythonstartup=False): | |||
|     if not hasattr(sys, "ps2"): | ||||
|         sys.ps2 = "... " | ||||
| 
 | ||||
|     from .console import InteractiveColoredConsole | ||||
|     from .simple_interact import run_multiline_interactive_console | ||||
|     run_multiline_interactive_console(namespace) | ||||
|     console = InteractiveColoredConsole(namespace, filename="<stdin>") | ||||
|     run_multiline_interactive_console(console) | ||||
|  |  | |||
|  | @ -58,7 +58,7 @@ | |||
| TYPE_CHECKING = False | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from typing import Any | ||||
|     from typing import Any, Mapping | ||||
| 
 | ||||
| 
 | ||||
| MoreLinesCallable = Callable[[str], bool] | ||||
|  | @ -559,7 +559,7 @@ def stub(*args: object, **kwds: object) -> None: | |||
| # ____________________________________________________________ | ||||
| 
 | ||||
| 
 | ||||
| def _setup(namespace: dict[str, Any]) -> None: | ||||
| def _setup(namespace: Mapping[str, Any]) -> None: | ||||
|     global raw_input | ||||
|     if raw_input is not None: | ||||
|         return  # don't run _setup twice | ||||
|  | @ -575,7 +575,9 @@ def _setup(namespace: dict[str, Any]) -> None: | |||
|     _wrapper.f_in = f_in | ||||
|     _wrapper.f_out = f_out | ||||
| 
 | ||||
|     # set up namespace in rlcompleter | ||||
|     # set up namespace in rlcompleter, which requires it to be a bona fide dict | ||||
|     if not isinstance(namespace, dict): | ||||
|         namespace = dict(namespace) | ||||
|     _wrapper.config.readline_completer = RLCompleter(namespace).complete | ||||
| 
 | ||||
|     # this is not really what readline.c does.  Better than nothing I guess | ||||
|  |  | |||
|  | @ -27,12 +27,9 @@ | |||
| 
 | ||||
| import _sitebuiltins | ||||
| import linecache | ||||
| import builtins | ||||
| import sys | ||||
| import code | ||||
| from types import ModuleType | ||||
| 
 | ||||
| from .console import InteractiveColoredConsole | ||||
| from .readline import _get_reader, multiline_input | ||||
| 
 | ||||
| TYPE_CHECKING = False | ||||
|  | @ -82,17 +79,12 @@ def _clear_screen(): | |||
| 
 | ||||
| 
 | ||||
| def run_multiline_interactive_console( | ||||
|     namespace: dict[str, Any], | ||||
|     console: code.InteractiveConsole, | ||||
|     *, | ||||
|     future_flags: int = 0, | ||||
|     console: code.InteractiveConsole | None = None, | ||||
| ) -> None: | ||||
|     from .readline import _setup | ||||
|     _setup(namespace) | ||||
| 
 | ||||
|     if console is None: | ||||
|         console = InteractiveColoredConsole( | ||||
|             namespace, filename="<stdin>" | ||||
|         ) | ||||
|     _setup(console.locals) | ||||
|     if future_flags: | ||||
|         console.compile.compiler.flags |= future_flags | ||||
| 
 | ||||
|  |  | |||
|  | @ -97,30 +97,16 @@ def run(self): | |||
|                     exec(startup_code, console.locals) | ||||
| 
 | ||||
|             ps1 = getattr(sys, "ps1", ">>> ") | ||||
|             if can_colorize(): | ||||
|             if can_colorize() and CAN_USE_PYREPL: | ||||
|                 ps1 = f"{ANSIColors.BOLD_MAGENTA}{ps1}{ANSIColors.RESET}" | ||||
|             console.write(f"{ps1}import asyncio\n") | ||||
| 
 | ||||
|             try: | ||||
|                 import errno | ||||
|                 if os.getenv("PYTHON_BASIC_REPL"): | ||||
|                     raise RuntimeError("user environment requested basic REPL") | ||||
|                 if not os.isatty(sys.stdin.fileno()): | ||||
|                     return_code = errno.ENOTTY | ||||
|                     raise OSError(return_code, "tty required", "stdin") | ||||
| 
 | ||||
|                 # This import will fail on operating systems with no termios. | ||||
|             if CAN_USE_PYREPL: | ||||
|                 from _pyrepl.simple_interact import ( | ||||
|                     check, | ||||
|                     run_multiline_interactive_console, | ||||
|                 ) | ||||
|                 if err := check(): | ||||
|                     raise RuntimeError(err) | ||||
|             except Exception as e: | ||||
|                 console.interact(banner="", exitmsg="") | ||||
|             else: | ||||
|                 try: | ||||
|                     run_multiline_interactive_console(console=console) | ||||
|                     run_multiline_interactive_console(console) | ||||
|                 except SystemExit: | ||||
|                     # expected via the `exit` and `quit` commands | ||||
|                     pass | ||||
|  | @ -129,6 +115,8 @@ def run(self): | |||
|                     console.showtraceback() | ||||
|                     console.write("Internal error, ") | ||||
|                     return_code = 1 | ||||
|             else: | ||||
|                 console.interact(banner="", exitmsg="") | ||||
|         finally: | ||||
|             warnings.filterwarnings( | ||||
|                 'ignore', | ||||
|  | @ -139,7 +127,10 @@ def run(self): | |||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     CAN_USE_PYREPL = True | ||||
|     if os.getenv('PYTHON_BASIC_REPL'): | ||||
|         CAN_USE_PYREPL = False | ||||
|     else: | ||||
|         from _pyrepl.main import CAN_USE_PYREPL | ||||
| 
 | ||||
|     return_code = 0 | ||||
|     loop = asyncio.new_event_loop() | ||||
|  |  | |||
|  | @ -517,10 +517,7 @@ def register_readline(): | |||
|         pass | ||||
| 
 | ||||
|     if readline.get_current_history_length() == 0: | ||||
|         try: | ||||
|             from _pyrepl.main import CAN_USE_PYREPL | ||||
|         except ImportError: | ||||
|             CAN_USE_PYREPL = False | ||||
|         from _pyrepl.main import CAN_USE_PYREPL | ||||
|         # If no history was loaded, default to .python_history, | ||||
|         # or PYTHON_HISTORY. | ||||
|         # The guard is necessary to avoid doubling history size at | ||||
|  |  | |||
|  | @ -1,15 +1,27 @@ | |||
| """Test the interactive interpreter.""" | ||||
| 
 | ||||
| import os | ||||
| import select | ||||
| import subprocess | ||||
| import sys | ||||
| import unittest | ||||
| from textwrap import dedent | ||||
| from test import support | ||||
| from test.support import cpython_only, has_subprocess_support, SuppressCrashReport | ||||
| from test.support.script_helper import assert_python_failure, kill_python, assert_python_ok | ||||
| from test.support import ( | ||||
|     cpython_only, | ||||
|     has_subprocess_support, | ||||
|     os_helper, | ||||
|     SuppressCrashReport, | ||||
|     SHORT_TIMEOUT, | ||||
| ) | ||||
| from test.support.script_helper import kill_python | ||||
| from test.support.import_helper import import_module | ||||
| 
 | ||||
| try: | ||||
|     import pty | ||||
| except ImportError: | ||||
|     pty = None | ||||
| 
 | ||||
| 
 | ||||
| if not has_subprocess_support: | ||||
|     raise unittest.SkipTest("test module requires subprocess") | ||||
|  | @ -195,9 +207,56 @@ def bar(x): | |||
|         expected = "(30, None, [\'def foo(x):\\n\', \'    return x + 1\\n\', \'\\n\'], \'<stdin>\')" | ||||
|         self.assertIn(expected, output, expected) | ||||
| 
 | ||||
|     def test_asyncio_repl_no_tty_fails(self): | ||||
|         assert assert_python_failure("-m", "asyncio") | ||||
|     def test_asyncio_repl_reaches_python_startup_script(self): | ||||
|         with os_helper.temp_dir() as tmpdir: | ||||
|             script = os.path.join(tmpdir, "pythonstartup.py") | ||||
|             with open(script, "w") as f: | ||||
|                 f.write("print('pythonstartup done!')" + os.linesep) | ||||
|                 f.write("exit(0)" + os.linesep) | ||||
| 
 | ||||
|             env = os.environ.copy() | ||||
|             env["PYTHONSTARTUP"] = script | ||||
|             subprocess.check_call( | ||||
|                 [sys.executable, "-m", "asyncio"], | ||||
|                 stdout=subprocess.PIPE, | ||||
|                 stderr=subprocess.PIPE, | ||||
|                 env=env, | ||||
|                 timeout=SHORT_TIMEOUT, | ||||
|             ) | ||||
| 
 | ||||
|     @unittest.skipUnless(pty, "requires pty") | ||||
|     def test_asyncio_repl_is_ok(self): | ||||
|         m, s = pty.openpty() | ||||
|         cmd = [sys.executable, "-m", "asyncio"] | ||||
|         proc = subprocess.Popen( | ||||
|             cmd, | ||||
|             stdin=s, | ||||
|             stdout=s, | ||||
|             stderr=s, | ||||
|             text=True, | ||||
|             close_fds=True, | ||||
|             env=os.environ, | ||||
|         ) | ||||
|         os.close(s) | ||||
|         os.write(m, b"await asyncio.sleep(0)\n") | ||||
|         os.write(m, b"exit()\n") | ||||
|         output = [] | ||||
|         while select.select([m], [], [], SHORT_TIMEOUT)[0]: | ||||
|             try: | ||||
|                 data = os.read(m, 1024).decode("utf-8") | ||||
|                 if not data: | ||||
|                     break | ||||
|             except OSError: | ||||
|                 break | ||||
|             output.append(data) | ||||
|         os.close(m) | ||||
|         try: | ||||
|             exit_code = proc.wait(timeout=SHORT_TIMEOUT) | ||||
|         except subprocess.TimeoutExpired: | ||||
|             proc.kill() | ||||
|             exit_code = proc.wait() | ||||
| 
 | ||||
|         self.assertEqual(exit_code, 0) | ||||
| 
 | ||||
| class TestInteractiveModeSyntaxErrors(unittest.TestCase): | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Miss Islington (bot)
						Miss Islington (bot)