mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
The tests were flaky on slow machines because subprocesses could finish before enough samples were collected. This adds synchronization similar to test_external_inspection: test scripts now signal when they start working, and the profiler waits for this signal before sampling. Test scripts now run in infinite loops until killed rather than for fixed iterations, ensuring the profiler always has active work to sample regardless of machine speed.
258 lines
8.6 KiB
Python
258 lines
8.6 KiB
Python
"""Tests for advanced sampling profiler features (GC tracking, native frames, ProcessPoolExecutor support)."""
|
|
|
|
import io
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
from unittest import mock
|
|
|
|
try:
|
|
import _remote_debugging # noqa: F401
|
|
import profiling.sampling
|
|
import profiling.sampling.sample
|
|
except ImportError:
|
|
raise unittest.SkipTest(
|
|
"Test only runs when _remote_debugging is available"
|
|
)
|
|
|
|
from test.support import (
|
|
SHORT_TIMEOUT,
|
|
SuppressCrashReport,
|
|
os_helper,
|
|
requires_subprocess,
|
|
script_helper,
|
|
)
|
|
|
|
from .helpers import close_and_unlink, skip_if_not_supported, test_subprocess
|
|
|
|
|
|
@requires_subprocess()
|
|
@skip_if_not_supported
|
|
class TestGCFrameTracking(unittest.TestCase):
|
|
"""Tests for GC frame tracking in the sampling profiler."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Create a static test script with GC frames and CPU-intensive work."""
|
|
cls.gc_test_script = '''
|
|
import gc
|
|
|
|
class ExpensiveGarbage:
|
|
def __init__(self):
|
|
self.cycle = self
|
|
|
|
def __del__(self):
|
|
result = 0
|
|
for i in range(100000):
|
|
result += i * i
|
|
if i % 1000 == 0:
|
|
result = result % 1000000
|
|
|
|
_test_sock.sendall(b"working")
|
|
while True:
|
|
ExpensiveGarbage()
|
|
gc.collect()
|
|
'''
|
|
|
|
def test_gc_frames_enabled(self):
|
|
"""Test that GC frames appear when gc tracking is enabled."""
|
|
with (
|
|
test_subprocess(self.gc_test_script, wait_for_working=True) as subproc,
|
|
io.StringIO() as captured_output,
|
|
mock.patch("sys.stdout", captured_output),
|
|
):
|
|
try:
|
|
from profiling.sampling.pstats_collector import PstatsCollector
|
|
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
|
|
profiling.sampling.sample.sample(
|
|
subproc.process.pid,
|
|
collector,
|
|
duration_sec=1,
|
|
native=False,
|
|
gc=True,
|
|
)
|
|
collector.print_stats(show_summary=False)
|
|
except PermissionError:
|
|
self.skipTest("Insufficient permissions for remote profiling")
|
|
|
|
output = captured_output.getvalue()
|
|
|
|
# Should capture samples
|
|
self.assertIn("Captured", output)
|
|
self.assertIn("samples", output)
|
|
|
|
# GC frames should be present
|
|
self.assertIn("<GC>", output)
|
|
|
|
def test_gc_frames_disabled(self):
|
|
"""Test that GC frames do not appear when gc tracking is disabled."""
|
|
with (
|
|
test_subprocess(self.gc_test_script, wait_for_working=True) as subproc,
|
|
io.StringIO() as captured_output,
|
|
mock.patch("sys.stdout", captured_output),
|
|
):
|
|
try:
|
|
from profiling.sampling.pstats_collector import PstatsCollector
|
|
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
|
|
profiling.sampling.sample.sample(
|
|
subproc.process.pid,
|
|
collector,
|
|
duration_sec=1,
|
|
native=False,
|
|
gc=False,
|
|
)
|
|
collector.print_stats(show_summary=False)
|
|
except PermissionError:
|
|
self.skipTest("Insufficient permissions for remote profiling")
|
|
|
|
output = captured_output.getvalue()
|
|
|
|
# Should capture samples
|
|
self.assertIn("Captured", output)
|
|
self.assertIn("samples", output)
|
|
|
|
# GC frames should NOT be present
|
|
self.assertNotIn("<GC>", output)
|
|
|
|
|
|
@requires_subprocess()
|
|
@skip_if_not_supported
|
|
class TestNativeFrameTracking(unittest.TestCase):
|
|
"""Tests for native frame tracking in the sampling profiler."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Create a static test script with native frames and CPU-intensive work."""
|
|
cls.native_test_script = """
|
|
import operator
|
|
|
|
def inner():
|
|
for _ in range(1_000_0000):
|
|
pass
|
|
|
|
_test_sock.sendall(b"working")
|
|
while True:
|
|
operator.call(inner)
|
|
"""
|
|
|
|
def test_native_frames_enabled(self):
|
|
"""Test that native frames appear when native tracking is enabled."""
|
|
collapsed_file = tempfile.NamedTemporaryFile(
|
|
suffix=".txt", delete=False
|
|
)
|
|
self.addCleanup(close_and_unlink, collapsed_file)
|
|
|
|
with test_subprocess(self.native_test_script, wait_for_working=True) as subproc:
|
|
with (
|
|
io.StringIO() as captured_output,
|
|
mock.patch("sys.stdout", captured_output),
|
|
):
|
|
try:
|
|
from profiling.sampling.stack_collector import CollapsedStackCollector
|
|
collector = CollapsedStackCollector(1000, skip_idle=False)
|
|
profiling.sampling.sample.sample(
|
|
subproc.process.pid,
|
|
collector,
|
|
duration_sec=1,
|
|
native=True,
|
|
)
|
|
collector.export(collapsed_file.name)
|
|
except PermissionError:
|
|
self.skipTest(
|
|
"Insufficient permissions for remote profiling"
|
|
)
|
|
|
|
# Verify file was created and contains valid data
|
|
self.assertTrue(os.path.exists(collapsed_file.name))
|
|
self.assertGreater(os.path.getsize(collapsed_file.name), 0)
|
|
|
|
# Check file format
|
|
with open(collapsed_file.name, "r") as f:
|
|
content = f.read()
|
|
|
|
lines = content.strip().split("\n")
|
|
self.assertGreater(len(lines), 0)
|
|
|
|
stacks = [line.rsplit(" ", 1)[0] for line in lines]
|
|
|
|
# Most samples should have native code in the middle of the stack:
|
|
self.assertTrue(any(";<native>;" in stack for stack in stacks))
|
|
|
|
# No samples should have native code at the top of the stack:
|
|
self.assertFalse(any(stack.endswith(";<native>") for stack in stacks))
|
|
|
|
def test_native_frames_disabled(self):
|
|
"""Test that native frames do not appear when native tracking is disabled."""
|
|
with (
|
|
test_subprocess(self.native_test_script, wait_for_working=True) as subproc,
|
|
io.StringIO() as captured_output,
|
|
mock.patch("sys.stdout", captured_output),
|
|
):
|
|
try:
|
|
from profiling.sampling.pstats_collector import PstatsCollector
|
|
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
|
|
profiling.sampling.sample.sample(
|
|
subproc.process.pid,
|
|
collector,
|
|
duration_sec=1,
|
|
)
|
|
collector.print_stats(show_summary=False)
|
|
except PermissionError:
|
|
self.skipTest("Insufficient permissions for remote profiling")
|
|
output = captured_output.getvalue()
|
|
# Native frames should NOT be present:
|
|
self.assertNotIn("<native>", output)
|
|
|
|
|
|
@requires_subprocess()
|
|
@skip_if_not_supported
|
|
class TestProcessPoolExecutorSupport(unittest.TestCase):
|
|
"""
|
|
Test that ProcessPoolExecutor works correctly with profiling.sampling.
|
|
"""
|
|
|
|
def test_process_pool_executor_pickle(self):
|
|
# gh-140729: test use ProcessPoolExecutor.map() can sampling
|
|
test_script = """
|
|
import concurrent.futures
|
|
|
|
def worker(x):
|
|
return x * 2
|
|
|
|
if __name__ == "__main__":
|
|
with concurrent.futures.ProcessPoolExecutor() as executor:
|
|
results = list(executor.map(worker, [1, 2, 3]))
|
|
print(f"Results: {results}")
|
|
"""
|
|
with os_helper.temp_dir() as temp_dir:
|
|
script = script_helper.make_script(
|
|
temp_dir, "test_process_pool_executor_pickle", test_script
|
|
)
|
|
with SuppressCrashReport():
|
|
with script_helper.spawn_python(
|
|
"-m",
|
|
"profiling.sampling",
|
|
"run",
|
|
"-d",
|
|
"5",
|
|
"-i",
|
|
"100000",
|
|
script,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
) as proc:
|
|
try:
|
|
stdout, stderr = proc.communicate(
|
|
timeout=SHORT_TIMEOUT
|
|
)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
stdout, stderr = proc.communicate()
|
|
|
|
if "Permission Error" in stderr:
|
|
self.skipTest("Insufficient permissions for remote profiling")
|
|
|
|
self.assertIn("Results: [2, 4, 6]", stdout)
|
|
self.assertNotIn("Can't pickle", stderr)
|