cpython/Lib/test/test_profiling/test_sampling_profiler/test_cli.py

665 lines
22 KiB
Python
Raw Normal View History

"""Tests for sampling profiler CLI argument parsing and functionality."""
import io
import subprocess
import sys
import unittest
from unittest import mock
try:
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import is_emscripten
class TestSampleProfilerCLI(unittest.TestCase):
def _setup_sync_mocks(self, mock_socket, mock_popen):
"""Helper to set up socket and process mocks for coordinator tests."""
# Mock the sync socket with context manager support
mock_sock_instance = mock.MagicMock()
mock_sock_instance.getsockname.return_value = ("127.0.0.1", 12345)
# Mock the connection with context manager support
mock_conn = mock.MagicMock()
mock_conn.recv.return_value = b"ready"
mock_conn.__enter__.return_value = mock_conn
mock_conn.__exit__.return_value = None
# Mock accept() to return (connection, address) and support indexing
mock_accept_result = mock.MagicMock()
mock_accept_result.__getitem__.return_value = (
mock_conn # [0] returns the connection
)
mock_sock_instance.accept.return_value = mock_accept_result
# Mock socket with context manager support
mock_sock_instance.__enter__.return_value = mock_sock_instance
mock_sock_instance.__exit__.return_value = None
mock_socket.return_value = mock_sock_instance
# Mock the subprocess
mock_process = mock.MagicMock()
mock_process.pid = 12345
mock_process.poll.return_value = None
mock_popen.return_value = mock_process
return mock_process
def _verify_coordinator_command(self, mock_popen, expected_target_args):
"""Helper to verify the coordinator command was called correctly."""
args, kwargs = mock_popen.call_args
coordinator_cmd = args[0]
self.assertEqual(coordinator_cmd[0], sys.executable)
self.assertEqual(coordinator_cmd[1], "-m")
self.assertEqual(
coordinator_cmd[2], "profiling.sampling._sync_coordinator"
)
self.assertEqual(coordinator_cmd[3], "12345") # port
# cwd is coordinator_cmd[4]
self.assertEqual(coordinator_cmd[5:], expected_target_args)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_module_argument_parsing(self):
test_args = ["profiling.sampling.sample", "-m", "mymodule"]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
self._setup_sync_mocks(mock_socket, mock_popen)
profiling.sampling.sample.main()
self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
mock_sample.assert_called_once_with(
12345,
sort=2, # default sort (sort_value from args.sort)
sample_interval_usec=100,
duration_sec=10,
filename=None,
all_threads=False,
limit=15,
show_summary=True,
output_format="pstats",
realtime_stats=False,
mode=0,
native=False,
gc=True,
)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_module_with_arguments(self):
test_args = [
"profiling.sampling.sample",
"-m",
"mymodule",
"arg1",
"arg2",
"--flag",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
self._setup_sync_mocks(mock_socket, mock_popen)
profiling.sampling.sample.main()
self._verify_coordinator_command(
mock_popen, ("-m", "mymodule", "arg1", "arg2", "--flag")
)
mock_sample.assert_called_once_with(
12345,
sort=2,
sample_interval_usec=100,
duration_sec=10,
filename=None,
all_threads=False,
limit=15,
show_summary=True,
output_format="pstats",
realtime_stats=False,
mode=0,
native=False,
gc=True,
)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_script_argument_parsing(self):
test_args = ["profiling.sampling.sample", "myscript.py"]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
self._setup_sync_mocks(mock_socket, mock_popen)
profiling.sampling.sample.main()
self._verify_coordinator_command(mock_popen, ("myscript.py",))
mock_sample.assert_called_once_with(
12345,
sort=2,
sample_interval_usec=100,
duration_sec=10,
filename=None,
all_threads=False,
limit=15,
show_summary=True,
output_format="pstats",
realtime_stats=False,
mode=0,
native=False,
gc=True,
)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_script_with_arguments(self):
test_args = [
"profiling.sampling.sample",
"myscript.py",
"arg1",
"arg2",
"--flag",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
# Use the helper to set up mocks consistently
mock_process = self._setup_sync_mocks(mock_socket, mock_popen)
# Override specific behavior for this test
mock_process.wait.side_effect = [
subprocess.TimeoutExpired(test_args, 0.1),
None,
]
profiling.sampling.sample.main()
# Verify the coordinator command was called
args, kwargs = mock_popen.call_args
coordinator_cmd = args[0]
self.assertEqual(coordinator_cmd[0], sys.executable)
self.assertEqual(coordinator_cmd[1], "-m")
self.assertEqual(
coordinator_cmd[2], "profiling.sampling._sync_coordinator"
)
self.assertEqual(coordinator_cmd[3], "12345") # port
# cwd is coordinator_cmd[4]
self.assertEqual(
coordinator_cmd[5:], ("myscript.py", "arg1", "arg2", "--flag")
)
def test_cli_mutually_exclusive_pid_module(self):
test_args = [
"profiling.sampling.sample",
"-p",
"12345",
"-m",
"mymodule",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
profiling.sampling.sample.main()
self.assertEqual(cm.exception.code, 2) # argparse error
error_msg = mock_stderr.getvalue()
self.assertIn("not allowed with argument", error_msg)
def test_cli_mutually_exclusive_pid_script(self):
test_args = ["profiling.sampling.sample", "-p", "12345", "myscript.py"]
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
profiling.sampling.sample.main()
self.assertEqual(cm.exception.code, 2) # argparse error
error_msg = mock_stderr.getvalue()
self.assertIn("only one target type can be specified", error_msg)
def test_cli_no_target_specified(self):
test_args = ["profiling.sampling.sample", "-d", "5"]
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
profiling.sampling.sample.main()
self.assertEqual(cm.exception.code, 2) # argparse error
error_msg = mock_stderr.getvalue()
self.assertIn("one of the arguments", error_msg)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_module_with_profiler_options(self):
test_args = [
"profiling.sampling.sample",
"-i",
"1000",
"-d",
"30",
"-a",
"--sort-tottime",
"-l",
"20",
"-m",
"mymodule",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
self._setup_sync_mocks(mock_socket, mock_popen)
profiling.sampling.sample.main()
self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
mock_sample.assert_called_once_with(
12345,
sort=1, # sort-tottime
sample_interval_usec=1000,
duration_sec=30,
filename=None,
all_threads=True,
limit=20,
show_summary=True,
output_format="pstats",
realtime_stats=False,
mode=0,
native=False,
gc=True,
)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_script_with_profiler_options(self):
"""Test script with various profiler options."""
test_args = [
"profiling.sampling.sample",
"-i",
"2000",
"-d",
"60",
"--collapsed",
"-o",
"output.txt",
"myscript.py",
"scriptarg",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
self._setup_sync_mocks(mock_socket, mock_popen)
profiling.sampling.sample.main()
self._verify_coordinator_command(
mock_popen, ("myscript.py", "scriptarg")
)
# Verify profiler options were passed correctly
mock_sample.assert_called_once_with(
12345,
sort=2, # default sort
sample_interval_usec=2000,
duration_sec=60,
filename="output.txt",
all_threads=False,
limit=15,
show_summary=True,
output_format="collapsed",
realtime_stats=False,
mode=0,
native=False,
gc=True,
)
def test_cli_empty_module_name(self):
test_args = ["profiling.sampling.sample", "-m"]
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
profiling.sampling.sample.main()
self.assertEqual(cm.exception.code, 2) # argparse error
error_msg = mock_stderr.getvalue()
self.assertIn("argument -m/--module: expected one argument", error_msg)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
def test_cli_long_module_option(self):
test_args = [
"profiling.sampling.sample",
"--module",
"mymodule",
"arg1",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
):
self._setup_sync_mocks(mock_socket, mock_popen)
profiling.sampling.sample.main()
self._verify_coordinator_command(
mock_popen, ("-m", "mymodule", "arg1")
)
def test_cli_complex_script_arguments(self):
test_args = [
"profiling.sampling.sample",
"script.py",
"--input",
"file.txt",
"-v",
"--output=/tmp/out",
"positional",
]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
mock.patch(
"profiling.sampling.sample._run_with_sync"
) as mock_run_with_sync,
):
mock_process = mock.MagicMock()
mock_process.pid = 12345
mock_process.wait.side_effect = [
subprocess.TimeoutExpired(test_args, 0.1),
None,
]
mock_process.poll.return_value = None
mock_run_with_sync.return_value = mock_process
profiling.sampling.sample.main()
mock_run_with_sync.assert_called_once_with(
(
sys.executable,
"script.py",
"--input",
"file.txt",
"-v",
"--output=/tmp/out",
"positional",
)
)
def test_cli_collapsed_format_validation(self):
"""Test that CLI properly validates incompatible options with collapsed format."""
test_cases = [
# Test sort options are invalid with collapsed
(
[
"profiling.sampling.sample",
"--collapsed",
"--sort-nsamples",
"-p",
"12345",
],
"sort",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"--sort-tottime",
"-p",
"12345",
],
"sort",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"--sort-cumtime",
"-p",
"12345",
],
"sort",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"--sort-sample-pct",
"-p",
"12345",
],
"sort",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"--sort-cumul-pct",
"-p",
"12345",
],
"sort",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"--sort-name",
"-p",
"12345",
],
"sort",
),
# Test limit option is invalid with collapsed
(
[
"profiling.sampling.sample",
"--collapsed",
"-l",
"20",
"-p",
"12345",
],
"limit",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"--limit",
"20",
"-p",
"12345",
],
"limit",
),
# Test no-summary option is invalid with collapsed
(
[
"profiling.sampling.sample",
"--collapsed",
"--no-summary",
"-p",
"12345",
],
"summary",
),
]
for test_args, expected_error_keyword in test_cases:
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
profiling.sampling.sample.main()
self.assertEqual(cm.exception.code, 2) # argparse error code
error_msg = mock_stderr.getvalue()
self.assertIn("error:", error_msg)
self.assertIn("--pstats format", error_msg)
def test_cli_default_collapsed_filename(self):
"""Test that collapsed format gets a default filename when not specified."""
test_args = ["profiling.sampling.sample", "--collapsed", "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
):
profiling.sampling.sample.main()
# Check that filename was set to default collapsed format
mock_sample.assert_called_once()
call_args = mock_sample.call_args[1]
self.assertEqual(call_args["output_format"], "collapsed")
self.assertEqual(call_args["filename"], "collapsed.12345.txt")
def test_cli_custom_output_filenames(self):
"""Test custom output filenames for both formats."""
test_cases = [
(
[
"profiling.sampling.sample",
"--pstats",
"-o",
"custom.pstats",
"-p",
"12345",
],
"custom.pstats",
"pstats",
),
(
[
"profiling.sampling.sample",
"--collapsed",
"-o",
"custom.txt",
"-p",
"12345",
],
"custom.txt",
"collapsed",
),
]
for test_args, expected_filename, expected_format in test_cases:
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
):
profiling.sampling.sample.main()
mock_sample.assert_called_once()
call_args = mock_sample.call_args[1]
self.assertEqual(call_args["filename"], expected_filename)
self.assertEqual(call_args["output_format"], expected_format)
def test_cli_missing_required_arguments(self):
"""Test that CLI requires PID argument."""
with (
mock.patch("sys.argv", ["profiling.sampling.sample"]),
mock.patch("sys.stderr", io.StringIO()),
):
with self.assertRaises(SystemExit):
profiling.sampling.sample.main()
def test_cli_mutually_exclusive_format_options(self):
"""Test that pstats and collapsed options are mutually exclusive."""
with (
mock.patch(
"sys.argv",
[
"profiling.sampling.sample",
"--pstats",
"--collapsed",
"-p",
"12345",
],
),
mock.patch("sys.stderr", io.StringIO()),
):
with self.assertRaises(SystemExit):
profiling.sampling.sample.main()
def test_argument_parsing_basic(self):
test_args = ["profiling.sampling.sample", "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
):
profiling.sampling.sample.main()
mock_sample.assert_called_once_with(
12345,
sample_interval_usec=100,
duration_sec=10,
filename=None,
all_threads=False,
limit=15,
sort=2,
show_summary=True,
output_format="pstats",
realtime_stats=False,
mode=0,
native=False,
gc=True,
)
def test_sort_options(self):
sort_options = [
("--sort-nsamples", 0),
("--sort-tottime", 1),
("--sort-cumtime", 2),
("--sort-sample-pct", 3),
("--sort-cumul-pct", 4),
("--sort-name", -1),
]
for option, expected_sort_value in sort_options:
test_args = ["profiling.sampling.sample", option, "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.sample.sample") as mock_sample,
):
profiling.sampling.sample.main()
mock_sample.assert_called_once()
call_args = mock_sample.call_args[1]
self.assertEqual(
call_args["sort"],
expected_sort_value,
)
mock_sample.reset_mock()