mirror of
https://github.com/python/cpython.git
synced 2025-10-19 16:03:42 +00:00
gh-135621: Remove dependency on curses from PyREPL (GH-136758)
This commit is contained in:
parent
d1d526afe7
commit
09dfb50f1b
11 changed files with 1229 additions and 160 deletions
|
@ -1,68 +0,0 @@
|
|||
"""Minimal '_curses' module, the low-level interface for curses module
|
||||
which is not meant to be used directly.
|
||||
|
||||
Based on ctypes. It's too incomplete to be really called '_curses', so
|
||||
to use it, you have to import it and stick it in sys.modules['_curses']
|
||||
manually.
|
||||
|
||||
Note that there is also a built-in module _minimal_curses which will
|
||||
hide this one if compiled in.
|
||||
"""
|
||||
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
||||
|
||||
class error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _find_clib() -> str:
|
||||
trylibs = ["ncursesw", "ncurses", "curses"]
|
||||
|
||||
for lib in trylibs:
|
||||
path = ctypes.util.find_library(lib)
|
||||
if path:
|
||||
return path
|
||||
raise ModuleNotFoundError("curses library not found", name="_pyrepl._minimal_curses")
|
||||
|
||||
|
||||
_clibpath = _find_clib()
|
||||
clib = ctypes.cdll.LoadLibrary(_clibpath)
|
||||
|
||||
clib.setupterm.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
|
||||
clib.setupterm.restype = ctypes.c_int
|
||||
|
||||
clib.tigetstr.argtypes = [ctypes.c_char_p]
|
||||
clib.tigetstr.restype = ctypes.c_ssize_t
|
||||
|
||||
clib.tparm.argtypes = [ctypes.c_char_p] + 9 * [ctypes.c_int] # type: ignore[operator]
|
||||
clib.tparm.restype = ctypes.c_char_p
|
||||
|
||||
OK = 0
|
||||
ERR = -1
|
||||
|
||||
# ____________________________________________________________
|
||||
|
||||
|
||||
def setupterm(termstr, fd):
|
||||
err = ctypes.c_int(0)
|
||||
result = clib.setupterm(termstr, fd, ctypes.byref(err))
|
||||
if result == ERR:
|
||||
raise error("setupterm() failed (err=%d)" % err.value)
|
||||
|
||||
|
||||
def tigetstr(cap):
|
||||
if not isinstance(cap, bytes):
|
||||
cap = cap.encode("ascii")
|
||||
result = clib.tigetstr(cap)
|
||||
if result == ERR:
|
||||
return None
|
||||
return ctypes.cast(result, ctypes.c_char_p).value
|
||||
|
||||
|
||||
def tparm(str, i1=0, i2=0, i3=0, i4=0, i5=0, i6=0, i7=0, i8=0, i9=0):
|
||||
result = clib.tparm(str, i1, i2, i3, i4, i5, i6, i7, i8, i9)
|
||||
if result is None:
|
||||
raise error("tparm() returned NULL")
|
||||
return result
|
|
@ -1,33 +0,0 @@
|
|||
# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
|
||||
# Armin Rigo
|
||||
#
|
||||
# All Rights Reserved
|
||||
#
|
||||
#
|
||||
# Permission to use, copy, modify, and distribute this software and
|
||||
# its documentation for any purpose is hereby granted without fee,
|
||||
# provided that the above copyright notice appear in all copies and
|
||||
# that both that copyright notice and this permission notice appear in
|
||||
# supporting documentation.
|
||||
#
|
||||
# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
|
||||
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
|
||||
# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
|
||||
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
|
||||
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
|
||||
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
|
||||
try:
|
||||
import _curses
|
||||
except ImportError:
|
||||
try:
|
||||
import curses as _curses # type: ignore[no-redef]
|
||||
except ImportError:
|
||||
from . import _minimal_curses as _curses # type: ignore[no-redef]
|
||||
|
||||
setupterm = _curses.setupterm
|
||||
tigetstr = _curses.tigetstr
|
||||
tparm = _curses.tparm
|
||||
error = _curses.error
|
530
Lib/_pyrepl/terminfo.py
Normal file
530
Lib/_pyrepl/terminfo.py
Normal file
|
@ -0,0 +1,530 @@
|
|||
"""Pure Python curses-like terminal capability queries."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
import errno
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
import struct
|
||||
|
||||
|
||||
# Terminfo constants
|
||||
MAGIC16 = 0o432 # Magic number for 16-bit terminfo format
|
||||
MAGIC32 = 0o1036 # Magic number for 32-bit terminfo format
|
||||
|
||||
# Special values for absent/cancelled capabilities
|
||||
ABSENT_BOOLEAN = -1
|
||||
ABSENT_NUMERIC = -1
|
||||
CANCELLED_NUMERIC = -2
|
||||
ABSENT_STRING = None
|
||||
CANCELLED_STRING = None
|
||||
|
||||
|
||||
# Standard string capability names from ncurses Caps file
|
||||
# This matches the order used by ncurses when compiling terminfo
|
||||
# fmt: off
|
||||
_STRING_NAMES: tuple[str, ...] = (
|
||||
"cbt", "bel", "cr", "csr", "tbc", "clear", "el", "ed", "hpa", "cmdch",
|
||||
"cup", "cud1", "home", "civis", "cub1", "mrcup", "cnorm", "cuf1", "ll",
|
||||
"cuu1", "cvvis", "dch1", "dl1", "dsl", "hd", "smacs", "blink", "bold",
|
||||
"smcup", "smdc", "dim", "smir", "invis", "prot", "rev", "smso", "smul",
|
||||
"ech", "rmacs", "sgr0", "rmcup", "rmdc", "rmir", "rmso", "rmul", "flash",
|
||||
"ff", "fsl", "is1", "is2", "is3", "if", "ich1", "il1", "ip", "kbs", "ktbc",
|
||||
"kclr", "kctab", "kdch1", "kdl1", "kcud1", "krmir", "kel", "ked", "kf0",
|
||||
"kf1", "kf10", "kf2", "kf3", "kf4", "kf5", "kf6", "kf7", "kf8", "kf9",
|
||||
"khome", "kich1", "kil1", "kcub1", "kll", "knp", "kpp", "kcuf1", "kind",
|
||||
"kri", "khts", "kcuu1", "rmkx", "smkx", "lf0", "lf1", "lf10", "lf2", "lf3",
|
||||
"lf4", "lf5", "lf6", "lf7", "lf8", "lf9", "rmm", "smm", "nel", "pad", "dch",
|
||||
"dl", "cud", "ich", "indn", "il", "cub", "cuf", "rin", "cuu", "pfkey",
|
||||
"pfloc", "pfx", "mc0", "mc4", "mc5", "rep", "rs1", "rs2", "rs3", "rf", "rc",
|
||||
"vpa", "sc", "ind", "ri", "sgr", "hts", "wind", "ht", "tsl", "uc", "hu",
|
||||
"iprog", "ka1", "ka3", "kb2", "kc1", "kc3", "mc5p", "rmp", "acsc", "pln",
|
||||
"kcbt", "smxon", "rmxon", "smam", "rmam", "xonc", "xoffc", "enacs", "smln",
|
||||
"rmln", "kbeg", "kcan", "kclo", "kcmd", "kcpy", "kcrt", "kend", "kent",
|
||||
"kext", "kfnd", "khlp", "kmrk", "kmsg", "kmov", "knxt", "kopn", "kopt",
|
||||
"kprv", "kprt", "krdo", "kref", "krfr", "krpl", "krst", "kres", "ksav",
|
||||
"kspd", "kund", "kBEG", "kCAN", "kCMD", "kCPY", "kCRT", "kDC", "kDL",
|
||||
"kslt", "kEND", "kEOL", "kEXT", "kFND", "kHLP", "kHOM", "kIC", "kLFT",
|
||||
"kMSG", "kMOV", "kNXT", "kOPT", "kPRV", "kPRT", "kRDO", "kRPL", "kRIT",
|
||||
"kRES", "kSAV", "kSPD", "kUND", "rfi", "kf11", "kf12", "kf13", "kf14",
|
||||
"kf15", "kf16", "kf17", "kf18", "kf19", "kf20", "kf21", "kf22", "kf23",
|
||||
"kf24", "kf25", "kf26", "kf27", "kf28", "kf29", "kf30", "kf31", "kf32",
|
||||
"kf33", "kf34", "kf35", "kf36", "kf37", "kf38", "kf39", "kf40", "kf41",
|
||||
"kf42", "kf43", "kf44", "kf45", "kf46", "kf47", "kf48", "kf49", "kf50",
|
||||
"kf51", "kf52", "kf53", "kf54", "kf55", "kf56", "kf57", "kf58", "kf59",
|
||||
"kf60", "kf61", "kf62", "kf63", "el1", "mgc", "smgl", "smgr", "fln", "sclk",
|
||||
"dclk", "rmclk", "cwin", "wingo", "hup","dial", "qdial", "tone", "pulse",
|
||||
"hook", "pause", "wait", "u0", "u1", "u2", "u3", "u4", "u5", "u6", "u7",
|
||||
"u8", "u9", "op", "oc", "initc", "initp", "scp", "setf", "setb", "cpi",
|
||||
"lpi", "chr", "cvr", "defc", "swidm", "sdrfq", "sitm", "slm", "smicm",
|
||||
"snlq", "snrmq", "sshm", "ssubm", "ssupm", "sum", "rwidm", "ritm", "rlm",
|
||||
"rmicm", "rshm", "rsubm", "rsupm", "rum", "mhpa", "mcud1", "mcub1", "mcuf1",
|
||||
"mvpa", "mcuu1", "porder", "mcud", "mcub", "mcuf", "mcuu", "scs", "smgb",
|
||||
"smgbp", "smglp", "smgrp", "smgt", "smgtp", "sbim", "scsd", "rbim", "rcsd",
|
||||
"subcs", "supcs", "docr", "zerom", "csnm", "kmous", "minfo", "reqmp",
|
||||
"getm", "setaf", "setab", "pfxl", "devt", "csin", "s0ds", "s1ds", "s2ds",
|
||||
"s3ds", "smglr", "smgtb", "birep", "binel", "bicr", "colornm", "defbi",
|
||||
"endbi", "setcolor", "slines", "dispc", "smpch", "rmpch", "smsc", "rmsc",
|
||||
"pctrm", "scesc", "scesa", "ehhlm", "elhlm", "elohlm", "erhlm", "ethlm",
|
||||
"evhlm", "sgr1", "slength", "OTi2", "OTrs", "OTnl", "OTbc", "OTko", "OTma",
|
||||
"OTG2", "OTG3", "OTG1", "OTG4", "OTGR", "OTGL", "OTGU", "OTGD", "OTGH",
|
||||
"OTGV", "OTGC","meml", "memu", "box1"
|
||||
)
|
||||
# fmt: on
|
||||
_STRING_CAPABILITY_NAMES = {name: i for i, name in enumerate(_STRING_NAMES)}
|
||||
|
||||
|
||||
def _get_terminfo_dirs() -> list[Path]:
|
||||
"""Get list of directories to search for terminfo files.
|
||||
|
||||
Based on ncurses behavior in:
|
||||
- ncurses/tinfo/db_iterator.c:_nc_next_db()
|
||||
- ncurses/tinfo/read_entry.c:_nc_read_entry()
|
||||
"""
|
||||
dirs = []
|
||||
|
||||
terminfo = os.environ.get("TERMINFO")
|
||||
if terminfo:
|
||||
dirs.append(terminfo)
|
||||
|
||||
try:
|
||||
home = Path.home()
|
||||
dirs.append(str(home / ".terminfo"))
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
# Check TERMINFO_DIRS
|
||||
terminfo_dirs = os.environ.get("TERMINFO_DIRS", "")
|
||||
if terminfo_dirs:
|
||||
for d in terminfo_dirs.split(":"):
|
||||
if d:
|
||||
dirs.append(d)
|
||||
|
||||
dirs.extend(
|
||||
[
|
||||
"/etc/terminfo",
|
||||
"/lib/terminfo",
|
||||
"/usr/lib/terminfo",
|
||||
"/usr/share/terminfo",
|
||||
"/usr/share/lib/terminfo",
|
||||
"/usr/share/misc/terminfo",
|
||||
"/usr/local/lib/terminfo",
|
||||
"/usr/local/share/terminfo",
|
||||
]
|
||||
)
|
||||
|
||||
return [Path(d) for d in dirs if Path(d).is_dir()]
|
||||
|
||||
|
||||
def _validate_terminal_name_or_raise(terminal_name: str) -> None:
|
||||
if not isinstance(terminal_name, str):
|
||||
raise TypeError("`terminal_name` must be a string")
|
||||
|
||||
if not terminal_name:
|
||||
raise ValueError("`terminal_name` cannot be empty")
|
||||
|
||||
if "\x00" in terminal_name:
|
||||
raise ValueError("NUL character found in `terminal_name`")
|
||||
|
||||
t = Path(terminal_name)
|
||||
if len(t.parts) > 1:
|
||||
raise ValueError("`terminal_name` cannot contain path separators")
|
||||
|
||||
|
||||
def _read_terminfo_file(terminal_name: str) -> bytes:
|
||||
"""Find and read terminfo file for given terminal name.
|
||||
|
||||
Terminfo files are stored in directories using the first character
|
||||
of the terminal name as a subdirectory.
|
||||
"""
|
||||
_validate_terminal_name_or_raise(terminal_name)
|
||||
first_char = terminal_name[0].lower()
|
||||
filename = terminal_name
|
||||
|
||||
for directory in _get_terminfo_dirs():
|
||||
path = directory / first_char / filename
|
||||
if path.is_file():
|
||||
return path.read_bytes()
|
||||
|
||||
# Try with hex encoding of first char (for special chars)
|
||||
hex_dir = "%02x" % ord(first_char)
|
||||
path = directory / hex_dir / filename
|
||||
if path.is_file():
|
||||
return path.read_bytes()
|
||||
|
||||
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
|
||||
|
||||
|
||||
# Hard-coded terminal capabilities for common terminals
|
||||
# This is a minimal subset needed by PyREPL
|
||||
_TERMINAL_CAPABILITIES = {
|
||||
# ANSI/xterm-compatible terminals
|
||||
"ansi": {
|
||||
# Bell
|
||||
"bel": b"\x07",
|
||||
# Cursor movement
|
||||
"cub": b"\x1b[%p1%dD", # Move cursor left N columns
|
||||
"cud": b"\x1b[%p1%dB", # Move cursor down N rows
|
||||
"cuf": b"\x1b[%p1%dC", # Move cursor right N columns
|
||||
"cuu": b"\x1b[%p1%dA", # Move cursor up N rows
|
||||
"cub1": b"\x08", # Move cursor left 1 column
|
||||
"cud1": b"\n", # Move cursor down 1 row
|
||||
"cuf1": b"\x1b[C", # Move cursor right 1 column
|
||||
"cuu1": b"\x1b[A", # Move cursor up 1 row
|
||||
"cup": b"\x1b[%i%p1%d;%p2%dH", # Move cursor to row, column
|
||||
"hpa": b"\x1b[%i%p1%dG", # Move cursor to column
|
||||
# Clear operations
|
||||
"clear": b"\x1b[H\x1b[2J", # Clear screen and home cursor
|
||||
"el": b"\x1b[K", # Clear to end of line
|
||||
# Insert/delete
|
||||
"dch": b"\x1b[%p1%dP", # Delete N characters
|
||||
"dch1": b"\x1b[P", # Delete 1 character
|
||||
"ich": b"\x1b[%p1%d@", # Insert N characters
|
||||
"ich1": b"", # Insert 1 character
|
||||
# Cursor visibility
|
||||
"civis": b"\x1b[?25l", # Make cursor invisible
|
||||
"cnorm": b"\x1b[?12l\x1b[?25h", # Make cursor normal (visible)
|
||||
# Scrolling
|
||||
"ind": b"\n", # Scroll up one line
|
||||
"ri": b"\x1bM", # Scroll down one line
|
||||
# Keypad mode
|
||||
"smkx": b"\x1b[?1h\x1b=", # Enable keypad mode
|
||||
"rmkx": b"\x1b[?1l\x1b>", # Disable keypad mode
|
||||
# Padding (not used in modern terminals)
|
||||
"pad": b"",
|
||||
# Function keys and special keys
|
||||
"kdch1": b"\x1b[3~", # Delete key
|
||||
"kcud1": b"\x1bOB", # Down arrow
|
||||
"kend": b"\x1bOF", # End key
|
||||
"kent": b"\x1bOM", # Enter key
|
||||
"khome": b"\x1bOH", # Home key
|
||||
"kich1": b"\x1b[2~", # Insert key
|
||||
"kcub1": b"\x1bOD", # Left arrow
|
||||
"knp": b"\x1b[6~", # Page down
|
||||
"kpp": b"\x1b[5~", # Page up
|
||||
"kcuf1": b"\x1bOC", # Right arrow
|
||||
"kcuu1": b"\x1bOA", # Up arrow
|
||||
# Function keys F1-F20
|
||||
"kf1": b"\x1bOP",
|
||||
"kf2": b"\x1bOQ",
|
||||
"kf3": b"\x1bOR",
|
||||
"kf4": b"\x1bOS",
|
||||
"kf5": b"\x1b[15~",
|
||||
"kf6": b"\x1b[17~",
|
||||
"kf7": b"\x1b[18~",
|
||||
"kf8": b"\x1b[19~",
|
||||
"kf9": b"\x1b[20~",
|
||||
"kf10": b"\x1b[21~",
|
||||
"kf11": b"\x1b[23~",
|
||||
"kf12": b"\x1b[24~",
|
||||
"kf13": b"\x1b[1;2P",
|
||||
"kf14": b"\x1b[1;2Q",
|
||||
"kf15": b"\x1b[1;2R",
|
||||
"kf16": b"\x1b[1;2S",
|
||||
"kf17": b"\x1b[15;2~",
|
||||
"kf18": b"\x1b[17;2~",
|
||||
"kf19": b"\x1b[18;2~",
|
||||
"kf20": b"\x1b[19;2~",
|
||||
},
|
||||
# Dumb terminal - minimal capabilities
|
||||
"dumb": {
|
||||
"bel": b"\x07", # Bell
|
||||
"cud1": b"\n", # Move down 1 row (newline)
|
||||
"ind": b"\n", # Scroll up one line (newline)
|
||||
},
|
||||
# Linux console
|
||||
"linux": {
|
||||
# Bell
|
||||
"bel": b"\x07",
|
||||
# Cursor movement
|
||||
"cub": b"\x1b[%p1%dD", # Move cursor left N columns
|
||||
"cud": b"\x1b[%p1%dB", # Move cursor down N rows
|
||||
"cuf": b"\x1b[%p1%dC", # Move cursor right N columns
|
||||
"cuu": b"\x1b[%p1%dA", # Move cursor up N rows
|
||||
"cub1": b"\x08", # Move cursor left 1 column (backspace)
|
||||
"cud1": b"\n", # Move cursor down 1 row (newline)
|
||||
"cuf1": b"\x1b[C", # Move cursor right 1 column
|
||||
"cuu1": b"\x1b[A", # Move cursor up 1 row
|
||||
"cup": b"\x1b[%i%p1%d;%p2%dH", # Move cursor to row, column
|
||||
"hpa": b"\x1b[%i%p1%dG", # Move cursor to column
|
||||
# Clear operations
|
||||
"clear": b"\x1b[H\x1b[J", # Clear screen and home cursor (different from ansi!)
|
||||
"el": b"\x1b[K", # Clear to end of line
|
||||
# Insert/delete
|
||||
"dch": b"\x1b[%p1%dP", # Delete N characters
|
||||
"dch1": b"\x1b[P", # Delete 1 character
|
||||
"ich": b"\x1b[%p1%d@", # Insert N characters
|
||||
"ich1": b"\x1b[@", # Insert 1 character
|
||||
# Cursor visibility
|
||||
"civis": b"\x1b[?25l\x1b[?1c", # Make cursor invisible
|
||||
"cnorm": b"\x1b[?25h\x1b[?0c", # Make cursor normal
|
||||
# Scrolling
|
||||
"ind": b"\n", # Scroll up one line
|
||||
"ri": b"\x1bM", # Scroll down one line
|
||||
# Keypad mode
|
||||
"smkx": b"\x1b[?1h\x1b=", # Enable keypad mode
|
||||
"rmkx": b"\x1b[?1l\x1b>", # Disable keypad mode
|
||||
# Function keys and special keys
|
||||
"kdch1": b"\x1b[3~", # Delete key
|
||||
"kcud1": b"\x1b[B", # Down arrow
|
||||
"kend": b"\x1b[4~", # End key (different from ansi!)
|
||||
"khome": b"\x1b[1~", # Home key (different from ansi!)
|
||||
"kich1": b"\x1b[2~", # Insert key
|
||||
"kcub1": b"\x1b[D", # Left arrow
|
||||
"knp": b"\x1b[6~", # Page down
|
||||
"kpp": b"\x1b[5~", # Page up
|
||||
"kcuf1": b"\x1b[C", # Right arrow
|
||||
"kcuu1": b"\x1b[A", # Up arrow
|
||||
# Function keys
|
||||
"kf1": b"\x1b[[A",
|
||||
"kf2": b"\x1b[[B",
|
||||
"kf3": b"\x1b[[C",
|
||||
"kf4": b"\x1b[[D",
|
||||
"kf5": b"\x1b[[E",
|
||||
"kf6": b"\x1b[17~",
|
||||
"kf7": b"\x1b[18~",
|
||||
"kf8": b"\x1b[19~",
|
||||
"kf9": b"\x1b[20~",
|
||||
"kf10": b"\x1b[21~",
|
||||
"kf11": b"\x1b[23~",
|
||||
"kf12": b"\x1b[24~",
|
||||
"kf13": b"\x1b[25~",
|
||||
"kf14": b"\x1b[26~",
|
||||
"kf15": b"\x1b[28~",
|
||||
"kf16": b"\x1b[29~",
|
||||
"kf17": b"\x1b[31~",
|
||||
"kf18": b"\x1b[32~",
|
||||
"kf19": b"\x1b[33~",
|
||||
"kf20": b"\x1b[34~",
|
||||
},
|
||||
}
|
||||
|
||||
# Map common TERM values to capability sets
|
||||
_TERM_ALIASES = {
|
||||
"xterm": "ansi",
|
||||
"xterm-color": "ansi",
|
||||
"xterm-256color": "ansi",
|
||||
"screen": "ansi",
|
||||
"screen-256color": "ansi",
|
||||
"tmux": "ansi",
|
||||
"tmux-256color": "ansi",
|
||||
"vt100": "ansi",
|
||||
"vt220": "ansi",
|
||||
"rxvt": "ansi",
|
||||
"rxvt-unicode": "ansi",
|
||||
"rxvt-unicode-256color": "ansi",
|
||||
"unknown": "dumb",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class TermInfo:
|
||||
terminal_name: str | bytes | None
|
||||
fallback: bool = True
|
||||
|
||||
_names: list[str] = field(default_factory=list)
|
||||
_booleans: list[int] = field(default_factory=list)
|
||||
_numbers: list[int] = field(default_factory=list)
|
||||
_strings: list[bytes | None] = field(default_factory=list)
|
||||
_capabilities: dict[str, bytes] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Initialize terminal capabilities for the given terminal type.
|
||||
|
||||
Based on ncurses implementation in:
|
||||
- ncurses/tinfo/lib_setup.c:setupterm() and _nc_setupterm()
|
||||
- ncurses/tinfo/lib_setup.c:TINFO_SETUP_TERM()
|
||||
|
||||
This version first attempts to read terminfo database files like ncurses,
|
||||
then, if `fallback` is True, falls back to hardcoded capabilities for
|
||||
common terminal types.
|
||||
"""
|
||||
# If termstr is None or empty, try to get from environment
|
||||
if not self.terminal_name:
|
||||
self.terminal_name = os.environ.get("TERM") or "ANSI"
|
||||
|
||||
if isinstance(self.terminal_name, bytes):
|
||||
self.terminal_name = self.terminal_name.decode("ascii")
|
||||
|
||||
try:
|
||||
self._parse_terminfo_file(self.terminal_name)
|
||||
except (OSError, ValueError):
|
||||
if not self.fallback:
|
||||
raise
|
||||
|
||||
term_type = _TERM_ALIASES.get(
|
||||
self.terminal_name, self.terminal_name
|
||||
)
|
||||
if term_type not in _TERMINAL_CAPABILITIES:
|
||||
term_type = "dumb"
|
||||
self._capabilities = _TERMINAL_CAPABILITIES[term_type].copy()
|
||||
|
||||
def _parse_terminfo_file(self, terminal_name: str) -> None:
|
||||
"""Parse a terminfo file.
|
||||
|
||||
Based on ncurses implementation in:
|
||||
- ncurses/tinfo/read_entry.c:_nc_read_termtype()
|
||||
- ncurses/tinfo/read_entry.c:_nc_read_file_entry()
|
||||
"""
|
||||
data = _read_terminfo_file(terminal_name)
|
||||
too_short = f"TermInfo file for {terminal_name!r} too short"
|
||||
offset = 12
|
||||
if len(data) < offset:
|
||||
raise ValueError(too_short)
|
||||
|
||||
magic, name_size, bool_count, num_count, str_count, str_size = (
|
||||
struct.unpack("<Hhhhhh", data[:offset])
|
||||
)
|
||||
|
||||
if magic == MAGIC16:
|
||||
number_format = "<h" # 16-bit signed
|
||||
number_size = 2
|
||||
elif magic == MAGIC32:
|
||||
number_format = "<i" # 32-bit signed
|
||||
number_size = 4
|
||||
else:
|
||||
raise ValueError(
|
||||
f"TermInfo file for {terminal_name!r} uses unknown magic"
|
||||
)
|
||||
|
||||
# Read terminal names
|
||||
if offset + name_size > len(data):
|
||||
raise ValueError(too_short)
|
||||
names = data[offset : offset + name_size - 1].decode(
|
||||
"ascii", errors="ignore"
|
||||
)
|
||||
offset += name_size
|
||||
|
||||
# Read boolean capabilities
|
||||
if offset + bool_count > len(data):
|
||||
raise ValueError(too_short)
|
||||
booleans = list(data[offset : offset + bool_count])
|
||||
offset += bool_count
|
||||
|
||||
# Align to even byte boundary for numbers
|
||||
if offset % 2:
|
||||
offset += 1
|
||||
|
||||
# Read numeric capabilities
|
||||
numbers = []
|
||||
for i in range(num_count):
|
||||
if offset + number_size > len(data):
|
||||
raise ValueError(too_short)
|
||||
num = struct.unpack(
|
||||
number_format, data[offset : offset + number_size]
|
||||
)[0]
|
||||
numbers.append(num)
|
||||
offset += number_size
|
||||
|
||||
# Read string offsets
|
||||
string_offsets = []
|
||||
for i in range(str_count):
|
||||
if offset + 2 > len(data):
|
||||
raise ValueError(too_short)
|
||||
off = struct.unpack("<h", data[offset : offset + 2])[0]
|
||||
string_offsets.append(off)
|
||||
offset += 2
|
||||
|
||||
# Read string table
|
||||
if offset + str_size > len(data):
|
||||
raise ValueError(too_short)
|
||||
string_table = data[offset : offset + str_size]
|
||||
|
||||
# Extract strings from string table
|
||||
strings: list[bytes | None] = []
|
||||
for off in string_offsets:
|
||||
if off < 0:
|
||||
strings.append(CANCELLED_STRING)
|
||||
elif off < len(string_table):
|
||||
# Find null terminator
|
||||
end = off
|
||||
while end < len(string_table) and string_table[end] != 0:
|
||||
end += 1
|
||||
if end <= len(string_table):
|
||||
strings.append(string_table[off:end])
|
||||
else:
|
||||
strings.append(ABSENT_STRING)
|
||||
else:
|
||||
strings.append(ABSENT_STRING)
|
||||
|
||||
self._names = names.split("|")
|
||||
self._booleans = booleans
|
||||
self._numbers = numbers
|
||||
self._strings = strings
|
||||
|
||||
def get(self, cap: str) -> bytes | None:
|
||||
"""Get terminal capability string by name.
|
||||
|
||||
Based on ncurses implementation in:
|
||||
- ncurses/tinfo/lib_ti.c:tigetstr()
|
||||
|
||||
The ncurses version searches through compiled terminfo data structures.
|
||||
This version first checks parsed terminfo data, then falls back to
|
||||
hardcoded capabilities.
|
||||
"""
|
||||
if not isinstance(cap, str):
|
||||
raise TypeError(f"`cap` must be a string, not {type(cap)}")
|
||||
|
||||
if self._capabilities:
|
||||
# Fallbacks populated, use them
|
||||
return self._capabilities.get(cap)
|
||||
|
||||
# Look up in standard capabilities first
|
||||
if cap in _STRING_CAPABILITY_NAMES:
|
||||
index = _STRING_CAPABILITY_NAMES[cap]
|
||||
if index < len(self._strings):
|
||||
return self._strings[index]
|
||||
|
||||
# Note: we don't support extended capabilities since PyREPL doesn't
|
||||
# need them.
|
||||
return None
|
||||
|
||||
|
||||
def tparm(cap_bytes: bytes, *params: int) -> bytes:
|
||||
"""Parameterize a terminal capability string.
|
||||
|
||||
Based on ncurses implementation in:
|
||||
- ncurses/tinfo/lib_tparm.c:tparm()
|
||||
- ncurses/tinfo/lib_tparm.c:tparam_internal()
|
||||
|
||||
The ncurses version implements a full stack-based interpreter for
|
||||
terminfo parameter strings. This pure Python version implements only
|
||||
the subset of parameter substitution operations needed by PyREPL:
|
||||
- %i (increment parameters for 1-based indexing)
|
||||
- %p[1-9]%d (parameter substitution)
|
||||
- %p[1-9]%{n}%+%d (parameter plus constant)
|
||||
"""
|
||||
if not isinstance(cap_bytes, bytes):
|
||||
raise TypeError(f"`cap` must be bytes, not {type(cap_bytes)}")
|
||||
|
||||
result = cap_bytes
|
||||
|
||||
# %i - increment parameters (1-based instead of 0-based)
|
||||
increment = b"%i" in result
|
||||
if increment:
|
||||
result = result.replace(b"%i", b"")
|
||||
|
||||
# Replace %p1%d, %p2%d, etc. with actual parameter values
|
||||
for i in range(len(params)):
|
||||
pattern = b"%%p%d%%d" % (i + 1)
|
||||
if pattern in result:
|
||||
value = params[i]
|
||||
if increment:
|
||||
value += 1
|
||||
result = result.replace(pattern, str(value).encode("ascii"))
|
||||
|
||||
# Handle %p1%{1}%+%d (parameter plus constant)
|
||||
# Used in some cursor positioning sequences
|
||||
pattern_re = re.compile(rb"%p(\d)%\{(\d+)\}%\+%d")
|
||||
matches = list(pattern_re.finditer(result))
|
||||
for match in reversed(matches): # reversed to maintain positions
|
||||
param_idx = int(match.group(1))
|
||||
constant = int(match.group(2))
|
||||
value = params[param_idx] + constant
|
||||
result = (
|
||||
result[: match.start()]
|
||||
+ str(value).encode("ascii")
|
||||
+ result[match.end() :]
|
||||
)
|
||||
|
||||
return result
|
|
@ -33,7 +33,7 @@
|
|||
import platform
|
||||
from fcntl import ioctl
|
||||
|
||||
from . import curses
|
||||
from . import terminfo
|
||||
from .console import Console, Event
|
||||
from .fancy_termios import tcgetattr, tcsetattr
|
||||
from .trace import trace
|
||||
|
@ -60,7 +60,7 @@ class InvalidTerminal(RuntimeError):
|
|||
pass
|
||||
|
||||
|
||||
_error = (termios.error, curses.error, InvalidTerminal)
|
||||
_error = (termios.error, InvalidTerminal)
|
||||
|
||||
SIGWINCH_EVENT = "repaint"
|
||||
|
||||
|
@ -157,7 +157,7 @@ def __init__(
|
|||
|
||||
self.pollob = poll()
|
||||
self.pollob.register(self.input_fd, select.POLLIN)
|
||||
curses.setupterm(term or None, self.output_fd)
|
||||
self.terminfo = terminfo.TermInfo(term or None)
|
||||
self.term = term
|
||||
|
||||
@overload
|
||||
|
@ -167,7 +167,7 @@ def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ...
|
|||
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
|
||||
|
||||
def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
|
||||
r = curses.tigetstr(cap)
|
||||
r = self.terminfo.get(cap)
|
||||
if not optional and r is None:
|
||||
raise InvalidTerminal(
|
||||
f"terminal doesn't have the required {cap} capability"
|
||||
|
@ -201,7 +201,7 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
|
|||
|
||||
self.__setup_movement()
|
||||
|
||||
self.event_queue = EventQueue(self.input_fd, self.encoding)
|
||||
self.event_queue = EventQueue(self.input_fd, self.encoding, self.terminfo)
|
||||
self.cursor_visible = 1
|
||||
|
||||
signal.signal(signal.SIGCONT, self._sigcont_handler)
|
||||
|
@ -597,14 +597,14 @@ def __setup_movement(self):
|
|||
if self._dch1:
|
||||
self.dch1 = self._dch1
|
||||
elif self._dch:
|
||||
self.dch1 = curses.tparm(self._dch, 1)
|
||||
self.dch1 = terminfo.tparm(self._dch, 1)
|
||||
else:
|
||||
self.dch1 = None
|
||||
|
||||
if self._ich1:
|
||||
self.ich1 = self._ich1
|
||||
elif self._ich:
|
||||
self.ich1 = curses.tparm(self._ich, 1)
|
||||
self.ich1 = terminfo.tparm(self._ich, 1)
|
||||
else:
|
||||
self.ich1 = None
|
||||
|
||||
|
@ -701,7 +701,7 @@ def __write(self, text):
|
|||
self.__buffer.append((text, 0))
|
||||
|
||||
def __write_code(self, fmt, *args):
|
||||
self.__buffer.append((curses.tparm(fmt, *args), 1))
|
||||
self.__buffer.append((terminfo.tparm(fmt, *args), 1))
|
||||
|
||||
def __maybe_write_code(self, fmt, *args):
|
||||
if fmt:
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
from . import curses
|
||||
from .terminfo import TermInfo
|
||||
from .trace import trace
|
||||
from .base_eventqueue import BaseEventQueue
|
||||
from termios import tcgetattr, VERASE
|
||||
|
@ -54,22 +54,23 @@
|
|||
b'\033Oc': 'ctrl right',
|
||||
}
|
||||
|
||||
def get_terminal_keycodes() -> dict[bytes, str]:
|
||||
def get_terminal_keycodes(ti: TermInfo) -> dict[bytes, str]:
|
||||
"""
|
||||
Generates a dictionary mapping terminal keycodes to human-readable names.
|
||||
"""
|
||||
keycodes = {}
|
||||
for key, terminal_code in TERMINAL_KEYNAMES.items():
|
||||
keycode = curses.tigetstr(terminal_code)
|
||||
keycode = ti.get(terminal_code)
|
||||
trace('key {key} tiname {terminal_code} keycode {keycode!r}', **locals())
|
||||
if keycode:
|
||||
keycodes[keycode] = key
|
||||
keycodes.update(CTRL_ARROW_KEYCODES)
|
||||
return keycodes
|
||||
|
||||
|
||||
class EventQueue(BaseEventQueue):
|
||||
def __init__(self, fd: int, encoding: str) -> None:
|
||||
keycodes = get_terminal_keycodes()
|
||||
def __init__(self, fd: int, encoding: str, ti: TermInfo) -> None:
|
||||
keycodes = get_terminal_keycodes(ti)
|
||||
if os.isatty(fd):
|
||||
backspace = tcgetattr(fd)[6][VERASE]
|
||||
keycodes[backspace] = "backspace"
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
import os
|
||||
import sys
|
||||
from test.support import requires, load_package_tests
|
||||
from test.support.import_helper import import_module
|
||||
from test.support import load_package_tests
|
||||
import unittest
|
||||
|
||||
if sys.platform != "win32":
|
||||
# On non-Windows platforms, testing pyrepl currently requires that the
|
||||
# 'curses' resource be given on the regrtest command line using the -u
|
||||
# option. Additionally, we need to attempt to import curses and readline.
|
||||
requires("curses")
|
||||
curses = import_module("curses")
|
||||
|
||||
try:
|
||||
import termios
|
||||
except ImportError:
|
||||
raise unittest.SkipTest("termios required")
|
||||
else:
|
||||
del termios
|
||||
|
||||
|
||||
def load_tests(*args):
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
from unittest.mock import patch
|
||||
from test import support
|
||||
|
||||
from _pyrepl import terminfo
|
||||
|
||||
try:
|
||||
from _pyrepl.console import Event
|
||||
from _pyrepl import base_eventqueue
|
||||
|
@ -172,17 +174,22 @@ def _push(keys):
|
|||
self.assertEqual(eq.get(), _event("key", "a"))
|
||||
|
||||
|
||||
class EmptyTermInfo(terminfo.TermInfo):
|
||||
def get(self, cap: str) -> bytes:
|
||||
return b""
|
||||
|
||||
|
||||
@unittest.skipIf(support.MS_WINDOWS, "No Unix event queue on Windows")
|
||||
class TestUnixEventQueue(EventQueueTestBase, unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.enterContext(patch("_pyrepl.curses.tigetstr", lambda x: b""))
|
||||
self.file = tempfile.TemporaryFile()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.file.close()
|
||||
|
||||
def make_eventqueue(self) -> base_eventqueue.BaseEventQueue:
|
||||
return unix_eventqueue.EventQueue(self.file.fileno(), "utf-8")
|
||||
ti = EmptyTermInfo("ansi")
|
||||
return unix_eventqueue.EventQueue(self.file.fileno(), "utf-8", ti)
|
||||
|
||||
|
||||
@unittest.skipUnless(support.MS_WINDOWS, "No Windows event queue on Unix")
|
||||
|
|
|
@ -9,10 +9,10 @@
|
|||
import sys
|
||||
import tempfile
|
||||
from pkgutil import ModuleInfo
|
||||
from unittest import TestCase, skipUnless, skipIf
|
||||
from unittest import TestCase, skipUnless, skipIf, SkipTest
|
||||
from unittest.mock import patch
|
||||
from test.support import force_not_colorized, make_clean_env, Py_DEBUG
|
||||
from test.support import SHORT_TIMEOUT, STDLIB_DIR
|
||||
from test.support import has_subprocess_support, SHORT_TIMEOUT, STDLIB_DIR
|
||||
from test.support.import_helper import import_module
|
||||
from test.support.os_helper import EnvironmentVarGuard, unlink
|
||||
|
||||
|
@ -38,6 +38,10 @@
|
|||
|
||||
|
||||
class ReplTestCase(TestCase):
|
||||
def setUp(self):
|
||||
if not has_subprocess_support:
|
||||
raise SkipTest("test module requires subprocess")
|
||||
|
||||
def run_repl(
|
||||
self,
|
||||
repl_input: str | list[str],
|
||||
|
@ -1371,6 +1375,7 @@ def setUp(self):
|
|||
# Cleanup from PYTHON* variables to isolate from local
|
||||
# user settings, see #121359. Such variables should be
|
||||
# added later in test methods to patched os.environ.
|
||||
super().setUp()
|
||||
patcher = patch('os.environ', new=make_clean_env())
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher.start()
|
||||
|
|
651
Lib/test/test_pyrepl/test_terminfo.py
Normal file
651
Lib/test/test_pyrepl/test_terminfo.py
Normal file
|
@ -0,0 +1,651 @@
|
|||
"""Tests comparing PyREPL's pure Python curses implementation with the standard curses module."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import unittest
|
||||
from test.support import requires, has_subprocess_support
|
||||
from textwrap import dedent
|
||||
|
||||
# Only run these tests if curses is available
|
||||
requires("curses")
|
||||
|
||||
try:
|
||||
import _curses
|
||||
except ImportError:
|
||||
try:
|
||||
import curses as _curses
|
||||
except ImportError:
|
||||
_curses = None
|
||||
|
||||
from _pyrepl import terminfo
|
||||
|
||||
|
||||
ABSENT_STRING = terminfo.ABSENT_STRING
|
||||
CANCELLED_STRING = terminfo.CANCELLED_STRING
|
||||
|
||||
|
||||
class TestCursesCompatibility(unittest.TestCase):
|
||||
"""Test that PyREPL's curses implementation matches the standard curses behavior.
|
||||
|
||||
Python's `curses` doesn't allow calling `setupterm()` again with a different
|
||||
$TERM in the same process, so we subprocess all `curses` tests to get correctly
|
||||
set up terminfo."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if _curses is None:
|
||||
raise unittest.SkipTest(
|
||||
"`curses` capability provided to regrtest but `_curses` not importable"
|
||||
)
|
||||
|
||||
if not has_subprocess_support:
|
||||
raise unittest.SkipTest("test module requires subprocess")
|
||||
|
||||
# we need to ensure there's a terminfo database on the system and that
|
||||
# `infocmp` works
|
||||
cls.infocmp("dumb")
|
||||
|
||||
def setUp(self):
|
||||
self.original_term = os.environ.get("TERM", None)
|
||||
|
||||
def tearDown(self):
|
||||
if self.original_term is not None:
|
||||
os.environ["TERM"] = self.original_term
|
||||
elif "TERM" in os.environ:
|
||||
del os.environ["TERM"]
|
||||
|
||||
@classmethod
|
||||
def infocmp(cls, term) -> list[str]:
|
||||
all_caps = []
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["infocmp", "-l1", term],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
except Exception:
|
||||
raise unittest.SkipTest("calling `infocmp` failed on the system")
|
||||
|
||||
for line in result.stdout.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("#"):
|
||||
if "terminfo" not in line and "termcap" in line:
|
||||
# PyREPL terminfo doesn't parse termcap databases
|
||||
raise unittest.SkipTest(
|
||||
"curses using termcap.db: no terminfo database on"
|
||||
" the system"
|
||||
)
|
||||
elif "=" in line:
|
||||
cap_name = line.split("=")[0]
|
||||
all_caps.append(cap_name)
|
||||
|
||||
return all_caps
|
||||
|
||||
def test_setupterm_basic(self):
|
||||
"""Test basic setupterm functionality."""
|
||||
# Test with explicit terminal type
|
||||
test_terms = ["xterm", "xterm-256color", "vt100", "ansi"]
|
||||
|
||||
for term in test_terms:
|
||||
with self.subTest(term=term):
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
try:
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
print(json.dumps({{"success": True}}))
|
||||
except Exception as e:
|
||||
print(json.dumps({{"success": False, "error": str(e)}}))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
std_success = ncurses_data["success"]
|
||||
|
||||
# Set up with PyREPL curses
|
||||
try:
|
||||
terminfo.TermInfo(term, fallback=False)
|
||||
pyrepl_success = True
|
||||
except Exception as e:
|
||||
pyrepl_success = False
|
||||
pyrepl_error = e
|
||||
|
||||
# Both should succeed or both should fail
|
||||
if std_success:
|
||||
self.assertTrue(
|
||||
pyrepl_success,
|
||||
f"Standard curses succeeded but PyREPL failed for {term}",
|
||||
)
|
||||
else:
|
||||
# If standard curses failed, PyREPL might still succeed with fallback
|
||||
# This is acceptable as PyREPL has hardcoded fallbacks
|
||||
pass
|
||||
|
||||
def test_setupterm_none(self):
|
||||
"""Test setupterm with None (uses TERM from environment)."""
|
||||
# Test with current TERM
|
||||
ncurses_code = dedent(
|
||||
"""
|
||||
import _curses
|
||||
import json
|
||||
try:
|
||||
_curses.setupterm(None, 1)
|
||||
print(json.dumps({"success": True}))
|
||||
except Exception as e:
|
||||
print(json.dumps({"success": False, "error": str(e)}))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
std_success = ncurses_data["success"]
|
||||
|
||||
try:
|
||||
terminfo.TermInfo(None, fallback=False)
|
||||
pyrepl_success = True
|
||||
except Exception:
|
||||
pyrepl_success = False
|
||||
|
||||
# Both should have same result
|
||||
if std_success:
|
||||
self.assertTrue(
|
||||
pyrepl_success,
|
||||
"Standard curses succeeded but PyREPL failed for None",
|
||||
)
|
||||
|
||||
def test_tigetstr_common_capabilities(self):
|
||||
"""Test tigetstr for common terminal capabilities."""
|
||||
# Test with a known terminal type
|
||||
term = "xterm"
|
||||
|
||||
# Get ALL capabilities from infocmp
|
||||
all_caps = self.infocmp(term)
|
||||
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
results = {{}}
|
||||
for cap in {repr(all_caps)}:
|
||||
try:
|
||||
val = _curses.tigetstr(cap)
|
||||
if val is None:
|
||||
results[cap] = None
|
||||
elif val == -1:
|
||||
results[cap] = -1
|
||||
else:
|
||||
results[cap] = list(val)
|
||||
except BaseException:
|
||||
results[cap] = "error"
|
||||
print(json.dumps(results))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
self.assertEqual(
|
||||
result.returncode, 0, f"Failed to run ncurses: {result.stderr}"
|
||||
)
|
||||
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
|
||||
ti = terminfo.TermInfo(term, fallback=False)
|
||||
|
||||
# Test every single capability
|
||||
for cap in all_caps:
|
||||
if cap not in ncurses_data or ncurses_data[cap] == "error":
|
||||
continue
|
||||
|
||||
with self.subTest(capability=cap):
|
||||
ncurses_val = ncurses_data[cap]
|
||||
if isinstance(ncurses_val, list):
|
||||
ncurses_val = bytes(ncurses_val)
|
||||
|
||||
pyrepl_val = ti.get(cap)
|
||||
|
||||
self.assertEqual(
|
||||
pyrepl_val,
|
||||
ncurses_val,
|
||||
f"Capability {cap}: ncurses={repr(ncurses_val)}, "
|
||||
f"pyrepl={repr(pyrepl_val)}",
|
||||
)
|
||||
|
||||
def test_tigetstr_input_types(self):
|
||||
"""Test tigetstr with different input types."""
|
||||
term = "xterm"
|
||||
cap = "cup"
|
||||
|
||||
# Test standard curses behavior with string in subprocess
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
|
||||
# Test with string input
|
||||
try:
|
||||
std_str_result = _curses.tigetstr({repr(cap)})
|
||||
std_accepts_str = True
|
||||
if std_str_result is None:
|
||||
std_str_val = None
|
||||
elif std_str_result == -1:
|
||||
std_str_val = -1
|
||||
else:
|
||||
std_str_val = list(std_str_result)
|
||||
except TypeError:
|
||||
std_accepts_str = False
|
||||
std_str_val = None
|
||||
|
||||
print(json.dumps({{
|
||||
"accepts_str": std_accepts_str,
|
||||
"str_result": std_str_val
|
||||
}}))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
|
||||
# PyREPL setup
|
||||
ti = terminfo.TermInfo(term, fallback=False)
|
||||
|
||||
# PyREPL behavior with string
|
||||
try:
|
||||
pyrepl_str_result = ti.get(cap)
|
||||
pyrepl_accepts_str = True
|
||||
except TypeError:
|
||||
pyrepl_accepts_str = False
|
||||
|
||||
# PyREPL should also only accept strings for compatibility
|
||||
with self.assertRaises(TypeError):
|
||||
ti.get(cap.encode("ascii"))
|
||||
|
||||
# Both should accept string input
|
||||
self.assertEqual(
|
||||
pyrepl_accepts_str,
|
||||
ncurses_data["accepts_str"],
|
||||
"PyREPL and standard curses should have same string handling",
|
||||
)
|
||||
self.assertTrue(
|
||||
pyrepl_accepts_str, "PyREPL should accept string input"
|
||||
)
|
||||
|
||||
def test_tparm_basic(self):
|
||||
"""Test basic tparm functionality."""
|
||||
term = "xterm"
|
||||
ti = terminfo.TermInfo(term, fallback=False)
|
||||
|
||||
# Test cursor positioning (cup)
|
||||
cup = ti.get("cup")
|
||||
if cup and cup not in {ABSENT_STRING, CANCELLED_STRING}:
|
||||
# Test various parameter combinations
|
||||
test_cases = [
|
||||
(0, 0), # Top-left
|
||||
(5, 10), # Arbitrary position
|
||||
(23, 79), # Bottom-right of standard terminal
|
||||
(999, 999), # Large values
|
||||
]
|
||||
|
||||
# Get ncurses results in subprocess
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
|
||||
# Get cup capability
|
||||
cup = _curses.tigetstr('cup')
|
||||
results = {{}}
|
||||
|
||||
for row, col in {repr(test_cases)}:
|
||||
try:
|
||||
result = _curses.tparm(cup, row, col)
|
||||
results[f"{{row}},{{col}}"] = list(result)
|
||||
except Exception as e:
|
||||
results[f"{{row}},{{col}}"] = {{"error": str(e)}}
|
||||
|
||||
print(json.dumps(results))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
self.assertEqual(
|
||||
result.returncode, 0, f"Failed to run ncurses: {result.stderr}"
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
|
||||
for row, col in test_cases:
|
||||
with self.subTest(row=row, col=col):
|
||||
# Standard curses tparm from subprocess
|
||||
key = f"{row},{col}"
|
||||
if (
|
||||
isinstance(ncurses_data[key], dict)
|
||||
and "error" in ncurses_data[key]
|
||||
):
|
||||
self.fail(
|
||||
f"ncurses tparm failed: {ncurses_data[key]['error']}"
|
||||
)
|
||||
std_result = bytes(ncurses_data[key])
|
||||
|
||||
# PyREPL curses tparm
|
||||
pyrepl_result = terminfo.tparm(cup, row, col)
|
||||
|
||||
# Results should be identical
|
||||
self.assertEqual(
|
||||
pyrepl_result,
|
||||
std_result,
|
||||
f"tparm(cup, {row}, {col}): "
|
||||
f"std={repr(std_result)}, pyrepl={repr(pyrepl_result)}",
|
||||
)
|
||||
else:
|
||||
raise unittest.SkipTest(
|
||||
"test_tparm_basic() requires the `cup` capability"
|
||||
)
|
||||
|
||||
def test_tparm_multiple_params(self):
|
||||
"""Test tparm with capabilities using multiple parameters."""
|
||||
term = "xterm"
|
||||
ti = terminfo.TermInfo(term, fallback=False)
|
||||
|
||||
# Test capabilities that take parameters
|
||||
param_caps = {
|
||||
"cub": 1, # cursor_left with count
|
||||
"cuf": 1, # cursor_right with count
|
||||
"cuu": 1, # cursor_up with count
|
||||
"cud": 1, # cursor_down with count
|
||||
"dch": 1, # delete_character with count
|
||||
"ich": 1, # insert_character with count
|
||||
}
|
||||
|
||||
# Get all capabilities from PyREPL first
|
||||
pyrepl_caps = {}
|
||||
for cap in param_caps:
|
||||
cap_value = ti.get(cap)
|
||||
if cap_value and cap_value not in {
|
||||
ABSENT_STRING,
|
||||
CANCELLED_STRING,
|
||||
}:
|
||||
pyrepl_caps[cap] = cap_value
|
||||
|
||||
if not pyrepl_caps:
|
||||
self.skipTest("No parametrized capabilities found")
|
||||
|
||||
# Get ncurses results in subprocess
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
|
||||
param_caps = {repr(param_caps)}
|
||||
test_values = [1, 5, 10, 99]
|
||||
results = {{}}
|
||||
|
||||
for cap in param_caps:
|
||||
cap_value = _curses.tigetstr(cap)
|
||||
if cap_value and cap_value != -1:
|
||||
for value in test_values:
|
||||
try:
|
||||
result = _curses.tparm(cap_value, value)
|
||||
results[f"{{cap}},{{value}}"] = list(result)
|
||||
except Exception as e:
|
||||
results[f"{{cap}},{{value}}"] = {{"error": str(e)}}
|
||||
|
||||
print(json.dumps(results))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
self.assertEqual(
|
||||
result.returncode, 0, f"Failed to run ncurses: {result.stderr}"
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
|
||||
for cap, cap_value in pyrepl_caps.items():
|
||||
with self.subTest(capability=cap):
|
||||
# Test with different parameter values
|
||||
for value in [1, 5, 10, 99]:
|
||||
key = f"{cap},{value}"
|
||||
if key in ncurses_data:
|
||||
if (
|
||||
isinstance(ncurses_data[key], dict)
|
||||
and "error" in ncurses_data[key]
|
||||
):
|
||||
self.fail(
|
||||
f"ncurses tparm failed: {ncurses_data[key]['error']}"
|
||||
)
|
||||
std_result = bytes(ncurses_data[key])
|
||||
|
||||
pyrepl_result = terminfo.tparm(cap_value, value)
|
||||
self.assertEqual(
|
||||
pyrepl_result,
|
||||
std_result,
|
||||
f"tparm({cap}, {value}): "
|
||||
f"std={repr(std_result)}, pyrepl={repr(pyrepl_result)}",
|
||||
)
|
||||
|
||||
def test_tparm_null_handling(self):
|
||||
"""Test tparm with None/null input."""
|
||||
term = "xterm"
|
||||
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
|
||||
# Test with None
|
||||
try:
|
||||
_curses.tparm(None)
|
||||
raises_typeerror = False
|
||||
except TypeError:
|
||||
raises_typeerror = True
|
||||
except Exception as e:
|
||||
raises_typeerror = False
|
||||
error_type = type(e).__name__
|
||||
|
||||
print(json.dumps({{"raises_typeerror": raises_typeerror}}))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
|
||||
# PyREPL setup
|
||||
ti = terminfo.TermInfo(term, fallback=False)
|
||||
|
||||
# Test with None - both should raise TypeError
|
||||
if ncurses_data["raises_typeerror"]:
|
||||
with self.assertRaises(TypeError):
|
||||
terminfo.tparm(None)
|
||||
else:
|
||||
# If ncurses doesn't raise TypeError, PyREPL shouldn't either
|
||||
try:
|
||||
terminfo.tparm(None)
|
||||
except TypeError:
|
||||
self.fail("PyREPL raised TypeError but ncurses did not")
|
||||
|
||||
def test_special_terminals(self):
|
||||
"""Test with special terminal types."""
|
||||
special_terms = [
|
||||
"dumb", # Minimal terminal
|
||||
"unknown", # Should fall back to defaults
|
||||
"linux", # Linux console
|
||||
"screen", # GNU Screen
|
||||
"tmux", # tmux
|
||||
]
|
||||
|
||||
# Get all string capabilities from ncurses
|
||||
for term in special_terms:
|
||||
with self.subTest(term=term):
|
||||
all_caps = self.infocmp(term)
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
import sys
|
||||
|
||||
try:
|
||||
_curses.setupterm({repr(term)}, 1)
|
||||
results = {{}}
|
||||
for cap in {repr(all_caps)}:
|
||||
try:
|
||||
val = _curses.tigetstr(cap)
|
||||
if val is None:
|
||||
results[cap] = None
|
||||
elif val == -1:
|
||||
results[cap] = -1
|
||||
else:
|
||||
# Convert bytes to list of ints for JSON
|
||||
results[cap] = list(val)
|
||||
except BaseException:
|
||||
results[cap] = "error"
|
||||
print(json.dumps(results))
|
||||
except Exception as e:
|
||||
print(json.dumps({{"error": str(e)}}))
|
||||
"""
|
||||
)
|
||||
|
||||
# Get ncurses results
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
self.fail(
|
||||
f"Failed to get ncurses data for {term}: {result.stderr}"
|
||||
)
|
||||
|
||||
try:
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
self.fail(
|
||||
f"Failed to parse ncurses output for {term}: {result.stdout}"
|
||||
)
|
||||
|
||||
if "error" in ncurses_data and len(ncurses_data) == 1:
|
||||
# ncurses failed to setup this terminal
|
||||
# PyREPL should still work with fallback
|
||||
ti = terminfo.TermInfo(term, fallback=True)
|
||||
continue
|
||||
|
||||
ti = terminfo.TermInfo(term, fallback=False)
|
||||
|
||||
# Compare all capabilities
|
||||
for cap in all_caps:
|
||||
if cap not in ncurses_data:
|
||||
continue
|
||||
|
||||
with self.subTest(term=term, capability=cap):
|
||||
ncurses_val = ncurses_data[cap]
|
||||
if isinstance(ncurses_val, list):
|
||||
# Convert back to bytes
|
||||
ncurses_val = bytes(ncurses_val)
|
||||
|
||||
pyrepl_val = ti.get(cap)
|
||||
|
||||
# Both should return the same value
|
||||
self.assertEqual(
|
||||
pyrepl_val,
|
||||
ncurses_val,
|
||||
f"Capability {cap} for {term}: "
|
||||
f"ncurses={repr(ncurses_val)}, "
|
||||
f"pyrepl={repr(pyrepl_val)}",
|
||||
)
|
||||
|
||||
def test_terminfo_fallback(self):
|
||||
"""Test that PyREPL falls back gracefully when terminfo is not found."""
|
||||
# Use a non-existent terminal type
|
||||
fake_term = "nonexistent-terminal-type-12345"
|
||||
|
||||
# Check if standard curses can setup this terminal in subprocess
|
||||
ncurses_code = dedent(
|
||||
f"""
|
||||
import _curses
|
||||
import json
|
||||
try:
|
||||
_curses.setupterm({repr(fake_term)}, 1)
|
||||
print(json.dumps({{"success": True}}))
|
||||
except _curses.error:
|
||||
print(json.dumps({{"success": False, "error": "curses.error"}}))
|
||||
except Exception as e:
|
||||
print(json.dumps({{"success": False, "error": str(e)}}))
|
||||
"""
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c", ncurses_code],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
ncurses_data = json.loads(result.stdout)
|
||||
|
||||
if ncurses_data["success"]:
|
||||
# If it succeeded, skip this test as we can't test fallback
|
||||
self.skipTest(
|
||||
f"System unexpectedly has terminfo for '{fake_term}'"
|
||||
)
|
||||
|
||||
# PyREPL should succeed with fallback
|
||||
try:
|
||||
ti = terminfo.TermInfo(fake_term, fallback=True)
|
||||
pyrepl_ok = True
|
||||
except Exception:
|
||||
pyrepl_ok = False
|
||||
|
||||
self.assertTrue(
|
||||
pyrepl_ok, "PyREPL should fall back for unknown terminals"
|
||||
)
|
||||
|
||||
# Should still be able to get basic capabilities
|
||||
bel = ti.get("bel")
|
||||
self.assertIsNotNone(
|
||||
bel, "PyREPL should provide basic capabilities after fallback"
|
||||
)
|
||||
|
||||
def test_invalid_terminal_names(self):
|
||||
cases = [
|
||||
(42, TypeError),
|
||||
("", ValueError),
|
||||
("w\x00t", ValueError),
|
||||
(f"..{os.sep}name", ValueError),
|
||||
]
|
||||
|
||||
for term, exc in cases:
|
||||
with self.subTest(term=term):
|
||||
with self.assertRaises(exc):
|
||||
terminfo._validate_terminal_name_or_raise(term)
|
|
@ -16,9 +16,13 @@
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
from _pyrepl.terminfo import _TERMINAL_CAPABILITIES
|
||||
|
||||
TERM_CAPABILITIES = _TERMINAL_CAPABILITIES["ansi"]
|
||||
|
||||
|
||||
def unix_console(events, **kwargs):
|
||||
console = UnixConsole()
|
||||
console = UnixConsole(term="xterm")
|
||||
console.get_event = MagicMock(side_effect=events)
|
||||
console.getpending = MagicMock(return_value=Event("key", ""))
|
||||
|
||||
|
@ -50,41 +54,11 @@ def unix_console(events, **kwargs):
|
|||
)
|
||||
|
||||
|
||||
TERM_CAPABILITIES = {
|
||||
"bel": b"\x07",
|
||||
"civis": b"\x1b[?25l",
|
||||
"clear": b"\x1b[H\x1b[2J",
|
||||
"cnorm": b"\x1b[?12l\x1b[?25h",
|
||||
"cub": b"\x1b[%p1%dD",
|
||||
"cub1": b"\x08",
|
||||
"cud": b"\x1b[%p1%dB",
|
||||
"cud1": b"\n",
|
||||
"cuf": b"\x1b[%p1%dC",
|
||||
"cuf1": b"\x1b[C",
|
||||
"cup": b"\x1b[%i%p1%d;%p2%dH",
|
||||
"cuu": b"\x1b[%p1%dA",
|
||||
"cuu1": b"\x1b[A",
|
||||
"dch1": b"\x1b[P",
|
||||
"dch": b"\x1b[%p1%dP",
|
||||
"el": b"\x1b[K",
|
||||
"hpa": b"\x1b[%i%p1%dG",
|
||||
"ich": b"\x1b[%p1%d@",
|
||||
"ich1": None,
|
||||
"ind": b"\n",
|
||||
"pad": None,
|
||||
"ri": b"\x1bM",
|
||||
"rmkx": b"\x1b[?1l\x1b>",
|
||||
"smkx": b"\x1b[?1h\x1b=",
|
||||
}
|
||||
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
|
||||
@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
|
||||
@patch(
|
||||
"_pyrepl.curses.tparm",
|
||||
"_pyrepl.terminfo.tparm",
|
||||
lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
|
||||
)
|
||||
@patch("_pyrepl.curses.setupterm", lambda a, b: None)
|
||||
@patch(
|
||||
"termios.tcgetattr",
|
||||
lambda _: [
|
||||
|
@ -321,7 +295,7 @@ def same_console(events):
|
|||
|
||||
def test_getheightwidth_with_invalid_environ(self, _os_write):
|
||||
# gh-128636
|
||||
console = UnixConsole()
|
||||
console = UnixConsole(term="xterm")
|
||||
with os_helper.EnvironmentVarGuard() as env:
|
||||
env["LINES"] = ""
|
||||
self.assertIsInstance(console.getheightwidth(), tuple)
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
PyREPL no longer depends on the :mod:`curses` standard library. Contributed
|
||||
by Łukasz Langa.
|
Loading…
Add table
Add a link
Reference in a new issue