gh-135953: Profile a module or script with sampling profiler (#136777)

This commit is contained in:
László Kiss Kollár 2025-08-11 08:36:43 -03:00 committed by GitHub
parent 70218b4008
commit 4497ad409e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 864 additions and 79 deletions

View file

@ -0,0 +1,243 @@
"""
Internal synchronization coordinator for the sample profiler.
This module is used internally by the sample profiler to coordinate
the startup of target processes. It should not be called directly by users.
"""
import os
import sys
import socket
import runpy
import time
from typing import List, NoReturn
class CoordinatorError(Exception):
    """Root of the coordinator exception hierarchy."""
class ArgumentError(CoordinatorError):
    """Command-line arguments were missing or malformed."""
class SyncError(CoordinatorError):
    """The readiness handshake with the profiler failed."""
class TargetError(CoordinatorError):
    """Running the target module or script failed."""
def _validate_arguments(args: List[str]) -> tuple[int, str, List[str]]:
"""
Validate and parse command line arguments.
Args:
args: Command line arguments including script name
Returns:
Tuple of (sync_port, working_directory, target_args)
Raises:
ArgumentError: If arguments are invalid
"""
if len(args) < 4:
raise ArgumentError(
"Insufficient arguments. Expected: <sync_port> <cwd> <target> [args...]"
)
try:
sync_port = int(args[1])
if not (1 <= sync_port <= 65535):
raise ValueError("Port out of range")
except ValueError as e:
raise ArgumentError(f"Invalid sync port '{args[1]}': {e}") from e
cwd = args[2]
if not os.path.isdir(cwd):
raise ArgumentError(f"Working directory does not exist: {cwd}")
target_args = args[3:]
if not target_args:
raise ArgumentError("No target specified")
return sync_port, cwd, target_args
# Constants for socket communication
_MAX_RETRIES = 3
_INITIAL_RETRY_DELAY = 0.1
_SOCKET_TIMEOUT = 2.0
_READY_MESSAGE = b"ready"
def _signal_readiness(sync_port: int) -> None:
"""
Signal readiness to the profiler via TCP socket.
Args:
sync_port: Port number where profiler is listening
Raises:
SyncError: If unable to signal readiness
"""
last_error = None
for attempt in range(_MAX_RETRIES):
try:
# Use context manager for automatic cleanup
with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT) as sock:
sock.send(_READY_MESSAGE)
return
except (socket.error, OSError) as e:
last_error = e
if attempt < _MAX_RETRIES - 1:
# Exponential backoff before retry
time.sleep(_INITIAL_RETRY_DELAY * (2 ** attempt))
# If we get here, all retries failed
raise SyncError(f"Failed to signal readiness after {_MAX_RETRIES} attempts: {last_error}") from last_error
def _setup_environment(cwd: str) -> None:
"""
Set up the execution environment.
Args:
cwd: Working directory to change to
Raises:
TargetError: If unable to set up environment
"""
try:
os.chdir(cwd)
except OSError as e:
raise TargetError(f"Failed to change to directory {cwd}: {e}") from e
# Add current directory to sys.path if not present (for module imports)
if cwd not in sys.path:
sys.path.insert(0, cwd)
def _execute_module(module_name: str, module_args: List[str]) -> None:
"""
Execute a Python module.
Args:
module_name: Name of the module to execute
module_args: Arguments to pass to the module
Raises:
TargetError: If module execution fails
"""
# Replace sys.argv to match how Python normally runs modules
# When running 'python -m module args', sys.argv is ["__main__.py", "args"]
sys.argv = [f"__main__.py"] + module_args
try:
runpy.run_module(module_name, run_name="__main__", alter_sys=True)
except ImportError as e:
raise TargetError(f"Module '{module_name}' not found: {e}") from e
except SystemExit:
# SystemExit is normal for modules
pass
except Exception as e:
raise TargetError(f"Error executing module '{module_name}': {e}") from e
def _execute_script(script_path: str, script_args: List[str], cwd: str) -> None:
"""
Execute a Python script.
Args:
script_path: Path to the script to execute
script_args: Arguments to pass to the script
cwd: Current working directory for path resolution
Raises:
TargetError: If script execution fails
"""
# Make script path absolute if it isn't already
if not os.path.isabs(script_path):
script_path = os.path.join(cwd, script_path)
if not os.path.isfile(script_path):
raise TargetError(f"Script not found: {script_path}")
# Replace sys.argv to match original script call
sys.argv = [script_path] + script_args
try:
with open(script_path, 'rb') as f:
source_code = f.read()
# Compile and execute the script
code = compile(source_code, script_path, 'exec')
exec(code, {'__name__': '__main__', '__file__': script_path})
except FileNotFoundError as e:
raise TargetError(f"Script file not found: {script_path}") from e
except PermissionError as e:
raise TargetError(f"Permission denied reading script: {script_path}") from e
except SyntaxError as e:
raise TargetError(f"Syntax error in script {script_path}: {e}") from e
except SystemExit:
# SystemExit is normal for scripts
pass
except Exception as e:
raise TargetError(f"Error executing script '{script_path}': {e}") from e
def main() -> NoReturn:
    """
    Coordinate the startup of a profiled target process.

    Parses the coordinator command line, signals readiness to the sample
    profiler, then runs the requested module or script. Always terminates
    the process via sys.exit().
    """
    try:
        sync_port, cwd, target_args = _validate_arguments(sys.argv)
        _setup_environment(cwd)
        _signal_readiness(sync_port)

        if target_args[0] != "-m":
            # Script execution: first target arg is the script path.
            _execute_script(target_args[0], target_args[1:], cwd)
        else:
            # Module execution: "-m" must be followed by a module name.
            if len(target_args) < 2:
                raise ArgumentError("Module name required after -m")
            _execute_module(target_args[1], target_args[2:])
    except CoordinatorError as e:
        print(f"Profiler coordinator error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("Interrupted", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected error in profiler coordinator: {e}", file=sys.stderr)
        sys.exit(1)

    # Normal exit
    sys.exit(0)
if __name__ == "__main__":
main()

View file

@ -2,6 +2,8 @@
import _remote_debugging
import os
import pstats
import socket
import subprocess
import statistics
import sys
import sysconfig
@ -12,14 +14,111 @@
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
_FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
_MAX_STARTUP_ATTEMPTS = 5
_STARTUP_RETRY_DELAY_SECONDS = 0.1
_HELP_DESCRIPTION = """Sample a process's stack frames and generate profiling data.
Supports the following target modes:
- -p PID: Profile an existing process by PID
- -m MODULE [ARGS...]: Profile a module as python -m module ...
- filename [ARGS...]: Profile the specified script by running it in a subprocess
Examples:
# Profile process 1234 for 10 seconds with default settings
python -m profile.sample -p 1234
# Profile a script by running it in a subprocess
python -m profile.sample myscript.py arg1 arg2
# Profile a module by running it as python -m module in a subprocess
python -m profile.sample -m mymodule arg1 arg2
# Profile with custom interval and duration, save to file
python -m profile.sample -i 50 -d 30 -o profile.stats -p 1234
# Generate collapsed stacks for flamegraph
python -m profile.sample --collapsed -p 1234
# Profile all threads, sort by total time
python -m profile.sample -a --sort-tottime -p 1234
# Profile for 1 minute with 1ms sampling interval
python -m profile.sample -i 1000 -d 60 -p 1234
# Show only top 20 functions sorted by direct samples
python -m profile.sample --sort-nsamples -l 20 -p 1234
# Profile all threads and save collapsed stacks
python -m profile.sample -a --collapsed -o stacks.txt -p 1234
# Profile with real-time sampling statistics
python -m profile.sample --realtime-stats -p 1234
# Sort by sample percentage to find most sampled functions
python -m profile.sample --sort-sample-pct -p 1234
# Sort by cumulative samples to find functions most on call stack
python -m profile.sample --sort-nsamples-cumul -p 1234"""
# Constants for socket synchronization
_SYNC_TIMEOUT = 5.0
_PROCESS_KILL_TIMEOUT = 2.0
_READY_MESSAGE = b"ready"
_RECV_BUFFER_SIZE = 1024
def _run_with_sync(original_cmd):
    """Run a command with socket-based synchronization and return the process.

    The target is launched through profile._sync_coordinator, which connects
    back to a listening socket here once the interpreter is ready, so
    sampling can begin without racing interpreter startup.
    """
    # Create a TCP socket for synchronization with better socket options
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock:
        # Set SO_REUSEADDR to avoid "Address already in use" errors
        sync_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sync_sock.bind(("127.0.0.1", 0))  # Let OS choose a free port
        sync_port = sync_sock.getsockname()[1]
        sync_sock.listen(1)
        sync_sock.settimeout(_SYNC_TIMEOUT)

        # Get current working directory to preserve it
        cwd = os.getcwd()

        # Build command using the sync coordinator
        target_args = original_cmd[1:]  # Remove python executable
        cmd = (sys.executable, "-m", "profile._sync_coordinator", str(sync_port), cwd) + tuple(target_args)

        # Start the process with coordinator
        process = subprocess.Popen(cmd)

        try:
            # Wait for ready signal with timeout
            with sync_sock.accept()[0] as conn:
                ready_signal = conn.recv(_RECV_BUFFER_SIZE)
                if ready_signal != _READY_MESSAGE:
                    raise RuntimeError(f"Invalid ready signal received: {ready_signal!r}")
        except socket.timeout:
            # If we timeout, kill the process and raise an error
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=_PROCESS_KILL_TIMEOUT)
                except subprocess.TimeoutExpired:
                    # Escalate to SIGKILL if graceful termination stalls.
                    process.kill()
                    process.wait()
            raise RuntimeError("Process failed to signal readiness within timeout")

        return process
class SampleProfiler:
def __init__(self, pid, sample_interval_usec, all_threads):
self.pid = pid
self.sample_interval_usec = sample_interval_usec
self.all_threads = all_threads
if FREE_THREADED_BUILD:
if _FREE_THREADED_BUILD:
self.unwinder = _remote_debugging.RemoteUnwinder(
self.pid, all_threads=self.all_threads
)
@ -50,6 +149,7 @@ def sample(self, collector, duration_sec=10):
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
duration_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
errors += 1
@ -532,56 +632,54 @@ def _validate_collapsed_format_args(args, parser):
f"The following options are only valid with --pstats format: {', '.join(invalid_opts)}"
)
# Set default output filename for collapsed format
if not args.outfile:
# Set default output filename for collapsed format only if we have a PID
# For module/script execution, this will be set later with the subprocess PID
if not args.outfile and args.pid is not None:
args.outfile = f"collapsed.{args.pid}.txt"
def wait_for_process_and_sample(pid, sort_value, args):
    """Sample the process immediately since it has already signaled readiness."""
    # For collapsed output without an explicit filename, derive a default
    # name from the subprocess PID (only known after launch).
    filename = args.outfile
    if args.format == "collapsed" and not filename:
        filename = f"collapsed.{pid}.txt"

    sample(
        pid,
        sort=sort_value,
        sample_interval_usec=args.interval,
        duration_sec=args.duration,
        filename=filename,
        all_threads=args.all_threads,
        limit=args.limit,
        show_summary=not args.no_summary,
        output_format=args.format,
        realtime_stats=args.realtime_stats,
    )
def main():
# Create the main parser
parser = argparse.ArgumentParser(
description=(
"Sample a process's stack frames and generate profiling data.\n"
"Supports two output formats:\n"
" - pstats: Detailed profiling statistics with sorting options\n"
" - collapsed: Stack traces for generating flamegraphs\n"
"\n"
"Examples:\n"
" # Profile process 1234 for 10 seconds with default settings\n"
" python -m profile.sample 1234\n"
"\n"
" # Profile with custom interval and duration, save to file\n"
" python -m profile.sample -i 50 -d 30 -o profile.stats 1234\n"
"\n"
" # Generate collapsed stacks for flamegraph\n"
" python -m profile.sample --collapsed 1234\n"
"\n"
" # Profile all threads, sort by total time\n"
" python -m profile.sample -a --sort-tottime 1234\n"
"\n"
" # Profile for 1 minute with 1ms sampling interval\n"
" python -m profile.sample -i 1000 -d 60 1234\n"
"\n"
" # Show only top 20 functions sorted by direct samples\n"
" python -m profile.sample --sort-nsamples -l 20 1234\n"
"\n"
" # Profile all threads and save collapsed stacks\n"
" python -m profile.sample -a --collapsed -o stacks.txt 1234\n"
"\n"
" # Profile with real-time sampling statistics\n"
" python -m profile.sample --realtime-stats 1234\n"
"\n"
" # Sort by sample percentage to find most sampled functions\n"
" python -m profile.sample --sort-sample-pct 1234\n"
"\n"
" # Sort by cumulative samples to find functions most on call stack\n"
" python -m profile.sample --sort-nsamples-cumul 1234"
),
description=_HELP_DESCRIPTION,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
# Required arguments
parser.add_argument("pid", type=int, help="Process ID to sample")
# Target selection
target_group = parser.add_mutually_exclusive_group(required=False)
target_group.add_argument(
"-p", "--pid", type=int, help="Process ID to sample"
)
target_group.add_argument(
"-m", "--module",
help="Run and profile a module as python -m module [ARGS...]"
)
parser.add_argument(
"args",
nargs=argparse.REMAINDER,
help="Script to run and profile, with optional arguments"
)
# Sampling options
sampling_group = parser.add_argument_group("Sampling configuration")
@ -712,19 +810,55 @@ def main():
sort_value = args.sort if args.sort is not None else 2
sample(
args.pid,
sample_interval_usec=args.interval,
duration_sec=args.duration,
filename=args.outfile,
all_threads=args.all_threads,
limit=args.limit,
sort=sort_value,
show_summary=not args.no_summary,
output_format=args.format,
realtime_stats=args.realtime_stats,
)
if args.module is not None and not args.module:
parser.error("argument -m/--module: expected one argument")
# Validate that we have exactly one target type
# Note: args can be present with -m (module arguments) but not as standalone script
has_pid = args.pid is not None
has_module = args.module is not None
has_script = bool(args.args) and args.module is None
target_count = sum([has_pid, has_module, has_script])
if target_count == 0:
parser.error("one of the arguments -p/--pid -m/--module or script name is required")
elif target_count > 1:
parser.error("only one target type can be specified: -p/--pid, -m/--module, or script")
if args.pid:
sample(
args.pid,
sample_interval_usec=args.interval,
duration_sec=args.duration,
filename=args.outfile,
all_threads=args.all_threads,
limit=args.limit,
sort=sort_value,
show_summary=not args.no_summary,
output_format=args.format,
realtime_stats=args.realtime_stats,
)
elif args.module or args.args:
if args.module:
cmd = (sys.executable, "-m", args.module, *args.args)
else:
cmd = (sys.executable, *args.args)
# Use synchronized process startup
process = _run_with_sync(cmd)
# Process has already signaled readiness, start sampling immediately
try:
wait_for_process_and_sample(process.pid, sort_value, args)
finally:
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
if __name__ == "__main__":
main()

View file

@ -4,6 +4,7 @@
import io
import marshal
import os
import shutil
import socket
import subprocess
import sys
@ -1588,6 +1589,68 @@ def test_sampling_all_threads(self):
# Just verify that sampling completed without error
# We're not testing output format here
def test_sample_target_script(self):
    """End-to-end: profile a script target and check the pstats output."""
    script_file = tempfile.NamedTemporaryFile(delete=False)
    script_file.write(self.test_script.encode("utf-8"))
    script_file.flush()
    self.addCleanup(close_and_unlink, script_file)

    test_args = ["profile.sample", "-d", "1", script_file.name]

    with (
        mock.patch("sys.argv", test_args),
        io.StringIO() as captured_output,
        mock.patch("sys.stdout", captured_output),
    ):
        try:
            profile.sample.main()
        except PermissionError:
            self.skipTest("Insufficient permissions for remote profiling")

        output = captured_output.getvalue()

    # Basic checks on output
    self.assertIn("Captured", output)
    self.assertIn("samples", output)
    self.assertIn("Profile Stats", output)

    # Should see some of our test functions
    self.assertIn("slow_fibonacci", output)
def test_sample_target_module(self):
    """End-to-end: profile a -m module target and check the pstats output."""
    tempdir = tempfile.TemporaryDirectory(delete=False)
    self.addCleanup(lambda x: shutil.rmtree(x), tempdir.name)

    module_path = os.path.join(tempdir.name, "test_module.py")
    with open(module_path, "w") as f:
        f.write(self.test_script)

    test_args = ["profile.sample", "-d", "1", "-m", "test_module"]

    with (
        mock.patch("sys.argv", test_args),
        io.StringIO() as captured_output,
        mock.patch("sys.stdout", captured_output),
        # Change to temp directory so subprocess can find the module
        contextlib.chdir(tempdir.name),
    ):
        try:
            profile.sample.main()
        except PermissionError:
            self.skipTest("Insufficient permissions for remote profiling")

        output = captured_output.getvalue()

    # Basic checks on output
    self.assertIn("Captured", output)
    self.assertIn("samples", output)
    self.assertIn("Profile Stats", output)

    # Should see some of our test functions
    self.assertIn("slow_fibonacci", output)
@skip_if_not_supported
@unittest.skipIf(
@ -1660,12 +1723,11 @@ def test_is_process_running(self):
self.assertIsNotNone(profiler.unwinder.get_stack_trace())
proc.kill()
proc.wait()
# ValueError on MacOS (yeah I know), ProcessLookupError on Linux and Windows
self.assertRaises((ValueError, ProcessLookupError), profiler.unwinder.get_stack_trace)
self.assertRaises(ProcessLookupError, profiler.unwinder.get_stack_trace)
# Exit the context manager to ensure the process is terminated
self.assertFalse(profiler._is_process_running())
self.assertRaises((ValueError, ProcessLookupError), profiler.unwinder.get_stack_trace)
self.assertRaises(ProcessLookupError, profiler.unwinder.get_stack_trace)
@unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
def test_esrch_signal_handling(self):
@ -1690,16 +1752,315 @@ def test_esrch_signal_handling(self):
class TestSampleProfilerCLI(unittest.TestCase):
def _setup_sync_mocks(self, mock_socket, mock_popen):
    """Helper to set up socket and process mocks for coordinator tests.

    Returns the mocked subprocess so individual tests can override its
    behavior (e.g. wait() side effects).
    """
    # Mock the sync socket with context manager support
    mock_sock_instance = mock.MagicMock()
    mock_sock_instance.getsockname.return_value = ("127.0.0.1", 12345)

    # Mock the connection with context manager support
    mock_conn = mock.MagicMock()
    mock_conn.recv.return_value = b"ready"
    mock_conn.__enter__.return_value = mock_conn
    mock_conn.__exit__.return_value = None

    # Mock accept() to return (connection, address) and support indexing
    mock_accept_result = mock.MagicMock()
    mock_accept_result.__getitem__.return_value = mock_conn  # [0] returns the connection
    mock_sock_instance.accept.return_value = mock_accept_result

    # Mock socket with context manager support
    mock_sock_instance.__enter__.return_value = mock_sock_instance
    mock_sock_instance.__exit__.return_value = None
    mock_socket.return_value = mock_sock_instance

    # Mock the subprocess; poll() -> None makes it look alive.
    mock_process = mock.MagicMock()
    mock_process.pid = 12345
    mock_process.poll.return_value = None
    mock_popen.return_value = mock_process

    return mock_process
def _verify_coordinator_command(self, mock_popen, expected_target_args):
    """Helper to verify the coordinator command was called correctly."""
    args, kwargs = mock_popen.call_args
    coordinator_cmd = args[0]
    # Expected shape: python -m profile._sync_coordinator <port> <cwd> <target...>
    self.assertEqual(coordinator_cmd[0], sys.executable)
    self.assertEqual(coordinator_cmd[1], "-m")
    self.assertEqual(coordinator_cmd[2], "profile._sync_coordinator")
    self.assertEqual(coordinator_cmd[3], "12345")  # port
    # cwd is coordinator_cmd[4]
    self.assertEqual(coordinator_cmd[5:], expected_target_args)
def test_cli_module_argument_parsing(self):
    """-m MODULE launches the coordinator and samples the subprocess PID."""
    test_args = ["profile.sample", "-m", "mymodule"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        self._setup_sync_mocks(mock_socket, mock_popen)

        profile.sample.main()

        self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
        mock_sample.assert_called_once_with(
            12345,
            sort=2,  # default sort (sort_value from args.sort)
            sample_interval_usec=100,
            duration_sec=10,
            filename=None,
            all_threads=False,
            limit=15,
            show_summary=True,
            output_format="pstats",
            realtime_stats=False,
        )
def test_cli_module_with_arguments(self):
    """Trailing module arguments are forwarded to the coordinator verbatim."""
    test_args = ["profile.sample", "-m", "mymodule", "arg1", "arg2", "--flag"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        self._setup_sync_mocks(mock_socket, mock_popen)

        profile.sample.main()

        self._verify_coordinator_command(mock_popen, ("-m", "mymodule", "arg1", "arg2", "--flag"))
        mock_sample.assert_called_once_with(
            12345,
            sort=2,
            sample_interval_usec=100,
            duration_sec=10,
            filename=None,
            all_threads=False,
            limit=15,
            show_summary=True,
            output_format="pstats",
            realtime_stats=False,
        )
def test_cli_script_argument_parsing(self):
    """A bare script target is run through the coordinator and sampled."""
    test_args = ["profile.sample", "myscript.py"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        self._setup_sync_mocks(mock_socket, mock_popen)

        profile.sample.main()

        self._verify_coordinator_command(mock_popen, ("myscript.py",))
        mock_sample.assert_called_once_with(
            12345,
            sort=2,
            sample_interval_usec=100,
            duration_sec=10,
            filename=None,
            all_threads=False,
            limit=15,
            show_summary=True,
            output_format="pstats",
            realtime_stats=False,
        )
def test_cli_script_with_arguments(self):
    """Script arguments survive the round trip into the coordinator command."""
    test_args = ["profile.sample", "myscript.py", "arg1", "arg2", "--flag"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        # Use the helper to set up mocks consistently
        mock_process = self._setup_sync_mocks(mock_socket, mock_popen)
        # Override specific behavior for this test
        mock_process.wait.side_effect = [subprocess.TimeoutExpired(test_args, 0.1), None]

        profile.sample.main()

        # Verify the coordinator command was called
        args, kwargs = mock_popen.call_args
        coordinator_cmd = args[0]
        self.assertEqual(coordinator_cmd[0], sys.executable)
        self.assertEqual(coordinator_cmd[1], "-m")
        self.assertEqual(coordinator_cmd[2], "profile._sync_coordinator")
        self.assertEqual(coordinator_cmd[3], "12345")  # port
        # cwd is coordinator_cmd[4]
        self.assertEqual(coordinator_cmd[5:], ("myscript.py", "arg1", "arg2", "--flag"))
def test_cli_mutually_exclusive_pid_module(self):
    """-p and -m together must be rejected by argparse (exit code 2)."""
    test_args = ["profile.sample", "-p", "12345", "-m", "mymodule"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
        self.assertRaises(SystemExit) as cm,
    ):
        profile.sample.main()

    self.assertEqual(cm.exception.code, 2)  # argparse error
    error_msg = mock_stderr.getvalue()
    self.assertIn("not allowed with argument", error_msg)
def test_cli_mutually_exclusive_pid_script(self):
    """-p combined with a script target must be rejected (exit code 2)."""
    test_args = ["profile.sample", "-p", "12345", "myscript.py"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
        self.assertRaises(SystemExit) as cm,
    ):
        profile.sample.main()

    self.assertEqual(cm.exception.code, 2)  # argparse error
    error_msg = mock_stderr.getvalue()
    self.assertIn("only one target type can be specified", error_msg)
def test_cli_no_target_specified(self):
    """Options without any target (-p/-m/script) must be rejected."""
    test_args = ["profile.sample", "-d", "5"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
        self.assertRaises(SystemExit) as cm,
    ):
        profile.sample.main()

    self.assertEqual(cm.exception.code, 2)  # argparse error
    error_msg = mock_stderr.getvalue()
    self.assertIn("one of the arguments", error_msg)
def test_cli_module_with_profiler_options(self):
    """Profiler options (-i, -d, -a, sort, limit) apply to module targets."""
    test_args = [
        "profile.sample", "-i", "1000", "-d", "30", "-a",
        "--sort-tottime", "-l", "20", "-m", "mymodule",
    ]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        self._setup_sync_mocks(mock_socket, mock_popen)

        profile.sample.main()

        self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
        mock_sample.assert_called_once_with(
            12345,
            sort=1,  # sort-tottime
            sample_interval_usec=1000,
            duration_sec=30,
            filename=None,
            all_threads=True,
            limit=20,
            show_summary=True,
            output_format="pstats",
            realtime_stats=False,
        )
def test_cli_script_with_profiler_options(self):
    """Test script with various profiler options."""
    test_args = [
        "profile.sample", "-i", "2000", "-d", "60",
        "--collapsed", "-o", "output.txt",
        "myscript.py", "scriptarg",
    ]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        self._setup_sync_mocks(mock_socket, mock_popen)

        profile.sample.main()

        self._verify_coordinator_command(mock_popen, ("myscript.py", "scriptarg"))

        # Verify profiler options were passed correctly
        mock_sample.assert_called_once_with(
            12345,
            sort=2,  # default sort
            sample_interval_usec=2000,
            duration_sec=60,
            filename="output.txt",
            all_threads=False,
            limit=15,
            show_summary=True,
            output_format="collapsed",
            realtime_stats=False,
        )
def test_cli_empty_module_name(self):
    """-m with no module name must be rejected (exit code 2)."""
    test_args = ["profile.sample", "-m"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
        self.assertRaises(SystemExit) as cm,
    ):
        profile.sample.main()

    self.assertEqual(cm.exception.code, 2)  # argparse error
    error_msg = mock_stderr.getvalue()
    self.assertIn("argument -m/--module: expected one argument", error_msg)
def test_cli_long_module_option(self):
    """--module (long form) behaves identically to -m."""
    test_args = ["profile.sample", "--module", "mymodule", "arg1"]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("subprocess.Popen") as mock_popen,
        mock.patch("socket.socket") as mock_socket,
    ):
        self._setup_sync_mocks(mock_socket, mock_popen)

        profile.sample.main()

        self._verify_coordinator_command(mock_popen, ("-m", "mymodule", "arg1"))
def test_cli_complex_script_arguments(self):
    """Option-like script arguments are forwarded verbatim, not parsed."""
    test_args = [
        "profile.sample", "script.py",
        "--input", "file.txt", "-v", "--output=/tmp/out", "positional"
    ]

    with (
        mock.patch("sys.argv", test_args),
        mock.patch("profile.sample.sample") as mock_sample,
        mock.patch("profile.sample._run_with_sync") as mock_run_with_sync,
    ):
        mock_process = mock.MagicMock()
        mock_process.pid = 12345
        mock_process.wait.side_effect = [subprocess.TimeoutExpired(test_args, 0.1), None]
        mock_process.poll.return_value = None
        mock_run_with_sync.return_value = mock_process

        profile.sample.main()

        mock_run_with_sync.assert_called_once_with((
            sys.executable, "script.py",
            "--input", "file.txt", "-v", "--output=/tmp/out", "positional",
        ))
def test_cli_collapsed_format_validation(self):
"""Test that CLI properly validates incompatible options with collapsed format."""
test_cases = [
# Test sort options are invalid with collapsed
(
["profile.sample", "--collapsed", "--sort-nsamples", "12345"],
["profile.sample", "--collapsed", "--sort-nsamples", "-p", "12345"],
"sort",
),
(
["profile.sample", "--collapsed", "--sort-tottime", "12345"],
["profile.sample", "--collapsed", "--sort-tottime", "-p", "12345"],
"sort",
),
(
@ -1707,6 +2068,7 @@ def test_cli_collapsed_format_validation(self):
"profile.sample",
"--collapsed",
"--sort-cumtime",
"-p",
"12345",
],
"sort",
@ -1716,6 +2078,7 @@ def test_cli_collapsed_format_validation(self):
"profile.sample",
"--collapsed",
"--sort-sample-pct",
"-p",
"12345",
],
"sort",
@ -1725,23 +2088,24 @@ def test_cli_collapsed_format_validation(self):
"profile.sample",
"--collapsed",
"--sort-cumul-pct",
"-p",
"12345",
],
"sort",
),
(
["profile.sample", "--collapsed", "--sort-name", "12345"],
["profile.sample", "--collapsed", "--sort-name", "-p", "12345"],
"sort",
),
# Test limit option is invalid with collapsed
(["profile.sample", "--collapsed", "-l", "20", "12345"], "limit"),
(["profile.sample", "--collapsed", "-l", "20", "-p", "12345"], "limit"),
(
["profile.sample", "--collapsed", "--limit", "20", "12345"],
["profile.sample", "--collapsed", "--limit", "20", "-p", "12345"],
"limit",
),
# Test no-summary option is invalid with collapsed
(
["profile.sample", "--collapsed", "--no-summary", "12345"],
["profile.sample", "--collapsed", "--no-summary", "-p", "12345"],
"summary",
),
]
@ -1761,7 +2125,7 @@ def test_cli_collapsed_format_validation(self):
def test_cli_default_collapsed_filename(self):
"""Test that collapsed format gets a default filename when not specified."""
test_args = ["profile.sample", "--collapsed", "12345"]
test_args = ["profile.sample", "--collapsed", "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
@ -1779,12 +2143,12 @@ def test_cli_custom_output_filenames(self):
"""Test custom output filenames for both formats."""
test_cases = [
(
["profile.sample", "--pstats", "-o", "custom.pstats", "12345"],
["profile.sample", "--pstats", "-o", "custom.pstats", "-p", "12345"],
"custom.pstats",
"pstats",
),
(
["profile.sample", "--collapsed", "-o", "custom.txt", "12345"],
["profile.sample", "--collapsed", "-o", "custom.txt", "-p", "12345"],
"custom.txt",
"collapsed",
),
@ -1816,7 +2180,7 @@ def test_cli_mutually_exclusive_format_options(self):
with (
mock.patch(
"sys.argv",
["profile.sample", "--pstats", "--collapsed", "12345"],
["profile.sample", "--pstats", "--collapsed", "-p", "12345"],
),
mock.patch("sys.stderr", io.StringIO()),
):
@ -1824,7 +2188,7 @@ def test_cli_mutually_exclusive_format_options(self):
profile.sample.main()
def test_argument_parsing_basic(self):
test_args = ["profile.sample", "12345"]
test_args = ["profile.sample", "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
@ -1856,7 +2220,7 @@ def test_sort_options(self):
]
for option, expected_sort_value in sort_options:
test_args = ["profile.sample", option, "12345"]
test_args = ["profile.sample", option, "-p", "12345"]
with (
mock.patch("sys.argv", test_args),

View file

@ -75,7 +75,7 @@
#endif
#define INTERP_STATE_BUFFER_SIZE MAX(INTERP_STATE_MIN_SIZE, 256)
#define MAX_TLBC_SIZE 2048
// Copied from Modules/_asynciomodule.c because it's not exported
@ -1568,15 +1568,34 @@ cache_tlbc_array(RemoteUnwinderObject *unwinder, uintptr_t code_addr, uintptr_t
TLBCCacheEntry *entry = NULL;
// Read the TLBC array pointer
if (read_ptr(unwinder, tlbc_array_addr, &tlbc_array_ptr) != 0 || tlbc_array_ptr == 0) {
if (read_ptr(unwinder, tlbc_array_addr, &tlbc_array_ptr) != 0) {
PyErr_SetString(PyExc_RuntimeError, "Failed to read TLBC array pointer");
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to read TLBC array pointer");
return 0; // Read error
}
// Validate TLBC array pointer
if (tlbc_array_ptr == 0) {
PyErr_SetString(PyExc_RuntimeError, "TLBC array pointer is NULL");
return 0; // No TLBC array
}
// Read the TLBC array size
Py_ssize_t tlbc_size;
if (_Py_RemoteDebug_PagedReadRemoteMemory(&unwinder->handle, tlbc_array_ptr, sizeof(tlbc_size), &tlbc_size) != 0 || tlbc_size <= 0) {
if (_Py_RemoteDebug_PagedReadRemoteMemory(&unwinder->handle, tlbc_array_ptr, sizeof(tlbc_size), &tlbc_size) != 0) {
PyErr_SetString(PyExc_RuntimeError, "Failed to read TLBC array size");
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to read TLBC array size");
return 0; // Read error
}
// Validate TLBC array size
if (tlbc_size <= 0) {
PyErr_SetString(PyExc_RuntimeError, "Invalid TLBC array size");
return 0; // Invalid size
}
if (tlbc_size > MAX_TLBC_SIZE) {
PyErr_SetString(PyExc_RuntimeError, "TLBC array size exceeds maximum limit");
return 0; // Invalid size
}
@ -1584,6 +1603,7 @@ cache_tlbc_array(RemoteUnwinderObject *unwinder, uintptr_t code_addr, uintptr_t
size_t array_data_size = tlbc_size * sizeof(void*);
tlbc_array = PyMem_RawMalloc(sizeof(Py_ssize_t) + array_data_size);
if (!tlbc_array) {
PyErr_NoMemory();
set_exception_cause(unwinder, PyExc_MemoryError, "Failed to allocate TLBC array");
return 0; // Memory error
}
@ -1597,6 +1617,7 @@ cache_tlbc_array(RemoteUnwinderObject *unwinder, uintptr_t code_addr, uintptr_t
// Create cache entry
entry = PyMem_RawMalloc(sizeof(TLBCCacheEntry));
if (!entry) {
PyErr_NoMemory();
PyMem_RawFree(tlbc_array);
set_exception_cause(unwinder, PyExc_MemoryError, "Failed to allocate TLBC cache entry");
return 0; // Memory error
@ -1777,6 +1798,7 @@ parse_code_object(RemoteUnwinderObject *unwinder,
meta = PyMem_RawMalloc(sizeof(CachedCodeMetadata));
if (!meta) {
PyErr_NoMemory();
set_exception_cause(unwinder, PyExc_MemoryError, "Failed to allocate cached code metadata");
goto error;
}

View file

@ -50,9 +50,11 @@ extern "C" {
# include <mach-o/fat.h>
# include <mach-o/loader.h>
# include <mach-o/nlist.h>
# include <mach/error.h>
# include <mach/mach.h>
# include <mach/mach_vm.h>
# include <mach/machine.h>
# include <mach/task_info.h>
# include <sys/mman.h>
# include <sys/proc.h>
# include <sys/sysctl.h>
@ -1053,18 +1055,38 @@ _Py_RemoteDebug_ReadRemoteMemory(proc_handle_t *handle, uintptr_t remote_address
(mach_vm_size_t*)&result);
if (kr != KERN_SUCCESS) {
switch (kr) {
switch (err_get_code(kr)) {
case KERN_PROTECTION_FAILURE:
PyErr_Format(PyExc_PermissionError,
"Memory protection failure reading from PID %d at address "
"0x%lx (size %zu): insufficient permissions",
handle->pid, remote_address, len);
break;
case KERN_INVALID_ARGUMENT:
PyErr_Format(PyExc_ValueError,
"Invalid argument to mach_vm_read_overwrite for PID %d at "
"address 0x%lx (size %zu)",
handle->pid, remote_address, len);
case KERN_INVALID_ARGUMENT: {
// Perform a task_info check to see if the invalid argument is due
// to the process being terminated
task_basic_info_data_t task_basic_info;
mach_msg_type_number_t task_info_count = TASK_BASIC_INFO_COUNT;
kern_return_t task_valid_check = task_info(handle->task, TASK_BASIC_INFO,
(task_info_t)&task_basic_info,
&task_info_count);
if (task_valid_check == KERN_INVALID_ARGUMENT) {
PyErr_Format(PyExc_ProcessLookupError,
"Process %d is no longer accessible (process terminated)",
handle->pid);
} else {
PyErr_Format(PyExc_ValueError,
"Invalid argument to mach_vm_read_overwrite for PID %d at "
"address 0x%lx (size %zu) - check memory permissions",
handle->pid, remote_address, len);
}
break;
}
case KERN_NO_SPACE:
case KERN_MEMORY_ERROR:
PyErr_Format(PyExc_ProcessLookupError,
"Process %d memory space no longer available (process terminated)",
handle->pid);
break;
default:
PyErr_Format(PyExc_RuntimeError,