| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | import itertools | 
					
						
							| 
									
										
										
										
											2024-05-31 17:18:28 +02:00
										 |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | from functools import partial | 
					
						
							|  |  |  | from unittest import TestCase | 
					
						
							|  |  |  | from unittest.mock import MagicMock, call, patch, ANY | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from .support import handle_all_events, code_to_events | 
					
						
							| 
									
										
										
										
											2024-06-03 18:15:37 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-31 17:18:28 +02:00
										 |  |  | try: | 
					
						
							|  |  |  |     from _pyrepl.console import Event | 
					
						
							|  |  |  |     from _pyrepl.unix_console import UnixConsole | 
					
						
							|  |  |  | except ImportError: | 
					
						
							|  |  |  |     pass | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-03 18:15:37 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | def unix_console(events, **kwargs): | 
					
						
							|  |  |  |     console = UnixConsole() | 
					
						
							|  |  |  |     console.get_event = MagicMock(side_effect=events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     height = kwargs.get("height", 25) | 
					
						
							|  |  |  |     width = kwargs.get("width", 80) | 
					
						
							|  |  |  |     console.getheightwidth = MagicMock(side_effect=lambda: (height, width)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     console.prepare() | 
					
						
							|  |  |  |     for key, val in kwargs.items(): | 
					
						
							|  |  |  |         setattr(console, key, val) | 
					
						
							|  |  |  |     return console | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | handle_events_unix_console = partial( | 
					
						
							|  |  |  |     handle_all_events, | 
					
						
							|  |  |  |     prepare_console=partial(unix_console), | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | handle_events_narrow_unix_console = partial( | 
					
						
							|  |  |  |     handle_all_events, | 
					
						
							|  |  |  |     prepare_console=partial(unix_console, width=5), | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | handle_events_short_unix_console = partial( | 
					
						
							|  |  |  |     handle_all_events, | 
					
						
							|  |  |  |     prepare_console=partial(unix_console, height=1), | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | handle_events_unix_console_height_3 = partial( | 
					
						
							|  |  |  |     handle_all_events, prepare_console=partial(unix_console, height=3) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | TERM_CAPABILITIES = { | 
					
						
							|  |  |  |     "bel": b"\x07", | 
					
						
							|  |  |  |     "civis": b"\x1b[?25l", | 
					
						
							|  |  |  |     "clear": b"\x1b[H\x1b[2J", | 
					
						
							|  |  |  |     "cnorm": b"\x1b[?12l\x1b[?25h", | 
					
						
							|  |  |  |     "cub": b"\x1b[%p1%dD", | 
					
						
							|  |  |  |     "cub1": b"\x08", | 
					
						
							|  |  |  |     "cud": b"\x1b[%p1%dB", | 
					
						
							|  |  |  |     "cud1": b"\n", | 
					
						
							|  |  |  |     "cuf": b"\x1b[%p1%dC", | 
					
						
							|  |  |  |     "cuf1": b"\x1b[C", | 
					
						
							|  |  |  |     "cup": b"\x1b[%i%p1%d;%p2%dH", | 
					
						
							|  |  |  |     "cuu": b"\x1b[%p1%dA", | 
					
						
							|  |  |  |     "cuu1": b"\x1b[A", | 
					
						
							|  |  |  |     "dch1": b"\x1b[P", | 
					
						
							|  |  |  |     "dch": b"\x1b[%p1%dP", | 
					
						
							|  |  |  |     "el": b"\x1b[K", | 
					
						
							|  |  |  |     "hpa": b"\x1b[%i%p1%dG", | 
					
						
							|  |  |  |     "ich": b"\x1b[%p1%d@", | 
					
						
							|  |  |  |     "ich1": None, | 
					
						
							|  |  |  |     "ind": b"\n", | 
					
						
							|  |  |  |     "pad": None, | 
					
						
							|  |  |  |     "ri": b"\x1bM", | 
					
						
							|  |  |  |     "rmkx": b"\x1b[?1l\x1b>", | 
					
						
							|  |  |  |     "smkx": b"\x1b[?1h\x1b=", | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-31 17:18:28 +02:00
										 |  |  | @unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows") | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | @patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s)) | 
					
						
							|  |  |  | @patch( | 
					
						
							|  |  |  |     "_pyrepl.curses.tparm", | 
					
						
							|  |  |  |     lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args), | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | @patch("_pyrepl.curses.setupterm", lambda a, b: None) | 
					
						
							|  |  |  | @patch( | 
					
						
							|  |  |  |     "termios.tcgetattr", | 
					
						
							|  |  |  |     lambda _: [ | 
					
						
							|  |  |  |         27394, | 
					
						
							|  |  |  |         3, | 
					
						
							|  |  |  |         19200, | 
					
						
							|  |  |  |         536872399, | 
					
						
							|  |  |  |         38400, | 
					
						
							|  |  |  |         38400, | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             b"\x04", | 
					
						
							|  |  |  |             b"\xff", | 
					
						
							|  |  |  |             b"\xff", | 
					
						
							|  |  |  |             b"\x7f", | 
					
						
							|  |  |  |             b"\x17", | 
					
						
							|  |  |  |             b"\x15", | 
					
						
							|  |  |  |             b"\x12", | 
					
						
							|  |  |  |             b"\x00", | 
					
						
							|  |  |  |             b"\x03", | 
					
						
							|  |  |  |             b"\x1c", | 
					
						
							|  |  |  |             b"\x1a", | 
					
						
							|  |  |  |             b"\x19", | 
					
						
							|  |  |  |             b"\x11", | 
					
						
							|  |  |  |             b"\x13", | 
					
						
							|  |  |  |             b"\x16", | 
					
						
							|  |  |  |             b"\x0f", | 
					
						
							|  |  |  |             b"\x01", | 
					
						
							|  |  |  |             b"\x00", | 
					
						
							|  |  |  |             b"\x14", | 
					
						
							|  |  |  |             b"\x00", | 
					
						
							|  |  |  |         ], | 
					
						
							|  |  |  |     ], | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | @patch("termios.tcsetattr", lambda a, b, c: None) | 
					
						
							|  |  |  | @patch("os.write") | 
					
						
							|  |  |  | class TestConsole(TestCase): | 
					
						
							|  |  |  |     def test_simple_addition(self, _os_write): | 
					
						
							|  |  |  |         code = "12+34" | 
					
						
							|  |  |  |         events = code_to_events(code) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, b"1") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"2") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"+") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"3") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"4") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_wrap(self, _os_write): | 
					
						
							|  |  |  |         code = "12+34" | 
					
						
							|  |  |  |         events = code_to_events(code) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_narrow_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, b"1") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"2") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"+") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"3") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"\\") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"\n") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"4") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |     def test_cursor_left(self, _os_write): | 
					
						
							|  |  |  |         code = "1" | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events(code), | 
					
						
							|  |  |  |             [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_cursor_left_right(self, _os_write): | 
					
						
							|  |  |  |         code = "1" | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events(code), | 
					
						
							|  |  |  |             [ | 
					
						
							|  |  |  |                 Event(evt="key", data="left", raw=bytearray(b"\x1bOD")), | 
					
						
							|  |  |  |                 Event(evt="key", data="right", raw=bytearray(b"\x1bOC")), | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_cursor_up(self, _os_write): | 
					
						
							|  |  |  |         code = "1\n2+3" | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events(code), | 
					
						
							|  |  |  |             [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_cursor_up_down(self, _os_write): | 
					
						
							|  |  |  |         code = "1\n2+3" | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events(code), | 
					
						
							|  |  |  |             [ | 
					
						
							|  |  |  |                 Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), | 
					
						
							|  |  |  |                 Event(evt="key", data="down", raw=bytearray(b"\x1bOB")), | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_cursor_back_write(self, _os_write): | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events("1"), | 
					
						
							|  |  |  |             [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))], | 
					
						
							|  |  |  |             code_to_events("2"), | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, b"1") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, b"2") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_multiline_function_move_up_short_terminal(self, _os_write): | 
					
						
							|  |  |  |         # fmt: off | 
					
						
							|  |  |  |         code = ( | 
					
						
							|  |  |  |             "def f():\n" | 
					
						
							|  |  |  |             "  foo" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         # fmt: on | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events(code), | 
					
						
							|  |  |  |             [ | 
					
						
							|  |  |  |                 Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), | 
					
						
							|  |  |  |                 Event(evt="scroll", data=None), | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_short_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_multiline_function_move_up_down_short_terminal(self, _os_write): | 
					
						
							|  |  |  |         # fmt: off | 
					
						
							|  |  |  |         code = ( | 
					
						
							|  |  |  |             "def f():\n" | 
					
						
							|  |  |  |             "  foo" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         # fmt: on | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events = itertools.chain( | 
					
						
							|  |  |  |             code_to_events(code), | 
					
						
							|  |  |  |             [ | 
					
						
							|  |  |  |                 Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), | 
					
						
							|  |  |  |                 Event(evt="scroll", data=None), | 
					
						
							|  |  |  |                 Event(evt="key", data="down", raw=bytearray(b"\x1bOB")), | 
					
						
							|  |  |  |                 Event(evt="scroll", data=None), | 
					
						
							|  |  |  |             ], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_events_short_unix_console(events) | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":") | 
					
						
							|  |  |  |         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":") | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_resize_bigger_on_multiline_function(self, _os_write): | 
					
						
							|  |  |  |         # fmt: off | 
					
						
							|  |  |  |         code = ( | 
					
						
							|  |  |  |             "def f():\n" | 
					
						
							|  |  |  |             "  foo" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         # fmt: on | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events = itertools.chain(code_to_events(code)) | 
					
						
							|  |  |  |         reader, console = handle_events_short_unix_console(events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         console.height = 2 | 
					
						
							|  |  |  |         console.getheightwidth = MagicMock(lambda _: (2, 80)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def same_reader(_): | 
					
						
							|  |  |  |             return reader | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def same_console(events): | 
					
						
							|  |  |  |             console.get_event = MagicMock(side_effect=events) | 
					
						
							|  |  |  |             return console | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_all_events( | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |             [Event(evt="resize", data=None)], | 
					
						
							|  |  |  |             prepare_reader=same_reader, | 
					
						
							|  |  |  |             prepare_console=same_console, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         _os_write.assert_has_calls( | 
					
						
							|  |  |  |             [ | 
					
						
							|  |  |  |                 call(ANY, TERM_CAPABILITIES["ri"] + b":"), | 
					
						
							|  |  |  |                 call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"), | 
					
						
							|  |  |  |                 call(ANY, b"def f():"), | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         console.restore() | 
					
						
							|  |  |  |         con.restore() | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_resize_smaller_on_multiline_function(self, _os_write): | 
					
						
							|  |  |  |         # fmt: off | 
					
						
							|  |  |  |         code = ( | 
					
						
							|  |  |  |             "def f():\n" | 
					
						
							|  |  |  |             "  foo" | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         # fmt: on | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         events = itertools.chain(code_to_events(code)) | 
					
						
							|  |  |  |         reader, console = handle_events_unix_console_height_3(events) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         console.height = 1 | 
					
						
							|  |  |  |         console.getheightwidth = MagicMock(lambda _: (1, 80)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def same_reader(_): | 
					
						
							|  |  |  |             return reader | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def same_console(events): | 
					
						
							|  |  |  |             console.get_event = MagicMock(side_effect=events) | 
					
						
							|  |  |  |             return console | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         _, con = handle_all_events( | 
					
						
							| 
									
										
										
										
											2024-05-22 11:02:18 -04:00
										 |  |  |             [Event(evt="resize", data=None)], | 
					
						
							|  |  |  |             prepare_reader=same_reader, | 
					
						
							|  |  |  |             prepare_console=same_console, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         _os_write.assert_has_calls( | 
					
						
							|  |  |  |             [ | 
					
						
							|  |  |  |                 call(ANY, TERM_CAPABILITIES["ind"] + b":"), | 
					
						
							|  |  |  |                 call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"), | 
					
						
							|  |  |  |                 call(ANY, b"  foo"), | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2024-05-23 19:58:59 +02:00
										 |  |  |         console.restore() | 
					
						
							|  |  |  |         con.restore() |