mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
gh-141645: Refactor tachyon's live TUI tests to not use private fields (#141806)
This commit is contained in:
parent
77cb39e0c7
commit
07410da204
6 changed files with 142 additions and 142 deletions
BIN
Lib/data.bin
Normal file
BIN
Lib/data.bin
Normal file
Binary file not shown.
|
|
@ -137,20 +137,20 @@ def __init__(
|
||||||
self._saved_stderr = None
|
self._saved_stderr = None
|
||||||
self._devnull = None
|
self._devnull = None
|
||||||
self._last_display_update = None
|
self._last_display_update = None
|
||||||
self._max_sample_rate = 0 # Track maximum sample rate seen
|
self.max_sample_rate = 0 # Track maximum sample rate seen
|
||||||
self._successful_samples = 0 # Track samples that captured frames
|
self.successful_samples = 0 # Track samples that captured frames
|
||||||
self._failed_samples = 0 # Track samples that failed to capture frames
|
self.failed_samples = 0 # Track samples that failed to capture frames
|
||||||
self._display_update_interval = DISPLAY_UPDATE_INTERVAL # Instance variable for display refresh rate
|
self.display_update_interval = DISPLAY_UPDATE_INTERVAL # Instance variable for display refresh rate
|
||||||
|
|
||||||
# Thread status statistics (bit flags)
|
# Thread status statistics (bit flags)
|
||||||
self._thread_status_counts = {
|
self.thread_status_counts = {
|
||||||
"has_gil": 0,
|
"has_gil": 0,
|
||||||
"on_cpu": 0,
|
"on_cpu": 0,
|
||||||
"gil_requested": 0,
|
"gil_requested": 0,
|
||||||
"unknown": 0,
|
"unknown": 0,
|
||||||
"total": 0, # Total thread count across all samples
|
"total": 0, # Total thread count across all samples
|
||||||
}
|
}
|
||||||
self._gc_frame_samples = 0 # Track samples with GC frames
|
self.gc_frame_samples = 0 # Track samples with GC frames
|
||||||
|
|
||||||
# Interactive controls state
|
# Interactive controls state
|
||||||
self.paused = False # Pause UI updates (profiling continues)
|
self.paused = False # Pause UI updates (profiling continues)
|
||||||
|
|
@ -174,10 +174,10 @@ def __init__(
|
||||||
self._path_prefixes = self._get_common_path_prefixes()
|
self._path_prefixes = self._get_common_path_prefixes()
|
||||||
|
|
||||||
# Widgets (initialized when display is available)
|
# Widgets (initialized when display is available)
|
||||||
self._header_widget = None
|
self.header_widget = None
|
||||||
self._table_widget = None
|
self.table_widget = None
|
||||||
self._footer_widget = None
|
self.footer_widget = None
|
||||||
self._help_widget = None
|
self.help_widget = None
|
||||||
|
|
||||||
# Color mode
|
# Color mode
|
||||||
self._can_colorize = _colorize.can_colorize()
|
self._can_colorize = _colorize.can_colorize()
|
||||||
|
|
@ -256,7 +256,7 @@ def _get_common_path_prefixes(self):
|
||||||
|
|
||||||
return prefixes
|
return prefixes
|
||||||
|
|
||||||
def _simplify_path(self, filepath):
|
def simplify_path(self, filepath):
|
||||||
"""Simplify a file path by removing common prefixes."""
|
"""Simplify a file path by removing common prefixes."""
|
||||||
# Try to match against known prefixes
|
# Try to match against known prefixes
|
||||||
for prefix_path in self._path_prefixes:
|
for prefix_path in self._path_prefixes:
|
||||||
|
|
@ -268,7 +268,7 @@ def _simplify_path(self, filepath):
|
||||||
# If no match, return the original path
|
# If no match, return the original path
|
||||||
return filepath
|
return filepath
|
||||||
|
|
||||||
def _process_frames(self, frames, thread_id=None):
|
def process_frames(self, frames, thread_id=None):
|
||||||
"""Process a single thread's frame stack.
|
"""Process a single thread's frame stack.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
|
@ -295,7 +295,7 @@ def _process_frames(self, frames, thread_id=None):
|
||||||
thread_data.result[top_location]["direct_calls"] += 1
|
thread_data.result[top_location]["direct_calls"] += 1
|
||||||
|
|
||||||
def collect_failed_sample(self):
|
def collect_failed_sample(self):
|
||||||
self._failed_samples += 1
|
self.failed_samples += 1
|
||||||
self.total_samples += 1
|
self.total_samples += 1
|
||||||
|
|
||||||
def collect(self, stack_frames):
|
def collect(self, stack_frames):
|
||||||
|
|
@ -349,7 +349,7 @@ def collect(self, stack_frames):
|
||||||
|
|
||||||
frames = getattr(thread_info, "frame_info", None)
|
frames = getattr(thread_info, "frame_info", None)
|
||||||
if frames:
|
if frames:
|
||||||
self._process_frames(frames, thread_id=thread_id)
|
self.process_frames(frames, thread_id=thread_id)
|
||||||
|
|
||||||
# Track thread IDs only for threads that actually have samples
|
# Track thread IDs only for threads that actually have samples
|
||||||
if (
|
if (
|
||||||
|
|
@ -375,12 +375,12 @@ def collect(self, stack_frames):
|
||||||
|
|
||||||
# Update cumulative thread status counts
|
# Update cumulative thread status counts
|
||||||
for key, count in temp_status_counts.items():
|
for key, count in temp_status_counts.items():
|
||||||
self._thread_status_counts[key] += count
|
self.thread_status_counts[key] += count
|
||||||
|
|
||||||
if has_gc_frame:
|
if has_gc_frame:
|
||||||
self._gc_frame_samples += 1
|
self.gc_frame_samples += 1
|
||||||
|
|
||||||
self._successful_samples += 1
|
self.successful_samples += 1
|
||||||
self.total_samples += 1
|
self.total_samples += 1
|
||||||
|
|
||||||
# Handle input on every sample for instant responsiveness
|
# Handle input on every sample for instant responsiveness
|
||||||
|
|
@ -393,7 +393,7 @@ def collect(self, stack_frames):
|
||||||
if (
|
if (
|
||||||
self._last_display_update is None
|
self._last_display_update is None
|
||||||
or (current_time - self._last_display_update)
|
or (current_time - self._last_display_update)
|
||||||
>= self._display_update_interval
|
>= self.display_update_interval
|
||||||
):
|
):
|
||||||
self._update_display()
|
self._update_display()
|
||||||
self._last_display_update = current_time
|
self._last_display_update = current_time
|
||||||
|
|
@ -401,7 +401,7 @@ def collect(self, stack_frames):
|
||||||
def _prepare_display_data(self, height):
|
def _prepare_display_data(self, height):
|
||||||
"""Prepare data for display rendering."""
|
"""Prepare data for display rendering."""
|
||||||
elapsed = self.elapsed_time
|
elapsed = self.elapsed_time
|
||||||
stats_list = self._build_stats_list()
|
stats_list = self.build_stats_list()
|
||||||
|
|
||||||
# Calculate available space for stats
|
# Calculate available space for stats
|
||||||
# Add extra lines for finished banner when in finished state
|
# Add extra lines for finished banner when in finished state
|
||||||
|
|
@ -422,15 +422,15 @@ def _prepare_display_data(self, height):
|
||||||
|
|
||||||
def _initialize_widgets(self, colors):
|
def _initialize_widgets(self, colors):
|
||||||
"""Initialize widgets with display and colors."""
|
"""Initialize widgets with display and colors."""
|
||||||
if self._header_widget is None:
|
if self.header_widget is None:
|
||||||
# Initialize trend tracker with colors
|
# Initialize trend tracker with colors
|
||||||
if self._trend_tracker is None:
|
if self._trend_tracker is None:
|
||||||
self._trend_tracker = TrendTracker(colors, enabled=True)
|
self._trend_tracker = TrendTracker(colors, enabled=True)
|
||||||
|
|
||||||
self._header_widget = HeaderWidget(self.display, colors, self)
|
self.header_widget = HeaderWidget(self.display, colors, self)
|
||||||
self._table_widget = TableWidget(self.display, colors, self)
|
self.table_widget = TableWidget(self.display, colors, self)
|
||||||
self._footer_widget = FooterWidget(self.display, colors, self)
|
self.footer_widget = FooterWidget(self.display, colors, self)
|
||||||
self._help_widget = HelpWidget(self.display, colors)
|
self.help_widget = HelpWidget(self.display, colors)
|
||||||
|
|
||||||
def _render_display_sections(
|
def _render_display_sections(
|
||||||
self, height, width, elapsed, stats_list, colors
|
self, height, width, elapsed, stats_list, colors
|
||||||
|
|
@ -442,12 +442,12 @@ def _render_display_sections(
|
||||||
self._initialize_widgets(colors)
|
self._initialize_widgets(colors)
|
||||||
|
|
||||||
# Render header
|
# Render header
|
||||||
line = self._header_widget.render(
|
line = self.header_widget.render(
|
||||||
line, width, elapsed=elapsed, stats_list=stats_list
|
line, width, elapsed=elapsed, stats_list=stats_list
|
||||||
)
|
)
|
||||||
|
|
||||||
# Render table
|
# Render table
|
||||||
line = self._table_widget.render(
|
line = self.table_widget.render(
|
||||||
line, width, height=height, stats_list=stats_list
|
line, width, height=height, stats_list=stats_list
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -473,7 +473,7 @@ def _update_display(self):
|
||||||
|
|
||||||
# Show help screen if requested
|
# Show help screen if requested
|
||||||
if self.show_help:
|
if self.show_help:
|
||||||
self._help_widget.render(0, width, height=height)
|
self.help_widget.render(0, width, height=height)
|
||||||
self.display.refresh()
|
self.display.refresh()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -486,11 +486,11 @@ def _update_display(self):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Footer
|
# Footer
|
||||||
self._footer_widget.render(height - 2, width)
|
self.footer_widget.render(height - 2, width)
|
||||||
|
|
||||||
# Show filter input prompt if in filter input mode
|
# Show filter input prompt if in filter input mode
|
||||||
if self.filter_input_mode:
|
if self.filter_input_mode:
|
||||||
self._footer_widget.render_filter_input_prompt(
|
self.footer_widget.render_filter_input_prompt(
|
||||||
height - 1, width
|
height - 1, width
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -616,7 +616,7 @@ def _setup_colors(self):
|
||||||
"trend_stable": A_NORMAL,
|
"trend_stable": A_NORMAL,
|
||||||
}
|
}
|
||||||
|
|
||||||
def _build_stats_list(self):
|
def build_stats_list(self):
|
||||||
"""Build and sort the statistics list."""
|
"""Build and sort the statistics list."""
|
||||||
stats_list = []
|
stats_list = []
|
||||||
result_source = self._get_current_result_source()
|
result_source = self._get_current_result_source()
|
||||||
|
|
@ -707,17 +707,17 @@ def reset_stats(self):
|
||||||
self.view_mode = "ALL"
|
self.view_mode = "ALL"
|
||||||
self.current_thread_index = 0
|
self.current_thread_index = 0
|
||||||
self.total_samples = 0
|
self.total_samples = 0
|
||||||
self._successful_samples = 0
|
self.successful_samples = 0
|
||||||
self._failed_samples = 0
|
self.failed_samples = 0
|
||||||
self._max_sample_rate = 0
|
self.max_sample_rate = 0
|
||||||
self._thread_status_counts = {
|
self.thread_status_counts = {
|
||||||
"has_gil": 0,
|
"has_gil": 0,
|
||||||
"on_cpu": 0,
|
"on_cpu": 0,
|
||||||
"gil_requested": 0,
|
"gil_requested": 0,
|
||||||
"unknown": 0,
|
"unknown": 0,
|
||||||
"total": 0,
|
"total": 0,
|
||||||
}
|
}
|
||||||
self._gc_frame_samples = 0
|
self.gc_frame_samples = 0
|
||||||
# Clear trend tracking
|
# Clear trend tracking
|
||||||
if self._trend_tracker is not None:
|
if self._trend_tracker is not None:
|
||||||
self._trend_tracker.clear()
|
self._trend_tracker.clear()
|
||||||
|
|
@ -886,14 +886,14 @@ def _handle_input(self):
|
||||||
|
|
||||||
elif ch == ord("+") or ch == ord("="):
|
elif ch == ord("+") or ch == ord("="):
|
||||||
# Decrease update interval (faster refresh)
|
# Decrease update interval (faster refresh)
|
||||||
self._display_update_interval = max(
|
self.display_update_interval = max(
|
||||||
0.05, self._display_update_interval - 0.05
|
0.05, self.display_update_interval - 0.05
|
||||||
) # Min 20Hz
|
) # Min 20Hz
|
||||||
|
|
||||||
elif ch == ord("-") or ch == ord("_"):
|
elif ch == ord("-") or ch == ord("_"):
|
||||||
# Increase update interval (slower refresh)
|
# Increase update interval (slower refresh)
|
||||||
self._display_update_interval = min(
|
self.display_update_interval = min(
|
||||||
1.0, self._display_update_interval + 0.05
|
1.0, self.display_update_interval + 0.05
|
||||||
) # Max 1Hz
|
) # Max 1Hz
|
||||||
|
|
||||||
elif ch == ord("c") or ch == ord("C"):
|
elif ch == ord("c") or ch == ord("C"):
|
||||||
|
|
|
||||||
|
|
@ -180,7 +180,7 @@ def draw_header_info(self, line, width, elapsed):
|
||||||
|
|
||||||
# Calculate display refresh rate
|
# Calculate display refresh rate
|
||||||
refresh_hz = (
|
refresh_hz = (
|
||||||
1.0 / self.collector._display_update_interval if self.collector._display_update_interval > 0 else 0
|
1.0 / self.collector.display_update_interval if self.collector.display_update_interval > 0 else 0
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get current view mode and thread display
|
# Get current view mode and thread display
|
||||||
|
|
@ -248,8 +248,8 @@ def draw_sample_stats(self, line, width, elapsed):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update max sample rate
|
# Update max sample rate
|
||||||
if sample_rate > self.collector._max_sample_rate:
|
if sample_rate > self.collector.max_sample_rate:
|
||||||
self.collector._max_sample_rate = sample_rate
|
self.collector.max_sample_rate = sample_rate
|
||||||
|
|
||||||
col = 0
|
col = 0
|
||||||
self.add_str(line, col, "Samples: ", curses.A_BOLD)
|
self.add_str(line, col, "Samples: ", curses.A_BOLD)
|
||||||
|
|
@ -308,11 +308,11 @@ def draw_sample_stats(self, line, width, elapsed):
|
||||||
def draw_efficiency_bar(self, line, width):
|
def draw_efficiency_bar(self, line, width):
|
||||||
"""Draw sample efficiency bar showing success/failure rates."""
|
"""Draw sample efficiency bar showing success/failure rates."""
|
||||||
success_pct = (
|
success_pct = (
|
||||||
self.collector._successful_samples
|
self.collector.successful_samples
|
||||||
/ max(1, self.collector.total_samples)
|
/ max(1, self.collector.total_samples)
|
||||||
) * 100
|
) * 100
|
||||||
failed_pct = (
|
failed_pct = (
|
||||||
self.collector._failed_samples
|
self.collector.failed_samples
|
||||||
/ max(1, self.collector.total_samples)
|
/ max(1, self.collector.total_samples)
|
||||||
) * 100
|
) * 100
|
||||||
|
|
||||||
|
|
@ -327,7 +327,7 @@ def draw_efficiency_bar(self, line, width):
|
||||||
bar_width = min(MAX_EFFICIENCY_BAR_WIDTH, available_width)
|
bar_width = min(MAX_EFFICIENCY_BAR_WIDTH, available_width)
|
||||||
success_fill = int(
|
success_fill = int(
|
||||||
(
|
(
|
||||||
self.collector._successful_samples
|
self.collector.successful_samples
|
||||||
/ max(1, self.collector.total_samples)
|
/ max(1, self.collector.total_samples)
|
||||||
)
|
)
|
||||||
* bar_width
|
* bar_width
|
||||||
|
|
@ -381,7 +381,7 @@ def draw_thread_status(self, line, width):
|
||||||
"""Draw thread status statistics and GC information."""
|
"""Draw thread status statistics and GC information."""
|
||||||
# Get status counts for current view mode
|
# Get status counts for current view mode
|
||||||
thread_data = self.collector._get_current_thread_data()
|
thread_data = self.collector._get_current_thread_data()
|
||||||
status_counts = thread_data.as_status_dict() if thread_data else self.collector._thread_status_counts
|
status_counts = thread_data.as_status_dict() if thread_data else self.collector.thread_status_counts
|
||||||
|
|
||||||
# Calculate percentages
|
# Calculate percentages
|
||||||
total_threads = max(1, status_counts["total"])
|
total_threads = max(1, status_counts["total"])
|
||||||
|
|
@ -395,7 +395,7 @@ def draw_thread_status(self, line, width):
|
||||||
pct_gc = (thread_data.gc_frame_samples / total_samples) * 100
|
pct_gc = (thread_data.gc_frame_samples / total_samples) * 100
|
||||||
else:
|
else:
|
||||||
total_samples = max(1, self.collector.total_samples)
|
total_samples = max(1, self.collector.total_samples)
|
||||||
pct_gc = (self.collector._gc_frame_samples / total_samples) * 100
|
pct_gc = (self.collector.gc_frame_samples / total_samples) * 100
|
||||||
|
|
||||||
col = 0
|
col = 0
|
||||||
self.add_str(line, col, "Threads: ", curses.A_BOLD)
|
self.add_str(line, col, "Threads: ", curses.A_BOLD)
|
||||||
|
|
@ -809,7 +809,7 @@ def get_trend_color(column_name):
|
||||||
|
|
||||||
# File:line column
|
# File:line column
|
||||||
if col < width - 10:
|
if col < width - 10:
|
||||||
simplified_path = self.collector._simplify_path(filename)
|
simplified_path = self.collector.simplify_path(filename)
|
||||||
file_line = f"{simplified_path}:{lineno}"
|
file_line = f"{simplified_path}:{lineno}"
|
||||||
remaining_width = width - col - 1
|
remaining_width = width - col - 1
|
||||||
self.add_str(
|
self.add_str(
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ def test_simplify_stdlib_path(self):
|
||||||
if os_file:
|
if os_file:
|
||||||
stdlib_dir = os.path.dirname(os.path.abspath(os_file))
|
stdlib_dir = os.path.dirname(os.path.abspath(os_file))
|
||||||
test_path = os.path.join(stdlib_dir, "json", "decoder.py")
|
test_path = os.path.join(stdlib_dir, "json", "decoder.py")
|
||||||
simplified = collector._simplify_path(test_path)
|
simplified = collector.simplify_path(test_path)
|
||||||
# Should remove the stdlib prefix
|
# Should remove the stdlib prefix
|
||||||
self.assertNotIn(stdlib_dir, simplified)
|
self.assertNotIn(stdlib_dir, simplified)
|
||||||
self.assertIn("json", simplified)
|
self.assertIn("json", simplified)
|
||||||
|
|
@ -45,7 +45,7 @@ def test_simplify_unknown_path(self):
|
||||||
"""Test that unknown paths are returned unchanged."""
|
"""Test that unknown paths are returned unchanged."""
|
||||||
collector = LiveStatsCollector(1000)
|
collector = LiveStatsCollector(1000)
|
||||||
test_path = "/some/unknown/path/file.py"
|
test_path = "/some/unknown/path/file.py"
|
||||||
simplified = collector._simplify_path(test_path)
|
simplified = collector.simplify_path(test_path)
|
||||||
self.assertEqual(simplified, test_path)
|
self.assertEqual(simplified, test_path)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -56,7 +56,7 @@ def test_process_single_frame(self):
|
||||||
"""Test processing a single frame."""
|
"""Test processing a single frame."""
|
||||||
collector = LiveStatsCollector(1000)
|
collector = LiveStatsCollector(1000)
|
||||||
frames = [MockFrameInfo("test.py", 10, "test_func")]
|
frames = [MockFrameInfo("test.py", 10, "test_func")]
|
||||||
collector._process_frames(frames)
|
collector.process_frames(frames)
|
||||||
|
|
||||||
location = ("test.py", 10, "test_func")
|
location = ("test.py", 10, "test_func")
|
||||||
self.assertEqual(collector.result[location]["direct_calls"], 1)
|
self.assertEqual(collector.result[location]["direct_calls"], 1)
|
||||||
|
|
@ -70,7 +70,7 @@ def test_process_multiple_frames(self):
|
||||||
MockFrameInfo("test.py", 20, "middle_func"),
|
MockFrameInfo("test.py", 20, "middle_func"),
|
||||||
MockFrameInfo("test.py", 30, "outer_func"),
|
MockFrameInfo("test.py", 30, "outer_func"),
|
||||||
]
|
]
|
||||||
collector._process_frames(frames)
|
collector.process_frames(frames)
|
||||||
|
|
||||||
# Top frame (inner_func) should have both direct and cumulative
|
# Top frame (inner_func) should have both direct and cumulative
|
||||||
inner_loc = ("test.py", 10, "inner_func")
|
inner_loc = ("test.py", 10, "inner_func")
|
||||||
|
|
@ -89,7 +89,7 @@ def test_process_multiple_frames(self):
|
||||||
def test_process_empty_frames(self):
|
def test_process_empty_frames(self):
|
||||||
"""Test processing empty frames list."""
|
"""Test processing empty frames list."""
|
||||||
collector = LiveStatsCollector(1000)
|
collector = LiveStatsCollector(1000)
|
||||||
collector._process_frames([])
|
collector.process_frames([])
|
||||||
# Should not raise an error and result should remain empty
|
# Should not raise an error and result should remain empty
|
||||||
self.assertEqual(len(collector.result), 0)
|
self.assertEqual(len(collector.result), 0)
|
||||||
|
|
||||||
|
|
@ -98,9 +98,9 @@ def test_process_frames_accumulation(self):
|
||||||
collector = LiveStatsCollector(1000)
|
collector = LiveStatsCollector(1000)
|
||||||
frames = [MockFrameInfo("test.py", 10, "test_func")]
|
frames = [MockFrameInfo("test.py", 10, "test_func")]
|
||||||
|
|
||||||
collector._process_frames(frames)
|
collector.process_frames(frames)
|
||||||
collector._process_frames(frames)
|
collector.process_frames(frames)
|
||||||
collector._process_frames(frames)
|
collector.process_frames(frames)
|
||||||
|
|
||||||
location = ("test.py", 10, "test_func")
|
location = ("test.py", 10, "test_func")
|
||||||
self.assertEqual(collector.result[location]["direct_calls"], 3)
|
self.assertEqual(collector.result[location]["direct_calls"], 3)
|
||||||
|
|
@ -112,7 +112,7 @@ def test_process_frames_with_thread_id(self):
|
||||||
frames = [MockFrameInfo("test.py", 10, "test_func")]
|
frames = [MockFrameInfo("test.py", 10, "test_func")]
|
||||||
|
|
||||||
# Process frames with thread_id
|
# Process frames with thread_id
|
||||||
collector._process_frames(frames, thread_id=123)
|
collector.process_frames(frames, thread_id=123)
|
||||||
|
|
||||||
# Check aggregated result
|
# Check aggregated result
|
||||||
location = ("test.py", 10, "test_func")
|
location = ("test.py", 10, "test_func")
|
||||||
|
|
@ -135,8 +135,8 @@ def test_process_frames_multiple_threads(self):
|
||||||
frames2 = [MockFrameInfo("test.py", 20, "other_func")]
|
frames2 = [MockFrameInfo("test.py", 20, "other_func")]
|
||||||
|
|
||||||
# Process frames from different threads
|
# Process frames from different threads
|
||||||
collector._process_frames(frames1, thread_id=123)
|
collector.process_frames(frames1, thread_id=123)
|
||||||
collector._process_frames(frames2, thread_id=456)
|
collector.process_frames(frames2, thread_id=456)
|
||||||
|
|
||||||
# Check that both threads have their own data
|
# Check that both threads have their own data
|
||||||
self.assertIn(123, collector.per_thread_data)
|
self.assertIn(123, collector.per_thread_data)
|
||||||
|
|
@ -199,8 +199,8 @@ def test_collect_with_frames(self):
|
||||||
|
|
||||||
location = ("test.py", 10, "test_func")
|
location = ("test.py", 10, "test_func")
|
||||||
self.assertEqual(collector.result[location]["direct_calls"], 1)
|
self.assertEqual(collector.result[location]["direct_calls"], 1)
|
||||||
self.assertEqual(collector._successful_samples, 1)
|
self.assertEqual(collector.successful_samples, 1)
|
||||||
self.assertEqual(collector._failed_samples, 0)
|
self.assertEqual(collector.failed_samples, 0)
|
||||||
|
|
||||||
def test_collect_with_empty_frames(self):
|
def test_collect_with_empty_frames(self):
|
||||||
"""Test collect with empty frames."""
|
"""Test collect with empty frames."""
|
||||||
|
|
@ -212,8 +212,8 @@ def test_collect_with_empty_frames(self):
|
||||||
collector.collect(stack_frames)
|
collector.collect(stack_frames)
|
||||||
|
|
||||||
# Empty frames still count as successful since collect() was called successfully
|
# Empty frames still count as successful since collect() was called successfully
|
||||||
self.assertEqual(collector._successful_samples, 1)
|
self.assertEqual(collector.successful_samples, 1)
|
||||||
self.assertEqual(collector._failed_samples, 0)
|
self.assertEqual(collector.failed_samples, 0)
|
||||||
|
|
||||||
def test_collect_skip_idle_threads(self):
|
def test_collect_skip_idle_threads(self):
|
||||||
"""Test that idle threads are skipped when skip_idle=True."""
|
"""Test that idle threads are skipped when skip_idle=True."""
|
||||||
|
|
@ -284,7 +284,7 @@ def setUp(self):
|
||||||
|
|
||||||
def test_build_stats_list(self):
|
def test_build_stats_list(self):
|
||||||
"""Test that stats list is built correctly."""
|
"""Test that stats list is built correctly."""
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
self.assertEqual(len(stats_list), 3)
|
self.assertEqual(len(stats_list), 3)
|
||||||
|
|
||||||
# Check that all expected keys are present
|
# Check that all expected keys are present
|
||||||
|
|
@ -298,7 +298,7 @@ def test_build_stats_list(self):
|
||||||
def test_sort_by_nsamples(self):
|
def test_sort_by_nsamples(self):
|
||||||
"""Test sorting by number of samples."""
|
"""Test sorting by number of samples."""
|
||||||
self.collector.sort_by = "nsamples"
|
self.collector.sort_by = "nsamples"
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should be sorted by direct_calls descending
|
# Should be sorted by direct_calls descending
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func1") # 100 samples
|
self.assertEqual(stats_list[0]["func"][2], "func1") # 100 samples
|
||||||
|
|
@ -308,7 +308,7 @@ def test_sort_by_nsamples(self):
|
||||||
def test_sort_by_tottime(self):
|
def test_sort_by_tottime(self):
|
||||||
"""Test sorting by total time."""
|
"""Test sorting by total time."""
|
||||||
self.collector.sort_by = "tottime"
|
self.collector.sort_by = "tottime"
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should be sorted by total_time descending
|
# Should be sorted by total_time descending
|
||||||
# total_time = direct_calls * sample_interval_sec
|
# total_time = direct_calls * sample_interval_sec
|
||||||
|
|
@ -319,7 +319,7 @@ def test_sort_by_tottime(self):
|
||||||
def test_sort_by_cumtime(self):
|
def test_sort_by_cumtime(self):
|
||||||
"""Test sorting by cumulative time."""
|
"""Test sorting by cumulative time."""
|
||||||
self.collector.sort_by = "cumtime"
|
self.collector.sort_by = "cumtime"
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should be sorted by cumulative_time descending
|
# Should be sorted by cumulative_time descending
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func2") # 200 cumulative
|
self.assertEqual(stats_list[0]["func"][2], "func2") # 200 cumulative
|
||||||
|
|
@ -329,7 +329,7 @@ def test_sort_by_cumtime(self):
|
||||||
def test_sort_by_sample_pct(self):
|
def test_sort_by_sample_pct(self):
|
||||||
"""Test sorting by sample percentage."""
|
"""Test sorting by sample percentage."""
|
||||||
self.collector.sort_by = "sample_pct"
|
self.collector.sort_by = "sample_pct"
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should be sorted by percentage of direct_calls
|
# Should be sorted by percentage of direct_calls
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func1") # 33.3%
|
self.assertEqual(stats_list[0]["func"][2], "func1") # 33.3%
|
||||||
|
|
@ -339,7 +339,7 @@ def test_sort_by_sample_pct(self):
|
||||||
def test_sort_by_cumul_pct(self):
|
def test_sort_by_cumul_pct(self):
|
||||||
"""Test sorting by cumulative percentage."""
|
"""Test sorting by cumulative percentage."""
|
||||||
self.collector.sort_by = "cumul_pct"
|
self.collector.sort_by = "cumul_pct"
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should be sorted by percentage of cumulative_calls
|
# Should be sorted by percentage of cumulative_calls
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func2") # 66.7%
|
self.assertEqual(stats_list[0]["func"][2], "func2") # 66.7%
|
||||||
|
|
@ -438,14 +438,14 @@ def test_format_uptime_seconds(self):
|
||||||
collector = LiveStatsCollector(1000, display=MockDisplay())
|
collector = LiveStatsCollector(1000, display=MockDisplay())
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
self.assertEqual(collector._header_widget.format_uptime(45), "0m45s")
|
self.assertEqual(collector.header_widget.format_uptime(45), "0m45s")
|
||||||
|
|
||||||
def test_format_uptime_minutes(self):
|
def test_format_uptime_minutes(self):
|
||||||
"""Test uptime formatting for minutes."""
|
"""Test uptime formatting for minutes."""
|
||||||
collector = LiveStatsCollector(1000, display=MockDisplay())
|
collector = LiveStatsCollector(1000, display=MockDisplay())
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
self.assertEqual(collector._header_widget.format_uptime(125), "2m05s")
|
self.assertEqual(collector.header_widget.format_uptime(125), "2m05s")
|
||||||
|
|
||||||
def test_format_uptime_hours(self):
|
def test_format_uptime_hours(self):
|
||||||
"""Test uptime formatting for hours."""
|
"""Test uptime formatting for hours."""
|
||||||
|
|
@ -453,7 +453,7 @@ def test_format_uptime_hours(self):
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
collector._header_widget.format_uptime(3661), "1h01m01s"
|
collector.header_widget.format_uptime(3661), "1h01m01s"
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_format_uptime_large_values(self):
|
def test_format_uptime_large_values(self):
|
||||||
|
|
@ -462,7 +462,7 @@ def test_format_uptime_large_values(self):
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
collector._header_widget.format_uptime(86400), "24h00m00s"
|
collector.header_widget.format_uptime(86400), "24h00m00s"
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_format_uptime_zero(self):
|
def test_format_uptime_zero(self):
|
||||||
|
|
@ -470,7 +470,7 @@ def test_format_uptime_zero(self):
|
||||||
collector = LiveStatsCollector(1000, display=MockDisplay())
|
collector = LiveStatsCollector(1000, display=MockDisplay())
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
self.assertEqual(collector._header_widget.format_uptime(0), "0m00s")
|
self.assertEqual(collector.header_widget.format_uptime(0), "0m00s")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ def setUp(self):
|
||||||
)
|
)
|
||||||
self.collector.start_time = time.perf_counter()
|
self.collector.start_time = time.perf_counter()
|
||||||
# Set a consistent display update interval for tests
|
# Set a consistent display update interval for tests
|
||||||
self.collector._display_update_interval = 0.1
|
self.collector.display_update_interval = 0.1
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
"""Clean up after test."""
|
"""Clean up after test."""
|
||||||
|
|
@ -92,8 +92,8 @@ def test_reset_stats(self):
|
||||||
"""Test reset statistics functionality."""
|
"""Test reset statistics functionality."""
|
||||||
# Add some stats
|
# Add some stats
|
||||||
self.collector.total_samples = 100
|
self.collector.total_samples = 100
|
||||||
self.collector._successful_samples = 90
|
self.collector.successful_samples = 90
|
||||||
self.collector._failed_samples = 10
|
self.collector.failed_samples = 10
|
||||||
self.collector.result[("test.py", 1, "func")] = {
|
self.collector.result[("test.py", 1, "func")] = {
|
||||||
"direct_calls": 50,
|
"direct_calls": 50,
|
||||||
"cumulative_calls": 75,
|
"cumulative_calls": 75,
|
||||||
|
|
@ -104,51 +104,51 @@ def test_reset_stats(self):
|
||||||
self.collector.reset_stats()
|
self.collector.reset_stats()
|
||||||
|
|
||||||
self.assertEqual(self.collector.total_samples, 0)
|
self.assertEqual(self.collector.total_samples, 0)
|
||||||
self.assertEqual(self.collector._successful_samples, 0)
|
self.assertEqual(self.collector.successful_samples, 0)
|
||||||
self.assertEqual(self.collector._failed_samples, 0)
|
self.assertEqual(self.collector.failed_samples, 0)
|
||||||
self.assertEqual(len(self.collector.result), 0)
|
self.assertEqual(len(self.collector.result), 0)
|
||||||
|
|
||||||
def test_increase_refresh_rate(self):
|
def test_increase_refresh_rate(self):
|
||||||
"""Test increasing refresh rate (faster updates)."""
|
"""Test increasing refresh rate (faster updates)."""
|
||||||
initial_interval = self.collector._display_update_interval
|
initial_interval = self.collector.display_update_interval
|
||||||
|
|
||||||
# Simulate '+' key press (faster = smaller interval)
|
# Simulate '+' key press (faster = smaller interval)
|
||||||
self.display.simulate_input(ord("+"))
|
self.display.simulate_input(ord("+"))
|
||||||
self.collector._handle_input()
|
self.collector._handle_input()
|
||||||
|
|
||||||
self.assertLess(self.collector._display_update_interval, initial_interval)
|
self.assertLess(self.collector.display_update_interval, initial_interval)
|
||||||
|
|
||||||
def test_decrease_refresh_rate(self):
|
def test_decrease_refresh_rate(self):
|
||||||
"""Test decreasing refresh rate (slower updates)."""
|
"""Test decreasing refresh rate (slower updates)."""
|
||||||
initial_interval = self.collector._display_update_interval
|
initial_interval = self.collector.display_update_interval
|
||||||
|
|
||||||
# Simulate '-' key press (slower = larger interval)
|
# Simulate '-' key press (slower = larger interval)
|
||||||
self.display.simulate_input(ord("-"))
|
self.display.simulate_input(ord("-"))
|
||||||
self.collector._handle_input()
|
self.collector._handle_input()
|
||||||
|
|
||||||
self.assertGreater(self.collector._display_update_interval, initial_interval)
|
self.assertGreater(self.collector.display_update_interval, initial_interval)
|
||||||
|
|
||||||
def test_refresh_rate_minimum(self):
|
def test_refresh_rate_minimum(self):
|
||||||
"""Test that refresh rate has a minimum (max speed)."""
|
"""Test that refresh rate has a minimum (max speed)."""
|
||||||
self.collector._display_update_interval = 0.05 # Set to minimum
|
self.collector.display_update_interval = 0.05 # Set to minimum
|
||||||
|
|
||||||
# Try to go faster
|
# Try to go faster
|
||||||
self.display.simulate_input(ord("+"))
|
self.display.simulate_input(ord("+"))
|
||||||
self.collector._handle_input()
|
self.collector._handle_input()
|
||||||
|
|
||||||
# Should stay at minimum
|
# Should stay at minimum
|
||||||
self.assertEqual(self.collector._display_update_interval, 0.05)
|
self.assertEqual(self.collector.display_update_interval, 0.05)
|
||||||
|
|
||||||
def test_refresh_rate_maximum(self):
|
def test_refresh_rate_maximum(self):
|
||||||
"""Test that refresh rate has a maximum (min speed)."""
|
"""Test that refresh rate has a maximum (min speed)."""
|
||||||
self.collector._display_update_interval = 1.0 # Set to maximum
|
self.collector.display_update_interval = 1.0 # Set to maximum
|
||||||
|
|
||||||
# Try to go slower
|
# Try to go slower
|
||||||
self.display.simulate_input(ord("-"))
|
self.display.simulate_input(ord("-"))
|
||||||
self.collector._handle_input()
|
self.collector._handle_input()
|
||||||
|
|
||||||
# Should stay at maximum
|
# Should stay at maximum
|
||||||
self.assertEqual(self.collector._display_update_interval, 1.0)
|
self.assertEqual(self.collector.display_update_interval, 1.0)
|
||||||
|
|
||||||
def test_help_toggle(self):
|
def test_help_toggle(self):
|
||||||
"""Test help screen toggle."""
|
"""Test help screen toggle."""
|
||||||
|
|
@ -276,23 +276,23 @@ def test_filter_clear_uppercase(self):
|
||||||
|
|
||||||
def test_increase_refresh_rate_with_equals(self):
|
def test_increase_refresh_rate_with_equals(self):
|
||||||
"""Test increasing refresh rate with '=' key."""
|
"""Test increasing refresh rate with '=' key."""
|
||||||
initial_interval = self.collector._display_update_interval
|
initial_interval = self.collector.display_update_interval
|
||||||
|
|
||||||
# Simulate '=' key press (alternative to '+')
|
# Simulate '=' key press (alternative to '+')
|
||||||
self.display.simulate_input(ord("="))
|
self.display.simulate_input(ord("="))
|
||||||
self.collector._handle_input()
|
self.collector._handle_input()
|
||||||
|
|
||||||
self.assertLess(self.collector._display_update_interval, initial_interval)
|
self.assertLess(self.collector.display_update_interval, initial_interval)
|
||||||
|
|
||||||
def test_decrease_refresh_rate_with_underscore(self):
|
def test_decrease_refresh_rate_with_underscore(self):
|
||||||
"""Test decreasing refresh rate with '_' key."""
|
"""Test decreasing refresh rate with '_' key."""
|
||||||
initial_interval = self.collector._display_update_interval
|
initial_interval = self.collector.display_update_interval
|
||||||
|
|
||||||
# Simulate '_' key press (alternative to '-')
|
# Simulate '_' key press (alternative to '-')
|
||||||
self.display.simulate_input(ord("_"))
|
self.display.simulate_input(ord("_"))
|
||||||
self.collector._handle_input()
|
self.collector._handle_input()
|
||||||
|
|
||||||
self.assertGreater(self.collector._display_update_interval, initial_interval)
|
self.assertGreater(self.collector.display_update_interval, initial_interval)
|
||||||
|
|
||||||
def test_finished_state_displays_banner(self):
|
def test_finished_state_displays_banner(self):
|
||||||
"""Test that finished state shows prominent banner."""
|
"""Test that finished state shows prominent banner."""
|
||||||
|
|
@ -431,7 +431,7 @@ def test_filter_by_filename(self):
|
||||||
"""Test filtering by filename pattern."""
|
"""Test filtering by filename pattern."""
|
||||||
self.collector.filter_pattern = "models"
|
self.collector.filter_pattern = "models"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Only models.py should be included
|
# Only models.py should be included
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
|
|
@ -441,7 +441,7 @@ def test_filter_by_function_name(self):
|
||||||
"""Test filtering by function name."""
|
"""Test filtering by function name."""
|
||||||
self.collector.filter_pattern = "render"
|
self.collector.filter_pattern = "render"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
self.assertEqual(stats_list[0]["func"][2], "render")
|
self.assertEqual(stats_list[0]["func"][2], "render")
|
||||||
|
|
@ -450,7 +450,7 @@ def test_filter_case_insensitive(self):
|
||||||
"""Test that filtering is case-insensitive."""
|
"""Test that filtering is case-insensitive."""
|
||||||
self.collector.filter_pattern = "MODELS"
|
self.collector.filter_pattern = "MODELS"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should still match models.py
|
# Should still match models.py
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
|
|
@ -459,7 +459,7 @@ def test_filter_substring_matching(self):
|
||||||
"""Test substring filtering."""
|
"""Test substring filtering."""
|
||||||
self.collector.filter_pattern = "app/"
|
self.collector.filter_pattern = "app/"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should match both app files
|
# Should match both app files
|
||||||
self.assertEqual(len(stats_list), 2)
|
self.assertEqual(len(stats_list), 2)
|
||||||
|
|
@ -468,7 +468,7 @@ def test_no_filter(self):
|
||||||
"""Test with no filter applied."""
|
"""Test with no filter applied."""
|
||||||
self.collector.filter_pattern = None
|
self.collector.filter_pattern = None
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# All items should be included
|
# All items should be included
|
||||||
self.assertEqual(len(stats_list), 3)
|
self.assertEqual(len(stats_list), 3)
|
||||||
|
|
@ -477,7 +477,7 @@ def test_filter_partial_function_name(self):
|
||||||
"""Test filtering by partial function name."""
|
"""Test filtering by partial function name."""
|
||||||
self.collector.filter_pattern = "save"
|
self.collector.filter_pattern = "save"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
self.assertEqual(stats_list[0]["func"][2], "save")
|
self.assertEqual(stats_list[0]["func"][2], "save")
|
||||||
|
|
@ -486,7 +486,7 @@ def test_filter_combined_filename_funcname(self):
|
||||||
"""Test filtering matches filename:funcname pattern."""
|
"""Test filtering matches filename:funcname pattern."""
|
||||||
self.collector.filter_pattern = "views.py:render"
|
self.collector.filter_pattern = "views.py:render"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should match the combined pattern
|
# Should match the combined pattern
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
|
|
@ -496,7 +496,7 @@ def test_filter_no_matches(self):
|
||||||
"""Test filter that matches nothing."""
|
"""Test filter that matches nothing."""
|
||||||
self.collector.filter_pattern = "nonexistent"
|
self.collector.filter_pattern = "nonexistent"
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
self.assertEqual(len(stats_list), 0)
|
self.assertEqual(len(stats_list), 0)
|
||||||
|
|
||||||
|
|
@ -822,7 +822,7 @@ def test_arrow_keys_switch_to_per_thread_mode(self):
|
||||||
|
|
||||||
def test_stats_list_in_all_mode(self):
|
def test_stats_list_in_all_mode(self):
|
||||||
"""Test that stats list uses aggregated data in ALL mode."""
|
"""Test that stats list uses aggregated data in ALL mode."""
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should have all 3 functions
|
# Should have all 3 functions
|
||||||
self.assertEqual(len(stats_list), 3)
|
self.assertEqual(len(stats_list), 3)
|
||||||
|
|
@ -835,7 +835,7 @@ def test_stats_list_in_per_thread_mode(self):
|
||||||
self.collector.view_mode = "PER_THREAD"
|
self.collector.view_mode = "PER_THREAD"
|
||||||
self.collector.current_thread_index = 0 # First thread (111)
|
self.collector.current_thread_index = 0 # First thread (111)
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
# Should only have func1 from thread 111
|
# Should only have func1 from thread 111
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
|
|
@ -847,19 +847,19 @@ def test_stats_list_switches_with_thread_navigation(self):
|
||||||
|
|
||||||
# Thread 0 (111) -> func1
|
# Thread 0 (111) -> func1
|
||||||
self.collector.current_thread_index = 0
|
self.collector.current_thread_index = 0
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func1")
|
self.assertEqual(stats_list[0]["func"][2], "func1")
|
||||||
|
|
||||||
# Thread 1 (222) -> func2
|
# Thread 1 (222) -> func2
|
||||||
self.collector.current_thread_index = 1
|
self.collector.current_thread_index = 1
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func2")
|
self.assertEqual(stats_list[0]["func"][2], "func2")
|
||||||
|
|
||||||
# Thread 2 (333) -> func3
|
# Thread 2 (333) -> func3
|
||||||
self.collector.current_thread_index = 2
|
self.collector.current_thread_index = 2
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
self.assertEqual(stats_list[0]["func"][2], "func3")
|
self.assertEqual(stats_list[0]["func"][2], "func3")
|
||||||
|
|
||||||
|
|
@ -1036,8 +1036,8 @@ def test_display_uses_per_thread_stats_in_per_thread_mode(self):
|
||||||
|
|
||||||
# In ALL mode, should show mixed stats (50% on GIL, 50% off GIL)
|
# In ALL mode, should show mixed stats (50% on GIL, 50% off GIL)
|
||||||
self.assertEqual(collector.view_mode, "ALL")
|
self.assertEqual(collector.view_mode, "ALL")
|
||||||
total_has_gil = collector._thread_status_counts["has_gil"]
|
total_has_gil = collector.thread_status_counts["has_gil"]
|
||||||
total_threads = collector._thread_status_counts["total"]
|
total_threads = collector.thread_status_counts["total"]
|
||||||
self.assertEqual(total_has_gil, 10) # Only thread 111 has GIL
|
self.assertEqual(total_has_gil, 10) # Only thread 111 has GIL
|
||||||
self.assertEqual(total_threads, 20) # 10 samples * 2 threads
|
self.assertEqual(total_threads, 20) # 10 samples * 2 threads
|
||||||
|
|
||||||
|
|
@ -1082,7 +1082,7 @@ def test_display_uses_per_thread_gc_stats_in_per_thread_mode(self):
|
||||||
|
|
||||||
# Check aggregated GC stats (ALL mode)
|
# Check aggregated GC stats (ALL mode)
|
||||||
# 2 GC samples out of 10 total = 20%
|
# 2 GC samples out of 10 total = 20%
|
||||||
self.assertEqual(collector._gc_frame_samples, 2)
|
self.assertEqual(collector.gc_frame_samples, 2)
|
||||||
self.assertEqual(collector.total_samples, 5) # 5 collect() calls
|
self.assertEqual(collector.total_samples, 5) # 5 collect() calls
|
||||||
|
|
||||||
# Check per-thread GC stats
|
# Check per-thread GC stats
|
||||||
|
|
|
||||||
|
|
@ -68,20 +68,20 @@ def test_handle_input_sort_cycle(self):
|
||||||
def test_draw_methods_with_mock_display(self):
|
def test_draw_methods_with_mock_display(self):
|
||||||
"""Test that draw methods write to mock display."""
|
"""Test that draw methods write to mock display."""
|
||||||
self.collector.total_samples = 500
|
self.collector.total_samples = 500
|
||||||
self.collector._successful_samples = 450
|
self.collector.successful_samples = 450
|
||||||
self.collector._failed_samples = 50
|
self.collector.failed_samples = 50
|
||||||
|
|
||||||
colors = self.collector._setup_colors()
|
colors = self.collector._setup_colors()
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
|
|
||||||
# Test individual widget methods
|
# Test individual widget methods
|
||||||
line = self.collector._header_widget.draw_header_info(0, 160, 100.5)
|
line = self.collector.header_widget.draw_header_info(0, 160, 100.5)
|
||||||
self.assertEqual(line, 2) # Title + header info line
|
self.assertEqual(line, 2) # Title + header info line
|
||||||
self.assertGreater(len(self.mock_display.buffer), 0)
|
self.assertGreater(len(self.mock_display.buffer), 0)
|
||||||
|
|
||||||
# Clear buffer and test next method
|
# Clear buffer and test next method
|
||||||
self.mock_display.buffer.clear()
|
self.mock_display.buffer.clear()
|
||||||
line = self.collector._header_widget.draw_sample_stats(0, 160, 10.0)
|
line = self.collector.header_widget.draw_sample_stats(0, 160, 10.0)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
self.assertGreater(len(self.mock_display.buffer), 0)
|
self.assertGreater(len(self.mock_display.buffer), 0)
|
||||||
|
|
||||||
|
|
@ -100,8 +100,8 @@ def test_full_display_rendering_with_data(self):
|
||||||
"""Test complete display rendering with realistic data."""
|
"""Test complete display rendering with realistic data."""
|
||||||
# Add multiple functions with different call counts
|
# Add multiple functions with different call counts
|
||||||
self.collector.total_samples = 1000
|
self.collector.total_samples = 1000
|
||||||
self.collector._successful_samples = 950
|
self.collector.successful_samples = 950
|
||||||
self.collector._failed_samples = 50
|
self.collector.failed_samples = 50
|
||||||
|
|
||||||
self.collector.result[("app.py", 10, "main")] = {
|
self.collector.result[("app.py", 10, "main")] = {
|
||||||
"direct_calls": 100,
|
"direct_calls": 100,
|
||||||
|
|
@ -135,12 +135,12 @@ def test_full_display_rendering_with_data(self):
|
||||||
def test_efficiency_bar_visualization(self):
|
def test_efficiency_bar_visualization(self):
|
||||||
"""Test that efficiency bar shows correct proportions."""
|
"""Test that efficiency bar shows correct proportions."""
|
||||||
self.collector.total_samples = 100
|
self.collector.total_samples = 100
|
||||||
self.collector._successful_samples = 75
|
self.collector.successful_samples = 75
|
||||||
self.collector._failed_samples = 25
|
self.collector.failed_samples = 25
|
||||||
|
|
||||||
colors = self.collector._setup_colors()
|
colors = self.collector._setup_colors()
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
self.collector._header_widget.draw_efficiency_bar(0, 160)
|
self.collector.header_widget.draw_efficiency_bar(0, 160)
|
||||||
|
|
||||||
# Check that something was drawn to the display
|
# Check that something was drawn to the display
|
||||||
self.assertGreater(len(self.mock_display.buffer), 0)
|
self.assertGreater(len(self.mock_display.buffer), 0)
|
||||||
|
|
@ -170,7 +170,7 @@ def test_stats_display_with_different_sort_modes(self):
|
||||||
self.mock_display.buffer.clear()
|
self.mock_display.buffer.clear()
|
||||||
self.collector.sort_by = sort_mode
|
self.collector.sort_by = sort_mode
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
self.assertEqual(len(stats_list), 2)
|
self.assertEqual(len(stats_list), 2)
|
||||||
|
|
||||||
# Verify sorting worked (func_b should be first for most modes)
|
# Verify sorting worked (func_b should be first for most modes)
|
||||||
|
|
@ -186,7 +186,7 @@ def test_narrow_terminal_column_hiding(self):
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
|
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
|
||||||
collector._table_widget.draw_column_headers(0, 70)
|
collector.table_widget.draw_column_headers(0, 70)
|
||||||
)
|
)
|
||||||
|
|
||||||
# On narrow terminal, some columns should be hidden
|
# On narrow terminal, some columns should be hidden
|
||||||
|
|
@ -204,7 +204,7 @@ def test_very_narrow_terminal_minimal_columns(self):
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
|
line, show_sample_pct, show_tottime, show_cumul_pct, show_cumtime = (
|
||||||
collector._table_widget.draw_column_headers(0, 60)
|
collector.table_widget.draw_column_headers(0, 60)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Very narrow should hide even more columns
|
# Very narrow should hide even more columns
|
||||||
|
|
@ -252,9 +252,9 @@ def test_top_functions_display(self):
|
||||||
|
|
||||||
colors = self.collector._setup_colors()
|
colors = self.collector._setup_colors()
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
|
|
||||||
self.collector._header_widget.draw_top_functions(0, 160, stats_list)
|
self.collector.header_widget.draw_top_functions(0, 160, stats_list)
|
||||||
|
|
||||||
# Top functions section should have written something
|
# Top functions section should have written something
|
||||||
self.assertGreater(len(self.mock_display.buffer), 0)
|
self.assertGreater(len(self.mock_display.buffer), 0)
|
||||||
|
|
@ -334,7 +334,7 @@ def test_add_str_with_mock_display(self):
|
||||||
colors = collector._setup_colors()
|
colors = collector._setup_colors()
|
||||||
collector._initialize_widgets(colors)
|
collector._initialize_widgets(colors)
|
||||||
|
|
||||||
collector._header_widget.add_str(5, 10, "Test", 0)
|
collector.header_widget.add_str(5, 10, "Test", 0)
|
||||||
# Verify it was added to the buffer
|
# Verify it was added to the buffer
|
||||||
self.assertIn((5, 10), mock_display.buffer)
|
self.assertIn((5, 10), mock_display.buffer)
|
||||||
|
|
||||||
|
|
@ -444,7 +444,7 @@ def test_draw_header_info(self):
|
||||||
}
|
}
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
|
|
||||||
line = self.collector._header_widget.draw_header_info(0, 160, 100.5)
|
line = self.collector.header_widget.draw_header_info(0, 160, 100.5)
|
||||||
self.assertEqual(line, 2) # Title + header info line
|
self.assertEqual(line, 2) # Title + header info line
|
||||||
|
|
||||||
def test_draw_sample_stats(self):
|
def test_draw_sample_stats(self):
|
||||||
|
|
@ -453,9 +453,9 @@ def test_draw_sample_stats(self):
|
||||||
colors = {"cyan": curses.A_BOLD, "green": curses.A_BOLD}
|
colors = {"cyan": curses.A_BOLD, "green": curses.A_BOLD}
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
|
|
||||||
line = self.collector._header_widget.draw_sample_stats(0, 160, 10.0)
|
line = self.collector.header_widget.draw_sample_stats(0, 160, 10.0)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
self.assertGreater(self.collector._max_sample_rate, 0)
|
self.assertGreater(self.collector.max_sample_rate, 0)
|
||||||
|
|
||||||
def test_progress_bar_uses_target_rate(self):
|
def test_progress_bar_uses_target_rate(self):
|
||||||
"""Test that progress bar uses target rate instead of max rate."""
|
"""Test that progress bar uses target rate instead of max rate."""
|
||||||
|
|
@ -465,7 +465,7 @@ def test_progress_bar_uses_target_rate(self):
|
||||||
) # 10ms = 100Hz target
|
) # 10ms = 100Hz target
|
||||||
collector.start_time = time.perf_counter()
|
collector.start_time = time.perf_counter()
|
||||||
collector.total_samples = 500
|
collector.total_samples = 500
|
||||||
collector._max_sample_rate = (
|
collector.max_sample_rate = (
|
||||||
150 # Higher than target to test we don't use this
|
150 # Higher than target to test we don't use this
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -477,7 +477,7 @@ def test_progress_bar_uses_target_rate(self):
|
||||||
|
|
||||||
# Draw sample stats with a known elapsed time that gives us a specific sample rate
|
# Draw sample stats with a known elapsed time that gives us a specific sample rate
|
||||||
elapsed = 10.0 # 500 samples in 10 seconds = 50 samples/second
|
elapsed = 10.0 # 500 samples in 10 seconds = 50 samples/second
|
||||||
line = collector._header_widget.draw_sample_stats(0, 160, elapsed)
|
line = collector.header_widget.draw_sample_stats(0, 160, elapsed)
|
||||||
|
|
||||||
# Verify display was updated
|
# Verify display was updated
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
|
|
@ -543,7 +543,7 @@ def test_progress_bar_different_intervals(self):
|
||||||
collector.display.buffer.clear()
|
collector.display.buffer.clear()
|
||||||
|
|
||||||
# Draw with 1 second elapsed time (gives us current rate of 100Hz)
|
# Draw with 1 second elapsed time (gives us current rate of 100Hz)
|
||||||
collector._header_widget.draw_sample_stats(0, 160, 1.0)
|
collector.header_widget.draw_sample_stats(0, 160, 1.0)
|
||||||
|
|
||||||
# Check that the current/target format appears in the display with proper units
|
# Check that the current/target format appears in the display with proper units
|
||||||
found_current_target_format = False
|
found_current_target_format = False
|
||||||
|
|
@ -564,13 +564,13 @@ def test_progress_bar_different_intervals(self):
|
||||||
|
|
||||||
def test_draw_efficiency_bar(self):
|
def test_draw_efficiency_bar(self):
|
||||||
"""Test drawing efficiency bar."""
|
"""Test drawing efficiency bar."""
|
||||||
self.collector._successful_samples = 900
|
self.collector.successful_samples = 900
|
||||||
self.collector._failed_samples = 100
|
self.collector.failed_samples = 100
|
||||||
self.collector.total_samples = 1000
|
self.collector.total_samples = 1000
|
||||||
colors = {"green": curses.A_BOLD, "red": curses.A_BOLD}
|
colors = {"green": curses.A_BOLD, "red": curses.A_BOLD}
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
|
|
||||||
line = self.collector._header_widget.draw_efficiency_bar(0, 160)
|
line = self.collector.header_widget.draw_efficiency_bar(0, 160)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
|
|
||||||
def test_draw_function_stats(self):
|
def test_draw_function_stats(self):
|
||||||
|
|
@ -586,7 +586,7 @@ def test_draw_function_stats(self):
|
||||||
"total_rec_calls": 0,
|
"total_rec_calls": 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
colors = {
|
colors = {
|
||||||
"cyan": curses.A_BOLD,
|
"cyan": curses.A_BOLD,
|
||||||
"green": curses.A_BOLD,
|
"green": curses.A_BOLD,
|
||||||
|
|
@ -595,7 +595,7 @@ def test_draw_function_stats(self):
|
||||||
}
|
}
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
|
|
||||||
line = self.collector._header_widget.draw_function_stats(
|
line = self.collector.header_widget.draw_function_stats(
|
||||||
0, 160, stats_list
|
0, 160, stats_list
|
||||||
)
|
)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
|
|
@ -609,7 +609,7 @@ def test_draw_top_functions(self):
|
||||||
"total_rec_calls": 0,
|
"total_rec_calls": 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_list = self.collector._build_stats_list()
|
stats_list = self.collector.build_stats_list()
|
||||||
colors = {
|
colors = {
|
||||||
"red": curses.A_BOLD,
|
"red": curses.A_BOLD,
|
||||||
"yellow": curses.A_BOLD,
|
"yellow": curses.A_BOLD,
|
||||||
|
|
@ -617,7 +617,7 @@ def test_draw_top_functions(self):
|
||||||
}
|
}
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
|
|
||||||
line = self.collector._header_widget.draw_top_functions(
|
line = self.collector.header_widget.draw_top_functions(
|
||||||
0, 160, stats_list
|
0, 160, stats_list
|
||||||
)
|
)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
|
|
@ -636,7 +636,7 @@ def test_draw_column_headers(self):
|
||||||
show_tottime,
|
show_tottime,
|
||||||
show_cumul_pct,
|
show_cumul_pct,
|
||||||
show_cumtime,
|
show_cumtime,
|
||||||
) = self.collector._table_widget.draw_column_headers(0, 160)
|
) = self.collector.table_widget.draw_column_headers(0, 160)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
self.assertTrue(show_sample_pct)
|
self.assertTrue(show_sample_pct)
|
||||||
self.assertTrue(show_tottime)
|
self.assertTrue(show_tottime)
|
||||||
|
|
@ -657,7 +657,7 @@ def test_draw_column_headers_narrow_terminal(self):
|
||||||
show_tottime,
|
show_tottime,
|
||||||
show_cumul_pct,
|
show_cumul_pct,
|
||||||
show_cumtime,
|
show_cumtime,
|
||||||
) = self.collector._table_widget.draw_column_headers(0, 70)
|
) = self.collector.table_widget.draw_column_headers(0, 70)
|
||||||
self.assertEqual(line, 1)
|
self.assertEqual(line, 1)
|
||||||
# Some columns should be hidden on narrow terminal
|
# Some columns should be hidden on narrow terminal
|
||||||
self.assertFalse(show_cumul_pct)
|
self.assertFalse(show_cumul_pct)
|
||||||
|
|
@ -666,7 +666,7 @@ def test_draw_footer(self):
|
||||||
"""Test drawing footer."""
|
"""Test drawing footer."""
|
||||||
colors = self.collector._setup_colors()
|
colors = self.collector._setup_colors()
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
self.collector._footer_widget.render(38, 160)
|
self.collector.footer_widget.render(38, 160)
|
||||||
# Should have written some content to the display buffer
|
# Should have written some content to the display buffer
|
||||||
self.assertGreater(len(self.mock_display.buffer), 0)
|
self.assertGreater(len(self.mock_display.buffer), 0)
|
||||||
|
|
||||||
|
|
@ -674,7 +674,7 @@ def test_draw_progress_bar(self):
|
||||||
"""Test progress bar drawing."""
|
"""Test progress bar drawing."""
|
||||||
colors = self.collector._setup_colors()
|
colors = self.collector._setup_colors()
|
||||||
self.collector._initialize_widgets(colors)
|
self.collector._initialize_widgets(colors)
|
||||||
bar, length = self.collector._header_widget.progress_bar.render_bar(
|
bar, length = self.collector.header_widget.progress_bar.render_bar(
|
||||||
50, 100, 30
|
50, 100, 30
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -699,7 +699,7 @@ def test_very_long_function_name(self):
|
||||||
"total_rec_calls": 0,
|
"total_rec_calls": 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_list = collector._build_stats_list()
|
stats_list = collector.build_stats_list()
|
||||||
self.assertEqual(len(stats_list), 1)
|
self.assertEqual(len(stats_list), 1)
|
||||||
self.assertEqual(stats_list[0]["func"][2], long_name)
|
self.assertEqual(stats_list[0]["func"][2], long_name)
|
||||||
|
|
||||||
|
|
@ -729,8 +729,8 @@ def test_update_display_terminal_too_small(self):
|
||||||
def test_update_display_normal(self):
|
def test_update_display_normal(self):
|
||||||
"""Test normal update_display operation."""
|
"""Test normal update_display operation."""
|
||||||
self.collector.total_samples = 100
|
self.collector.total_samples = 100
|
||||||
self.collector._successful_samples = 90
|
self.collector.successful_samples = 90
|
||||||
self.collector._failed_samples = 10
|
self.collector.failed_samples = 10
|
||||||
self.collector.result[("test.py", 10, "func")] = {
|
self.collector.result[("test.py", 10, "func")] = {
|
||||||
"direct_calls": 50,
|
"direct_calls": 50,
|
||||||
"cumulative_calls": 75,
|
"cumulative_calls": 75,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue