2025-05-02 20:22:31 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
import builtins
|
|
|
|
import functools
|
|
|
|
import keyword
|
2024-05-05 21:32:23 +02:00
|
|
|
import re
|
2025-05-02 20:22:31 +02:00
|
|
|
import token as T
|
|
|
|
import tokenize
|
2024-05-05 21:32:23 +02:00
|
|
|
import unicodedata
|
2025-05-02 20:22:31 +02:00
|
|
|
import _colorize
|
|
|
|
|
|
|
|
from collections import deque
|
|
|
|
from io import StringIO
|
|
|
|
from tokenize import TokenInfo as TI
|
|
|
|
from typing import Iterable, Iterator, Match, NamedTuple, Self
|
2024-05-05 21:32:23 +02:00
|
|
|
|
2025-03-21 18:27:35 +01:00
|
|
|
from .types import CharBuffer, CharWidths
|
|
|
|
from .trace import trace
|
|
|
|
|
2024-05-05 21:32:23 +02:00
|
|
|
# Matches one ANSI CSI escape sequence: ESC "[" followed by any number of
# characters in the range " ".."@" and a final character in "A".."~".
ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")

# Matches a \x01 ... \x02 bracketed region (non-greedy), markers included.
ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02")

# Translation table that deletes only the \x01 / \x02 markers themselves.
ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""})

# Keywords after which the next NAME token is the name being defined.
IDENTIFIERS_AFTER = {"def", "class"}

# Public (non-underscore) names exported by the `builtins` module.
BUILTINS = {
    str(name)
    for name in dir(builtins)
    if not name.startswith("_")
}
|
|
|
|
|
|
|
|
|
2025-05-06 05:44:49 -04:00
|
|
|
def THEME(**kwargs):
    """Return the `syntax` attribute of the theme given by `_colorize.get_theme`.

    All keyword arguments are forwarded unchanged to `_colorize.get_theme()`.
    """
    # Not cached: the user can modify the theme inside the interactive session.
    current_theme = _colorize.get_theme(**kwargs)
    return current_theme.syntax
|
2025-05-05 23:45:25 +02:00
|
|
|
|
|
|
|
|
2025-05-02 20:22:31 +02:00
|
|
|
class Span(NamedTuple):
|
|
|
|
"""Span indexing that's inclusive on both ends."""
|
|
|
|
|
|
|
|
start: int
|
|
|
|
end: int
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_re(cls, m: Match[str], group: int | str) -> Self:
|
|
|
|
re_span = m.span(group)
|
|
|
|
return cls(re_span[0], re_span[1] - 1)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_token(cls, token: TI, line_len: list[int]) -> Self:
|
2025-05-19 16:12:23 +02:00
|
|
|
end_offset = -1
|
|
|
|
if (token.type in {T.FSTRING_MIDDLE, T.TSTRING_MIDDLE}
|
|
|
|
and token.string.endswith(("{", "}"))):
|
|
|
|
# gh-134158: a visible trailing brace comes from a double brace in input
|
|
|
|
end_offset += 1
|
|
|
|
|
2025-05-02 20:22:31 +02:00
|
|
|
return cls(
|
|
|
|
line_len[token.start[0] - 1] + token.start[1],
|
2025-05-19 16:12:23 +02:00
|
|
|
line_len[token.end[0] - 1] + token.end[1] + end_offset,
|
2025-05-02 20:22:31 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class ColorSpan(NamedTuple):
    """A `Span` paired with the name of the theme color to apply to it."""

    # Range of buffer offsets to color.
    span: Span
    # Theme key, e.g. "string", "keyword" or "comment".
    tag: str
|
2024-05-05 21:32:23 +02:00
|
|
|
|
|
|
|
|
2024-05-22 01:28:32 -04:00
|
|
|
@functools.cache
def str_width(c: str) -> int:
    """Return the number of columns (1 or 2) used to display character `c`.

    ASCII is always one column and skips the Unicode database lookup.
    Other characters are one column wide when their East Asian Width class
    is neutral, narrow, half-width or ambiguous, and two columns otherwise.
    """
    is_wide = ord(c) >= 128 and unicodedata.east_asian_width(c) not in (
        "N",
        "Na",
        "H",
        "A",
    )
    return 2 if is_wide else 1
|
|
|
|
|
|
|
|
|
|
|
|
def wlen(s: str) -> int:
    """Return the display width of `s` in columns.

    ANSI escape sequences contribute nothing to the width. Every "\\x1a"
    character is counted as two columns, matching `disp_str()`.
    """
    # Fast path: a single character cannot contain an escape sequence.
    if len(s) == 1 and s != "\x1a":
        return str_width(s)

    raw_width = 0
    for char in s:
        raw_width += str_width(char)

    # Escape sequences were counted character by character above; take
    # their full length back out.
    escape_width = sum(map(len, ANSI_ESCAPE_SEQUENCE.findall(s)))

    # One extra column for each "\x1a" (str_width() already counted one).
    return raw_width - escape_width + s.count("\x1a")
|
2025-03-21 15:48:10 +01:00
|
|
|
|
|
|
|
|
|
|
|
def unbracket(s: str, including_content: bool = False) -> str:
    r"""Strip the \001 and \002 marker characters from `s`.

    With `including_content` set, each \001...\002 region is removed
    together with the text it encloses; otherwise only the two marker
    characters are dropped and the enclosed text stays.
    """
    if not including_content:
        return s.translate(ZERO_WIDTH_TRANS)
    return ZERO_WIDTH_BRACKET.sub("", s)
|
2025-03-21 18:27:35 +01:00
|
|
|
|
|
|
|
|
2025-05-02 20:22:31 +02:00
|
|
|
def gen_colors(buffer: str) -> Iterator[ColorSpan]:
    """Yield the color spans for `buffer`, in token order.

    The input `buffer` should be a valid start of a Python code block, i.e.
    it cannot be a block starting in the middle of a multiline string.

    A `SyntaxError` from the tokenizer silently ends the stream. A
    `tokenize.TokenError` is handed to `recover_unterminated_string()` so
    the tail of an unterminated string can still be colored.
    """
    sio = StringIO(buffer)

    # line_lengths[n] is the total length of the first n lines, i.e. the
    # absolute offset at which (0-based) line n starts.
    line_lengths = [0]
    for line in sio.readlines():
        line_lengths.append(line_lengths[-1] + len(line))

    sio.seek(0)
    tokens = tokenize.generate_tokens(sio.readline)
    last_emitted: ColorSpan | None = None
    try:
        for color in gen_colors_from_token_stream(tokens, line_lengths):
            yield color
            last_emitted = color
    except tokenize.TokenError as te:
        yield from recover_unterminated_string(
            te, line_lengths, last_emitted, buffer
        )
    except SyntaxError:
        return
|
|
|
|
|
|
|
|
|
|
|
|
def recover_unterminated_string(
    exc: tokenize.TokenError,
    line_lengths: list[int],
    last_emitted: ColorSpan | None,
    buffer: str,
) -> Iterator[ColorSpan]:
    """Yield a "string" span for the unterminated string reported by `exc`.

    The span runs from the error location to the end of the input. If
    `last_emitted` already starts at or after that location, the span
    begins right after `last_emitted` instead. Nothing is yielded when
    `exc` carries no location or is not an unterminated-string error.
    """
    message, location = exc.args
    if location is None:
        return

    line_no, column = location

    unterminated_prefixes = (
        "unterminated string literal",
        "unterminated f-string literal",
        "unterminated t-string literal",
        "EOF in multi-line string",
        "unterminated triple-quoted f-string literal",
        "unterminated triple-quoted t-string literal",
    )
    if not message.startswith(unterminated_prefixes):
        trace(
            "unhandled token error({buffer}) = {te}",
            buffer=repr(buffer),
            te=str(exc),
        )
        return

    start = line_lengths[line_no - 1] + column - 1
    end = line_lengths[-1] - 1

    # in case FSTRING_START was already emitted
    if last_emitted and start <= last_emitted.span.start:
        trace("before last emitted = {s}", s=start)
        start = last_emitted.span.end + 1

    span = Span(start, end)
    trace("yielding span {a} -> {b}", a=span.start, b=span.end)
    yield ColorSpan(span, "string")
|
|
|
|
|
|
|
|
|
|
|
|
def gen_colors_from_token_stream(
    token_generator: Iterator[TI],
    line_lengths: list[int],
) -> Iterator[ColorSpan]:
    """Yield a `ColorSpan` for every token that should be colored.

    `line_lengths` is passed to `Span.from_token()` to turn token
    coordinates into offsets. Zero-width tokens are skipped. Names are
    classified, in priority order, as a definition (the name right after
    `def`/`class`), a keyword, a soft keyword (only outside brackets and
    when `is_soft_keyword_used()` agrees), or a builtin; any other name is
    left uncolored.
    """
    string_types = {
        T.STRING,
        T.FSTRING_START, T.FSTRING_MIDDLE, T.FSTRING_END,
        T.TSTRING_START, T.TSTRING_MIDDLE, T.TSTRING_END,
    }

    token_window = prev_next_window(token_generator)

    expect_definition = False
    open_brackets = 0
    for prev_token, token, next_token in token_window:
        assert token is not None
        if token.start == token.end:
            continue

        kind = token.type
        text = token.string
        tag: str | None = None

        if kind in string_types:
            tag = "string"
        elif kind == T.COMMENT:
            tag = "comment"
        elif kind == T.NUMBER:
            tag = "number"
        elif kind == T.OP:
            # Track nesting so soft keywords inside brackets are ignored.
            if text in "([{":
                open_brackets += 1
            elif text in ")]}":
                open_brackets -= 1
            tag = "op"
        elif kind == T.NAME:
            if expect_definition:
                expect_definition = False
                tag = "definition"
            elif keyword.iskeyword(text):
                tag = "keyword"
                if text in IDENTIFIERS_AFTER:
                    expect_definition = True
            elif (
                keyword.issoftkeyword(text)
                and open_brackets == 0
                and is_soft_keyword_used(prev_token, token, next_token)
            ):
                tag = "soft_keyword"
            elif text in BUILTINS:
                tag = "builtin"

        if tag is not None:
            yield ColorSpan(Span.from_token(token, line_lengths), tag)
|
2025-05-02 20:22:31 +02:00
|
|
|
|
|
|
|
|
|
|
|
# Hard keywords that are accepted directly after a soft `case` ...
keyword_first_sets_case = {"False", "None", "True"}

# ... and directly after a soft `match` (see `is_soft_keyword_used`).
keyword_first_sets_match = keyword_first_sets_case | {"await", "lambda", "not"}
|
|
|
|
|
|
|
|
|
|
|
|
def is_soft_keyword_used(*tokens: TI | None) -> bool:
|
|
|
|
"""Returns True if the current token is a keyword in this context.
|
|
|
|
|
|
|
|
For the `*tokens` to match anything, they have to be a three-tuple of
|
|
|
|
(previous, current, next).
|
|
|
|
"""
|
|
|
|
trace("is_soft_keyword_used{t}", t=tokens)
|
|
|
|
match tokens:
|
|
|
|
case (
|
|
|
|
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
|
|
|
|
TI(string="match"),
|
|
|
|
TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
|
|
|
|
| TI(T.OP, string="(" | "*" | "[" | "{" | "~" | "...")
|
|
|
|
):
|
|
|
|
return True
|
|
|
|
case (
|
|
|
|
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
|
|
|
|
TI(string="match"),
|
|
|
|
TI(T.NAME, string=s)
|
|
|
|
):
|
|
|
|
if keyword.iskeyword(s):
|
|
|
|
return s in keyword_first_sets_match
|
|
|
|
return True
|
|
|
|
case (
|
|
|
|
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
|
|
|
|
TI(string="case"),
|
|
|
|
TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
|
|
|
|
| TI(T.OP, string="(" | "*" | "-" | "[" | "{")
|
|
|
|
):
|
|
|
|
return True
|
|
|
|
case (
|
|
|
|
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
|
|
|
|
TI(string="case"),
|
|
|
|
TI(T.NAME, string=s)
|
|
|
|
):
|
|
|
|
if keyword.iskeyword(s):
|
|
|
|
return s in keyword_first_sets_case
|
|
|
|
return True
|
|
|
|
case (TI(string="case"), TI(string="_"), TI(string=":")):
|
|
|
|
return True
|
|
|
|
case _:
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def disp_str(
    buffer: str,
    colors: list[ColorSpan] | None = None,
    start_index: int = 0,
    force_color: bool = False,
) -> tuple[CharBuffer, CharWidths]:
    r"""Split `buffer` into printable, optionally colored, characters.

    Returns a tuple of two parallel lists:
    - the characters of `buffer`, one entry per input character, with any
      color escape codes glued onto the visible character they apply to
      (so an escape code never gets an entry of its own);
    - the visible width of each of those entries.

    Note on colors:
    - The `colors` list, if provided, is partially consumed: spans that end
      before or inside `buffer` are popped off the front. It is a list and
      not a generator so that a span still open at the end of this call
      (e.g. a multiline string) is available to the next call.
    - The spans in `colors` are offsets into the whole input block, while
      `buffer` is a single line of it. `start_index` is the offset of
      `buffer` inside that block, used to line the spans up.

    Examples:
    >>> utils.disp_str("a = 9")
    (['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1])

    >>> line = "while 1:"
    >>> colors = list(utils.gen_colors(line))
    >>> utils.disp_str(line, colors=colors)
    (['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1])

    """
    chars: CharBuffer = []
    char_widths: CharWidths = []

    if not buffer:
        return chars, char_widths

    # move past irrelevant spans
    while colors and colors[0].span.end < start_index:
        colors.pop(0)

    theme = THEME(force_color=force_color)

    # Escape code to put in front of the next character, if any.
    prefix = ""
    if colors and colors[0].span.start < start_index:
        # looks like we're continuing a previous color (e.g. a multiline str)
        prefix = theme[colors[0].tag]

    for i, c in enumerate(buffer, start_index):
        if colors and colors[0].span.start == i:  # new color starts now
            prefix = theme[colors[0].tag]

        if c == "\x1a":  # CTRL-Z on Windows
            text, width = c, 2
        elif ord(c) < 128:
            text, width = c, 1
        elif unicodedata.category(c).startswith("C"):
            # Show non-ASCII "other" category characters as an escape.
            text = r"\u%04x" % ord(c)
            width = len(text)
        else:
            text, width = c, str_width(c)

        suffix = ""
        if colors and colors[0].span.end == i:  # current color ends now
            suffix = theme.reset
            colors.pop(0)

        chars.append(prefix + text + suffix)
        char_widths.append(width)
        prefix = ""

    if colors and colors[0].span.start < i and colors[0].span.end > i:
        # even though the current color should be continued, reset it for now.
        # the next call to `disp_str()` will revive it.
        chars[-1] += theme.reset

    return chars, char_widths
|
2025-05-02 20:22:31 +02:00
|
|
|
|
|
|
|
|
|
|
|
def prev_next_window[T](
|
|
|
|
iterable: Iterable[T]
|
|
|
|
) -> Iterator[tuple[T | None, ...]]:
|
|
|
|
"""Generates three-tuples of (previous, current, next) items.
|
|
|
|
|
|
|
|
On the first iteration previous is None. On the last iteration next
|
|
|
|
is None. In case of exception next is None and the exception is re-raised
|
|
|
|
on a subsequent next() call.
|
|
|
|
|
|
|
|
Inspired by `sliding_window` from `itertools` recipes.
|
|
|
|
"""
|
|
|
|
|
|
|
|
iterator = iter(iterable)
|
|
|
|
window = deque((None, next(iterator)), maxlen=3)
|
|
|
|
try:
|
|
|
|
for x in iterator:
|
|
|
|
window.append(x)
|
|
|
|
yield tuple(window)
|
|
|
|
except Exception:
|
|
|
|
raise
|
|
|
|
finally:
|
|
|
|
window.append(None)
|
|
|
|
yield tuple(window)
|