mirror of
https://github.com/python/cpython.git
synced 2026-04-21 03:10:52 +00:00
398 lines
12 KiB
Python
398 lines
12 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from collections.abc import Iterable, Sequence
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from typing import Literal, Protocol, Self
|
||
|
|
|
||
|
|
from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, str_width
|
||
|
|
from .types import CursorXY
|
||
|
|
|
||
|
|
type RenderStyle = StyleRef | str | None
|
||
|
|
type LineUpdateKind = Literal[
|
||
|
|
"insert_char",
|
||
|
|
"replace_char",
|
||
|
|
"replace_span",
|
||
|
|
"delete_then_insert",
|
||
|
|
"rewrite_suffix",
|
||
|
|
]
|
||
|
|
|
||
|
|
|
||
|
|
class _ThemeSyntax(Protocol):
|
||
|
|
"""Protocol for theme objects that map tag names to SGR escape strings."""
|
||
|
|
def __getitem__(self, key: str, /) -> str: ...
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class RenderCell:
|
||
|
|
"""One terminal cell: a character, its column width, and SGR style.
|
||
|
|
|
||
|
|
A screen row like ``>>> def`` is a sequence of cells::
|
||
|
|
|
||
|
|
> > > d e f
|
||
|
|
╰─╯╰─╯╰─╯╰─╯╰─╯╰─╯╰─╯
|
||
|
|
"""
|
||
|
|
|
||
|
|
text: str
|
||
|
|
width: int
|
||
|
|
style: StyleRef = field(default_factory=StyleRef)
|
||
|
|
controls: tuple[str, ...] = ()
|
||
|
|
|
||
|
|
@property
|
||
|
|
def terminal_text(self) -> str:
|
||
|
|
return render_cells((self,))
|
||
|
|
|
||
|
|
|
||
|
|
def _theme_style(theme: _ThemeSyntax, tag: str) -> str:
|
||
|
|
return theme[tag]
|
||
|
|
|
||
|
|
|
||
|
|
def _style_escape(style: StyleRef) -> str:
|
||
|
|
if style.sgr:
|
||
|
|
return style.sgr
|
||
|
|
if style.tag is None:
|
||
|
|
return ""
|
||
|
|
return _theme_style(THEME(), style.tag)
|
||
|
|
|
||
|
|
|
||
|
|
def _update_terminal_state(state: str, escape: str) -> str:
|
||
|
|
if escape in {"\x1b[0m", "\x1b[m"}:
|
||
|
|
return ""
|
||
|
|
return state + escape
|
||
|
|
|
||
|
|
|
||
|
|
def _cells_from_rendered_text(text: str) -> tuple[RenderCell, ...]:
|
||
|
|
if not text:
|
||
|
|
return ()
|
||
|
|
|
||
|
|
cells: list[RenderCell] = []
|
||
|
|
pending_controls: list[str] = []
|
||
|
|
active_sgr = ""
|
||
|
|
index = 0
|
||
|
|
|
||
|
|
def append_plain_text(segment: str) -> None:
|
||
|
|
nonlocal pending_controls
|
||
|
|
if not segment:
|
||
|
|
return
|
||
|
|
if pending_controls:
|
||
|
|
cells.append(RenderCell("", 0, controls=tuple(pending_controls)))
|
||
|
|
pending_controls = []
|
||
|
|
for char in segment:
|
||
|
|
cells.append(
|
||
|
|
RenderCell(
|
||
|
|
char,
|
||
|
|
str_width(char),
|
||
|
|
style=StyleRef.from_sgr(active_sgr),
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
for match in ANSI_ESCAPE_SEQUENCE.finditer(text):
|
||
|
|
append_plain_text(text[index : match.start()])
|
||
|
|
escape = match.group(0)
|
||
|
|
if escape.endswith("m"):
|
||
|
|
active_sgr = _update_terminal_state(active_sgr, escape)
|
||
|
|
else:
|
||
|
|
pending_controls.append(escape)
|
||
|
|
index = match.end()
|
||
|
|
|
||
|
|
append_plain_text(text[index:])
|
||
|
|
if pending_controls:
|
||
|
|
cells.append(RenderCell("", 0, controls=tuple(pending_controls)))
|
||
|
|
|
||
|
|
return tuple(cells)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class RenderLine:
|
||
|
|
"""One physical screen row as a tuple of :class:`RenderCell` objects.
|
||
|
|
|
||
|
|
``text`` is the pre-rendered terminal string (characters + SGR escapes);
|
||
|
|
``width`` is the total visible column count.
|
||
|
|
"""
|
||
|
|
|
||
|
|
cells: tuple[RenderCell, ...]
|
||
|
|
text: str
|
||
|
|
width: int
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def from_cells(cls, cells: Iterable[RenderCell]) -> Self:
|
||
|
|
cell_tuple = tuple(cells)
|
||
|
|
return cls(
|
||
|
|
cells=cell_tuple,
|
||
|
|
text=render_cells(cell_tuple),
|
||
|
|
width=sum(cell.width for cell in cell_tuple),
|
||
|
|
)
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def from_parts(
|
||
|
|
cls,
|
||
|
|
parts: Sequence[str],
|
||
|
|
widths: Sequence[int],
|
||
|
|
styles: Sequence[RenderStyle] | None = None,
|
||
|
|
) -> Self:
|
||
|
|
if styles is None:
|
||
|
|
return cls.from_cells(
|
||
|
|
RenderCell(text, width)
|
||
|
|
for text, width in zip(parts, widths)
|
||
|
|
)
|
||
|
|
|
||
|
|
cells: list[RenderCell] = []
|
||
|
|
for text, width, style in zip(parts, widths, styles):
|
||
|
|
if isinstance(style, StyleRef):
|
||
|
|
cells.append(RenderCell(text, width, style=style))
|
||
|
|
elif style is None:
|
||
|
|
cells.append(RenderCell(text, width))
|
||
|
|
else:
|
||
|
|
cells.append(RenderCell(text, width, style=StyleRef.from_tag(style)))
|
||
|
|
return cls.from_cells(cells)
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def from_rendered_text(cls, text: str) -> Self:
|
||
|
|
return cls.from_cells(_cells_from_rendered_text(text))
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class ScreenOverlay:
|
||
|
|
"""An overlay that replaces or inserts lines at a screen position.
|
||
|
|
|
||
|
|
If *insert* is True, lines are spliced in (shifting content down);
|
||
|
|
if False (default), lines replace existing content at *y*.
|
||
|
|
|
||
|
|
Overlays are used to display tab completion menus and status messages.
|
||
|
|
For example, a tab-completion menu inserted below the input::
|
||
|
|
|
||
|
|
>>> os.path.j ← line 0 (base content)
|
||
|
|
join ← ScreenOverlay(y=1, insert=True)
|
||
|
|
junction ← (pushes remaining lines down)
|
||
|
|
... ← line 1 (shifted down by 2)
|
||
|
|
"""
|
||
|
|
y: int
|
||
|
|
lines: tuple[RenderLine, ...]
|
||
|
|
insert: bool = False
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class RenderedScreen:
|
||
|
|
"""The complete screen state: content lines, cursor, and overlays.
|
||
|
|
|
||
|
|
``lines`` holds the base content; ``composed_lines`` is the final
|
||
|
|
result after overlays (completion menus, messages) are applied::
|
||
|
|
|
||
|
|
lines: composed_lines:
|
||
|
|
┌──────────────────┐ ┌──────────────────┐
|
||
|
|
│>>> os.path.j │ │>>> os.path.j │
|
||
|
|
│... │ ──► │ join │ ← overlay
|
||
|
|
└──────────────────┘ │... │
|
||
|
|
└──────────────────┘
|
||
|
|
"""
|
||
|
|
|
||
|
|
lines: tuple[RenderLine, ...]
|
||
|
|
cursor: CursorXY
|
||
|
|
overlays: tuple[ScreenOverlay, ...] = ()
|
||
|
|
composed_lines: tuple[RenderLine, ...] = field(init=False, default=())
|
||
|
|
|
||
|
|
def __post_init__(self) -> None:
|
||
|
|
object.__setattr__(self, "composed_lines", self._compose())
|
||
|
|
|
||
|
|
def _compose(self) -> tuple[RenderLine, ...]:
|
||
|
|
"""Apply overlays in tuple order; inserts shift subsequent positions."""
|
||
|
|
if not self.overlays:
|
||
|
|
return self.lines
|
||
|
|
|
||
|
|
lines = list(self.lines)
|
||
|
|
y_offset = 0
|
||
|
|
for overlay in self.overlays:
|
||
|
|
adjusted_y = overlay.y + y_offset
|
||
|
|
assert adjusted_y >= 0, (
|
||
|
|
f"Overlay y={overlay.y} with offset={y_offset} is negative; "
|
||
|
|
"overlays must be sorted by ascending y"
|
||
|
|
)
|
||
|
|
if overlay.insert:
|
||
|
|
# Splice overlay lines in, pushing existing content down.
|
||
|
|
lines[adjusted_y:adjusted_y] = overlay.lines
|
||
|
|
y_offset += len(overlay.lines)
|
||
|
|
else:
|
||
|
|
# Replace existing lines at the overlay position.
|
||
|
|
target_len = adjusted_y + len(overlay.lines)
|
||
|
|
if len(lines) < target_len:
|
||
|
|
lines.extend([EMPTY_RENDER_LINE] * (target_len - len(lines)))
|
||
|
|
for index, line in enumerate(overlay.lines):
|
||
|
|
lines[adjusted_y + index] = line
|
||
|
|
return tuple(lines)
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def empty(cls) -> Self:
|
||
|
|
return cls((), (0, 0), ())
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def from_screen_lines(
|
||
|
|
cls,
|
||
|
|
screen: Sequence[str],
|
||
|
|
cursor: CursorXY,
|
||
|
|
) -> Self:
|
||
|
|
return cls(
|
||
|
|
tuple(RenderLine.from_rendered_text(line) for line in screen),
|
||
|
|
cursor,
|
||
|
|
(),
|
||
|
|
)
|
||
|
|
|
||
|
|
def with_overlay(
|
||
|
|
self,
|
||
|
|
y: int,
|
||
|
|
lines: Iterable[RenderLine],
|
||
|
|
) -> Self:
|
||
|
|
return type(self)(
|
||
|
|
self.lines,
|
||
|
|
self.cursor,
|
||
|
|
self.overlays + (ScreenOverlay(y, tuple(lines)),),
|
||
|
|
)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def screen_lines(self) -> tuple[str, ...]:
|
||
|
|
return tuple(line.text for line in self.composed_lines)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class LineDiff:
|
||
|
|
"""The changed region between an old and new version of one screen row.
|
||
|
|
|
||
|
|
When the user types ``e`` so the row changes from
|
||
|
|
``>>> nam`` to ``>>> name``::
|
||
|
|
|
||
|
|
>>> n a m old
|
||
|
|
>>> n a m e new
|
||
|
|
╰─╯
|
||
|
|
start_cell=7, new_cells=("m","e"), old_cells=("m",)
|
||
|
|
"""
|
||
|
|
|
||
|
|
start_cell: int
|
||
|
|
start_x: int
|
||
|
|
old_cells: tuple[RenderCell, ...]
|
||
|
|
new_cells: tuple[RenderCell, ...]
|
||
|
|
old_width: int
|
||
|
|
new_width: int
|
||
|
|
|
||
|
|
@property
|
||
|
|
def old_text(self) -> str:
|
||
|
|
return render_cells(self.old_cells)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def new_text(self) -> str:
|
||
|
|
return render_cells(self.new_cells)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def old_changed_width(self) -> int:
|
||
|
|
return sum(cell.width for cell in self.old_cells)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def new_changed_width(self) -> int:
|
||
|
|
return sum(cell.width for cell in self.new_cells)
|
||
|
|
|
||
|
|
|
||
|
|
EMPTY_RENDER_LINE = RenderLine(cells=(), text="", width=0)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class LineUpdate:
|
||
|
|
kind: LineUpdateKind
|
||
|
|
y: int
|
||
|
|
start_cell: int
|
||
|
|
start_x: int
|
||
|
|
"""Screen x-coordinate where the update begins. Used for cursor positioning."""
|
||
|
|
cells: tuple[RenderCell, ...]
|
||
|
|
char_width: int = 0
|
||
|
|
clear_eol: bool = False
|
||
|
|
reset_to_margin: bool = False
|
||
|
|
"""If True, the console must resync the cursor position after writing
|
||
|
|
(needed when cells contain non-SGR escape sequences that may move the cursor)."""
|
||
|
|
text: str = field(init=False, default="")
|
||
|
|
|
||
|
|
def __post_init__(self) -> None:
|
||
|
|
object.__setattr__(self, "text", render_cells(self.cells))
|
||
|
|
|
||
|
|
|
||
|
|
def _controls_require_cursor_resync(controls: Sequence[str]) -> bool:
|
||
|
|
# Anything beyond SGR means the cursor may no longer be where we left it.
|
||
|
|
return any(not control.endswith("m") for control in controls)
|
||
|
|
|
||
|
|
|
||
|
|
def requires_cursor_resync(cells: Sequence[RenderCell]) -> bool:
|
||
|
|
return any(_controls_require_cursor_resync(cell.controls) for cell in cells)
|
||
|
|
|
||
|
|
|
||
|
|
def render_cells(
|
||
|
|
cells: Sequence[RenderCell],
|
||
|
|
visual_style: str | None = None,
|
||
|
|
) -> str:
|
||
|
|
"""Render a sequence of cells into a terminal string with SGR escapes.
|
||
|
|
|
||
|
|
Tracks the active SGR state to emit resets only when the style
|
||
|
|
actually changes, minimizing output bytes.
|
||
|
|
|
||
|
|
If *visual_style* is given (used by redraw visualization), it is appended
|
||
|
|
to every cell's style.
|
||
|
|
"""
|
||
|
|
rendered: list[str] = []
|
||
|
|
active_escape = ""
|
||
|
|
for cell in cells:
|
||
|
|
if cell.controls:
|
||
|
|
rendered.extend(cell.controls)
|
||
|
|
if not cell.text:
|
||
|
|
continue
|
||
|
|
|
||
|
|
target_escape = _style_escape(cell.style)
|
||
|
|
if visual_style is not None:
|
||
|
|
target_escape += visual_style
|
||
|
|
if target_escape != active_escape:
|
||
|
|
if active_escape:
|
||
|
|
rendered.append("\x1b[0m")
|
||
|
|
if target_escape:
|
||
|
|
rendered.append(target_escape)
|
||
|
|
active_escape = target_escape
|
||
|
|
rendered.append(cell.text)
|
||
|
|
|
||
|
|
if active_escape:
|
||
|
|
rendered.append("\x1b[0m")
|
||
|
|
return "".join(rendered)
|
||
|
|
|
||
|
|
|
||
|
|
def diff_render_lines(old: RenderLine, new: RenderLine) -> LineDiff | None:
|
||
|
|
if old == new:
|
||
|
|
return None
|
||
|
|
|
||
|
|
prefix = 0
|
||
|
|
start_x = 0
|
||
|
|
max_prefix = min(len(old.cells), len(new.cells))
|
||
|
|
while prefix < max_prefix and old.cells[prefix] == new.cells[prefix]:
|
||
|
|
# Stop at any cell with non-SGR controls, since those might affect
|
||
|
|
# cursor position and must be re-emitted.
|
||
|
|
if old.cells[prefix].controls:
|
||
|
|
break
|
||
|
|
start_x += old.cells[prefix].width
|
||
|
|
prefix += 1
|
||
|
|
|
||
|
|
old_suffix = len(old.cells)
|
||
|
|
new_suffix = len(new.cells)
|
||
|
|
while old_suffix > prefix and new_suffix > prefix:
|
||
|
|
old_cell = old.cells[old_suffix - 1]
|
||
|
|
new_cell = new.cells[new_suffix - 1]
|
||
|
|
if old_cell.controls or new_cell.controls or old_cell != new_cell:
|
||
|
|
break
|
||
|
|
old_suffix -= 1
|
||
|
|
new_suffix -= 1
|
||
|
|
|
||
|
|
# Extend diff range to include trailing zero-width combining characters,
|
||
|
|
# so we never render a combining char without its base character.
|
||
|
|
while old_suffix < len(old.cells) and old.cells[old_suffix].width == 0:
|
||
|
|
old_suffix += 1
|
||
|
|
while new_suffix < len(new.cells) and new.cells[new_suffix].width == 0:
|
||
|
|
new_suffix += 1
|
||
|
|
|
||
|
|
return LineDiff(
|
||
|
|
start_cell=prefix,
|
||
|
|
start_x=start_x,
|
||
|
|
old_cells=old.cells[prefix:old_suffix],
|
||
|
|
new_cells=new.cells[prefix:new_suffix],
|
||
|
|
old_width=old.width,
|
||
|
|
new_width=new.width,
|
||
|
|
)
|