from __future__ import annotations from collections.abc import Iterable, Sequence from dataclasses import dataclass, field from typing import Literal, Protocol, Self from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, str_width from .types import CursorXY type RenderStyle = StyleRef | str | None type LineUpdateKind = Literal[ "insert_char", "replace_char", "replace_span", "delete_then_insert", "rewrite_suffix", ] class _ThemeSyntax(Protocol): """Protocol for theme objects that map tag names to SGR escape strings.""" def __getitem__(self, key: str, /) -> str: ... @dataclass(frozen=True, slots=True) class RenderCell: """One terminal cell: a character, its column width, and SGR style. A screen row like ``>>> def`` is a sequence of cells:: > > > d e f ╰─╯╰─╯╰─╯╰─╯╰─╯╰─╯╰─╯ """ text: str width: int style: StyleRef = field(default_factory=StyleRef) controls: tuple[str, ...] = () @property def terminal_text(self) -> str: return render_cells((self,)) def _theme_style(theme: _ThemeSyntax, tag: str) -> str: return theme[tag] def _style_escape(style: StyleRef) -> str: if style.sgr: return style.sgr if style.tag is None: return "" return _theme_style(THEME(), style.tag) def _update_terminal_state(state: str, escape: str) -> str: if escape in {"\x1b[0m", "\x1b[m"}: return "" return state + escape def _cells_from_rendered_text(text: str) -> tuple[RenderCell, ...]: if not text: return () cells: list[RenderCell] = [] pending_controls: list[str] = [] active_sgr = "" index = 0 def append_plain_text(segment: str) -> None: nonlocal pending_controls if not segment: return if pending_controls: cells.append(RenderCell("", 0, controls=tuple(pending_controls))) pending_controls = [] for char in segment: cells.append( RenderCell( char, str_width(char), style=StyleRef.from_sgr(active_sgr), ) ) for match in ANSI_ESCAPE_SEQUENCE.finditer(text): append_plain_text(text[index : match.start()]) escape = match.group(0) if escape.endswith("m"): active_sgr = _update_terminal_state(active_sgr, escape) else: pending_controls.append(escape) index = match.end() append_plain_text(text[index:]) if pending_controls: cells.append(RenderCell("", 0, controls=tuple(pending_controls))) return tuple(cells) @dataclass(frozen=True, slots=True) class RenderLine: """One physical screen row as a tuple of :class:`RenderCell` objects. ``text`` is the pre-rendered terminal string (characters + SGR escapes); ``width`` is the total visible column count. """ cells: tuple[RenderCell, ...] text: str width: int @classmethod def from_cells(cls, cells: Iterable[RenderCell]) -> Self: cell_tuple = tuple(cells) return cls( cells=cell_tuple, text=render_cells(cell_tuple), width=sum(cell.width for cell in cell_tuple), ) @classmethod def from_parts( cls, parts: Sequence[str], widths: Sequence[int], styles: Sequence[RenderStyle] | None = None, ) -> Self: if styles is None: return cls.from_cells( RenderCell(text, width) for text, width in zip(parts, widths) ) cells: list[RenderCell] = [] for text, width, style in zip(parts, widths, styles): if isinstance(style, StyleRef): cells.append(RenderCell(text, width, style=style)) elif style is None: cells.append(RenderCell(text, width)) else: cells.append(RenderCell(text, width, style=StyleRef.from_tag(style))) return cls.from_cells(cells) @classmethod def from_rendered_text(cls, text: str) -> Self: return cls.from_cells(_cells_from_rendered_text(text)) @dataclass(frozen=True, slots=True) class ScreenOverlay: """An overlay that replaces or inserts lines at a screen position. If *insert* is True, lines are spliced in (shifting content down); if False (default), lines replace existing content at *y*. Overlays are used to display tab completion menus and status messages. For example, a tab-completion menu inserted below the input:: >>> os.path.j ← line 0 (base content) join ← ScreenOverlay(y=1, insert=True) junction ← (pushes remaining lines down) ... ← line 1 (shifted down by 2) """ y: int lines: tuple[RenderLine, ...] insert: bool = False @dataclass(frozen=True, slots=True) class RenderedScreen: """The complete screen state: content lines, cursor, and overlays. ``lines`` holds the base content; ``composed_lines`` is the final result after overlays (completion menus, messages) are applied:: lines: composed_lines: ┌──────────────────┐ ┌──────────────────┐ │>>> os.path.j │ │>>> os.path.j │ │... │ ──► │ join │ ← overlay └──────────────────┘ │... │ └──────────────────┘ """ lines: tuple[RenderLine, ...] cursor: CursorXY overlays: tuple[ScreenOverlay, ...] = () composed_lines: tuple[RenderLine, ...] = field(init=False, default=()) def __post_init__(self) -> None: object.__setattr__(self, "composed_lines", self._compose()) def _compose(self) -> tuple[RenderLine, ...]: """Apply overlays in tuple order; inserts shift subsequent positions.""" if not self.overlays: return self.lines lines = list(self.lines) y_offset = 0 for overlay in self.overlays: adjusted_y = overlay.y + y_offset assert adjusted_y >= 0, ( f"Overlay y={overlay.y} with offset={y_offset} is negative; " "overlays must be sorted by ascending y" ) if overlay.insert: # Splice overlay lines in, pushing existing content down. lines[adjusted_y:adjusted_y] = overlay.lines y_offset += len(overlay.lines) else: # Replace existing lines at the overlay position. target_len = adjusted_y + len(overlay.lines) if len(lines) < target_len: lines.extend([EMPTY_RENDER_LINE] * (target_len - len(lines))) for index, line in enumerate(overlay.lines): lines[adjusted_y + index] = line return tuple(lines) @classmethod def empty(cls) -> Self: return cls((), (0, 0), ()) @classmethod def from_screen_lines( cls, screen: Sequence[str], cursor: CursorXY, ) -> Self: return cls( tuple(RenderLine.from_rendered_text(line) for line in screen), cursor, (), ) def with_overlay( self, y: int, lines: Iterable[RenderLine], ) -> Self: return type(self)( self.lines, self.cursor, self.overlays + (ScreenOverlay(y, tuple(lines)),), ) @property def screen_lines(self) -> tuple[str, ...]: return tuple(line.text for line in self.composed_lines) @dataclass(frozen=True, slots=True) class LineDiff: """The changed region between an old and new version of one screen row. When the user types ``e`` so the row changes from ``>>> nam`` to ``>>> name``:: >>> n a m old >>> n a m e new ╰─╯ start_cell=7, new_cells=("m","e"), old_cells=("m",) """ start_cell: int start_x: int old_cells: tuple[RenderCell, ...] new_cells: tuple[RenderCell, ...] old_width: int new_width: int @property def old_text(self) -> str: return render_cells(self.old_cells) @property def new_text(self) -> str: return render_cells(self.new_cells) @property def old_changed_width(self) -> int: return sum(cell.width for cell in self.old_cells) @property def new_changed_width(self) -> int: return sum(cell.width for cell in self.new_cells) EMPTY_RENDER_LINE = RenderLine(cells=(), text="", width=0) @dataclass(frozen=True, slots=True) class LineUpdate: kind: LineUpdateKind y: int start_cell: int start_x: int """Screen x-coordinate where the update begins. Used for cursor positioning.""" cells: tuple[RenderCell, ...] char_width: int = 0 clear_eol: bool = False reset_to_margin: bool = False """If True, the console must resync the cursor position after writing (needed when cells contain non-SGR escape sequences that may move the cursor).""" text: str = field(init=False, default="") def __post_init__(self) -> None: object.__setattr__(self, "text", render_cells(self.cells)) def _controls_require_cursor_resync(controls: Sequence[str]) -> bool: # Anything beyond SGR means the cursor may no longer be where we left it. return any(not control.endswith("m") for control in controls) def requires_cursor_resync(cells: Sequence[RenderCell]) -> bool: return any(_controls_require_cursor_resync(cell.controls) for cell in cells) def render_cells( cells: Sequence[RenderCell], visual_style: str | None = None, ) -> str: """Render a sequence of cells into a terminal string with SGR escapes. Tracks the active SGR state to emit resets only when the style actually changes, minimizing output bytes. If *visual_style* is given (used by redraw visualization), it is appended to every cell's style. """ rendered: list[str] = [] active_escape = "" for cell in cells: if cell.controls: rendered.extend(cell.controls) if not cell.text: continue target_escape = _style_escape(cell.style) if visual_style is not None: target_escape += visual_style if target_escape != active_escape: if active_escape: rendered.append("\x1b[0m") if target_escape: rendered.append(target_escape) active_escape = target_escape rendered.append(cell.text) if active_escape: rendered.append("\x1b[0m") return "".join(rendered) def diff_render_lines(old: RenderLine, new: RenderLine) -> LineDiff | None: if old == new: return None prefix = 0 start_x = 0 max_prefix = min(len(old.cells), len(new.cells)) while prefix < max_prefix and old.cells[prefix] == new.cells[prefix]: # Stop at any cell with non-SGR controls, since those might affect # cursor position and must be re-emitted. if old.cells[prefix].controls: break start_x += old.cells[prefix].width prefix += 1 old_suffix = len(old.cells) new_suffix = len(new.cells) while old_suffix > prefix and new_suffix > prefix: old_cell = old.cells[old_suffix - 1] new_cell = new.cells[new_suffix - 1] if old_cell.controls or new_cell.controls or old_cell != new_cell: break old_suffix -= 1 new_suffix -= 1 # Extend diff range to include trailing zero-width combining characters, # so we never render a combining char without its base character. while old_suffix < len(old.cells) and old.cells[old_suffix].width == 0: old_suffix += 1 while new_suffix < len(new.cells) and new.cells[new_suffix].width == 0: new_suffix += 1 return LineDiff( start_cell=prefix, start_x=start_x, old_cells=old.cells[prefix:old_suffix], new_cells=new.cells[prefix:new_suffix], old_width=old.width, new_width=new.width, )