mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
[3.14] gh-134466: Don't run when termios is inaccessible (GH-138911) (GH-139029)
Without the ability to set required capabilities, the REPL cannot
function properly (syntax highlighting and multiline editing can't
work).
We refuse to work in this degraded state.
(cherry picked from commit 2fc7004d54)
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
parent
89a7b05f23
commit
9853bbb1dc
3 changed files with 65 additions and 42 deletions
|
|
@ -20,19 +20,25 @@
|
||||||
import termios
|
import termios
|
||||||
|
|
||||||
|
|
||||||
class TermState:
|
TYPE_CHECKING = False
|
||||||
def __init__(self, tuples):
|
|
||||||
(
|
|
||||||
self.iflag,
|
|
||||||
self.oflag,
|
|
||||||
self.cflag,
|
|
||||||
self.lflag,
|
|
||||||
self.ispeed,
|
|
||||||
self.ospeed,
|
|
||||||
self.cc,
|
|
||||||
) = tuples
|
|
||||||
|
|
||||||
def as_list(self):
|
if TYPE_CHECKING:
|
||||||
|
from typing import cast
|
||||||
|
else:
|
||||||
|
cast = lambda typ, val: val
|
||||||
|
|
||||||
|
|
||||||
|
class TermState:
|
||||||
|
def __init__(self, attrs: list[int | list[bytes]]) -> None:
|
||||||
|
self.iflag = cast(int, attrs[0])
|
||||||
|
self.oflag = cast(int, attrs[1])
|
||||||
|
self.cflag = cast(int, attrs[2])
|
||||||
|
self.lflag = cast(int, attrs[3])
|
||||||
|
self.ispeed = cast(int, attrs[4])
|
||||||
|
self.ospeed = cast(int, attrs[5])
|
||||||
|
self.cc = cast(list[bytes], attrs[6])
|
||||||
|
|
||||||
|
def as_list(self) -> list[int | list[bytes]]:
|
||||||
return [
|
return [
|
||||||
self.iflag,
|
self.iflag,
|
||||||
self.oflag,
|
self.oflag,
|
||||||
|
|
@ -45,32 +51,32 @@ def as_list(self):
|
||||||
self.cc[:],
|
self.cc[:],
|
||||||
]
|
]
|
||||||
|
|
||||||
def copy(self):
|
def copy(self) -> "TermState":
|
||||||
return self.__class__(self.as_list())
|
return self.__class__(self.as_list())
|
||||||
|
|
||||||
|
|
||||||
def tcgetattr(fd):
|
def tcgetattr(fd: int) -> TermState:
|
||||||
return TermState(termios.tcgetattr(fd))
|
return TermState(termios.tcgetattr(fd))
|
||||||
|
|
||||||
|
|
||||||
def tcsetattr(fd, when, attrs):
|
def tcsetattr(fd: int, when: int, attrs: TermState) -> None:
|
||||||
termios.tcsetattr(fd, when, attrs.as_list())
|
termios.tcsetattr(fd, when, attrs.as_list())
|
||||||
|
|
||||||
|
|
||||||
class Term(TermState):
|
class Term(TermState):
|
||||||
TS__init__ = TermState.__init__
|
TS__init__ = TermState.__init__
|
||||||
|
|
||||||
def __init__(self, fd=0):
|
def __init__(self, fd: int = 0) -> None:
|
||||||
self.TS__init__(termios.tcgetattr(fd))
|
self.TS__init__(termios.tcgetattr(fd))
|
||||||
self.fd = fd
|
self.fd = fd
|
||||||
self.stack = []
|
self.stack: list[list[int | list[bytes]]] = []
|
||||||
|
|
||||||
def save(self):
|
def save(self) -> None:
|
||||||
self.stack.append(self.as_list())
|
self.stack.append(self.as_list())
|
||||||
|
|
||||||
def set(self, when=termios.TCSANOW):
|
def set(self, when: int = termios.TCSANOW) -> None:
|
||||||
termios.tcsetattr(self.fd, when, self.as_list())
|
termios.tcsetattr(self.fd, when, self.as_list())
|
||||||
|
|
||||||
def restore(self):
|
def restore(self) -> None:
|
||||||
self.TS__init__(self.stack.pop())
|
self.TS__init__(self.stack.pop())
|
||||||
self.set()
|
self.set()
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@
|
||||||
|
|
||||||
from . import terminfo
|
from . import terminfo
|
||||||
from .console import Console, Event
|
from .console import Console, Event
|
||||||
from .fancy_termios import tcgetattr, tcsetattr
|
from .fancy_termios import tcgetattr, tcsetattr, TermState
|
||||||
from .trace import trace
|
from .trace import trace
|
||||||
from .unix_eventqueue import EventQueue
|
from .unix_eventqueue import EventQueue
|
||||||
from .utils import wlen
|
from .utils import wlen
|
||||||
|
|
@ -51,16 +51,19 @@
|
||||||
|
|
||||||
# types
|
# types
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from typing import IO, Literal, overload
|
from typing import AbstractSet, IO, Literal, overload, cast
|
||||||
else:
|
else:
|
||||||
overload = lambda func: None
|
overload = lambda func: None
|
||||||
|
cast = lambda typ, val: val
|
||||||
|
|
||||||
|
|
||||||
class InvalidTerminal(RuntimeError):
|
class InvalidTerminal(RuntimeError):
|
||||||
pass
|
def __init__(self, message: str) -> None:
|
||||||
|
super().__init__(errno.EIO, message)
|
||||||
|
|
||||||
|
|
||||||
_error = (termios.error, InvalidTerminal)
|
_error = (termios.error, InvalidTerminal)
|
||||||
|
_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM])
|
||||||
|
|
||||||
SIGWINCH_EVENT = "repaint"
|
SIGWINCH_EVENT = "repaint"
|
||||||
|
|
||||||
|
|
@ -125,12 +128,13 @@ def __init__(self):
|
||||||
|
|
||||||
def register(self, fd, flag):
|
def register(self, fd, flag):
|
||||||
self.fd = fd
|
self.fd = fd
|
||||||
|
|
||||||
# note: The 'timeout' argument is received as *milliseconds*
|
# note: The 'timeout' argument is received as *milliseconds*
|
||||||
def poll(self, timeout: float | None = None) -> list[int]:
|
def poll(self, timeout: float | None = None) -> list[int]:
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
r, w, e = select.select([self.fd], [], [])
|
r, w, e = select.select([self.fd], [], [])
|
||||||
else:
|
else:
|
||||||
r, w, e = select.select([self.fd], [], [], timeout/1000)
|
r, w, e = select.select([self.fd], [], [], timeout / 1000)
|
||||||
return r
|
return r
|
||||||
|
|
||||||
poll = MinimalPoll # type: ignore[assignment]
|
poll = MinimalPoll # type: ignore[assignment]
|
||||||
|
|
@ -164,8 +168,15 @@ def __init__(
|
||||||
and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
|
and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset())
|
||||||
|
except _error as e:
|
||||||
|
raise RuntimeError(f"termios failure ({e.args[1]})")
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ...
|
def _my_getstr(
|
||||||
|
cap: str, optional: Literal[False] = False
|
||||||
|
) -> bytes: ...
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
|
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
|
||||||
|
|
@ -205,7 +216,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
|
||||||
|
|
||||||
self.__setup_movement()
|
self.__setup_movement()
|
||||||
|
|
||||||
self.event_queue = EventQueue(self.input_fd, self.encoding, self.terminfo)
|
self.event_queue = EventQueue(
|
||||||
|
self.input_fd, self.encoding, self.terminfo
|
||||||
|
)
|
||||||
self.cursor_visible = 1
|
self.cursor_visible = 1
|
||||||
|
|
||||||
signal.signal(signal.SIGCONT, self._sigcont_handler)
|
signal.signal(signal.SIGCONT, self._sigcont_handler)
|
||||||
|
|
@ -217,7 +230,6 @@ def _sigcont_handler(self, signum, frame):
|
||||||
def __read(self, n: int) -> bytes:
|
def __read(self, n: int) -> bytes:
|
||||||
return os.read(self.input_fd, n)
|
return os.read(self.input_fd, n)
|
||||||
|
|
||||||
|
|
||||||
def change_encoding(self, encoding: str) -> None:
|
def change_encoding(self, encoding: str) -> None:
|
||||||
"""
|
"""
|
||||||
Change the encoding used for I/O operations.
|
Change the encoding used for I/O operations.
|
||||||
|
|
@ -329,6 +341,8 @@ def prepare(self):
|
||||||
"""
|
"""
|
||||||
Prepare the console for input/output operations.
|
Prepare the console for input/output operations.
|
||||||
"""
|
"""
|
||||||
|
self.__buffer = []
|
||||||
|
|
||||||
self.__svtermstate = tcgetattr(self.input_fd)
|
self.__svtermstate = tcgetattr(self.input_fd)
|
||||||
raw = self.__svtermstate.copy()
|
raw = self.__svtermstate.copy()
|
||||||
raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
|
raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
|
||||||
|
|
@ -340,14 +354,7 @@ def prepare(self):
|
||||||
raw.lflag |= termios.ISIG
|
raw.lflag |= termios.ISIG
|
||||||
raw.cc[termios.VMIN] = 1
|
raw.cc[termios.VMIN] = 1
|
||||||
raw.cc[termios.VTIME] = 0
|
raw.cc[termios.VTIME] = 0
|
||||||
try:
|
self.__input_fd_set(raw)
|
||||||
tcsetattr(self.input_fd, termios.TCSADRAIN, raw)
|
|
||||||
except termios.error as e:
|
|
||||||
if e.args[0] != errno.EIO:
|
|
||||||
# gh-135329: when running under external programs (like strace),
|
|
||||||
# tcsetattr may fail with EIO. We can safely ignore this
|
|
||||||
# and continue with default terminal settings.
|
|
||||||
raise
|
|
||||||
|
|
||||||
# In macOS terminal we need to deactivate line wrap via ANSI escape code
|
# In macOS terminal we need to deactivate line wrap via ANSI escape code
|
||||||
if self.is_apple_terminal:
|
if self.is_apple_terminal:
|
||||||
|
|
@ -356,8 +363,6 @@ def prepare(self):
|
||||||
self.screen = []
|
self.screen = []
|
||||||
self.height, self.width = self.getheightwidth()
|
self.height, self.width = self.getheightwidth()
|
||||||
|
|
||||||
self.__buffer = []
|
|
||||||
|
|
||||||
self.posxy = 0, 0
|
self.posxy = 0, 0
|
||||||
self.__gone_tall = 0
|
self.__gone_tall = 0
|
||||||
self.__move = self.__move_short
|
self.__move = self.__move_short
|
||||||
|
|
@ -379,11 +384,7 @@ def restore(self):
|
||||||
self.__disable_bracketed_paste()
|
self.__disable_bracketed_paste()
|
||||||
self.__maybe_write_code(self._rmkx)
|
self.__maybe_write_code(self._rmkx)
|
||||||
self.flushoutput()
|
self.flushoutput()
|
||||||
try:
|
self.__input_fd_set(self.__svtermstate)
|
||||||
tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate)
|
|
||||||
except termios.error as e:
|
|
||||||
if e.args[0] != errno.EIO:
|
|
||||||
raise
|
|
||||||
|
|
||||||
if self.is_apple_terminal:
|
if self.is_apple_terminal:
|
||||||
os.write(self.output_fd, b"\033[?7h")
|
os.write(self.output_fd, b"\033[?7h")
|
||||||
|
|
@ -820,3 +821,17 @@ def __tputs(self, fmt, prog=delayprog):
|
||||||
os.write(self.output_fd, self._pad * nchars)
|
os.write(self.output_fd, self._pad * nchars)
|
||||||
else:
|
else:
|
||||||
time.sleep(float(delay) / 1000.0)
|
time.sleep(float(delay) / 1000.0)
|
||||||
|
|
||||||
|
def __input_fd_set(
|
||||||
|
self,
|
||||||
|
state: TermState,
|
||||||
|
ignore: AbstractSet[int] = _error_codes_to_ignore,
|
||||||
|
) -> bool:
|
||||||
|
try:
|
||||||
|
tcsetattr(self.input_fd, termios.TCSADRAIN, state)
|
||||||
|
except termios.error as te:
|
||||||
|
if te.args[0] not in ignore:
|
||||||
|
raise
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
Don't run PyREPL in a degraded environment where setting termios attributes
|
||||||
|
is not allowed.
|
||||||
Loading…
Add table
Add a link
Reference in a new issue