gh-141645: Add a TUI mode to the new tachyon profiler (#141646)

This commit is contained in:
Pablo Galindo Salgado 2025-11-20 18:27:17 +00:00 committed by GitHub
parent e90061f5f1
commit b3383085f9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 5519 additions and 60 deletions

View file

@ -1,20 +1,19 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from .constants import (
# Thread status flags THREAD_STATUS_HAS_GIL,
try: THREAD_STATUS_ON_CPU,
from _remote_debugging import THREAD_STATUS_HAS_GIL, THREAD_STATUS_ON_CPU, THREAD_STATUS_UNKNOWN, THREAD_STATUS_GIL_REQUESTED THREAD_STATUS_UNKNOWN,
except ImportError: THREAD_STATUS_GIL_REQUESTED,
# Fallback for tests or when module is not available )
THREAD_STATUS_HAS_GIL = (1 << 0)
THREAD_STATUS_ON_CPU = (1 << 1)
THREAD_STATUS_UNKNOWN = (1 << 2)
THREAD_STATUS_GIL_REQUESTED = (1 << 3)
class Collector(ABC): class Collector(ABC):
@abstractmethod @abstractmethod
def collect(self, stack_frames): def collect(self, stack_frames):
"""Collect profiling data from stack frames.""" """Collect profiling data from stack frames."""
def collect_failed_sample(self):
"""Collect data about a failed sample attempt."""
@abstractmethod @abstractmethod
def export(self, filename): def export(self, filename):
"""Export collected data to a file.""" """Export collected data to a file."""

View file

@ -0,0 +1,30 @@
"""Constants for the sampling profiler."""
# Profiling mode constants
PROFILING_MODE_WALL = 0
PROFILING_MODE_CPU = 1
PROFILING_MODE_GIL = 2
PROFILING_MODE_ALL = 3 # Combines GIL + CPU checks
# Sort mode constants
SORT_MODE_NSAMPLES = 0
SORT_MODE_TOTTIME = 1
SORT_MODE_CUMTIME = 2
SORT_MODE_SAMPLE_PCT = 3
SORT_MODE_CUMUL_PCT = 4
SORT_MODE_NSAMPLES_CUMUL = 5
# Thread status flags
try:
from _remote_debugging import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
THREAD_STATUS_UNKNOWN,
THREAD_STATUS_GIL_REQUESTED,
)
except ImportError:
# Fallback for tests or when module is not available
THREAD_STATUS_HAS_GIL = (1 << 0)
THREAD_STATUS_ON_CPU = (1 << 1)
THREAD_STATUS_UNKNOWN = (1 << 2)
THREAD_STATUS_GIL_REQUESTED = (1 << 3)

View file

@ -0,0 +1,200 @@
"""Live profiling collector that displays top-like statistics using curses.
Target Python Process
(being profiled)
Stack sampling at
configured interval
(e.g., 10000µs)
LiveStatsCollector
collect() Aggregates samples
- Iterates frames into statistics
- Updates counters
Data Storage
- result dict Tracks per-function:
- direct_calls Direct samples
- cumulative_calls Cumulative samples
Derived time stats
Display Update
(10Hz by default) Rate-limited refresh
DisplayInterface
(Abstract layer)
CursesDisplay MockDisplay
- Real terminal - Testing
- ncurses backend - No UI
Widget-Based Rendering
HeaderWidget
PID, uptime, time, interval
Sample stats & progress bar
Efficiency bar
Thread status & GC stats
Function summary
Top 3 hottest functions
TableWidget
Column headers (sortable) Interactive display
Stats rows (scrolling) with keyboard controls:
- nsamples % time s: sort, p: pause
- function file:line r: reset, /: filter
q: quit, h: help
FooterWidget
Legend and status
Filter input prompt
Architecture:
The live collector is organized into four layers. The data collection layer
(LiveStatsCollector) aggregates stack samples into per-function statistics without
any knowledge of how they will be presented. The display abstraction layer
(DisplayInterface) defines rendering operations without coupling to curses or any
specific UI framework. The widget layer (Widget, HeaderWidget, TableWidget,
FooterWidget, HelpWidget, ProgressBarWidget) encapsulates individual UI components
with their own rendering logic, promoting modularity and reusability. The
presentation layer (CursesDisplay/MockDisplay) implements the actual rendering for
terminal output and testing.
The system runs two independent update loops. The sampling loop is driven by the
profiler at the configured interval (e.g., 10000µs) and continuously collects
stack frames and updates statistics. The display loop runs at a fixed refresh rate
(default 10Hz) and updates the terminal independently of sampling frequency. This
separation allows high-frequency sampling without overwhelming the terminal with
constant redraws.
Statistics are computed incrementally as samples arrive. The collector maintains
running counters (direct calls and cumulative calls) in a dictionary keyed by
function location. Derived metrics like time estimates and percentages are computed
on-demand during display updates rather than being stored, which minimizes memory
overhead as the number of tracked functions grows.
User input is processed asynchronously during display updates using non-blocking I/O.
This allows interactive controls (sorting, filtering, pausing) without interrupting
the data collection pipeline. The collector maintains mode flags (paused,
filter_input_mode) that affect what gets displayed but not what gets collected.
"""
# Re-export all public classes and constants for backward compatibility
from .collector import LiveStatsCollector
from .display import DisplayInterface, CursesDisplay, MockDisplay
from .widgets import (
Widget,
ProgressBarWidget,
HeaderWidget,
TableWidget,
FooterWidget,
HelpWidget,
)
from .constants import (
MICROSECONDS_PER_SECOND,
DISPLAY_UPDATE_HZ,
DISPLAY_UPDATE_INTERVAL,
MIN_TERMINAL_WIDTH,
MIN_TERMINAL_HEIGHT,
WIDTH_THRESHOLD_SAMPLE_PCT,
WIDTH_THRESHOLD_TOTTIME,
WIDTH_THRESHOLD_CUMUL_PCT,
WIDTH_THRESHOLD_CUMTIME,
HEADER_LINES,
FOOTER_LINES,
SAFETY_MARGIN,
TOP_FUNCTIONS_DISPLAY_COUNT,
COL_WIDTH_NSAMPLES,
COL_SPACING,
COL_WIDTH_SAMPLE_PCT,
COL_WIDTH_TIME,
MIN_FUNC_NAME_WIDTH,
MAX_FUNC_NAME_WIDTH,
MIN_AVAILABLE_SPACE,
MIN_BAR_WIDTH,
MAX_SAMPLE_RATE_BAR_WIDTH,
MAX_EFFICIENCY_BAR_WIDTH,
MIN_SAMPLE_RATE_FOR_SCALING,
FINISHED_BANNER_EXTRA_LINES,
COLOR_PAIR_HEADER_BG,
COLOR_PAIR_CYAN,
COLOR_PAIR_YELLOW,
COLOR_PAIR_GREEN,
COLOR_PAIR_MAGENTA,
COLOR_PAIR_RED,
COLOR_PAIR_SORTED_HEADER,
DEFAULT_SORT_BY,
DEFAULT_DISPLAY_LIMIT,
)
__all__ = [
# Main collector
"LiveStatsCollector",
# Display interfaces
"DisplayInterface",
"CursesDisplay",
"MockDisplay",
# Widgets
"Widget",
"ProgressBarWidget",
"HeaderWidget",
"TableWidget",
"FooterWidget",
"HelpWidget",
# Constants
"MICROSECONDS_PER_SECOND",
"DISPLAY_UPDATE_HZ",
"DISPLAY_UPDATE_INTERVAL",
"MIN_TERMINAL_WIDTH",
"MIN_TERMINAL_HEIGHT",
"WIDTH_THRESHOLD_SAMPLE_PCT",
"WIDTH_THRESHOLD_TOTTIME",
"WIDTH_THRESHOLD_CUMUL_PCT",
"WIDTH_THRESHOLD_CUMTIME",
"HEADER_LINES",
"FOOTER_LINES",
"SAFETY_MARGIN",
"TOP_FUNCTIONS_DISPLAY_COUNT",
"COL_WIDTH_NSAMPLES",
"COL_SPACING",
"COL_WIDTH_SAMPLE_PCT",
"COL_WIDTH_TIME",
"MIN_FUNC_NAME_WIDTH",
"MAX_FUNC_NAME_WIDTH",
"MIN_AVAILABLE_SPACE",
"MIN_BAR_WIDTH",
"MAX_SAMPLE_RATE_BAR_WIDTH",
"MAX_EFFICIENCY_BAR_WIDTH",
"MIN_SAMPLE_RATE_FOR_SCALING",
"FINISHED_BANNER_EXTRA_LINES",
"COLOR_PAIR_HEADER_BG",
"COLOR_PAIR_CYAN",
"COLOR_PAIR_YELLOW",
"COLOR_PAIR_GREEN",
"COLOR_PAIR_MAGENTA",
"COLOR_PAIR_RED",
"COLOR_PAIR_SORTED_HEADER",
"DEFAULT_SORT_BY",
"DEFAULT_DISPLAY_LIMIT",
]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,59 @@
"""Constants for the live profiling collector."""
# Time conversion constants
MICROSECONDS_PER_SECOND = 1_000_000
# Display update constants
DISPLAY_UPDATE_HZ = 10
DISPLAY_UPDATE_INTERVAL = 1.0 / DISPLAY_UPDATE_HZ # 0.1 seconds
# Terminal size constraints
MIN_TERMINAL_WIDTH = 60
MIN_TERMINAL_HEIGHT = 12
# Column width thresholds
WIDTH_THRESHOLD_SAMPLE_PCT = 80
WIDTH_THRESHOLD_TOTTIME = 100
WIDTH_THRESHOLD_CUMUL_PCT = 120
WIDTH_THRESHOLD_CUMTIME = 140
# Display layout constants
HEADER_LINES = 10 # Increased to include thread status line
FOOTER_LINES = 2
SAFETY_MARGIN = 1
TOP_FUNCTIONS_DISPLAY_COUNT = 3
# Column widths for data display
COL_WIDTH_NSAMPLES = 13
COL_SPACING = 2
COL_WIDTH_SAMPLE_PCT = 5
COL_WIDTH_TIME = 10
# Function name display
MIN_FUNC_NAME_WIDTH = 10
MAX_FUNC_NAME_WIDTH = 40
MIN_AVAILABLE_SPACE = 10
# Progress bar display
MIN_BAR_WIDTH = 10
MAX_SAMPLE_RATE_BAR_WIDTH = 30
MAX_EFFICIENCY_BAR_WIDTH = 60
# Sample rate scaling
MIN_SAMPLE_RATE_FOR_SCALING = 100
# Finished banner display
FINISHED_BANNER_EXTRA_LINES = 3 # Blank line + banner + blank line
# Color pair IDs
COLOR_PAIR_HEADER_BG = 4
COLOR_PAIR_CYAN = 5
COLOR_PAIR_YELLOW = 6
COLOR_PAIR_GREEN = 7
COLOR_PAIR_MAGENTA = 8
COLOR_PAIR_RED = 9
COLOR_PAIR_SORTED_HEADER = 10
# Default display settings
DEFAULT_SORT_BY = "nsamples" # Number of samples in leaf (self time)
DEFAULT_DISPLAY_LIMIT = 20

View file

@ -0,0 +1,236 @@
"""Display interface abstractions for the live profiling collector."""
import contextlib
import curses
from abc import ABC, abstractmethod
class DisplayInterface(ABC):
"""Abstract interface for display operations to enable testing."""
@abstractmethod
def get_dimensions(self):
"""Get terminal dimensions as (height, width)."""
pass
@abstractmethod
def clear(self):
"""Clear the screen."""
pass
@abstractmethod
def refresh(self):
"""Refresh the screen to show changes."""
pass
@abstractmethod
def redraw(self):
"""Redraw the entire window."""
pass
@abstractmethod
def add_str(self, line, col, text, attr=0):
"""Add a string at the specified position."""
pass
@abstractmethod
def get_input(self):
"""Get a character from input (non-blocking). Returns -1 if no input."""
pass
@abstractmethod
def set_nodelay(self, flag):
"""Set non-blocking mode for input."""
pass
@abstractmethod
def has_colors(self):
"""Check if terminal supports colors."""
pass
@abstractmethod
def init_color_pair(self, pair_id, fg, bg):
"""Initialize a color pair."""
pass
@abstractmethod
def get_color_pair(self, pair_id):
"""Get a color pair attribute."""
pass
@abstractmethod
def get_attr(self, name):
"""Get a display attribute by name (e.g., 'A_BOLD', 'A_REVERSE')."""
pass
class CursesDisplay(DisplayInterface):
"""Real curses display implementation."""
def __init__(self, stdscr):
self.stdscr = stdscr
def get_dimensions(self):
return self.stdscr.getmaxyx()
def clear(self):
self.stdscr.clear()
def refresh(self):
self.stdscr.refresh()
def redraw(self):
self.stdscr.redrawwin()
def add_str(self, line, col, text, attr=0):
try:
height, width = self.get_dimensions()
if 0 <= line < height and 0 <= col < width:
max_len = width - col - 1
if len(text) > max_len:
text = text[:max_len]
self.stdscr.addstr(line, col, text, attr)
except curses.error:
pass
def get_input(self):
try:
return self.stdscr.getch()
except (KeyError, curses.error):
return -1
def set_nodelay(self, flag):
self.stdscr.nodelay(flag)
def has_colors(self):
return curses.has_colors()
def init_color_pair(self, pair_id, fg, bg):
try:
curses.init_pair(pair_id, fg, bg)
except curses.error:
pass
def get_color_pair(self, pair_id):
return curses.color_pair(pair_id)
def get_attr(self, name):
return getattr(curses, name, 0)
class MockDisplay(DisplayInterface):
"""Mock display for testing."""
def __init__(self, height=40, width=160):
self.height = height
self.width = width
self.buffer = {}
self.cleared = False
self.refreshed = False
self.redrawn = False
self.input_queue = []
self.nodelay_flag = True
self.colors_supported = True
self.color_pairs = {}
def get_dimensions(self):
return (self.height, self.width)
def clear(self):
self.buffer.clear()
self.cleared = True
def refresh(self):
self.refreshed = True
def redraw(self):
self.redrawn = True
def add_str(self, line, col, text, attr=0):
if 0 <= line < self.height and 0 <= col < self.width:
max_len = self.width - col - 1
if len(text) > max_len:
text = text[:max_len]
self.buffer[(line, col)] = (text, attr)
def get_input(self):
if self.input_queue:
return self.input_queue.pop(0)
return -1
def set_nodelay(self, flag):
self.nodelay_flag = flag
def has_colors(self):
return self.colors_supported
def init_color_pair(self, pair_id, fg, bg):
self.color_pairs[pair_id] = (fg, bg)
def get_color_pair(self, pair_id):
return pair_id << 8
def get_attr(self, name):
attrs = {
"A_NORMAL": 0,
"A_BOLD": 1 << 16,
"A_REVERSE": 1 << 17,
"A_UNDERLINE": 1 << 18,
"A_DIM": 1 << 19,
}
return attrs.get(name, 0)
def simulate_input(self, char):
"""Helper method for tests to simulate keyboard input."""
self.input_queue.append(char)
def get_text_at(self, line, col):
"""Helper method for tests to inspect buffer content."""
if (line, col) in self.buffer:
return self.buffer[(line, col)][0]
return None
def get_all_lines(self):
"""Get all display content as a list of lines (for testing)."""
if not self.buffer:
return []
max_line = max(pos[0] for pos in self.buffer.keys())
lines = []
for line_num in range(max_line + 1):
line_parts = []
for col in range(self.width):
if (line_num, col) in self.buffer:
text, _ = self.buffer[(line_num, col)]
line_parts.append((col, text))
# Reconstruct line from parts
if line_parts:
line_parts.sort(key=lambda x: x[0])
line = ""
last_col = 0
for col, text in line_parts:
if col > last_col:
line += " " * (col - last_col)
line += text
last_col = col + len(text)
lines.append(line.rstrip())
else:
lines.append("")
# Remove trailing empty lines
while lines and not lines[-1]:
lines.pop()
return lines
def find_text(self, pattern):
"""Find text matching pattern in buffer (for testing). Returns (line, col) or None."""
for (line, col), (text, _) in self.buffer.items():
if pattern in text:
return (line, col)
return None
def contains_text(self, text):
"""Check if display contains the given text anywhere (for testing)."""
return self.find_text(text) is not None

View file

@ -0,0 +1,157 @@
"""TrendTracker - Encapsulated trend tracking for live profiling metrics.
This module provides trend tracking functionality for profiling metrics,
calculating direction indicators (up/down/stable) and managing associated
visual attributes like colors.
"""
import curses
from typing import Dict, Literal, Any
TrendDirection = Literal["up", "down", "stable"]
class TrendTracker:
"""
Tracks metric trends over time and provides visual indicators.
This class encapsulates all logic for:
- Tracking previous values of metrics
- Calculating trend directions (up/down/stable)
- Determining visual attributes (colors) for trends
- Managing enable/disable state
Example:
tracker = TrendTracker(colors_dict)
tracker.update("func1", "nsamples", 10)
trend = tracker.get_trend("func1", "nsamples")
color = tracker.get_color("func1", "nsamples")
"""
# Threshold for determining if a value has changed significantly
CHANGE_THRESHOLD = 0.001
def __init__(self, colors: Dict[str, int], enabled: bool = True):
"""
Initialize the trend tracker.
Args:
colors: Dictionary containing color attributes including
'trend_up', 'trend_down', 'trend_stable'
enabled: Whether trend tracking is initially enabled
"""
self._previous_values: Dict[Any, Dict[str, float]] = {}
self._enabled = enabled
self._colors = colors
@property
def enabled(self) -> bool:
"""Whether trend tracking is enabled."""
return self._enabled
def toggle(self) -> bool:
"""
Toggle trend tracking on/off.
Returns:
New enabled state
"""
self._enabled = not self._enabled
return self._enabled
def set_enabled(self, enabled: bool) -> None:
"""Set trend tracking enabled state."""
self._enabled = enabled
def update(self, key: Any, metric: str, value: float) -> TrendDirection:
"""
Update a metric value and calculate its trend.
Args:
key: Identifier for the entity (e.g., function)
metric: Name of the metric (e.g., 'nsamples', 'tottime')
value: Current value of the metric
Returns:
Trend direction: 'up', 'down', or 'stable'
"""
# Initialize storage for this key if needed
if key not in self._previous_values:
self._previous_values[key] = {}
# Get previous value, defaulting to current if not tracked yet
prev_value = self._previous_values[key].get(metric, value)
# Calculate trend
if value > prev_value + self.CHANGE_THRESHOLD:
trend = "up"
elif value < prev_value - self.CHANGE_THRESHOLD:
trend = "down"
else:
trend = "stable"
# Update previous value for next iteration
self._previous_values[key][metric] = value
return trend
def get_trend(self, key: Any, metric: str) -> TrendDirection:
"""
Get the current trend for a metric without updating.
Args:
key: Identifier for the entity
metric: Name of the metric
Returns:
Trend direction, or 'stable' if not tracked
"""
# This would require storing trends separately, which we don't do
# For now, return stable if not found
return "stable"
def get_color(self, trend: TrendDirection) -> int:
"""
Get the color attribute for a trend direction.
Args:
trend: The trend direction
Returns:
Curses color attribute (or A_NORMAL if disabled)
"""
if not self._enabled:
return curses.A_NORMAL
if trend == "up":
return self._colors.get("trend_up", curses.A_BOLD)
elif trend == "down":
return self._colors.get("trend_down", curses.A_BOLD)
else: # stable
return self._colors.get("trend_stable", curses.A_NORMAL)
def update_metrics(self, key: Any, metrics: Dict[str, float]) -> Dict[str, TrendDirection]:
"""
Update multiple metrics at once and get their trends.
Args:
key: Identifier for the entity
metrics: Dictionary of metric_name -> value
Returns:
Dictionary of metric_name -> trend_direction
"""
trends = {}
for metric, value in metrics.items():
trends[metric] = self.update(key, metric, value)
return trends
def clear(self) -> None:
"""Clear all tracked values (useful on stats reset)."""
self._previous_values.clear()
def __repr__(self) -> str:
"""String representation for debugging."""
status = "enabled" if self._enabled else "disabled"
tracked = len(self._previous_values)
return f"TrendTracker({status}, tracking {tracked} entities)"

View file

@ -0,0 +1,963 @@
"""Widget classes for the live profiling collector UI."""
import curses
import time
from abc import ABC, abstractmethod
from .constants import (
TOP_FUNCTIONS_DISPLAY_COUNT,
MIN_FUNC_NAME_WIDTH,
MAX_FUNC_NAME_WIDTH,
WIDTH_THRESHOLD_SAMPLE_PCT,
WIDTH_THRESHOLD_TOTTIME,
WIDTH_THRESHOLD_CUMUL_PCT,
WIDTH_THRESHOLD_CUMTIME,
MICROSECONDS_PER_SECOND,
DISPLAY_UPDATE_INTERVAL,
MIN_BAR_WIDTH,
MAX_SAMPLE_RATE_BAR_WIDTH,
MAX_EFFICIENCY_BAR_WIDTH,
MIN_SAMPLE_RATE_FOR_SCALING,
FOOTER_LINES,
FINISHED_BANNER_EXTRA_LINES,
)
from ..constants import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
THREAD_STATUS_UNKNOWN,
THREAD_STATUS_GIL_REQUESTED,
PROFILING_MODE_CPU,
PROFILING_MODE_GIL,
PROFILING_MODE_WALL,
)
class Widget(ABC):
"""Base class for UI widgets."""
def __init__(self, display, colors):
"""
Initialize widget.
Args:
display: DisplayInterface implementation
colors: Dictionary of color attributes
"""
self.display = display
self.colors = colors
@abstractmethod
def render(self, line, width, **kwargs):
"""
Render the widget starting at the given line.
Args:
line: Starting line number
width: Available width
**kwargs: Additional rendering parameters
Returns:
Next available line number after rendering
"""
pass
def add_str(self, line, col, text, attr=0):
"""Add a string to the display at the specified position."""
self.display.add_str(line, col, text, attr)
class ProgressBarWidget(Widget):
"""Reusable progress bar widget."""
def render(self, line, width, **kwargs):
"""Render is not used for progress bars - use render_bar instead."""
raise NotImplementedError("Use render_bar method instead")
def render_bar(
self, filled, total, max_width, fill_char="", empty_char=""
):
"""
Render a progress bar and return the bar string and its length.
Args:
filled: Current filled amount
total: Total amount (max value)
max_width: Maximum width for the bar
fill_char: Character to use for filled portion
empty_char: Character to use for empty portion
Returns:
Tuple of (bar_string, bar_length)
"""
bar_width = min(max_width, max_width)
normalized = min(filled / max(total, 1), 1.0)
bar_fill = int(normalized * bar_width)
bar = "["
for i in range(bar_width):
if i < bar_fill:
bar += fill_char
else:
bar += empty_char
bar += "]"
return bar, len(bar)
class HeaderWidget(Widget):
"""Widget for rendering the header section (lines 0-8)."""
def __init__(self, display, colors, collector):
"""
Initialize header widget.
Args:
display: DisplayInterface implementation
colors: Dictionary of color attributes
collector: Reference to LiveStatsCollector for accessing stats
"""
super().__init__(display, colors)
self.collector = collector
self.progress_bar = ProgressBarWidget(display, colors)
def render(self, line, width, **kwargs):
"""
Render the complete header section.
Args:
line: Starting line number
width: Available width
kwargs: Must contain 'elapsed' key
Returns:
Next available line number
"""
elapsed = kwargs["elapsed"]
line = self.draw_header_info(line, width, elapsed)
line = self.draw_sample_stats(line, width, elapsed)
line = self.draw_efficiency_bar(line, width)
line = self.draw_thread_status(line, width)
line = self.draw_function_stats(
line, width, kwargs.get("stats_list", [])
)
line = self.draw_top_functions(
line, width, kwargs.get("stats_list", [])
)
# Show prominent finished banner if profiling is complete
if self.collector.finished:
line = self.draw_finished_banner(line, width)
# Separator
A_DIM = self.display.get_attr("A_DIM")
separator = "" * (width - 1)
self.add_str(line, 0, separator[: width - 1], A_DIM)
line += 1
return line
def format_uptime(self, elapsed):
"""Format elapsed time as uptime string."""
uptime_sec = int(elapsed)
hours = uptime_sec // 3600
minutes = (uptime_sec % 3600) // 60
seconds = uptime_sec % 60
if hours > 0:
return f"{hours}h{minutes:02d}m{seconds:02d}s"
else:
return f"{minutes}m{seconds:02d}s"
def draw_header_info(self, line, width, elapsed):
"""Draw the header information line with PID, uptime, time, and interval."""
# Draw title
A_BOLD = self.display.get_attr("A_BOLD")
title = "Tachyon Profiler"
self.add_str(line, 0, title, A_BOLD | self.colors["cyan"])
line += 1
current_time = self.collector.current_time_display
uptime = self.format_uptime(elapsed)
# Calculate display refresh rate
refresh_hz = (
1.0 / self.collector._display_update_interval if self.collector._display_update_interval > 0 else 0
)
# Get current view mode and thread display
if self.collector.view_mode == "ALL":
thread_name = "ALL"
thread_color = self.colors["green"]
else:
# PER_THREAD mode
if self.collector.current_thread_index < len(
self.collector.thread_ids
):
thread_id = self.collector.thread_ids[
self.collector.current_thread_index
]
num_threads = len(self.collector.thread_ids)
thread_name = f"{thread_id} ({self.collector.current_thread_index + 1}/{num_threads})"
thread_color = self.colors["magenta"]
else:
thread_name = "ALL"
thread_color = self.colors["green"]
header_parts = [
("PID: ", curses.A_BOLD),
(f"{self.collector.pid}", self.colors["cyan"]),
("", curses.A_DIM),
("Thread: ", curses.A_BOLD),
(thread_name, thread_color),
("", curses.A_DIM),
("Uptime: ", curses.A_BOLD),
(uptime, self.colors["green"]),
("", curses.A_DIM),
("Time: ", curses.A_BOLD),
(current_time, self.colors["yellow"]),
("", curses.A_DIM),
("Interval: ", curses.A_BOLD),
(
f"{self.collector.sample_interval_usec}µs",
self.colors["magenta"],
),
("", curses.A_DIM),
("Display: ", curses.A_BOLD),
(f"{refresh_hz:.1f}Hz", self.colors["cyan"]),
]
col = 0
for text, attr in header_parts:
if col < width - 1:
self.add_str(line, col, text, attr)
col += len(text)
return line + 1
def format_rate_with_units(self, rate_hz):
"""Format a rate in Hz with appropriate units (Hz, KHz, MHz)."""
if rate_hz >= 1_000_000:
return f"{rate_hz / 1_000_000:.1f}MHz"
elif rate_hz >= 1_000:
return f"{rate_hz / 1_000:.1f}KHz"
else:
return f"{rate_hz:.1f}Hz"
def draw_sample_stats(self, line, width, elapsed):
"""Draw sample statistics with visual progress bar."""
sample_rate = (
self.collector.total_samples / elapsed if elapsed > 0 else 0
)
# Update max sample rate
if sample_rate > self.collector._max_sample_rate:
self.collector._max_sample_rate = sample_rate
col = 0
self.add_str(line, col, "Samples: ", curses.A_BOLD)
col += 9
self.add_str(
line,
col,
f"{self.collector.total_samples:>8}",
self.colors["cyan"],
)
col += 8
self.add_str(
line, col, f" total ({sample_rate:>7.1f}/s) ", curses.A_NORMAL
)
col += 23
# Draw sample rate bar
target_rate = (
MICROSECONDS_PER_SECOND / self.collector.sample_interval_usec
)
# Show current/target ratio with percentage
if sample_rate > 0 and target_rate > 0:
percentage = min((sample_rate / target_rate) * 100, 100)
current_formatted = self.format_rate_with_units(sample_rate)
target_formatted = self.format_rate_with_units(target_rate)
if percentage >= 99.5: # Show 100% when very close
rate_label = f" {current_formatted}/{target_formatted} (100%)"
else:
rate_label = f" {current_formatted}/{target_formatted} ({percentage:>4.1f}%)"
else:
target_formatted = self.format_rate_with_units(target_rate)
rate_label = f" target: {target_formatted}"
available_width = width - col - len(rate_label) - 3
if available_width >= MIN_BAR_WIDTH:
bar_width = min(MAX_SAMPLE_RATE_BAR_WIDTH, available_width)
# Use target rate as the reference, with a minimum for scaling
reference_rate = max(target_rate, MIN_SAMPLE_RATE_FOR_SCALING)
normalized_rate = min(sample_rate / reference_rate, 1.0)
bar_fill = int(normalized_rate * bar_width)
bar = "["
for i in range(bar_width):
bar += "" if i < bar_fill else ""
bar += "]"
self.add_str(line, col, bar, self.colors["green"])
col += len(bar)
if col + len(rate_label) < width - 1:
self.add_str(line, col + 1, rate_label, curses.A_DIM)
return line + 1
def draw_efficiency_bar(self, line, width):
"""Draw sample efficiency bar showing success/failure rates."""
success_pct = (
self.collector._successful_samples
/ max(1, self.collector.total_samples)
) * 100
failed_pct = (
self.collector._failed_samples
/ max(1, self.collector.total_samples)
) * 100
col = 0
self.add_str(line, col, "Efficiency:", curses.A_BOLD)
col += 11
label = f" {success_pct:>5.2f}% good, {failed_pct:>4.2f}% failed"
available_width = width - col - len(label) - 3
if available_width >= MIN_BAR_WIDTH:
bar_width = min(MAX_EFFICIENCY_BAR_WIDTH, available_width)
success_fill = int(
(
self.collector._successful_samples
/ max(1, self.collector.total_samples)
)
* bar_width
)
failed_fill = bar_width - success_fill
self.add_str(line, col, "[", curses.A_NORMAL)
col += 1
if success_fill > 0:
self.add_str(
line, col, "" * success_fill, self.colors["green"]
)
col += success_fill
if failed_fill > 0:
self.add_str(line, col, "" * failed_fill, self.colors["red"])
col += failed_fill
self.add_str(line, col, "]", curses.A_NORMAL)
col += 1
self.add_str(line, col + 1, label, curses.A_NORMAL)
return line + 1
def _add_percentage_stat(
self, line, col, value, label, color, add_separator=False
):
"""Add a percentage stat to the display.
Args:
line: Line number
col: Starting column
value: Percentage value
label: Label text
color: Color attribute
add_separator: Whether to add separator before the stat
Returns:
Updated column position
"""
if add_separator:
self.add_str(line, col, "", curses.A_DIM)
col += 3
self.add_str(line, col, f"{value:>4.1f}", color)
col += 4
self.add_str(line, col, f"% {label}", curses.A_NORMAL)
col += len(label) + 2
return col
def draw_thread_status(self, line, width):
"""Draw thread status statistics and GC information."""
# Get status counts for current view mode
thread_data = self.collector._get_current_thread_data()
status_counts = thread_data.as_status_dict() if thread_data else self.collector._thread_status_counts
# Calculate percentages
total_threads = max(1, status_counts["total"])
pct_on_gil = (status_counts["has_gil"] / total_threads) * 100
pct_off_gil = 100.0 - pct_on_gil
pct_gil_requested = (status_counts["gil_requested"] / total_threads) * 100
# Get GC percentage based on view mode
if thread_data:
total_samples = max(1, thread_data.sample_count)
pct_gc = (thread_data.gc_frame_samples / total_samples) * 100
else:
total_samples = max(1, self.collector.total_samples)
pct_gc = (self.collector._gc_frame_samples / total_samples) * 100
col = 0
self.add_str(line, col, "Threads: ", curses.A_BOLD)
col += 11
# Show GIL stats only if mode is not GIL (GIL mode filters to only GIL holders)
if self.collector.mode != PROFILING_MODE_GIL:
col = self._add_percentage_stat(
line, col, pct_on_gil, "on gil", self.colors["green"]
)
col = self._add_percentage_stat(
line,
col,
pct_off_gil,
"off gil",
self.colors["red"],
add_separator=True,
)
# Show "waiting for gil" only if mode is not GIL
if self.collector.mode != PROFILING_MODE_GIL and col < width - 30:
col = self._add_percentage_stat(
line,
col,
pct_gil_requested,
"waiting for gil",
self.colors["yellow"],
add_separator=True,
)
# Always show GC stats
if col < width - 15:
col = self._add_percentage_stat(
line,
col,
pct_gc,
"GC",
self.colors["magenta"],
add_separator=(col > 11),
)
return line + 1
def draw_function_stats(self, line, width, stats_list):
"""Draw function statistics summary."""
result_set = self.collector._get_current_result_source()
total_funcs = len(result_set)
funcs_shown = len(stats_list)
executing_funcs = sum(
1 for f in result_set.values() if f.get("direct_calls", 0) > 0
)
stack_only = total_funcs - executing_funcs
col = 0
self.add_str(line, col, "Functions: ", curses.A_BOLD)
col += 11
self.add_str(line, col, f"{total_funcs:>5}", self.colors["cyan"])
col += 5
self.add_str(line, col, " total", curses.A_NORMAL)
col += 6
if col < width - 25:
self.add_str(line, col, "", curses.A_DIM)
col += 3
self.add_str(
line, col, f"{executing_funcs:>5}", self.colors["green"]
)
col += 5
self.add_str(line, col, " exec", curses.A_NORMAL)
col += 5
if col < width - 25:
self.add_str(line, col, "", curses.A_DIM)
col += 3
self.add_str(line, col, f"{stack_only:>5}", self.colors["yellow"])
col += 5
self.add_str(line, col, " stack", curses.A_NORMAL)
col += 6
if col < width - 20:
self.add_str(line, col, "", curses.A_DIM)
col += 3
self.add_str(
line, col, f"{funcs_shown:>5}", self.colors["magenta"]
)
col += 5
self.add_str(line, col, " shown", curses.A_NORMAL)
return line + 1
def draw_top_functions(self, line, width, stats_list):
"""Draw top N hottest functions."""
col = 0
self.add_str(
line,
col,
f"Top {TOP_FUNCTIONS_DISPLAY_COUNT}: ",
curses.A_BOLD,
)
col += 11
top_by_samples = sorted(
stats_list, key=lambda x: x["direct_calls"], reverse=True
)
emojis = ["🥇", "🥈", "🥉"]
medal_colors = [
self.colors["red"],
self.colors["yellow"],
self.colors["green"],
]
displayed = 0
for func_data in top_by_samples:
if displayed >= TOP_FUNCTIONS_DISPLAY_COUNT:
break
if col >= width - 20:
break
if func_data["direct_calls"] == 0:
continue
func_name = func_data["func"][2]
func_pct = (
func_data["direct_calls"]
/ max(1, self.collector.total_samples)
) * 100
# Medal emoji
if col + 3 < width - 15:
self.add_str(
line, col, emojis[displayed] + " ", medal_colors[displayed]
)
col += 3
# Function name (truncate to fit)
available_for_name = width - col - 15
max_name_len = min(25, max(5, available_for_name))
if len(func_name) > max_name_len:
func_name = func_name[: max_name_len - 3] + "..."
if col + len(func_name) < width - 10:
self.add_str(line, col, func_name, medal_colors[displayed])
col += len(func_name)
pct_str = (
f" ({func_pct:.1f}%)"
if func_pct >= 0.1
else f" ({func_data['direct_calls']})"
)
self.add_str(line, col, pct_str, curses.A_DIM)
col += len(pct_str)
displayed += 1
if displayed < 3 and col < width - 30:
self.add_str(line, col, "", curses.A_DIM)
col += 3
if displayed == 0 and col < width - 25:
self.add_str(line, col, "(collecting samples...)", curses.A_DIM)
return line + 1
def draw_finished_banner(self, line, width):
"""Draw a prominent banner when profiling is finished."""
A_REVERSE = self.display.get_attr("A_REVERSE")
A_BOLD = self.display.get_attr("A_BOLD")
# Add blank line for separation
line += 1
# Create the banner message
message = " ✓ PROFILING COMPLETE - Final Results Below - Press 'q' to Quit "
# Center the message and fill the width with reverse video
if len(message) < width - 1:
padding_total = width - len(message) - 1
padding_left = padding_total // 2
padding_right = padding_total - padding_left
full_message = " " * padding_left + message + " " * padding_right
else:
full_message = message[: width - 1]
# Draw the banner with reverse video and bold
self.add_str(
line, 0, full_message, A_REVERSE | A_BOLD | self.colors["green"]
)
line += 1
# Add blank line for separation
line += 1
return line
class TableWidget(Widget):
"""Widget for rendering column headers and data rows."""
def __init__(self, display, colors, collector):
"""
Initialize table widget.
Args:
display: DisplayInterface implementation
colors: Dictionary of color attributes
collector: Reference to LiveStatsCollector for accessing stats
"""
super().__init__(display, colors)
self.collector = collector
def render(self, line, width, **kwargs):
"""
Render column headers and data rows.
Args:
line: Starting line number
width: Available width
kwargs: Must contain 'height' and 'stats_list' keys
Returns:
Next available line number
"""
height = kwargs["height"]
stats_list = kwargs["stats_list"]
# Draw column headers
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
self.draw_column_headers(line, width)
)
column_flags = (
show_sample_pct,
show_tottime,
show_cumul_pct,
show_cumtime,
)
# Draw data rows
line = self.draw_stats_rows(
line, height, width, stats_list, column_flags
)
return line
def draw_column_headers(self, line, width):
"""Draw column headers with sort indicators."""
col = 0
# Determine which columns to show based on width
show_sample_pct = width >= WIDTH_THRESHOLD_SAMPLE_PCT
show_tottime = width >= WIDTH_THRESHOLD_TOTTIME
show_cumul_pct = width >= WIDTH_THRESHOLD_CUMUL_PCT
show_cumtime = width >= WIDTH_THRESHOLD_CUMTIME
sorted_header = self.colors["sorted_header"]
normal_header = self.colors["normal_header"]
# Determine which column is sorted
sort_col = {
"nsamples": 0,
"sample_pct": 1,
"tottime": 2,
"cumul_pct": 3,
"cumtime": 4,
}.get(self.collector.sort_by, -1)
# Column 0: nsamples
attr = sorted_header if sort_col == 0 else normal_header
text = f"{'▼nsamples' if sort_col == 0 else 'nsamples':>13}"
self.add_str(line, col, text, attr)
col += 15
# Column 1: sample %
if show_sample_pct:
attr = sorted_header if sort_col == 1 else normal_header
text = f"{'%' if sort_col == 1 else '%':>5}"
self.add_str(line, col, text, attr)
col += 7
# Column 2: tottime
if show_tottime:
attr = sorted_header if sort_col == 2 else normal_header
text = f"{'▼tottime' if sort_col == 2 else 'tottime':>10}"
self.add_str(line, col, text, attr)
col += 12
# Column 3: cumul %
if show_cumul_pct:
attr = sorted_header if sort_col == 3 else normal_header
text = f"{'%' if sort_col == 3 else '%':>5}"
self.add_str(line, col, text, attr)
col += 7
# Column 4: cumtime
if show_cumtime:
attr = sorted_header if sort_col == 4 else normal_header
text = f"{'▼cumtime' if sort_col == 4 else 'cumtime':>10}"
self.add_str(line, col, text, attr)
col += 12
# Remaining headers
if col < width - 15:
remaining_space = width - col - 1
func_width = min(
MAX_FUNC_NAME_WIDTH,
max(MIN_FUNC_NAME_WIDTH, remaining_space // 2),
)
self.add_str(
line, col, f"{'function':<{func_width}}", normal_header
)
col += func_width + 2
if col < width - 10:
self.add_str(line, col, "file:line", normal_header)
return (
line + 1,
show_sample_pct,
show_tottime,
show_cumul_pct,
show_cumtime,
)
def draw_stats_rows(self, line, height, width, stats_list, column_flags):
"""Draw the statistics data rows."""
show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
column_flags
)
# Get color attributes from the colors dict (already initialized)
color_samples = self.colors.get("color_samples", curses.A_NORMAL)
color_file = self.colors.get("color_file", curses.A_NORMAL)
color_func = self.colors.get("color_func", curses.A_NORMAL)
# Get trend tracker for color decisions
trend_tracker = self.collector._trend_tracker
for stat in stats_list:
if line >= height - FOOTER_LINES:
break
func = stat["func"]
direct_calls = stat["direct_calls"]
cumulative_calls = stat["cumulative_calls"]
total_time = stat["total_time"]
cumulative_time = stat["cumulative_time"]
trends = stat.get("trends", {})
sample_pct = (
(direct_calls / self.collector.total_samples * 100)
if self.collector.total_samples > 0
else 0
)
cum_pct = (
(cumulative_calls / self.collector.total_samples * 100)
if self.collector.total_samples > 0
else 0
)
# Helper function to get trend color for a specific column
def get_trend_color(column_name):
trend = trends.get(column_name, "stable")
if trend_tracker is not None:
return trend_tracker.get_color(trend)
return curses.A_NORMAL
filename, lineno, funcname = func[0], func[1], func[2]
samples_str = f"{direct_calls}/{cumulative_calls}"
col = 0
# Samples column - apply trend color based on nsamples trend
nsamples_color = get_trend_color("nsamples")
self.add_str(line, col, f"{samples_str:>13}", nsamples_color)
col += 15
# Sample % column
if show_sample_pct:
sample_pct_color = get_trend_color("sample_pct")
self.add_str(line, col, f"{sample_pct:>5.1f}", sample_pct_color)
col += 7
# Total time column
if show_tottime:
tottime_color = get_trend_color("tottime")
self.add_str(line, col, f"{total_time:>10.3f}", tottime_color)
col += 12
# Cumul % column
if show_cumul_pct:
cumul_pct_color = get_trend_color("cumul_pct")
self.add_str(line, col, f"{cum_pct:>5.1f}", cumul_pct_color)
col += 7
# Cumul time column
if show_cumtime:
cumtime_color = get_trend_color("cumtime")
self.add_str(line, col, f"{cumulative_time:>10.3f}", cumtime_color)
col += 12
# Function name column
if col < width - 15:
remaining_space = width - col - 1
func_width = min(
MAX_FUNC_NAME_WIDTH,
max(MIN_FUNC_NAME_WIDTH, remaining_space // 2),
)
func_display = funcname
if len(funcname) > func_width:
func_display = funcname[: func_width - 3] + "..."
func_display = f"{func_display:<{func_width}}"
self.add_str(line, col, func_display, color_func)
col += func_width + 2
# File:line column
if col < width - 10:
simplified_path = self.collector._simplify_path(filename)
file_line = f"{simplified_path}:{lineno}"
remaining_width = width - col - 1
self.add_str(
line, col, file_line[:remaining_width], color_file
)
line += 1
return line
class FooterWidget(Widget):
"""Widget for rendering the footer section (legend and controls)."""
def __init__(self, display, colors, collector):
"""
Initialize footer widget.
Args:
display: DisplayInterface implementation
colors: Dictionary of color attributes
collector: Reference to LiveStatsCollector for accessing state
"""
super().__init__(display, colors)
self.collector = collector
def render(self, line, width, **kwargs):
"""
Render the footer at the specified position.
Args:
line: Starting line number (should be height - 2)
width: Available width
Returns:
Next available line number
"""
A_DIM = self.display.get_attr("A_DIM")
A_BOLD = self.display.get_attr("A_BOLD")
# Legend line
legend = "nsamples: direct/cumulative (direct=executing, cumulative=on stack)"
self.add_str(line, 0, legend[: width - 1], A_DIM)
line += 1
# Controls line with status
sort_names = {
"tottime": "Total Time",
"nsamples": "Direct Samples",
"cumtime": "Cumulative Time",
"sample_pct": "Sample %",
"cumul_pct": "Cumulative %",
}
sort_display = sort_names.get(
self.collector.sort_by, self.collector.sort_by
)
# Build status indicators
status = []
if self.collector.finished:
status.append("[PROFILING FINISHED - Press 'q' to quit]")
elif self.collector.paused:
status.append("[PAUSED]")
if self.collector.filter_pattern:
status.append(
f"[Filter: {self.collector.filter_pattern} (c to clear)]"
)
# Show trend colors status if disabled
if self.collector._trend_tracker is not None and not self.collector._trend_tracker.enabled:
status.append("[Trend colors: OFF]")
status_str = " ".join(status) + " " if status else ""
if self.collector.finished:
footer = f"{status_str}"
else:
footer = f"{status_str}Sort: {sort_display} | 't':mode 'x':trends ←→:thread 'h':help 'q':quit"
self.add_str(
line,
0,
footer[: width - 1],
A_BOLD
if (self.collector.paused or self.collector.finished)
else A_DIM,
)
return line + 1
def render_filter_input_prompt(self, line, width):
"""Draw the filter input prompt at the bottom of the screen."""
A_BOLD = self.display.get_attr("A_BOLD")
A_REVERSE = self.display.get_attr("A_REVERSE")
# Draw prompt on last line
prompt = f"Function filter: {self.collector.filter_input_buffer}_"
self.add_str(line, 0, prompt[: width - 1], A_REVERSE | A_BOLD)
class HelpWidget(Widget):
"""Widget for rendering the help screen overlay."""
def render(self, line, width, **kwargs):
"""
Render the help screen.
Args:
line: Starting line number (ignored, help is centered)
width: Available width
kwargs: Must contain 'height' key
Returns:
Next available line number (not used for overlays)
"""
height = kwargs["height"]
A_BOLD = self.display.get_attr("A_BOLD")
A_NORMAL = self.display.get_attr("A_NORMAL")
help_lines = [
("Tachyon Profiler - Interactive Commands", A_BOLD),
("", A_NORMAL),
("Navigation & Display:", A_BOLD),
(" s - Cycle through sort modes (forward)", A_NORMAL),
(" S - Cycle through sort modes (backward)", A_NORMAL),
(" t - Toggle view mode (ALL / per-thread)", A_NORMAL),
(" x - Toggle trend colors (on/off)", A_NORMAL),
(" ← → ↑ ↓ - Navigate threads (in per-thread mode)", A_NORMAL),
(" + - Faster display refresh rate", A_NORMAL),
(" - - Slower display refresh rate", A_NORMAL),
("", A_NORMAL),
("Control:", A_BOLD),
(" p - Freeze display (snapshot)", A_NORMAL),
(" r - Reset all statistics", A_NORMAL),
("", A_NORMAL),
("Filtering:", A_BOLD),
(" / - Enter function filter (substring)", A_NORMAL),
(" c - Clear filter", A_NORMAL),
(" ESC - Cancel filter input", A_NORMAL),
("", A_NORMAL),
("Other:", A_BOLD),
(" h or ? - Show/hide this help", A_NORMAL),
(" q - Quit profiler", A_NORMAL),
("", A_NORMAL),
("Press any key to close this help screen", A_BOLD),
]
start_line = (height - len(help_lines)) // 2
for i, (text, attr) in enumerate(help_lines):
if start_line + i < height - 1:
col = 2 # Left-align with small margin
self.add_str(start_line + i, col, text[: width - 3], attr)
return line # Not used for overlays

View file

@ -14,15 +14,25 @@
from .pstats_collector import PstatsCollector from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector, FlamegraphCollector from .stack_collector import CollapsedStackCollector, FlamegraphCollector
from .gecko_collector import GeckoCollector from .gecko_collector import GeckoCollector
from .constants import (
PROFILING_MODE_WALL,
PROFILING_MODE_CPU,
PROFILING_MODE_GIL,
PROFILING_MODE_ALL,
SORT_MODE_NSAMPLES,
SORT_MODE_TOTTIME,
SORT_MODE_CUMTIME,
SORT_MODE_SAMPLE_PCT,
SORT_MODE_CUMUL_PCT,
SORT_MODE_NSAMPLES_CUMUL,
)
try:
from .live_collector import LiveStatsCollector
except ImportError:
LiveStatsCollector = None
_FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None _FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
# Profiling mode constants
PROFILING_MODE_WALL = 0
PROFILING_MODE_CPU = 1
PROFILING_MODE_GIL = 2
PROFILING_MODE_ALL = 3 # Combines GIL + CPU checks
def _parse_mode(mode_string): def _parse_mode(mode_string):
"""Convert mode string to mode constant.""" """Convert mode string to mode constant."""
@ -42,6 +52,7 @@ def _parse_mode(mode_string):
- --pstats: Detailed profiling statistics with sorting options - --pstats: Detailed profiling statistics with sorting options
- --collapsed: Stack traces for generating flamegraphs - --collapsed: Stack traces for generating flamegraphs
- --flamegraph Interactive HTML flamegraph visualization (requires web browser) - --flamegraph Interactive HTML flamegraph visualization (requires web browser)
- --live: Live top-like statistics display using ncurses
Examples: Examples:
# Profile process 1234 for 10 seconds with default settings # Profile process 1234 for 10 seconds with default settings
@ -62,6 +73,9 @@ def _parse_mode(mode_string):
# Generate a HTML flamegraph # Generate a HTML flamegraph
python -m profiling.sampling --flamegraph -p 1234 python -m profiling.sampling --flamegraph -p 1234
# Display live top-like statistics (press 'q' to quit, 's' to cycle sort)
python -m profiling.sampling --live -p 1234
# Profile all threads, sort by total time # Profile all threads, sort by total time
python -m profiling.sampling -a --sort-tottime -p 1234 python -m profiling.sampling -a --sort-tottime -p 1234
@ -91,7 +105,7 @@ def _parse_mode(mode_string):
_RECV_BUFFER_SIZE = 1024 _RECV_BUFFER_SIZE = 1024
def _run_with_sync(original_cmd): def _run_with_sync(original_cmd, suppress_output=False):
"""Run a command with socket-based synchronization and return the process.""" """Run a command with socket-based synchronization and return the process."""
# Create a TCP socket for synchronization with better socket options # Create a TCP socket for synchronization with better socket options
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock:
@ -110,7 +124,14 @@ def _run_with_sync(original_cmd):
cmd = (sys.executable, "-m", "profiling.sampling._sync_coordinator", str(sync_port), cwd) + tuple(target_args) cmd = (sys.executable, "-m", "profiling.sampling._sync_coordinator", str(sync_port), cwd) + tuple(target_args)
# Start the process with coordinator # Start the process with coordinator
process = subprocess.Popen(cmd) # Suppress stdout/stderr if requested (for live mode)
popen_kwargs = {}
if suppress_output:
popen_kwargs['stdin'] = subprocess.DEVNULL
popen_kwargs['stdout'] = subprocess.DEVNULL
popen_kwargs['stderr'] = subprocess.DEVNULL
process = subprocess.Popen(cmd, **popen_kwargs)
try: try:
# Wait for ready signal with timeout # Wait for ready signal with timeout
@ -168,6 +189,10 @@ def sample(self, collector, duration_sec=10):
last_realtime_update = start_time last_realtime_update = start_time
while running_time < duration_sec: while running_time < duration_sec:
# Check if live collector wants to stop
if hasattr(collector, 'running') and not collector.running:
break
current_time = time.perf_counter() current_time = time.perf_counter()
if next_time < current_time: if next_time < current_time:
try: try:
@ -177,6 +202,7 @@ def sample(self, collector, duration_sec=10):
duration_sec = current_time - start_time duration_sec = current_time - start_time
break break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError): except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
collector.collect_failed_sample()
errors += 1 errors += 1
except Exception as e: except Exception as e:
if not self._is_process_running(): if not self._is_process_running():
@ -213,16 +239,19 @@ def sample(self, collector, duration_sec=10):
sample_rate = num_samples / running_time sample_rate = num_samples / running_time
error_rate = (errors / num_samples) * 100 if num_samples > 0 else 0 error_rate = (errors / num_samples) * 100 if num_samples > 0 else 0
print(f"Captured {num_samples} samples in {running_time:.2f} seconds") # Don't print stats for live mode (curses is handling display)
print(f"Sample rate: {sample_rate:.2f} samples/sec") is_live_mode = LiveStatsCollector is not None and isinstance(collector, LiveStatsCollector)
print(f"Error rate: {error_rate:.2f}%") if not is_live_mode:
print(f"Captured {num_samples} samples in {running_time:.2f} seconds")
print(f"Sample rate: {sample_rate:.2f} samples/sec")
print(f"Error rate: {error_rate:.2f}%")
# Pass stats to flamegraph collector if it's the right type # Pass stats to flamegraph collector if it's the right type
if hasattr(collector, 'set_stats'): if hasattr(collector, 'set_stats'):
collector.set_stats(self.sample_interval_usec, running_time, sample_rate, error_rate) collector.set_stats(self.sample_interval_usec, running_time, sample_rate, error_rate)
expected_samples = int(duration_sec / sample_interval_sec) expected_samples = int(duration_sec / sample_interval_sec)
if num_samples < expected_samples: if num_samples < expected_samples and not is_live_mode:
print( print(
f"Warning: missed {expected_samples - num_samples} samples " f"Warning: missed {expected_samples - num_samples} samples "
f"from the expected total of {expected_samples} " f"from the expected total of {expected_samples} "
@ -648,10 +677,52 @@ def sample(
# Gecko format never skips idle threads to show full thread states # Gecko format never skips idle threads to show full thread states
collector = GeckoCollector(skip_idle=False) collector = GeckoCollector(skip_idle=False)
filename = filename or f"gecko.{pid}.json" filename = filename or f"gecko.{pid}.json"
case "live":
# Map sort value to sort_by string
sort_by_map = {
SORT_MODE_NSAMPLES: "nsamples",
SORT_MODE_TOTTIME: "tottime",
SORT_MODE_CUMTIME: "cumtime",
SORT_MODE_SAMPLE_PCT: "sample_pct",
SORT_MODE_CUMUL_PCT: "cumul_pct",
SORT_MODE_NSAMPLES_CUMUL: "cumul_pct",
}
sort_by = sort_by_map.get(sort, "tottime")
collector = LiveStatsCollector(
sample_interval_usec,
skip_idle=skip_idle,
sort_by=sort_by,
limit=limit or 20,
pid=pid,
mode=mode,
)
# Live mode is interactive, don't save file by default
# User can specify -o if they want to save stats
case _: case _:
raise ValueError(f"Invalid output format: {output_format}") raise ValueError(f"Invalid output format: {output_format}")
profiler.sample(collector, duration_sec) # For live mode, wrap sampling in curses
if output_format == "live":
import curses
def curses_wrapper_func(stdscr):
collector.init_curses(stdscr)
try:
profiler.sample(collector, duration_sec)
# Mark as finished and keep the TUI running until user presses 'q'
collector.mark_finished()
# Keep processing input until user quits
while collector.running:
collector._handle_input()
time.sleep(0.05) # Small sleep to avoid busy waiting
finally:
collector.cleanup_curses()
try:
curses.wrapper(curses_wrapper_func)
except KeyboardInterrupt:
pass
else:
profiler.sample(collector, duration_sec)
if output_format == "pstats" and not filename: if output_format == "pstats" and not filename:
stats = pstats.SampledStats(collector).strip_dirs() stats = pstats.SampledStats(collector).strip_dirs()
@ -663,38 +734,82 @@ def sample(
print_sampled_stats( print_sampled_stats(
stats, sort, limit, show_summary, sample_interval_usec stats, sort, limit, show_summary, sample_interval_usec
) )
else: elif output_format != "live":
# Live mode is interactive only, no export unless filename specified
collector.export(filename) collector.export(filename)
def _validate_collapsed_format_args(args, parser): def _validate_file_output_format_args(args, parser):
# Check for incompatible pstats options """Validate arguments when using file-based output formats.
File-based formats (--collapsed, --gecko, --flamegraph) generate raw stack
data or visualizations, not formatted statistics, so pstats display options
are not applicable.
"""
invalid_opts = [] invalid_opts = []
# Get list of pstats-specific options # Check if any pstats-specific sort options were provided
pstats_options = {"sort": None, "limit": None, "no_summary": False} if args.sort is not None:
# Get the sort option name that was used
sort_names = {
SORT_MODE_NSAMPLES: "--sort-nsamples",
SORT_MODE_TOTTIME: "--sort-tottime",
SORT_MODE_CUMTIME: "--sort-cumtime",
SORT_MODE_SAMPLE_PCT: "--sort-sample-pct",
SORT_MODE_CUMUL_PCT: "--sort-cumul-pct",
SORT_MODE_NSAMPLES_CUMUL: "--sort-nsamples-cumul",
-1: "--sort-name",
}
sort_opt = sort_names.get(args.sort, "sort")
invalid_opts.append(sort_opt)
# Find the default values from the argument definitions # Check limit option (default is 15)
for action in parser._actions: if args.limit != 15:
if action.dest in pstats_options and hasattr(action, "default"): invalid_opts.append("-l/--limit")
pstats_options[action.dest] = action.default
# Check if any pstats-specific options were provided by comparing with defaults # Check no_summary option
for opt, default in pstats_options.items(): if args.no_summary:
if getattr(args, opt) != default: invalid_opts.append("--no-summary")
invalid_opts.append(opt.replace("no_", ""))
if invalid_opts: if invalid_opts:
parser.error( parser.error(
f"The following options are only valid with --pstats format: {', '.join(invalid_opts)}" f"--{args.format} format is incompatible with: {', '.join(invalid_opts)}. "
"These options are only valid with --pstats format."
) )
# Validate that --mode is not used with --gecko
if args.format == "gecko" and args.mode != "wall":
parser.error("--mode option is incompatible with --gecko format. Gecko format automatically uses ALL mode (GIL + CPU analysis).")
# Set default output filename for collapsed format only if we have a PID # Set default output filename for collapsed format only if we have a PID
# For module/script execution, this will be set later with the subprocess PID # For module/script execution, this will be set later with the subprocess PID
if not args.outfile and args.pid is not None: if not args.outfile and args.pid is not None:
args.outfile = f"collapsed.{args.pid}.txt" args.outfile = f"collapsed.{args.pid}.txt"
def _validate_live_format_args(args, parser):
"""Validate arguments when using --live output format.
Live mode provides an interactive TUI that is incompatible with file output
and certain pstats display options.
"""
invalid_opts = []
# Live mode is incompatible with file output
if args.outfile:
invalid_opts.append("-o/--outfile")
# pstats-specific display options are incompatible
if args.no_summary:
invalid_opts.append("--no-summary")
if invalid_opts:
parser.error(
f"--live mode is incompatible with: {', '.join(invalid_opts)}. "
"Live mode provides its own interactive display."
)
def wait_for_process_and_sample(pid, sort_value, args): def wait_for_process_and_sample(pid, sort_value, args):
"""Sample the process immediately since it has already signaled readiness.""" """Sample the process immediately since it has already signaled readiness."""
# Set default filename with subprocess PID if not already set # Set default filename with subprocess PID if not already set
@ -826,6 +941,13 @@ def main():
dest="format", dest="format",
help="Generate Gecko format for Firefox Profiler", help="Generate Gecko format for Firefox Profiler",
) )
output_format.add_argument(
"--live",
action="store_const",
const="live",
dest="format",
help="Display live top-like live statistics in a terminal UI",
)
output_group.add_argument( output_group.add_argument(
"-o", "-o",
@ -841,42 +963,42 @@ def main():
sort_group.add_argument( sort_group.add_argument(
"--sort-nsamples", "--sort-nsamples",
action="store_const", action="store_const",
const=0, const=SORT_MODE_NSAMPLES,
dest="sort", dest="sort",
help="Sort by number of direct samples (nsamples column)", help="Sort by number of direct samples (nsamples column, default)",
) )
sort_group.add_argument( sort_group.add_argument(
"--sort-tottime", "--sort-tottime",
action="store_const", action="store_const",
const=1, const=SORT_MODE_TOTTIME,
dest="sort", dest="sort",
help="Sort by total time (tottime column)", help="Sort by total time (tottime column)",
) )
sort_group.add_argument( sort_group.add_argument(
"--sort-cumtime", "--sort-cumtime",
action="store_const", action="store_const",
const=2, const=SORT_MODE_CUMTIME,
dest="sort", dest="sort",
help="Sort by cumulative time (cumtime column, default)", help="Sort by cumulative time (cumtime column)",
) )
sort_group.add_argument( sort_group.add_argument(
"--sort-sample-pct", "--sort-sample-pct",
action="store_const", action="store_const",
const=3, const=SORT_MODE_SAMPLE_PCT,
dest="sort", dest="sort",
help="Sort by sample percentage (sample%% column)", help="Sort by sample percentage (sample%% column)",
) )
sort_group.add_argument( sort_group.add_argument(
"--sort-cumul-pct", "--sort-cumul-pct",
action="store_const", action="store_const",
const=4, const=SORT_MODE_CUMUL_PCT,
dest="sort", dest="sort",
help="Sort by cumulative sample percentage (cumul%% column)", help="Sort by cumulative sample percentage (cumul%% column)",
) )
sort_group.add_argument( sort_group.add_argument(
"--sort-nsamples-cumul", "--sort-nsamples-cumul",
action="store_const", action="store_const",
const=5, const=SORT_MODE_NSAMPLES_CUMUL,
dest="sort", dest="sort",
help="Sort by cumulative samples (nsamples column, cumulative part)", help="Sort by cumulative samples (nsamples column, cumulative part)",
) )
@ -903,15 +1025,21 @@ def main():
args = parser.parse_args() args = parser.parse_args()
# Check if live mode is available early
if args.format == "live" and LiveStatsCollector is None:
print(
"Error: Live mode (--live) requires the curses module, which is not available.\n",
file=sys.stderr
)
sys.exit(1)
# Validate format-specific arguments # Validate format-specific arguments
if args.format in ("collapsed", "gecko"): if args.format in ("collapsed", "gecko", "flamegraph"):
_validate_collapsed_format_args(args, parser) _validate_file_output_format_args(args, parser)
elif args.format == "live":
_validate_live_format_args(args, parser)
# Validate that --mode is not used with --gecko sort_value = args.sort if args.sort is not None else SORT_MODE_NSAMPLES
if args.format == "gecko" and args.mode != "wall":
parser.error("--mode option is incompatible with --gecko format. Gecko format automatically uses ALL mode (GIL + CPU analysis).")
sort_value = args.sort if args.sort is not None else 2
if args.module is not None and not args.module: if args.module is not None and not args.module:
parser.error("argument -m/--module: expected one argument") parser.error("argument -m/--module: expected one argument")
@ -958,7 +1086,9 @@ def main():
cmd = (sys.executable, *args.args) cmd = (sys.executable, *args.args)
# Use synchronized process startup # Use synchronized process startup
process = _run_with_sync(cmd) # Suppress output if using live mode
suppress_output = (args.format == "live")
process = _run_with_sync(cmd, suppress_output=suppress_output)
# Process has already signaled readiness, start sampling immediately # Process has already signaled readiness, start sampling immediately
try: try:

View file

@ -0,0 +1,41 @@
"""Common test helpers and mocks for live collector tests."""
from profiling.sampling.constants import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
)
class MockFrameInfo:
"""Mock FrameInfo for testing."""
def __init__(self, filename, lineno, funcname):
self.filename = filename
self.lineno = lineno
self.funcname = funcname
def __repr__(self):
return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"
class MockThreadInfo:
"""Mock ThreadInfo for testing."""
def __init__(self, thread_id, frame_info, status=THREAD_STATUS_HAS_GIL | THREAD_STATUS_ON_CPU):
self.thread_id = thread_id
self.frame_info = frame_info
self.status = status
def __repr__(self):
return f"MockThreadInfo(thread_id={self.thread_id}, frame_info={self.frame_info}, status={self.status})"
class MockInterpreterInfo:
"""Mock InterpreterInfo for testing."""
def __init__(self, interpreter_id, threads):
self.interpreter_id = interpreter_id
self.threads = threads
def __repr__(self):
return f"MockInterpreterInfo(interpreter_id={self.interpreter_id}, threads={self.threads})"

View file

@ -79,7 +79,7 @@ def test_cli_module_argument_parsing(self):
self._verify_coordinator_command(mock_popen, ("-m", "mymodule")) self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
mock_sample.assert_called_once_with( mock_sample.assert_called_once_with(
12345, 12345,
sort=2, # default sort (sort_value from args.sort) sort=0, # default sort (sort_value from args.sort)
sample_interval_usec=100, sample_interval_usec=100,
duration_sec=10, duration_sec=10,
filename=None, filename=None,
@ -118,7 +118,7 @@ def test_cli_module_with_arguments(self):
) )
mock_sample.assert_called_once_with( mock_sample.assert_called_once_with(
12345, 12345,
sort=2, sort=0,
sample_interval_usec=100, sample_interval_usec=100,
duration_sec=10, duration_sec=10,
filename=None, filename=None,
@ -148,7 +148,7 @@ def test_cli_script_argument_parsing(self):
self._verify_coordinator_command(mock_popen, ("myscript.py",)) self._verify_coordinator_command(mock_popen, ("myscript.py",))
mock_sample.assert_called_once_with( mock_sample.assert_called_once_with(
12345, 12345,
sort=2, sort=0,
sample_interval_usec=100, sample_interval_usec=100,
duration_sec=10, duration_sec=10,
filename=None, filename=None,
@ -323,7 +323,7 @@ def test_cli_script_with_profiler_options(self):
# Verify profiler options were passed correctly # Verify profiler options were passed correctly
mock_sample.assert_called_once_with( mock_sample.assert_called_once_with(
12345, 12345,
sort=2, # default sort sort=0, # default sort
sample_interval_usec=2000, sample_interval_usec=2000,
duration_sec=60, duration_sec=60,
filename="output.txt", filename="output.txt",
@ -411,7 +411,8 @@ def test_cli_complex_script_arguments(self):
"-v", "-v",
"--output=/tmp/out", "--output=/tmp/out",
"positional", "positional",
) ),
suppress_output=False
) )
def test_cli_collapsed_format_validation(self): def test_cli_collapsed_format_validation(self):
@ -627,7 +628,7 @@ def test_argument_parsing_basic(self):
filename=None, filename=None,
all_threads=False, all_threads=False,
limit=15, limit=15,
sort=2, sort=0,
show_summary=True, show_summary=True,
output_format="pstats", output_format="pstats",
realtime_stats=False, realtime_stats=False,

View file

@ -0,0 +1,477 @@
"""Core functionality tests for LiveStatsCollector.
Tests for path simplification, frame processing, collect method,
statistics building, sorting, and formatting.
"""
import os
import unittest
from test.support import requires
from test.support.import_helper import import_module
# Only run these tests if curses is available
requires("curses")
curses = import_module("curses")
from profiling.sampling.live_collector import LiveStatsCollector, MockDisplay
from profiling.sampling.constants import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
)
from ._live_collector_helpers import (
MockFrameInfo,
MockThreadInfo,
MockInterpreterInfo,
)
class TestLiveStatsCollectorPathSimplification(unittest.TestCase):
"""Tests for path simplification functionality."""
def test_simplify_stdlib_path(self):
"""Test simplification of standard library paths."""
collector = LiveStatsCollector(1000)
# Get actual os module path
os_file = os.__file__
if os_file:
stdlib_dir = os.path.dirname(os.path.abspath(os_file))
test_path = os.path.join(stdlib_dir, "json", "decoder.py")
simplified = collector._simplify_path(test_path)
# Should remove the stdlib prefix
self.assertNotIn(stdlib_dir, simplified)
self.assertIn("json", simplified)
def test_simplify_unknown_path(self):
"""Test that unknown paths are returned unchanged."""
collector = LiveStatsCollector(1000)
test_path = "/some/unknown/path/file.py"
simplified = collector._simplify_path(test_path)
self.assertEqual(simplified, test_path)
class TestLiveStatsCollectorFrameProcessing(unittest.TestCase):
"""Tests for frame processing functionality."""
def test_process_single_frame(self):
"""Test processing a single frame."""
collector = LiveStatsCollector(1000)
frames = [MockFrameInfo("test.py", 10, "test_func")]
collector._process_frames(frames)
location = ("test.py", 10, "test_func")
self.assertEqual(collector.result[location]["direct_calls"], 1)
self.assertEqual(collector.result[location]["cumulative_calls"], 1)
def test_process_multiple_frames(self):
"""Test processing a stack of multiple frames."""
collector = LiveStatsCollector(1000)
frames = [
MockFrameInfo("test.py", 10, "inner_func"),
MockFrameInfo("test.py", 20, "middle_func"),
MockFrameInfo("test.py", 30, "outer_func"),
]
collector._process_frames(frames)
# Top frame (inner_func) should have both direct and cumulative
inner_loc = ("test.py", 10, "inner_func")
self.assertEqual(collector.result[inner_loc]["direct_calls"], 1)
self.assertEqual(collector.result[inner_loc]["cumulative_calls"], 1)
# Other frames should only have cumulative
middle_loc = ("test.py", 20, "middle_func")
self.assertEqual(collector.result[middle_loc]["direct_calls"], 0)
self.assertEqual(collector.result[middle_loc]["cumulative_calls"], 1)
outer_loc = ("test.py", 30, "outer_func")
self.assertEqual(collector.result[outer_loc]["direct_calls"], 0)
self.assertEqual(collector.result[outer_loc]["cumulative_calls"], 1)
def test_process_empty_frames(self):
"""Test processing empty frames list."""
collector = LiveStatsCollector(1000)
collector._process_frames([])
# Should not raise an error and result should remain empty
self.assertEqual(len(collector.result), 0)
def test_process_frames_accumulation(self):
"""Test that multiple calls accumulate correctly."""
collector = LiveStatsCollector(1000)
frames = [MockFrameInfo("test.py", 10, "test_func")]
collector._process_frames(frames)
collector._process_frames(frames)
collector._process_frames(frames)
location = ("test.py", 10, "test_func")
self.assertEqual(collector.result[location]["direct_calls"], 3)
self.assertEqual(collector.result[location]["cumulative_calls"], 3)
def test_process_frames_with_thread_id(self):
"""Test processing frames with per-thread tracking."""
collector = LiveStatsCollector(1000)
frames = [MockFrameInfo("test.py", 10, "test_func")]
# Process frames with thread_id
collector._process_frames(frames, thread_id=123)
# Check aggregated result
location = ("test.py", 10, "test_func")
self.assertEqual(collector.result[location]["direct_calls"], 1)
self.assertEqual(collector.result[location]["cumulative_calls"], 1)
# Check per-thread result
self.assertIn(123, collector.per_thread_data)
self.assertEqual(
collector.per_thread_data[123].result[location]["direct_calls"], 1
)
self.assertEqual(
collector.per_thread_data[123].result[location]["cumulative_calls"], 1
)
def test_process_frames_multiple_threads(self):
"""Test processing frames from multiple threads."""
collector = LiveStatsCollector(1000)
frames1 = [MockFrameInfo("test.py", 10, "test_func")]
frames2 = [MockFrameInfo("test.py", 20, "other_func")]
# Process frames from different threads
collector._process_frames(frames1, thread_id=123)
collector._process_frames(frames2, thread_id=456)
# Check that both threads have their own data
self.assertIn(123, collector.per_thread_data)
self.assertIn(456, collector.per_thread_data)
loc1 = ("test.py", 10, "test_func")
loc2 = ("test.py", 20, "other_func")
# Thread 123 should only have func1
self.assertEqual(
collector.per_thread_data[123].result[loc1]["direct_calls"], 1
)
self.assertNotIn(loc2, collector.per_thread_data[123].result)
# Thread 456 should only have func2
self.assertEqual(
collector.per_thread_data[456].result[loc2]["direct_calls"], 1
)
self.assertNotIn(loc1, collector.per_thread_data[456].result)
class TestLiveStatsCollectorCollect(unittest.TestCase):
"""Tests for the collect method."""
def test_collect_initializes_start_time(self):
"""Test that collect initializes start_time on first call."""
collector = LiveStatsCollector(1000)
self.assertIsNone(collector.start_time)
# Create mock stack frames
thread_info = MockThreadInfo(123, [])
interpreter_info = MockInterpreterInfo(0, [thread_info])
stack_frames = [interpreter_info]
collector.collect(stack_frames)
self.assertIsNotNone(collector.start_time)
def test_collect_increments_sample_count(self):
"""Test that collect increments total_samples."""
collector = LiveStatsCollector(1000)
thread_info = MockThreadInfo(123, [])
interpreter_info = MockInterpreterInfo(0, [thread_info])
stack_frames = [interpreter_info]
self.assertEqual(collector.total_samples, 0)
collector.collect(stack_frames)
self.assertEqual(collector.total_samples, 1)
collector.collect(stack_frames)
self.assertEqual(collector.total_samples, 2)
def test_collect_with_frames(self):
"""Test collect with actual frame data."""
collector = LiveStatsCollector(1000)
frames = [MockFrameInfo("test.py", 10, "test_func")]
thread_info = MockThreadInfo(123, frames)
interpreter_info = MockInterpreterInfo(0, [thread_info])
stack_frames = [interpreter_info]
collector.collect(stack_frames)
location = ("test.py", 10, "test_func")
self.assertEqual(collector.result[location]["direct_calls"], 1)
self.assertEqual(collector._successful_samples, 1)
self.assertEqual(collector._failed_samples, 0)
def test_collect_with_empty_frames(self):
"""Test collect with empty frames."""
collector = LiveStatsCollector(1000)
thread_info = MockThreadInfo(123, [])
interpreter_info = MockInterpreterInfo(0, [thread_info])
stack_frames = [interpreter_info]
collector.collect(stack_frames)
# Empty frames still count as successful since collect() was called successfully
self.assertEqual(collector._successful_samples, 1)
self.assertEqual(collector._failed_samples, 0)
def test_collect_skip_idle_threads(self):
"""Test that idle threads are skipped when skip_idle=True."""
collector = LiveStatsCollector(1000, skip_idle=True)
frames = [MockFrameInfo("test.py", 10, "test_func")]
running_thread = MockThreadInfo(
123, frames, status=THREAD_STATUS_HAS_GIL | THREAD_STATUS_ON_CPU
)
idle_thread = MockThreadInfo(124, frames, status=0) # No flags = idle
interpreter_info = MockInterpreterInfo(
0, [running_thread, idle_thread]
)
stack_frames = [interpreter_info]
collector.collect(stack_frames)
# Only one thread should be processed
location = ("test.py", 10, "test_func")
self.assertEqual(collector.result[location]["direct_calls"], 1)
def test_collect_multiple_threads(self):
"""Test collect with multiple threads."""
collector = LiveStatsCollector(1000)
frames1 = [MockFrameInfo("test1.py", 10, "func1")]
frames2 = [MockFrameInfo("test2.py", 20, "func2")]
thread1 = MockThreadInfo(123, frames1)
thread2 = MockThreadInfo(124, frames2)
interpreter_info = MockInterpreterInfo(0, [thread1, thread2])
stack_frames = [interpreter_info]
collector.collect(stack_frames)
loc1 = ("test1.py", 10, "func1")
loc2 = ("test2.py", 20, "func2")
self.assertEqual(collector.result[loc1]["direct_calls"], 1)
self.assertEqual(collector.result[loc2]["direct_calls"], 1)
# Check thread IDs are tracked
self.assertIn(123, collector.thread_ids)
self.assertIn(124, collector.thread_ids)
class TestLiveStatsCollectorStatisticsBuilding(unittest.TestCase):
"""Tests for statistics building and sorting."""
def setUp(self):
"""Set up test fixtures."""
self.collector = LiveStatsCollector(1000)
# Add some test data
self.collector.result[("file1.py", 10, "func1")] = {
"direct_calls": 100,
"cumulative_calls": 150,
"total_rec_calls": 0,
}
self.collector.result[("file2.py", 20, "func2")] = {
"direct_calls": 50,
"cumulative_calls": 200,
"total_rec_calls": 0,
}
self.collector.result[("file3.py", 30, "func3")] = {
"direct_calls": 75,
"cumulative_calls": 75,
"total_rec_calls": 0,
}
self.collector.total_samples = 300
def test_build_stats_list(self):
"""Test that stats list is built correctly."""
stats_list = self.collector._build_stats_list()
self.assertEqual(len(stats_list), 3)
# Check that all expected keys are present
for stat in stats_list:
self.assertIn("func", stat)
self.assertIn("direct_calls", stat)
self.assertIn("cumulative_calls", stat)
self.assertIn("total_time", stat)
self.assertIn("cumulative_time", stat)
def test_sort_by_nsamples(self):
"""Test sorting by number of samples."""
self.collector.sort_by = "nsamples"
stats_list = self.collector._build_stats_list()
# Should be sorted by direct_calls descending
self.assertEqual(stats_list[0]["func"][2], "func1") # 100 samples
self.assertEqual(stats_list[1]["func"][2], "func3") # 75 samples
self.assertEqual(stats_list[2]["func"][2], "func2") # 50 samples
def test_sort_by_tottime(self):
"""Test sorting by total time."""
self.collector.sort_by = "tottime"
stats_list = self.collector._build_stats_list()
# Should be sorted by total_time descending
# total_time = direct_calls * sample_interval_sec
self.assertEqual(stats_list[0]["func"][2], "func1")
self.assertEqual(stats_list[1]["func"][2], "func3")
self.assertEqual(stats_list[2]["func"][2], "func2")
def test_sort_by_cumtime(self):
"""Test sorting by cumulative time."""
self.collector.sort_by = "cumtime"
stats_list = self.collector._build_stats_list()
# Should be sorted by cumulative_time descending
self.assertEqual(stats_list[0]["func"][2], "func2") # 200 cumulative
self.assertEqual(stats_list[1]["func"][2], "func1") # 150 cumulative
self.assertEqual(stats_list[2]["func"][2], "func3") # 75 cumulative
def test_sort_by_sample_pct(self):
"""Test sorting by sample percentage."""
self.collector.sort_by = "sample_pct"
stats_list = self.collector._build_stats_list()
# Should be sorted by percentage of direct_calls
self.assertEqual(stats_list[0]["func"][2], "func1") # 33.3%
self.assertEqual(stats_list[1]["func"][2], "func3") # 25%
self.assertEqual(stats_list[2]["func"][2], "func2") # 16.7%
def test_sort_by_cumul_pct(self):
"""Test sorting by cumulative percentage."""
self.collector.sort_by = "cumul_pct"
stats_list = self.collector._build_stats_list()
# Should be sorted by percentage of cumulative_calls
self.assertEqual(stats_list[0]["func"][2], "func2") # 66.7%
self.assertEqual(stats_list[1]["func"][2], "func1") # 50%
self.assertEqual(stats_list[2]["func"][2], "func3") # 25%
class TestLiveStatsCollectorSortCycle(unittest.TestCase):
"""Tests for sort mode cycling."""
def test_cycle_sort_from_nsamples(self):
"""Test cycling from nsamples."""
collector = LiveStatsCollector(1000, sort_by="nsamples")
collector._cycle_sort()
self.assertEqual(collector.sort_by, "sample_pct")
def test_cycle_sort_from_sample_pct(self):
"""Test cycling from sample_pct."""
collector = LiveStatsCollector(1000, sort_by="sample_pct")
collector._cycle_sort()
self.assertEqual(collector.sort_by, "tottime")
def test_cycle_sort_from_tottime(self):
"""Test cycling from tottime."""
collector = LiveStatsCollector(1000, sort_by="tottime")
collector._cycle_sort()
self.assertEqual(collector.sort_by, "cumul_pct")
def test_cycle_sort_from_cumul_pct(self):
"""Test cycling from cumul_pct."""
collector = LiveStatsCollector(1000, sort_by="cumul_pct")
collector._cycle_sort()
self.assertEqual(collector.sort_by, "cumtime")
def test_cycle_sort_from_cumtime(self):
"""Test cycling from cumtime back to nsamples."""
collector = LiveStatsCollector(1000, sort_by="cumtime")
collector._cycle_sort()
self.assertEqual(collector.sort_by, "nsamples")
def test_cycle_sort_invalid_mode(self):
"""Test cycling from invalid mode resets to nsamples."""
collector = LiveStatsCollector(1000)
collector.sort_by = "invalid_mode"
collector._cycle_sort()
self.assertEqual(collector.sort_by, "nsamples")
def test_cycle_sort_backward_from_nsamples(self):
"""Test cycling backward from nsamples goes to cumtime."""
collector = LiveStatsCollector(1000, sort_by="nsamples")
collector._cycle_sort(reverse=True)
self.assertEqual(collector.sort_by, "cumtime")
def test_cycle_sort_backward_from_cumtime(self):
"""Test cycling backward from cumtime goes to cumul_pct."""
collector = LiveStatsCollector(1000, sort_by="cumtime")
collector._cycle_sort(reverse=True)
self.assertEqual(collector.sort_by, "cumul_pct")
def test_cycle_sort_backward_from_sample_pct(self):
"""Test cycling backward from sample_pct goes to nsamples."""
collector = LiveStatsCollector(1000, sort_by="sample_pct")
collector._cycle_sort(reverse=True)
self.assertEqual(collector.sort_by, "nsamples")
def test_input_lowercase_s_cycles_forward(self):
"""Test that lowercase 's' cycles forward."""
display = MockDisplay()
collector = LiveStatsCollector(
1000, sort_by="nsamples", display=display
)
display.simulate_input(ord("s"))
collector._handle_input()
self.assertEqual(collector.sort_by, "sample_pct")
def test_input_uppercase_s_cycles_backward(self):
"""Test that uppercase 'S' cycles backward."""
display = MockDisplay()
collector = LiveStatsCollector(
1000, sort_by="nsamples", display=display
)
display.simulate_input(ord("S"))
collector._handle_input()
self.assertEqual(collector.sort_by, "cumtime")
class TestLiveStatsCollectorFormatting(unittest.TestCase):
"""Tests for formatting methods."""
def test_format_uptime_seconds(self):
"""Test uptime formatting for seconds only."""
collector = LiveStatsCollector(1000, display=MockDisplay())
colors = collector._setup_colors()
collector._initialize_widgets(colors)
self.assertEqual(collector._header_widget.format_uptime(45), "0m45s")
def test_format_uptime_minutes(self):
"""Test uptime formatting for minutes."""
collector = LiveStatsCollector(1000, display=MockDisplay())
colors = collector._setup_colors()
collector._initialize_widgets(colors)
self.assertEqual(collector._header_widget.format_uptime(125), "2m05s")
def test_format_uptime_hours(self):
"""Test uptime formatting for hours."""
collector = LiveStatsCollector(1000, display=MockDisplay())
colors = collector._setup_colors()
collector._initialize_widgets(colors)
self.assertEqual(
collector._header_widget.format_uptime(3661), "1h01m01s"
)
def test_format_uptime_large_values(self):
"""Test uptime formatting for large time values."""
collector = LiveStatsCollector(1000, display=MockDisplay())
colors = collector._setup_colors()
collector._initialize_widgets(colors)
self.assertEqual(
collector._header_widget.format_uptime(86400), "24h00m00s"
)
def test_format_uptime_zero(self):
"""Test uptime formatting for zero."""
collector = LiveStatsCollector(1000, display=MockDisplay())
colors = collector._setup_colors()
collector._initialize_widgets(colors)
self.assertEqual(collector._header_widget.format_uptime(0), "0m00s")
if __name__ == "__main__":
unittest.main()

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,819 @@
"""UI and display tests for LiveStatsCollector.
Tests for MockDisplay, curses integration, display methods,
edge cases, update display, and display helpers.
"""
import sys
import time
import unittest
from unittest import mock
from test.support import requires
from test.support.import_helper import import_module
# Only run these tests if curses is available
requires("curses")
curses = import_module("curses")
from profiling.sampling.live_collector import LiveStatsCollector, MockDisplay
from ._live_collector_helpers import (
MockThreadInfo,
MockInterpreterInfo,
)
class TestLiveStatsCollectorWithMockDisplay(unittest.TestCase):
"""Tests for display functionality using MockDisplay."""
def setUp(self):
"""Set up collector with mock display."""
self.mock_display = MockDisplay(height=40, width=160)
self.collector = LiveStatsCollector(
1000, pid=12345, display=self.mock_display
)
self.collector.start_time = time.perf_counter()
def test_update_display_with_mock(self):
"""Test that update_display works with MockDisplay."""
self.collector.total_samples = 100
self.collector.result[("test.py", 10, "test_func")] = {
"direct_calls": 50,
"cumulative_calls": 75,
"total_rec_calls": 0,
}
self.collector._update_display()
# Verify display operations were called
self.assertTrue(self.mock_display.cleared)
self.assertTrue(self.mock_display.refreshed)
self.assertTrue(self.mock_display.redrawn)
# Verify some content was written
self.assertGreater(len(self.mock_display.buffer), 0)
def test_handle_input_quit(self):
"""Test that 'q' input stops the collector."""
self.mock_display.simulate_input(ord("q"))
self.collector._handle_input()
self.assertFalse(self.collector.running)
def test_handle_input_sort_cycle(self):
"""Test that 's' input cycles sort mode."""
self.collector.sort_by = "tottime"
self.mock_display.simulate_input(ord("s"))
self.collector._handle_input()
self.assertEqual(self.collector.sort_by, "cumul_pct")
def test_draw_methods_with_mock_display(self):
"""Test that draw methods write to mock display."""
self.collector.total_samples = 500
self.collector._successful_samples = 450
self.collector._failed_samples = 50
colors = self.collector._setup_colors()
self.collector._initialize_widgets(colors)
# Test individual widget methods
line = self.collector._header_widget.draw_header_info(0, 160, 100.5)
self.assertEqual(line, 2) # Title + header info line
self.assertGreater(len(self.mock_display.buffer), 0)
# Clear buffer and test next method
self.mock_display.buffer.clear()
line = self.collector._header_widget.draw_sample_stats(0, 160, 10.0)
self.assertEqual(line, 1)
self.assertGreater(len(self.mock_display.buffer), 0)
def test_terminal_too_small_message(self):
"""Test terminal too small warning."""
small_display = MockDisplay(height=10, width=50)
self.collector.display = small_display
self.collector._show_terminal_too_small(10, 50)
# Should have written warning message
text = small_display.get_text_at(3, 15) # Approximate center
self.assertIsNotNone(text)
def test_full_display_rendering_with_data(self):
"""Test complete display rendering with realistic data."""
# Add multiple functions with different call counts
self.collector.total_samples = 1000
self.collector._successful_samples = 950
self.collector._failed_samples = 50
self.collector.result[("app.py", 10, "main")] = {
"direct_calls": 100,
"cumulative_calls": 500,
"total_rec_calls": 0,
}
self.collector.result[("utils.py", 20, "helper")] = {
"direct_calls": 300,
"cumulative_calls": 400,
"total_rec_calls": 0,
}
self.collector.result[("db.py", 30, "query")] = {
"direct_calls": 50,
"cumulative_calls": 100,
"total_rec_calls": 0,
}
self.collector._update_display()
# Verify the display has content
self.assertGreater(len(self.mock_display.buffer), 10)
# Verify PID is shown
found_pid = False
for (line, col), (text, attr) in self.mock_display.buffer.items():
if "12345" in text:
found_pid = True
break
self.assertTrue(found_pid, "PID should be displayed")
def test_efficiency_bar_visualization(self):
"""Test that efficiency bar shows correct proportions."""
self.collector.total_samples = 100
self.collector._successful_samples = 75
self.collector._failed_samples = 25
colors = self.collector._setup_colors()
self.collector._initialize_widgets(colors)
self.collector._header_widget.draw_efficiency_bar(0, 160)
# Check that something was drawn to the display
self.assertGreater(len(self.mock_display.buffer), 0)
def test_stats_display_with_different_sort_modes(self):
"""Test that stats are displayed correctly with different sort modes."""
self.collector.total_samples = 100
self.collector.result[("a.py", 1, "func_a")] = {
"direct_calls": 10,
"cumulative_calls": 20,
"total_rec_calls": 0,
}
self.collector.result[("b.py", 2, "func_b")] = {
"direct_calls": 30,
"cumulative_calls": 40,
"total_rec_calls": 0,
}
# Test each sort mode
for sort_mode in [
"nsamples",
"tottime",
"cumtime",
"sample_pct",
"cumul_pct",
]:
self.mock_display.buffer.clear()
self.collector.sort_by = sort_mode
stats_list = self.collector._build_stats_list()
self.assertEqual(len(stats_list), 2)
# Verify sorting worked (func_b should be first for most modes)
if sort_mode in ["nsamples", "tottime", "sample_pct"]:
self.assertEqual(stats_list[0]["func"][2], "func_b")
def test_narrow_terminal_column_hiding(self):
"""Test that columns are hidden on narrow terminals."""
narrow_display = MockDisplay(height=40, width=70)
collector = LiveStatsCollector(1000, pid=12345, display=narrow_display)
collector.start_time = time.perf_counter()
colors = collector._setup_colors()
collector._initialize_widgets(colors)
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
collector._table_widget.draw_column_headers(0, 70)
)
# On narrow terminal, some columns should be hidden
self.assertFalse(
show_cumul_pct or show_cumtime,
"Some columns should be hidden on narrow terminal",
)
def test_very_narrow_terminal_minimal_columns(self):
"""Test minimal display on very narrow terminal."""
very_narrow = MockDisplay(height=40, width=60)
collector = LiveStatsCollector(1000, pid=12345, display=very_narrow)
collector.start_time = time.perf_counter()
colors = collector._setup_colors()
collector._initialize_widgets(colors)
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
collector._table_widget.draw_column_headers(0, 60)
)
# Very narrow should hide even more columns
self.assertFalse(
show_sample_pct,
"Sample % should be hidden on very narrow terminal",
)
def test_display_updates_only_at_interval(self):
"""Test that display updates respect the update interval."""
# Create collector with display
collector = LiveStatsCollector(1000, display=self.mock_display)
# Simulate multiple rapid collections
thread_info = MockThreadInfo(123, [])
interpreter_info = MockInterpreterInfo(0, [thread_info])
stack_frames = [interpreter_info]
# First collect should update display
collector.collect(stack_frames)
first_cleared = self.mock_display.cleared
# Reset flags
self.mock_display.cleared = False
self.mock_display.refreshed = False
# Immediate second collect should NOT update display (too soon)
collector.collect(stack_frames)
self.assertFalse(
self.mock_display.cleared,
"Display should not update too frequently",
)
def test_top_functions_display(self):
"""Test that top functions are highlighted correctly."""
self.collector.total_samples = 1000
# Create functions with different sample counts
for i in range(10):
self.collector.result[(f"file{i}.py", i * 10, f"func{i}")] = {
"direct_calls": (10 - i) * 10, # Decreasing counts
"cumulative_calls": (10 - i) * 20,
"total_rec_calls": 0,
}
colors = self.collector._setup_colors()
self.collector._initialize_widgets(colors)
stats_list = self.collector._build_stats_list()
self.collector._header_widget.draw_top_functions(0, 160, stats_list)
# Top functions section should have written something
self.assertGreater(len(self.mock_display.buffer), 0)
class TestLiveStatsCollectorCursesIntegration(unittest.TestCase):
"""Tests for curses-related functionality using mocks."""
def setUp(self):
"""Set up mock curses screen."""
self.mock_stdscr = mock.MagicMock()
self.mock_stdscr.getmaxyx.return_value = (40, 160) # height, width
self.mock_stdscr.getch.return_value = -1 # No input
# Save original stdout/stderr
self._orig_stdout = sys.stdout
self._orig_stderr = sys.stderr
def tearDown(self):
"""Restore stdout/stderr if changed."""
sys.stdout = self._orig_stdout
sys.stderr = self._orig_stderr
def test_init_curses(self):
"""Test curses initialization."""
collector = LiveStatsCollector(1000)
with (
mock.patch("curses.curs_set"),
mock.patch("curses.has_colors", return_value=True),
mock.patch("curses.start_color"),
mock.patch("curses.use_default_colors"),
mock.patch("builtins.open", mock.mock_open()) as mock_open_func,
):
collector.init_curses(self.mock_stdscr)
self.assertIsNotNone(collector.stdscr)
self.mock_stdscr.nodelay.assert_called_with(True)
self.mock_stdscr.scrollok.assert_called_with(False)
# Clean up properly
if collector._devnull:
collector._devnull.close()
collector._saved_stdout = None
collector._saved_stderr = None
def test_cleanup_curses(self):
"""Test curses cleanup."""
mock_display = MockDisplay()
collector = LiveStatsCollector(1000, display=mock_display)
collector.stdscr = self.mock_stdscr
# Mock devnull file to avoid resource warnings
mock_devnull = mock.MagicMock()
mock_saved_stdout = mock.MagicMock()
mock_saved_stderr = mock.MagicMock()
collector._devnull = mock_devnull
collector._saved_stdout = mock_saved_stdout
collector._saved_stderr = mock_saved_stderr
with mock.patch("curses.curs_set"):
collector.cleanup_curses()
mock_devnull.close.assert_called_once()
# Verify stdout/stderr were set back to the saved values
self.assertEqual(sys.stdout, mock_saved_stdout)
self.assertEqual(sys.stderr, mock_saved_stderr)
# Verify the saved values were cleared
self.assertIsNone(collector._saved_stdout)
self.assertIsNone(collector._saved_stderr)
self.assertIsNone(collector._devnull)
def test_add_str_with_mock_display(self):
"""Test safe_addstr with MockDisplay."""
mock_display = MockDisplay(height=40, width=160)
collector = LiveStatsCollector(1000, display=mock_display)
colors = collector._setup_colors()
collector._initialize_widgets(colors)
collector._header_widget.add_str(5, 10, "Test", 0)
# Verify it was added to the buffer
self.assertIn((5, 10), mock_display.buffer)
def test_setup_colors_with_color_support(self):
"""Test color setup when colors are supported."""
mock_display = MockDisplay(height=40, width=160)
mock_display.colors_supported = True
collector = LiveStatsCollector(1000, display=mock_display)
colors = collector._setup_colors()
self.assertIn("header", colors)
self.assertIn("cyan", colors)
self.assertIn("yellow", colors)
self.assertIn("green", colors)
self.assertIn("magenta", colors)
self.assertIn("red", colors)
def test_setup_colors_without_color_support(self):
"""Test color setup when colors are not supported."""
mock_display = MockDisplay(height=40, width=160)
mock_display.colors_supported = False
collector = LiveStatsCollector(1000, display=mock_display)
colors = collector._setup_colors()
# Should still have all keys but with fallback values
self.assertIn("header", colors)
self.assertIn("cyan", colors)
def test_handle_input_quit(self):
"""Test handling 'q' key to quit."""
mock_display = MockDisplay()
mock_display.simulate_input(ord("q"))
collector = LiveStatsCollector(1000, display=mock_display)
self.assertTrue(collector.running)
collector._handle_input()
self.assertFalse(collector.running)
def test_handle_input_quit_uppercase(self):
"""Test handling 'Q' key to quit."""
mock_display = MockDisplay()
mock_display.simulate_input(ord("Q"))
collector = LiveStatsCollector(1000, display=mock_display)
self.assertTrue(collector.running)
collector._handle_input()
self.assertFalse(collector.running)
def test_handle_input_cycle_sort(self):
"""Test handling 's' key to cycle sort."""
mock_display = MockDisplay()
mock_display.simulate_input(ord("s"))
collector = LiveStatsCollector(
1000, sort_by="nsamples", display=mock_display
)
collector._handle_input()
self.assertEqual(collector.sort_by, "sample_pct")
def test_handle_input_cycle_sort_uppercase(self):
"""Test handling 'S' key to cycle sort backward."""
mock_display = MockDisplay()
mock_display.simulate_input(ord("S"))
collector = LiveStatsCollector(
1000, sort_by="nsamples", display=mock_display
)
collector._handle_input()
self.assertEqual(collector.sort_by, "cumtime")
def test_handle_input_no_key(self):
"""Test handling when no key is pressed."""
mock_display = MockDisplay()
collector = LiveStatsCollector(1000, display=mock_display)
collector._handle_input()
# Should not change state
self.assertTrue(collector.running)
class TestLiveStatsCollectorDisplayMethods(unittest.TestCase):
"""Tests for display-related methods."""
def setUp(self):
"""Set up collector with mock display."""
self.mock_display = MockDisplay(height=40, width=160)
self.collector = LiveStatsCollector(
1000, pid=12345, display=self.mock_display
)
self.collector.start_time = time.perf_counter()
def test_show_terminal_too_small(self):
"""Test terminal too small message display."""
self.collector._show_terminal_too_small(10, 50)
# Should have written some content to the display buffer
self.assertGreater(len(self.mock_display.buffer), 0)
def test_draw_header_info(self):
"""Test drawing header information."""
colors = {
"cyan": curses.A_BOLD,
"green": curses.A_BOLD,
"yellow": curses.A_BOLD,
"magenta": curses.A_BOLD,
}
self.collector._initialize_widgets(colors)
line = self.collector._header_widget.draw_header_info(0, 160, 100.5)
self.assertEqual(line, 2) # Title + header info line
def test_draw_sample_stats(self):
"""Test drawing sample statistics."""
self.collector.total_samples = 1000
colors = {"cyan": curses.A_BOLD, "green": curses.A_BOLD}
self.collector._initialize_widgets(colors)
line = self.collector._header_widget.draw_sample_stats(0, 160, 10.0)
self.assertEqual(line, 1)
self.assertGreater(self.collector._max_sample_rate, 0)
def test_progress_bar_uses_target_rate(self):
"""Test that progress bar uses target rate instead of max rate."""
# Set up collector with specific sampling interval
collector = LiveStatsCollector(
10000, pid=12345, display=self.mock_display
) # 10ms = 100Hz target
collector.start_time = time.perf_counter()
collector.total_samples = 500
collector._max_sample_rate = (
150 # Higher than target to test we don't use this
)
colors = {"cyan": curses.A_BOLD, "green": curses.A_BOLD}
collector._initialize_widgets(colors)
# Clear the display buffer to capture only our progress bar content
self.mock_display.buffer.clear()
# Draw sample stats with a known elapsed time that gives us a specific sample rate
elapsed = 10.0 # 500 samples in 10 seconds = 50 samples/second
line = collector._header_widget.draw_sample_stats(0, 160, elapsed)
# Verify display was updated
self.assertEqual(line, 1)
self.assertGreater(len(self.mock_display.buffer), 0)
# Verify the label shows current/target format with units instead of "max"
found_current_target_label = False
found_max_label = False
for (line_num, col), (text, attr) in self.mock_display.buffer.items():
# Should show "50.0Hz/100.0Hz (50.0%)" since we're at 50% of target (50/100)
if "50.0Hz/100.0Hz" in text and "50.0%" in text:
found_current_target_label = True
if "max:" in text:
found_max_label = True
self.assertTrue(
found_current_target_label,
"Should display current/target rate with percentage",
)
self.assertFalse(found_max_label, "Should not display max rate label")
def test_progress_bar_different_intervals(self):
"""Test that progress bar adapts to different sampling intervals."""
test_cases = [
(
1000,
"1.0KHz",
"100.0Hz",
), # 1ms interval -> 1000Hz target (1.0KHz), 100Hz current
(
5000,
"200.0Hz",
"100.0Hz",
), # 5ms interval -> 200Hz target, 100Hz current
(
20000,
"50.0Hz",
"100.0Hz",
), # 20ms interval -> 50Hz target, 100Hz current
(
100000,
"10.0Hz",
"100.0Hz",
), # 100ms interval -> 10Hz target, 100Hz current
]
for (
interval_usec,
expected_target_formatted,
expected_current_formatted,
) in test_cases:
with self.subTest(interval=interval_usec):
collector = LiveStatsCollector(
interval_usec, display=MockDisplay()
)
collector.start_time = time.perf_counter()
collector.total_samples = 100
colors = {"cyan": curses.A_BOLD, "green": curses.A_BOLD}
collector._initialize_widgets(colors)
# Clear buffer
collector.display.buffer.clear()
# Draw with 1 second elapsed time (gives us current rate of 100Hz)
collector._header_widget.draw_sample_stats(0, 160, 1.0)
# Check that the current/target format appears in the display with proper units
found_current_target_format = False
for (line_num, col), (
text,
attr,
) in collector.display.buffer.items():
# Looking for format like "100.0Hz/1.0KHz" or "100.0Hz/200.0Hz"
expected_format = f"{expected_current_formatted}/{expected_target_formatted}"
if expected_format in text and "%" in text:
found_current_target_format = True
break
self.assertTrue(
found_current_target_format,
f"Should display current/target rate format with units for {interval_usec}µs interval",
)
def test_draw_efficiency_bar(self):
"""Test drawing efficiency bar."""
self.collector._successful_samples = 900
self.collector._failed_samples = 100
self.collector.total_samples = 1000
colors = {"green": curses.A_BOLD, "red": curses.A_BOLD}
self.collector._initialize_widgets(colors)
line = self.collector._header_widget.draw_efficiency_bar(0, 160)
self.assertEqual(line, 1)
def test_draw_function_stats(self):
"""Test drawing function statistics."""
self.collector.result[("test.py", 10, "func1")] = {
"direct_calls": 100,
"cumulative_calls": 150,
"total_rec_calls": 0,
}
self.collector.result[("test.py", 20, "func2")] = {
"direct_calls": 0,
"cumulative_calls": 50,
"total_rec_calls": 0,
}
stats_list = self.collector._build_stats_list()
colors = {
"cyan": curses.A_BOLD,
"green": curses.A_BOLD,
"yellow": curses.A_BOLD,
"magenta": curses.A_BOLD,
}
self.collector._initialize_widgets(colors)
line = self.collector._header_widget.draw_function_stats(
0, 160, stats_list
)
self.assertEqual(line, 1)
def test_draw_top_functions(self):
"""Test drawing top functions."""
self.collector.total_samples = 300
self.collector.result[("test.py", 10, "hot_func")] = {
"direct_calls": 100,
"cumulative_calls": 150,
"total_rec_calls": 0,
}
stats_list = self.collector._build_stats_list()
colors = {
"red": curses.A_BOLD,
"yellow": curses.A_BOLD,
"green": curses.A_BOLD,
}
self.collector._initialize_widgets(colors)
line = self.collector._header_widget.draw_top_functions(
0, 160, stats_list
)
self.assertEqual(line, 1)
def test_draw_column_headers(self):
"""Test drawing column headers."""
colors = {
"sorted_header": curses.A_BOLD,
"normal_header": curses.A_NORMAL,
}
self.collector._initialize_widgets(colors)
(
line,
show_sample_pct,
show_tottime,
show_cumul_pct,
show_cumtime,
) = self.collector._table_widget.draw_column_headers(0, 160)
self.assertEqual(line, 1)
self.assertTrue(show_sample_pct)
self.assertTrue(show_tottime)
self.assertTrue(show_cumul_pct)
self.assertTrue(show_cumtime)
def test_draw_column_headers_narrow_terminal(self):
"""Test column headers adapt to narrow terminal."""
colors = {
"sorted_header": curses.A_BOLD,
"normal_header": curses.A_NORMAL,
}
self.collector._initialize_widgets(colors)
(
line,
show_sample_pct,
show_tottime,
show_cumul_pct,
show_cumtime,
) = self.collector._table_widget.draw_column_headers(0, 70)
self.assertEqual(line, 1)
# Some columns should be hidden on narrow terminal
self.assertFalse(show_cumul_pct)
def test_draw_footer(self):
"""Test drawing footer."""
colors = self.collector._setup_colors()
self.collector._initialize_widgets(colors)
self.collector._footer_widget.render(38, 160)
# Should have written some content to the display buffer
self.assertGreater(len(self.mock_display.buffer), 0)
def test_draw_progress_bar(self):
"""Test progress bar drawing."""
colors = self.collector._setup_colors()
self.collector._initialize_widgets(colors)
bar, length = self.collector._header_widget.progress_bar.render_bar(
50, 100, 30
)
self.assertIn("[", bar)
self.assertIn("]", bar)
self.assertGreater(length, 0)
# Should be roughly 50% filled
self.assertIn("", bar)
self.assertIn("", bar)
class TestLiveStatsCollectorEdgeCases(unittest.TestCase):
"""Tests for edge cases and error handling."""
def test_very_long_function_name(self):
"""Test handling of very long function names."""
collector = LiveStatsCollector(1000)
long_name = "x" * 200
collector.result[("test.py", 10, long_name)] = {
"direct_calls": 10,
"cumulative_calls": 20,
"total_rec_calls": 0,
}
stats_list = collector._build_stats_list()
self.assertEqual(len(stats_list), 1)
self.assertEqual(stats_list[0]["func"][2], long_name)
class TestLiveStatsCollectorUpdateDisplay(unittest.TestCase):
"""Tests for the _update_display method."""
def setUp(self):
"""Set up collector with mock display."""
self.mock_display = MockDisplay(height=40, width=160)
self.collector = LiveStatsCollector(
1000, pid=12345, display=self.mock_display
)
self.collector.start_time = time.perf_counter()
def test_update_display_terminal_too_small(self):
"""Test update_display when terminal is too small."""
small_display = MockDisplay(height=10, width=50)
self.collector.display = small_display
with mock.patch.object(
self.collector, "_show_terminal_too_small"
) as mock_show:
self.collector._update_display()
mock_show.assert_called_once()
def test_update_display_normal(self):
"""Test normal update_display operation."""
self.collector.total_samples = 100
self.collector._successful_samples = 90
self.collector._failed_samples = 10
self.collector.result[("test.py", 10, "func")] = {
"direct_calls": 50,
"cumulative_calls": 75,
"total_rec_calls": 0,
}
self.collector._update_display()
self.assertTrue(self.mock_display.cleared)
self.assertTrue(self.mock_display.refreshed)
def test_update_display_handles_exception(self):
"""Test that update_display handles exceptions gracefully."""
# Make one of the methods raise an exception
with mock.patch.object(
self.collector,
"_prepare_display_data",
side_effect=Exception("Test error"),
):
# Should not raise an exception (it catches and logs via trace_exception)
try:
self.collector._update_display()
except Exception:
self.fail(
"_update_display should handle exceptions gracefully"
)
class TestLiveCollectorWithMockDisplayHelpers(unittest.TestCase):
"""Tests using the new MockDisplay helper methods."""
def test_verify_pid_display_with_contains(self):
"""Test verifying PID is displayed using contains_text helper."""
display = MockDisplay(height=40, width=160)
collector = LiveStatsCollector(1000, pid=99999, display=display)
collector.start_time = time.perf_counter()
collector.total_samples = 10
collector._update_display()
# Use the helper method
self.assertTrue(
display.contains_text("99999"), "PID should be visible in display"
)
def test_verify_function_names_displayed(self):
"""Test verifying function names appear in display."""
display = MockDisplay(height=40, width=160)
collector = LiveStatsCollector(1000, pid=12345, display=display)
collector.start_time = time.perf_counter()
collector.total_samples = 100
collector.result[("mymodule.py", 42, "my_special_function")] = {
"direct_calls": 50,
"cumulative_calls": 75,
"total_rec_calls": 0,
}
collector._update_display()
# Verify function name appears
self.assertTrue(
display.contains_text("my_special_function"),
"Function name should be visible",
)
def test_get_all_lines_full_display(self):
"""Test getting all lines from a full display render."""
display = MockDisplay(height=40, width=160)
collector = LiveStatsCollector(1000, pid=12345, display=display)
collector.start_time = time.perf_counter()
collector.total_samples = 100
collector._update_display()
lines = display.get_all_lines()
# Should have multiple lines of content
self.assertGreater(len(lines), 5)
# Should have header content
self.assertTrue(any("PID" in line for line in lines))
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,97 @@
"""Simple unit tests for TrendTracker."""
import unittest
from test.support import requires
from test.support.import_helper import import_module
# Only run these tests if curses is available
requires("curses")
curses = import_module("curses")
from profiling.sampling.live_collector.trend_tracker import TrendTracker
class TestTrendTracker(unittest.TestCase):
"""Tests for TrendTracker class."""
def setUp(self):
"""Set up test fixtures."""
self.colors = {
"trend_up": curses.A_BOLD,
"trend_down": curses.A_REVERSE,
"trend_stable": curses.A_NORMAL,
}
def test_basic_trend_detection(self):
"""Test basic up/down/stable trend detection."""
tracker = TrendTracker(self.colors, enabled=True)
# First value is always stable
self.assertEqual(tracker.update("func1", "nsamples", 10), "stable")
# Increasing value
self.assertEqual(tracker.update("func1", "nsamples", 20), "up")
# Decreasing value
self.assertEqual(tracker.update("func1", "nsamples", 15), "down")
# Small change (within threshold) is stable
self.assertEqual(tracker.update("func1", "nsamples", 15.0001), "stable")
def test_multiple_metrics(self):
"""Test tracking multiple metrics simultaneously."""
tracker = TrendTracker(self.colors, enabled=True)
trends = tracker.update_metrics("func1", {
"nsamples": 10,
"tottime": 5.0,
})
self.assertEqual(trends["nsamples"], "stable")
self.assertEqual(trends["tottime"], "stable")
# Update with changes
trends = tracker.update_metrics("func1", {
"nsamples": 15,
"tottime": 3.0,
})
self.assertEqual(trends["nsamples"], "up")
self.assertEqual(trends["tottime"], "down")
def test_toggle_enabled(self):
"""Test enable/disable toggle."""
tracker = TrendTracker(self.colors, enabled=True)
self.assertTrue(tracker.enabled)
tracker.toggle()
self.assertFalse(tracker.enabled)
# When disabled, should return A_NORMAL
self.assertEqual(tracker.get_color("up"), curses.A_NORMAL)
def test_get_color(self):
"""Test color selection for trends."""
tracker = TrendTracker(self.colors, enabled=True)
self.assertEqual(tracker.get_color("up"), curses.A_BOLD)
self.assertEqual(tracker.get_color("down"), curses.A_REVERSE)
self.assertEqual(tracker.get_color("stable"), curses.A_NORMAL)
def test_clear(self):
"""Test clearing tracked values."""
tracker = TrendTracker(self.colors, enabled=True)
# Add some data
tracker.update("func1", "nsamples", 10)
tracker.update("func1", "nsamples", 20)
# Clear
tracker.clear()
# After clear, first update should be stable
self.assertEqual(tracker.update("func1", "nsamples", 30), "stable")
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,4 @@
Add a new ``--live`` mode to the tachyon profiler in
:mod:`profiling.sampling` module. This mode consist of a live TUI that
displays real-time profiling statistics as the target application runs,
similar to ``top``. Patch by Pablo Galindo

View file

@ -2761,7 +2761,13 @@ unwind_stack_for_thread(
// Check CPU status // Check CPU status
long pthread_id = GET_MEMBER(long, ts, unwinder->debug_offsets.thread_state.thread_id); long pthread_id = GET_MEMBER(long, ts, unwinder->debug_offsets.thread_state.thread_id);
int cpu_status = get_thread_status(unwinder, tid, pthread_id);
// Optimization: only check CPU status if needed by mode because it's expensive
int cpu_status = -1;
if (unwinder->mode == PROFILING_MODE_CPU || unwinder->mode == PROFILING_MODE_ALL) {
cpu_status = get_thread_status(unwinder, tid, pthread_id);
}
if (cpu_status == -1) { if (cpu_status == -1) {
status_flags |= THREAD_STATUS_UNKNOWN; status_flags |= THREAD_STATUS_UNKNOWN;
} else if (cpu_status == THREAD_STATE_RUNNING) { } else if (cpu_status == THREAD_STATE_RUNNING) {