mirror of
				https://github.com/python/cpython.git
				synced 2025-10-30 21:21:22 +00:00 
			
		
		
		
	
		
			
	
	
		
			295 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			295 lines
		
	
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import itertools | ||
|  | from functools import partial | ||
|  | from unittest import TestCase | ||
|  | from unittest.mock import MagicMock, call, patch, ANY | ||
|  | 
 | ||
|  | from .support import handle_all_events, code_to_events | ||
|  | from _pyrepl.console import Event | ||
|  | from _pyrepl.unix_console import UnixConsole | ||
|  | 
 | ||
|  | 
 | ||
def unix_console(events, **kwargs):
    """Build a prepared ``UnixConsole`` whose input and size are mocked.

    *events* becomes the ``side_effect`` of a mocked ``get_event``, so each
    call to ``get_event()`` hands out the next scripted event.  The optional
    ``height`` / ``width`` keyword arguments (defaulting to 25 and 80) are
    what the mocked ``getheightwidth()`` reports.  Every keyword argument is
    additionally set as an attribute on the console after ``prepare()``.
    """
    term = UnixConsole()

    # Terminal geometry reported by the mocked size query.
    rows = kwargs.get("height", 25)
    cols = kwargs.get("width", 80)

    term.get_event = MagicMock(side_effect=events)
    term.getheightwidth = MagicMock(side_effect=lambda: (rows, cols))

    term.prepare()

    # Overrides are applied only after prepare() so that they are the final
    # values seen by the test, whatever prepare() assigned itself.
    for attr_name, attr_value in kwargs.items():
        setattr(term, attr_name, attr_value)
    return term
|  | 
 | ||
|  | 
 | ||
def _handler_for_console(**console_options):
    """Return ``handle_all_events`` bound to a ``unix_console`` factory.

    *console_options* are forwarded to ``unix_console`` as keyword arguments
    each time the resulting handler prepares its console.
    """
    return partial(
        handle_all_events,
        prepare_console=partial(unix_console, **console_options),
    )


# Event-loop drivers for the terminal geometries exercised by the tests.
handle_events_unix_console = _handler_for_console()
handle_events_narrow_unix_console = _handler_for_console(width=5)
handle_events_short_unix_console = _handler_for_console(height=1)
handle_events_unix_console_height_3 = _handler_for_console(height=3)
|  | 
 | ||
|  | 
 | ||
# Canned terminfo capability strings, keyed by capability name.  A value of
# None stands for a capability that this fake terminal does not provide.
TERM_CAPABILITIES = {
    "bel": b"\x07",  # audible bell
    "civis": b"\x1b[?25l",  # hide cursor
    "clear": b"\x1b[H\x1b[2J",  # clear screen and home cursor
    "cnorm": b"\x1b[?12l\x1b[?25h",  # restore normal cursor
    "cub": b"\x1b[%p1%dD",  # cursor left by N
    "cub1": b"\x08",  # cursor left by one
    "cud": b"\x1b[%p1%dB",  # cursor down by N
    "cud1": b"\n",  # cursor down by one
    "cuf": b"\x1b[%p1%dC",  # cursor right by N
    "cuf1": b"\x1b[C",  # cursor right by one
    "cup": b"\x1b[%i%p1%d;%p2%dH",  # move cursor to row;column
    "cuu": b"\x1b[%p1%dA",  # cursor up by N
    "cuu1": b"\x1b[A",  # cursor up by one
    "dch1": b"\x1b[P",  # delete one character
    "dch": b"\x1b[%p1%dP",  # delete N characters
    "el": b"\x1b[K",  # erase to end of line
    "hpa": b"\x1b[%i%p1%dG",  # absolute horizontal position
    "ich": b"\x1b[%p1%d@",  # insert N characters
    "ich1": None,  # insert one character (not provided)
    "ind": b"\n",  # scroll forward one line
    "pad": None,  # padding character (not provided)
    "ri": b"\x1bM",  # scroll backward one line
    "rmkx": b"\x1b[?1l\x1b>",  # leave keypad-transmit mode
    "smkx": b"\x1b[?1h\x1b=",  # enter keypad-transmit mode
}
|  | 
 | ||
|  | 
 | ||
@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
@patch(
    "_pyrepl.curses.tparm",
    # Fake tparm: instead of expanding the capability, append ":" followed by
    # the comma-joined parameters, so assertions can spell out exactly which
    # capability was requested with which arguments (e.g. cub + b":1").
    lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
)
@patch("_pyrepl.curses.setupterm", lambda a, b: None)
@patch(
    "termios.tcgetattr",
    # Canned terminal attributes in the layout termios.tcgetattr() returns:
    # [iflag, oflag, cflag, lflag, ispeed, ospeed, cc].
    lambda _: [
        27394,
        3,
        19200,
        536872399,
        38400,
        38400,
        [
            b"\x04",
            b"\xff",
            b"\xff",
            b"\x7f",
            b"\x17",
            b"\x15",
            b"\x12",
            b"\x00",
            b"\x03",
            b"\x1c",
            b"\x1a",
            b"\x19",
            b"\x11",
            b"\x13",
            b"\x16",
            b"\x0f",
            b"\x01",
            b"\x00",
            b"\x14",
            b"\x00",
        ],
    ],
)
@patch("termios.tcsetattr", lambda a, b, c: None)
@patch("os.write")
class TestConsole(TestCase):
    """Check the byte sequences ``UnixConsole`` writes for scripted input.

    The curses and termios entry points are replaced by the class-level
    patches above, and ``os.write`` is mocked; each test method receives that
    mock as ``_os_write`` and asserts on the calls recorded by it.
    """

    # NOTE: test methods are described with comments rather than docstrings so
    # that unittest's verbose output keeps showing the method names.

    # Two-line snippet shared by the scrolling and resizing tests.
    _MULTILINE_CODE = "def f():\n  foo"

    # Raw escape sequences carried by the arrow-key events.
    _ARROW_RAW = {
        "up": b"\x1bOA",
        "down": b"\x1bOB",
        "right": b"\x1bOC",
        "left": b"\x1bOD",
    }

    @classmethod
    def _arrow(cls, direction):
        # Build a fresh key event for the given arrow direction.
        return Event(
            evt="key",
            data=direction,
            raw=bytearray(cls._ARROW_RAW[direction]),
        )

    @staticmethod
    def _replay_resize(reader, console, height):
        # Change the console height, then feed a single "resize" event to the
        # same reader/console pair that already processed the typed code.
        console.height = height
        # side_effect (not the positional ``spec`` argument) is what makes the
        # mock return the size tuple; the callable takes no arguments because
        # getheightwidth() is called without any.
        console.getheightwidth = MagicMock(side_effect=lambda: (height, 80))

        def same_reader(_):
            return reader

        def same_console(events):
            console.get_event = MagicMock(side_effect=events)
            return console

        handle_all_events(
            [Event(evt="resize", data=None)],
            prepare_reader=same_reader,
            prepare_console=same_console,
        )

    def test_simple_addition(self, _os_write):
        # Each typed character is written to the terminal.
        handle_events_unix_console(code_to_events("12+34"))
        for written in (b"1", b"2", b"+", b"3", b"4"):
            _os_write.assert_any_call(ANY, written)

    def test_wrap(self, _os_write):
        # On a 5-column console the input wraps: a backslash and a newline
        # are written before the final character.
        handle_events_narrow_unix_console(code_to_events("12+34"))
        for written in (b"1", b"2", b"+", b"3", b"\\", b"\n", b"4"):
            _os_write.assert_any_call(ANY, written)

    def test_cursor_left(self, _os_write):
        events = itertools.chain(
            code_to_events("1"),
            [self._arrow("left")],
        )
        handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")

    def test_cursor_left_right(self, _os_write):
        events = itertools.chain(
            code_to_events("1"),
            [self._arrow("left"), self._arrow("right")],
        )
        handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1")

    def test_cursor_up(self, _os_write):
        events = itertools.chain(
            code_to_events("1\n2+3"),
            [self._arrow("up")],
        )
        handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")

    def test_cursor_up_down(self, _os_write):
        events = itertools.chain(
            code_to_events("1\n2+3"),
            [self._arrow("up"), self._arrow("down")],
        )
        handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1")

    def test_cursor_back_write(self, _os_write):
        # Type "1", step back over it, then type "2".
        events = itertools.chain(
            code_to_events("1"),
            [self._arrow("left")],
            code_to_events("2"),
        )
        handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        _os_write.assert_any_call(ANY, b"2")

    def test_multiline_function_move_up_short_terminal(self, _os_write):
        # On a one-line console, moving up emits the "ri" capability.
        events = itertools.chain(
            code_to_events(self._MULTILINE_CODE),
            [
                self._arrow("up"),
                Event(evt="scroll", data=None),
            ],
        )
        handle_events_short_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")

    def test_multiline_function_move_up_down_short_terminal(self, _os_write):
        # Moving up and back down emits "ri" and then "ind".
        events = itertools.chain(
            code_to_events(self._MULTILINE_CODE),
            [
                self._arrow("up"),
                Event(evt="scroll", data=None),
                self._arrow("down"),
                Event(evt="scroll", data=None),
            ],
        )
        handle_events_short_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":")

    def test_resize_bigger_on_multiline_function(self, _os_write):
        # Growing the console from 1 to 2 lines brings the first line back.
        reader, console = handle_events_short_unix_console(
            code_to_events(self._MULTILINE_CODE)
        )

        self._replay_resize(reader, console, height=2)

        _os_write.assert_has_calls(
            [
                call(ANY, TERM_CAPABILITIES["ri"] + b":"),
                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
                call(ANY, b"def f():"),
            ]
        )

    def test_resize_smaller_on_multiline_function(self, _os_write):
        # Shrinking the console from 3 lines to 1 leaves only the last line.
        reader, console = handle_events_unix_console_height_3(
            code_to_events(self._MULTILINE_CODE)
        )

        self._replay_resize(reader, console, height=1)

        _os_write.assert_has_calls(
            [
                call(ANY, TERM_CAPABILITIES["ind"] + b":"),
                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
                call(ANY, b"  foo"),
            ]
        )