gh-138122: Replace --interval with --sampling-rate (#143085)

This commit is contained in:
László Kiss Kollár 2025-12-24 13:46:33 +00:00 committed by GitHub
parent e8e044eda3
commit d4dc3dd9aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 154 additions and 100 deletions

View file

@ -53,7 +53,7 @@ counts**, not direct measurements. Tachyon counts how many times each function
appears in the collected samples, then multiplies by the sampling interval to
estimate time.
For example, with a 100 microsecond sampling interval over a 10-second profile,
For example, with a 10 kHz sampling rate over a 10-second profile,
Tachyon collects approximately 100,000 samples. If a function appears in 5,000
samples (5% of total), Tachyon estimates it consumed 5% of the 10-second
duration, or about 500 milliseconds. This is a statistical estimate, not a
@ -142,7 +142,7 @@ Use live mode for real-time monitoring (press ``q`` to quit)::
Profile for 60 seconds with a faster sampling rate::
python -m profiling.sampling run -d 60 -i 50 script.py
python -m profiling.sampling run -d 60 -r 20khz script.py
Generate a line-by-line heatmap::
@ -326,8 +326,8 @@ The default configuration works well for most use cases:
* - Option
- Default
* - Default for ``--interval`` / ``-i``
- 100 µs between samples (~10,000 samples/sec)
* - Default for ``--sampling-rate`` / ``-r``
- 1 kHz
* - Default for ``--duration`` / ``-d``
- 10 seconds
* - Default for ``--all-threads`` / ``-a``
@ -346,23 +346,22 @@ The default configuration works well for most use cases:
- Disabled (non-blocking sampling)
Sampling interval and duration
------------------------------
Sampling rate and duration
--------------------------
The two most fundamental parameters are the sampling interval and duration.
The two most fundamental parameters are the sampling rate and duration.
Together, these determine how many samples will be collected during a profiling
session.
The :option:`--interval` option (:option:`-i`) sets the time between samples in
microseconds. The default is 100 microseconds, which produces approximately
10,000 samples per second::
The :option:`--sampling-rate` option (:option:`-r`) sets how frequently samples
are collected. The default is 1 kHz (10,000 samples per second)::
python -m profiling.sampling run -i 50 script.py
python -m profiling.sampling run -r 20khz script.py
Lower intervals capture more samples and provide finer-grained data at the
cost of slightly higher profiler CPU usage. Higher intervals reduce profiler
Higher rates capture more samples and provide finer-grained data at the
cost of slightly higher profiler CPU usage. Lower rates reduce profiler
overhead but may miss short-lived functions. For most applications, the
default interval provides a good balance between accuracy and overhead.
default rate provides a good balance between accuracy and overhead.
The :option:`--duration` option (:option:`-d`) sets how long to profile in seconds. The
default is 10 seconds::
@ -573,9 +572,9 @@ appended:
- For pstats format (which defaults to stdout), subprocesses produce files like
``profile_12345.pstats``
The subprocess profilers inherit most sampling options from the parent (interval,
duration, thread selection, native frames, GC frames, async-aware mode, and
output format). All Python descendant processes are profiled recursively,
The subprocess profilers inherit most sampling options from the parent (sampling
rate, duration, thread selection, native frames, GC frames, async-aware mode,
and output format). All Python descendant processes are profiled recursively,
including grandchildren and further descendants.
Subprocess detection works by periodically scanning for new descendants of
@ -1389,9 +1388,9 @@ Global options
Sampling options
----------------
.. option:: -i <microseconds>, --interval <microseconds>
.. option:: -r <rate>, --sampling-rate <rate>
Sampling interval in microseconds. Default: 100.
Sampling rate (for example, ``10000``, ``10khz``, ``10k``). Default: ``1khz``.
.. option:: -d <seconds>, --duration <seconds>

View file

@ -16,7 +16,7 @@
_CHILD_POLL_INTERVAL_SEC = 0.1
# Default timeout for waiting on child profilers
_DEFAULT_WAIT_TIMEOUT = 30.0
_DEFAULT_WAIT_TIMEOUT_SEC = 30.0
# Maximum number of child profilers to spawn (prevents resource exhaustion)
_MAX_CHILD_PROFILERS = 100
@ -138,7 +138,7 @@ def spawned_profilers(self):
with self._lock:
return list(self._spawned_profilers)
def wait_for_profilers(self, timeout=_DEFAULT_WAIT_TIMEOUT):
def wait_for_profilers(self, timeout=_DEFAULT_WAIT_TIMEOUT_SEC):
"""
Wait for all spawned child profilers to complete.

View file

@ -73,8 +73,8 @@ def _validate_arguments(args: List[str]) -> tuple[int, str, List[str]]:
# Constants for socket communication
_MAX_RETRIES = 3
_INITIAL_RETRY_DELAY = 0.1
_SOCKET_TIMEOUT = 2.0
_INITIAL_RETRY_DELAY_SEC = 0.1
_SOCKET_TIMEOUT_SEC = 2.0
_READY_MESSAGE = b"ready"
@ -93,14 +93,14 @@ def _signal_readiness(sync_port: int) -> None:
for attempt in range(_MAX_RETRIES):
try:
# Use context manager for automatic cleanup
with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT) as sock:
with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT_SEC) as sock:
sock.send(_READY_MESSAGE)
return
except (socket.error, OSError) as e:
last_error = e
if attempt < _MAX_RETRIES - 1:
# Exponential backoff before retry
time.sleep(_INITIAL_RETRY_DELAY * (2 ** attempt))
time.sleep(_INITIAL_RETRY_DELAY_SEC * (2 ** attempt))
# If we get here, all retries failed
raise SyncError(f"Failed to signal readiness after {_MAX_RETRIES} attempts: {last_error}") from last_error

View file

@ -4,6 +4,7 @@
import importlib.util
import locale
import os
import re
import selectors
import socket
import subprocess
@ -20,6 +21,7 @@
from .binary_collector import BinaryCollector
from .binary_reader import BinaryReader
from .constants import (
MICROSECONDS_PER_SECOND,
PROFILING_MODE_ALL,
PROFILING_MODE_WALL,
PROFILING_MODE_CPU,
@ -66,8 +68,8 @@ class CustomFormatter(
# Constants for socket synchronization
_SYNC_TIMEOUT = 5.0
_PROCESS_KILL_TIMEOUT = 2.0
_SYNC_TIMEOUT_SEC = 5.0
_PROCESS_KILL_TIMEOUT_SEC = 2.0
_READY_MESSAGE = b"ready"
_RECV_BUFFER_SIZE = 1024
@ -116,7 +118,8 @@ def _build_child_profiler_args(args):
child_args = []
# Sampling options
child_args.extend(["-i", str(args.interval)])
hz = MICROSECONDS_PER_SECOND // args.sample_interval_usec
child_args.extend(["-r", str(hz)])
child_args.extend(["-d", str(args.duration)])
if args.all_threads:
@ -239,7 +242,7 @@ def _run_with_sync(original_cmd, suppress_output=False):
sync_sock.bind(("127.0.0.1", 0)) # Let OS choose a free port
sync_port = sync_sock.getsockname()[1]
sync_sock.listen(1)
sync_sock.settimeout(_SYNC_TIMEOUT)
sync_sock.settimeout(_SYNC_TIMEOUT_SEC)
# Get current working directory to preserve it
cwd = os.getcwd()
@ -268,7 +271,7 @@ def _run_with_sync(original_cmd, suppress_output=False):
process = subprocess.Popen(cmd, **popen_kwargs)
try:
_wait_for_ready_signal(sync_sock, process, _SYNC_TIMEOUT)
_wait_for_ready_signal(sync_sock, process, _SYNC_TIMEOUT_SEC)
# Close stderr pipe if we were capturing it
if process.stderr:
@ -279,7 +282,7 @@ def _run_with_sync(original_cmd, suppress_output=False):
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=_PROCESS_KILL_TIMEOUT)
process.wait(timeout=_PROCESS_KILL_TIMEOUT_SEC)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
@ -290,16 +293,64 @@ def _run_with_sync(original_cmd, suppress_output=False):
return process
_RATE_PATTERN = re.compile(r'''
^ # Start of string
( # Group 1: The numeric value
\d+ # One or more digits (integer part)
(?:\.\d+)? # Optional: decimal point followed by digits
) # Examples: "10", "0.5", "100.25"
( # Group 2: Optional unit suffix
hz # "hz" - hertz
| khz # "khz" - kilohertz
| k # "k" - shorthand for kilohertz
)? # Suffix is optional (bare number = Hz)
$ # End of string
''', re.VERBOSE | re.IGNORECASE)
def _parse_sampling_rate(rate_str: str) -> int:
"""Parse sampling rate string to microseconds."""
rate_str = rate_str.strip().lower()
match = _RATE_PATTERN.match(rate_str)
if not match:
raise argparse.ArgumentTypeError(
f"Invalid sampling rate format: {rate_str}. "
"Expected: number followed by optional suffix (hz, khz, k) with no spaces (e.g., 10khz)"
)
number_part = match.group(1)
suffix = match.group(2) or ''
# Determine multiplier based on suffix
suffix_map = {
'hz': 1,
'khz': 1000,
'k': 1000,
}
multiplier = suffix_map.get(suffix, 1)
hz = float(number_part) * multiplier
if hz <= 0:
raise argparse.ArgumentTypeError(f"Sampling rate must be positive: {rate_str}")
interval_usec = int(MICROSECONDS_PER_SECOND / hz)
if interval_usec < 1:
raise argparse.ArgumentTypeError(f"Sampling rate too high: {rate_str}")
return interval_usec
def _add_sampling_options(parser):
"""Add sampling configuration options to a parser."""
sampling_group = parser.add_argument_group("Sampling configuration")
sampling_group.add_argument(
"-i",
"--interval",
type=int,
default=100,
metavar="MICROSECONDS",
help="sampling interval",
"-r",
"--sampling-rate",
type=_parse_sampling_rate,
default="1khz",
metavar="RATE",
dest="sample_interval_usec",
help="sampling rate (e.g., 10000, 10khz, 10k)",
)
sampling_group.add_argument(
"-d",
@ -487,14 +538,13 @@ def _sort_to_mode(sort_choice):
}
return sort_map.get(sort_choice, SORT_MODE_NSAMPLES)
def _create_collector(format_type, interval, skip_idle, opcodes=False,
def _create_collector(format_type, sample_interval_usec, skip_idle, opcodes=False,
output_file=None, compression='auto'):
"""Create the appropriate collector based on format type.
Args:
format_type: The output format ('pstats', 'collapsed', 'flamegraph', 'gecko', 'heatmap', 'binary')
interval: Sampling interval in microseconds
sample_interval_usec: Sampling interval in microseconds
skip_idle: Whether to skip idle samples
opcodes: Whether to collect opcode information (only used by gecko format
for creating interval markers in Firefox Profiler)
@ -519,9 +569,9 @@ def _create_collector(format_type, interval, skip_idle, opcodes=False,
# and is the only format that uses opcodes for interval markers
if format_type == "gecko":
skip_idle = False
return collector_class(interval, skip_idle=skip_idle, opcodes=opcodes)
return collector_class(sample_interval_usec, skip_idle=skip_idle, opcodes=opcodes)
return collector_class(interval, skip_idle=skip_idle)
return collector_class(sample_interval_usec, skip_idle=skip_idle)
def _generate_output_filename(format_type, pid):
@ -725,8 +775,8 @@ def _main():
# Generate flamegraph from a script
`python -m profiling.sampling run --flamegraph -o output.html script.py`
# Profile with custom interval and duration
`python -m profiling.sampling run -i 50 -d 30 script.py`
# Profile with custom rate and duration
`python -m profiling.sampling run -r 5khz -d 30 script.py`
# Save collapsed stacks to file
`python -m profiling.sampling run --collapsed -o stacks.txt script.py`
@ -860,7 +910,7 @@ def _handle_attach(args):
# Create the appropriate collector
collector = _create_collector(
args.format, args.interval, skip_idle, args.opcodes,
args.format, args.sample_interval_usec, skip_idle, args.opcodes,
output_file=output_file,
compression=getattr(args, 'compression', 'auto')
)
@ -938,7 +988,7 @@ def _handle_run(args):
# Create the appropriate collector
collector = _create_collector(
args.format, args.interval, skip_idle, args.opcodes,
args.format, args.sample_interval_usec, skip_idle, args.opcodes,
output_file=output_file,
compression=getattr(args, 'compression', 'auto')
)
@ -965,7 +1015,7 @@ def _handle_run(args):
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=_PROCESS_KILL_TIMEOUT)
process.wait(timeout=_PROCESS_KILL_TIMEOUT_SEC)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
@ -980,7 +1030,7 @@ def _handle_live_attach(args, pid):
# Create live collector with default settings
collector = LiveStatsCollector(
args.interval,
args.sample_interval_usec,
skip_idle=skip_idle,
sort_by="tottime", # Default initial sort
limit=20, # Default limit
@ -1027,7 +1077,7 @@ def _handle_live_run(args):
# Create live collector with default settings
collector = LiveStatsCollector(
args.interval,
args.sample_interval_usec,
skip_idle=skip_idle,
sort_by="tottime", # Default initial sort
limit=20, # Default limit

View file

@ -1,5 +1,9 @@
"""Constants for the sampling profiler."""
# Time unit conversion constants
MICROSECONDS_PER_SECOND = 1_000_000
MILLISECONDS_PER_SECOND = 1_000
# Profiling mode constants
PROFILING_MODE_WALL = 0
PROFILING_MODE_CPU = 1

View file

@ -114,7 +114,7 @@
from .constants import (
MICROSECONDS_PER_SECOND,
DISPLAY_UPDATE_HZ,
DISPLAY_UPDATE_INTERVAL,
DISPLAY_UPDATE_INTERVAL_SEC,
MIN_TERMINAL_WIDTH,
MIN_TERMINAL_HEIGHT,
WIDTH_THRESHOLD_SAMPLE_PCT,
@ -165,7 +165,7 @@
# Constants
"MICROSECONDS_PER_SECOND",
"DISPLAY_UPDATE_HZ",
"DISPLAY_UPDATE_INTERVAL",
"DISPLAY_UPDATE_INTERVAL_SEC",
"MIN_TERMINAL_WIDTH",
"MIN_TERMINAL_HEIGHT",
"WIDTH_THRESHOLD_SAMPLE_PCT",

View file

@ -24,7 +24,7 @@
)
from .constants import (
MICROSECONDS_PER_SECOND,
DISPLAY_UPDATE_INTERVAL,
DISPLAY_UPDATE_INTERVAL_SEC,
MIN_TERMINAL_WIDTH,
MIN_TERMINAL_HEIGHT,
HEADER_LINES,
@ -157,7 +157,7 @@ def __init__(
self.max_sample_rate = 0 # Track maximum sample rate seen
self.successful_samples = 0 # Track samples that captured frames
self.failed_samples = 0 # Track samples that failed to capture frames
self.display_update_interval = DISPLAY_UPDATE_INTERVAL # Instance variable for display refresh rate
self.display_update_interval_sec = DISPLAY_UPDATE_INTERVAL_SEC # Instance variable for display refresh rate
# Thread status statistics (bit flags)
self.thread_status_counts = {
@ -410,7 +410,7 @@ def collect(self, stack_frames, timestamp_us=None):
if (
self._last_display_update is None
or (current_time - self._last_display_update)
>= self.display_update_interval
>= self.display_update_interval_sec
):
self._update_display()
self._last_display_update = current_time
@ -987,14 +987,14 @@ def _handle_input(self):
elif ch == ord("+") or ch == ord("="):
# Decrease update interval (faster refresh)
self.display_update_interval = max(
0.05, self.display_update_interval - 0.05
self.display_update_interval_sec = max(
0.05, self.display_update_interval_sec - 0.05
) # Min 20Hz
elif ch == ord("-") or ch == ord("_"):
# Increase update interval (slower refresh)
self.display_update_interval = min(
1.0, self.display_update_interval + 0.05
self.display_update_interval_sec = min(
1.0, self.display_update_interval_sec + 0.05
) # Max 1Hz
elif ch == ord("c") or ch == ord("C"):

View file

@ -5,7 +5,7 @@
# Display update constants
DISPLAY_UPDATE_HZ = 10
DISPLAY_UPDATE_INTERVAL = 1.0 / DISPLAY_UPDATE_HZ # 0.1 seconds
DISPLAY_UPDATE_INTERVAL_SEC = 1.0 / DISPLAY_UPDATE_HZ # 0.1 seconds
# Terminal size constraints
MIN_TERMINAL_WIDTH = 60

View file

@ -13,7 +13,7 @@
WIDTH_THRESHOLD_CUMUL_PCT,
WIDTH_THRESHOLD_CUMTIME,
MICROSECONDS_PER_SECOND,
DISPLAY_UPDATE_INTERVAL,
DISPLAY_UPDATE_INTERVAL_SEC,
MIN_BAR_WIDTH,
MAX_SAMPLE_RATE_BAR_WIDTH,
MAX_EFFICIENCY_BAR_WIDTH,
@ -181,7 +181,7 @@ def draw_header_info(self, line, width, elapsed):
# Calculate display refresh rate
refresh_hz = (
1.0 / self.collector.display_update_interval if self.collector.display_update_interval > 0 else 0
1.0 / self.collector.display_update_interval_sec if self.collector.display_update_interval_sec > 0 else 0
)
# Get current view mode and thread display
@ -235,8 +235,8 @@ def draw_header_info(self, line, width, elapsed):
def format_rate_with_units(self, rate_hz):
"""Format a rate in Hz with appropriate units (Hz, KHz, MHz)."""
if rate_hz >= 1_000_000:
return f"{rate_hz / 1_000_000:.1f}MHz"
if rate_hz >= MICROSECONDS_PER_SECOND:
return f"{rate_hz / MICROSECONDS_PER_SECOND:.1f}MHz"
elif rate_hz >= 1_000:
return f"{rate_hz / 1_000:.1f}KHz"
else:

View file

@ -3,6 +3,7 @@
from _colorize import ANSIColors
from .collector import Collector, extract_lineno
from .constants import MICROSECONDS_PER_SECOND
class PstatsCollector(Collector):
@ -68,7 +69,7 @@ def _dump_stats(self, file):
# Needed for compatibility with pstats.Stats
def create_stats(self):
sample_interval_sec = self.sample_interval_usec / 1_000_000
sample_interval_sec = self.sample_interval_usec / MICROSECONDS_PER_SECOND
callers = {}
for fname, call_counts in self.result.items():
total = call_counts["direct_calls"] * sample_interval_sec
@ -263,7 +264,7 @@ def _determine_best_unit(max_value):
elif max_value >= 0.001:
return "ms", 1000.0
else:
return "μs", 1000000.0
return "μs", float(MICROSECONDS_PER_SECOND)
def _print_summary(self, stats_list, total_samples):
"""Print summary of interesting functions."""

View file

@ -219,8 +219,8 @@ def worker(x):
"run",
"-d",
"5",
"-i",
"100000",
"-r",
"10",
script,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,

View file

@ -279,11 +279,11 @@ def test_monitor_creation(self):
monitor = ChildProcessMonitor(
pid=os.getpid(),
cli_args=["-i", "100", "-d", "5"],
cli_args=["-r", "10khz", "-d", "5"],
output_pattern="test_{pid}.pstats",
)
self.assertEqual(monitor.parent_pid, os.getpid())
self.assertEqual(monitor.cli_args, ["-i", "100", "-d", "5"])
self.assertEqual(monitor.cli_args, ["-r", "10khz", "-d", "5"])
self.assertEqual(monitor.output_pattern, "test_{pid}.pstats")
def test_monitor_lifecycle(self):
@ -386,7 +386,7 @@ def test_build_child_profiler_args(self):
from profiling.sampling.cli import _build_child_profiler_args
args = argparse.Namespace(
interval=200,
sample_interval_usec=200,
duration=15,
all_threads=True,
realtime_stats=False,
@ -420,7 +420,7 @@ def assert_flag_value_pair(flag, value):
f"'{child_args[flag_index + 1]}' in args: {child_args}",
)
assert_flag_value_pair("-i", 200)
assert_flag_value_pair("-r", 5000)
assert_flag_value_pair("-d", 15)
assert_flag_value_pair("--mode", "cpu")
@ -444,7 +444,7 @@ def test_build_child_profiler_args_no_gc(self):
from profiling.sampling.cli import _build_child_profiler_args
args = argparse.Namespace(
interval=100,
sample_interval_usec=100,
duration=5,
all_threads=False,
realtime_stats=False,
@ -510,7 +510,7 @@ def test_setup_child_monitor(self):
from profiling.sampling.cli import _setup_child_monitor
args = argparse.Namespace(
interval=100,
sample_interval_usec=100,
duration=5,
all_threads=False,
realtime_stats=False,
@ -690,7 +690,7 @@ def test_monitor_respects_max_limit(self):
# Create a monitor
monitor = ChildProcessMonitor(
pid=os.getpid(),
cli_args=["-i", "100", "-d", "5"],
cli_args=["-r", "10khz", "-d", "5"],
output_pattern="test_{pid}.pstats",
)
@ -927,8 +927,8 @@ def test_subprocesses_flag_spawns_child_and_creates_output(self):
"--subprocesses",
"-d",
"3",
"-i",
"10000",
"-r",
"100",
"-o",
output_file,
script_file,
@ -989,8 +989,8 @@ def test_subprocesses_flag_with_flamegraph_output(self):
"--subprocesses",
"-d",
"2",
"-i",
"10000",
"-r",
"100",
"--flamegraph",
"-o",
output_file,
@ -1043,8 +1043,8 @@ def test_subprocesses_flag_no_crash_on_quick_child(self):
"--subprocesses",
"-d",
"2",
"-i",
"10000",
"-r",
"100",
"-o",
output_file,
script_file,

View file

@ -232,7 +232,7 @@ def test_cli_module_with_profiler_options(self):
test_args = [
"profiling.sampling.cli",
"run",
"-i",
"-r",
"1000",
"-d",
"30",
@ -265,8 +265,8 @@ def test_cli_script_with_profiler_options(self):
test_args = [
"profiling.sampling.cli",
"run",
"-i",
"2000",
"-r",
"500",
"-d",
"60",
"--collapsed",

View file

@ -35,7 +35,7 @@ def setUp(self):
)
self.collector.start_time = time.perf_counter()
# Set a consistent display update interval for tests
self.collector.display_update_interval = 0.1
self.collector.display_update_interval_sec = 0.1
def tearDown(self):
"""Clean up after test."""
@ -110,45 +110,45 @@ def test_reset_stats(self):
def test_increase_refresh_rate(self):
"""Test increasing refresh rate (faster updates)."""
initial_interval = self.collector.display_update_interval
initial_interval = self.collector.display_update_interval_sec
# Simulate '+' key press (faster = smaller interval)
self.display.simulate_input(ord("+"))
self.collector._handle_input()
self.assertLess(self.collector.display_update_interval, initial_interval)
self.assertLess(self.collector.display_update_interval_sec, initial_interval)
def test_decrease_refresh_rate(self):
"""Test decreasing refresh rate (slower updates)."""
initial_interval = self.collector.display_update_interval
initial_interval = self.collector.display_update_interval_sec
# Simulate '-' key press (slower = larger interval)
self.display.simulate_input(ord("-"))
self.collector._handle_input()
self.assertGreater(self.collector.display_update_interval, initial_interval)
self.assertGreater(self.collector.display_update_interval_sec, initial_interval)
def test_refresh_rate_minimum(self):
"""Test that refresh rate has a minimum (max speed)."""
self.collector.display_update_interval = 0.05 # Set to minimum
self.collector.display_update_interval_sec = 0.05 # Set to minimum
# Try to go faster
self.display.simulate_input(ord("+"))
self.collector._handle_input()
# Should stay at minimum
self.assertEqual(self.collector.display_update_interval, 0.05)
self.assertEqual(self.collector.display_update_interval_sec, 0.05)
def test_refresh_rate_maximum(self):
"""Test that refresh rate has a maximum (min speed)."""
self.collector.display_update_interval = 1.0 # Set to maximum
self.collector.display_update_interval_sec = 1.0 # Set to maximum
# Try to go slower
self.display.simulate_input(ord("-"))
self.collector._handle_input()
# Should stay at maximum
self.assertEqual(self.collector.display_update_interval, 1.0)
self.assertEqual(self.collector.display_update_interval_sec, 1.0)
def test_help_toggle(self):
"""Test help screen toggle."""
@ -289,23 +289,23 @@ def test_filter_clear_uppercase(self):
def test_increase_refresh_rate_with_equals(self):
"""Test increasing refresh rate with '=' key."""
initial_interval = self.collector.display_update_interval
initial_interval = self.collector.display_update_interval_sec
# Simulate '=' key press (alternative to '+')
self.display.simulate_input(ord("="))
self.collector._handle_input()
self.assertLess(self.collector.display_update_interval, initial_interval)
self.assertLess(self.collector.display_update_interval_sec, initial_interval)
def test_decrease_refresh_rate_with_underscore(self):
"""Test decreasing refresh rate with '_' key."""
initial_interval = self.collector.display_update_interval
initial_interval = self.collector.display_update_interval_sec
# Simulate '_' key press (alternative to '-')
self.display.simulate_input(ord("_"))
self.collector._handle_input()
self.assertGreater(self.collector.display_update_interval, initial_interval)
self.assertGreater(self.collector.display_update_interval_sec, initial_interval)
def test_finished_state_displays_banner(self):
"""Test that finished state shows prominent banner."""

View file

@ -306,8 +306,8 @@ def test_gil_mode_cli_argument_parsing(self):
"12345",
"--mode",
"gil",
"-i",
"500",
"-r",
"2000",
"-d",
"5",
]
@ -488,8 +488,8 @@ def test_exception_mode_cli_argument_parsing(self):
"12345",
"--mode",
"exception",
"-i",
"500",
"-r",
"2000",
"-d",
"5",
]