"""Pretty printing for one-shot sampling stack dumps.""" import contextlib import linecache import os import sys from traceback import _byte_offset_to_character_offset import _colorize from .collector import extract_lineno, filter_internal_frames, iter_async_frames from .constants import ( THREAD_STATUS_GIL_REQUESTED, THREAD_STATUS_HAS_EXCEPTION, THREAD_STATUS_HAS_GIL, THREAD_STATUS_MAIN_THREAD, THREAD_STATUS_ON_CPU, THREAD_STATUS_UNKNOWN, ) from .opcode_utils import format_opcode _STATUS_LABELS = ( (THREAD_STATUS_MAIN_THREAD, "main thread"), (THREAD_STATUS_HAS_GIL, "has GIL"), (THREAD_STATUS_ON_CPU, "on CPU"), (THREAD_STATUS_GIL_REQUESTED, "waiting for GIL"), (THREAD_STATUS_HAS_EXCEPTION, "has exception"), ) def _theme_for(file, colorize): if colorize is True: return _colorize.get_theme(force_color=True).profiler_dump if colorize is False: return _colorize.get_theme(force_no_color=True).profiler_dump return _colorize.get_theme(tty_file=file).profiler_dump def _color(text, color, theme): if not color: return text return f"{color}{text}{theme.reset}" def _frame_fields(frame): if isinstance(frame, tuple): filename = frame[0] if len(frame) > 0 else "" location = frame[1] if len(frame) > 1 else None qualname = frame[2] if len(frame) > 2 else "" opcode = frame[3] if len(frame) > 3 else None else: filename = getattr(frame, "filename", "") location = getattr(frame, "location", None) qualname = getattr(frame, "qualname", None) if qualname is None: qualname = getattr(frame, "funcname", "") opcode = getattr(frame, "opcode", None) return filename, location, qualname, opcode def _location_field(location, index, default=None): if location is None: return default try: value = location[index] except (IndexError, TypeError): return default return default if value is None else value def _status_text(status): labels = [label for flag, label in _STATUS_LABELS if status & flag] has_state = status & ( THREAD_STATUS_HAS_GIL | THREAD_STATUS_GIL_REQUESTED | THREAD_STATUS_HAS_EXCEPTION ) if not has_state and not status & (THREAD_STATUS_UNKNOWN | THREAD_STATUS_ON_CPU): labels.append("idle") return ", ".join(labels) if labels else None def _is_async_dump(stack_frames): return bool(stack_frames) and hasattr(stack_frames[0], "awaited_by") def _iter_dump_sections(stack_frames): if not stack_frames: return if _is_async_dump(stack_frames): for frames, thread_id, leaf_id in iter_async_frames(stack_frames): frames = filter_internal_frames(frames) if frames: yield None, thread_id, None, frames, f"task {leaf_id}" return for interpreter_info in stack_frames: interpreter_id = getattr(interpreter_info, "interpreter_id", None) for thread_info in getattr(interpreter_info, "threads", ()): frames = getattr(thread_info, "frame_info", None) or [] frames = filter_internal_frames(frames) yield ( interpreter_id, getattr(thread_info, "thread_id", None), getattr(thread_info, "status", None), frames, None, ) def _display_filename(filename): if not filename or filename == "~": return filename with contextlib.suppress(ValueError): relpath = os.path.relpath(filename) if not relpath.startswith(".." + os.sep) and relpath != "..": return relpath return filename def _format_frame(frame, theme): filename, location, qualname, opcode = _frame_fields(frame) source_filename = filename lineno = extract_lineno(location) qualname_part = _color(qualname, theme.frame, theme) if filename == "~" and lineno == 0: line = f" {qualname_part}" else: filename = _display_filename(filename) if filename: file_part = _color(f'"{filename}"', theme.filename, theme) if lineno > 0: line_part = _color(str(lineno), theme.line_no, theme) line = f" File {file_part}, line {line_part}, in {qualname_part}" else: line = f" File {file_part}, in {qualname_part}" else: line = f" {qualname_part}" if opcode is not None: line = f"{line} {_color(f'opcode={format_opcode(opcode)}', theme.opcode, theme)}" lines = [line] source = _source_line(source_filename, location, lineno, theme) if source: lines.append(f" {source}") return lines def _source_offsets(line, location, lineno): end_lineno = _location_field(location, 1, lineno) col_offset = _location_field(location, 2, -1) end_col_offset = _location_field(location, 3, -1) if col_offset < 0 or end_col_offset < 0 or end_lineno < lineno: return None start = _byte_offset_to_character_offset(line, col_offset) if end_lineno == lineno: end = _byte_offset_to_character_offset(line, end_col_offset) else: end = len(line) if start < 0 or end <= start: return None return start, end def _trim_source_line(line, offsets): stripped = line.lstrip() leading = len(line) - len(stripped) if offsets is None: return stripped, None start, end = offsets start = max(start - leading, 0) end = max(end - leading, start + 1) end = min(end, len(stripped)) return stripped, (start, end) def _highlight_source_line(line, offsets, theme): if offsets is None or offsets[1] <= offsets[0]: return _color(line, theme.source, theme) start, end = offsets parts = [] if line[:start]: parts.append(_color(line[:start], theme.source, theme)) parts.append(_color(line[start:end], theme.source_highlight, theme)) if line[end:]: parts.append(_color(line[end:], theme.source, theme)) return "".join(parts) def _source_line(filename, location, lineno, theme): if not filename or filename == "~" or lineno <= 0: return None line = linecache.getline(filename, lineno).removesuffix("\n") if not line: return None offsets = _source_offsets(line, location, lineno) line, offsets = _trim_source_line(line, offsets) if not line: return None return _highlight_source_line(line, offsets, theme) def _section_header( *, pid, interpreter_id, thread_id, status, label, show_pid, show_interpreter, theme, ): subject = "Stack dump" if show_pid and pid is not None: subject = f"{subject} for PID {pid}" if thread_id is not None: subject = f"{subject}, thread {thread_id}" details = [] if show_interpreter and interpreter_id is not None: details.append(f"interpreter {interpreter_id}") if label: details.append(label) if status is not None: status_text = _status_text(status) if status_text: details.append(status_text) suffix = "most recent call last" if details: suffix = f"{'; '.join(details)}; {suffix}" return _color(f"{subject} ({suffix}):", theme.header, theme) def format_stack_dump(stack_frames, *, pid=None, file=None, colorize=None): """Return a formatted one-shot stack dump.""" if file is None: file = sys.stdout theme = _theme_for(file, colorize) lines = [] sections = list(_iter_dump_sections(stack_frames)) if not sections: if pid is None: return f"{_color('No Python stacks found', theme.warning, theme)}\n" return f"{_color(f'No Python stacks found for PID {pid}', theme.warning, theme)}\n" interpreter_ids = { interpreter_id for interpreter_id, _thread_id, _status, _frames, _label in sections if interpreter_id is not None } show_interpreter = len(interpreter_ids) > 1 for section_index, (interpreter_id, thread_id, status, frames, label) in enumerate(sections): if section_index: lines.append("") lines.append( _section_header( pid=pid, interpreter_id=interpreter_id, thread_id=thread_id, status=status, label=label, show_pid=section_index == 0, show_interpreter=show_interpreter, theme=theme, ) ) if not frames: lines.append(_color("No Python frames", theme.warning, theme)) continue for frame in reversed(frames): lines.extend(_format_frame(frame, theme)) return "\n".join(lines) + "\n" def print_stack_dump(stack_frames, *, pid=None, file=None, colorize=None): """Pretty-print a one-shot stack dump.""" if file is None: file = sys.stdout file.write(format_stack_dump(stack_frames, pid=pid, file=file, colorize=colorize))