cpython/Lib/_pyrepl/unix_console.py

1068 lines
34 KiB
Python

# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
# Antonio Cuni
# Armin Rigo
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose is hereby granted without fee,
# provided that the above copyright notice appear in all copies and
# that both that copyright notice and this permission notice appear in
# supporting documentation.
#
# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import annotations
import errno
import os
import re
import select
import signal
import struct
import termios
import time
import types
import platform
from collections.abc import Callable
from dataclasses import dataclass
from fcntl import ioctl
from typing import TYPE_CHECKING, cast, overload
from . import terminfo
from .console import Console, Event
from .fancy_termios import tcgetattr, tcsetattr, TermState
from .render import (
EMPTY_RENDER_LINE,
LineUpdate,
RenderLine,
RenderedScreen,
requires_cursor_resync,
diff_render_lines,
render_cells,
)
from .trace import trace, trace_text
from .unix_eventqueue import EventQueue
# declare posix optional to allow None assignment on other platforms
posix: types.ModuleType | None
try:
import posix
except ImportError:
posix = None
# types
if TYPE_CHECKING:
from typing import AbstractSet, IO, Literal
type _MoveFunc = Callable[[int, int], None]
type _PendingWrite = tuple[str | bytes, bool]
class InvalidTerminal(RuntimeError):
def __init__(self, message: str) -> None:
super().__init__(errno.EIO, message)
_error = (termios.error, InvalidTerminal)
_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM])
SIGWINCH_EVENT = "repaint"
FIONREAD = getattr(termios, "FIONREAD", None)
TIOCGWINSZ = getattr(termios, "TIOCGWINSZ", None)
# ------------ start of baudrate definitions ------------
# Add (possibly) missing baudrates (check termios man page) to termios
def add_baudrate_if_supported(dictionary: dict[int, int], rate: int) -> None:
baudrate_name = "B%d" % rate
if hasattr(termios, baudrate_name):
dictionary[getattr(termios, baudrate_name)] = rate
# Check the termios man page (Line speed) to know where these
# values come from.
potential_baudrates = [
0,
110,
115200,
1200,
134,
150,
1800,
19200,
200,
230400,
2400,
300,
38400,
460800,
4800,
50,
57600,
600,
75,
9600,
]
ratedict: dict[int, int] = {}
for rate in potential_baudrates:
add_baudrate_if_supported(ratedict, rate)
# Clean up variables to avoid unintended usage
del rate, add_baudrate_if_supported
# ------------ end of baudrate definitions ------------
delayprog = re.compile(b"\\$<([0-9]+)((?:/|\\*){0,2})>")
try:
poll: type[select.poll] = select.poll
except AttributeError:
# this is exactly the minimum necessary to support what we
# do with poll objects
class MinimalPoll:
def __init__(self):
pass
def register(self, fd, flag):
self.fd = fd
# note: The 'timeout' argument is received as *milliseconds*
def poll(self, timeout: float | None = None) -> list[int]:
if timeout is None:
r, w, e = select.select([self.fd], [], [])
else:
r, w, e = select.select([self.fd], [], [], timeout / 1000)
return r
poll = MinimalPoll # type: ignore[assignment]
@dataclass(frozen=True, slots=True)
class UnixRefreshPlan:
"""Instructions for updating the terminal after a screen change.
After the user types ``e`` to complete ``name``::
Before: >>> def greet(nam|):
â–²
LineUpdate here: insert_char "e"
After: >>> def greet(name|):
â–²
Only the changed cells are sent to the terminal; unchanged rows
are skipped entirely.
"""
grow_lines: int
"""Number of blank lines to append at the bottom to accommodate new content."""
use_tall_mode: bool
"""Use absolute cursor addressing via ``cup`` instead of relative moves.
Activated when content exceeds one screen height."""
offset: int
"""Vertical scroll offset: the buffer row displayed at the top of the terminal window."""
reverse_scroll: int
"""Number of lines to scroll backwards (content moves down)."""
forward_scroll: int
"""Number of lines to scroll forwards (content moves up)."""
line_updates: tuple[LineUpdate, ...]
cleared_lines: tuple[int, ...]
"""Row indices to erase (old content with no replacement)."""
rendered_screen: RenderedScreen
cursor: tuple[int, int]
class UnixConsole(Console):
__buffer: list[_PendingWrite]
__gone_tall: bool
__move: _MoveFunc
__offset: int
def __init__(
self,
f_in: IO[bytes] | int = 0,
f_out: IO[bytes] | int = 1,
term: str = "",
encoding: str = "",
):
"""
Initialize the UnixConsole.
Parameters:
- f_in (int or file-like object): Input file descriptor or object.
- f_out (int or file-like object): Output file descriptor or object.
- term (str): Terminal name.
- encoding (str): Encoding to use for I/O operations.
"""
super().__init__(f_in, f_out, term, encoding)
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
self.terminfo = terminfo.TermInfo(term or None)
self.term = term
self.is_apple_terminal = (
platform.system() == "Darwin"
and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
)
try:
self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset())
except _error as e:
raise RuntimeError(f"termios failure ({e.args[1]})")
@overload
def _my_getstr(
cap: str, optional: Literal[False] = False
) -> bytes: ...
@overload
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
r = self.terminfo.get(cap)
if not optional and r is None:
raise InvalidTerminal(
f"terminal doesn't have the required {cap} capability"
)
return r
self._bel = _my_getstr("bel")
self._civis = _my_getstr("civis", optional=True)
self._clear = _my_getstr("clear")
self._cnorm = _my_getstr("cnorm", optional=True)
self._cub = _my_getstr("cub", optional=True)
self._cub1 = _my_getstr("cub1", optional=True)
self._cud = _my_getstr("cud", optional=True)
self._cud1 = _my_getstr("cud1", optional=True)
self._cuf = _my_getstr("cuf", optional=True)
self._cuf1 = _my_getstr("cuf1", optional=True)
self._cup = _my_getstr("cup")
self._cuu = _my_getstr("cuu", optional=True)
self._cuu1 = _my_getstr("cuu1", optional=True)
self._dch1 = _my_getstr("dch1", optional=True)
self._dch = _my_getstr("dch", optional=True)
self._el = _my_getstr("el")
self._hpa = _my_getstr("hpa", optional=True)
self._ich = _my_getstr("ich", optional=True)
self._ich1 = _my_getstr("ich1", optional=True)
self._ind = _my_getstr("ind", optional=True)
self._pad = _my_getstr("pad", optional=True)
self._ri = _my_getstr("ri", optional=True)
self._rmkx = _my_getstr("rmkx", optional=True)
self._smkx = _my_getstr("smkx", optional=True)
self.__setup_movement()
self.event_queue = EventQueue(
self.input_fd, self.encoding, self.terminfo
)
self.cursor_visible = True
signal.signal(signal.SIGCONT, self._sigcont_handler)
def _sigcont_handler(self, signum, frame):
self.restore()
self.prepare()
def __read(self, n: int) -> bytes:
return os.read(self.input_fd, n)
def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
Parameters:
- encoding (str): New encoding to use.
"""
self.encoding = encoding
def refresh(self, rendered_screen: RenderedScreen) -> None:
"""
Refresh the console screen.
Parameters:
- rendered_screen: Structured rendered screen contents and cursor.
"""
c_xy = rendered_screen.cursor
trace(
"unix.refresh start cursor={cursor} lines={lines} prev_lines={prev_lines} "
"offset={offset} posxy={posxy}",
cursor=c_xy,
lines=len(rendered_screen.composed_lines),
prev_lines=len(self._rendered_screen.composed_lines),
offset=self.__offset,
posxy=self.posxy,
)
plan = self.__plan_refresh(rendered_screen, c_xy)
self.__apply_refresh_plan(plan)
def __plan_refresh(
self,
rendered_screen: RenderedScreen,
c_xy: tuple[int, int],
) -> UnixRefreshPlan:
cx, cy = c_xy
height = self.height
old_offset = offset = self.__offset
prev_composed = self._rendered_screen.composed_lines
previous_lines = list(prev_composed)
next_lines = list(rendered_screen.composed_lines)
line_count = len(next_lines)
grow_lines = 0
if not self.__gone_tall:
grow_lines = max(
min(line_count, height) - len(prev_composed),
0,
)
previous_lines.extend([EMPTY_RENDER_LINE] * grow_lines)
elif len(previous_lines) < line_count:
previous_lines.extend([EMPTY_RENDER_LINE] * (line_count - len(previous_lines)))
use_tall_mode = self.__gone_tall or line_count > height
# we make sure the cursor is on the screen, and that we're
# using all of the screen if we can
if cy < offset:
offset = cy
elif cy >= offset + height:
offset = cy - height + 1
elif offset > 0 and line_count < offset + height:
offset = max(line_count - height, 0)
next_lines.append(EMPTY_RENDER_LINE)
oldscr = previous_lines[old_offset : old_offset + height]
newscr = next_lines[offset : offset + height]
reverse_scroll = 0
forward_scroll = 0
if old_offset > offset and self._ri:
reverse_scroll = old_offset - offset
for _ in range(reverse_scroll):
if oldscr:
oldscr.pop(-1)
oldscr.insert(0, EMPTY_RENDER_LINE)
elif old_offset < offset and self._ind:
forward_scroll = offset - old_offset
for _ in range(forward_scroll):
if oldscr:
oldscr.pop(0)
oldscr.append(EMPTY_RENDER_LINE)
line_updates: list[LineUpdate] = []
px, _ = self.posxy
for y, oldline, newline in zip(range(offset, offset + height), oldscr, newscr):
update = self.__plan_changed_line(y, oldline, newline, px)
if update is not None:
line_updates.append(update)
cleared_lines = tuple(range(offset + len(newscr), offset + len(oldscr)))
console_rendered_screen = RenderedScreen(tuple(next_lines), c_xy)
trace(
"unix.refresh plan grow={grow} tall={tall} offset={offset} "
"reverse_scroll={reverse_scroll} forward_scroll={forward_scroll} "
"updates={updates} clears={clears}",
grow=grow_lines,
tall=use_tall_mode,
offset=offset,
reverse_scroll=reverse_scroll,
forward_scroll=forward_scroll,
updates=len(line_updates),
clears=len(cleared_lines),
)
return UnixRefreshPlan(
grow_lines=grow_lines,
use_tall_mode=use_tall_mode,
offset=offset,
reverse_scroll=reverse_scroll,
forward_scroll=forward_scroll,
line_updates=tuple(line_updates),
cleared_lines=cleared_lines,
rendered_screen=console_rendered_screen,
cursor=(cx, cy),
)
def __apply_refresh_plan(self, plan: UnixRefreshPlan) -> None:
cx, cy = plan.cursor
trace(
"unix.refresh apply cursor={cursor} updates={updates} clears={clears}",
cursor=plan.cursor,
updates=len(plan.line_updates),
clears=len(plan.cleared_lines),
)
visual_style = self.begin_redraw_visualization()
screen_line_count = len(self._rendered_screen.composed_lines)
for _ in range(plan.grow_lines):
self.__hide_cursor()
if screen_line_count:
self.__move(0, screen_line_count - 1)
self.__write("\n")
self.posxy = 0, screen_line_count
screen_line_count += 1
if plan.use_tall_mode and not self.__gone_tall:
self.__gone_tall = True
self.__move = self.__move_tall
old_offset = self.__offset
if plan.reverse_scroll:
self.__hide_cursor()
self.__write_code(self._cup, 0, 0)
self.posxy = 0, old_offset
for _ in range(plan.reverse_scroll):
self.__write_code(self._ri)
elif plan.forward_scroll:
self.__hide_cursor()
self.__write_code(self._cup, self.height - 1, 0)
self.posxy = 0, old_offset + self.height - 1
for _ in range(plan.forward_scroll):
self.__write_code(self._ind)
self.__offset = plan.offset
for update in plan.line_updates:
self.__apply_line_update(update, visual_style)
for y in plan.cleared_lines:
self.__hide_cursor()
self.__move(0, y)
self.posxy = 0, y
self.__write_code(self._el)
self.__show_cursor()
self.move_cursor(cx, cy)
self.flushoutput()
self.sync_rendered_screen(plan.rendered_screen, self.posxy)
def move_cursor(self, x: int, y: int) -> None:
"""
Move the cursor to the specified position on the screen.
Parameters:
- x (int): X coordinate.
- y (int): Y coordinate.
"""
if y < self.__offset or y >= self.__offset + self.height:
trace(
"unix.move_cursor offscreen x={x} y={y} offset={offset} height={height}",
x=x,
y=y,
offset=self.__offset,
height=self.height,
)
self.event_queue.insert(Event("scroll", ""))
else:
trace("unix.move_cursor x={x} y={y}", x=x, y=y)
self.__move(x, y)
self.posxy = x, y
self.flushoutput()
def prepare(self) -> None:
"""
Prepare the console for input/output operations.
"""
trace("unix.prepare")
self.__buffer = []
self.__svtermstate = tcgetattr(self.input_fd)
raw = self.__svtermstate.copy()
raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
raw.oflag &= ~(termios.OPOST)
raw.cflag &= ~(termios.CSIZE | termios.PARENB)
raw.cflag |= termios.CS8
raw.iflag |= termios.BRKINT
raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN)
raw.lflag |= termios.ISIG
raw.cc[termios.VMIN] = b"\x01"
raw.cc[termios.VTIME] = b"\x00"
self.__input_fd_set(raw)
# Apple Terminal will re-wrap lines for us unless we preempt the
# damage.
if self.is_apple_terminal:
os.write(self.output_fd, b"\033[?7l")
self.height, self.width = self.getheightwidth()
self.posxy = 0, 0
self.__gone_tall = False
self.__move = self.__move_short
self.__offset = 0
self.sync_rendered_screen(RenderedScreen.empty(), self.posxy)
self.__maybe_write_code(self._smkx)
try:
self.old_sigwinch = signal.signal(signal.SIGWINCH, self.__sigwinch)
except ValueError:
pass
self.__enable_bracketed_paste()
def restore(self) -> None:
"""
Restore the console to the default state
"""
trace("unix.restore")
self.__disable_bracketed_paste()
self.__maybe_write_code(self._rmkx)
self.flushoutput()
self.__input_fd_set(self.__svtermstate)
if self.is_apple_terminal:
os.write(self.output_fd, b"\033[?7h")
if hasattr(self, "old_sigwinch"):
try:
signal.signal(signal.SIGWINCH, self.old_sigwinch)
except ValueError as e:
import threading
if threading.current_thread() is threading.main_thread():
raise e
del self.old_sigwinch
def push_char(self, char: int | bytes) -> None:
"""
Push a character to the console event queue.
"""
trace("push char {char!r}", char=char)
self.event_queue.push(char)
def get_event(self, block: bool = True) -> Event | None:
"""
Get an event from the console event queue.
Parameters:
- block (bool): Whether to block until an event is available.
Returns:
- Event: Event object from the event queue.
"""
if not block and not self.wait(timeout=0):
return None
while self.event_queue.empty():
while True:
try:
self.push_char(self.__read(1))
except OSError as err:
if err.errno == errno.EINTR:
if not self.event_queue.empty():
return self.event_queue.get()
else:
continue
elif err.errno == errno.EIO:
raise SystemExit(errno.EIO)
else:
raise
else:
break
return self.event_queue.get()
def wait(self, timeout: float | None = None) -> bool:
"""
Wait for events on the console.
"""
return (
not self.event_queue.empty()
or bool(self.pollob.poll(timeout))
)
def set_cursor_vis(self, visible: bool) -> None:
"""
Set the visibility of the cursor.
Parameters:
- visible (bool): Visibility flag.
"""
if visible:
self.__show_cursor()
else:
self.__hide_cursor()
if TIOCGWINSZ:
def getheightwidth(self):
"""
Get the height and width of the console.
Returns:
- tuple: Height and width of the console.
"""
try:
return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
except (KeyError, TypeError, ValueError):
try:
size = ioctl(self.input_fd, TIOCGWINSZ, b"\000" * 8)
except OSError:
return 25, 80
height, width = struct.unpack("hhhh", size)[0:2]
if not height:
return 25, 80
return height, width
else:
def getheightwidth(self):
"""
Get the height and width of the console.
Returns:
- tuple: Height and width of the console.
"""
try:
return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
except (KeyError, TypeError, ValueError):
return 25, 80
def forgetinput(self):
"""
Discard any pending input on the console.
"""
termios.tcflush(self.input_fd, termios.TCIFLUSH)
def flushoutput(self):
"""
Flush the output buffer.
"""
for text, iscode in self.__buffer:
if iscode:
self.__tputs(text)
else:
os.write(self.output_fd, text.encode(self.encoding, "replace"))
del self.__buffer[:]
def finish(self):
"""
Finish console operations and flush the output buffer.
"""
rendered_lines = self._rendered_screen.composed_lines
y = len(rendered_lines) - 1
while y >= 0 and not rendered_lines[y].text:
y -= 1
self.__move(0, min(y, self.height + self.__offset - 1))
self.__write("\n\r")
self.flushoutput()
def beep(self):
"""
Emit a beep sound.
"""
self.__maybe_write_code(self._bel)
self.flushoutput()
if FIONREAD:
def getpending(self):
"""
Get pending events from the console event queue.
Returns:
- Event: Pending event from the event queue.
"""
e = Event("key", "", b"")
while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
e.raw += e2.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
trace("getpending({a})", a=amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
return e
else:
def getpending(self):
"""
Get pending events from the console event queue.
Returns:
- Event: Pending event from the event queue.
"""
e = Event("key", "", b"")
while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
e.raw += e2.raw
amount = 10000
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
return e
def clear(self):
"""
Clear the console screen.
"""
trace("unix.clear")
self.__write_code(self._clear)
self.__gone_tall = True
self.__move = self.__move_tall
self.posxy = 0, 0
self.sync_rendered_screen(RenderedScreen.empty(), self.posxy)
@property
def input_hook(self):
# avoid inline imports here so the repl doesn't get flooded
# with import logging from -X importtime=2
if posix is not None and posix._is_inputhook_installed():
return posix._inputhook
def __enable_bracketed_paste(self) -> None:
os.write(self.output_fd, b"\x1b[?2004h")
def __disable_bracketed_paste(self) -> None:
os.write(self.output_fd, b"\x1b[?2004l")
def __setup_movement(self):
"""
Set up the movement functions based on the terminal capabilities.
"""
if 0 and self._hpa: # hpa don't work in windows telnet :-(
self.__move_x = self.__move_x_hpa
elif self._cub and self._cuf:
self.__move_x = self.__move_x_cub_cuf
elif self._cub1 and self._cuf1:
self.__move_x = self.__move_x_cub1_cuf1
else:
raise RuntimeError("insufficient terminal (horizontal)")
if self._cuu and self._cud:
self.__move_y = self.__move_y_cuu_cud
elif self._cuu1 and self._cud1:
self.__move_y = self.__move_y_cuu1_cud1
else:
raise RuntimeError("insufficient terminal (vertical)")
if self._dch1:
self.dch1 = self._dch1
elif self._dch:
self.dch1 = terminfo.tparm(self._dch, 1)
else:
self.dch1 = None
if self._ich1:
self.ich1 = self._ich1
elif self._ich:
self.ich1 = terminfo.tparm(self._ich, 1)
else:
self.ich1 = None
self.__move = self.__move_short
@staticmethod
def __cell_index_from_x(line: RenderLine, x_coord: int) -> int:
width = 0
index = 0
while index < len(line.cells) and width < x_coord:
width += line.cells[index].width
index += 1
return index
def __plan_changed_line(
self,
y: int,
oldline: RenderLine,
newline: RenderLine,
px_coord: int,
) -> LineUpdate | None:
# NOTE: The shared replace_char / replace_span / rewrite_suffix logic
# is duplicated in WindowsConsole.__plan_changed_line. Keep changes to
# these common cases synchronised between the two files. Yes, this is
# duplicated on purpose; the two backends agree just enough to make a
# shared helper a trap. Unix-only cases (insert_char, delete_then_insert)
# rely on terminal capabilities (ich1/dch1) that are unavailable on
# Windows.
diff = diff_render_lines(oldline, newline)
if diff is None:
return None
start_cell = diff.start_cell
start_x = diff.start_x
if (
self.ich1
and not diff.old_cells
and (visible_new_cells := tuple(
cell for cell in diff.new_cells if cell.width
))
and len(visible_new_cells) == 1
and all(cell.width == 0 for cell in diff.new_cells[1:])
and oldline.cells[start_cell:] == newline.cells[start_cell + 1 :]
):
px_cell = self.__cell_index_from_x(oldline, px_coord)
if (
y == self.posxy[1]
and start_x > self.posxy[0]
and oldline.cells[px_cell:start_cell]
== newline.cells[px_cell + 1 : start_cell + 1]
):
start_cell = px_cell
start_x = px_coord
planned_cells = diff.new_cells
changed_cell = visible_new_cells[0]
return LineUpdate(
kind="insert_char",
y=y,
start_cell=start_cell,
start_x=start_x,
cells=planned_cells,
char_width=changed_cell.width,
reset_to_margin=requires_cursor_resync(planned_cells),
)
if (
len(diff.old_cells) == 1
and len(diff.new_cells) == 1
and diff.old_cells[0].width == diff.new_cells[0].width
):
planned_cells = diff.new_cells
changed_cell = planned_cells[0]
return LineUpdate(
kind="replace_char",
y=y,
start_cell=start_cell,
start_x=start_x,
cells=planned_cells,
char_width=changed_cell.width,
reset_to_margin=requires_cursor_resync(planned_cells),
)
if diff.old_changed_width == diff.new_changed_width:
planned_cells = diff.new_cells
return LineUpdate(
kind="replace_span",
y=y,
start_cell=start_cell,
start_x=start_x,
cells=planned_cells,
char_width=diff.new_changed_width,
reset_to_margin=requires_cursor_resync(planned_cells),
)
if (
self.dch1
and self.ich1
and newline.width == self.width
and start_x < newline.width - 2
and newline.cells[start_cell + 1 : -1] == oldline.cells[start_cell:-2]
):
planned_cells = (newline.cells[start_cell],)
changed_cell = planned_cells[0]
return LineUpdate(
kind="delete_then_insert",
y=y,
start_cell=start_cell,
start_x=start_x,
cells=planned_cells,
char_width=changed_cell.width,
reset_to_margin=requires_cursor_resync(planned_cells),
)
suffix_cells = newline.cells[start_cell:]
return LineUpdate(
kind="rewrite_suffix",
y=y,
start_cell=start_cell,
start_x=start_x,
cells=suffix_cells,
char_width=sum(cell.width for cell in suffix_cells),
clear_eol=oldline.width > newline.width,
reset_to_margin=requires_cursor_resync(suffix_cells),
)
def __apply_line_update(
self,
update: LineUpdate,
visual_style: str | None = None,
) -> None:
text = render_cells(update.cells, visual_style) if visual_style else update.text
trace(
"unix.refresh update kind={kind} y={y} x={x} text={text} "
"clear_eol={clear_eol} reset_to_margin={reset}",
kind=update.kind,
y=update.y,
x=update.start_x,
text=trace_text(text),
clear_eol=update.clear_eol,
reset=update.reset_to_margin,
)
if update.kind == "insert_char":
self.__move(update.start_x, update.y)
self.__write_code(self.ich1)
self.__write(text)
self.posxy = update.start_x + update.char_width, update.y
elif update.kind in {"replace_char", "replace_span"}:
self.__move(update.start_x, update.y)
self.__write(text)
self.posxy = update.start_x + update.char_width, update.y
elif update.kind == "delete_then_insert":
self.__hide_cursor()
self.__move(self.width - 2, update.y)
self.posxy = self.width - 2, update.y
self.__write_code(self.dch1)
self.__move(update.start_x, update.y)
self.__write_code(self.ich1)
self.__write(text)
self.posxy = update.start_x + update.char_width, update.y
else:
self.__hide_cursor()
self.__move(update.start_x, update.y)
if update.clear_eol:
self.__write_code(self._el)
self.__write(text)
self.posxy = update.start_x + update.char_width, update.y
if update.reset_to_margin:
# Non-SGR terminal controls can affect the cursor position.
self.move_cursor(0, update.y)
def __write(self, text):
self.__buffer.append((text, False))
def __write_code(self, fmt, *args):
self.__buffer.append((terminfo.tparm(fmt, *args), True))
def __maybe_write_code(self, fmt, *args):
if fmt:
self.__write_code(fmt, *args)
def __move_y_cuu1_cud1(self, y):
assert self._cud1 is not None
assert self._cuu1 is not None
dy = y - self.posxy[1]
if dy > 0:
self.__write_code(dy * self._cud1)
elif dy < 0:
self.__write_code((-dy) * self._cuu1)
def __move_y_cuu_cud(self, y):
dy = y - self.posxy[1]
if dy > 0:
self.__write_code(self._cud, dy)
elif dy < 0:
self.__write_code(self._cuu, -dy)
def __move_x_hpa(self, x: int) -> None:
if x != self.posxy[0]:
self.__write_code(self._hpa, x)
def __move_x_cub1_cuf1(self, x: int) -> None:
assert self._cuf1 is not None
assert self._cub1 is not None
dx = x - self.posxy[0]
if dx > 0:
self.__write_code(self._cuf1 * dx)
elif dx < 0:
self.__write_code(self._cub1 * (-dx))
def __move_x_cub_cuf(self, x: int) -> None:
dx = x - self.posxy[0]
if dx > 0:
self.__write_code(self._cuf, dx)
elif dx < 0:
self.__write_code(self._cub, -dx)
def __move_short(self, x, y):
self.__move_x(x)
self.__move_y(y)
def __move_tall(self, x, y):
assert 0 <= y - self.__offset < self.height, y - self.__offset
self.__write_code(self._cup, y - self.__offset, x)
def __sigwinch(self, signum, frame):
self.event_queue.insert(Event("resize", ""))
def __hide_cursor(self):
if self.cursor_visible:
self.__maybe_write_code(self._civis)
self.cursor_visible = False
def __show_cursor(self):
if not self.cursor_visible:
self.__maybe_write_code(self._cnorm)
self.cursor_visible = True
def repaint(self):
composed = self._rendered_screen.composed_lines
trace(
"unix.repaint gone_tall={gone_tall} screen_lines={lines} offset={offset}",
gone_tall=self.__gone_tall,
lines=len(composed),
offset=self.__offset,
)
if not self.__gone_tall:
self.posxy = 0, self.posxy[1]
self.__write("\r")
ns = len(composed) * ["\000" * self.width]
else:
self.posxy = 0, self.__offset
self.__move(0, self.__offset)
ns = self.height * ["\000" * self.width]
self.sync_rendered_screen(
RenderedScreen.from_screen_lines(ns, self.posxy),
self.posxy,
)
def __tputs(self, fmt, prog=delayprog):
"""A Python implementation of the curses tputs function; the
curses one can't really be wrapped in a sane manner.
I have the strong suspicion that this is complexity that
will never do anyone any good."""
# using .get() means that things will blow up
# only if the bps is actually needed (which I'm
# betting is pretty unlikely)
bps = ratedict.get(self.__svtermstate.ospeed)
while True:
m = prog.search(fmt)
if not m:
os.write(self.output_fd, fmt)
break
x, y = m.span()
os.write(self.output_fd, fmt[:x])
fmt = fmt[y:]
delay = int(m.group(1))
if b"*" in m.group(2):
delay *= self.height
if self._pad and bps is not None:
nchars = (bps * delay) / 1000
os.write(self.output_fd, self._pad * nchars)
else:
time.sleep(float(delay) / 1000.0)
def __input_fd_set(
self,
state: TermState,
ignore: AbstractSet[int] = _error_codes_to_ignore,
) -> bool:
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, state)
except termios.error as te:
if te.args[0] not in ignore:
raise
return False
else:
return True