# Mirror of https://github.com/python/cpython.git (synced 2025-11-04 07:31:38 +00:00)
# 1382 lines, 46 KiB, Python
import io
 | 
						|
import itertools
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
import re
 | 
						|
import rlcompleter
 | 
						|
import select
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
from unittest import TestCase, skipUnless, skipIf
 | 
						|
from unittest.mock import patch
 | 
						|
from test.support import force_not_colorized, make_clean_env
 | 
						|
from test.support import SHORT_TIMEOUT, STDLIB_DIR
 | 
						|
from test.support.import_helper import import_module
 | 
						|
from test.support.os_helper import EnvironmentVarGuard, unlink
 | 
						|
 | 
						|
from .support import (
 | 
						|
    FakeConsole,
 | 
						|
    ScreenEqualMixin,
 | 
						|
    handle_all_events,
 | 
						|
    handle_events_narrow_console,
 | 
						|
    more_lines,
 | 
						|
    multiline_input,
 | 
						|
    code_to_events,
 | 
						|
)
 | 
						|
from _pyrepl.console import Event
 | 
						|
from _pyrepl.readline import (ReadlineAlikeReader, ReadlineConfig,
 | 
						|
                              _ReadlineWrapper)
 | 
						|
from _pyrepl.readline import multiline_input as readline_multiline_input
 | 
						|
 | 
						|
try:
 | 
						|
    import pty
 | 
						|
except ImportError:
 | 
						|
    pty = None
 | 
						|
 | 
						|
 | 
						|
class ReplTestCase(TestCase):
 | 
						|
    def run_repl(
 | 
						|
        self,
 | 
						|
        repl_input: str | list[str],
 | 
						|
        env: dict | None = None,
 | 
						|
        *,
 | 
						|
        cmdline_args: list[str] | None = None,
 | 
						|
        cwd: str | None = None,
 | 
						|
        skip: bool = False,
 | 
						|
    ) -> tuple[str, int]:
 | 
						|
        temp_dir = None
 | 
						|
        if cwd is None:
 | 
						|
            temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
 | 
						|
            cwd = temp_dir.name
 | 
						|
        try:
 | 
						|
            return self._run_repl(
 | 
						|
                repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd, skip=skip,
 | 
						|
            )
 | 
						|
        finally:
 | 
						|
            if temp_dir is not None:
 | 
						|
                temp_dir.cleanup()
 | 
						|
 | 
						|
    def _run_repl(
 | 
						|
        self,
 | 
						|
        repl_input: str | list[str],
 | 
						|
        *,
 | 
						|
        env: dict | None,
 | 
						|
        cmdline_args: list[str] | None,
 | 
						|
        cwd: str,
 | 
						|
        skip: bool,
 | 
						|
    ) -> tuple[str, int]:
 | 
						|
        assert pty
 | 
						|
        master_fd, slave_fd = pty.openpty()
 | 
						|
        cmd = [sys.executable, "-i", "-u"]
 | 
						|
        if env is None:
 | 
						|
            cmd.append("-I")
 | 
						|
        elif "PYTHON_HISTORY" not in env:
 | 
						|
            env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
 | 
						|
        if cmdline_args is not None:
 | 
						|
            cmd.extend(cmdline_args)
 | 
						|
 | 
						|
        try:
 | 
						|
            import termios
 | 
						|
        except ModuleNotFoundError:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            term_attr = termios.tcgetattr(slave_fd)
 | 
						|
            term_attr[6][termios.VREPRINT] = 0  # pass through CTRL-R
 | 
						|
            term_attr[6][termios.VINTR] = 0  # pass through CTRL-C
 | 
						|
            termios.tcsetattr(slave_fd, termios.TCSANOW, term_attr)
 | 
						|
 | 
						|
        process = subprocess.Popen(
 | 
						|
            cmd,
 | 
						|
            stdin=slave_fd,
 | 
						|
            stdout=slave_fd,
 | 
						|
            stderr=slave_fd,
 | 
						|
            cwd=cwd,
 | 
						|
            text=True,
 | 
						|
            close_fds=True,
 | 
						|
            env=env if env else os.environ,
 | 
						|
        )
 | 
						|
        os.close(slave_fd)
 | 
						|
        if isinstance(repl_input, list):
 | 
						|
            repl_input = "\n".join(repl_input) + "\n"
 | 
						|
        os.write(master_fd, repl_input.encode("utf-8"))
 | 
						|
 | 
						|
        output = []
 | 
						|
        while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
 | 
						|
            try:
 | 
						|
                data = os.read(master_fd, 1024).decode("utf-8")
 | 
						|
                if not data:
 | 
						|
                    break
 | 
						|
            except OSError:
 | 
						|
                break
 | 
						|
            output.append(data)
 | 
						|
        else:
 | 
						|
            os.close(master_fd)
 | 
						|
            process.kill()
 | 
						|
            self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
 | 
						|
 | 
						|
        os.close(master_fd)
 | 
						|
        try:
 | 
						|
            exit_code = process.wait(timeout=SHORT_TIMEOUT)
 | 
						|
        except subprocess.TimeoutExpired:
 | 
						|
            process.kill()
 | 
						|
            exit_code = process.wait()
 | 
						|
        output = "".join(output)
 | 
						|
        if skip and "can't use pyrepl" in output:
 | 
						|
            self.skipTest("pyrepl not available")
 | 
						|
        return output, exit_code
 | 
						|
 | 
						|
 | 
						|
class TestCursorPosition(TestCase):
 | 
						|
    def prepare_reader(self, events):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig(readline_completer=None)
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    def test_up_arrow_simple(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  ...\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (0, 1))
 | 
						|
        console.move_cursor.assert_called_once_with(0, 1)
 | 
						|
 | 
						|
    def test_down_arrow_end_of_input(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  ...\n"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (0, 2))
 | 
						|
        console.move_cursor.assert_called_once_with(0, 2)
 | 
						|
 | 
						|
    def test_left_arrow_simple(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("11+11"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (4, 0))
 | 
						|
        console.move_cursor.assert_called_once_with(4, 0)
 | 
						|
 | 
						|
    def test_right_arrow_end_of_line(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("11+11"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, console = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.cxy, (5, 0))
 | 
						|
        console.move_cursor.assert_called_once_with(5, 0)
 | 
						|
 | 
						|
    def test_cursor_position_simple_character(self):
 | 
						|
        events = itertools.chain(code_to_events("k"))
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 1)
 | 
						|
 | 
						|
        # 1 for simple character
 | 
						|
        self.assertEqual(reader.cxy, (1, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_character(self):
 | 
						|
        events = itertools.chain(code_to_events("樂"))
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 1)
 | 
						|
 | 
						|
        # 2 for wide character
 | 
						|
        self.assertEqual(reader.cxy, (2, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_character_move_left(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("樂"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 0)
 | 
						|
        self.assertEqual(reader.cxy, (0, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_character_move_left_right(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("樂"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 1)
 | 
						|
 | 
						|
        # 2 for wide character
 | 
						|
        self.assertEqual(reader.cxy, (2, 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_characters_move_up(self):
 | 
						|
        for_loop = "for _ in _:"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
           f"{for_loop}\n"
 | 
						|
            "  ' 可口可乐; 可口可樂'"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # cursor at end of first line
 | 
						|
        self.assertEqual(reader.pos, len(for_loop))
 | 
						|
        self.assertEqual(reader.cxy, (len(for_loop), 0))
 | 
						|
 | 
						|
    def test_cursor_position_double_width_characters_move_up_down(self):
 | 
						|
        for_loop = "for _ in _:"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
           f"{for_loop}\n"
 | 
						|
            "  ' 可口可乐; 可口可樂'"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # cursor here (showing 2nd line only):
 | 
						|
        # <  ' 可口可乐; 可口可樂'>
 | 
						|
        #              ^
 | 
						|
        self.assertEqual(reader.pos, 19)
 | 
						|
        self.assertEqual(reader.cxy, (10, 1))
 | 
						|
 | 
						|
    def test_cursor_position_multiple_double_width_characters_move_left(self):
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("' 可口可乐; 可口可樂'"),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
        self.assertEqual(reader.pos, 10)
 | 
						|
 | 
						|
        # 1 for quote, 1 for space, 2 per wide character,
 | 
						|
        # 1 for semicolon, 1 for space, 2 per wide character
 | 
						|
        self.assertEqual(reader.cxy, (16, 0))
 | 
						|
 | 
						|
    def test_cursor_position_move_up_to_eol(self):
 | 
						|
        first_line = "for _ in _:"
 | 
						|
        second_line = "  hello"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            f"{first_line}\n"
 | 
						|
            f"{second_line}\n"
 | 
						|
             "  h\n"
 | 
						|
             "  hel"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # Cursor should be at end of line 1, even though line 2 is shorter
 | 
						|
        # for _ in _:
 | 
						|
        #   hello
 | 
						|
        #   h
 | 
						|
        #   hel
 | 
						|
        self.assertEqual(
 | 
						|
            reader.pos, len(first_line) + len(second_line) + 1
 | 
						|
        )  # +1 for newline
 | 
						|
        self.assertEqual(reader.cxy, (len(second_line), 1))
 | 
						|
 | 
						|
    def test_cursor_position_move_down_to_eol(self):
 | 
						|
        last_line = "  hel"
 | 
						|
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "for _ in _:\n"
 | 
						|
            "  hello\n"
 | 
						|
            "  h\n"
 | 
						|
           f"{last_line}"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # Cursor should be at end of line 3, even though line 2 is shorter
 | 
						|
        # for _ in _:
 | 
						|
        #   hello
 | 
						|
        #   h
 | 
						|
        #   hel
 | 
						|
        self.assertEqual(reader.pos, len(code))
 | 
						|
        self.assertEqual(reader.cxy, (len(last_line), 3))
 | 
						|
 | 
						|
    def test_cursor_position_multiple_mixed_lines_move_up(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def foo():\n"
 | 
						|
            "  x = '可口可乐; 可口可樂'\n"
 | 
						|
            "  y = 'abckdfjskldfjslkdjf'"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            13 * [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
 | 
						|
            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
 | 
						|
        )
 | 
						|
 | 
						|
        reader, _ = handle_all_events(events)
 | 
						|
 | 
						|
        # By moving left, we're before the s:
 | 
						|
        # y = 'abckdfjskldfjslkdjf'
 | 
						|
        #             ^
 | 
						|
        # And we should move before the semi-colon despite the different offset
 | 
						|
        # x = '可口可乐; 可口可樂'
 | 
						|
        #            ^
 | 
						|
        self.assertEqual(reader.pos, 22)
 | 
						|
        self.assertEqual(reader.cxy, (15, 1))
 | 
						|
 | 
						|
    def test_cursor_position_after_wrap_and_move_up(self):
 | 
						|
        # fmt: off
 | 
						|
        code = (
 | 
						|
            "def foo():\n"
 | 
						|
            "  hello"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(code),
 | 
						|
            [
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
        reader, _ = handle_events_narrow_console(events)
 | 
						|
 | 
						|
        # The code looks like this:
 | 
						|
        # def foo()\
 | 
						|
        # :
 | 
						|
        #   hello
 | 
						|
        # After moving up we should be after the colon in line 2
 | 
						|
        self.assertEqual(reader.pos, 10)
 | 
						|
        self.assertEqual(reader.cxy, (1, 1))
 | 
						|
 | 
						|
 | 
						|
class TestPyReplAutoindent(TestCase):
 | 
						|
    def prepare_reader(self, events):
 | 
						|
        console = FakeConsole(events)
 | 
						|
        config = ReadlineConfig(readline_completer=None)
 | 
						|
        reader = ReadlineAlikeReader(console=console, config=config)
 | 
						|
        return reader
 | 
						|
 | 
						|
    def test_auto_indent_default(self):
 | 
						|
        # fmt: off
 | 
						|
        input_code = (
 | 
						|
            "def f():\n"
 | 
						|
                "pass\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        events = code_to_events(input_code)
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_continuation(self):
 | 
						|
        # auto indenting according to previous user indentation
 | 
						|
        # fmt: off
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("def f():\n"),
 | 
						|
            # add backspace to delete default auto-indent
 | 
						|
            [
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
 | 
						|
            ],
 | 
						|
            code_to_events(
 | 
						|
                "  pass\n"
 | 
						|
                  "pass\n\n"
 | 
						|
            ),
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():\n"
 | 
						|
            "  pass\n"
 | 
						|
            "  pass\n"
 | 
						|
            "  "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_prev_block(self):
 | 
						|
        # auto indenting according to indentation in different block
 | 
						|
        # fmt: off
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events("def f():\n"),
 | 
						|
            # add backspace to delete default auto-indent
 | 
						|
            [
 | 
						|
                Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
 | 
						|
            ],
 | 
						|
            code_to_events(
 | 
						|
                "  pass\n"
 | 
						|
                "pass\n\n"
 | 
						|
            ),
 | 
						|
            code_to_events(
 | 
						|
                "def g():\n"
 | 
						|
                  "pass\n\n"
 | 
						|
            ),
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def g():\n"
 | 
						|
            "  pass\n"
 | 
						|
            "  "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output1 = multiline_input(reader)
 | 
						|
        output2 = multiline_input(reader)
 | 
						|
        self.assertEqual(output2, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_multiline(self):
 | 
						|
        # fmt: off
 | 
						|
        events = itertools.chain(
 | 
						|
            code_to_events(
 | 
						|
                "def f():\n"
 | 
						|
                    "pass"
 | 
						|
            ),
 | 
						|
            [
 | 
						|
                # go to the end of the first line
 | 
						|
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | 
						|
                Event(evt="key", data="\x05", raw=bytearray(b"\x1bO5")),
 | 
						|
                # new line should be autoindented
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
            code_to_events(
 | 
						|
                "pass"
 | 
						|
            ),
 | 
						|
            [
 | 
						|
                # go to end of last line
 | 
						|
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | 
						|
                Event(evt="key", data="\x05", raw=bytearray(b"\x1bO5")),
 | 
						|
                # double newline to terminate the block
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
                Event(evt="key", data="\n", raw=bytearray(b"\n")),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_with_comment(self):
 | 
						|
        # fmt: off
 | 
						|
        events = code_to_events(
 | 
						|
            "def f():  # foo\n"
 | 
						|
                "pass\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():  # foo\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_with_multicomment(self):
 | 
						|
        # fmt: off
 | 
						|
        events = code_to_events(
 | 
						|
            "def f():  ## foo\n"
 | 
						|
                "pass\n\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "def f():  ## foo\n"
 | 
						|
            "    pass\n"
 | 
						|
            "    "
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
    def test_auto_indent_ignore_comments(self):
 | 
						|
        # fmt: off
 | 
						|
        events = code_to_events(
 | 
						|
            "pass  #:\n"
 | 
						|
        )
 | 
						|
 | 
						|
        output_code = (
 | 
						|
            "pass  #:"
 | 
						|
        )
 | 
						|
        # fmt: on
 | 
						|
 | 
						|
        reader = self.prepare_reader(events)
 | 
						|
        output = multiline_input(reader)
 | 
						|
        self.assertEqual(output, output_code)
 | 
						|
 | 
						|
 | 
						|
class TestPyReplOutput(ScreenEqualMixin, TestCase):
 | 
						|
    def prepare_reader(self, events):
        """Build a non-colorizing reader over a ``FakeConsole`` fed *events*."""
        reader = ReadlineAlikeReader(
            console=FakeConsole(events),
            config=ReadlineConfig(readline_completer=None),
        )
        # Switch colorizing off on the reader used by the screen comparisons.
        reader.can_colorize = False
        return reader
 | 
						|
 | 
						|
    def test_stdin_is_tty(self):
        """Pass when stdin is a TTY; otherwise report a skip with the reason."""
        # Used during test log analysis to figure out if a TTY was available.
        try:
            stdin_is_tty = os.isatty(sys.stdin.fileno())
        except OSError as ose:
            self.skipTest(f"stdin tty check failed: {ose}")
        if not stdin_is_tty:
            self.skipTest("stdin is not a tty")
 | 
						|
 | 
						|
    def test_stdout_is_tty(self):
        """Pass when stdout is a TTY; otherwise report a skip with the reason."""
        # Used during test log analysis to figure out if a TTY was available.
        try:
            stdout_is_tty = os.isatty(sys.stdout.fileno())
        except OSError as ose:
            self.skipTest(f"stdout tty check failed: {ose}")
        if not stdout_is_tty:
            self.skipTest("stdout is not a tty")
 | 
						|
 | 
						|
    def test_basic(self):
        """A one-line expression is returned and shown on screen unchanged."""
        expression = "1+1"
        reader = self.prepare_reader(code_to_events(expression + "\n"))

        result = multiline_input(reader)
        self.assertEqual(result, expression)
        self.assert_screen_equal(reader, expression, clean=True)
 | 
						|
 | 
						|
    def test_get_line_buffer_returns_str(self):
        """``get_line_buffer()`` must return exactly ``str``, not a subclass."""
        wrapper = _ReadlineWrapper(
            f_in=None,
            f_out=None,
            reader=self.prepare_reader(code_to_events("\n")),
        )
        line_buffer = wrapper.get_line_buffer()
        self.assertIs(type(line_buffer), str)
 | 
						|
 | 
						|
    def test_multiline_edit(self):
        """Read a two-line definition, then a second read built from edit keys.

        The first read is expected to return ``def f(): ...``; the second one
        replays cursor moves, deletions and insertions and is expected to
        return ``def g(): pass``.
        """
        # (data, raw) pairs for the key events that follow the typed code.
        keystrokes = (
            ("up", b"\x1bOA"),
            ("up", b"\x1bOA"),
            ("left", b"\x1bOD"),
            ("left", b"\x1bOD"),
            ("left", b"\x1bOD"),
            ("backspace", b"\x08"),
            ("g", b"g"),
            ("down", b"\x1bOB"),
            ("backspace", b"\x08"),
            ("delete", b"\x7F"),
            ("right", b"g"),
            ("backspace", b"\x08"),
            ("p", b"p"),
            ("a", b"a"),
            ("s", b"s"),
            ("s", b"s"),
            ("\n", b"\n"),
            ("\n", b"\n"),
        )
        edit_events = [
            Event(evt="key", data=data, raw=bytearray(raw))
            for data, raw in keystrokes
        ]
        reader = self.prepare_reader(
            itertools.chain(code_to_events("def f():\n...\n\n"), edit_events)
        )

        expected_reads = (
            "def f():\n    ...\n    ",
            "def g():\n    pass\n    ",
        )
        for expected in expected_reads:
            output = multiline_input(reader)
            self.assertEqual(output, expected)
            self.assert_screen_equal(reader, expected, clean=True)
 | 
						|
 | 
						|
    def test_history_navigation_with_up_arrow(self):
        """Up-arrow presses followed by Enter return earlier inputs.

        After ``1+1`` and ``2+2`` are entered, one "up" plus Enter is expected
        to return ``2+2`` and three more "up" presses plus Enter ``1+1``.
        """
        def up():
            return Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))

        def enter():
            return Event(evt="key", data="\n", raw=bytearray(b"\n"))

        reader = self.prepare_reader(
            itertools.chain(
                code_to_events("1+1\n2+2\n"),
                [up(), enter(), up(), up(), up(), enter()],
            )
        )

        for expected in ("1+1", "2+2", "2+2", "1+1"):
            output = multiline_input(reader)
            self.assertEqual(output, expected)
            self.assert_screen_equal(reader, expected, clean=True)
 | 
						|
 | 
						|
    def test_history_with_multiline_entries(self):
        """Three "up" presses after two definitions bring back the first one."""
        typed = "".join([
            "def foo():\nx = 1\ny = 2\nz = 3\n\n",
            "def bar():\nreturn 42\n\n",
        ])
        recall = [
            Event(evt="key", data=data, raw=bytearray(raw))
            for data, raw in (
                ("up", b"\x1bOA"),
                ("up", b"\x1bOA"),
                ("up", b"\x1bOA"),
                ("\n", b"\n"),
                ("\n", b"\n"),
            )
        ]
        events = list(itertools.chain(code_to_events(typed), recall))

        reader = self.prepare_reader(events)
        # The first two reads consume the typed definitions; only the third
        # (the recalled entry) is checked.
        multiline_input(reader)
        multiline_input(reader)
        recalled = multiline_input(reader)

        expected = "def foo():\n    x = 1\n    y = 2\n    z = 3\n    "
        self.assert_screen_equal(reader, expected, clean=True)
        self.assertEqual(recalled, expected)
 | 
						|
 | 
						|
 | 
						|
    def test_history_navigation_with_down_arrow(self):
        """Only the first read is checked; it must return ``1+1``."""
        trailing_keys = [
            Event(evt="key", data=data, raw=bytearray(raw))
            for data, raw in (
                ("up", b"\x1bOA"),
                ("up", b"\x1bOA"),
                ("\n", b"\n"),
                ("down", b"\x1bOB"),
                ("down", b"\x1bOB"),
            )
        ]
        reader = self.prepare_reader(
            itertools.chain(code_to_events("1+1\n2+2\n"), trailing_keys)
        )

        first = multiline_input(reader)
        self.assertEqual(first, "1+1")
        self.assert_screen_equal(reader, "1+1", clean=True)
 | 
						|
 | 
						|
    def test_history_search(self):
        # Three lines are typed, then "\x12" (Ctrl-R), "1" and two newlines
        # are sent: the fourth read must return the "1+1" entry again.
        search_keys = [
            Event(evt="key", data=char, raw=bytearray(char.encode("ascii")))
            for char in ("\x12", "1", "\n", "\n")
        ]
        events = itertools.chain(code_to_events("1+1\n2+2\n3+3\n"), search_keys)
        reader = self.prepare_reader(events)

        for expected in ("1+1", "2+2", "3+3", "1+1"):
            output = multiline_input(reader)
            self.assertEqual(output, expected)
            self.assert_screen_equal(reader, expected, clean=True)
 | 
						|
 | 
						|
    def test_control_character(self):
        # The control character "\x1d" typed after "c" must be kept both in
        # the returned text and in what assert_screen_equal compares.
        typed = "c\x1d"
        reader = self.prepare_reader(code_to_events(typed + "\n"))

        result = multiline_input(reader)
        self.assertEqual(result, typed)
        self.assert_screen_equal(reader, typed, clean=True)
 | 
						|
 | 
						|
    def test_history_search_backward(self):
        # Test <page up> history search backward with "imp" input
        page_up = Event(evt='key', data='page up', raw=bytearray(b'\x1b[5~'))
        enter = Event(evt="key", data="\n", raw=bytearray(b"\n"))
        events = itertools.chain(
            code_to_events("import os\n"),  # consumed by the first read
            code_to_events("imp"),          # prefix typed before <page up>
            (page_up, enter),
        )
        reader = self.prepare_reader(events)

        # First read: fill the history (its result is not checked).
        multiline_input(reader)

        # Second read: "imp" + <page up> must bring back "import os".
        recalled = multiline_input(reader)
        self.assertEqual(recalled, "import os")
        self.assert_screen_equal(reader, "import os", clean=True)
 | 
						|
 | 
						|
    def test_history_search_backward_empty(self):
        # Test <page up> history search backward with an empty input
        page_up = Event(evt='key', data='page up', raw=bytearray(b'\x1b[5~'))
        enter = Event(evt="key", data="\n", raw=bytearray(b"\n"))
        events = itertools.chain(code_to_events("import os\n"), (page_up, enter))
        reader = self.prepare_reader(events)

        # First read: fill the history (its result is not checked).
        multiline_input(reader)

        # Second read: <page up> on an empty line must bring back "import os".
        recalled = multiline_input(reader)
        self.assertEqual(recalled, "import os")
        self.assert_screen_equal(reader, "import os", clean=True)
 | 
						|
 | 
						|
 | 
						|
class TestPyReplCompleter(TestCase):
    # Tab-completion tests: the reader's completer is an
    # rlcompleter.Completer bound to the namespace each test supplies.

    def prepare_reader(self, events, namespace):
        # Return a ReadlineAlikeReader that takes *events* from a FakeConsole
        # and completes names against *namespace*.
        settings = ReadlineConfig()
        settings.readline_completer = rlcompleter.Completer(namespace).complete
        return ReadlineAlikeReader(console=FakeConsole(events), config=settings)

    @patch("rlcompleter._readline_available", False)
    def test_simple_completion(self):
        ns = {"os": os}
        reader = self.prepare_reader(code_to_events("os.getpid\t\n"), ns)

        completed = multiline_input(reader, ns)
        self.assertEqual(completed, "os.getpid()")

    def test_completion_with_many_options(self):
        # Start from a prefix that initially has many candidates.  The
        # first time tab is pressed the options are only displayed (this is
        # when the repl shows [ not unique ]); after a longer prefix is
        # typed, tab completes from one of them.
        ns = {"os": os}
        reader = self.prepare_reader(code_to_events("os.\t\tO_AP\t\n"), ns)

        completed = multiline_input(reader, ns)
        self.assertEqual(completed, "os.O_APPEND")

    def test_empty_namespace_completion(self):
        # With an empty namespace the typed text must come back unchanged.
        ns = {}
        reader = self.prepare_reader(code_to_events("os.geten\t\n"), ns)

        completed = multiline_input(reader, ns)
        self.assertEqual(completed, "os.geten")

    def test_global_namespace_completion(self):
        # "py" must be completed to the only matching namespace key.
        ns = {"python": None}
        reader = self.prepare_reader(code_to_events("py\t\n"), ns)

        completed = multiline_input(reader, ns)
        self.assertEqual(completed, "python")

    def test_up_down_arrow_with_completion_menu(self):
        """Up arrow in the middle of unfinished tab completion when the menu is displayed
        should work and trigger going back in history. Down arrow should subsequently
        get us back to the incomplete command."""
        namespace = {"os": os}
        arrow_keys = [
            Event(evt="key", data=name, raw=bytearray(raw))
            for name, raw in (
                ("down", b"\x1bOB"),
                ("up", b"\x1bOA"),
                ("down", b"\x1bOB"),
            )
        ]
        events = itertools.chain(
            code_to_events("import os\nos.\t\t"),
            arrow_keys,
            code_to_events("\n"),
        )
        reader = self.prepare_reader(events, namespace=namespace)

        # This is the first line, nothing to see here
        first = multiline_input(reader, namespace)
        self.assertEqual(first, "import os")

        # This is the second line. We pressed up and down arrows
        # so we should end up where we were when we initiated tab completion.
        second = multiline_input(reader, namespace)
        self.assertEqual(second, "os.")

    @patch("_pyrepl.readline._ReadlineWrapper.get_reader")
    @patch("sys.stderr", new_callable=io.StringIO)
    def test_completion_with_warnings(self, mock_stderr, mock_get_reader):
        # The property below warns every time it is read; the test requires
        # that nothing ends up on the (patched) sys.stderr.
        class Dummy:
            @property
            def test_func(self):
                import warnings

                warnings.warn("warnings\n")
                return None

        namespace = {"dummy": Dummy()}
        reader = self.prepare_reader(
            code_to_events("dummy.test_func.\t\n\n"), namespace
        )
        mock_get_reader.return_value = reader

        output = readline_multiline_input(more_lines, ">>>", "...")
        self.assertEqual(output, "dummy.test_func.__")
        self.assertEqual(mock_stderr.getvalue(), "")
 | 
						|
 | 
						|
 | 
						|
class TestPasteEvent(TestCase):
    # Tests for multi-line input entered in three ways: between two f3 key
    # events, typed key by key, and wrapped in bracketed-paste sequences.

    def prepare_reader(self, events):
        # Return a ReadlineAlikeReader over *events* with no completer.
        settings = ReadlineConfig(readline_completer=None)
        return ReadlineAlikeReader(console=FakeConsole(events), config=settings)

    @staticmethod
    def _f3_wrapped(code):
        # Events for: f3, *code* typed key by key, f3 again, final newline.
        def f3():
            return [Event(evt="key", data="f3", raw=bytearray(b"\x1bOR"))]

        return itertools.chain(
            f3(),
            code_to_events(code),
            f3(),
            code_to_events("\n"),
        )

    @staticmethod
    def _bracketed(code):
        # Events for *code* enclosed in the \x1b[200~ / \x1b[201~ markers,
        # followed by a final newline.
        return itertools.chain(
            code_to_events("\x1b[200~"),
            code_to_events(code),
            code_to_events("\x1b[201~"),
            code_to_events("\n"),
        )

    def test_paste(self):
        # fmt: off
        code = (
            "def a():\n"
            "  for x in range(10):\n"
            "    if x%2:\n"
            "      print(x)\n"
            "    else:\n"
            "      pass\n"
        )
        # fmt: on

        reader = self.prepare_reader(self._f3_wrapped(code))
        # Text entered between the two f3 events must come back verbatim.
        self.assertEqual(multiline_input(reader), code)

    def test_paste_mid_newlines(self):
        # fmt: off
        code = (
            "def f():\n"
            "  x = y\n"
            "  \n"
            "  y = z\n"
        )
        # fmt: on

        reader = self.prepare_reader(self._f3_wrapped(code))
        # The whitespace-only line in the middle must be preserved too.
        self.assertEqual(multiline_input(reader), code)

    def test_paste_mid_newlines_not_in_paste_mode(self):
        # fmt: off
        typed = (
            "def f():\n"
            "x = y\n"
            "\n"
            "y = z\n\n"
        )

        expected = (
            "def f():\n"
            "    x = y\n"
            "    "
        )
        # fmt: on

        # No f3 events here: the result stops at the blank line and carries
        # the indentation shown in `expected`.
        reader = self.prepare_reader(code_to_events(typed))
        self.assertEqual(multiline_input(reader), expected)

    def test_paste_not_in_paste_mode(self):
        # fmt: off
        typed = (
            "def a():\n"
            "for x in range(10):\n"
            "if x%2:\n"
            "print(x)\n"
            "else:\n"
            "pass\n\n"
        )

        expected = (
            "def a():\n"
            "    for x in range(10):\n"
            "        if x%2:\n"
            "            print(x)\n"
            "            else:"
        )
        # fmt: on

        reader = self.prepare_reader(code_to_events(typed))
        self.assertEqual(multiline_input(reader), expected)

    def test_bracketed_paste(self):
        """Test that bracketed paste using \x1b[200~ and \x1b[201~ works."""
        # fmt: off
        code = (
            "def a():\n"
            "  for x in range(10):\n"
            "\n"
            "    if x%2:\n"
            "      print(x)\n"
            "\n"
            "    else:\n"
            "      pass\n"
        )
        # fmt: on

        reader = self.prepare_reader(self._bracketed(code))
        # The expected output is exactly the pasted text, blank lines included.
        self.assertEqual(multiline_input(reader), code)

    def test_bracketed_paste_single_line(self):
        code = "oneline"

        reader = self.prepare_reader(self._bracketed(code))
        self.assertEqual(multiline_input(reader), code)
 | 
						|
 | 
						|
 | 
						|
@skipUnless(pty, "requires pty")
class TestDumbTerminal(ReplTestCase):
    def test_dumb_terminal_exits_cleanly(self):
        # With TERM=dumb the interpreter must print the "can't use pyrepl"
        # warning and still exit with status 0 and no traceback.
        env = dict(os.environ, TERM="dumb")

        output, exit_code = self.run_repl("exit()\n", env=env)

        self.assertEqual(exit_code, 0)
        self.assertIn("warning: can't use pyrepl", output)
        for unwanted in ("Exception", "Traceback"):
            self.assertNotIn(unwanted, output)
 | 
						|
 | 
						|
 | 
						|
@skipUnless(pty, "requires pty")
 | 
						|
@skipIf((os.environ.get("TERM") or "dumb") == "dumb", "can't use pyrepl in dumb terminal")
 | 
						|
class TestMain(ReplTestCase):
 | 
						|
    def setUp(self):
        # Replace os.environ with the mapping returned by make_clean_env()
        # so PYTHON* variables from the local user settings cannot leak
        # into the tests (see #121359).  Tests that need such variables
        # add them to the patched os.environ themselves.
        env_patch = patch('os.environ', new=make_clean_env())
        # Register the undo step first, then activate the patch.
        self.addCleanup(env_patch.stop)
        env_patch.start()
 | 
						|
 | 
						|
    @force_not_colorized
    def test_exposed_globals_in_repl(self):
        pre = "['__annotations__', '__builtins__'"
        post = "'__loader__', '__name__', '__package__', '__spec__']"
        # Names allowed between `pre` and `post`, one entry per situation.
        middles = (
            # if `__main__` is not a file (impossible with pyrepl)
            "'__doc__', ",
            # if `__main__` is an uncached .py file (no .pyc)
            "'__doc__', '__file__', ",
            # if `__main__` is a cached .pyc file and the .py source exists
            "'__cached__', '__doc__', '__file__', ",
            # if `__main__` is a cached .pyc file but there's no .py source file
            "'__cached__', '__doc__', ",
        )

        output, exit_code = self.run_repl(["sorted(dir())", "exit()"], skip=True)
        self.assertEqual(exit_code, 0)

        matched = any(f"{pre}, {middle}{post}" in output for middle in middles)
        self.assertTrue(matched, output)
 | 
						|
 | 
						|
    def _assertMatchOK(
 | 
						|
            self, var: str, expected: str | re.Pattern, actual: str
 | 
						|
    ) -> None:
 | 
						|
        if isinstance(expected, re.Pattern):
 | 
						|
            self.assertTrue(
 | 
						|
                expected.match(actual),
 | 
						|
                f"{var}={actual} does not match {expected.pattern}",
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            self.assertEqual(
 | 
						|
                actual,
 | 
						|
                expected,
 | 
						|
                f"expected {var}={expected}, got {var}={actual}",
 | 
						|
            )
 | 
						|
 | 
						|
    @force_not_colorized
    def _run_repl_globals_test(self, expectations, *, as_file=False, as_module=False):
        # Run the REPL on a throwaway module ``blue/calx.py`` (containing
        # ``FOO = 42``) and check what it prints for each key of
        # *expectations*.
        #
        # expectations: mapping from an expression (normally a global name)
        #     to the text expected after "<expr>=", or to a compiled pattern
        #     handed to _assertMatchOK.
        # as_file: pass the module's path as the command-line argument.
        # as_module: pass ``-m blue.calx`` and run from the temporary dir.
        # Exactly one of as_file / as_module must be true.

        # Reject invalid mode combinations before touching the filesystem.
        if as_file and as_module:
            self.fail("as_file and as_module are mutually exclusive")
        if not (as_file or as_module):
            self.fail("Choose one of as_file or as_module")

        clean_env = make_clean_env()
        clean_env["NO_COLOR"] = "1"  # force_not_colorized doesn't touch subprocesses

        # Each command prints "^<expr>=<value>"; the "^" marker is what the
        # search below looks for.
        commands = [
            "print(f'^{" + var + "=}')" for var in expectations
        ] + ["exit()"]

        with tempfile.TemporaryDirectory() as td:
            blue = pathlib.Path(td) / "blue"
            blue.mkdir()
            mod = blue / "calx.py"
            mod.write_text("FOO = 42", encoding="utf-8")
            if as_file:
                output, exit_code = self.run_repl(
                    commands,
                    cmdline_args=[str(mod)],
                    env=clean_env,
                    skip=True,
                )
            else:
                output, exit_code = self.run_repl(
                    commands,
                    cmdline_args=["-m", "blue.calx"],
                    env=clean_env,
                    cwd=td,
                    skip=True,
                )

        self.assertEqual(exit_code, 0)
        for var, expected in expectations.items():
            with self.subTest(var=var, expected=expected):
                # Escape *var* so regex metacharacters in it (e.g. "[" or
                # ".") are matched literally.
                if m := re.search(rf"\^{re.escape(var)}=(.+?)[\r\n]", output):
                    self._assertMatchOK(var, expected, actual=m.group(1))
                else:
                    self.fail(f"{var}= not found in output: {output!r}\n\n{output}")

        self.assertNotIn("Exception", output)
        self.assertNotIn("Traceback", output)
 | 
						|
 | 
						|
    def test_inspect_keeps_globals_from_inspected_file(self):
        # Globals expected when the module is passed as a file path.
        self._run_repl_globals_test(
            {
                "FOO": "42",
                "__name__": "'__main__'",
                "__package__": "None",
                # "__file__" is missing in -i, like in the basic REPL
            },
            as_file=True,
        )
 | 
						|
 | 
						|
    def test_inspect_keeps_globals_from_inspected_module(self):
        # Globals expected when the module is run via ``-m``; __file__ only
        # has to match the pattern below.
        file_pattern = re.compile(r"^'.*calx.py'$")
        self._run_repl_globals_test(
            {
                "FOO": "42",
                "__name__": "'__main__'",
                "__package__": "'blue'",
                "__file__": file_pattern,
            },
            as_module=True,
        )
 | 
						|
 | 
						|
    @force_not_colorized
    def test_python_basic_repl(self):
        def check_run(commands, env, *, expect, reject, **run_kwargs):
            # Run the REPL and require exit status 0 and output that
            # contains *expect* but not *reject*, "Exception" or "Traceback".
            output, exit_code = self.run_repl(commands, env=env, **run_kwargs)
            self.assertEqual(exit_code, 0)
            self.assertIn(expect, output)
            for unwanted in (reject, "Exception", "Traceback"):
                self.assertNotIn(unwanted, output)

        env = os.environ.copy()
        probe_pyrepl = ("from test.support import initialized_with_pyrepl\n"
                        "initialized_with_pyrepl()\n"
                        "exit()\n")

        # Without PYTHON_BASIC_REPL the probe must report True.
        env.pop("PYTHON_BASIC_REPL", None)
        check_run(probe_pyrepl, env, expect="True", reject="False", skip=True)

        # With PYTHON_BASIC_REPL set the probe must report False.
        env["PYTHON_BASIC_REPL"] = "1"
        check_run(probe_pyrepl, env, expect="False", reject="True")

        # The site module must not load _pyrepl if PYTHON_BASIC_REPL is set
        probe_modules = ("import sys\n"
                         "print('_pyrepl' in sys.modules)\n"
                         "exit()\n")
        check_run(probe_modules, env, expect="False", reject="True")
 | 
						|
 | 
						|
    @force_not_colorized
    def test_no_pyrepl_source_in_exc(self):
        # Avoid using _pyrepl/__main__.py in traceback reports
        # See https://github.com/python/cpython/issues/129098.
        pyrepl_main_file = os.path.join(STDLIB_DIR, "_pyrepl", "__main__.py")
        self.assertTrue(os.path.exists(pyrepl_main_file), pyrepl_main_file)
        # Read the source with an explicit encoding so the result does not
        # depend on the locale; keep only the non-empty, stripped lines.
        with open(pyrepl_main_file, encoding="utf-8") as fp:
            excluded_lines = [text for line in fp if (text := line.strip())]

        # None of those lines may show up in a traceback, whatever the
        # filename the failing code was compiled with.
        for filename in ['?', 'unknown-filename', '<foo>', '<...>']:
            self._test_no_pyrepl_source_in_exc(filename, excluded_lines)
 | 
						|
 | 
						|
    def _test_no_pyrepl_source_in_exc(self, filename, excluded_lines):
        # Raise a NameError from code compiled under *filename* and check
        # that the traceback shows none of *excluded_lines*.
        with EnvironmentVarGuard() as env:
            with self.subTest(filename=filename):
                env.unset("PYTHON_BASIC_REPL")
                commands = (
                    f"eval(compile('spam', {filename!r}, 'eval'))\n"
                    "exit()\n"
                )
                output, _ = self.run_repl(commands, env=env)

                self.assertIn("Traceback (most recent call last)", output)
                self.assertIn("NameError: name 'spam' is not defined", output)
                for excluded in excluded_lines:
                    with self.subTest(line=excluded):
                        self.assertNotIn(excluded, output)
 | 
						|
 | 
						|
    @force_not_colorized
    def test_bad_sys_excepthook_doesnt_crash_pyrepl(self):
        # sys.excepthook is replaced by a non-callable before an exception
        # is raised; both REPL flavours must report the problem once and
        # still exit with status 0.
        def verify(output, exitcode):
            self.assertIn("Error in sys.excepthook:", output)
            self.assertEqual(output.count("'int' object is not callable"), 1)
            self.assertIn("Original exception was:", output)
            self.assertIn("division by zero", output)
            self.assertEqual(exitcode, 0)

        env = os.environ.copy()
        commands = "".join([
            "import sys\n",
            "sys.excepthook = 1\n",
            "1/0\n",
            "exit()\n",
        ])

        # Run without PYTHON_BASIC_REPL in the environment.
        env.pop("PYTHON_BASIC_REPL", None)
        verify(*self.run_repl(commands, env=env, skip=True))

        # Run again with PYTHON_BASIC_REPL=1.
        env["PYTHON_BASIC_REPL"] = "1"
        verify(*self.run_repl(commands, env=env))
 | 
						|
 | 
						|
    def test_not_wiping_history_file(self):
        # skip, if readline module is not available
        import_module('readline')

        hfile = tempfile.NamedTemporaryFile(delete=False)
        self.addCleanup(unlink, hfile.name)
        # Cleanups run last-in first-out, so the handle is closed before the
        # file is unlinked even when an assertion below fails before the
        # explicit close(); closing an already closed file is harmless.
        self.addCleanup(hfile.close)
        env = os.environ.copy()
        env["PYTHON_HISTORY"] = hfile.name
        commands = "123\nspam\nexit()\n"

        # First run without PYTHON_BASIC_REPL: the history file must not be
        # empty afterwards.
        env.pop("PYTHON_BASIC_REPL", None)
        output, exit_code = self.run_repl(commands, env=env)
        self.assertEqual(exit_code, 0)
        self.assertIn("123", output)
        self.assertIn("spam", output)
        self.assertNotEqual(pathlib.Path(hfile.name).stat().st_size, 0)

        # Nothing was written through this handle, so truncate() cuts the
        # file at its current position before the second run.
        hfile.file.truncate()
        hfile.close()

        # Second run with PYTHON_BASIC_REPL=1: same expectations.
        env["PYTHON_BASIC_REPL"] = "1"
        output, exit_code = self.run_repl(commands, env=env)
        self.assertEqual(exit_code, 0)
        self.assertIn("123", output)
        self.assertIn("spam", output)
        self.assertNotEqual(pathlib.Path(hfile.name).stat().st_size, 0)
 | 
						|
 | 
						|
    @force_not_colorized
    def test_correct_filename_in_syntaxerrors(self):
        # Both kinds of compile error must be reported against the
        # "<python-input-0>" pseudo-filename.
        env = os.environ.copy()
        scenarios = (
            ("a b c\nexit()\n", "SyntaxError: invalid syntax", {"skip": True}),
            (" b\nexit()\n", "IndentationError: unexpected indent", {}),
        )
        for commands, message, run_kwargs in scenarios:
            output, _ = self.run_repl(commands, env=env, **run_kwargs)
            self.assertIn(message, output)
            self.assertIn("<python-input-0>", output)
 | 
						|
 | 
						|
    @force_not_colorized
    def test_proper_tracebacklimit(self):
        # x3() -> x2() -> x1() raises; with sys.tracebacklimit = 1 only the
        # "in x1" frame may be shown, otherwise all frames must be.
        env = os.environ.copy()
        outer_frames = ("in x2", "in x3", "in <module>")

        for set_tracebacklimit, basic_repl in itertools.product(
            (True, False), repeat=2
        ):
            commands = "".join([
                "import sys\n",
                "sys.tracebacklimit = 1\n" if set_tracebacklimit else "",
                "def x1(): 1/0\n\n",
                "def x2(): x1()\n\n",
                "def x3(): x2()\n\n",
                "x3()\n",
                "exit()\n",
            ])

            if basic_repl:
                env["PYTHON_BASIC_REPL"] = "1"
            else:
                env.pop("PYTHON_BASIC_REPL", None)

            with self.subTest(set_tracebacklimit=set_tracebacklimit,
                              basic_repl=basic_repl):
                output, exit_code = self.run_repl(commands, env=env, skip=True)
                self.assertIn("in x1", output)
                if set_tracebacklimit:
                    check_outer = self.assertNotIn
                else:
                    check_outer = self.assertIn
                for frame in outer_frames:
                    check_outer(frame, output)
 | 
						|
 | 
						|
    def test_null_byte(self):
        # A NUL byte on the input line must not produce a TypeError.
        repl_output, status = self.run_repl("\x00\nexit()\n")
        self.assertEqual(status, 0)
        self.assertNotIn("TypeError", repl_output)
 | 
						|
 | 
						|
    def test_readline_history_file(self):
        # skip, if readline module is not available
        readline = import_module('readline')
        if readline.backend != "editline":
            self.skipTest("GNU readline is not affected by this issue")

        with tempfile.NamedTemporaryFile() as hfile:
            history = pathlib.Path(hfile.name)
            env = os.environ.copy()
            env["PYTHON_HISTORY"] = hfile.name

            # Run with PYTHON_BASIC_REPL=1: the history file must contain
            # the entry with its space written as "\040".
            env["PYTHON_BASIC_REPL"] = "1"
            output, exit_code = self.run_repl("spam \nexit()\n", env=env)
            self.assertEqual(exit_code, 0)
            self.assertIn("spam ", output)
            self.assertNotEqual(history.stat().st_size, 0)
            self.assertIn("spam\\040", history.read_text())

            # Run without PYTHON_BASIC_REPL: no "\040" may be left in the
            # history file afterwards.
            env.pop("PYTHON_BASIC_REPL", None)
            output, exit_code = self.run_repl("exit\n", env=env)
            self.assertEqual(exit_code, 0)
            self.assertNotIn("\\040", history.read_text())
 | 
						|
 | 
						|
    def test_keyboard_interrupt_after_isearch(self):
        # Send "\x12" (Ctrl-R) and then "\x03" (Ctrl-C); the REPL must
        # still exit with status 0 afterwards.
        keystrokes = ["\x12", "\x03", "exit"]
        _, status = self.run_repl(keystrokes)
        self.assertEqual(status, 0)
 | 
						|
 | 
						|
    def test_prompt_after_help(self):
        raw_output, status = self.run_repl(["help", "q", "exit"])
        self.assertEqual(status, 0)

        # Remove ANSI escape sequences before inspecting the prompts.
        ansi_escape = re.compile(r"(\x1B(=|>|(\[)[0-?]*[ -\/]*[@-~]))")
        plain_output = ansi_escape.sub("", raw_output)

        # Ensure that we don't see multiple prompts after exiting `help`
        # Extra stuff (newline and `exit` rewrites) are necessary
        # because of how run_repl works.
        self.assertNotIn(">>> \n>>> >>>", plain_output)
 |