gh-134466: Don't run when termios is inaccessible (GH-138911)

Without the ability to set required capabilities, the REPL cannot
function properly (syntax highlighting and multiline editing can't
work).

We refuse to work in this degraded state.
This commit is contained in:
Łukasz Langa 2025-09-17 11:59:49 +01:00 committed by GitHub
parent 64c876dd68
commit 2fc7004d54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 65 additions and 42 deletions

View file

@ -35,7 +35,7 @@
from . import terminfo
from .console import Console, Event
from .fancy_termios import tcgetattr, tcsetattr
from .fancy_termios import tcgetattr, tcsetattr, TermState
from .trace import trace
from .unix_eventqueue import EventQueue
from .utils import wlen
@ -51,16 +51,19 @@
# types
if TYPE_CHECKING:
from typing import IO, Literal, overload
from typing import AbstractSet, IO, Literal, overload, cast
else:
overload = lambda func: None
cast = lambda typ, val: val
class InvalidTerminal(RuntimeError):
pass
def __init__(self, message: str) -> None:
super().__init__(errno.EIO, message)
_error = (termios.error, InvalidTerminal)
_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM])
SIGWINCH_EVENT = "repaint"
@ -125,12 +128,13 @@ def __init__(self):
def register(self, fd, flag):
self.fd = fd
# note: The 'timeout' argument is received as *milliseconds*
def poll(self, timeout: float | None = None) -> list[int]:
if timeout is None:
r, w, e = select.select([self.fd], [], [])
else:
r, w, e = select.select([self.fd], [], [], timeout/1000)
r, w, e = select.select([self.fd], [], [], timeout / 1000)
return r
poll = MinimalPoll # type: ignore[assignment]
@ -164,8 +168,15 @@ def __init__(
and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
)
try:
self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset())
except _error as e:
raise RuntimeError(f"termios failure ({e.args[1]})")
@overload
def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ...
def _my_getstr(
cap: str, optional: Literal[False] = False
) -> bytes: ...
@overload
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
@ -205,7 +216,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
self.__setup_movement()
self.event_queue = EventQueue(self.input_fd, self.encoding, self.terminfo)
self.event_queue = EventQueue(
self.input_fd, self.encoding, self.terminfo
)
self.cursor_visible = 1
signal.signal(signal.SIGCONT, self._sigcont_handler)
@ -217,7 +230,6 @@ def _sigcont_handler(self, signum, frame):
def __read(self, n: int) -> bytes:
return os.read(self.input_fd, n)
def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
@ -329,6 +341,8 @@ def prepare(self):
"""
Prepare the console for input/output operations.
"""
self.__buffer = []
self.__svtermstate = tcgetattr(self.input_fd)
raw = self.__svtermstate.copy()
raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
@ -340,14 +354,7 @@ def prepare(self):
raw.lflag |= termios.ISIG
raw.cc[termios.VMIN] = 1
raw.cc[termios.VTIME] = 0
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, raw)
except termios.error as e:
if e.args[0] != errno.EIO:
# gh-135329: when running under external programs (like strace),
# tcsetattr may fail with EIO. We can safely ignore this
# and continue with default terminal settings.
raise
self.__input_fd_set(raw)
# In macOS terminal we need to deactivate line wrap via ANSI escape code
if self.is_apple_terminal:
@ -356,8 +363,6 @@ def prepare(self):
self.screen = []
self.height, self.width = self.getheightwidth()
self.__buffer = []
self.posxy = 0, 0
self.__gone_tall = 0
self.__move = self.__move_short
@ -379,11 +384,7 @@ def restore(self):
self.__disable_bracketed_paste()
self.__maybe_write_code(self._rmkx)
self.flushoutput()
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate)
except termios.error as e:
if e.args[0] != errno.EIO:
raise
self.__input_fd_set(self.__svtermstate)
if self.is_apple_terminal:
os.write(self.output_fd, b"\033[?7h")
@ -820,3 +821,17 @@ def __tputs(self, fmt, prog=delayprog):
os.write(self.output_fd, self._pad * nchars)
else:
time.sleep(float(delay) / 1000.0)
def __input_fd_set(
self,
state: TermState,
ignore: AbstractSet[int] = _error_codes_to_ignore,
) -> bool:
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, state)
except termios.error as te:
if te.args[0] not in ignore:
raise
return False
else:
return True