cpython/Lib/test/test_pyrepl/test_unix_console.py
yihong b9dbf6acb3
gh-135329: prevent infinite traceback loop on Ctrl-C for strace (GH-138133)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Co-authored-by: dura0ok <slpmcf@gmail.com>
Co-authored-by: graymon <greyschwinger@gmail.com>
Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
2025-09-16 12:39:03 +02:00

374 lines
12 KiB
Python

import errno
import itertools
import os
import signal
import subprocess
import sys
import unittest
from functools import partial
from test.support import os_helper, force_not_colorized_test_class
from test.support import script_helper
from unittest import TestCase
from unittest.mock import MagicMock, call, patch, ANY, Mock
from .support import handle_all_events, code_to_events
try:
from _pyrepl.console import Event
from _pyrepl.unix_console import UnixConsole
except ImportError:
pass
from _pyrepl.terminfo import _TERMINAL_CAPABILITIES
# Capability strings of the "ansi" entry in the terminfo table; the tests
# below concatenate these with the patched tparm() suffix to build the byte
# sequences they expect to see written to the terminal.
TERM_CAPABILITIES = _TERMINAL_CAPABILITIES["ansi"]
def unix_console(events, **kwargs):
    """Return a prepared ``UnixConsole`` whose input and geometry are mocked.

    ``events`` becomes the ``side_effect`` of the console's ``get_event``
    mock.  ``height`` and ``width`` keyword arguments (defaulting to 25 and
    80) are what the mocked ``getheightwidth`` reports.  After ``prepare()``
    has run, every keyword argument is also assigned as an attribute on the
    console.
    """
    rows = kwargs.get("height", 25)
    cols = kwargs.get("width", 80)

    def fake_size():
        return (rows, cols)

    console = UnixConsole(term="xterm")
    console.get_event = MagicMock(side_effect=events)
    console.getpending = MagicMock(return_value=Event("key", ""))
    console.getheightwidth = MagicMock(side_effect=fake_size)
    console.wait = MagicMock()
    console.prepare()

    # Applied after prepare() so the caller's values are the final ones.
    for attr_name, attr_value in kwargs.items():
        setattr(console, attr_name, attr_value)
    return console
# Variants of handle_all_events() pre-bound to mocked Unix consoles of
# different sizes.

# Default geometry (whatever unix_console() falls back to).
handle_events_unix_console = partial(
    handle_all_events, prepare_console=unix_console
)

# Five columns wide.
handle_events_narrow_unix_console = partial(
    handle_all_events, prepare_console=partial(unix_console, width=5)
)

# A single row tall.
handle_events_short_unix_console = partial(
    handle_all_events, prepare_console=partial(unix_console, height=1)
)

# Three rows tall.
handle_events_unix_console_height_3 = partial(
    handle_all_events,
    prepare_console=partial(unix_console, height=3),
)
@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
# tparm() is replaced by a stub that renders a capability as
# ``<capability>:<arg1>,<arg2>,...`` so expected output can be built by plain
# concatenation (e.g. ``TERM_CAPABILITIES["cub"] + b":1"``).
@patch(
    "_pyrepl.terminfo.tparm",
    lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
)
# Fixed terminal attributes so the tests never query a real tty.
@patch(
    "termios.tcgetattr",
    lambda _: [
        27394,
        3,
        19200,
        536872399,
        38400,
        38400,
        [
            b"\x04",
            b"\xff",
            b"\xff",
            b"\x7f",
            b"\x17",
            b"\x15",
            b"\x12",
            b"\x00",
            b"\x03",
            b"\x1c",
            b"\x1a",
            b"\x19",
            b"\x11",
            b"\x13",
            b"\x16",
            b"\x0f",
            b"\x01",
            b"\x00",
            b"\x14",
            b"\x00",
        ],
    ],
)
@patch("termios.tcsetattr", lambda a, b, c: None)
# os.write is mocked for every test method; the mock arrives as ``_os_write``
# and is what the assertions inspect.
@patch("os.write")
@force_not_colorized_test_class
class TestConsole(TestCase):
    """Drive a mocked ``UnixConsole`` and check the bytes passed to os.write."""

    def test_simple_addition(self, _os_write):
        """Each typed character is written to the terminal."""
        code = "12+34"
        events = code_to_events(code)
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, b"2")
        _os_write.assert_any_call(ANY, b"+")
        _os_write.assert_any_call(ANY, b"3")
        _os_write.assert_any_call(ANY, b"4")
        con.restore()

    def test_wrap(self, _os_write):
        """On a 5-column console the input wraps with a backslash and newline."""
        code = "12+34"
        events = code_to_events(code)
        _, con = handle_events_narrow_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, b"2")
        _os_write.assert_any_call(ANY, b"+")
        _os_write.assert_any_call(ANY, b"3")
        _os_write.assert_any_call(ANY, b"\\")
        _os_write.assert_any_call(ANY, b"\n")
        _os_write.assert_any_call(ANY, b"4")
        con.restore()

    def test_cursor_left(self, _os_write):
        """A "left" key event emits the ``cub`` capability with argument 1."""
        code = "1"
        events = itertools.chain(
            code_to_events(code),
            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        con.restore()

    def test_cursor_left_right(self, _os_write):
        """"left" then "right" emit ``cub`` and ``cuf`` respectively."""
        code = "1"
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
            ],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1")
        con.restore()

    def test_cursor_up(self, _os_write):
        """An "up" key event on two-line input emits ``cuu`` with argument 1."""
        code = "1\n2+3"
        events = itertools.chain(
            code_to_events(code),
            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
        con.restore()

    def test_cursor_up_down(self, _os_write):
        """"up" then "down" emit ``cuu`` and ``cud`` respectively."""
        code = "1\n2+3"
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
            ],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1")
        con.restore()

    def test_cursor_back_write(self, _os_write):
        """Typing after moving left writes both characters and the ``cub`` move."""
        events = itertools.chain(
            code_to_events("1"),
            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
            code_to_events("2"),
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        _os_write.assert_any_call(ANY, b"2")
        con.restore()

    def test_multiline_function_move_up_short_terminal(self, _os_write):
        """Moving up on a 1-row console emits the ``ri`` capability."""
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
                Event(evt="scroll", data=None),
            ],
        )
        _, con = handle_events_short_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
        con.restore()

    def test_multiline_function_move_up_down_short_terminal(self, _os_write):
        """Moving up then down on a 1-row console emits ``ri`` and ``ind``."""
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
                Event(evt="scroll", data=None),
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
                Event(evt="scroll", data=None),
            ],
        )
        _, con = handle_events_short_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":")
        con.restore()

    def test_resize_bigger_on_multiline_function(self, _os_write):
        """Growing the console from 1 to 2 rows redraws from the first line."""
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on
        events = itertools.chain(code_to_events(code))
        reader, console = handle_events_short_unix_console(events)
        console.height = 2
        # MagicMock's first positional parameter is ``spec``, not the value
        # to return, so the new size must be given through ``return_value``.
        console.getheightwidth = MagicMock(return_value=(2, 80))

        # Feed the resize event to the reader/console pair built above
        # instead of letting handle_all_events() create fresh ones.
        def same_reader(_):
            return reader

        def same_console(events):
            console.get_event = MagicMock(side_effect=events)
            return console

        _, con = handle_all_events(
            [Event(evt="resize", data=None)],
            prepare_reader=same_reader,
            prepare_console=same_console,
        )
        _os_write.assert_has_calls(
            [
                call(ANY, TERM_CAPABILITIES["ri"] + b":"),
                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
                call(ANY, b"def f():"),
            ]
        )
        console.restore()
        con.restore()

    def test_resize_smaller_on_multiline_function(self, _os_write):
        """Shrinking the console from 3 rows to 1 redraws the last line."""
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on
        events = itertools.chain(code_to_events(code))
        reader, console = handle_events_unix_console_height_3(events)
        console.height = 1
        # MagicMock's first positional parameter is ``spec``, not the value
        # to return, so the new size must be given through ``return_value``.
        console.getheightwidth = MagicMock(return_value=(1, 80))

        # Feed the resize event to the reader/console pair built above
        # instead of letting handle_all_events() create fresh ones.
        def same_reader(_):
            return reader

        def same_console(events):
            console.get_event = MagicMock(side_effect=events)
            return console

        _, con = handle_all_events(
            [Event(evt="resize", data=None)],
            prepare_reader=same_reader,
            prepare_console=same_console,
        )
        _os_write.assert_has_calls(
            [
                call(ANY, TERM_CAPABILITIES["ind"] + b":"),
                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
                call(ANY, b" foo"),
            ]
        )
        console.restore()
        con.restore()

    def test_getheightwidth_with_invalid_environ(self, _os_write):
        """getheightwidth() returns a tuple even with unusable environment data."""
        # gh-128636
        console = UnixConsole(term="xterm")
        with os_helper.EnvironmentVarGuard() as env:
            env["LINES"] = ""
            self.assertIsInstance(console.getheightwidth(), tuple)
            env["COLUMNS"] = ""
            self.assertIsInstance(console.getheightwidth(), tuple)
            # Deliberately swap os.environ for a non-mapping.
            # NOTE(review): relies on EnvironmentVarGuard putting the real
            # os.environ back on exit -- confirm.
            os.environ = []
            self.assertIsInstance(console.getheightwidth(), tuple)

    @unittest.skipUnless(sys.platform == "darwin", "requires macOS")
    def test_restore_with_invalid_environ_on_macos(self, _os_write):
        """restore() succeeds on macOS when os.environ is not a mapping."""
        # gh-128636 for macOS
        console = UnixConsole(term="xterm")
        with os_helper.EnvironmentVarGuard():
            # NOTE(review): relies on EnvironmentVarGuard putting the real
            # os.environ back on exit -- confirm.
            os.environ = []
            console.prepare()  # needed to call restore()
            console.restore()  # this should succeed
@unittest.skipIf(sys.platform == "win32", "No Unix console on Windows")
class TestUnixConsoleEIOHandling(TestCase):
    """Checks how the Unix console and the REPL behave on EIO terminal errors."""

    @patch('_pyrepl.unix_console.tcsetattr')
    @patch('_pyrepl.unix_console.tcgetattr')
    def test_eio_error_handling_in_restore(self, mock_tcgetattr, mock_tcsetattr):
        """restore() must not raise when tcsetattr fails with EIO."""
        import termios

        # Stand-in for the terminal state object returned by tcgetattr().
        mock_termios = Mock()
        mock_termios.iflag = 0
        mock_termios.oflag = 0
        mock_termios.cflag = 0
        mock_termios.lflag = 0
        mock_termios.cc = [0] * 32
        mock_termios.copy.return_value = mock_termios
        mock_tcgetattr.return_value = mock_termios

        console = UnixConsole(term="xterm")
        console.prepare()

        # Only start failing after prepare(), so that restore() is the call
        # that hits the error.
        mock_tcsetattr.side_effect = termios.error(errno.EIO, "Input/output error")

        # EIO error should be handled gracefully in restore()
        console.restore()

    @unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
    def test_repl_eio(self):
        """The helper script exits with status 1 and reports EIO or ENXIO."""
        # Use the pty-based approach to simulate EIO error
        script_path = os.path.join(os.path.dirname(__file__), "eio_test_script.py")

        proc = script_helper.spawn_python(
            "-S", script_path,
            stderr=subprocess.PIPE,
            text=True
        )

        def reap_child():
            # Runs on every exit path, including a failed readiness check or
            # a communicate() timeout, so no child process or pipe is leaked.
            if proc.poll() is None:
                proc.kill()
            for stream in (proc.stdin, proc.stdout, proc.stderr):
                if stream is not None:
                    stream.close()
            proc.wait()

        self.addCleanup(reap_child)

        ready_line = proc.stdout.readline().strip()
        if ready_line != "READY" or proc.poll() is not None:
            self.fail("Child process failed to start properly")

        # NOTE(review): SIGUSR1 is presumably the script's cue to trigger the
        # failure -- confirm against eio_test_script.py.
        os.kill(proc.pid, signal.SIGUSR1)
        _, err = proc.communicate(timeout=5)  # sleep for pty to settle

        self.assertEqual(
            proc.returncode,
            1,
            f"Expected EIO/ENXIO error, got return code {proc.returncode}",
        )
        self.assertTrue(
            (
                "Got EIO:" in err
                or "Got ENXIO:" in err
            ),
            f"Expected EIO/ENXIO error message in stderr: {err}",
        )