mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
	
	
		
			295 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			295 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								import itertools
							 | 
						||
| 
								 | 
							
								from functools import partial
							 | 
						||
| 
								 | 
							
								from unittest import TestCase
							 | 
						||
| 
								 | 
							
								from unittest.mock import MagicMock, call, patch, ANY
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from .support import handle_all_events, code_to_events
							 | 
						||
| 
								 | 
							
								from _pyrepl.console import Event
							 | 
						||
| 
								 | 
							
								from _pyrepl.unix_console import UnixConsole
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def unix_console(events, **kwargs):
								    """Build a UnixConsole whose input and size queries are stubbed out.

								    ``events`` becomes the ``side_effect`` of a MagicMock that replaces
								    ``get_event``, so successive calls hand back successive items.

								    Optional ``height`` / ``width`` keywords (defaulting to 25 and 80) are
								    what the stubbed ``getheightwidth`` reports.  After ``prepare()`` has
								    run, every keyword argument is also copied onto the console as an
								    attribute of the same name.
								    """
								    rows = kwargs.get("height", 25)
								    cols = kwargs.get("width", 80)

								    term = UnixConsole()
								    term.get_event = MagicMock(side_effect=events)
								    # Stubbed before prepare() is called -- presumably so prepare()
								    # observes the fake size rather than a real terminal's; confirm.
								    term.getheightwidth = MagicMock(side_effect=lambda: (rows, cols))
								    term.prepare()

								    for attr_name, attr_value in kwargs.items():
								        setattr(term, attr_name, attr_value)
								    return term
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _events_handler(**console_kwargs):
								    """Return handle_all_events bound to a unix_console built with *console_kwargs*."""
								    return partial(
								        handle_all_events,
								        prepare_console=partial(unix_console, **console_kwargs),
								    )


								# Event-loop drivers, one per console geometry used by the tests below.
								# With no override the console reports unix_console's defaults.
								handle_events_unix_console = _events_handler()
								handle_events_narrow_unix_console = _events_handler(width=5)
								handle_events_short_unix_console = _events_handler(height=1)
								handle_events_unix_console_height_3 = _events_handler(height=3)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Capability name -> byte string table.  The TestConsole class decorators
								# serve these values in place of ``_pyrepl.curses.tigetstr`` lookups; names
								# mapped to None (and names not listed at all) come back as None.
								TERM_CAPABILITIES = dict(
								    bel=b"\x07",
								    civis=b"\x1b[?25l",
								    clear=b"\x1b[H\x1b[2J",
								    cnorm=b"\x1b[?12l\x1b[?25h",
								    cub=b"\x1b[%p1%dD",
								    cub1=b"\x08",
								    cud=b"\x1b[%p1%dB",
								    cud1=b"\n",
								    cuf=b"\x1b[%p1%dC",
								    cuf1=b"\x1b[C",
								    cup=b"\x1b[%i%p1%d;%p2%dH",
								    cuu=b"\x1b[%p1%dA",
								    cuu1=b"\x1b[A",
								    dch1=b"\x1b[P",
								    dch=b"\x1b[%p1%dP",
								    el=b"\x1b[K",
								    hpa=b"\x1b[%i%p1%dG",
								    ich=b"\x1b[%p1%d@",
								    ich1=None,
								    ind=b"\n",
								    pad=None,
								    ri=b"\x1bM",
								    rmkx=b"\x1b[?1l\x1b>",
								    smkx=b"\x1b[?1h\x1b=",
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
								@patch(
								    "_pyrepl.curses.tparm",
								    lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
								)
								@patch("_pyrepl.curses.setupterm", lambda a, b: None)
								@patch(
								    "termios.tcgetattr",
								    lambda _: [
								        27394,
								        3,
								        19200,
								        536872399,
								        38400,
								        38400,
								        [
								            b"\x04",
								            b"\xff",
								            b"\xff",
								            b"\x7f",
								            b"\x17",
								            b"\x15",
								            b"\x12",
								            b"\x00",
								            b"\x03",
								            b"\x1c",
								            b"\x1a",
								            b"\x19",
								            b"\x11",
								            b"\x13",
								            b"\x16",
								            b"\x0f",
								            b"\x01",
								            b"\x00",
								            b"\x14",
								            b"\x00",
								        ],
								    ],
								)
								@patch("termios.tcsetattr", lambda a, b, c: None)
								@patch("os.write")
								class TestConsole(TestCase):
								    """Check the bytes UnixConsole hands to ``os.write`` for given events.

								    The class decorators replace the terminal layer for every test method:

								    * ``os.write`` is a mock; it is the only patch without an explicit
								      replacement, so it is the one passed to each test as ``_os_write``.
								    * ``termios.tcgetattr`` returns a fixed attribute list and
								      ``termios.tcsetattr`` does nothing.
								    * ``_pyrepl.curses.setupterm`` does nothing and
								      ``_pyrepl.curses.tigetstr`` looks names up in ``TERM_CAPABILITIES``.
								    * ``_pyrepl.curses.tparm`` returns ``cap + b":" + b",".join(args)``, so
								      an expected write such as ``TERM_CAPABILITIES["cub"] + b":1"`` reads
								      "the ``cub`` capability formatted with the argument 1", and a trailing
								      bare ``b":"`` means it was formatted with no arguments.
								    """

								    def test_simple_addition(self, _os_write):
								        # Every typed character should be written out on a default console.
								        code = "12+34"
								        events = code_to_events(code)
								        _, _ = handle_events_unix_console(events)
								        _os_write.assert_any_call(ANY, b"1")
								        _os_write.assert_any_call(ANY, b"2")
								        _os_write.assert_any_call(ANY, b"+")
								        _os_write.assert_any_call(ANY, b"3")
								        _os_write.assert_any_call(ANY, b"4")

								    def test_wrap(self, _os_write):
								        # Same input on the 5-column console: a backslash and a newline
								        # are expected among the writes in addition to the characters.
								        code = "12+34"
								        events = code_to_events(code)
								        _, _ = handle_events_narrow_unix_console(events)
								        _os_write.assert_any_call(ANY, b"1")
								        _os_write.assert_any_call(ANY, b"2")
								        _os_write.assert_any_call(ANY, b"+")
								        _os_write.assert_any_call(ANY, b"3")
								        _os_write.assert_any_call(ANY, b"\\")
								        _os_write.assert_any_call(ANY, b"\n")
								        _os_write.assert_any_call(ANY, b"4")

								    def test_cursor_left(self, _os_write):
								        # A "left" key after one character should emit ``cub`` with 1.
								        code = "1"
								        events = itertools.chain(
								            code_to_events(code),
								            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
								        )
								        _, _ = handle_events_unix_console(events)
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")

								    def test_cursor_left_right(self, _os_write):
								        # "left" then "right" should emit ``cub`` with 1 and ``cuf`` with 1.
								        code = "1"
								        events = itertools.chain(
								            code_to_events(code),
								            [
								                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
								                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
								            ],
								        )
								        _, _ = handle_events_unix_console(events)
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1")

								    def test_cursor_up(self, _os_write):
								        # With two lines of input, an "up" key should emit ``cuu`` with 1.
								        code = "1\n2+3"
								        events = itertools.chain(
								            code_to_events(code),
								            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
								        )
								        _, _ = handle_events_unix_console(events)
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")

								    def test_cursor_up_down(self, _os_write):
								        # "up" then "down" should emit ``cuu`` with 1 and ``cud`` with 1.
								        code = "1\n2+3"
								        events = itertools.chain(
								            code_to_events(code),
								            [
								                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
								                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
								            ],
								        )
								        _, _ = handle_events_unix_console(events)
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1")

								    def test_cursor_back_write(self, _os_write):
								        # Type "1", move left, type "2": both characters and the ``cub``
								        # movement should appear among the writes.
								        events = itertools.chain(
								            code_to_events("1"),
								            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
								            code_to_events("2"),
								        )
								        _, _ = handle_events_unix_console(events)
								        _os_write.assert_any_call(ANY, b"1")
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
								        _os_write.assert_any_call(ANY, b"2")

								    def test_multiline_function_move_up_short_terminal(self, _os_write):
								        # Two-line input on the 1-row console: "up" followed by a scroll
								        # event should emit ``ri`` (formatted with no arguments).
								        # fmt: off
								        code = (
								            "def f():\n"
								            "  foo"
								        )
								        # fmt: on

								        events = itertools.chain(
								            code_to_events(code),
								            [
								                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
								                Event(evt="scroll", data=None),
								            ],
								        )
								        _, _ = handle_events_short_unix_console(events)
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")

								    def test_multiline_function_move_up_down_short_terminal(self, _os_write):
								        # As above, then "down" plus another scroll event: both ``ri``
								        # and ``ind`` should be emitted.
								        # fmt: off
								        code = (
								            "def f():\n"
								            "  foo"
								        )
								        # fmt: on

								        events = itertools.chain(
								            code_to_events(code),
								            [
								                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
								                Event(evt="scroll", data=None),
								                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
								                Event(evt="scroll", data=None),
								            ],
								        )
								        _, _ = handle_events_short_unix_console(events)
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
								        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":")

								    def test_resize_bigger_on_multiline_function(self, _os_write):
								        # Type a two-line function into the 1-row console, grow the
								        # console to 2 rows, then replay a single resize event through the
								        # *same* reader and console and check the redraw sequence.
								        # fmt: off
								        code = (
								            "def f():\n"
								            "  foo"
								        )
								        # fmt: on

								        events = itertools.chain(code_to_events(code))
								        reader, console = handle_events_short_unix_console(events)

								        console.height = 2
								        # Must be return_value: MagicMock's first positional parameter is
								        # ``spec``, so a callable passed positionally is never invoked and
								        # the stub would not report the new size.
								        console.getheightwidth = MagicMock(return_value=(2, 80))

								        def same_reader(_):
								            return reader

								        def same_console(events):
								            # Re-arm the existing console with the new event stream.
								            console.get_event = MagicMock(side_effect=events)
								            return console

								        _, _ = handle_all_events(
								            [Event(evt="resize", data=None)],
								            prepare_reader=same_reader,
								            prepare_console=same_console,
								        )
								        _os_write.assert_has_calls(
								            [
								                call(ANY, TERM_CAPABILITIES["ri"] + b":"),
								                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
								                call(ANY, b"def f():"),
								            ]
								        )

								    def test_resize_smaller_on_multiline_function(self, _os_write):
								        # Mirror image of the test above: start on the 3-row console,
								        # shrink it to 1 row, replay a resize event and check the redraw.
								        # fmt: off
								        code = (
								            "def f():\n"
								            "  foo"
								        )
								        # fmt: on

								        events = itertools.chain(code_to_events(code))
								        reader, console = handle_events_unix_console_height_3(events)

								        console.height = 1
								        # return_value rather than a positional callable (which MagicMock
								        # would take as ``spec``), so the stub reports the shrunken size.
								        console.getheightwidth = MagicMock(return_value=(1, 80))

								        def same_reader(_):
								            return reader

								        def same_console(events):
								            # Re-arm the existing console with the new event stream.
								            console.get_event = MagicMock(side_effect=events)
								            return console

								        _, _ = handle_all_events(
								            [Event(evt="resize", data=None)],
								            prepare_reader=same_reader,
								            prepare_console=same_console,
								        )
								        _os_write.assert_has_calls(
								            [
								                call(ANY, TERM_CAPABILITIES["ind"] + b":"),
								                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
								                call(ANY, b"  foo"),
								            ]
								        )
							 |