mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			434 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			434 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import unittest
 | 
						|
import string
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import sysconfig
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
from test import support
 | 
						|
from test.support.script_helper import (
 | 
						|
    make_script,
 | 
						|
    assert_python_failure,
 | 
						|
    assert_python_ok,
 | 
						|
)
 | 
						|
from test.support.os_helper import temp_dir
 | 
						|
 | 
						|
 | 
						|
if not support.has_subprocess_support:
 | 
						|
    raise unittest.SkipTest("test module requires subprocess")
 | 
						|
 | 
						|
if support.check_sanitizer(address=True, memory=True, ub=True):
 | 
						|
    # gh-109580: Skip the test because it does crash randomly if Python is
 | 
						|
    # built with ASAN.
 | 
						|
    raise unittest.SkipTest("test crash randomly on ASAN/MSAN/UBSAN build")
 | 
						|
 | 
						|
 | 
						|
def supports_trampoline_profiling():
 | 
						|
    perf_trampoline = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
 | 
						|
    if not perf_trampoline:
 | 
						|
        return False
 | 
						|
    return int(perf_trampoline) == 1
 | 
						|
 | 
						|
 | 
						|
if not supports_trampoline_profiling():
 | 
						|
    raise unittest.SkipTest("perf trampoline profiling not supported")
 | 
						|
 | 
						|
 | 
						|
class TestPerfTrampoline(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
 | 
						|
 | 
						|
    def tearDown(self) -> None:
 | 
						|
        super().tearDown()
 | 
						|
        files_to_delete = (
 | 
						|
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
 | 
						|
        )
 | 
						|
        for file in files_to_delete:
 | 
						|
            file.unlink()
 | 
						|
 | 
						|
    def test_trampoline_works(self):
 | 
						|
        code = """if 1:
 | 
						|
                def foo():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    foo()
 | 
						|
 | 
						|
                def baz():
 | 
						|
                    bar()
 | 
						|
 | 
						|
                baz()
 | 
						|
                """
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, "-Xperf", script],
 | 
						|
                text=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(stderr, "")
 | 
						|
        self.assertEqual(stdout, "")
 | 
						|
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        perf_lines = perf_file_contents.splitlines();
 | 
						|
        expected_symbols = [f"py::foo:{script}", f"py::bar:{script}", f"py::baz:{script}"]
 | 
						|
        for expected_symbol in expected_symbols:
 | 
						|
            perf_line = next((line for line in perf_lines if expected_symbol in line), None)
 | 
						|
            self.assertIsNotNone(perf_line, f"Could not find {expected_symbol} in perf file")
 | 
						|
            perf_addr = perf_line.split(" ")[0]
 | 
						|
            self.assertFalse(perf_addr.startswith("0x"), "Address should not be prefixed with 0x")
 | 
						|
            self.assertTrue(set(perf_addr).issubset(string.hexdigits), "Address should contain only hex characters")
 | 
						|
 | 
						|
    def test_trampoline_works_with_forks(self):
 | 
						|
        code = """if 1:
 | 
						|
                import os, sys
 | 
						|
 | 
						|
                def foo_fork():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar_fork():
 | 
						|
                    foo_fork()
 | 
						|
 | 
						|
                def baz_fork():
 | 
						|
                    bar_fork()
 | 
						|
 | 
						|
                def foo():
 | 
						|
                    pid = os.fork()
 | 
						|
                    if pid == 0:
 | 
						|
                        print(os.getpid())
 | 
						|
                        baz_fork()
 | 
						|
                    else:
 | 
						|
                        _, status = os.waitpid(-1, 0)
 | 
						|
                        sys.exit(status)
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    foo()
 | 
						|
 | 
						|
                def baz():
 | 
						|
                    bar()
 | 
						|
 | 
						|
                baz()
 | 
						|
                """
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, "-Xperf", script],
 | 
						|
                text=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(process.returncode, 0)
 | 
						|
        self.assertEqual(stderr, "")
 | 
						|
        child_pid = int(stdout.strip())
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        self.assertTrue(perf_child_file.exists())
 | 
						|
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        self.assertIn(f"py::foo:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::baz:{script}", perf_file_contents)
 | 
						|
 | 
						|
        child_perf_file_contents = perf_child_file.read_text()
 | 
						|
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
 | 
						|
        self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)
 | 
						|
 | 
						|
    def test_sys_api(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                def foo():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def spam():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    sys.deactivate_stack_trampoline()
 | 
						|
                    foo()
 | 
						|
                    sys.activate_stack_trampoline("perf")
 | 
						|
                    spam()
 | 
						|
 | 
						|
                def baz():
 | 
						|
                    bar()
 | 
						|
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                baz()
 | 
						|
                """
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, script],
 | 
						|
                text=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(stderr, "")
 | 
						|
        self.assertEqual(stdout, "")
 | 
						|
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        self.assertNotIn(f"py::foo:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::spam:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::baz:{script}", perf_file_contents)
 | 
						|
 | 
						|
    def test_sys_api_with_existing_trampoline(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                """
 | 
						|
        assert_python_ok("-c", code)
 | 
						|
 | 
						|
    def test_sys_api_with_invalid_trampoline(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                sys.activate_stack_trampoline("invalid")
 | 
						|
                """
 | 
						|
        rc, out, err = assert_python_failure("-c", code)
 | 
						|
        self.assertIn("invalid backend: invalid", err.decode())
 | 
						|
 | 
						|
    def test_sys_api_get_status(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                assert sys.is_stack_trampoline_active() is True
 | 
						|
                sys.deactivate_stack_trampoline()
 | 
						|
                assert sys.is_stack_trampoline_active() is False
 | 
						|
                """
 | 
						|
        assert_python_ok("-c", code)
 | 
						|
 | 
						|
 | 
						|
def is_unwinding_reliable():
 | 
						|
    cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
 | 
						|
    if not cflags:
 | 
						|
        return False
 | 
						|
    return "no-omit-frame-pointer" in cflags
 | 
						|
 | 
						|
 | 
						|
def perf_command_works():
 | 
						|
    try:
 | 
						|
        cmd = ["perf", "--help"]
 | 
						|
        stdout = subprocess.check_output(cmd, text=True)
 | 
						|
    except (subprocess.SubprocessError, OSError):
 | 
						|
        return False
 | 
						|
 | 
						|
    # perf version does not return a version number on Fedora. Use presence
 | 
						|
    # of "perf.data" in help as indicator that it's perf from Linux tools.
 | 
						|
    if "perf.data" not in stdout:
 | 
						|
        return False
 | 
						|
 | 
						|
    # Check that we can run a simple perf run
 | 
						|
    with temp_dir() as script_dir:
 | 
						|
        try:
 | 
						|
            output_file = script_dir + "/perf_output.perf"
 | 
						|
            cmd = (
 | 
						|
                "perf",
 | 
						|
                "record",
 | 
						|
                "-g",
 | 
						|
                "--call-graph=fp",
 | 
						|
                "-o",
 | 
						|
                output_file,
 | 
						|
                "--",
 | 
						|
                sys.executable,
 | 
						|
                "-c",
 | 
						|
                'print("hello")',
 | 
						|
            )
 | 
						|
            stdout = subprocess.check_output(
 | 
						|
                cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
 | 
						|
            )
 | 
						|
        except (subprocess.SubprocessError, OSError):
 | 
						|
            return False
 | 
						|
 | 
						|
        if "hello" not in stdout:
 | 
						|
            return False
 | 
						|
 | 
						|
    return True
 | 
						|
 | 
						|
 | 
						|
def run_perf(cwd, *args, **env_vars):
 | 
						|
    if env_vars:
 | 
						|
        env = os.environ.copy()
 | 
						|
        env.update(env_vars)
 | 
						|
    else:
 | 
						|
        env = None
 | 
						|
    output_file = cwd + "/perf_output.perf"
 | 
						|
    base_cmd = ("perf", "record", "-g", "--call-graph=fp", "-o", output_file, "--")
 | 
						|
    proc = subprocess.run(
 | 
						|
        base_cmd + args,
 | 
						|
        stdout=subprocess.PIPE,
 | 
						|
        stderr=subprocess.PIPE,
 | 
						|
        env=env,
 | 
						|
    )
 | 
						|
    if proc.returncode:
 | 
						|
        print(proc.stderr)
 | 
						|
        raise ValueError(f"Perf failed with return code {proc.returncode}")
 | 
						|
 | 
						|
    base_cmd = ("perf", "script")
 | 
						|
    proc = subprocess.run(
 | 
						|
        ("perf", "script", "-i", output_file),
 | 
						|
        stdout=subprocess.PIPE,
 | 
						|
        stderr=subprocess.PIPE,
 | 
						|
        env=env,
 | 
						|
        check=True,
 | 
						|
    )
 | 
						|
    return proc.stdout.decode("utf-8", "replace"), proc.stderr.decode(
 | 
						|
        "utf-8", "replace"
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
 | 
						|
@unittest.skipUnless(is_unwinding_reliable(), "Unwinding is unreliable")
 | 
						|
class TestPerfProfiler(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
 | 
						|
 | 
						|
    def tearDown(self) -> None:
 | 
						|
        super().tearDown()
 | 
						|
        files_to_delete = (
 | 
						|
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
 | 
						|
        )
 | 
						|
        for file in files_to_delete:
 | 
						|
            file.unlink()
 | 
						|
 | 
						|
    def test_python_calls_appear_in_the_stack_if_perf_activated(self):
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            code = """if 1:
 | 
						|
                def foo(n):
 | 
						|
                    x = 0
 | 
						|
                    for i in range(n):
 | 
						|
                        x += i
 | 
						|
 | 
						|
                def bar(n):
 | 
						|
                    foo(n)
 | 
						|
 | 
						|
                def baz(n):
 | 
						|
                    bar(n)
 | 
						|
 | 
						|
                baz(10000000)
 | 
						|
                """
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            stdout, stderr = run_perf(script_dir, sys.executable, "-Xperf", script)
 | 
						|
            self.assertEqual(stderr, "")
 | 
						|
 | 
						|
            self.assertIn(f"py::foo:{script}", stdout)
 | 
						|
            self.assertIn(f"py::bar:{script}", stdout)
 | 
						|
            self.assertIn(f"py::baz:{script}", stdout)
 | 
						|
 | 
						|
    def test_python_calls_do_not_appear_in_the_stack_if_perf_activated(self):
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            code = """if 1:
 | 
						|
                def foo(n):
 | 
						|
                    x = 0
 | 
						|
                    for i in range(n):
 | 
						|
                        x += i
 | 
						|
 | 
						|
                def bar(n):
 | 
						|
                    foo(n)
 | 
						|
 | 
						|
                def baz(n):
 | 
						|
                    bar(n)
 | 
						|
 | 
						|
                baz(10000000)
 | 
						|
                """
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            stdout, stderr = run_perf(script_dir, sys.executable, script)
 | 
						|
            self.assertEqual(stderr, "")
 | 
						|
 | 
						|
            self.assertNotIn(f"py::foo:{script}", stdout)
 | 
						|
            self.assertNotIn(f"py::bar:{script}", stdout)
 | 
						|
            self.assertNotIn(f"py::baz:{script}", stdout)
 | 
						|
 | 
						|
    def test_pre_fork_compile(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                import os
 | 
						|
                import sysconfig
 | 
						|
                from _testinternalcapi import (
 | 
						|
                    compile_perf_trampoline_entry,
 | 
						|
                    perf_trampoline_set_persist_after_fork,
 | 
						|
                )
 | 
						|
 | 
						|
                def foo_fork():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar_fork():
 | 
						|
                    foo_fork()
 | 
						|
 | 
						|
                def foo():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    foo()
 | 
						|
 | 
						|
                def compile_trampolines_for_all_functions():
 | 
						|
                    perf_trampoline_set_persist_after_fork(1)
 | 
						|
                    for _, obj in globals().items():
 | 
						|
                        if callable(obj) and hasattr(obj, '__code__'):
 | 
						|
                            compile_perf_trampoline_entry(obj.__code__)
 | 
						|
 | 
						|
                if __name__ == "__main__":
 | 
						|
                    compile_trampolines_for_all_functions()
 | 
						|
                    pid = os.fork()
 | 
						|
                    if pid == 0:
 | 
						|
                        print(os.getpid())
 | 
						|
                        bar_fork()
 | 
						|
                    else:
 | 
						|
                        bar()
 | 
						|
                """
 | 
						|
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, "-Xperf", script],
 | 
						|
                universal_newlines=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(process.returncode, 0)
 | 
						|
        self.assertNotIn("Error:", stderr)
 | 
						|
        child_pid = int(stdout.strip())
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        self.assertTrue(perf_child_file.exists())
 | 
						|
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        self.assertIn(f"py::foo:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::foo_fork:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar_fork:{script}", perf_file_contents)
 | 
						|
 | 
						|
        child_perf_file_contents = perf_child_file.read_text()
 | 
						|
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
 | 
						|
 | 
						|
        # Pre-compiled perf-map entries of a forked process must be
 | 
						|
        # identical in both the parent and child perf-map files.
 | 
						|
        perf_file_lines = perf_file_contents.split("\n")
 | 
						|
        for line in perf_file_lines:
 | 
						|
            if (
 | 
						|
                f"py::foo_fork:{script}" in line
 | 
						|
                or f"py::bar_fork:{script}" in line
 | 
						|
            ):
 | 
						|
                self.assertIn(line, child_perf_file_contents)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |