mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 10:44:55 +00:00 
			
		
		
		
	 8ef7735c53
			
		
	
	
		8ef7735c53
		
			
		
	
	
	
	
		
			
			Signed-off-by: yihong0618 <zouzou0208@gmail.com> Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
		
			
				
	
	
		
			314 lines
		
	
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			314 lines
		
	
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import itertools
 | |
| import os
 | |
| import sys
 | |
| import unittest
 | |
| from functools import partial
 | |
| from test.support import os_helper, force_not_colorized_test_class
 | |
| 
 | |
| from unittest import TestCase
 | |
| from unittest.mock import MagicMock, call, patch, ANY
 | |
| 
 | |
| from .support import handle_all_events, code_to_events
 | |
| 
 | |
| try:
 | |
|     from _pyrepl.console import Event
 | |
|     from _pyrepl.unix_console import UnixConsole
 | |
| except ImportError:
 | |
|     pass
 | |
| 
 | |
| from _pyrepl.terminfo import _TERMINAL_CAPABILITIES
 | |
| 
 | |
| TERM_CAPABILITIES = _TERMINAL_CAPABILITIES["ansi"]
 | |
| 
 | |
| 
 | |
| def unix_console(events, **kwargs):
 | |
|     console = UnixConsole(term="xterm")
 | |
|     console.get_event = MagicMock(side_effect=events)
 | |
|     console.getpending = MagicMock(return_value=Event("key", ""))
 | |
| 
 | |
|     height = kwargs.get("height", 25)
 | |
|     width = kwargs.get("width", 80)
 | |
|     console.getheightwidth = MagicMock(side_effect=lambda: (height, width))
 | |
|     console.wait = MagicMock()
 | |
| 
 | |
|     console.prepare()
 | |
|     for key, val in kwargs.items():
 | |
|         setattr(console, key, val)
 | |
|     return console
 | |
| 
 | |
| 
 | |
| handle_events_unix_console = partial(
 | |
|     handle_all_events,
 | |
|     prepare_console=unix_console,
 | |
| )
 | |
| handle_events_narrow_unix_console = partial(
 | |
|     handle_all_events,
 | |
|     prepare_console=partial(unix_console, width=5),
 | |
| )
 | |
| handle_events_short_unix_console = partial(
 | |
|     handle_all_events,
 | |
|     prepare_console=partial(unix_console, height=1),
 | |
| )
 | |
| handle_events_unix_console_height_3 = partial(
 | |
|     handle_all_events, prepare_console=partial(unix_console, height=3)
 | |
| )
 | |
| 
 | |
| 
 | |
| @unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
 | |
| @patch(
 | |
|     "_pyrepl.terminfo.tparm",
 | |
|     lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
 | |
| )
 | |
| @patch(
 | |
|     "termios.tcgetattr",
 | |
|     lambda _: [
 | |
|         27394,
 | |
|         3,
 | |
|         19200,
 | |
|         536872399,
 | |
|         38400,
 | |
|         38400,
 | |
|         [
 | |
|             b"\x04",
 | |
|             b"\xff",
 | |
|             b"\xff",
 | |
|             b"\x7f",
 | |
|             b"\x17",
 | |
|             b"\x15",
 | |
|             b"\x12",
 | |
|             b"\x00",
 | |
|             b"\x03",
 | |
|             b"\x1c",
 | |
|             b"\x1a",
 | |
|             b"\x19",
 | |
|             b"\x11",
 | |
|             b"\x13",
 | |
|             b"\x16",
 | |
|             b"\x0f",
 | |
|             b"\x01",
 | |
|             b"\x00",
 | |
|             b"\x14",
 | |
|             b"\x00",
 | |
|         ],
 | |
|     ],
 | |
| )
 | |
| @patch("termios.tcsetattr", lambda a, b, c: None)
 | |
| @patch("os.write")
 | |
| @force_not_colorized_test_class
 | |
| class TestConsole(TestCase):
 | |
|     def test_simple_addition(self, _os_write):
 | |
|         code = "12+34"
 | |
|         events = code_to_events(code)
 | |
|         _, con = handle_events_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, b"1")
 | |
|         _os_write.assert_any_call(ANY, b"2")
 | |
|         _os_write.assert_any_call(ANY, b"+")
 | |
|         _os_write.assert_any_call(ANY, b"3")
 | |
|         _os_write.assert_any_call(ANY, b"4")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_wrap(self, _os_write):
 | |
|         code = "12+34"
 | |
|         events = code_to_events(code)
 | |
|         _, con = handle_events_narrow_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, b"1")
 | |
|         _os_write.assert_any_call(ANY, b"2")
 | |
|         _os_write.assert_any_call(ANY, b"+")
 | |
|         _os_write.assert_any_call(ANY, b"3")
 | |
|         _os_write.assert_any_call(ANY, b"\\")
 | |
|         _os_write.assert_any_call(ANY, b"\n")
 | |
|         _os_write.assert_any_call(ANY, b"4")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_cursor_left(self, _os_write):
 | |
|         code = "1"
 | |
|         events = itertools.chain(
 | |
|             code_to_events(code),
 | |
|             [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
 | |
|         )
 | |
|         _, con = handle_events_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_cursor_left_right(self, _os_write):
 | |
|         code = "1"
 | |
|         events = itertools.chain(
 | |
|             code_to_events(code),
 | |
|             [
 | |
|                 Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
 | |
|                 Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
 | |
|             ],
 | |
|         )
 | |
|         _, con = handle_events_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_cursor_up(self, _os_write):
 | |
|         code = "1\n2+3"
 | |
|         events = itertools.chain(
 | |
|             code_to_events(code),
 | |
|             [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
 | |
|         )
 | |
|         _, con = handle_events_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_cursor_up_down(self, _os_write):
 | |
|         code = "1\n2+3"
 | |
|         events = itertools.chain(
 | |
|             code_to_events(code),
 | |
|             [
 | |
|                 Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | |
|                 Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | |
|             ],
 | |
|         )
 | |
|         _, con = handle_events_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_cursor_back_write(self, _os_write):
 | |
|         events = itertools.chain(
 | |
|             code_to_events("1"),
 | |
|             [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
 | |
|             code_to_events("2"),
 | |
|         )
 | |
|         _, con = handle_events_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, b"1")
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
 | |
|         _os_write.assert_any_call(ANY, b"2")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_multiline_function_move_up_short_terminal(self, _os_write):
 | |
|         # fmt: off
 | |
|         code = (
 | |
|             "def f():\n"
 | |
|             "  foo"
 | |
|         )
 | |
|         # fmt: on
 | |
| 
 | |
|         events = itertools.chain(
 | |
|             code_to_events(code),
 | |
|             [
 | |
|                 Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | |
|                 Event(evt="scroll", data=None),
 | |
|             ],
 | |
|         )
 | |
|         _, con = handle_events_short_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_multiline_function_move_up_down_short_terminal(self, _os_write):
 | |
|         # fmt: off
 | |
|         code = (
 | |
|             "def f():\n"
 | |
|             "  foo"
 | |
|         )
 | |
|         # fmt: on
 | |
| 
 | |
|         events = itertools.chain(
 | |
|             code_to_events(code),
 | |
|             [
 | |
|                 Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
 | |
|                 Event(evt="scroll", data=None),
 | |
|                 Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
 | |
|                 Event(evt="scroll", data=None),
 | |
|             ],
 | |
|         )
 | |
|         _, con = handle_events_short_unix_console(events)
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
 | |
|         _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":")
 | |
|         con.restore()
 | |
| 
 | |
|     def test_resize_bigger_on_multiline_function(self, _os_write):
 | |
|         # fmt: off
 | |
|         code = (
 | |
|             "def f():\n"
 | |
|             "  foo"
 | |
|         )
 | |
|         # fmt: on
 | |
| 
 | |
|         events = itertools.chain(code_to_events(code))
 | |
|         reader, console = handle_events_short_unix_console(events)
 | |
| 
 | |
|         console.height = 2
 | |
|         console.getheightwidth = MagicMock(lambda _: (2, 80))
 | |
| 
 | |
|         def same_reader(_):
 | |
|             return reader
 | |
| 
 | |
|         def same_console(events):
 | |
|             console.get_event = MagicMock(side_effect=events)
 | |
|             return console
 | |
| 
 | |
|         _, con = handle_all_events(
 | |
|             [Event(evt="resize", data=None)],
 | |
|             prepare_reader=same_reader,
 | |
|             prepare_console=same_console,
 | |
|         )
 | |
|         _os_write.assert_has_calls(
 | |
|             [
 | |
|                 call(ANY, TERM_CAPABILITIES["ri"] + b":"),
 | |
|                 call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
 | |
|                 call(ANY, b"def f():"),
 | |
|             ]
 | |
|         )
 | |
|         console.restore()
 | |
|         con.restore()
 | |
| 
 | |
|     def test_resize_smaller_on_multiline_function(self, _os_write):
 | |
|         # fmt: off
 | |
|         code = (
 | |
|             "def f():\n"
 | |
|             "  foo"
 | |
|         )
 | |
|         # fmt: on
 | |
| 
 | |
|         events = itertools.chain(code_to_events(code))
 | |
|         reader, console = handle_events_unix_console_height_3(events)
 | |
| 
 | |
|         console.height = 1
 | |
|         console.getheightwidth = MagicMock(lambda _: (1, 80))
 | |
| 
 | |
|         def same_reader(_):
 | |
|             return reader
 | |
| 
 | |
|         def same_console(events):
 | |
|             console.get_event = MagicMock(side_effect=events)
 | |
|             return console
 | |
| 
 | |
|         _, con = handle_all_events(
 | |
|             [Event(evt="resize", data=None)],
 | |
|             prepare_reader=same_reader,
 | |
|             prepare_console=same_console,
 | |
|         )
 | |
|         _os_write.assert_has_calls(
 | |
|             [
 | |
|                 call(ANY, TERM_CAPABILITIES["ind"] + b":"),
 | |
|                 call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
 | |
|                 call(ANY, b"  foo"),
 | |
|             ]
 | |
|         )
 | |
|         console.restore()
 | |
|         con.restore()
 | |
| 
 | |
|     def test_getheightwidth_with_invalid_environ(self, _os_write):
 | |
|         # gh-128636
 | |
|         console = UnixConsole(term="xterm")
 | |
|         with os_helper.EnvironmentVarGuard() as env:
 | |
|             env["LINES"] = ""
 | |
|             self.assertIsInstance(console.getheightwidth(), tuple)
 | |
|             env["COLUMNS"] = ""
 | |
|             self.assertIsInstance(console.getheightwidth(), tuple)
 | |
|             os.environ = []
 | |
|             self.assertIsInstance(console.getheightwidth(), tuple)
 | |
| 
 | |
|     @unittest.skipUnless(sys.platform == "darwin", "requires macOS")
 | |
|     def test_restore_with_invalid_environ_on_macos(self, _os_write):
 | |
|         # gh-128636 for macOS
 | |
|         console = UnixConsole(term="xterm")
 | |
|         with os_helper.EnvironmentVarGuard():
 | |
|             os.environ = []
 | |
|             console.prepare()  # needed to call restore()
 | |
|             console.restore()  # this should succeed
 |